Skip to content

Commit

Permalink
Merge branch 'feature-iterator-support' into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
xzkostyan committed Aug 26, 2018
2 parents 64e526f + 4951d10 commit 0f0e98a
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 25 deletions.
65 changes: 44 additions & 21 deletions clickhouse_sqlalchemy/drivers/native/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(self, connection):
self._connection = connection
self._reset_state()
self._arraysize = 1
self._stream_results = False
super(Cursor, self).__init__()

@property
Expand Down Expand Up @@ -109,58 +110,75 @@ def make_external_tables(self, dialect, execution_options):

return tables

def _prepare(self, context):
execution_options = context.execution_options

external_tables = self.make_external_tables(
context.dialect, execution_options
)

transport = self._connection.transport
execute = transport.execute
execute_iter = getattr(transport, 'execute_iter', None)

self._stream_results = execution_options.get('stream_results', False)
settings = execution_options.get('settings')

if self._stream_results and execute_iter:
execute = execute_iter
settings = settings or {}
settings['max_block_size'] = execution_options['max_row_buffer']

return external_tables, execute, settings

def execute(self, operation, parameters=None, context=None):
self._reset_state()
self._begin_query()

settings = context.execution_options.get('settings')

transport = self._connection.transport
try:
external_tables = self.make_external_tables(
context.dialect, context.execution_options
)
response = transport.execute(
external_tables, execute, settings = self._prepare(context)

response = execute(
operation, params=parameters, with_column_types=True,
external_tables=external_tables, settings=settings
)

except DriverError as orig:
raise DatabaseException(orig)

self._process_response(response)
self._process_response(response, context)
self._end_query()

def executemany(self, operation, seq_of_parameters, context=None):
self._reset_state()
self._begin_query()

settings = context.execution_options.get('settings')

transport = self._connection.transport
try:
external_tables = self.make_external_tables(
context.dialect, context.execution_options
)
response = transport.execute(
external_tables, execute, settings = self._prepare(context)

response = execute(
operation, params=seq_of_parameters,
external_tables=external_tables, settings=settings
)

except DriverError as orig:
raise DatabaseException(orig)

self._process_response(response)
self._process_response(response, context)
self._end_query()

def fetchone(self):
if self._state == self._states.NONE:
raise RuntimeError("No query yet")

if not self._rows:
return None
if self._stream_results:
return next(self._rows, None)

return self._rows.pop(0)
else:
if not self._rows:
return None

return self._rows.pop(0)

def fetchmany(self, size=None):
if size is None:
Expand Down Expand Up @@ -210,12 +228,17 @@ def __next__(self):
def __iter__(self):
return self

def _process_response(self, response):
def _process_response(self, response, context):
if not response:
self._columns = self._types = self._rows = []
return

rows, columns_with_types = response
if self._stream_results:
columns_with_types = next(response)
rows = response

else:
rows, columns_with_types = response

if columns_with_types:
self._columns, self._types = zip(*columns_with_types)
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def read_version():
install_requires=[
'sqlalchemy',
'requests',
'clickhouse_driver>=0.0.8'
'clickhouse_driver>=0.0.14'
],

# Registering `clickhouse` as dialect.
Expand All @@ -97,7 +97,7 @@ def read_version():
test_suite='nose.collector',
tests_require=[
'nose',
'SQLAlchemy>=1.0',
'SQLAlchemy>=1.1.0',
'mock',
'requests',
'responses',
Expand Down
41 changes: 40 additions & 1 deletion tests/orm/test_select.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from sqlalchemy import Column, exc, func, literal
from sqlalchemy import text
from sqlalchemy import tuple_

from clickhouse_sqlalchemy import types, Table
from clickhouse_sqlalchemy.ext.clauses import Lambda
from tests.session import session
from tests.testcase import BaseTestCase
from tests.testcase import BaseTestCase, NativeSessionTestCase


class SelectTestCase(BaseTestCase):
Expand Down Expand Up @@ -140,3 +141,41 @@ def test_joins(self):
"SELECT x AS t0_x, x AS t1_x FROM t0 "
"GLOBAL ALL LEFT OUTER JOIN t1 USING x, y"
)


class YieldTest(NativeSessionTestCase):
def test_yield_per_and_execution_options(self):
numbers = Table(
'numbers', self.metadata(),
Column('number', types.UInt64, primary_key=True),
)

query = self.session.query(numbers.c.number).limit(100).yield_per(15)
query = query.execution_options(foo='bar')
self.assertIsNotNone(query._yield_per)
self.assertEqual(
query._execution_options,
{'stream_results': True, 'foo': 'bar', 'max_row_buffer': 15}
)

def test_basic(self):
numbers = Table(
'numbers', self.metadata(),
Column('number', types.UInt64, primary_key=True),
)

q = iter(
self.session.query(numbers.c.number)
.yield_per(1)
.from_statement(text('SELECT * FROM system.numbers LIMIT 3'))
)

ret = []
ret.append(next(q))
ret.append(next(q))
ret.append(next(q))
try:
next(q)
assert False
except StopIteration:
pass
4 changes: 3 additions & 1 deletion tests/testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def compile(self, clause, **kwargs):
)


class TypesTestCase(BaseTestCase):
class NativeSessionTestCase(BaseTestCase):
session = native_session

@classmethod
Expand All @@ -54,6 +54,8 @@ def setUpClass(cls):

super(BaseTestCase, cls).setUpClass()


class TypesTestCase(NativeSessionTestCase):
@contextmanager
def create_table(self, table):
table.drop(bind=self.session.bind, if_exists=True)
Expand Down

0 comments on commit 0f0e98a

Please sign in to comment.