Skip to content

Commit

Permalink
Add LowCardinality type support
Browse files Browse the repository at this point in the history
  • Loading branch information
xzkostyan committed Apr 13, 2019
1 parent eafbab3 commit 8a69cbf
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## [Unreleased]
### Added
- LowCardinality type.

## [0.0.19] - 2019-03-31
### Added
Expand Down
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Features
* UUID
* Decimal
* IPv4/IPv6
* LowCardinality(T)

- Query progress information.

Expand Down
6 changes: 6 additions & 0 deletions clickhouse_driver/columns/arraycolumn.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ def _write(self, value, buf):
self._write_nulls_data(value, buf)
self._write_data(value, buf)

def read_state_prefix(self, buf):
return self.nested_column.read_state_prefix(buf)

def write_state_prefix(self, buf):
self.nested_column.write_state_prefix(buf)

def _read(self, size, buf):
q = Queue()
q.put((self, size, 0))
Expand Down
6 changes: 6 additions & 0 deletions clickhouse_driver/columns/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ def _read_data(self, n_items, buf, nulls_map=None):
def read_items(self, n_items, buf):
raise NotImplementedError

def read_state_prefix(self, buf):
pass

def write_state_prefix(self, buf):
pass


class FormatColumn(Column):
"""
Expand Down
123 changes: 123 additions & 0 deletions clickhouse_driver/columns/lowcardinalitycolumn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from math import log

from ..reader import read_binary_uint64
from ..writer import write_binary_int64
from .base import Column
from .intcolumn import UInt8Column, UInt16Column, UInt32Column, UInt64Column


def create_low_cardinality_column(spec, column_by_spec_getter):
inner = spec[15:-1]
nested = column_by_spec_getter(inner)
return LowCardinalityColumn(nested)


class LowCardinalityColumn(Column):
"""
Stores column as index (unique elements) and keys.
Good for de-duplication of large values with low cardinality.
"""
int_types = {
0: UInt8Column,
1: UInt16Column,
2: UInt32Column,
3: UInt64Column
}

# Need to read additional keys.
# Additional keys are stored before indexes as value N and N keys
# after them.
has_additional_keys_bit = 1 << 9
# Need to update dictionary.
# It means that previous granule has different dictionary.
need_update_dictionary = 1 << 10

serialization_type = has_additional_keys_bit | need_update_dictionary

def __init__(self, nested_column, **kwargs):
self.nested_column = nested_column
super(LowCardinalityColumn, self).__init__(**kwargs)

def read_state_prefix(self, buf):
return read_binary_uint64(buf)

def write_state_prefix(self, buf):
# KeysSerializationVersion. See ClickHouse docs.
write_binary_int64(1, buf)

def _write_data(self, items, buf):
index, keys = [], []
key_by_index_element = {}

if self.nested_column.nullable:
# First element represents NULL if column is nullable.
index.append(0)
# Prevent null map writing. Reset nested column nullable flag.
self.nested_column.nullable = False

for x in items:
if x is None:
# Zero element for null.
keys.append(0)

else:
key = key_by_index_element.get(x)
# Get key from index or add it to index.
if key is None:
key = len(key_by_index_element)
key_by_index_element[x] = key
index.append(x)

keys.append(key + 1)
else:
for x in items:
key = key_by_index_element.get(x)

# Get key from index or add it to index.
if key is None:
key = len(key_by_index_element)
key_by_index_element[x] = len(key_by_index_element)
index.append(x)

keys.append(key)

# Do not write anything for empty column.
# May happen while writing empty arrays.
if not len(index):
return

int_type = int(log(len(index), 2) / 8)
int_column = self.int_types[int_type]()

serialization_type = self.serialization_type | int_type

write_binary_int64(serialization_type, buf)
write_binary_int64(len(index), buf)

self.nested_column.write_data(index, buf)
write_binary_int64(len(items), buf)
int_column.write_data(keys, buf)

def _read_data(self, n_items, buf, nulls_map=None):
if not n_items:
return tuple()

serialization_type = read_binary_uint64(buf)

# Lowest byte contains info about key type.
key_type = serialization_type & 0xf
keys_column = self.int_types[key_type]()

nullable = self.nested_column.nullable
# Prevent null map reading. Reset nested column nullable flag.
self.nested_column.nullable = False

index_size = read_binary_uint64(buf)
index = self.nested_column.read_data(index_size, buf)
if nullable:
index = (None, ) + index[1:]

read_binary_uint64(buf) # number of keys
keys = keys_column.read_data(n_items, buf)

return tuple(index[x] for x in keys)
6 changes: 6 additions & 0 deletions clickhouse_driver/columns/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Int8Column, Int16Column, Int32Column, Int64Column,
UInt8Column, UInt16Column, UInt32Column, UInt64Column
)
from .lowcardinalitycolumn import create_low_cardinality_column
from .nothingcolumn import NothingColumn
from .nullcolumn import NullColumn
from .nullablecolumn import create_nullable_column
Expand Down Expand Up @@ -58,6 +59,9 @@ def create_column_with_options(x):
elif spec.startswith('Nullable'):
return create_nullable_column(spec, create_column_with_options)

elif spec.startswith('LowCardinality'):
return create_low_cardinality_column(spec, create_column_with_options)

else:
try:
cls = column_by_type[spec]
Expand All @@ -70,6 +74,7 @@ def create_column_with_options(x):
def read_column(context, column_spec, n_items, buf):
column_options = {'context': context}
column = get_column_by_spec(column_spec, column_options=column_options)
column.read_state_prefix(buf)
return column.read_data(n_items, buf)


Expand All @@ -82,6 +87,7 @@ def write_column(context, column_name, column_spec, items, buf,
column = get_column_by_spec(column_spec, column_options)

try:
column.write_state_prefix(buf)
column.write_data(items, buf)

except column_exceptions.ColumnTypeMismatchException as e:
Expand Down
12 changes: 12 additions & 0 deletions docs/types.rst
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ SELECT type: :class:`~decimal.Decimal`.
IPv4/IPv6
---------

*New in version 0.0.19.*

INSERT types: :class:`~ipaddress.IPv4Address`/:class:`~ipaddress.IPv6Address`, :class:`int`, :class:`long`, :class:`str`/:func:`basestring <basestring>`.

SELECT type: :class:`~ipaddress.IPv4Address`/:class:`~ipaddress.IPv6Address`.
Expand Down Expand Up @@ -211,3 +213,13 @@ SELECT type: :class:`~ipaddress.IPv4Address`/:class:`~ipaddress.IPv6Address`.
>>>
For Python 2.7 `ipaddress <https://pypi.org/project/ipaddress>`_ package is used.


LowCardinality(T)
-----------------

*New in version 0.0.20.*

INSERT types: ``T``.

SELECT type: ``T``.
153 changes: 153 additions & 0 deletions tests/columns/test_low_cardinality.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
from datetime import date, timedelta
from decimal import Decimal

from tests.testcase import BaseTestCase
from tests.util import require_server_version


class LowCardinalityTestCase(BaseTestCase):
@require_server_version(19, 3, 3)
def test_uint8(self):
with self.create_table('a LowCardinality(UInt8)'):
data = [(x, ) for x in range(255)]
self.client.execute('INSERT INTO test (a) VALUES', data)

query = 'SELECT * FROM test'
inserted = self.emit_cli(query)
self.assertEqual(
inserted,
'\n'.join(str(x[0]) for x in data) + '\n'
)

inserted = self.client.execute(query)
self.assertEqual(inserted, data)

@require_server_version(19, 3, 3)
def test_int8(self):
with self.create_table('a LowCardinality(Int8)'):
data = [(x - 127, ) for x in range(255)]
self.client.execute('INSERT INTO test (a) VALUES', data)

query = 'SELECT * FROM test'
inserted = self.emit_cli(query)
self.assertEqual(
inserted,
'\n'.join(str(x[0]) for x in data) + '\n'

)

inserted = self.client.execute(query)
self.assertEqual(inserted, data)

@require_server_version(19, 3, 3)
def test_nullable_int8(self):
with self.create_table('a LowCardinality(Nullable(Int8))'):
data = [(None, ), (-1, ), (0, ), (1, ), (None, )]
self.client.execute('INSERT INTO test (a) VALUES', data)

query = 'SELECT * FROM test'
inserted = self.emit_cli(query)
self.assertEqual(inserted, '\\N\n-1\n0\n1\n\\N\n')

inserted = self.client.execute(query)
self.assertEqual(inserted, data)

@require_server_version(19, 3, 3)
def test_date(self):
with self.create_table('a LowCardinality(Date)'):
start = date(1970, 1, 1)
data = [(start + timedelta(x), ) for x in range(300)]
self.client.execute('INSERT INTO test (a) VALUES', data)

query = 'SELECT * FROM test'
inserted = self.client.execute(query)
self.assertEqual(inserted, data)

@require_server_version(19, 3, 3)
def test_float(self):
with self.create_table('a LowCardinality(Float)'):
data = [(float(x),) for x in range(300)]
self.client.execute('INSERT INTO test (a) VALUES', data)

query = 'SELECT * FROM test'
inserted = self.client.execute(query)
self.assertEqual(inserted, data)

@require_server_version(19, 3, 3)
def test_decimal(self):
with self.create_table('a LowCardinality(Float)'):
data = [(Decimal(x),) for x in range(300)]
self.client.execute('INSERT INTO test (a) VALUES', data)

query = 'SELECT * FROM test'
inserted = self.client.execute(query)
self.assertEqual(inserted, data)

@require_server_version(19, 3, 3)
def test_array(self):
with self.create_table('a Array(LowCardinality(Int16))'):
data = [((100, 500), )]
self.client.execute('INSERT INTO test (a) VALUES', data)

query = 'SELECT * FROM test'
inserted = self.emit_cli(query)
self.assertEqual(inserted, '[100,500]\n')

inserted = self.client.execute(query)
self.assertEqual(inserted, data)

@require_server_version(19, 3, 3)
def test_empty_array(self):
with self.create_table('a Array(LowCardinality(Int16))'):
data = [(tuple(), )]
self.client.execute('INSERT INTO test (a) VALUES', data)

query = 'SELECT * FROM test'
inserted = self.emit_cli(query)
self.assertEqual(inserted, '[]\n')

inserted = self.client.execute(query)
self.assertEqual(inserted, data)

@require_server_version(19, 3, 3)
def test_string(self):
with self.create_table('a LowCardinality(String)'):
data = [
('test', ), ('low', ), ('cardinality', ),
('test', ), ('test', ), ('', )
]
self.client.execute('INSERT INTO test (a) VALUES', data)

query = 'SELECT * FROM test'
inserted = self.emit_cli(query)
self.assertEqual(
inserted,
'test\nlow\ncardinality\ntest\ntest\n\n'
)

inserted = self.client.execute(query)
self.assertEqual(inserted, data)

@require_server_version(19, 3, 3)
def test_fixed_string(self):
with self.create_table('a LowCardinality(FixedString(12))'):
data = [
('test', ), ('low', ), ('cardinality', ),
('test', ), ('test', ), ('', )
]
self.client.execute('INSERT INTO test (a) VALUES', data)

query = 'SELECT * FROM test'
inserted = self.emit_cli(query)
self.assertEqual(
inserted,
'test\\0\\0\\0\\0\\0\\0\\0\\0\n'
'low\\0\\0\\0\\0\\0\\0\\0\\0\\0\n'
'cardinality\\0\n'
'test\\0\\0\\0\\0\\0\\0\\0\\0\n'
'test\\0\\0\\0\\0\\0\\0\\0\\0\n'
'\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\\0\n'
)

inserted = self.client.execute(query)
self.assertEqual(inserted, data)

0 comments on commit 8a69cbf

Please sign in to comment.