Skip to content

Commit

Permalink
Receive packet after each block insert, so profile events are not mis…
Browse files Browse the repository at this point in the history
…sed (#392)

* Receive packet after each block insert, so profile events are not missed
  • Loading branch information
insomnes committed Oct 19, 2023
1 parent 0ce990b commit db3dca2
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
63 changes: 61 additions & 2 deletions clickhouse_driver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ def process_insert_query(self, query_without_data, data,
if sample_block:
rv = self.send_data(sample_block, data,
types_check=types_check, columnar=columnar)
self.receive_end_of_query()
self.receive_end_of_insert_query()
return rv

def receive_sample_block(self):
Expand Down Expand Up @@ -651,8 +651,15 @@ def send_data(self, sample_block, data, types_check=False, columnar=False):
self.connection.send_data(block)
inserted_rows += block.num_rows

# Starting from the specific revision there are profile events
# sent by server in response to each inserted block
self.receive_profile_events()

# Empty block means end of data.
self.connection.send_data(block_cls())
# If enabled by revision profile events are also sent after empty block
self.receive_profile_events()

return inserted_rows

def receive_end_of_query(self):
Expand All @@ -679,7 +686,59 @@ def receive_end_of_query(self):

else:
message = self.connection.unexpected_packet_message(
'Exception, EndOfStream or Log', packet.type
'Exception, EndOfStream, Progress, TableColumns, '
'ProfileEvents or Log', packet.type
)
raise errors.UnexpectedPacketFromServerError(message)

def receive_end_of_insert_query(self):
while True:
packet = self.connection.receive_packet()

if packet.type == ServerPacketTypes.END_OF_STREAM:
break

elif packet.type == ServerPacketTypes.LOG:
log_block(packet.block)

elif packet.type == ServerPacketTypes.PROGRESS:
continue

elif packet.type == ServerPacketTypes.EXCEPTION:
raise packet.exception

else:
message = self.connection.unexpected_packet_message(
'EndOfStream, Log, Progress or Exception', packet.type
)
raise errors.UnexpectedPacketFromServerError(message)

def receive_profile_events(self):
revision = self.connection.server_info.used_revision
if (
revision <
defines.DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT
):
return None

while True:
packet = self.connection.receive_packet()

if packet.type == ServerPacketTypes.PROFILE_EVENTS:
break

elif packet.type == ServerPacketTypes.PROGRESS:
self.last_query.store_progress(packet.progress)

elif packet.type == ServerPacketTypes.LOG:
log_block(packet.block)

elif packet.type == ServerPacketTypes.EXCEPTION:
raise packet.exception

else:
message = self.connection.unexpected_packet_message(
'ProfileEvents, Progress, Log or Exception', packet.type
)
raise errors.UnexpectedPacketFromServerError(message)

Expand Down
1 change: 1 addition & 0 deletions clickhouse_driver/defines.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS = 54451
DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453
DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION = 54454
DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT = 54456
DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM = 54458
DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY = 54458
DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS = 54459
Expand Down
7 changes: 7 additions & 0 deletions tests/test_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ def test_insert_from_input(self):
inserted = self.emit_cli(query)
self.assertEqual(inserted, '1\n')

def test_profile_events(self):
with self.create_table('x Int32'):
data = [{'x': 1}]
self.client.execute(
'INSERT INTO test (x) VALUES', data
)


class InsertColumnarTestCase(BaseTestCase):
def test_insert_tuple_ok(self):
Expand Down

0 comments on commit db3dca2

Please sign in to comment.