-
-
Notifications
You must be signed in to change notification settings - Fork 119
/
base.py
529 lines (440 loc) · 16.8 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
import enum
from sqlalchemy import schema, types as sqltypes, util as sa_util, text
from sqlalchemy.engine import default, reflection
from sqlalchemy.sql import (
compiler, elements
)
from sqlalchemy.util import (
warn,
)
from .compilers.ddlcompiler import ClickHouseDDLCompiler
from .compilers.sqlcompiler import ClickHouseSQLCompiler
from .compilers.typecompiler import ClickHouseTypeCompiler
from .reflection import ClickHouseInspector
from .util import get_inner_spec, parse_arguments
from .. import types
# Column specifications: SQLAlchemy's generic-type -> dialect-type override
# map. Intentionally empty; no overrides are registered here.
colspecs = dict()

# Type converters used during reflection: maps a ClickHouse type name, as it
# appears in a column's type spec, to the dialect type class.
# Keys that start with an underscore are not literal ClickHouse type names;
# they are internal lookup keys for parametrised wrapper types, which
# ClickHouseDialect._get_column_type selects after matching the spec prefix
# (e.g. 'Array(...)' -> '_array').
ischema_names = {
    # Signed integers
    "Int256": types.Int256,
    "Int128": types.Int128,
    "Int64": types.Int64,
    "Int32": types.Int32,
    "Int16": types.Int16,
    "Int8": types.Int8,
    # Unsigned integers
    "UInt256": types.UInt256,
    "UInt128": types.UInt128,
    "UInt64": types.UInt64,
    "UInt32": types.UInt32,
    "UInt16": types.UInt16,
    "UInt8": types.UInt8,
    # Dates and times
    "Date": types.Date,
    "Date32": types.Date32,
    "DateTime": types.DateTime,
    "DateTime64": types.DateTime64,
    # Floating point and fixed point
    "Float64": types.Float64,
    "Float32": types.Float32,
    "Decimal": types.Decimal,
    # Strings, booleans and miscellaneous scalars
    "String": types.String,
    "Bool": types.Boolean,
    "Boolean": types.Boolean,
    "UUID": types.UUID,
    "IPv4": types.IPv4,
    "IPv6": types.IPv6,
    "FixedString": types.String,
    "Enum8": types.Enum8,
    "Enum16": types.Enum16,
    "Object('json')": types.JSON,
    # Internal keys for wrapper / composite types
    "_array": types.Array,
    "_nullable": types.Nullable,
    "_lowcardinality": types.LowCardinality,
    "_tuple": types.Tuple,
    "_map": types.Map,
    "_aggregatefunction": types.AggregateFunction,
    "_simpleaggregatefunction": types.SimpleAggregateFunction,
}
class ClickHouseIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier quoting rules for the ClickHouse dialect."""

    # Extend SQLAlchemy's reserved words so these identifiers always get
    # quoted; 'index' is reserved in the 'create table' syntax, at least.
    reserved_words = compiler.IdentifierPreparer.reserved_words | {'index'}

    # Alternatively, use `_requires_quotes = lambda self, value: True`
    def _escape_identifier(self, value):
        """Escape an identifier's quote characters and percent signs.

        Quote characters are replaced with their escaped form. Every '%' is
        then doubled; presumably this is because the dialect's
        'pyformat' paramstyle treats a bare '%' as a format directive.
        """
        quoted_safe = value.replace(self.escape_quote, self.escape_to_quote)
        percent_safe = quoted_safe.replace('%', '%%')
        return percent_safe
class ClickHouseExecutionContextBase(default.DefaultExecutionContext):
    """Execution context shared by the ClickHouse driver dialects."""

    @sa_util.memoized_property
    def should_autocommit(self):
        """Always False: no DML is supported here, so never autocommit."""
        autocommit = False
        return autocommit
class ClickHouseDialect(default.DefaultDialect):
    """Driver-independent SQLAlchemy dialect for ClickHouse.

    Driver-specific subclasses must implement :meth:`_execute`. Every
    reflection and server-introspection query in this class goes through it.
    """

    name = 'clickhouse'
    supports_cast = True
    supports_unicode_statements = True
    supports_unicode_binds = True
    supports_sane_rowcount = False
    supports_sane_multi_rowcount = False
    supports_native_decimal = True
    supports_native_boolean = True
    non_native_boolean_check_constraint = False
    supports_alter = True
    supports_sequences = False
    supports_native_enum = True  # Do not render check constraints on enums.
    supports_multivalues_insert = True
    supports_statement_cache = True

    supports_comments = True
    inline_comments = True

    # Dialect related-features. These are optimistic defaults;
    # initialize() recomputes them from the server version.
    supports_delete = True
    supports_update = True
    supports_engine_reflection = True
    supports_table_comment_reflection = True

    engine_reflection = True  # Disables engine reflection from URL.

    max_identifier_length = 127
    default_paramstyle = 'pyformat'
    colspecs = colspecs
    ischema_names = ischema_names
    convert_unicode = True
    returns_unicode_strings = True
    description_encoding = None
    postfetch_lastrowid = False
    # Set from the 'server_version' connect argument (see connect()); when
    # set, it replaces querying the server for its version.
    forced_server_version_string = None

    preparer = ClickHouseIdentifierPreparer
    type_compiler = ClickHouseTypeCompiler
    statement_compiler = ClickHouseSQLCompiler
    ddl_compiler = ClickHouseDDLCompiler

    construct_arguments = [
        (schema.Table, {
            'data': [],
            'cluster': None,
        }),
        (schema.Column, {
            'codec': None,
            'materialized': None,
            'alias': None,
            'after': None,
        }),
    ]

    inspector = ClickHouseInspector

    def __init__(
        self,
        json_serializer=None,
        json_deserializer=None,
        **kwargs,
    ):
        """Create the dialect, storing optional JSON (de)serializer hooks."""
        default.DefaultDialect.__init__(self, **kwargs)
        self._json_deserializer = json_deserializer
        self._json_serializer = json_serializer

    def initialize(self, connection):
        """Run the default initialization, then gate features by version."""
        super(ClickHouseDialect, self).initialize(connection)

        version = self.server_version_info

        self.supports_delete = version >= (1, 1, 54388)
        self.supports_update = version >= (18, 12, 14)
        self.supports_engine_reflection = version >= (18, 16)
        self.supports_table_comment_reflection = version >= (21, 6)

    def _execute(self, connection, sql, scalar=False, **kwargs):
        """Execute ``sql`` on ``connection``; implemented by driver subclasses.

        Callers in this class pass either a plain string or a ``text()``
        clause, with bind values as keyword arguments. With
        ``scalar=True`` they expect a single value back. Otherwise they
        expect an iterable of rows with attribute access to columns.
        """
        raise NotImplementedError

    @reflection.cache
    def get_view_names(self, connection, schema=None, **kw):
        """Return names of tables whose engine ends in 'View'.

        ``schema`` defaults to the database named in the engine URL.
        """
        query = text(
            "SELECT name FROM system.tables WHERE engine LIKE '%View' "
            "AND database = :database"
        )

        database = schema or connection.engine.url.database
        rows = self._execute(connection, query, database=database)
        return [row.name for row in rows]

    def _qualified_name(self, table_name, schema=None):
        """Return ``table_name`` quoted, prefixed by quoted ``schema`` if set."""
        quote = self._quote_table_name
        if schema:
            return quote(schema) + '.' + quote(table_name)
        return quote(table_name)

    def has_table(self, connection, table_name, schema=None, **kw):
        """Return True if ``EXISTS TABLE`` reports the table as present."""
        qualified_name = self._qualified_name(table_name, schema)
        query = text('EXISTS TABLE {}'.format(qualified_name))
        for r in self._execute(connection, query):
            if r.result == 1:
                return True
        return False

    def _quote_table_name(self, table_name):
        """Quote a table/schema identifier; pass a ``TextClause`` through."""
        # Use case: `describe table (select ...)`, over a TextClause.
        if isinstance(table_name, elements.TextClause):
            return str(table_name)
        return self.identifier_preparer.quote_identifier(table_name)

    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        """Reflect column info dicts from ``DESCRIBE TABLE``."""
        qualified_name = self._qualified_name(table_name, schema)
        query = 'DESCRIBE TABLE {}'.format(qualified_name)
        rows = self._execute(connection, query)

        return [
            self._get_column_info(
                r.name, r.type, r.default_type, r.default_expression,
                getattr(r, 'comment', None)
            ) for r in rows
        ]

    def _get_column_info(self, name, format_type, default_type,
                         default_expression, comment):
        """Build a SQLAlchemy column-info dict from one DESCRIBE row.

        A column is reported nullable when its type is ``Nullable(...)``
        or ``LowCardinality(Nullable(...))``. Previously the latter was
        reported as NOT NULL.
        """
        col_type = self._get_column_type(name, format_type)
        col_default = self._get_column_default(default_type,
                                               default_expression)
        nullable = format_type.startswith(
            ('Nullable(', 'LowCardinality(Nullable(')
        )
        result = {
            'name': name,
            'type': col_type,
            'nullable': nullable,
            'default': col_default,
            'comment': comment or None
        }

        return result

    def _get_column_default(self, default_type, default_expression):
        """Return the default expression only for plain DEFAULT columns."""
        if default_type == 'DEFAULT':
            return default_expression
        return None

    def _get_column_type(self, name, spec):
        """Translate a ClickHouse type spec string into a dialect type.

        Wrapper and composite types (Array, Nullable, LowCardinality, Tuple,
        Map, [Simple]AggregateFunction) are resolved recursively. Depending
        on the branch, the result is a type class, a type instance, or (for
        Enum) a zero-argument callable. Unknown types emit a warning and
        yield ``sqltypes.NullType``. ``name`` is the column name; it is used
        for the warning and for naming reflected Enum classes.
        """
        if spec.startswith('Array'):
            inner = spec[6:-1]
            coltype = self.ischema_names['_array']
            return coltype(self._get_column_type(name, inner))

        elif spec.startswith('FixedString'):
            length = int(spec[12:-1])
            return self.ischema_names['FixedString'](length)

        elif spec.startswith('Nullable'):
            inner = spec[9:-1]
            coltype = self.ischema_names['_nullable']
            return coltype(self._get_column_type(name, inner))

        elif spec.startswith('LowCardinality'):
            inner = spec[15:-1]
            coltype = self.ischema_names['_lowcardinality']
            return coltype(self._get_column_type(name, inner))

        elif spec.startswith('AggregateFunction'):
            params = spec[18:-1]

            arguments = parse_arguments(params)
            agg_func, inner = arguments[0], arguments[1:]

            inner_types = [
                self._get_column_type(name, param)
                for param in inner
            ]
            coltype = self.ischema_names['_aggregatefunction']
            return coltype(agg_func, *inner_types)

        elif spec.startswith('SimpleAggregateFunction'):
            params = spec[24:-1]

            arguments = parse_arguments(params)
            agg_func, inner = arguments[0], arguments[1:]

            inner_types = [
                self._get_column_type(name, param)
                for param in inner
            ]
            coltype = self.ischema_names['_simpleaggregatefunction']
            return coltype(agg_func, *inner_types)

        elif spec.startswith('Tuple'):
            inner = spec[6:-1]
            coltype = self.ischema_names['_tuple']
            # Split with the same argument parser as the AggregateFunction
            # branches. A plain str.split(',') broke element types that
            # contain commas, e.g. Tuple(Decimal(10, 2), Map(String, Int8)).
            inner_types = [
                self._get_column_type(name, t.strip())
                for t in parse_arguments(inner)
            ]
            return coltype(*inner_types)

        elif spec.startswith('Map'):
            inner = spec[4:-1]
            coltype = self.ischema_names['_map']
            # Only the first comma separates key and value types.
            inner_types = [
                self._get_column_type(name, t.strip())
                for t in inner.split(',', 1)
            ]
            return coltype(*inner_types)

        elif spec.startswith('Enum'):
            pos = spec.find('(')
            # Without '(' the whole spec is the type name. spec[:-1] would
            # drop its last character and fail the lookup.
            type_name = spec[:pos] if pos >= 0 else spec
            coltype = self.ischema_names[type_name]

            options = {}
            if pos >= 0:
                options = self._parse_options(
                    spec[pos + 1: spec.rfind(')')]
                )
            if not options:
                return sqltypes.NullType

            type_enum = enum.Enum('%s_enum' % name, options)
            # NOTE(review): returned as a zero-argument callable rather than
            # an instance; this presumably relies on SQLAlchemy instantiating
            # callable column types - confirm before changing.
            return lambda: coltype(type_enum)

        elif spec.startswith('Decimal'):
            coltype = self.ischema_names['Decimal']
            return coltype(*self._parse_decimal_params(spec))

        # DateTime64 must be tested before its prefix DateTime.
        elif spec.startswith('DateTime64'):
            coltype = self.ischema_names['DateTime64']
            return coltype(*self._parse_detetime64_params(spec))

        elif spec.startswith('DateTime'):
            coltype = self.ischema_names['DateTime']
            return coltype(*self._parse_detetime_params(spec))

        else:
            try:
                return self.ischema_names[spec]
            except KeyError:
                warn("Did not recognize type '%s' of column '%s'" %
                     (spec, name))
                return sqltypes.NullType

    @staticmethod
    def _parse_decimal_params(spec):
        """Return ``(precision, scale)`` ints from ``Decimal(P, S)``."""
        inner_spec = get_inner_spec(spec)
        precision, scale = inner_spec.split(',')
        return int(precision.strip()), int(scale.strip())

    @staticmethod
    def _parse_detetime64_params(spec):
        """Return ``[precision]`` or ``[precision, tz]`` from DateTime64(...).

        The timezone part is returned stripped but otherwise verbatim (any
        quotes are kept). Returns ``[]`` when there are no parameters.
        """
        inner_spec = get_inner_spec(spec)
        if not inner_spec:
            return []
        params = inner_spec.split(',', 1)
        params[0] = int(params[0])
        if len(params) > 1:
            params[1] = params[1].strip()
        return params

    @staticmethod
    def _parse_detetime_params(spec):
        """Return ``[inner]`` from ``DateTime(inner)``, or ``[]`` if none."""
        inner_spec = get_inner_spec(spec)
        if not inner_spec:
            return []
        return [inner_spec]

    @staticmethod
    def _parse_options(option_string):
        """Parse Enum options like ``'a' = 1, 'b' = 2`` into a name->int dict.

        Names are single-quoted; inside a name a backslash escapes the
        following character.
        """
        options = dict()
        after_name = False
        escaped = False
        quote_character = None
        name = ''
        value = ''

        for ch in option_string:
            if escaped:
                name += ch
                escaped = False  # Accepting escaped character

            elif after_name:
                if ch in (' ', '='):
                    pass
                elif ch == ',':
                    options[name] = int(value)
                    after_name = False
                    name = ''
                    value = ''  # Reset before collecting new option
                else:
                    value += ch

            elif quote_character:
                if ch == '\\':
                    escaped = True
                elif ch == quote_character:
                    quote_character = None
                    after_name = True  # Start collecting option value
                else:
                    name += ch

            else:
                if ch == "'":
                    quote_character = ch

        if after_name:
            options.setdefault(name, int(value))  # Word after last comma

        return options

    @reflection.cache
    def get_schema_names(self, connection, **kw):
        """Return database names from ``SHOW DATABASES``."""
        rows = self._execute(connection, 'SHOW DATABASES')
        return [row.name for row in rows]

    @reflection.cache
    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        """Always ``[]``: foreign keys are not supported."""
        return []

    @reflection.cache
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        """Reflect the primary key columns of a table from system.tables.

        ``schema`` defaults to the database named in the engine URL,
        matching the other reflection methods. Returns ``{}`` when engine
        reflection is unsupported or the table has no primary key.
        """
        if not self.supports_engine_reflection:
            return {}

        # Bind the values instead of formatting them into the SQL (quoting
        # and injection bug). Always filter by database; previously an
        # unqualified lookup could match a same-named table in another
        # database.
        database = schema or connection.engine.url.database
        query = text(
            'SELECT primary_key FROM system.tables '
            'WHERE database = :database AND name = :name'
        )
        rows = self._execute(
            connection, query, database=database, name=table_name
        )
        for r in rows:
            primary_keys = r.primary_key
            if primary_keys:
                return {
                    "constrained_columns": tuple(primary_keys.split(", ")),
                }
        return {}

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """Always ``[]``: index reflection is not supported."""
        return []

    @reflection.cache
    def get_table_names(self, connection, schema=None, **kw):
        """Return non-view table names, excluding names like '.inner%'.

        The '.inner%' tables are presumably materialized-view backing
        tables. ``schema`` defaults to the database named in the engine URL.
        """
        query = text(
            "SELECT name FROM system.tables "
            "WHERE engine NOT LIKE '%View' "
            "AND name NOT LIKE '.inner%' "
            "AND database = :database"
        )

        database = schema or connection.engine.url.database
        rows = self._execute(connection, query, database=database)
        return [row.name for row in rows]

    @reflection.cache
    def get_engine(self, connection, table_name, schema=None, **kw):
        """Return the table's engine-related system.tables columns as a dict.

        Returns None when the table is not found.
        """
        columns = [
            'name', 'engine_full', 'engine', 'partition_key', 'sorting_key',
            'primary_key', 'sampling_key'
        ]

        database = schema if schema else connection.engine.url.database
        query = text(
            'SELECT {} FROM system.tables '
            'WHERE database = :database AND name = :name'
            .format(', '.join(columns))
        )

        rows = self._execute(
            connection, query, database=database, name=table_name
        )

        row = next(rows, None)

        if row:
            return {x: getattr(row, x, None) for x in columns}
        return None

    @reflection.cache
    def get_table_comment(self, connection, table_name, schema=None, **kw):
        """Return ``{'text': comment}``, with ``None`` for an empty comment.

        Raises NotImplementedError when the server is too old to support
        table comment reflection.
        """
        if not self.supports_table_comment_reflection:
            raise NotImplementedError()

        database = schema if schema else connection.engine.url.database
        query = text(
            'SELECT comment FROM system.tables '
            'WHERE database = :database AND name = :name'
        )
        comment = self._execute(
            connection, query, database=database, name=table_name, scalar=True
        )
        return {'text': comment or None}

    def do_rollback(self, dbapi_connection):
        """No-op: transactions are not supported."""
        pass

    def do_executemany(self, cursor, statement, parameters, context=None):
        """Run ``executemany``, dropping parameters for inline single inserts.

        When the compiled INSERT carries a single inline VALUES expression
        and no bound parameters, ``parameters`` is replaced with None.
        """
        # render single insert inplace
        if (
            context
            and context.isinsert
            and context.compiled.insert_single_values_expr
            and not len(context.compiled_parameters[0])
        ):
            parameters = None
        cursor.executemany(statement, parameters, context=context)

    def do_execute(self, cursor, statement, parameters, context=None):
        """Delegate to the cursor, forwarding the execution context."""
        cursor.execute(statement, parameters, context=context)

    def _check_unicode_returns(self, connection, additional_tests=None):
        return True

    def _check_unicode_description(self, connection):
        return True

    def _get_server_version_info(self, connection):
        """Return the server version as a tuple parsed from 'a.b.c.d'.

        The forced version string from connect() takes precedence over
        querying ``version()``. Purely numeric parts become ints.
        """
        version = self.forced_server_version_string

        if version is None:
            version = self._execute(
                connection, 'select version()', scalar=True
            )

        # The first three are numeric, but the last is an alphanumeric build.
        return tuple(int(p) if p.isdigit() else p for p in version.split('.'))

    def _get_default_schema_name(self, connection):
        """Return ``currentDatabase()`` as the default schema name."""
        return self._execute(
            connection, 'select currentDatabase()', scalar=True
        )

    def connect(self, *cargs, **cparams):
        """Consume a 'server_version' connect argument, then connect."""
        self.forced_server_version_string = cparams.pop(
            'server_version', self.forced_server_version_string)
        return super(ClickHouseDialect, self).connect(*cargs, **cparams)
# Module-level, default-configured instance of the base ClickHouse dialect.
clickhouse_dialect = ClickHouseDialect()