Refactor TableView.data() method #617

Open
simonw opened this issue Nov 8, 2019 · 8 comments

@simonw simonw commented Nov 8, 2019

This is by far the most complex piece of Datasette - the TableView.data() method is over 500 lines long and is increasingly getting in the way of cleanly implementing new features (e.g. #615 and #613).

Need to break it up into smaller, cleaner pieces.

@simonw simonw commented Nov 8, 2019

Here's the current monstrosity:

async def data(
    self,
    request,
    database,
    hash,
    table,
    default_labels=False,
    _next=None,
    _size=None,
):
    canned_query = self.ds.get_canned_query(database, table)
    if canned_query is not None:
        return await self.custom_sql(
            request,
            database,
            hash,
            canned_query["sql"],
            metadata=canned_query,
            editable=False,
            canned_query=table,
        )
    db = self.ds.databases[database]
    is_view = bool(await db.get_view_definition(table))
    table_exists = bool(await db.table_exists(table))
    if not is_view and not table_exists:
        raise NotFound("Table not found: {}".format(table))
    pks = await db.primary_keys(table)
    table_columns = await db.table_columns(table)
    select_columns = ", ".join(escape_sqlite(t) for t in table_columns)
    use_rowid = not pks and not is_view
    if use_rowid:
        select = "rowid, {}".format(select_columns)
        order_by = "rowid"
        order_by_pks = "rowid"
    else:
        select = select_columns
        order_by_pks = ", ".join([escape_sqlite(pk) for pk in pks])
        order_by = order_by_pks
    if is_view:
        order_by = ""
    # Ensure we don't drop anything with an empty value e.g. ?name__exact=
    args = RequestParameters(
        urllib.parse.parse_qs(request.query_string, keep_blank_values=True)
    )
    # Special args start with _ and do not contain a __
    # That's so if there is a column that starts with _
    # it can still be queried using ?_col__exact=blah
    special_args = {}
    other_args = []
    for key, value in args.items():
        if key.startswith("_") and "__" not in key:
            special_args[key] = value[0]
        else:
            for v in value:
                other_args.append((key, v))
    # Handle ?_filter_column and redirect, if present
    redirect_params = filters_should_redirect(special_args)
    if redirect_params:
        return self.redirect(
            request,
            path_with_added_args(request, redirect_params),
            forward_querystring=False,
        )
    # Spot ?_sort_by_desc and redirect to _sort_desc=(_sort)
    if "_sort_by_desc" in special_args:
        return self.redirect(
            request,
            path_with_added_args(
                request,
                {
                    "_sort_desc": special_args.get("_sort"),
                    "_sort_by_desc": None,
                    "_sort": None,
                },
            ),
            forward_querystring=False,
        )
    table_metadata = self.ds.table_metadata(database, table)
    units = table_metadata.get("units", {})
    filters = Filters(sorted(other_args), units, ureg)
    where_clauses, params = filters.build_where_clauses(table)
    extra_wheres_for_ui = []
    # Add _where= from querystring
    if "_where" in request.args:
        if not self.ds.config("allow_sql"):
            raise DatasetteError("_where= is not allowed", status=400)
        else:
            where_clauses.extend(request.args["_where"])
            extra_wheres_for_ui = [
                {
                    "text": text,
                    "remove_url": path_with_removed_args(request, {"_where": text}),
                }
                for text in request.args["_where"]
            ]
    # Support for ?_through={table, column, value}
    extra_human_descriptions = []
    if "_through" in request.args:
        for through in request.args["_through"]:
            through_data = json.loads(through)
            through_table = through_data["table"]
            other_column = through_data["column"]
            value = through_data["value"]
            outgoing_foreign_keys = await db.get_outbound_foreign_keys(
                through_table
            )
            try:
                fk_to_us = [
                    fk for fk in outgoing_foreign_keys if fk["other_table"] == table
                ][0]
            except IndexError:
                raise DatasetteError(
                    "Invalid _through - could not find corresponding foreign key"
                )
            param = "p{}".format(len(params))
            where_clauses.append(
                "{our_pk} in (select {our_column} from {through_table} where {other_column} = :{param})".format(
                    through_table=escape_sqlite(through_table),
                    our_pk=escape_sqlite(fk_to_us["other_column"]),
                    our_column=escape_sqlite(fk_to_us["column"]),
                    other_column=escape_sqlite(other_column),
                    param=param,
                )
            )
            params[param] = value
            extra_human_descriptions.append(
                '{}.{} = "{}"'.format(through_table, other_column, value)
            )
    # _search support:
    fts_table = special_args.get("_fts_table")
    fts_table = fts_table or table_metadata.get("fts_table")
    fts_table = fts_table or await db.fts_table(table)
    fts_pk = special_args.get("_fts_pk", table_metadata.get("fts_pk", "rowid"))
    search_args = dict(
        pair for pair in special_args.items() if pair[0].startswith("_search")
    )
    search = ""
    if fts_table and search_args:
        if "_search" in search_args:
            # Simple ?_search=xxx
            search = search_args["_search"]
            where_clauses.append(
                "{fts_pk} in (select rowid from {fts_table} where {fts_table} match :search)".format(
                    fts_table=escape_sqlite(fts_table), fts_pk=escape_sqlite(fts_pk)
                )
            )
            extra_human_descriptions.append('search matches "{}"'.format(search))
            params["search"] = search
        else:
            # More complex: search against specific columns
            for i, (key, search_text) in enumerate(search_args.items()):
                search_col = key.split("_search_", 1)[1]
                if search_col not in await db.table_columns(fts_table):
                    raise DatasetteError("Cannot search by that column", status=400)
                where_clauses.append(
                    "rowid in (select rowid from {fts_table} where {search_col} match :search_{i})".format(
                        fts_table=escape_sqlite(fts_table),
                        search_col=escape_sqlite(search_col),
                        i=i,
                    )
                )
                extra_human_descriptions.append(
                    'search column "{}" matches "{}"'.format(
                        search_col, search_text
                    )
                )
                params["search_{}".format(i)] = search_text
    sortable_columns = set()
    sortable_columns = await self.sortable_columns_for_table(
        database, table, use_rowid
    )
    # Allow for custom sort order
    sort = special_args.get("_sort")
    if sort:
        if sort not in sortable_columns:
            raise DatasetteError("Cannot sort table by {}".format(sort))
        order_by = escape_sqlite(sort)
    sort_desc = special_args.get("_sort_desc")
    if sort_desc:
        if sort_desc not in sortable_columns:
            raise DatasetteError("Cannot sort table by {}".format(sort_desc))
        if sort:
            raise DatasetteError("Cannot use _sort and _sort_desc at the same time")
        order_by = "{} desc".format(escape_sqlite(sort_desc))
    from_sql = "from {table_name} {where}".format(
        table_name=escape_sqlite(table),
        where=("where {} ".format(" and ".join(where_clauses)))
        if where_clauses
        else "",
    )
    # Copy of params so we can mutate them later:
    from_sql_params = dict(**params)
    count_sql = "select count(*) {}".format(from_sql)
    _next = _next or special_args.get("_next")
    offset = ""
    if _next:
        if is_view:
            # _next is an offset
            offset = " offset {}".format(int(_next))
        else:
            components = urlsafe_components(_next)
            # If a sort order is applied, the first of these is the sort value
            if sort or sort_desc:
                sort_value = components[0]
                # Special case for if non-urlencoded first token was $null
                if _next.split(",")[0] == "$null":
                    sort_value = None
                components = components[1:]
            # Figure out the SQL for next-based-on-primary-key first
            next_by_pk_clauses = []
            if use_rowid:
                next_by_pk_clauses.append("rowid > :p{}".format(len(params)))
                params["p{}".format(len(params))] = components[0]
            else:
                # Apply the tie-breaker based on primary keys
                if len(components) == len(pks):
                    param_len = len(params)
                    next_by_pk_clauses.append(
                        compound_keys_after_sql(pks, param_len)
                    )
                    for i, pk_value in enumerate(components):
                        params["p{}".format(param_len + i)] = pk_value
            # Now add the sort SQL, which may incorporate next_by_pk_clauses
            if sort or sort_desc:
                if sort_value is None:
                    if sort_desc:
                        # Just items where column is null ordered by pk
                        where_clauses.append(
                            "({column} is null and {next_clauses})".format(
                                column=escape_sqlite(sort_desc),
                                next_clauses=" and ".join(next_by_pk_clauses),
                            )
                        )
                    else:
                        where_clauses.append(
                            "({column} is not null or ({column} is null and {next_clauses}))".format(
                                column=escape_sqlite(sort),
                                next_clauses=" and ".join(next_by_pk_clauses),
                            )
                        )
                else:
                    where_clauses.append(
                        "({column} {op} :p{p}{extra_desc_only} or ({column} = :p{p} and {next_clauses}))".format(
                            column=escape_sqlite(sort or sort_desc),
                            op=">" if sort else "<",
                            p=len(params),
                            extra_desc_only=""
                            if sort
                            else " or {column2} is null".format(
                                column2=escape_sqlite(sort or sort_desc)
                            ),
                            next_clauses=" and ".join(next_by_pk_clauses),
                        )
                    )
                    params["p{}".format(len(params))] = sort_value
                order_by = "{}, {}".format(order_by, order_by_pks)
            else:
                where_clauses.extend(next_by_pk_clauses)
    where_clause = ""
    if where_clauses:
        where_clause = "where {} ".format(" and ".join(where_clauses))
    if order_by:
        order_by = "order by {} ".format(order_by)
    extra_args = {}
    # Handle ?_size=500
    page_size = _size or request.raw_args.get("_size")
    if page_size:
        if page_size == "max":
            page_size = self.ds.max_returned_rows
        try:
            page_size = int(page_size)
            if page_size < 0:
                raise ValueError
        except ValueError:
            raise DatasetteError("_size must be a positive integer", status=400)
        if page_size > self.ds.max_returned_rows:
            raise DatasetteError(
                "_size must be <= {}".format(self.ds.max_returned_rows), status=400
            )
        extra_args["page_size"] = page_size
    else:
        page_size = self.ds.page_size
    sql_no_limit = "select {select} from {table_name} {where}{order_by}".format(
        select=select,
        table_name=escape_sqlite(table),
        where=where_clause,
        order_by=order_by,
    )
    sql = "{sql_no_limit} limit {limit}{offset}".format(
        sql_no_limit=sql_no_limit.rstrip(), limit=page_size + 1, offset=offset
    )
    if request.raw_args.get("_timelimit"):
        extra_args["custom_time_limit"] = int(request.raw_args["_timelimit"])
    results = await self.ds.execute(
        database, sql, params, truncate=True, **extra_args
    )
    # Number of filtered rows in whole set:
    filtered_table_rows_count = None
    if count_sql:
        try:
            count_rows = list(
                await self.ds.execute(database, count_sql, from_sql_params)
            )
            filtered_table_rows_count = count_rows[0][0]
        except QueryInterrupted:
            pass
    # facets support
    if not self.ds.config("allow_facet") and any(
        arg.startswith("_facet") for arg in request.args
    ):
        raise DatasetteError("_facet= is not allowed", status=400)
    # pylint: disable=no-member
    facet_classes = list(
        itertools.chain.from_iterable(pm.hook.register_facet_classes())
    )
    facet_results = {}
    facets_timed_out = []
    facet_instances = []
    for klass in facet_classes:
        facet_instances.append(
            klass(
                self.ds,
                request,
                database,
                sql=sql_no_limit,
                params=params,
                table=table,
                metadata=table_metadata,
                row_count=filtered_table_rows_count,
            )
        )
    for facet in facet_instances:
        (
            instance_facet_results,
            instance_facets_timed_out,
        ) = await facet.facet_results()
        facet_results.update(instance_facet_results)
        facets_timed_out.extend(instance_facets_timed_out)
    # Figure out columns and rows for the query
    columns = [r[0] for r in results.description]
    rows = list(results.rows)
    filter_columns = columns[:]
    if use_rowid and filter_columns[0] == "rowid":
        filter_columns = filter_columns[1:]
    # Expand labeled columns if requested
    expanded_columns = []
    expandable_columns = await self.expandable_columns(database, table)
    columns_to_expand = None
    try:
        all_labels = value_as_boolean(special_args.get("_labels", ""))
    except ValueError:
        all_labels = default_labels
    # Check for explicit _label=
    if "_label" in request.args:
        columns_to_expand = request.args["_label"]
    if columns_to_expand is None and all_labels:
        # expand all columns with foreign keys
        columns_to_expand = [fk["column"] for fk, _ in expandable_columns]
    if columns_to_expand:
        expanded_labels = {}
        for fk, _ in expandable_columns:
            column = fk["column"]
            if column not in columns_to_expand:
                continue
            expanded_columns.append(column)
            # Gather the values
            column_index = columns.index(column)
            values = [row[column_index] for row in rows]
            # Expand them
            expanded_labels.update(
                await self.ds.expand_foreign_keys(database, table, column, values)
            )
        if expanded_labels:
            # Rewrite the rows
            new_rows = []
            for row in rows:
                new_row = CustomRow(columns)
                for column in row.keys():
                    value = row[column]
                    if (column, value) in expanded_labels and value is not None:
                        new_row[column] = {
                            "value": value,
                            "label": expanded_labels[(column, value)],
                        }
                    else:
                        new_row[column] = value
                new_rows.append(new_row)
            rows = new_rows
    # Pagination next link
    next_value = None
    next_url = None
    if len(rows) > page_size and page_size > 0:
        if is_view:
            next_value = int(_next or 0) + page_size
        else:
            next_value = path_from_row_pks(rows[-2], pks, use_rowid)
        # If there's a sort or sort_desc, add that value as a prefix
        if (sort or sort_desc) and not is_view:
            prefix = rows[-2][sort or sort_desc]
            if isinstance(prefix, dict) and "value" in prefix:
                prefix = prefix["value"]
            if prefix is None:
                prefix = "$null"
            else:
                prefix = urllib.parse.quote_plus(str(prefix))
            next_value = "{},{}".format(prefix, next_value)
            added_args = {"_next": next_value}
            if sort:
                added_args["_sort"] = sort
            else:
                added_args["_sort_desc"] = sort_desc
        else:
            added_args = {"_next": next_value}
        next_url = self.ds.absolute_url(
            request, path_with_replaced_args(request, added_args)
        )
        rows = rows[:page_size]
    # Detect suggested facets
    suggested_facets = []
    if (
        self.ds.config("suggest_facets")
        and self.ds.config("allow_facet")
        and not _next
    ):
        for facet in facet_instances:
            suggested_facets.extend(await facet.suggest())
    # human_description_en combines filters AND search, if provided
    human_description_en = filters.human_description_en(
        extra=extra_human_descriptions
    )
    if sort or sort_desc:
        sorted_by = "sorted by {}{}".format(
            (sort or sort_desc), " descending" if sort_desc else ""
        )
        human_description_en = " ".join(
            [b for b in [human_description_en, sorted_by] if b]
        )
    async def extra_template():
        display_columns, display_rows = await self.display_columns_and_rows(
            database,
            table,
            results.description,
            rows,
            link_column=not is_view,
            truncate_cells=self.ds.config("truncate_cells_html"),
        )
        metadata = (
            (self.ds.metadata("databases") or {})
            .get(database, {})
            .get("tables", {})
            .get(table, {})
        )
        self.ds.update_with_inherited_metadata(metadata)
        form_hidden_args = []
        for arg in ("_fts_table", "_fts_pk"):
            if arg in special_args:
                form_hidden_args.append((arg, special_args[arg]))
        if request.args.get("_where"):
            for where_text in request.args["_where"]:
                form_hidden_args.append(("_where", where_text))
        return {
            "supports_search": bool(fts_table),
            "search": search or "",
            "use_rowid": use_rowid,
            "filters": filters,
            "display_columns": display_columns,
            "filter_columns": filter_columns,
            "display_rows": display_rows,
            "facets_timed_out": facets_timed_out,
            "sorted_facet_results": sorted(
                facet_results.values(),
                key=lambda f: (len(f["results"]), f["name"]),
                reverse=True,
            ),
            "extra_wheres_for_ui": extra_wheres_for_ui,
            "form_hidden_args": form_hidden_args,
            "is_sortable": any(c["sortable"] for c in display_columns),
            "path_with_replaced_args": path_with_replaced_args,
            "path_with_removed_args": path_with_removed_args,
            "append_querystring": append_querystring,
            "request": request,
            "sort": sort,
            "sort_desc": sort_desc,
            "disable_sort": is_view,
            "custom_table_templates": [
                "_table-{}-{}.html".format(
                    to_css_class(database), to_css_class(table)
                ),
                "_table-table-{}-{}.html".format(
                    to_css_class(database), to_css_class(table)
                ),
                "_table.html",
            ],
            "metadata": metadata,
            "view_definition": await db.get_view_definition(table),
            "table_definition": await db.get_table_definition(table),
        }
    return (
        {
            "database": database,
            "table": table,
            "is_view": is_view,
            "human_description_en": human_description_en,
            "rows": rows[:page_size],
            "truncated": results.truncated,
            "filtered_table_rows_count": filtered_table_rows_count,
            "expanded_columns": expanded_columns,
            "expandable_columns": expandable_columns,
            "columns": columns,
            "primary_keys": pks,
            "units": units,
            "query": {"sql": sql, "params": params},
            "facet_results": facet_results,
            "suggested_facets": suggested_facets,
            "next": next_value and str(next_value) or None,
            "next_url": next_url,
        },
        extra_template,
        (
            "table-{}-{}.html".format(to_css_class(database), to_css_class(table)),
            "table.html",
        ),
    )

@simonw simonw commented Nov 8, 2019

This is what the code does:

  • Check if the table name is actually a canned query - if so execute that
  • Check for other parameters that need to be redirected - e.g. ?_filter_column=x or _sort_by_desc=y
  • Turn the table + querystring parameters (?foo=bar, ?bar__contains=baz, ?_where=, _sort= etc) into a SQL query
  • Figure out pagination (apply limit, and handle keyset pagination)
  • Calculate suggested facets against the non-paginated version of the query
  • Calculate facet counts against the non-paginated version of the query
  • Execute the query
  • Expand any foreign key references from the results
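
To make the pagination step concrete, here is a minimal standalone sketch of keyset pagination on rowid using Python's sqlite3 module. It is not Datasette's code: the items table and the fetch_page() helper are hypothetical names used only for illustration. It borrows one idea from the method above, which is to request page_size + 1 rows so that the extra row signals whether another page exists.

```python
import sqlite3


def fetch_page(conn, page_size, after_rowid=None):
    """Hypothetical helper: return (rows, next_token) for one page of items."""
    where, params = "", {}
    if after_rowid is not None:
        # Keyset pagination: continue after the last rowid already seen
        where = "where rowid > :after "
        params["after"] = after_rowid
    # Ask for one extra row to detect whether a further page exists
    sql = "select rowid, name from items {}order by rowid limit {}".format(
        where, page_size + 1
    )
    rows = conn.execute(sql, params).fetchall()
    next_token = None
    if len(rows) > page_size:
        rows = rows[:page_size]
        # The token is the rowid of the last row actually returned
        next_token = rows[-1][0]
    return rows, next_token


conn = sqlite3.connect(":memory:")
conn.execute("create table items (name text)")
conn.executemany(
    "insert into items (name) values (?)",
    [("item-{}".format(i),) for i in range(5)],
)
page1, token1 = fetch_page(conn, 2)
page2, token2 = fetch_page(conn, 2, token1)
page3, token3 = fetch_page(conn, 2, token2)
```

The sorted case in the real method is more involved because the token also has to carry the sort value and handle nulls; this sketch covers only the plain rowid case.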

@simonw simonw commented Nov 8, 2019

A clean starting point would be to refactor out the "take a table and a bunch of querystring parameters and turn them into a SQL query" portion.

Added bonus to this: I've long wanted to be able to apply the various trimmings of the TableView (like faceting and foreign key expansion) to other arbitrary SQL queries. Splitting out the code that builds the SELECT query will go a long way to making that a reality.

@simonw simonw commented Nov 9, 2019

The function that builds the query could go in a new datasette.utils.sql module.

I can design it to only take simple arguments (the table name, list of columns, list of primary keys and a list of key/value tuples from the query string). This will make it really easy to test (and means it won't need to be async since it won't have to use queries to retrieve those values).
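
A rough sketch of what such a synchronous function might look like follows. Everything here is an assumption made for illustration: build_table_sql() and escape_identifier() are hypothetical names (the latter is a simplified stand-in for escape_sqlite()), and only exact-match filters and _sort are handled.

```python
def escape_identifier(name):
    # Simplified stand-in for escape_sqlite(): always bracket-quote.
    # Does not handle names that themselves contain "]".
    return "[{}]".format(name)


def build_table_sql(table, columns, pks, args):
    """Hypothetical pure helper: return (sql, params) for a table query.

    args is a list of (key, value) tuples taken from the query string.
    """
    where_clauses, params = [], {}
    order_by = ", ".join(escape_identifier(pk) for pk in pks) or "rowid"
    for key, value in args:
        if key == "_sort":
            if value not in columns:
                raise ValueError("Cannot sort table by {}".format(value))
            order_by = escape_identifier(value)
        elif key.startswith("_") and "__" not in key:
            # Other special arguments are ignored in this sketch
            continue
        else:
            column, _, op = key.partition("__")
            if (op or "exact") != "exact":
                raise ValueError("Unsupported filter in sketch: {}".format(op))
            if column not in columns:
                raise ValueError("Unknown column: {}".format(column))
            param = "p{}".format(len(params))
            where_clauses.append(
                "{} = :{}".format(escape_identifier(column), param)
            )
            params[param] = value
    select = ", ".join(escape_identifier(c) for c in columns)
    if not pks:
        # Tables without primary keys are addressed by rowid
        select = "rowid, " + select
    where = ""
    if where_clauses:
        where = "where {} ".format(" and ".join(where_clauses))
    sql = "select {} from {} {}order by {}".format(
        select, escape_identifier(table), where, order_by
    )
    return sql, params


sql, params = build_table_sql(
    "dogs", ["id", "name"], ["id"], [("name", "Cleo"), ("_sort", "name")]
)
```

Because the function only takes plain values, it can be exercised in a unit test without a database connection or an event loop.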

@simonw simonw commented Nov 11, 2019

As noted in #621 (comment) a common pattern in this method is blocks of code that append new items to the where_clauses, params and extra_human_descriptions arrays. This is a useful refactoring opportunity.

Code that fits this pattern:

  • The code that builds clauses from the filters: where_clauses, params = filters.build_where_clauses(table) and human_description_en = filters.human_description_en(extra=extra_human_descriptions)
  • Code that handles ?_where=: where_clauses.extend(request.args["_where"]) - though note that this also appends to an extra_wheres_for_ui array which nothing else uses
  • The _through= code, see #621 for details
  • The code that deals with ?_search= FTS

The keyset pagination code modifies where_clauses and params too, but I don't think it's quite going to work with the same abstraction that would cover the above examples.
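
One possible shape for that abstraction is sketched below, purely as an assumption: WhereBuilder is a hypothetical class, not something that exists in Datasette. It gathers the three lists into one object and allocates :pN parameter names the same way the method above does, using the current length of params.

```python
from dataclasses import dataclass, field


@dataclass
class WhereBuilder:
    """Hypothetical accumulator for where clauses, params and descriptions."""

    where_clauses: list = field(default_factory=list)
    params: dict = field(default_factory=dict)
    human_descriptions: list = field(default_factory=list)

    def add(self, clause_template, value, description=None):
        # "{param}" in the template is replaced by a fresh :pN placeholder
        param = "p{}".format(len(self.params))
        self.where_clauses.append(clause_template.format(param=":" + param))
        self.params[param] = value
        if description:
            self.human_descriptions.append(description)

    def where_sql(self):
        if not self.where_clauses:
            return ""
        return "where " + " and ".join(self.where_clauses)


builder = WhereBuilder()
# A plain column filter
builder.add("[name] = {param}", "Cleo", 'name = "Cleo"')
# Something shaped like the _through= clause
builder.add(
    "[id] in (select [dog_id] from [walks] where [park] = {param})",
    "Dolores",
    'walks.park = "Dolores"',
)
```

Each contributor (filters, _where=, _through=, search) would then only need to call add() rather than touch three separate variables. The keyset pagination code would probably still need its own handling, since it references the same parameter twice and rewrites order_by.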

@simonw simonw commented Nov 11, 2019

I experimented with a table_filter() plugin hook a while ago which looks very much like the abstraction I'm now talking about here: 5116c4e - more details here: https://simonwillison.net/2018/Aug/6/russian-facebook-ads/#Weird_implementation_details_106

@simonw simonw commented Nov 11, 2019

That table_filter() plugin hook should probably be renamed though, since it could now apply to the regular custom SQL view as well as the table view.

@simonw simonw commented Nov 18, 2019

I made some comments on an annotated version of this method a few days ago: https://gist.github.com/simonw/66aca058195d77bae5f614ef73352eb5
