Streaming mode for downloading all rows as a CSV #315

Merged · 5 commits · Jun 18, 2018
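This PR teaches Datasette's CSV export to stream an entire table: adding ?_stream=1 to a .csv URL makes the server page through every row using its keyset pagination tokens instead of stopping at max_returned_rows. A rough client-side sketch of consuming such an export (the database and table names are hypothetical, and the requests library is assumed rather than anything shipped with Datasette):

    import csv

    import requests  # assumed third-party dependency, not part of Datasette

    # Stream every row of a (hypothetical) table without buffering the whole file.
    url = "http://127.0.0.1:8001/mydatabase/mytable.csv?_stream=1"
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        lines = (line.decode("utf-8") for line in response.iter_lines())
        for row in csv.reader(lines):
            print(row)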
2 changes: 1 addition & 1 deletion .travis.yml
@@ -13,6 +13,7 @@ script:
jobs:
  include:
  - stage: deploy latest.datasette.io
    if: branch = master AND type = push
    script:
    - pip install .
    - npm install -g now
@@ -23,7 +24,6 @@ jobs:
    - now alias --token=$NOW_TOKEN
    - echo "{\"name\":\"datasette-latest-$ALIAS\",\"alias\":\"$ALIAS.datasette.io\"}" > now.json
    - now alias --token=$NOW_TOKEN
    on: master
  - stage: release tagged version
    if: tag IS present
    python: 3.6
6 changes: 6 additions & 0 deletions datasette/app.py
@@ -94,6 +94,12 @@
    ConfigOption("cache_size_kb", 0, """
        SQLite cache size in KB (0 == use SQLite default)
    """.strip()),
    ConfigOption("allow_csv_stream", True, """
        Allow .csv?_stream=1 to download all rows (ignoring max_returned_rows)
    """.strip()),
    ConfigOption("max_csv_mb", 100, """
        Maximum size allowed for CSV export in MB. Set 0 to disable this limit.
    """.strip()),
)
DEFAULT_CONFIG = {
    option.name: option.default
19 changes: 19 additions & 0 deletions datasette/utils.py
@@ -832,3 +832,22 @@ def value_as_boolean(value):

class ValueAsBooleanError(ValueError):
    pass


class WriteLimitExceeded(Exception):
    pass


class LimitedWriter:
    def __init__(self, writer, limit_mb):
        self.writer = writer
        self.limit_bytes = limit_mb * 1024 * 1024
        self.bytes_count = 0

    def write(self, bytes):
        self.bytes_count += len(bytes)
        if self.limit_bytes and (self.bytes_count > self.limit_bytes):
            raise WriteLimitExceeded("CSV contains more than {} bytes".format(
                self.limit_bytes
            ))
        self.writer.write(bytes)
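LimitedWriter counts bytes as it forwards writes and raises WriteLimitExceeded once the running total passes the configured limit, before handing the offending chunk to the wrapped writer. A quick sketch of that behaviour in isolation, using io.StringIO as a stand-in for the response object (the class only relies on .write() and len()):

    import io

    from datasette.utils import LimitedWriter, WriteLimitExceeded

    buffer = io.StringIO()
    writer = LimitedWriter(buffer, 1)  # limit of 1 MB

    writer.write("a" * 1024)  # well under the limit, forwarded as-is
    try:
        writer.write("b" * (2 * 1024 * 1024))  # pushes the running total past 1 MB
    except WriteLimitExceeded as e:
        print(e)  # CSV contains more than 1048576 bytes

    # The chunk that crossed the limit was never forwarded to the wrapped writer.
    assert buffer.getvalue() == "a" * 1024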
67 changes: 50 additions & 17 deletions datasette/views/base.py
@@ -16,6 +16,7 @@
    CustomJSONEncoder,
    InterruptedError,
    InvalidSql,
    LimitedWriter,
    path_from_row_pks,
    path_with_added_args,
    path_with_format,
@@ -150,13 +151,23 @@ async def get(self, request, db_name, **kwargs):
        return await self.view_get(request, name, hash, **kwargs)

    async def as_csv(self, request, name, hash, **kwargs):
        stream = request.args.get("_stream")
        if stream:
            # Some quick sanity checks
            if not self.ds.config["allow_csv_stream"]:
                raise DatasetteError("CSV streaming is disabled", status=400)
            if request.args.get("_next"):
                raise DatasetteError(
                    "_next not allowed for CSV streaming", status=400
                )
            kwargs["_size"] = "max"
        # Fetch the first page
        try:
            response_or_template_contexts = await self.data(
                request, name, hash, **kwargs
            )
            if isinstance(response_or_template_contexts, response.HTTPResponse):
                return response_or_template_contexts

            else:
                data, extra_template_data, templates = response_or_template_contexts
        except (sqlite3.OperationalError, InvalidSql) as e:
@@ -167,6 +178,7 @@ async def as_csv(self, request, name, hash, **kwargs):

        except DatasetteError:
            raise

        # Convert rows and columns to CSV
        headings = data["columns"]
        # if there are expanded_columns we need to add additional headings
@@ -179,22 +191,40 @@ async def as_csv(self, request, name, hash, **kwargs):
                    headings.append("{}_label".format(column))

        async def stream_fn(r):
            writer = csv.writer(r)
            writer.writerow(headings)
            for row in data["rows"]:
                if not expanded_columns:
                    # Simple path
                    writer.writerow(row)
                else:
                    # Look for {"value": "label": } dicts and expand
                    new_row = []
                    for cell in row:
                        if isinstance(cell, dict):
                            new_row.append(cell["value"])
                            new_row.append(cell["label"])
                        else:
                            new_row.append(cell)
                    writer.writerow(new_row)
            nonlocal data
            writer = csv.writer(LimitedWriter(r, self.ds.config["max_csv_mb"]))
            first = True
            next = None
            while first or (next and stream):
                try:
                    if next:
                        kwargs["_next"] = next
                    if not first:
                        data, extra_template_data, templates = await self.data(
                            request, name, hash, **kwargs
                        )
                    if first:
                        writer.writerow(headings)
                        first = False
                    next = data.get("next")
                    for row in data["rows"]:
                        if not expanded_columns:
                            # Simple path
                            writer.writerow(row)
                        else:
                            # Look for {"value": "label": } dicts and expand
                            new_row = []
                            for cell in row:
                                if isinstance(cell, dict):
                                    new_row.append(cell["value"])
                                    new_row.append(cell["label"])
                                else:
                                    new_row.append(cell)
                            writer.writerow(new_row)
                except Exception as e:
                    print('caught this', e)
                    r.write(str(e))
                    return

        content_type = "text/plain; charset=utf-8"
        headers = {}
@@ -393,7 +423,8 @@ async def view_get(self, request, name, hash, **kwargs):
        return r

    async def custom_sql(
        self, request, name, hash, sql, editable=True, canned_query=None
        self, request, name, hash, sql, editable=True, canned_query=None,
        _size=None
    ):
        params = request.raw_args
        if "sql" in params:
@@ -415,6 +446,8 @@ async def custom_sql(
        extra_args = {}
        if params.get("_timelimit"):
            extra_args["custom_time_limit"] = int(params["_timelimit"])
        if _size:
            extra_args["page_size"] = _size
        results = await self.ds.execute(
            name, sql, params, truncate=True, **extra_args
        )
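The heart of the change is the loop in stream_fn above: with _stream=1 the view keeps calling self.data() with the "next" keyset token from the previous page until no token is returned, writing each page of rows straight to the response. Stripped of the CSV and label-expansion details, the pattern is roughly this (the function and argument names here are illustrative, not part of Datasette):

    async def stream_all_pages(fetch_page, write_row):
        # fetch_page(token) is assumed to return {"rows": [...], "next": token_or_None},
        # mirroring the dict that the data() methods produce.
        token = None
        first = True
        while first or token:
            page = await fetch_page(token)
            for row in page["rows"]:
                write_row(row)
            token = page.get("next")
            first = False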
4 changes: 2 additions & 2 deletions datasette/views/database.py
@@ -9,13 +9,13 @@

class DatabaseView(BaseView):

    async def data(self, request, name, hash, default_labels=False):
    async def data(self, request, name, hash, default_labels=False, _size=None):
        if request.args.get("sql"):
            if not self.ds.config["allow_sql"]:
                raise DatasetteError("sql= is not allowed", status=400)
            sql = request.raw_args.pop("sql")
            validate_sql_select(sql)
            return await self.custom_sql(request, name, hash, sql)
            return await self.custom_sql(request, name, hash, sql, _size=_size)

        info = self.ds.inspect()[name]
        metadata = self.ds.metadata.get("databases", {}).get(name, {})
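Because _size is threaded from DatabaseView.data() into custom_sql(), the CSV export can request the maximum page size for arbitrary ?sql= queries too, which is what the max_csv_mb test below relies on. A request of roughly this shape (database name and query are illustrative) exports a query result as CSV:

    /mydatabase.csv?sql=select+id,+content+from+mytable&_stream=1&_size=max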
10 changes: 7 additions & 3 deletions datasette/views/table.py
@@ -220,7 +220,7 @@ async def display_columns_and_rows(

class TableView(RowTableShared):

    async def data(self, request, name, hash, table, default_labels=False):
    async def data(self, request, name, hash, table, default_labels=False, _next=None, _size=None):
        canned_query = self.ds.get_canned_query(name, table)
        if canned_query is not None:
            return await self.custom_sql(
@@ -375,7 +375,7 @@ async def data(self, request, name, hash, table, default_labels=False):

        count_sql = "select count(*) {}".format(from_sql)

        _next = special_args.get("_next")
        _next = _next or special_args.get("_next")
        offset = ""
        if _next:
            if is_view:
@@ -462,7 +462,7 @@ async def data(self, request, name, hash, table, default_labels=False):

        extra_args = {}
        # Handle ?_size=500
        page_size = request.raw_args.get("_size")
        page_size = _size or request.raw_args.get("_size")
        if page_size:
            if page_size == "max":
                page_size = self.max_returned_rows
@@ -512,6 +512,8 @@ async def data(self, request, name, hash, table, default_labels=False):
        facet_results = {}
        facets_timed_out = []
        for column in facets:
            if _next:
                continue
            facet_sql = """
                select {col} as value, count(*) as count
                {from_sql} {and_or_where} {col} is not null
@@ -665,6 +667,8 @@ async def data(self, request, name, hash, table, default_labels=False):
        for facet_column in columns:
            if facet_column in facets:
                continue
            if _next:
                continue
            if not self.ds.config["suggest_facets"]:
                continue
            suggested_facet_sql = '''
21 changes: 21 additions & 0 deletions docs/config.rst
@@ -125,3 +125,24 @@ Sets the amount of memory SQLite uses for its `per-connection cache <https://www.
::

    datasette mydatabase.db --config cache_size_kb:5000


allow_csv_stream
----------------

Enables the feature where an entire table (potentially hundreds of thousands of
rows) can be exported as a single CSV file. This is turned on by default - you
can turn it off like this::

    datasette mydatabase.db --config allow_csv_stream:off
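When the feature is enabled, appending ``?_stream=1`` to any table's ``.csv`` URL returns every row rather than a single page; for a hypothetical table that looks like::

    /mydatabase/mytable.csv?_stream=1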


max_csv_mb
----------

The maximum size of CSV that can be exported, in megabytes. Defaults to 100 MB.
You can disable the limit entirely by setting this to 0::

    datasette mydatabase.db --config max_csv_mb:0
1 change: 1 addition & 0 deletions docs/metadata.rst
@@ -122,6 +122,7 @@ This will restrict sorting of ``example_table`` to just the ``height`` and
You can also disable sorting entirely by setting ``"sortable_columns": []``

.. _label_columns:

Specifying the label column for a table
---------------------------------------

7 changes: 7 additions & 0 deletions tests/fixtures.py
@@ -71,6 +71,13 @@ def app_client_larger_cache_size():
    })


@pytest.fixture(scope='session')
def app_client_csv_max_mb_one():
    yield from app_client(config={
        'max_csv_mb': 1,
    })


def generate_compound_rows(num):
    for a, b, c in itertools.islice(
            itertools.product(string.ascii_lowercase, repeat=3), num
2 changes: 2 additions & 0 deletions tests/test_api.py
@@ -901,6 +901,8 @@ def test_config_json(app_client):
        "default_cache_ttl": 365 * 24 * 60 * 60,
        "num_sql_threads": 3,
        "cache_size_kb": 0,
        "allow_csv_stream": True,
        "max_csv_mb": 100,
    } == response.json


27 changes: 26 additions & 1 deletion tests/test_csv.py
@@ -1,4 +1,4 @@
from .fixtures import app_client # noqa
from .fixtures import app_client, app_client_csv_max_mb_one # noqa

EXPECTED_TABLE_CSV = '''id,content
1,hello
@@ -59,3 +59,28 @@ def test_table_csv_download(app_client):
    assert 'text/csv; charset=utf-8' == response.headers['Content-Type']
    expected_disposition = 'attachment; filename="simple_primary_key.csv"'
    assert expected_disposition == response.headers['Content-Disposition']


def test_max_csv_mb(app_client_csv_max_mb_one):
    response = app_client_csv_max_mb_one.get(
        "/fixtures.csv?sql=select+randomblob(10000)+"
        "from+compound_three_primary_keys&_stream=1&_size=max"
    )
    # It's a 200 because we started streaming before we knew the error
    assert response.status == 200
    # Last line should be an error message
    last_line = [line for line in response.body.split(b"\r\n") if line][-1]
    assert last_line.startswith(b"CSV contains more than")


def test_table_csv_stream(app_client):
    # Without _stream should return header + 100 rows:
    response = app_client.get(
        "/fixtures/compound_three_primary_keys.csv?_size=max"
    )
    assert 101 == len([b for b in response.body.split(b"\r\n") if b])
    # With _stream=1 should return header + 1001 rows
    response = app_client.get(
        "/fixtures/compound_three_primary_keys.csv?_stream=1"
    )
    assert 1002 == len([b for b in response.body.split(b"\r\n") if b])
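To exercise just the new streaming and size-limit behaviour, the CSV tests can be run on their own, e.g.:

    pytest tests/test_csv.py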