/
dialect.py
872 lines (770 loc) · 31.8 KB
/
dialect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
import re
from collections import defaultdict, namedtuple
import pkg_resources
import sqlalchemy as sa
from sqlalchemy import Column, exc, inspect
from sqlalchemy.dialects.postgresql.base import (
PGCompiler, PGDDLCompiler, PGIdentifierPreparer
)
from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2
from sqlalchemy.engine import reflection
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import (
BinaryExpression, BooleanClauseList, Delete
)
from sqlalchemy.types import VARCHAR, NullType
from .commands import (
CopyCommand, UnloadFromSelect, Format, Compression, Encoding
)
from .compat import string_types
# Alembic integration is optional: register Redshift support only when
# alembic is importable; otherwise skip silently so the dialect still works.
try:
    from alembic.ddl import postgresql
except ImportError:
    pass
else:
    from alembic.ddl.base import RenameTable

    # Reuse alembic's PostgreSQL rename-table DDL for the 'redshift' dialect.
    compiles(RenameTable, 'redshift')(postgresql.visit_rename_table)

    class RedshiftImpl(postgresql.PostgresqlImpl):
        # Tells alembic which dialect name this implementation handles.
        __dialect__ = 'redshift'
__all__ = [
'CopyCommand', 'UnloadFromSelect', 'RedshiftDialect', 'Compression',
'Encoding', 'Format',
]
# Regex for parsing and identity constraint out of adsrc, e.g.:
# "identity"(445178, 0, '1,1'::text)
IDENTITY_RE = re.compile(r"""
"identity" \(
(?P<current>-?\d+)
,\s
(?P<base>-?\d+)
,\s
'(?P<seed>-?\d+),(?P<step>-?\d+)'
.*
\)
""", re.VERBOSE)
# Regex for SQL identifiers (valid table and column names)
SQL_IDENTIFIER_RE = re.compile(r"""
[_a-zA-Z][\w$]* # SQL standard identifier
| # or
(?:"[^"]+")+ # SQL delimited (quoted) identifier
""", re.VERBOSE)
# Regex for foreign key constraints, e.g.:
# FOREIGN KEY(col1) REFERENCES othertable (col2)
# See https://docs.aws.amazon.com/redshift/latest/dg/r_names.html
# for a definition of valid SQL identifiers.
FOREIGN_KEY_RE = re.compile(r"""
^FOREIGN\ KEY \s* \( # FOREIGN KEY, arbitrary whitespace, literal '('
(?P<columns> # Start a group to capture the referring columns
(?: # Start a non-capturing group
\s* # Arbitrary whitespace
([_a-zA-Z][\w$]* | ("[^"]+")+) # SQL identifier
\s* # Arbitrary whitespace
,? # There will be a colon if this isn't the last one
)+ # Close the non-capturing group; require at least one
) # Close the 'columns' group
\s* \) # Arbitrary whitespace and literal ')'
\s* REFERENCES \s*
((?P<referred_schema>([_a-zA-Z][\w$]* | ("[^"]*")+))\.)? # SQL identifier
(?P<referred_table>[_a-zA-Z][\w$]* | ("[^"]*")+) # SQL identifier
\s* \( # FOREIGN KEY, arbitrary whitespace, literal '('
(?P<referred_columns> # Start a group to capture the referring columns
(?: # Start a non-capturing group
\s* # Arbitrary whitespace
([_a-zA-Z][\w$]* | ("[^"]+")+) # SQL identifier
\s* # Arbitrary whitespace
,? # There will be a colon if this isn't the last one
)+ # Close the non-capturing group; require at least one
) # Close the 'columns' group
\s* \) # Arbitrary whitespace and literal ')'
""", re.VERBOSE)
# Regex for primary key constraints, e.g.:
# PRIMARY KEY (col1, col2)
PRIMARY_KEY_RE = re.compile(r"""
^PRIMARY \s* KEY \s* \( # FOREIGN KEY, arbitrary whitespace, literal '('
(?P<columns> # Start a group to capture column names
(?:
\s* # Arbitrary whitespace
# SQL identifier or delimited identifier
( [_a-zA-Z][\w$]* | ("[^"]*")+ )
\s* # Arbitrary whitespace
,? # There will be a colon if this isn't the last one
)+ # Close the non-capturing group; require at least one
)
\s* \) \s* # Arbitrary whitespace and literal ')'
""", re.VERBOSE)
# Reserved words as extracted from Redshift docs.
# See pull_reserved_words.sh at the top level of this repository
# for the code used to generate this set.
RESERVED_WORDS = set([
"aes128", "aes256", "all", "allowoverwrite", "analyse", "analyze",
"and", "any", "array", "as", "asc", "authorization", "backup",
"between", "binary", "blanksasnull", "both", "bytedict", "bzip2",
"case", "cast", "check", "collate", "column", "constraint", "create",
"credentials", "cross", "current_date", "current_time",
"current_timestamp", "current_user", "current_user_id", "default",
"deferrable", "deflate", "defrag", "delta", "delta32k", "desc",
"disable", "distinct", "do", "else", "emptyasnull", "enable",
"encode", "encrypt", "encryption", "end", "except", "explicit",
"false", "for", "foreign", "freeze", "from", "full", "globaldict256",
"globaldict64k", "grant", "group", "gzip", "having", "identity",
"ignore", "ilike", "in", "initially", "inner", "intersect", "into",
"is", "isnull", "join", "leading", "left", "like", "limit",
"localtime", "localtimestamp", "lun", "luns", "lzo", "lzop", "minus",
"mostly13", "mostly32", "mostly8", "natural", "new", "not",
"notnull", "null", "nulls", "off", "offline", "offset", "oid", "old",
"on", "only", "open", "or", "order", "outer", "overlaps", "parallel",
"partition", "percent", "permissions", "placing", "primary", "raw",
"readratio", "recover", "references", "respect", "rejectlog",
"resort", "restore", "right", "select", "session_user", "similar",
"snapshot", "some", "sysdate", "system", "table", "tag", "tdes",
"text255", "text32k", "then", "timestamp", "to", "top", "trailing",
"true", "truncatecolumns", "union", "unique", "user", "using",
"verbose", "wallet", "when", "where", "with", "without",
])
class RelationKey(namedtuple('RelationKey', ('name', 'schema'))):
    """
    Structured tuple of table/view name and schema name.
    """
    __slots__ = ()

    def __new__(cls, name, schema=None, connection=None):
        """
        Construct a new RelationKey with an explicit schema name.
        """
        if schema is None:
            if connection is None:
                raise ValueError("Must specify either schema or connection")
            # No schema given: fall back to the connection's default schema.
            schema = inspect(connection).default_schema_name
        return super(RelationKey, cls).__new__(cls, name, schema)

    def __str__(self):
        # Render as "schema.name", or bare "name" when no schema is set.
        if self.schema is None:
            return self.name
        return "{0}.{1}".format(self.schema, self.name)

    def unquoted(self):
        """
        Return *key* with one level of double quotes removed.

        Redshift stores some identifiers without quotes in internal tables,
        even though the name must be quoted elsewhere.
        In particular, this happens for tables named as a keyword.
        """
        rendered = str(self)
        is_quoted = rendered.startswith('"') and rendered.endswith('"')
        return rendered[1:-1] if is_quoted else rendered
class RedshiftCompiler(PGCompiler):
    # Statement compiler for Redshift; inherits PostgreSQL behavior except
    # where Redshift's SQL dialect differs.

    def visit_now_func(self, fn, **kw):
        # Redshift spells the current-timestamp function SYSDATE rather
        # than PostgreSQL's now().
        return "SYSDATE"
class RedshiftDDLCompiler(PGDDLCompiler):
    """
    Handles Redshift-specific ``CREATE TABLE`` syntax.

    Users can specify the `diststyle`, `distkey`, `sortkey` and `encode`
    properties per table and per column.

    Table level properties can be set using the dialect specific syntax. For
    example, to specify a distribution key and style you apply the following:

    >>> import sqlalchemy as sa
    >>> from sqlalchemy.schema import CreateTable
    >>> engine = sa.create_engine('redshift+psycopg2://example')
    >>> metadata = sa.MetaData()
    >>> user = sa.Table(
    ...     'user',
    ...     metadata,
    ...     sa.Column('id', sa.Integer, primary_key=True),
    ...     sa.Column('name', sa.String),
    ...     redshift_diststyle='KEY',
    ...     redshift_distkey='id',
    ...     redshift_interleaved_sortkey=['id', 'name'],
    ... )
    >>> print(CreateTable(user).compile(engine))
    <BLANKLINE>
    CREATE TABLE "user" (
        id INTEGER NOT NULL,
        name VARCHAR,
        PRIMARY KEY (id)
    ) DISTSTYLE KEY DISTKEY (id) INTERLEAVED SORTKEY (id, name)
    <BLANKLINE>
    <BLANKLINE>

    A single sort key can be applied without a wrapping list:

    >>> customer = sa.Table(
    ...     'customer',
    ...     metadata,
    ...     sa.Column('id', sa.Integer, primary_key=True),
    ...     sa.Column('name', sa.String),
    ...     redshift_sortkey='id',
    ... )
    >>> print(CreateTable(customer).compile(engine))
    <BLANKLINE>
    CREATE TABLE customer (
        id INTEGER NOT NULL,
        name VARCHAR,
        PRIMARY KEY (id)
    ) SORTKEY (id)
    <BLANKLINE>
    <BLANKLINE>

    Column-level special syntax can also be applied using the column info
    dictionary. For example, we can specify the ENCODE for a column:

    >>> product = sa.Table(
    ...     'product',
    ...     metadata,
    ...     sa.Column('id', sa.Integer, primary_key=True),
    ...     sa.Column('name', sa.String, info={'encode': 'lzo'})
    ... )
    >>> print(CreateTable(product).compile(engine))
    <BLANKLINE>
    CREATE TABLE product (
        id INTEGER NOT NULL,
        name VARCHAR ENCODE lzo,
        PRIMARY KEY (id)
    )
    <BLANKLINE>
    <BLANKLINE>

    We can also specify the distkey and sortkey options:

    >>> sku = sa.Table(
    ...     'sku',
    ...     metadata,
    ...     sa.Column('id', sa.Integer, primary_key=True),
    ...     sa.Column(
    ...         'name', sa.String, info={'distkey': True, 'sortkey': True}
    ...     )
    ... )
    >>> print(CreateTable(sku).compile(engine))
    <BLANKLINE>
    CREATE TABLE sku (
        id INTEGER NOT NULL,
        name VARCHAR DISTKEY SORTKEY,
        PRIMARY KEY (id)
    )
    <BLANKLINE>
    <BLANKLINE>
    """

    def post_create_table(self, table):
        """
        Return the Redshift table attributes (DISTSTYLE, DISTKEY and
        [INTERLEAVED] SORTKEY) that follow the closing parenthesis of a
        ``CREATE TABLE`` statement.

        :raises sqlalchemy.exc.CompileError: if ``diststyle`` is not one of
            EVEN, KEY or ALL.
        :raises sqlalchemy.exc.ArgumentError: if both ``sortkey`` and
            ``interleaved_sortkey`` are given.
        """
        text = ""
        info = table.dialect_options['redshift']

        diststyle = info.get('diststyle')
        if diststyle:
            diststyle = diststyle.upper()
            if diststyle not in ('EVEN', 'KEY', 'ALL'):
                raise exc.CompileError(
                    u"diststyle {0} is invalid".format(diststyle)
                )
            text += " DISTSTYLE " + diststyle

        distkey = info.get('distkey')
        if distkey:
            text += " DISTKEY ({0})".format(self.preparer.quote(distkey))

        sortkey = info.get('sortkey')
        interleaved_sortkey = info.get('interleaved_sortkey')
        if sortkey and interleaved_sortkey:
            raise exc.ArgumentError(
                "Parameters sortkey and interleaved_sortkey are "
                "mutually exclusive; you may not specify both."
            )
        if sortkey or interleaved_sortkey:
            keys = sortkey or interleaved_sortkey
            # BUGFIX: wrap a bare string for *either* kind of sort key.
            # Previously only ``sortkey`` was wrapped, so passing
            # interleaved_sortkey='id' iterated the string character by
            # character and emitted SORTKEY (i, d).
            if isinstance(keys, string_types):
                keys = [keys]
            # Accept Column objects as well as plain column names.
            keys = [key.name if isinstance(key, Column) else key
                    for key in keys]
            if interleaved_sortkey:
                text += " INTERLEAVED"
            sortkey_string = ", ".join(self.preparer.quote(key)
                                       for key in keys)
            text += " SORTKEY ({0})".format(sortkey_string)
        return text

    def get_column_specification(self, column, **kwargs):
        """
        Build the column clause of ``CREATE TABLE``: name, type, IDENTITY
        or DEFAULT, Redshift column attributes, and NOT NULL.
        """
        colspec = self.preparer.format_column(column)
        colspec += " " + self.dialect.type_compiler.process(column.type)

        default = self.get_column_default_string(column)
        if default is not None:
            # Identity constraints show up as *default* when reflected.
            m = IDENTITY_RE.match(default)
            if m:
                colspec += " IDENTITY({seed},{step})".format(**m.groupdict())
            else:
                colspec += " DEFAULT " + default

        colspec += self._fetch_redshift_column_attributes(column)

        if not column.nullable:
            colspec += " NOT NULL"
        return colspec

    def _fetch_redshift_column_attributes(self, column):
        """
        Return column-level Redshift attributes (IDENTITY, ENCODE, DISTKEY,
        SORTKEY) taken from ``column.info``; empty string if none apply.
        """
        text = ""
        if not hasattr(column, 'info'):
            return text
        info = column.info

        identity = info.get('identity')
        if identity:
            text += " IDENTITY({0},{1})".format(identity[0], identity[1])

        encode = info.get('encode')
        if encode:
            text += " ENCODE " + encode

        distkey = info.get('distkey')
        if distkey:
            text += " DISTKEY"

        sortkey = info.get('sortkey')
        if sortkey:
            text += " SORTKEY"
        return text
class RedshiftIdentifierPreparer(PGIdentifierPreparer):
    # Use Redshift's reserved-word list (not PostgreSQL's) when deciding
    # which identifiers need quoting.
    reserved_words = RESERVED_WORDS
class RedshiftDialect(PGDialect_psycopg2):
    """
    Define Redshift-specific behavior.

    Most public methods are overrides of the underlying interfaces defined in
    :class:`~sqlalchemy.engine.interfaces.Dialect` and
    :class:`~sqlalchemy.engine.Inspector`.
    """

    name = 'redshift'

    # Redshift allows longer identifiers than stock PostgreSQL.
    max_identifier_length = 127

    statement_compiler = RedshiftCompiler
    ddl_compiler = RedshiftDDLCompiler
    preparer = RedshiftIdentifierPreparer

    # Dialect-specific keyword arguments accepted on Index/Table constructs,
    # passed as e.g. ``redshift_distkey=...`` to ``sa.Table``.
    construct_arguments = [
        (sa.schema.Index, {
            "using": False,
            "where": None,
            "ops": {}
        }),
        (sa.schema.Table, {
            "ignore_search_path": False,
            "diststyle": None,
            "distkey": None,
            "sortkey": None,
            "interleaved_sortkey": None,
        }),
    ]

    def __init__(self, *args, **kw):
        super(RedshiftDialect, self).__init__(*args, **kw)
        # Cache domains, as these will be static;
        # Redshift does not support user-created domains.
        self._domains = None

    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        """
        Return information about columns in `table_name`.

        Overrides interface
        :meth:`~sqlalchemy.engine.interfaces.Dialect.get_columns`.
        """
        cols = self._get_redshift_columns(connection, table_name, schema, **kw)
        # Domains are loaded once and reused for every subsequent call.
        if not self._domains:
            self._domains = self._load_domains(connection)
        domains = self._domains
        columns = []
        for col in cols:
            column_info = self._get_column_info(
                name=col.name, format_type=col.format_type,
                default=col.default, notnull=col.notnull, domains=domains,
                enums=[], schema=col.schema, encode=col.encode)
            columns.append(column_info)
        return columns

    @reflection.cache
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        """
        Return information about the primary key constraint on `table_name`.

        Overrides interface
        :meth:`~sqlalchemy.engine.interfaces.Dialect.get_pk_constraint`.
        """
        constraints = self._get_redshift_constraints(connection, table_name,
                                                     schema, **kw)
        # contype 'p' marks primary-key rows in pg_constraint.
        pk_constraints = [c for c in constraints if c.contype == 'p']
        if not pk_constraints:
            return {'constrained_columns': [], 'name': ''}
        pk_constraint = pk_constraints[0]
        # Parse the column list out of the textual constraint definition,
        # e.g. "PRIMARY KEY (col1, col2)".
        m = PRIMARY_KEY_RE.match(pk_constraint.condef)
        colstring = m.group('columns')
        constrained_columns = SQL_IDENTIFIER_RE.findall(colstring)
        return {
            'constrained_columns': constrained_columns,
            'name': pk_constraint.conname,
        }

    @reflection.cache
    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        """
        Return information about foreign keys in `table_name`.

        Overrides interface
        :meth:`~sqlalchemy.engine.interfaces.Dialect.get_foreign_keys`.
        """
        constraints = self._get_redshift_constraints(connection, table_name,
                                                     schema, **kw)
        # contype 'f' marks foreign-key rows in pg_constraint.
        fk_constraints = [c for c in constraints if c.contype == 'f']
        # Collapse the per-column rows into one entry per constraint name.
        uniques = defaultdict(lambda: defaultdict(dict))
        for con in fk_constraints:
            uniques[con.conname]["key"] = con.conkey
            uniques[con.conname]["condef"] = con.condef
        fkeys = []
        for conname, attrs in uniques.items():
            # Parse referring/referred columns and the referred table out of
            # the textual definition, e.g.
            # "FOREIGN KEY(col1) REFERENCES othertable (col2)".
            m = FOREIGN_KEY_RE.match(attrs['condef'])
            colstring = m.group('referred_columns')
            referred_columns = SQL_IDENTIFIER_RE.findall(colstring)
            referred_table = m.group('referred_table')
            referred_schema = m.group('referred_schema')
            colstring = m.group('columns')
            constrained_columns = SQL_IDENTIFIER_RE.findall(colstring)
            fkey_d = {
                'name': conname,
                'constrained_columns': constrained_columns,
                'referred_schema': referred_schema,
                'referred_table': referred_table,
                'referred_columns': referred_columns,
            }
            fkeys.append(fkey_d)
        return fkeys

    @reflection.cache
    def get_table_names(self, connection, schema=None, **kw):
        """
        Return a list of table names for `schema`.

        Overrides interface
        :meth:`~sqlalchemy.engine.interfaces.Dialect.get_table_names`.
        """
        # relkind 'r' = ordinary table in pg_class.
        return self._get_table_or_view_names('r', connection, schema, **kw)

    @reflection.cache
    def get_view_names(self, connection, schema=None, **kw):
        """
        Return a list of view names for `schema`.

        Overrides interface
        :meth:`~sqlalchemy.engine.interfaces.Dialect.get_view_names`.
        """
        # relkind 'v' = view in pg_class.
        return self._get_table_or_view_names('v', connection, schema, **kw)

    @reflection.cache
    def get_view_definition(self, connection, view_name, schema=None, **kw):
        """Return view definition.

        Given a :class:`.Connection`, a string `view_name`,
        and an optional string `schema`, return the view definition.

        Overrides interface
        :meth:`~sqlalchemy.engine.interfaces.Dialect.get_view_definition`.
        """
        view = self._get_redshift_relation(connection, view_name, schema, **kw)
        return sa.text(view.view_definition)

    def get_indexes(self, connection, table_name, schema, **kw):
        """
        Return information about indexes in `table_name`.

        Because Redshift does not support traditional indexes,
        this always returns an empty list.

        Overrides interface
        :meth:`~sqlalchemy.engine.interfaces.Dialect.get_indexes`.
        """
        return []

    @reflection.cache
    def get_unique_constraints(self, connection, table_name,
                               schema=None, **kw):
        """
        Return information about unique constraints in `table_name`.

        Overrides interface
        :meth:`~sqlalchemy.engine.interfaces.Dialect.get_unique_constraints`.
        """
        constraints = self._get_redshift_constraints(connection,
                                                     table_name, schema, **kw)
        # contype 'u' marks unique-constraint rows in pg_constraint.
        constraints = [c for c in constraints if c.contype == 'u']
        # Group the per-column rows by constraint name, remembering both
        # the ordered attnum key and the attnum -> column-name mapping.
        uniques = defaultdict(lambda: defaultdict(dict))
        for con in constraints:
            uniques[con.conname]["key"] = con.conkey
            uniques[con.conname]["cols"][con.attnum] = con.attname
        return [
            {'name': None,
             'column_names': [uc["cols"][i] for i in uc["key"]]}
            for name, uc in uniques.items()
        ]

    @reflection.cache
    def get_table_options(self, connection, table_name, schema, **kw):
        """
        Return a dictionary of options specified when the table of the
        given name was created.

        Overrides interface
        :meth:`~sqlalchemy.engine.Inspector.get_table_options`.
        """
        def keyfunc(column):
            num = int(column.sortkey)
            # If sortkey is interleaved, column numbers alternate
            # negative values, so take abs.
            return abs(num)
        table = self._get_redshift_relation(connection, table_name,
                                            schema, **kw)
        columns = self._get_redshift_columns(connection, table_name,
                                             schema, **kw)
        # Non-zero sortkey means the column participates in the sort key;
        # order them by their (absolute) position.
        sortkey_cols = sorted([col for col in columns if col.sortkey],
                              key=keyfunc)
        # Any negative sortkey position signals an INTERLEAVED sort key.
        interleaved = any([int(col.sortkey) < 0 for col in sortkey_cols])
        sortkey = [col.name for col in sortkey_cols]
        interleaved_sortkey = None
        if interleaved:
            interleaved_sortkey = sortkey
            sortkey = None
        distkeys = [col.name for col in columns if col.distkey]
        distkey = distkeys[0] if distkeys else None
        return {
            'redshift_diststyle': table.diststyle,
            'redshift_distkey': distkey,
            'redshift_sortkey': sortkey,
            'redshift_interleaved_sortkey': interleaved_sortkey,
        }

    def create_connect_args(self, *args, **kwargs):
        """
        Build DB-API compatible connection arguments.

        Overrides interface
        :meth:`~sqlalchemy.engine.interfaces.Dialect.create_connect_args`.
        """
        # Default to verified SSL with the bundled Redshift CA certificates;
        # anything the caller supplies explicitly wins over these defaults.
        default_args = {
            'sslmode': 'verify-full',
            'sslrootcert': pkg_resources.resource_filename(
                __name__,
                'redshift-ca-bundle.crt'
            ),
        }
        cargs, cparams = super(RedshiftDialect, self).create_connect_args(
            *args, **kwargs
        )
        default_args.update(cparams)
        return cargs, default_args

    def _get_table_or_view_names(self, relkind, connection, schema=None, **kw):
        # Shared implementation for get_table_names/get_view_names;
        # `relkind` is the pg_class.relkind value to filter on.
        default_schema = inspect(connection).default_schema_name
        if not schema:
            schema = default_schema
        info_cache = kw.get('info_cache')
        all_relations = self._get_all_relation_info(connection,
                                                    info_cache=info_cache)
        relation_names = []
        for key, relation in all_relations.items():
            if key.schema == schema and relation.relkind == relkind:
                relation_names.append(key.name)
        return relation_names

    def _get_column_info(self, *args, **kwargs):
        # Wraps the PostgreSQL implementation to handle the Redshift-only
        # 'encode' attribute, which the base class does not understand.
        kw = kwargs.copy()
        encode = kw.pop('encode', None)
        column_info = super(RedshiftDialect, self)._get_column_info(
            *args,
            **kw
        )
        # An unbounded VARCHAR is not a real Redshift type; report it as
        # NullType rather than pretending a length exists.
        if isinstance(column_info['type'], VARCHAR):
            if column_info['type'].length is None:
                column_info['type'] = NullType()
        if 'info' not in column_info:
            column_info['info'] = {}
        # 'none' means no compression encoding was set for the column.
        if encode and encode != 'none':
            column_info['info']['encode'] = encode
        return column_info

    def _get_redshift_relation(self, connection, table_name,
                               schema=None, **kw):
        # Look up a single relation's reflected metadata by key, retrying
        # with the unquoted form (Redshift stores keyword-named relations
        # unquoted in its internal tables).
        info_cache = kw.get('info_cache')
        all_relations = self._get_all_relation_info(connection,
                                                    info_cache=info_cache)
        key = RelationKey(table_name, schema, connection)
        if key not in all_relations.keys():
            key = key.unquoted()
        try:
            return all_relations[key]
        except KeyError:
            raise sa.exc.NoSuchTableError(key)

    def _get_redshift_columns(self, connection, table_name, schema=None, **kw):
        # Look up a relation's reflected column rows; see
        # _get_redshift_relation for the unquoted-key fallback.
        info_cache = kw.get('info_cache')
        all_columns = self._get_all_column_info(connection,
                                                info_cache=info_cache)
        key = RelationKey(table_name, schema, connection)
        if key not in all_columns.keys():
            key = key.unquoted()
        return all_columns[key]

    def _get_redshift_constraints(self, connection, table_name,
                                  schema=None, **kw):
        # Look up a relation's reflected constraint rows; see
        # _get_redshift_relation for the unquoted-key fallback.
        info_cache = kw.get('info_cache')
        all_constraints = self._get_all_constraint_info(connection,
                                                        info_cache=info_cache)
        key = RelationKey(table_name, schema, connection)
        if key not in all_constraints.keys():
            key = key.unquoted()
        return all_constraints[key]

    @reflection.cache
    def _get_all_relation_info(self, connection, **kw):
        # Fetch metadata for every user relation in one round trip and key
        # it by RelationKey, so repeated reflection calls stay cheap.
        result = connection.execute("""
        SELECT
          c.relkind,
          n.oid as "schema_oid",
          n.nspname as "schema",
          c.oid as "rel_oid",
          c.relname,
          CASE c.reldiststyle
          WHEN 0 THEN 'EVEN' WHEN 1 THEN 'KEY' WHEN 8 THEN 'ALL' END
          AS "diststyle",
          c.relowner AS "owner_id",
          u.usename AS "owner_name",
          TRIM(TRAILING ';' FROM pg_catalog.pg_get_viewdef(c.oid, true))
          AS "view_definition",
          pg_catalog.array_to_string(c.relacl, '\n') AS "privileges"
        FROM pg_catalog.pg_class c
        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
        WHERE c.relkind IN ('r', 'v', 'm', 'S', 'f')
          AND n.nspname !~ '^pg_'
        ORDER BY c.relkind, n.oid, n.nspname;
        """)
        relations = {}
        for rel in result:
            key = RelationKey(rel.relname, rel.schema, connection)
            relations[key] = rel
        return relations

    @reflection.cache
    def _get_all_column_info(self, connection, **kw):
        all_columns = defaultdict(list)
        with connection.contextual_connect() as cc:
            # We fetch the current search_path, which may or may not quote
            # '$user' depending on whether other schemas need quoting.
            search_path = cc.execute("SHOW search_path").scalar()
            if '$user' in search_path and '"$user"' not in search_path:
                search_path = search_path.replace('$user', '"$user"')

            # Because pg_table_def only shows results for schemas on the
            # search_path, we explicitly include all non-system schemas, then
            # replace the original value for search_path.
            schema_names = ['"%s"' % r.name for r in cc.execute("""
            SELECT nspname AS "name"
            FROM pg_catalog.pg_namespace n
            WHERE nspname !~ '^pg_'
              AND nspname <> 'information_schema'
              AND n.oid NOT IN
                (SELECT esoid FROM pg_catalog.pg_external_schema)
            ORDER BY 1
            """)]
            modified_search_path = ','.join(schema_names)
            cc.execute("SET LOCAL search_path TO %s" % modified_search_path)

            result = cc.execute("""
            SELECT
              n.nspname as "schema",
              c.relname as "table_name",
              d.column as "name",
              encoding as "encode",
              type, distkey, sortkey, "notnull", adsrc, attnum,
              pg_catalog.format_type(att.atttypid, att.atttypmod),
              pg_catalog.pg_get_expr(ad.adbin, ad.adrelid) AS DEFAULT,
              n.oid as "schema_oid",
              c.oid as "table_oid"
            FROM pg_catalog.pg_class c
            LEFT JOIN pg_catalog.pg_namespace n
              ON n.oid = c.relnamespace
            JOIN pg_catalog.pg_table_def d
              ON (d.schemaname, d.tablename) = (n.nspname, c.relname)
            JOIN pg_catalog.pg_attribute att
              ON (att.attrelid, att.attname) = (c.oid, d.column)
            LEFT JOIN pg_catalog.pg_attrdef ad
              ON (att.attrelid, att.attnum) = (ad.adrelid, ad.adnum)
            WHERE n.nspname !~ '^pg_'
            ORDER BY n.nspname, c.relname, att.attnum
            """)
            for col in result:
                key = RelationKey(col.table_name, col.schema, connection)
                all_columns[key].append(col)

            # Restore the session's original search_path.
            cc.execute("SET LOCAL search_path TO %s" % search_path)
        return dict(all_columns)

    @reflection.cache
    def _get_all_constraint_info(self, connection, **kw):
        # Fetch every constraint (one row per constrained column) for all
        # user relations in one round trip, grouped by RelationKey.
        result = connection.execute("""
        SELECT
          n.nspname as "schema",
          c.relname as "table_name",
          t.contype,
          t.conname,
          t.conkey,
          a.attnum,
          a.attname,
          pg_catalog.pg_get_constraintdef(t.oid, true) as condef,
          n.oid as "schema_oid",
          c.oid as "rel_oid"
        FROM pg_catalog.pg_class c
        LEFT JOIN pg_catalog.pg_namespace n
          ON n.oid = c.relnamespace
        JOIN pg_catalog.pg_constraint t
          ON t.conrelid = c.oid
        JOIN pg_catalog.pg_attribute a
          ON t.conrelid = a.attrelid AND a.attnum = ANY(t.conkey)
        WHERE n.nspname !~ '^pg_'
        ORDER BY n.nspname, c.relname
        """)
        all_constraints = defaultdict(list)
        for con in result:
            key = RelationKey(con.table_name, con.schema, connection)
            all_constraints[key].append(con)
        return all_constraints
def gen_columns_from_children(root):
    """
    Generates columns that are being used in child elements of the delete query
    this will be used to determine tables for the using clause.

    :param root: the delete query
    :return: a generator of columns
    """
    if isinstance(root, sa.Column):
        # Leaf node: an actual column reference.
        yield root
    elif isinstance(root, (Delete, BinaryExpression, BooleanClauseList)):
        # Composite node: recurse into each child expression.
        for child in root.get_children():
            for column in gen_columns_from_children(child):
                yield column
@compiles(Delete, 'redshift')
def visit_delete_stmt(element, compiler, **kwargs):
    """
    Adds redshift-dialect specific compilation rule for the
    delete statement.

    Redshift DELETE syntax can be found here:
    https://docs.aws.amazon.com/redshift/latest/dg/r_DELETE.html

    .. :code-block: sql

        DELETE [ FROM ] table_name
        [ { USING } table_name, ...]
        [ WHERE condition ]

    By default, SqlAlchemy compiles DELETE statements with the
    syntax:

    .. :code-block: sql

        DELETE [ FROM ] table_name
        [ WHERE condition ]

    problem illustration:

    >>> from sqlalchemy import Table, Column, Integer, MetaData, delete
    >>> from sqlalchemy_redshift.dialect import RedshiftDialect
    >>> meta = MetaData()
    >>> table1 = Table(
    ...     'table_1',
    ...     meta,
    ...     Column('pk', Integer, primary_key=True)
    ... )
    ...
    >>> table2 = Table(
    ...     'table_2',
    ...     meta,
    ...     Column('pk', Integer, primary_key=True)
    ... )
    ...
    >>> del_stmt = delete(table1).where(table1.c.pk==table2.c.pk)
    >>> str(del_stmt.compile(dialect=RedshiftDialect()))
    'DELETE FROM table_1 USING table_2 WHERE table_1.pk = table_2.pk'
    >>> str(del_stmt)
    'DELETE FROM table_1 WHERE table_1.pk = table_2.pk'
    >>> del_stmt2 = delete(table1)
    >>> str(del_stmt2)
    'DELETE FROM table_1'
    >>> del_stmt3 = delete(table1).where(table1.c.pk > 1000)
    >>> str(del_stmt3)
    'DELETE FROM table_1 WHERE table_1.pk > :pk_1'
    >>> str(del_stmt3.compile(dialect=RedshiftDialect()))
    'DELETE FROM table_1 WHERE table_1.pk > %(pk_1)s'
    """
    # Set empty strings for the default where clause and using clause
    whereclause = ''
    usingclause = ''

    # determine if the delete query needs a ``USING`` injected
    # by inspecting the whereclause's children & their children...
    # first, the where clause text is built, if applicable
    # then, the using clause text is built, if applicable
    # note:
    #   the tables in the using clause are sorted in the order in
    #   which they first appear in the where clause.
    delete_stmt_table = compiler.process(element.table, asfrom=True, **kwargs)
    whereclause_tuple = element.get_children()
    if whereclause_tuple:
        usingclause_tables = []
        whereclause = ' WHERE {clause}'.format(
            clause=compiler.process(*whereclause_tuple, **kwargs)
        )

        # Every table referenced by a column in the WHERE clause, other than
        # the table being deleted from, must appear in the USING clause.
        whereclause_columns = gen_columns_from_children(element)
        for col in whereclause_columns:
            table = compiler.process(col.table, asfrom=True, **kwargs)
            if table != delete_stmt_table and table not in usingclause_tables:
                usingclause_tables.append(table)

        if usingclause_tables:
            usingclause = ' USING {clause}'.format(
                clause=', '.join(usingclause_tables)
            )

    return 'DELETE FROM {table}{using}{where}'.format(
        table=delete_stmt_table,
        using=usingclause,
        where=whereclause)