Skip to content

Commit

Permalink
Store progress and elapsed in last_query
Browse files Browse the repository at this point in the history
  • Loading branch information
xzkostyan committed Apr 13, 2019
1 parent 4e5a74e commit 350051b
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## [Unreleased]
### Added
- LowCardinality type.
- Access for read bytes, read rows and elapsed time of the last executed query.

## [0.0.19] - 2019-03-31
### Added
Expand Down
9 changes: 7 additions & 2 deletions clickhouse_driver/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from time import time
import types

from . import errors, defines
Expand Down Expand Up @@ -109,6 +110,7 @@ def receive_packet(self):
raise packet.exception

elif packet.type == ServerPacketTypes.PROGRESS:
self.last_query.store_progress(packet)
return packet

elif packet.type == ServerPacketTypes.END_OF_STREAM:
Expand Down Expand Up @@ -187,6 +189,7 @@ def execute(self, query, params=None, with_column_types=False,
and types.
"""

start_time = time()
self.make_query_settings(settings)
self.connection.force_connect()
self.last_query = QueryInfo()
Expand All @@ -197,17 +200,19 @@ def execute(self, query, params=None, with_column_types=False,
is_insert = isinstance(params, (list, tuple, types.GeneratorType))

if is_insert:
return self.process_insert_query(
rv = self.process_insert_query(
query, params, external_tables=external_tables,
query_id=query_id, types_check=types_check
)
else:
return self.process_ordinary_query(
rv = self.process_ordinary_query(
query, params=params, with_column_types=with_column_types,
external_tables=external_tables,
query_id=query_id, types_check=types_check,
columnar=columnar
)
self.last_query.store_elapsed(time() - start_time)
return rv

except Exception:
self.disconnect()
Expand Down
3 changes: 3 additions & 0 deletions clickhouse_driver/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def __init__(self, name, version_major, version_minor, version_patch,

super(ServerInfo, self).__init__()

def version_tuple(self):
return self.version_major, self.version_minor, self.version_patch


class Connection(object):
"""
Expand Down
11 changes: 11 additions & 0 deletions clickhouse_driver/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ def next(self):
class QueryInfo(object):
def __init__(self):
self.profile_info = None
self.progress = None
self.elapsed = None

def store_profile(self, packet):
self.profile_info = packet.profile_info

def store_progress(self, packet):
progress = packet.progress
if progress.bytes == 0 and self.progress and self.progress.bytes != 0:
return
self.progress = progress

def store_elapsed(self, elapsed):
self.elapsed = elapsed
38 changes: 33 additions & 5 deletions docs/features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,44 @@ You can turn it on by `types_check` option:
... )
Reading query profile info
Query execution statistics
--------------------------

Last query's profile info can be examined. `rows_before_limit` examine example:
Client stores statistics about last query execution. It can be obtained by
accessing `last_query` attribute.
Statistics is sent from ClickHouse server and calculated on client side.
`last_query` contains info about:

* profile: rows before limit

.. code-block:: python
>>> client.execute('SELECT arrayJoin(range(100)) LIMIT 3')
[(0,), (1,), (2,)]
>>> client.last_query.profile_info.rows_before_limit
100
* progress: read rows, bytes and total rows

.. code-block:: python
>>> client.execute('SELECT max(number) FROM numbers(10)')
[(9,)]
>>> client.last_query.progress.rows
10
>>> client.last_query.progress.bytes
80
>>> client.last_query.progress.total_rows
10
* elapsed time:

.. code-block:: python
>>> rows = client.execute('SELECT arrayJoin(range(100)) LIMIT 3')
>>> print(rows, client.last_query.profile_info.rows_before_limit)
([(0,), (1,), (2,)], 100)
>>> client.execute('SELECT sleep(1)')
[(0,)]
>>> client.last_query.elapsed
1.0060372352600098
Receiving server logs
Expand Down
3 changes: 1 addition & 2 deletions tests/columns/test_decimal.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ def client_kwargs(self, version):
return {'settings': {'allow_experimental_decimal_type': True}}

def cli_client_kwargs(self):
i = self.client.connection.server_info
current = (i.version_major, i.version_minor, i.version_patch)
current = self.client.connection.server_info.version_tuple()

if self.stable_support_version > current:
return {'allow_experimental_decimal_type': 1}
Expand Down
49 changes: 45 additions & 4 deletions tests/test_query_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class QueryInfoTestCase(BaseTestCase):

@property
def sample_query(self):
return 'SELECT * FROM test ORDER BY foo DESC LIMIT 5'
return 'SELECT * FROM test GROUP BY foo ORDER BY foo DESC LIMIT 5'

@contextmanager
def sample_table(self):
Expand All @@ -21,7 +21,7 @@ def sample_table(self):
def test_default_value(self):
assert self.client.last_query is None

def test_store_profile_info_after_execute(self):
def test_store_last_query_after_execute(self):
with self.sample_table():
self.client.execute(self.sample_query)

Expand All @@ -30,7 +30,15 @@ def test_store_profile_info_after_execute(self):
assert last_query.profile_info is not None
assert last_query.profile_info.rows_before_limit == 42

def test_store_profile_info_after_execute_iter(self):
assert last_query.progress is not None
assert last_query.progress.rows == 42
assert last_query.progress.bytes == 42
assert last_query.progress.total_rows == 0

assert last_query.elapsed is not None
assert last_query.elapsed >= 0

def test_last_query_after_execute_iter(self):
with self.sample_table():
list(self.client.execute_iter(self.sample_query))

Expand All @@ -39,7 +47,14 @@ def test_store_profile_info_after_execute_iter(self):
assert last_query.profile_info is not None
assert last_query.profile_info.rows_before_limit == 42

def test_store_profile_info_after_execute_with_progress(self):
assert last_query.progress is not None
assert last_query.progress.rows == 42
assert last_query.progress.bytes == 42
assert last_query.progress.total_rows == 0

assert last_query.elapsed is None

def test_last_query_after_execute_with_progress(self):
with self.sample_table():
progress = self.client.execute_with_progress(self.sample_query)
list(progress)
Expand All @@ -50,6 +65,32 @@ def test_store_profile_info_after_execute_with_progress(self):
assert last_query.profile_info is not None
assert last_query.profile_info.rows_before_limit == 42

assert last_query.progress is not None
assert last_query.progress.rows == 42
assert last_query.progress.bytes == 42
assert last_query.progress.total_rows == 0

assert last_query.elapsed is None

def test_last_query_progress_total_rows(self):
self.client.execute('SELECT max(number) FROM numbers(10)')

last_query = self.client.last_query
assert last_query is not None
assert last_query.profile_info is not None
assert last_query.profile_info.rows_before_limit == 10

assert last_query.progress is not None
assert last_query.progress.rows == 10
assert last_query.progress.bytes == 80

current = self.client.connection.server_info.version_tuple()
total_rows = 10 if current > (19, 4) else 0
assert last_query.progress.total_rows == total_rows

assert last_query.elapsed is not None
assert last_query.elapsed >= 0

def test_override_after_subsequent_queries(self):
query = 'SELECT * FROM test WHERE foo < %(i)s ORDER BY foo LIMIT 5'
with self.sample_table():
Expand Down
3 changes: 1 addition & 2 deletions tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ def wrapper(*args, **kwargs):
self = args[0]
self.client.connection.connect()

i = self.client.connection.server_info
current = (i.version_major, i.version_minor, i.version_patch)
current = self.client.connection.server_info.version_tuple()

if version_required <= current:
return f(*args, **kwargs)
Expand Down

0 comments on commit 350051b

Please sign in to comment.