
Refactor .csv to be an output renderer - and teach register_output_renderer to stream all rows #1062

Open
simonw opened this issue Oct 29, 2020 · 5 comments
Labels
csv, large, maybe-not-1.0 (Things that might not need to block 1.0 after all), plugins, refactor
Milestone
Datasette 1.0
Comments

@simonw
Owner

simonw commented Oct 29, 2020

This can drive the upgrade of the register_output_renderer hook to be able to handle streaming all rows in a large query.
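For reference, the current register_output_renderer hook returns a dict with "extension", "render" and an optional "can_render" key, and the render callback only ever sees one page of rows at a time. A purely speculative sketch of what a streaming-capable version might look like - the "can_stream" key and the rows-as-async-iterator argument are assumptions for illustration, not part of the existing API:

from datasette import hookimpl


@hookimpl
def register_output_renderer(datasette):
    return {
        "extension": "csv",
        "render": render_csv_stream,
        # Hypothetical: signal that this renderer can consume every row of a
        # query as an async iterator instead of a single page at a time
        "can_stream": True,
    }


async def render_csv_stream(datasette, columns, rows, request, **kwargs):
    # Hypothetical streaming shape: "rows" is an async iterator over the full
    # result set and the renderer yields chunks of the response body
    # (CSV quoting omitted for brevity)
    yield ",".join(columns) + "\r\n"
    async for row in rows:
        yield ",".join(str(value) for value in row) + "\r\n"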

@simonw simonw added this to the Datasette 1.0 milestone Oct 29, 2020
@simonw simonw added the large label Oct 29, 2020
@simonw
Owner Author

simonw commented Oct 29, 2020

Relevant code:

async def as_csv(self, request, database, hash, **kwargs):
    stream = request.args.get("_stream")
    if stream:
        # Some quick sanity checks
        if not self.ds.config("allow_csv_stream"):
            raise DatasetteError("CSV streaming is disabled", status=400)
        if request.args.get("_next"):
            raise DatasetteError("_next not allowed for CSV streaming", status=400)
        kwargs["_size"] = "max"
    # Fetch the first page
    try:
        response_or_template_contexts = await self.data(
            request, database, hash, **kwargs
        )
        if isinstance(response_or_template_contexts, Response):
            return response_or_template_contexts
        else:
            data, _, _ = response_or_template_contexts
    except (sqlite3.OperationalError, InvalidSql) as e:
        raise DatasetteError(str(e), title="Invalid SQL", status=400)
    except sqlite3.OperationalError as e:
        raise DatasetteError(str(e))
    except DatasetteError:
        raise
    # Convert rows and columns to CSV
    headings = data["columns"]
    # if there are expanded_columns we need to add additional headings
    expanded_columns = set(data.get("expanded_columns") or [])
    if expanded_columns:
        headings = []
        for column in data["columns"]:
            headings.append(column)
            if column in expanded_columns:
                headings.append("{}_label".format(column))

    async def stream_fn(r):
        nonlocal data
        writer = csv.writer(LimitedWriter(r, self.ds.config("max_csv_mb")))
        first = True
        next = None
        while first or (next and stream):
            try:
                if next:
                    kwargs["_next"] = next
                if not first:
                    data, _, _ = await self.data(request, database, hash, **kwargs)
                if first:
                    await writer.writerow(headings)
                    first = False
                next = data.get("next")
                for row in data["rows"]:
                    if not expanded_columns:
                        # Simple path
                        await writer.writerow(row)
                    else:
                        # Look for {"value": ..., "label": ...} dicts and expand
                        new_row = []
                        for heading, cell in zip(data["columns"], row):
                            if heading in expanded_columns:
                                if cell is None:
                                    new_row.extend(("", ""))
                                else:
                                    assert isinstance(cell, dict)
                                    new_row.append(cell["value"])
                                    new_row.append(cell["label"])
                            else:
                                new_row.append(cell)
                        await writer.writerow(new_row)
            except Exception as e:
                print("caught this", e)
                await r.write(str(e))
                return

    content_type = "text/plain; charset=utf-8"
    headers = {}
    if self.ds.cors:
        headers["Access-Control-Allow-Origin"] = "*"
    if request.args.get("_dl", None):
        content_type = "text/csv; charset=utf-8"
        disposition = 'attachment; filename="{}.csv"'.format(
            kwargs.get("table", database)
        )
        headers["content-disposition"] = disposition
    return AsgiStream(stream_fn, headers=headers, content_type=content_type)

@simonw
Owner Author

simonw commented Jun 3, 2021

Implementing this would make #1356 a whole lot more interesting.

@simonw
Owner Author

simonw commented Mar 15, 2022

I can get regular .json to stream too, using the pattern described in this TIL: https://til.simonwillison.net/python/output-json-array-streaming
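The TIL pattern, roughly: write the opening "[", then each row as it is fetched with commas in between, then the closing "]". A minimal sketch of that idea, independent of Datasette's internals - send_chunk and fetch_pages here stand in for the ASGI send callable and a paginated row source, they are not real Datasette APIs:

import json


async def stream_json_array(send_chunk, fetch_pages):
    # Emit a JSON array incrementally so the full result set never has to be
    # held in memory: "[", then comma-separated rows, then "]"
    await send_chunk("[")
    first = True
    async for page in fetch_pages():
        for row in page:
            if not first:
                await send_chunk(",")
            await send_chunk(json.dumps(row, default=repr))
            first = False
    await send_chunk("]")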

@fgregg
Contributor

fgregg commented Sep 28, 2022

For teaching register_output_renderer to stream, it seems like the two options are:

  1. a nested query technique to paginate through the results
  2. a fetching model that looks something like this:

with sqlite_timelimit(conn, time_limit_ms):
    c.execute(query)
    while True:
        chunk = c.fetchmany(chunk_size)
        if not chunk:
            break
        yield from chunk

Currently db.execute is not a generator, so this would probably need a new method?
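A rough sketch of what such a new generator-style method could do, kept free of Datasette's internals - execute_iter and chunk_size are made-up names for illustration, not an existing Datasette method:

import sqlite3


def execute_iter(conn, sql, params=None, chunk_size=1000):
    # Hypothetical generator-style alternative to db.execute: run the query
    # once and yield rows in fetchmany-sized batches instead of materializing
    # the whole result set
    cursor = conn.execute(sql, params or [])
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break
        yield from rows


# Example usage against a throwaway in-memory database
conn = sqlite3.connect(":memory:")
conn.execute("create table t (n integer)")
conn.executemany("insert into t values (?)", [(i,) for i in range(10)])
for row in execute_iter(conn, "select n from t", chunk_size=3):
    print(row)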

@fgregg
Contributor

fgregg commented Sep 28, 2022

If you went this route:

with sqlite_timelimit(conn, time_limit_ms):
    c.execute(query)
    while True:
        chunk = c.fetchmany(chunk_size)
        if not chunk:
            break
        yield from chunk

then time_limit_ms would probably have to be greatly extended, because the time spent in the loop will depend on the downstream processing.

I wonder if this is why you were thinking this feature would need a dedicated connection?

Reading more, there's no real limit I can find on the number of active cursors (or, more precisely, active prepared statement objects, since SQLite doesn't really have cursors).

Maybe something like this would be okay?

with sqlite_timelimit(conn, time_limit_ms):
    c.execute(query)
    # step through at least one row to evaluate the statement inside the
    # time limit - not sure if this is necessary
    yield c.fetchone()
# fetch the remaining rows outside the time limit
while True:
    chunk = c.fetchmany(chunk_size)
    if not chunk:
        break
    yield from chunk

It seems quite weird that there isn't more of a limit on the number of active prepared statements, but I haven't been able to find one.

@simonw simonw added the maybe-not-1.0 Things that might not need to block 1.0 after all label Dec 11, 2023