
CSV ?_stream=on redundantly calculates facets for every page #1349

Closed
simonw opened this issue May 29, 2021 · 9 comments
simonw commented May 29, 2021

I'm trying to figure out why a full CSV export from https://covid-19.datasettes.com/covid/ny_times_us_counties runs unbearably slowly.

It's because the streaming endpoint works by scrolling through every page, and it turns out every page calculates facets and suggested facets!

simonw commented May 29, 2021

Relevant code:

```python
async def as_csv(self, request, database, hash, **kwargs):
    stream = request.args.get("_stream")
    if stream:
        # Some quick sanity checks
        if not self.ds.setting("allow_csv_stream"):
            raise BadRequest("CSV streaming is disabled")
        if request.args.get("_next"):
            raise BadRequest("_next not allowed for CSV streaming")
        kwargs["_size"] = "max"
    # Fetch the first page
    try:
        response_or_template_contexts = await self.data(
            request, database, hash, **kwargs
        )
        if isinstance(response_or_template_contexts, Response):
            return response_or_template_contexts
        else:
            data, _, _ = response_or_template_contexts
    except (sqlite3.OperationalError, InvalidSql) as e:
        raise DatasetteError(str(e), title="Invalid SQL", status=400)
    except sqlite3.OperationalError as e:
        raise DatasetteError(str(e))
    except DatasetteError:
        raise
    # Convert rows and columns to CSV
    headings = data["columns"]
    # if there are expanded_columns we need to add additional headings
    expanded_columns = set(data.get("expanded_columns") or [])
    if expanded_columns:
        headings = []
        for column in data["columns"]:
            headings.append(column)
            if column in expanded_columns:
                headings.append(f"{column}_label")

    async def stream_fn(r):
        nonlocal data
        writer = csv.writer(LimitedWriter(r, self.ds.setting("max_csv_mb")))
        first = True
        next = None
        while first or (next and stream):
            try:
                if next:
                    kwargs["_next"] = next
                if not first:
                    data, _, _ = await self.data(request, database, hash, **kwargs)
                if first:
                    if request.args.get("_header") != "off":
                        await writer.writerow(headings)
                    first = False
```
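The pagination pattern that `stream_fn` uses above can be sketched in isolation like this (a simplified sketch; `fetch_page` is a hypothetical stand-in for `self.data()`, which in the real code re-runs the full table view per page, facets and all):

```python
import asyncio

async def fetch_page(next_token):
    # Hypothetical stand-in for self.data(): returns (rows, next_token).
    # In Datasette the real call does far more work per page, which is the bug.
    pages = {None: ([1, 2], "a"), "a": ([3, 4], None)}
    return pages[next_token]

async def stream_all():
    # Keep fetching pages until there is no continuation token,
    # mirroring the while-loop in stream_fn above.
    rows, next_token = [], None
    first = True
    while first or next_token:
        page, next_token = await fetch_page(next_token)
        rows.extend(page)
        first = False
    return rows

print(asyncio.run(stream_all()))  # [1, 2, 3, 4]
```

The key point is that whatever work `fetch_page` does happens once per page, so any facet computation inside it is multiplied by the number of pages in the export.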

simonw commented May 29, 2021

Related issue: #263 - "Facets should not execute for ?shape=array|object"

simonw commented May 29, 2021

It's not just facets; I think it's trying to execute the suggested facet queries too!

simonw commented May 31, 2021

Now that I have ?_nofacets=1 I can use that to fix this. The challenge is that in this block of code I need to modify the incoming request's query string arguments, which isn't something I've done before:

```python
async def as_csv(self, request, database, hash, **kwargs):
    stream = request.args.get("_stream")
    if stream:
        # Some quick sanity checks
        if not self.ds.setting("allow_csv_stream"):
            raise BadRequest("CSV streaming is disabled")
        if request.args.get("_next"):
            raise BadRequest("_next not allowed for CSV streaming")
```

simonw commented May 31, 2021

I think the right way to do this is to construct a new Request with a modified ASGI scope:

```python
@property
def query_string(self):
    return (self.scope.get("query_string") or b"").decode("latin-1")

@property
def full_path(self):
    qs = self.query_string
    return "{}{}".format(self.path, ("?" + qs) if qs else "")

@property
def args(self):
    return MultiParams(parse_qs(qs=self.query_string))
```
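Since `args` is derived from `scope["query_string"]`, forcing `_nofacets=1` only requires producing a copy of the scope with a rewritten query string. A minimal sketch of that (standalone; `add_nofacets` is a hypothetical helper, not a Datasette function, operating on a plain ASGI scope dict):

```python
from urllib.parse import parse_qsl, urlencode

def add_nofacets(scope):
    # Return a shallow copy of an ASGI scope whose query string has
    # _nofacets=1 set, leaving every other parameter untouched.
    qs = (scope.get("query_string") or b"").decode("latin-1")
    pairs = parse_qsl(qs, keep_blank_values=True)
    pairs = [(k, v) for k, v in pairs if k != "_nofacets"]
    pairs.append(("_nofacets", "1"))
    return dict(scope, query_string=urlencode(pairs).encode("latin-1"))

scope = {
    "type": "http",
    "path": "/covid/ny_times_us_counties.csv",
    "query_string": b"_stream=on",
}
print(add_nofacets(scope)["query_string"])  # b'_stream=on&_nofacets=1'
```

A new `Request` constructed around the modified scope would then report `_nofacets` through its `args` property without mutating the original request.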

simonw commented May 31, 2021

Actually there is precedent for swapping out request.scope for a new scope, as seen here in the routing code:

datasette/datasette/app.py

Lines 1117 to 1122 in c5ae119

```python
scope = dict(scope, **scope_modifications)
for regex, view in self.routes:
    match = regex.match(path)
    if match is not None:
        new_scope = dict(scope, url_route={"kwargs": match.groupdict()})
        request.scope = new_scope
```

simonw commented May 31, 2021

I'm having a really hard time figuring out how to unit test this - ideally I'd monitor which SQL queries are executed using the tracing mechanism, but that's not set up to work with anything other than HTML or JSON outputs:

```python
if "text/html" in content_type and b"</body>" in accumulated_body:
    extra = json.dumps(trace_info, indent=2)
    extra_html = f"<pre>{extra}</pre></body>".encode("utf8")
    accumulated_body = accumulated_body.replace(b"</body>", extra_html)
elif "json" in content_type and accumulated_body.startswith(b"{"):
    data = json.loads(accumulated_body.decode("utf8"))
    if "_trace" not in data:
        data["_trace"] = trace_info
    accumulated_body = json.dumps(data).encode("utf8")
await send({"type": "http.response.body", "body": accumulated_body})
```

simonw commented May 31, 2021

I think it's worth getting ?_trace=1 to work with streaming CSV - this would have helped me spot this issue a long time ago.
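CSV has no comment syntax, so unlike the HTML and JSON branches above there is no natural place to embed the trace. One hypothetical approach (my own sketch, not what Datasette ended up doing; `append_trace` is an invented helper) is to append the serialized trace as a clearly delimited extra row after the data:

```python
import csv
import io
import json

def append_trace(csv_text, trace_info):
    # Append the trace as an extra row after a blank line.
    # Purely illustrative: consumers would need to know to strip it.
    out = io.StringIO()
    out.write(csv_text)
    if not csv_text.endswith("\n"):
        out.write("\n")
    out.write("\n")
    writer = csv.writer(out)
    writer.writerow(["_trace", json.dumps(trace_info)])
    return out.getvalue()

print(append_trace("id,name\r\n1,a\r\n", {"queries": 2}))
```

The blank-line delimiter keeps the trace out of the rectangular data region, at the cost of producing a file that strict CSV parsers would treat as having trailing rows.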

simonw commented Jun 1, 2021

Fixed in d1d06ac
