-
Notifications
You must be signed in to change notification settings - Fork 106
/
introspection.py
434 lines (408 loc) · 19.8 KB
/
introspection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
# Copyright (c) Microsoft Corporation.
# Licensed under the BSD license.
from django.db import DatabaseError
import pyodbc as Database
from collections import namedtuple
from django import VERSION
from django.db.backends.base.introspection import BaseDatabaseIntrospection
from django.db.backends.base.introspection import FieldInfo as BaseFieldInfo
from django.db.backends.base.introspection import TableInfo as BaseTableInfo
from django.db.models.indexes import Index
from django.conf import settings
SQL_AUTOFIELD = -777555
SQL_BIGAUTOFIELD = -777444
SQL_SMALLAUTOFIELD = -777333
SQL_TIMESTAMP_WITH_TIMEZONE = -155
FieldInfo = namedtuple("FieldInfo", BaseFieldInfo._fields + ("comment",))
TableInfo = namedtuple("TableInfo", BaseTableInfo._fields + ("comment",))
def get_schema_name():
return getattr(settings, 'SCHEMA_TO_INSPECT', 'SCHEMA_NAME()')
class DatabaseIntrospection(BaseDatabaseIntrospection):
# Map type codes to Django Field types.
data_types_reverse = {
SQL_AUTOFIELD: 'AutoField',
SQL_BIGAUTOFIELD: 'BigAutoField',
SQL_SMALLAUTOFIELD: 'SmallAutoField',
Database.SQL_BIGINT: 'BigIntegerField',
# Database.SQL_BINARY: ,
Database.SQL_BIT: 'BooleanField',
Database.SQL_CHAR: 'CharField',
Database.SQL_DECIMAL: 'DecimalField',
Database.SQL_DOUBLE: 'FloatField',
Database.SQL_FLOAT: 'FloatField',
Database.SQL_GUID: 'TextField',
Database.SQL_INTEGER: 'IntegerField',
Database.SQL_LONGVARBINARY: 'BinaryField',
# Database.SQL_LONGVARCHAR: ,
Database.SQL_NUMERIC: 'DecimalField',
Database.SQL_REAL: 'FloatField',
Database.SQL_SMALLINT: 'SmallIntegerField',
Database.SQL_SS_TIME2: 'TimeField',
Database.SQL_TINYINT: 'SmallIntegerField',
Database.SQL_TYPE_DATE: 'DateField',
Database.SQL_TYPE_TIME: 'TimeField',
Database.SQL_TYPE_TIMESTAMP: 'DateTimeField',
SQL_TIMESTAMP_WITH_TIMEZONE: 'DateTimeField',
Database.SQL_VARBINARY: 'BinaryField',
Database.SQL_VARCHAR: 'TextField',
Database.SQL_WCHAR: 'CharField',
Database.SQL_WLONGVARCHAR: 'TextField',
Database.SQL_WVARCHAR: 'TextField',
}
ignored_tables = []
def get_field_type(self, data_type, description):
field_type = super().get_field_type(data_type, description)
# the max nvarchar length is described as 0 or 2**30-1
# (it depends on the driver)
size = description.internal_size
if field_type == 'CharField':
if size == 0 or size >= 2**30 - 1:
field_type = "TextField"
elif field_type == 'TextField':
if size > 0 and size < 2**30 - 1:
field_type = 'CharField'
return field_type
def get_table_list(self, cursor):
"""
Returns a list of table and view names in the current database.
"""
if VERSION >= (4, 2) and self.connection.features.supports_comments:
sql = """SELECT
TABLE_NAME,
TABLE_TYPE,
CAST(ep.value AS VARCHAR) AS COMMENT
FROM INFORMATION_SCHEMA.TABLES i
LEFT JOIN sys.tables t ON t.name = i.TABLE_NAME
LEFT JOIN sys.extended_properties ep ON t.object_id = ep.major_id
AND ((ep.name = 'MS_DESCRIPTION' AND ep.minor_id = 0) OR ep.value IS NULL)
WHERE i.TABLE_SCHEMA = %s""" % (
get_schema_name())
else:
sql = 'SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = %s' % (get_schema_name())
cursor.execute(sql)
types = {'BASE TABLE': 't', 'VIEW': 'v'}
if VERSION >= (4, 2) and self.connection.features.supports_comments:
return [TableInfo(row[0], types.get(row[1]), row[2])
for row in cursor.fetchall()
if row[0] not in self.ignored_tables]
else:
return [BaseTableInfo(row[0], types.get(row[1]))
for row in cursor.fetchall()
if row[0] not in self.ignored_tables]
def _is_auto_field(self, cursor, table_name, column_name):
"""
Checks whether column is Identity
"""
# COLUMNPROPERTY: http://msdn2.microsoft.com/en-us/library/ms174968.aspx
# from django.db import connection
# cursor.execute("SELECT COLUMNPROPERTY(OBJECT_ID(%s), %s, 'IsIdentity')",
# (connection.ops.quote_name(table_name), column_name))
cursor.execute("SELECT COLUMNPROPERTY(OBJECT_ID(%s), %s, 'IsIdentity')",
(self.connection.ops.quote_name(table_name), column_name))
return cursor.fetchall()[0][0]
def get_table_description(self, cursor, table_name, identity_check=True):
"""Returns a description of the table, with DB-API cursor.description interface.
The 'auto_check' parameter has been added to the function argspec.
If set to True, the function will check each of the table's fields for the
IDENTITY property (the IDENTITY property is the MSSQL equivalent to an AutoField).
When an integer field is found with an IDENTITY property, it is given a custom field number
of SQL_AUTOFIELD, which maps to the 'AutoField' value in the DATA_TYPES_REVERSE dict.
When a bigint field is found with an IDENTITY property, it is given a custom field number
of SQL_BIGAUTOFIELD, which maps to the 'BigAutoField' value in the DATA_TYPES_REVERSE dict.
"""
# map pyodbc's cursor.columns to db-api cursor description
columns = [[c[3], c[4], c[6], c[6], c[6], c[8], c[10], c[12]] for c in cursor.columns(table=table_name)]
if not columns:
raise DatabaseError(f"Table {table_name} does not exist.")
items = []
for column in columns:
if VERSION >= (3, 2):
if self.connection.sql_server_version >= 2019:
sql = """SELECT collation_name
FROM sys.columns c
inner join sys.tables t on c.object_id = t.object_id
WHERE t.name = '%s' and c.name = '%s'
""" % (table_name, column[0])
cursor.execute(sql)
collation_name = cursor.fetchone()
column.append(collation_name[0] if collation_name else '')
else:
column.append('')
if VERSION >= (4, 2) and self.connection.features.supports_comments:
sql = """select CAST(ep.value AS VARCHAR) AS COMMENT
FROM sys.columns c
INNER JOIN sys.tables t ON c.object_id = t.object_id
INNER JOIN sys.extended_properties ep ON c.object_id=ep.major_id AND ep.minor_id = c.column_id
WHERE t.name = '%s' AND c.name = '%s' AND ep.name = 'MS_Description'
""" % (table_name, column[0])
cursor.execute(sql)
comment = cursor.fetchone()
column.append(comment[0] if comment else '')
if identity_check and self._is_auto_field(cursor, table_name, column[0]):
if column[1] == Database.SQL_BIGINT:
column[1] = SQL_BIGAUTOFIELD
elif column[1] == Database.SQL_SMALLINT:
column[1] = SQL_SMALLAUTOFIELD
else:
column[1] = SQL_AUTOFIELD
if column[1] == Database.SQL_WVARCHAR and column[3] < 4000:
column[1] = Database.SQL_WCHAR
# Remove surrounding parentheses for default values
if column[7]:
default_value = column[7]
start = 0
end = -1
for _ in range(2):
if default_value[start] == '(' and default_value[end] == ')':
start += 1
end -= 1
column[7] = default_value[start:end + 1]
if VERSION >= (4, 2) and self.connection.features.supports_comments:
items.append(FieldInfo(*column))
else:
items.append(BaseFieldInfo(*column))
return items
def get_sequences(self, cursor, table_name, table_fields=()):
cursor.execute(f"""
SELECT c.name FROM sys.columns c
INNER JOIN sys.tables t ON c.object_id = t.object_id
WHERE t.schema_id = SCHEMA_ID({get_schema_name()}) AND t.name = %s AND c.is_identity = 1""",
[table_name])
# SQL Server allows only one identity column per table
# https://docs.microsoft.com/en-us/sql/t-sql/statements/create-table-transact-sql-identity-property
row = cursor.fetchone()
return [{'table': table_name, 'column': row[0]}] if row else []
def get_relations(self, cursor, table_name):
"""
Returns a dictionary of {field_name: (field_name_other_table, other_table)}
representing all relationships to the given table.
"""
# CONSTRAINT_COLUMN_USAGE: http://msdn2.microsoft.com/en-us/library/ms174431.aspx
# CONSTRAINT_TABLE_USAGE: http://msdn2.microsoft.com/en-us/library/ms179883.aspx
# REFERENTIAL_CONSTRAINTS: http://msdn2.microsoft.com/en-us/library/ms179987.aspx
# TABLE_CONSTRAINTS: http://msdn2.microsoft.com/en-us/library/ms181757.aspx
sql = f"""
SELECT e.COLUMN_NAME AS column_name,
c.TABLE_NAME AS referenced_table_name,
d.COLUMN_NAME AS referenced_column_name
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS a
INNER JOIN INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS AS b
ON a.CONSTRAINT_NAME = b.CONSTRAINT_NAME AND a.TABLE_SCHEMA = b.CONSTRAINT_SCHEMA
INNER JOIN INFORMATION_SCHEMA.CONSTRAINT_TABLE_USAGE AS c
ON b.UNIQUE_CONSTRAINT_NAME = c.CONSTRAINT_NAME AND b.CONSTRAINT_SCHEMA = c.CONSTRAINT_SCHEMA
INNER JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS d
ON c.CONSTRAINT_NAME = d.CONSTRAINT_NAME AND c.CONSTRAINT_SCHEMA = d.CONSTRAINT_SCHEMA
INNER JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS e
ON a.CONSTRAINT_NAME = e.CONSTRAINT_NAME AND a.TABLE_SCHEMA = e.TABLE_SCHEMA
WHERE a.TABLE_SCHEMA = {get_schema_name()} AND a.TABLE_NAME = %s AND a.CONSTRAINT_TYPE = 'FOREIGN KEY'"""
cursor.execute(sql, (table_name,))
return dict([[item[0], (item[2], item[1])] for item in cursor.fetchall()])
def get_key_columns(self, cursor, table_name):
"""
Returns a list of (column_name, referenced_table_name, referenced_column_name) for all
key columns in given table.
"""
key_columns = []
cursor.execute(f"""
SELECT c.name AS column_name, rt.name AS referenced_table_name, rc.name AS referenced_column_name
FROM sys.foreign_key_columns fk
INNER JOIN sys.tables t ON t.object_id = fk.parent_object_id
INNER JOIN sys.columns c ON c.object_id = t.object_id AND c.column_id = fk.parent_column_id
INNER JOIN sys.tables rt ON rt.object_id = fk.referenced_object_id
INNER JOIN sys.columns rc ON rc.object_id = rt.object_id AND rc.column_id = fk.referenced_column_id
WHERE t.schema_id = SCHEMA_ID({get_schema_name()}) AND t.name = %s""", [table_name])
key_columns.extend([tuple(row) for row in cursor.fetchall()])
return key_columns
def get_constraints(self, cursor, table_name):
"""
Retrieves any constraints or keys (unique, pk, fk, check, index)
across one or more columns.
Returns a dict mapping constraint names to their attributes,
where attributes is a dict with keys:
* columns: List of columns this covers
* primary_key: True if primary key, False otherwise
* unique: True if this is a unique constraint, False otherwise
* foreign_key: (table, column) of target, or None
* check: True if check constraint, False otherwise
* index: True if index, False otherwise.
* orders: The order (ASC/DESC) defined for the columns of indexes
* type: The type of the index (btree, hash, etc.)
"""
constraints = {}
# Loop over the key table, collecting things as constraints
# This will get PKs, FKs, and uniques, but not CHECK
cursor.execute(f"""
SELECT
kc.constraint_name,
kc.column_name,
tc.constraint_type,
fk.referenced_table_name,
fk.referenced_column_name
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS kc
INNER JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc ON
kc.table_schema = tc.table_schema AND
kc.table_name = tc.table_name AND
kc.constraint_name = tc.constraint_name
LEFT OUTER JOIN (
SELECT
ps.name AS table_schema,
pt.name AS table_name,
pc.name AS column_name,
rt.name AS referenced_table_name,
rc.name AS referenced_column_name
FROM
sys.foreign_key_columns fkc
INNER JOIN sys.tables pt ON
fkc.parent_object_id = pt.object_id
INNER JOIN sys.schemas ps ON
pt.schema_id = ps.schema_id
INNER JOIN sys.columns pc ON
fkc.parent_object_id = pc.object_id AND
fkc.parent_column_id = pc.column_id
INNER JOIN sys.tables rt ON
fkc.referenced_object_id = rt.object_id
INNER JOIN sys.schemas rs ON
rt.schema_id = rs.schema_id
INNER JOIN sys.columns rc ON
fkc.referenced_object_id = rc.object_id AND
fkc.referenced_column_id = rc.column_id
) fk ON
kc.table_schema = fk.table_schema AND
kc.table_name = fk.table_name AND
kc.column_name = fk.column_name
WHERE
kc.table_schema = {get_schema_name()} AND
kc.table_name = %s
ORDER BY
kc.constraint_name ASC,
kc.ordinal_position ASC
""", [table_name])
for constraint, column, kind, ref_table, ref_column in cursor.fetchall():
# If we're the first column, make the record
if constraint not in constraints:
constraints[constraint] = {
"columns": [],
"primary_key": kind.lower() == "primary key",
# In the sys.indexes table, primary key indexes have is_unique_constraint as false,
# but is_unique as true.
"unique": kind.lower() in ["primary key", "unique"],
"unique_constraint": kind.lower() == "unique",
"foreign_key": (ref_table, ref_column) if kind.lower() == "foreign key" else None,
"check": False,
# Potentially misleading: primary key and unique constraints still have indexes attached to them.
# Should probably be updated with the additional info from the sys.indexes table we fetch later on.
"index": False,
"default": False,
}
# Record the details
constraints[constraint]['columns'].append(column)
# Now get CHECK constraint columns
cursor.execute(f"""
SELECT kc.constraint_name, kc.column_name
FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS kc
JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS c ON
kc.table_schema = c.table_schema AND
kc.table_name = c.table_name AND
kc.constraint_name = c.constraint_name
WHERE
c.constraint_type = 'CHECK' AND
kc.table_schema = {get_schema_name()} AND
kc.table_name = %s
""", [table_name])
for constraint, column in cursor.fetchall():
# If we're the first column, make the record
if constraint not in constraints:
constraints[constraint] = {
"columns": [],
"primary_key": False,
"unique": False,
"unique_constraint": False,
"foreign_key": None,
"check": True,
"index": False,
"default": False,
}
# Record the details
constraints[constraint]['columns'].append(column)
# Now get DEFAULT constraint columns
cursor.execute("""
SELECT
[name],
COL_NAME([parent_object_id], [parent_column_id])
FROM
[sys].[default_constraints]
WHERE
OBJECT_NAME([parent_object_id]) = %s
""", [table_name])
for constraint, column in cursor.fetchall():
# If we're the first column, make the record
if constraint not in constraints:
constraints[constraint] = {
"columns": [],
"primary_key": False,
"unique": False,
"unique_constraint": False,
"foreign_key": None,
"check": False,
"index": False,
"default": True,
}
# Record the details
constraints[constraint]['columns'].append(column)
# Now get indexes
cursor.execute(f"""
SELECT
i.name AS index_name,
i.is_unique,
i.is_unique_constraint,
i.is_primary_key,
i.type,
i.type_desc,
ic.is_descending_key,
c.name AS column_name
FROM
sys.tables AS t
INNER JOIN sys.schemas AS s ON
t.schema_id = s.schema_id
INNER JOIN sys.indexes AS i ON
t.object_id = i.object_id
INNER JOIN sys.index_columns AS ic ON
i.object_id = ic.object_id AND
i.index_id = ic.index_id
INNER JOIN sys.columns AS c ON
ic.object_id = c.object_id AND
ic.column_id = c.column_id
WHERE
t.schema_id = SCHEMA_ID({get_schema_name()}) AND
t.name = %s
ORDER BY
i.index_id ASC,
ic.index_column_id ASC
""", [table_name])
indexes = {}
for index, unique, unique_constraint, primary, type_, desc, order, column in cursor.fetchall():
if index not in indexes:
indexes[index] = {
"columns": [],
"primary_key": primary,
"unique": unique,
"unique_constraint": unique_constraint,
"foreign_key": None,
"check": False,
"default": False,
"index": True,
"orders": [],
"type": Index.suffix if type_ in (1, 2) else desc.lower(),
}
indexes[index]["columns"].append(column)
indexes[index]["orders"].append("DESC" if order == 1 else "ASC")
for index, constraint in indexes.items():
if index not in constraints:
constraints[index] = constraint
return constraints
def get_primary_key_column(self, cursor, table_name):
cursor.execute("SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = N'%s'" % table_name)
row = cursor.fetchone()
if row is None:
raise ValueError("Table %s does not exist" % table_name)
return super().get_primary_key_column(cursor, table_name)