Skip to content

Commit

Permalink
insert columnar data (#122)
Browse files Browse the repository at this point in the history
* insert columnar data
  • Loading branch information
Alexander Volkovsky authored and xzkostyan committed Dec 31, 2019
1 parent 34f505c commit 5c70e41
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 77 deletions.
166 changes: 115 additions & 51 deletions clickhouse_driver/block.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from itertools import chain

from .reader import read_varint, read_binary_uint8, read_binary_int32
from .varint import write_varint
from .writer import write_binary_uint8, write_binary_int32
Expand Down Expand Up @@ -32,47 +30,137 @@ def read(self, buf):
self.bucket_num = read_binary_int32(buf)


class Block(object):
class BaseBlock(object):
    """Common state and accessor interface shared by block layouts.

    Holds the column metadata, the payload and the block info. Concrete
    layouts override :meth:`normalize` and the accessor methods, which
    raise ``NotImplementedError`` here.

    :param columns_with_types: column descriptions; ``None`` (or any falsy
                               value) is stored as an empty list.
    :param data: block payload; ``None`` (or any falsy value) is replaced
                 by an empty list before being passed to ``normalize``.
    :param info: block info object; a new ``BlockInfo`` is created when
                 falsy.
    :param types_check: flag stored on the instance for subclasses to
                        consult. Defaults to ``False``.
    """

    def __init__(self, columns_with_types=None, data=None,
                 info=None, types_check=False):
        self.columns_with_types = columns_with_types or []
        self.types_check = types_check
        self.info = info or BlockInfo()
        # ``normalize`` is an overridable hook that receives ``self``, so
        # the attributes above are assigned before it is invoked.
        self.data = self.normalize(data or [])

        super(BaseBlock, self).__init__()

    def normalize(self, data):
        """Return ``data`` as it should be stored; the base is a no-op."""
        return data

    @property
    def num_columns(self):
        """Number of columns in the block (layout-specific)."""
        raise NotImplementedError

    @property
    def num_rows(self):
        """Number of rows in the block (layout-specific)."""
        raise NotImplementedError

    def get_columns(self):
        """Return the payload in column-oriented form (layout-specific)."""
        raise NotImplementedError

    def get_rows(self):
        """Return the payload in row-oriented form (layout-specific)."""
        raise NotImplementedError

    def get_column_by_index(self, index):
        """Return the values of the column at ``index`` (layout-specific)."""
        raise NotImplementedError

    def transposed(self):
        """Return ``self.data`` with its two axes swapped.

        :return: list of tuples; ``zip`` truncates to the shortest inner
                 sequence, and empty data yields an empty list.
        """
        return list(map(tuple, zip(*self.data)))


class ColumnOrientedBlock(BaseBlock):
    """Block whose ``data`` is a sequence of columns.

    ``data[i]`` holds every value of the i-th column. The validation done
    here only checks shape; it does not consult ``types_check``.
    """

    def normalize(self, data):
        """Validate the shape of columnar ``data`` and return it unchanged.

        :param data: sequence of columns.
        :return: ``data`` itself, or a new empty list when ``data`` is
                 falsy.
        :raises ValueError: if the number of columns differs from
                            ``len(self.columns_with_types)`` or the
                            columns are not all the same length.
        """
        if not data:
            return []

        self._check_number_of_columns(data)
        self._check_all_columns_equal_length(data)
        return data

    @property
    def num_columns(self):
        """Number of columns stored in the block."""
        return len(self.data)

    @property
    def num_rows(self):
        """Length of the first column, or 0 when there are no columns."""
        return len(self.data[0]) if self.num_columns else 0

    def get_columns(self):
        """Return the stored columns as-is."""
        return self.data

    def get_rows(self):
        """Return the data in row form via ``transposed()``."""
        return self.transposed()

    def get_column_by_index(self, index):
        """Return the column stored at position ``index``."""
        return self.data[index]

    def _check_number_of_columns(self, data):
        """Raise ``ValueError`` unless there is one column per declared type."""
        expected = len(self.columns_with_types)
        got = len(data)

        if expected != got:
            msg = 'Expected {} columns, got {}'.format(expected, got)
            raise ValueError(msg)

    def _check_all_columns_equal_length(self, data):
        """Raise ``ValueError`` if any column's length differs from the first."""
        # The first column's length is the reference for all the others.
        expected = len(data[0])

        for column in data:
            got = len(column)
            if got != expected:
                msg = 'Expected {} rows, got {}'.format(expected, got)
                raise ValueError(msg)


class RowOrientedBlock(BaseBlock):
dict_row_types = (dict, )
tuple_row_types = (list, tuple)
supported_row_types = dict_row_types + tuple_row_types

def __init__(self, columns_with_types=None, data=None, info=None,
types_check=False, received_from_server=False):
self.columns_with_types = columns_with_types or []
self.data = data or []
self.types_check = types_check
def normalize(self, data):
if not data:
return []

if data and not received_from_server:
# Guessing about whole data format by first row.
first_row = data[0]
# Guessing about whole data format by first row.
first_row = data[0]

if self.types_check:
self.check_row_type(first_row)
if self.types_check:
self._check_row_type(first_row)

if isinstance(first_row, dict):
self.dicts_to_rows(data)
else:
self.check_rows(data)
if isinstance(first_row, dict):
self._mutate_dicts_to_rows(data)
else:
self._check_rows(data)

self.info = info or BlockInfo()
return data

@property
def num_columns(self):
return len(self.data[0]) if self.num_rows else 0

@property
def num_rows(self):
return len(self.data)

super(Block, self).__init__()
def get_columns(self):
return self.transposed()

def dicts_to_rows(self, data):
def get_rows(self):
return self.data

def get_column_by_index(self, index):
return [row[index] for row in self.data]

def _mutate_dicts_to_rows(self, data):
column_names = [x[0] for x in self.columns_with_types]

check_row_type = False
if self.types_check:
check_row_type = self.check_dict_row_type
check_row_type = self._check_dict_row_type

for i, row in enumerate(data):
if check_row_type:
check_row_type(row)

self.data[i] = [row[name] for name in column_names]
data[i] = [row[name] for name in column_names]

def check_rows(self, data):
def _check_rows(self, data):
expected_row_len = len(self.columns_with_types)

got = len(data[0])
Expand All @@ -81,51 +169,27 @@ def check_rows(self, data):
raise ValueError(msg)

if self.types_check:
check_row_type = self.check_tuple_row_type
check_row_type = self._check_tuple_row_type
for row in data:
check_row_type(row)

def get_columns(self):
return self.data

def get_rows(self):
if not self.data:
return self.data

# Transpose results: columns -> rows.
n_rows = self.rows

flat_data = tuple(chain.from_iterable(self.data))

# Make rows from slices.
# Pick every `n_rows` element from chained columns.
return [flat_data[i::n_rows] for i in range(n_rows)]

def check_row_type(self, row):
def _check_row_type(self, row):
if not isinstance(row, self.supported_row_types):
raise TypeError(
'Unsupported row type: {}. dict, list or tuple is expected.'
.format(type(row))
)

def check_tuple_row_type(self, row):
def _check_tuple_row_type(self, row):
if not isinstance(row, self.tuple_row_types):
raise TypeError(
'Unsupported row type: {}. list or tuple is expected.'
.format(type(row))
)

def check_dict_row_type(self, row):
def _check_dict_row_type(self, row):
if not isinstance(row, self.dict_row_types):
raise TypeError(
'Unsupported row type: {}. dict is expected.'
.format(type(row))
)

@property
def columns(self):
return len(self.data)

@property
def rows(self):
return len(self.data[0]) if self.columns else 0
32 changes: 19 additions & 13 deletions clickhouse_driver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
import types

from . import errors, defines
from .block import Block
from .block import ColumnOrientedBlock, RowOrientedBlock
from .connection import Connection
from .protocol import ServerPacketTypes
from .result import (
IterQueryResult, ProgressQueryResult, QueryResult, QueryInfo
)
from .util.compat import urlparse, parse_qs, asbool
from .util.escape import escape_params
from .util.helpers import chunks
from .util.helpers import column_chunks, chunks


class Client(object):
Expand Down Expand Up @@ -177,8 +177,9 @@ def execute(self, query, params=None, with_column_types=False,
Defaults to ``None`` (no additional settings).
:param types_check: enables type checking of data for INSERT queries.
Causes additional overhead. Defaults to ``False``.
:param columnar: if specified the result will be returned in
column-oriented form.
:param columnar: if specified the result of the SELECT query will be
returned in column-oriented form.
It also allows data to be INSERTed in columnar form.
Defaults to ``False`` (row-like form).
:return: * Number of inserted rows for INSERT queries with data.
Expand Down Expand Up @@ -206,7 +207,8 @@ def execute(self, query, params=None, with_column_types=False,
if is_insert:
rv = self.process_insert_query(
query, params, external_tables=external_tables,
query_id=query_id, types_check=types_check
query_id=query_id, types_check=types_check,
columnar=columnar
)
else:
rv = self.process_ordinary_query(
Expand Down Expand Up @@ -353,14 +355,15 @@ def iter_process_ordinary_query(

def process_insert_query(self, query_without_data, data,
external_tables=None, query_id=None,
types_check=False):
types_check=False, columnar=False):
self.connection.send_query(query_without_data, query_id=query_id)
self.connection.send_external_tables(external_tables,
types_check=types_check)

sample_block = self.receive_sample_block()
if sample_block:
rv = self.send_data(sample_block, data, types_check=types_check)
rv = self.send_data(sample_block, data,
types_check=types_check, columnar=columnar)
packet = self.connection.receive_packet()
if packet.exception:
raise packet.exception
Expand All @@ -385,18 +388,21 @@ def receive_sample_block(self):
)
raise errors.UnexpectedPacketFromServerError(message)

def send_data(self, sample_block, data, types_check=False):
def send_data(self, sample_block, data, types_check=False, columnar=False):
inserted_rows = 0

client_settings = self.connection.context.client_settings
for chunk in chunks(data, client_settings['insert_block_size']):
block = Block(sample_block.columns_with_types, chunk,
types_check=types_check)
block_cls = ColumnOrientedBlock if columnar else RowOrientedBlock
slicer = column_chunks if columnar else chunks

for chunk in slicer(data, client_settings['insert_block_size']):
block = block_cls(sample_block.columns_with_types, chunk,
types_check=types_check)
self.connection.send_data(block)
inserted_rows += len(chunk)
inserted_rows += block.num_rows

# Empty block means end of data.
self.connection.send_data(Block())
self.connection.send_data(block_cls())
return inserted_rows

def cancel(self, with_column_types=False):
Expand Down
8 changes: 4 additions & 4 deletions clickhouse_driver/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from . import defines
from . import errors
from .block import Block
from .block import RowOrientedBlock
from .blockstreamprofileinfo import BlockStreamProfileInfo
from .bufferedreader import BufferedSocketReader
from .bufferedwriter import BufferedSocketWriter
Expand Down Expand Up @@ -515,12 +515,12 @@ def send_cancel(self):

def send_external_tables(self, tables, types_check=False):
for table in tables or []:
block = Block(table['structure'], table['data'],
types_check=types_check)
block = RowOrientedBlock(table['structure'], table['data'],
types_check=types_check)
self.send_data(block, table_name=table['name'])

# Empty block, end of data transfer.
self.send_data(Block())
self.send_data(RowOrientedBlock())

@contextmanager
def timeout_setter(self, new_timeout):
Expand Down
2 changes: 1 addition & 1 deletion clickhouse_driver/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def store(self, packet):
return

# Header block contains no rows. Pick columns from it.
if block.rows:
if block.num_rows:
if self.columnar:
columns = block.get_columns()
if self.data:
Expand Down
11 changes: 5 additions & 6 deletions clickhouse_driver/streams/native.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ..block import Block, BlockInfo
from ..block import ColumnOrientedBlock, BlockInfo
from ..columns.service import read_column, write_column
from ..reader import read_binary_str
from ..varint import write_varint, read_varint
Expand All @@ -19,8 +19,8 @@ def write(self, block):
block.info.write(self.fout)

# We write transposed data.
n_columns = block.rows
n_rows = block.columns
n_columns = block.num_columns
n_rows = block.num_rows

write_varint(n_columns, self.fout)
write_varint(n_rows, self.fout)
Expand All @@ -31,7 +31,7 @@ def write(self, block):

if n_columns:
try:
items = [row[i] for row in block.data]
items = block.get_column_by_index(i)
except IndexError:
raise ValueError('Different rows length')

Expand Down Expand Up @@ -75,11 +75,10 @@ def read(self):
self.fin)
data.append(column)

block = Block(
block = ColumnOrientedBlock(
columns_with_types=list(zip(names, types)),
data=data,
info=info,
received_from_server=True
)

return block

0 comments on commit 5c70e41

Please sign in to comment.