Skip to content

Commit

Permalink
Use NumPy only for data blocks reading
Browse files Browse the repository at this point in the history
  • Loading branch information
xzkostyan committed Jun 2, 2022
1 parent ae8aa4a commit 26a4f11
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
8 changes: 4 additions & 4 deletions clickhouse_driver/columns/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ def create_column_with_options(x):
raise errors.UnknownTypeError('Unknown type {}'.format(spec))


def read_column(context, column_spec, n_items, buf):
def read_column(context, column_spec, n_items, buf, use_numpy=None):
column_options = {'context': context}
column = get_column_by_spec(column_spec, column_options)
column.read_state_prefix(buf)
return column.read_data(n_items, buf)
col = get_column_by_spec(column_spec, column_options, use_numpy=use_numpy)
col.read_state_prefix(buf)
return col.read_data(n_items, buf)


def write_column(context, column_name, column_spec, items, buf,
Expand Down
12 changes: 7 additions & 5 deletions clickhouse_driver/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ def receive_packet(self):
packet.type = packet_type = read_varint(self.fin)

if packet_type == ServerPacketTypes.DATA:
packet.block = self.receive_data()
packet.block = self.receive_data(may_be_use_numpy=True)

elif packet_type == ServerPacketTypes.EXCEPTION:
packet.exception = self.receive_exception()
Expand All @@ -500,7 +500,7 @@ def receive_packet(self):
packet.block = self.receive_data()

elif packet_type == ServerPacketTypes.LOG:
packet.block = self.receive_data(raw=True)
packet.block = self.receive_data(may_be_compressed=False)
log_block(packet.block)

elif packet_type == ServerPacketTypes.END_OF_STREAM:
Expand All @@ -519,7 +519,7 @@ def receive_packet(self):
packet.block = self.receive_data()

elif packet_type == ServerPacketTypes.PROFILE_EVENTS:
packet.block = self.receive_data(raw=True)
packet.block = self.receive_data(may_be_compressed=False)

else:
message = 'Unknown packet {} from server {}'.format(
Expand Down Expand Up @@ -549,13 +549,15 @@ def get_block_out_stream(self):
else:
return BlockOutputStream(self.fout, self.context)

def receive_data(self, raw=False):
def receive_data(self, may_be_compressed=True, may_be_use_numpy=False):
revision = self.server_info.revision

if revision >= defines.DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES:
read_binary_str(self.fin)

return (self.block_in_raw if raw else self.block_in).read()
reader = self.block_in if may_be_compressed else self.block_in_raw
use_numpy = False if not may_be_use_numpy else None
return reader.read(use_numpy=use_numpy)

def receive_exception(self):
return read_exception(self.fin)
Expand Down
4 changes: 2 additions & 2 deletions clickhouse_driver/streams/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, fin, context):

super(BlockInputStream, self).__init__()

def read(self):
def read(self, use_numpy=None):
info = BlockInfo()

revision = self.context.server_info.revision
Expand All @@ -72,7 +72,7 @@ def read(self):

if n_rows:
column = read_column(self.context, column_type, n_rows,
self.fin)
self.fin, use_numpy=use_numpy)
data.append(column)

if self.context.client_settings['use_numpy']:
Expand Down

0 comments on commit 26a4f11

Please sign in to comment.