Streaming mode for downloading all rows as a CSV (#315)
* table.csv?_stream=1 to download all rows - refs #266

This option causes Datasette to serve ALL rows in the table, by internally
following the _next= pagination links and serving everything out as a stream.

Also added a new config option, allow_csv_stream, which can be used to disable
this feature.

* New config option max_csv_mb limiting size of CSV export
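
As a rough illustration, not part of this commit, of how the streaming mode can be consumed from a client, assuming a Datasette instance running locally at http://127.0.0.1:8001 and serving the fixtures database used by the tests below (host, database and table names are placeholders):

import shutil
from urllib.request import urlopen

# Minimal sketch: stream every row of a table straight to a local file.
url = (
    "http://127.0.0.1:8001/fixtures/"
    "compound_three_primary_keys.csv?_stream=1"
)
with urlopen(url) as response, open("all_rows.csv", "wb") as fp:
    # Datasette follows its own _next= pagination internally and emits the
    # rows as one continuous response, so the whole table is never held in
    # memory on the client side.
    shutil.copyfileobj(response, fp)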
Simon Willison committed Jun 18, 2018
1 parent 0d7ba1b commit fc3660c
Showing 11 changed files with 142 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -13,6 +13,7 @@ script:
jobs:
include:
- stage: deploy latest.datasette.io
if: branch = master AND type = push
script:
- pip install .
- npm install -g now
@@ -23,7 +24,6 @@ jobs:
- now alias --token=$NOW_TOKEN
- echo "{\"name\":\"datasette-latest-$ALIAS\",\"alias\":\"$ALIAS.datasette.io\"}" > now.json
- now alias --token=$NOW_TOKEN
on: master
- stage: release tagged version
if: tag IS present
python: 3.6
6 changes: 6 additions & 0 deletions datasette/app.py
@@ -94,6 +94,12 @@
ConfigOption("cache_size_kb", 0, """
SQLite cache size in KB (0 == use SQLite default)
""".strip()),
ConfigOption("allow_csv_stream", True, """
Allow .csv?_stream=1 to download all rows (ignoring max_returned_rows)
""".strip()),
ConfigOption("max_csv_mb", 100, """
Maximum size allowed for CSV export in MB. Set 0 to disable this limit.
""".strip()),
)
DEFAULT_CONFIG = {
option.name: option.default
19 changes: 19 additions & 0 deletions datasette/utils.py
@@ -832,3 +832,22 @@ def value_as_boolean(value):

class ValueAsBooleanError(ValueError):
pass


class WriteLimitExceeded(Exception):
pass


class LimitedWriter:
def __init__(self, writer, limit_mb):
self.writer = writer
self.limit_bytes = limit_mb * 1024 * 1024
self.bytes_count = 0

def write(self, bytes):
self.bytes_count += len(bytes)
if self.limit_bytes and (self.bytes_count > self.limit_bytes):
raise WriteLimitExceeded("CSV contains more than {} bytes".format(
self.limit_bytes
))
self.writer.write(bytes)
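
To illustrate how LimitedWriter is intended to be used, a small self-contained sketch; io.StringIO stands in here for the streaming response object the CSV view actually wraps, and the 1 MB cap mirrors the max_csv_mb test fixture added below:

import csv
import io

from datasette.utils import LimitedWriter, WriteLimitExceeded

buffer = io.StringIO()
# Wrap any object with a write() method and cap its output at 1 MB.
writer = csv.writer(LimitedWriter(buffer, 1))
try:
    for i in range(1000000):
        writer.writerow([i, "x" * 100])
except WriteLimitExceeded as e:
    # Raised as soon as the running byte count crosses the configured limit.
    print("stopped writing:", e)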
67 changes: 50 additions & 17 deletions datasette/views/base.py
@@ -16,6 +16,7 @@
CustomJSONEncoder,
InterruptedError,
InvalidSql,
LimitedWriter,
path_from_row_pks,
path_with_added_args,
path_with_format,
@@ -150,13 +151,23 @@ async def get(self, request, db_name, **kwargs):
return await self.view_get(request, name, hash, **kwargs)

async def as_csv(self, request, name, hash, **kwargs):
stream = request.args.get("_stream")
if stream:
# Some quick sanity checks
if not self.ds.config["allow_csv_stream"]:
raise DatasetteError("CSV streaming is disabled", status=400)
if request.args.get("_next"):
raise DatasetteError(
"_next not allowed for CSV streaming", status=400
)
kwargs["_size"] = "max"
# Fetch the first page
try:
response_or_template_contexts = await self.data(
request, name, hash, **kwargs
)
if isinstance(response_or_template_contexts, response.HTTPResponse):
return response_or_template_contexts

else:
data, extra_template_data, templates = response_or_template_contexts
except (sqlite3.OperationalError, InvalidSql) as e:
@@ -167,6 +178,7 @@ async def as_csv(self, request, name, hash, **kwargs):

except DatasetteError:
raise

# Convert rows and columns to CSV
headings = data["columns"]
# if there are expanded_columns we need to add additional headings
@@ -179,22 +191,40 @@ async def as_csv(self, request, name, hash, **kwargs):
headings.append("{}_label".format(column))

async def stream_fn(r):
writer = csv.writer(r)
writer.writerow(headings)
for row in data["rows"]:
if not expanded_columns:
# Simple path
writer.writerow(row)
else:
# Look for {"value": "label": } dicts and expand
new_row = []
for cell in row:
if isinstance(cell, dict):
new_row.append(cell["value"])
new_row.append(cell["label"])
nonlocal data
writer = csv.writer(LimitedWriter(r, self.ds.config["max_csv_mb"]))
first = True
next = None
while first or (next and stream):
try:
if next:
kwargs["_next"] = next
if not first:
data, extra_template_data, templates = await self.data(
request, name, hash, **kwargs
)
if first:
writer.writerow(headings)
first = False
next = data.get("next")
for row in data["rows"]:
if not expanded_columns:
# Simple path
writer.writerow(row)
else:
new_row.append(cell)
writer.writerow(new_row)
# Look for {"value": "label": } dicts and expand
new_row = []
for cell in row:
if isinstance(cell, dict):
new_row.append(cell["value"])
new_row.append(cell["label"])
else:
new_row.append(cell)
writer.writerow(new_row)
except Exception as e:
print('caught this', e)
r.write(str(e))
return

content_type = "text/plain; charset=utf-8"
headers = {}
@@ -393,7 +423,8 @@ async def view_get(self, request, name, hash, **kwargs):
return r

async def custom_sql(
self, request, name, hash, sql, editable=True, canned_query=None
self, request, name, hash, sql, editable=True, canned_query=None,
_size=None
):
params = request.raw_args
if "sql" in params:
@@ -415,6 +446,8 @@ async def custom_sql(
extra_args = {}
if params.get("_timelimit"):
extra_args["custom_time_limit"] = int(params["_timelimit"])
if _size:
extra_args["page_size"] = _size
results = await self.ds.execute(
name, sql, params, truncate=True, **extra_args
)
4 changes: 2 additions & 2 deletions datasette/views/database.py
@@ -9,13 +9,13 @@

class DatabaseView(BaseView):

async def data(self, request, name, hash, default_labels=False):
async def data(self, request, name, hash, default_labels=False, _size=None):
if request.args.get("sql"):
if not self.ds.config["allow_sql"]:
raise DatasetteError("sql= is not allowed", status=400)
sql = request.raw_args.pop("sql")
validate_sql_select(sql)
return await self.custom_sql(request, name, hash, sql)
return await self.custom_sql(request, name, hash, sql, _size=_size)

info = self.ds.inspect()[name]
metadata = self.ds.metadata.get("databases", {}).get(name, {})
10 changes: 7 additions & 3 deletions datasette/views/table.py
@@ -220,7 +220,7 @@ async def display_columns_and_rows(

class TableView(RowTableShared):

async def data(self, request, name, hash, table, default_labels=False):
async def data(self, request, name, hash, table, default_labels=False, _next=None, _size=None):
canned_query = self.ds.get_canned_query(name, table)
if canned_query is not None:
return await self.custom_sql(
@@ -375,7 +375,7 @@ async def data(self, request, name, hash, table, default_labels=False):

count_sql = "select count(*) {}".format(from_sql)

_next = special_args.get("_next")
_next = _next or special_args.get("_next")
offset = ""
if _next:
if is_view:
@@ -462,7 +462,7 @@ async def data(self, request, name, hash, table, default_labels=False):

extra_args = {}
# Handle ?_size=500
page_size = request.raw_args.get("_size")
page_size = _size or request.raw_args.get("_size")
if page_size:
if page_size == "max":
page_size = self.max_returned_rows
@@ -512,6 +512,8 @@ async def data(self, request, name, hash, table, default_labels=False):
facet_results = {}
facets_timed_out = []
for column in facets:
if _next:
continue
facet_sql = """
select {col} as value, count(*) as count
{from_sql} {and_or_where} {col} is not null
@@ -665,6 +667,8 @@ async def data(self, request, name, hash, table, default_labels=False):
for facet_column in columns:
if facet_column in facets:
continue
if _next:
continue
if not self.ds.config["suggest_facets"]:
continue
suggested_facet_sql = '''
21 changes: 21 additions & 0 deletions docs/config.rst
@@ -125,3 +125,24 @@ Sets the amount of memory SQLite uses for its `per-connection cache <https://www
::

datasette mydatabase.db --config cache_size_kb:5000


allow_csv_stream
----------------

Enables the feature where an entire table (potentially hundreds of thousands of
rows) can be exported as a single CSV file. This is turned on by default - you
can turn it off like this::

datasette mydatabase.db --config allow_csv_stream:off


max_csv_mb
----------

The maximum size of CSV that can be exported, in megabytes. Defaults to 100MB.
You can disable the limit entirely by setting this to 0::

datasette mydatabase.db --config max_csv_mb:0
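
Both options also show up in the /-/config.json introspection endpoint (the same payload the updated test_config_json below asserts against), which is a quick way to check what a running instance is actually using; a sketch, assuming an instance on port 8001:

import json
from urllib.request import urlopen

# Sketch: read the effective configuration from a running Datasette instance.
with urlopen("http://127.0.0.1:8001/-/config.json") as response:
    config = json.loads(response.read().decode("utf-8"))

print(config["allow_csv_stream"], config["max_csv_mb"])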
1 change: 1 addition & 0 deletions docs/metadata.rst
@@ -122,6 +122,7 @@ This will restrict sorting of ``example_table`` to just the ``height`` and
You can also disable sorting entirely by setting ``"sortable_columns": []``

.. _label_columns:

Specifying the label column for a table
---------------------------------------

7 changes: 7 additions & 0 deletions tests/fixtures.py
@@ -71,6 +71,13 @@ def app_client_larger_cache_size():
})


@pytest.fixture(scope='session')
def app_client_csv_max_mb_one():
yield from app_client(config={
'max_csv_mb': 1,
})


def generate_compound_rows(num):
for a, b, c in itertools.islice(
itertools.product(string.ascii_lowercase, repeat=3), num
2 changes: 2 additions & 0 deletions tests/test_api.py
@@ -901,6 +901,8 @@ def test_config_json(app_client):
"default_cache_ttl": 365 * 24 * 60 * 60,
"num_sql_threads": 3,
"cache_size_kb": 0,
"allow_csv_stream": True,
"max_csv_mb": 100,
} == response.json


27 changes: 26 additions & 1 deletion tests/test_csv.py
@@ -1,4 +1,4 @@
from .fixtures import app_client # noqa
from .fixtures import app_client, app_client_csv_max_mb_one # noqa

EXPECTED_TABLE_CSV = '''id,content
1,hello
@@ -59,3 +59,28 @@ def test_table_csv_download(app_client):
assert 'text/csv; charset=utf-8' == response.headers['Content-Type']
expected_disposition = 'attachment; filename="simple_primary_key.csv"'
assert expected_disposition == response.headers['Content-Disposition']


def test_max_csv_mb(app_client_csv_max_mb_one):
response = app_client_csv_max_mb_one.get(
"/fixtures.csv?sql=select+randomblob(10000)+"
"from+compound_three_primary_keys&_stream=1&_size=max"
)
# It's a 200 because we started streaming before we knew the error
assert response.status == 200
# Last line should be an error message
last_line = [line for line in response.body.split(b"\r\n") if line][-1]
assert last_line.startswith(b"CSV contains more than")


def test_table_csv_stream(app_client):
# Without _stream should return header + 100 rows:
response = app_client.get(
"/fixtures/compound_three_primary_keys.csv?_size=max"
)
assert 101 == len([b for b in response.body.split(b"\r\n") if b])
# With _stream=1 should return header + 1001 rows
response = app_client.get(
"/fixtures/compound_three_primary_keys.csv?_stream=1"
)
assert 1002 == len([b for b in response.body.split(b"\r\n") if b])
