Skip to content

Commit

Permalink
Add ReplicatedReplacingMergeTree engine
Browse files Browse the repository at this point in the history
  • Loading branch information
xzkostyan committed Nov 15, 2019
1 parent 4ed7ae3 commit 6691d12
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 22 deletions.
8 changes: 4 additions & 4 deletions clickhouse_sqlalchemy/drivers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,10 @@ def _compile_param(self, expr):

def visit_merge_tree(self, engine):

text = '{0}{1}\n'.format(engine.name, self._compile_param(
to_list(
engine.get_parameters())
) or '()')
text = '{0}{1}\n'.format(
engine.name,
self._compile_param(to_list(engine.get_parameters())) or '()'
)
if engine.partition_by:
text += ' PARTITION BY {0}\n'.format(
self._compile_param(
Expand Down
41 changes: 25 additions & 16 deletions clickhouse_sqlalchemy/engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,8 @@ def get_parameters(self):


class ReplicatedMergeTree(ReplicatedEngineMixin, MergeTree):
def __init__(self,
table_path,
replica_name,
*args,
**kwargs):
def __init__(self, table_path, replica_name,
*args, **kwargs):
ReplicatedEngineMixin.__init__(self, table_path, replica_name)
MergeTree.__init__(self, *args, **kwargs)

Expand All @@ -232,23 +229,34 @@ class ReplicatedCollapsingMergeTree(ReplicatedEngineMixin,
def __init__(self, table_path, replica_name,
*args, **kwargs):
ReplicatedEngineMixin.__init__(self, table_path, replica_name)
CollapsingMergeTree.__init__(
self, *args, **kwargs
)
CollapsingMergeTree.__init__(self, *args, **kwargs)

def get_parameters(self):
return ReplicatedEngineMixin.get_parameters(self) + \
CollapsingMergeTree.get_parameters(self)


class ReplicatedReplacingMergeTree(ReplicatedEngineMixin,
ReplacingMergeTree):
def __init__(self, table_path, replica_name,
*args, **kwargs):
ReplicatedEngineMixin.__init__(self, table_path, replica_name)
ReplacingMergeTree.__init__(self, *args, **kwargs)

def get_parameters(self):
rv = ReplicatedEngineMixin.get_parameters(self)
replacing = ReplacingMergeTree.get_parameters(self)
if replacing is not None:
rv.append(replacing)
return rv


class ReplicatedAggregatingMergeTree(ReplicatedEngineMixin,
AggregatingMergeTree):
def __init__(self, table_path, replica_name,
*args, **kwargs):
ReplicatedEngineMixin.__init__(self, table_path, replica_name)
AggregatingMergeTree.__init__(
self, *args, **kwargs
)
AggregatingMergeTree.__init__(self, *args, **kwargs)

def get_parameters(self):
return ReplicatedEngineMixin.get_parameters(self) + \
Expand All @@ -262,13 +270,14 @@ def __init__(self,
*args,
**kwargs):
ReplicatedEngineMixin.__init__(self, table_path, replica_name)
SummingMergeTree.__init__(
self, *args, **kwargs
)
SummingMergeTree.__init__(self, *args, **kwargs)

def get_parameters(self):
return ReplicatedEngineMixin.get_parameters(self) + \
SummingMergeTree.get_parameters(self)
rv = ReplicatedEngineMixin.get_parameters(self)
summing = SummingMergeTree.get_parameters(self)
if summing is not None:
rv.append(summing)
return rv


class Buffer(Engine):
Expand Down
29 changes: 27 additions & 2 deletions tests/test_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ class TestTable(self.base):
self.assertEqual(
self.compile(CreateTable(TestTable.__table__)),
"CREATE TABLE test_table (date Date, x Int32, y String) "
"ENGINE = ReplicatedMergeTree("
"'/table/path', 'name') "
"ENGINE = ReplicatedMergeTree('/table/path', 'name') "
"PARTITION BY date "
"ORDER BY (date, x)"
)
Expand Down Expand Up @@ -265,6 +264,32 @@ class TestTable(self.base):
"ORDER BY (date, x)"
)

def test_replicated_replacing_merge_tree(self):
class TestTable(self.base):
date = Column(types.Date, primary_key=True)
x = Column(types.Int32)
y = Column(types.String)
version = Column(types.Int32)

__table_args__ = (
engines.ReplicatedReplacingMergeTree(
'/table/path', 'name',
'version', 'date', ('date', 'x'),
),
)

self.assertEqual(
self.compile(CreateTable(TestTable.__table__)),
"CREATE TABLE test_table ("
"date Date, x Int32, y String, version Int32"
") "
"ENGINE = ReplicatedReplacingMergeTree("
"'/table/path', 'name', version"
") "
"PARTITION BY date "
"ORDER BY (date, x)"
)

def test_partition_by_func(self):
class TestTable(self.base):
date = Column(types.Date, primary_key=True)
Expand Down

0 comments on commit 6691d12

Please sign in to comment.