Skip to content

Commit

Permalink
Execute some TableView queries in parallel
Browse files Browse the repository at this point in the history
Use ?_noparallel=1 to opt out (undocumented, useful for benchmark comparisons)

Refs #1723, #1715
  • Loading branch information
simonw committed Apr 26, 2022
1 parent 8a0c38f commit 942411e
Showing 1 changed file with 67 additions and 26 deletions.
93 changes: 67 additions & 26 deletions datasette/views/table.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import itertools
import json

import markupsafe

from datasette.plugins import pm
from datasette.database import QueryInterrupted
from datasette import tracer
from datasette.utils import (
await_me_maybe,
CustomRow,
Expand Down Expand Up @@ -150,6 +152,16 @@ async def data(
default_labels=False,
_next=None,
_size=None,
):
with tracer.trace_child_tasks():
return await self._data_traced(request, default_labels, _next, _size)

async def _data_traced(
self,
request,
default_labels=False,
_next=None,
_size=None,
):
database_route = tilde_decode(request.url_vars["database"])
table_name = tilde_decode(request.url_vars["table"])
Expand All @@ -159,6 +171,20 @@ async def data(
raise NotFound("Database not found: {}".format(database_route))
database_name = db.name

# For performance profiling purposes, ?_noparallel=1 turns off asyncio.gather
async def _gather_parallel(*args):
    """Await all of the given awaitables via ``asyncio.gather``.

    Returns whatever ``asyncio.gather(*args)`` produces: the results of
    the awaitables, in the same order they were passed in.
    """
    gathered = await asyncio.gather(*args)
    return gathered

async def _gather_sequential(*args):
    """Await the given awaitables one after another, in argument order.

    Sequential stand-in for ``_gather_parallel``: it accepts the same
    positional awaitables and returns a list of their results in the
    same order, but each one is awaited to completion before the next
    is awaited. With no arguments it returns an empty list.

    An exception raised by any awaitable propagates immediately; the
    awaitables after it are never awaited.
    """
    # The items are awaitables (e.g. coroutine objects), not functions,
    # so each one is awaited directly rather than called.
    return [await awaitable for awaitable in args]

gather = (
_gather_sequential if request.args.get("_noparallel") else _gather_parallel
)

# If this is a canned query, not a table, then dispatch to QueryView instead
canned_query = await self.ds.get_canned_query(
database_name, table_name, request.actor
Expand All @@ -174,8 +200,12 @@ async def data(
write=bool(canned_query.get("write")),
)

is_view = bool(await db.get_view_definition(table_name))
table_exists = bool(await db.table_exists(table_name))
is_view, table_exists = map(
bool,
await gather(
db.get_view_definition(table_name), db.table_exists(table_name)
),
)

# If table or view not found, return 404
if not is_view and not table_exists:
Expand Down Expand Up @@ -497,33 +527,44 @@ async def data(
)
)

if not nofacet:
for facet in facet_instances:
(
async def execute_facets():
if not nofacet:
# Run them in parallel
facet_awaitables = [facet.facet_results() for facet in facet_instances]
facet_awaitable_results = await gather(*facet_awaitables)
for (
instance_facet_results,
instance_facets_timed_out,
) = await facet.facet_results()
for facet_info in instance_facet_results:
base_key = facet_info["name"]
key = base_key
i = 1
while key in facet_results:
i += 1
key = f"{base_key}_{i}"
facet_results[key] = facet_info
facets_timed_out.extend(instance_facets_timed_out)

# Calculate suggested facets
) in facet_awaitable_results:
for facet_info in instance_facet_results:
base_key = facet_info["name"]
key = base_key
i = 1
while key in facet_results:
i += 1
key = f"{base_key}_{i}"
facet_results[key] = facet_info
facets_timed_out.extend(instance_facets_timed_out)

suggested_facets = []
if (
self.ds.setting("suggest_facets")
and self.ds.setting("allow_facet")
and not _next
and not nofacet
and not nosuggest
):
for facet in facet_instances:
suggested_facets.extend(await facet.suggest())

async def execute_suggested_facets():
    """Collect facet suggestions into the enclosing ``suggested_facets`` list.

    Does nothing unless the ``suggest_facets`` and ``allow_facet``
    settings are both truthy and ``_next``, ``nofacet`` and ``nosuggest``
    are all falsy. Otherwise it calls ``suggest()`` on every entry of
    ``facet_instances``, awaits the results through the enclosing
    ``gather`` helper, and extends ``suggested_facets`` with each result.
    """
    # Calculate suggested facets
    if (
        not self.ds.setting("suggest_facets")
        or not self.ds.setting("allow_facet")
        or _next
        or nofacet
        or nosuggest
    ):
        return

    # Run them in parallel
    pending_suggestions = [instance.suggest() for instance in facet_instances]
    all_suggestions = await gather(*pending_suggestions)
    for suggestions in all_suggestions:
        suggested_facets.extend(suggestions)

await gather(execute_facets(), execute_suggested_facets())

# Figure out columns and rows for the query
columns = [r[0] for r in results.description]
Expand Down

0 comments on commit 942411e

Please sign in to comment.