Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Commit

Permalink
use a named cursor so the pg backend doesn't load the full dataset in…
Browse files Browse the repository at this point in the history
…to a local cursor
  • Loading branch information
vegitron committed Feb 27, 2017
1 parent c8a45fe commit ccd07be
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 8 deletions.
15 changes: 15 additions & 0 deletions sqlshare_rest/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ def get_query_plan(self, sql, user):
"""
return ""

def run_named_cursor_query(self, *args, **kwargs):
"""
For the postgres backend to run effectively, it needs to create a
named cursor. Most of the backends don't need that, so just pass
values to run_query.
"""
return self.run_query(*args, **kwargs)

def finish_named_cursor(self, user, cursor):
"""
A chance to close transactions/re-enable autocommit/do any other
cleanup work
"""
pass

def run_query(self, sql, user, params=None, return_cursor=False,
query=None):
self._not_implemented("run_query")
Expand Down
33 changes: 32 additions & 1 deletion sqlshare_rest/backend/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from django.db import connection
from django.conf import settings
from logging import getLogger
import time
import random
import re
import tempfile

Expand Down Expand Up @@ -54,9 +56,36 @@ def remove_schema(self, schema):
cursor.execute(sql)
cursor.close()

def run_named_cursor_query(self, sql, user, params=None,
return_cursor=False, query=None):
connection = self.get_connection_for_user(user)
connection.set_session(autocommit=False)

if query:
query.backend_terminate_data = "%s" % connection.get_backend_pid()
query.save()

cursor = connection.cursor('cursor-%s-%s' % (time.time(),
random.random()))
cursor.execute(sql.encode('utf-8'), params)

if return_cursor:
return cursor
data = cursor.fetchall()
self.finish_named_cursor(user, cursor)
return data

def finish_named_cursor(self, user, cursor):
cursor.close()
connection = self.get_connection_for_user(user)
connection.commit()
connection.set_session(autocommit=True)
self.close_user_connection(user)

def run_query(self, sql, user, params=None, return_cursor=False,
query=None):
connection = self.get_connection_for_user(user)
connection.set_session(autocommit=False)

if query:
query.backend_terminate_data = "%s" % connection.get_backend_pid()
Expand All @@ -67,7 +96,9 @@ def run_query(self, sql, user, params=None, return_cursor=False,

if return_cursor:
return cursor
return cursor.fetchall()
data = cursor.fetchall()
connection.commit()
return data

def _create_view_sql(self, schema, name, sql):
return 'CREATE VIEW %s."%s" AS %s' % (schema, name, sql)
Expand Down
18 changes: 11 additions & 7 deletions sqlshare_rest/views/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def response_for_query(sql, user, download_name):
backend = get_backend()
if sql == "":
raise Exception("Missing sql statement")
cursor = backend.run_query(sql, user, return_cursor=True)
cursor = backend.run_named_cursor_query(sql, user, return_cursor=True)
disposition = 'attachment; filename="%s"' % download_name
response = StreamingHttpResponse(stream_query(cursor, user),
content_type='text/csv')
Expand All @@ -52,6 +52,9 @@ def response_for_query(sql, user, download_name):


def stream_query(cursor, user):
rows = cursor.fetchmany(100)

# Need to fetch columns after fetching a bit of data :(
columns = frontend_description_from_cursor(cursor)

names = ",".join(list(map(lambda x: csv_encode(x["name"]), columns)))
Expand All @@ -60,13 +63,14 @@ def stream_query(cursor, user):
yield names
yield "\n"

row = cursor.fetchone()
while row:
yield ",".join(list(map(lambda x: csv_encode("%s" % x), row)))
yield "\n"
row = cursor.fetchone()
while rows:
for row in rows:
yield ",".join(list(map(lambda x: csv_encode("%s" % x), row)))
yield "\n"
rows = cursor.fetchmany(100)

get_backend().close_user_connection(user)
backend = get_backend()
backend.finish_named_cursor(user, cursor)


def csv_encode(value):
Expand Down

0 comments on commit ccd07be

Please sign in to comment.