Skip to content

Commit

Permalink
finish sync PK and FK for mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
pongthep committed Apr 14, 2021
1 parent 7a98c75 commit 253fced
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 6 deletions.
20 changes: 20 additions & 0 deletions examples/rdbms/mysql_extractor_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from metadata_extractor.publisher.atlas_publisher import AtlasPublisher
from metadata_extractor.publisher.rdbms_publisher import RDBMSPublisher
from metadata_extractor.pipeline.rdbms_pipeline import RDBMSPipeline
from metadata_extractor.models.hosts.rdbms_host import RDBMSHost
from metadata_extractor.models.enum.db_engine_enum import DBEngine

if __name__ == "__main__":
host = ''
port = 3306
db_name = ''
db_user = ''
db_password = ''

atlas = AtlasPublisher(host='http://localhost:21000', password='')
rdbms_publisher = RDBMSPublisher(atlas_publisher=atlas)

rdbms_host = RDBMSHost(host=host, port=port, db_name=db_name, db_schema=db_name, db_user=db_user,
db_password=db_password)

RDBMSPipeline.sync_full_db(engine_name=DBEngine.mysql.name, rdbms_publisher=rdbms_publisher, rdbms_host=rdbms_host)
66 changes: 62 additions & 4 deletions metadata_extractor/extractor/rdbms/mysql_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from metadata_extractor.connection.connection_abstract import RDBMSConnection
from metadata_extractor.extractor.rdbms.rdbms_extractor_abstract import RDBMSExtractor
from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema
from metadata_extractor.models.atlas_model.rdbms.table_info import Table
from metadata_extractor.models.atlas_model.rdbms.table_info import Table, TableForeignKey
from metadata_extractor.models.atlas_model.rdbms.column_info import Column
from typing import List, Dict, Set

Expand All @@ -15,6 +15,7 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder
fetch_size: int = 50
table_map: Dict[str, Table] = {}
column_map: Dict[str, List[Column]] = {}
pk_map: Dict[str, Set[str]] = self.extract_table_pk(conn=conn, builder=builder, db_schema=db_schema)
with conn.get_conn().cursor() as cursor:
sql = "SELECT lower(c.table_schema) AS \"schema\", " \
"lower(c.table_name) AS name, " \
Expand Down Expand Up @@ -46,8 +47,10 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder
db_schema=db_schema))
table_map.update({table_name: table_obj})

is_col_pk = col_name in pk_map.get(table_name, {})

col_obj: Column = builder.build_column(column_name=col_name, data_type=col_type, desc=col_desc,
table=table_obj)
is_pk=is_col_pk, table=table_obj)
column_list = column_map.get(table_name, [])
column_list.append(col_obj)
column_map.update({table_name: column_list})
Expand Down Expand Up @@ -75,9 +78,64 @@ def get_table_list(self, cursor: None, db_schema: str = 'public'):

def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None,
db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]:
pass
fetch_size: int = 50
table_fk_map: Dict[str, List[TableForeignKey]] = {}
with conn.get_conn().cursor() as cursor:
sql = "SELECT TABLE_NAME, " \
"COLUMN_NAME, " \
"REFERENCED_TABLE_SCHEMA, " \
"REFERENCED_TABLE_NAME, " \
"REFERENCED_COLUMN_NAME " \
"FROM " \
"INFORMATION_SCHEMA.KEY_COLUMN_USAGE " \
"WHERE " \
" REFERENCED_TABLE_SCHEMA = '{db_name}';".format(db_name=db_schema.db.name)
cursor.execute(sql)
rows = cursor.fetchmany(fetch_size)

while rows is not None and len(rows) > 0:
for row in rows:
base_table_name = row[0]
base_column_name = row[1]
refer_table_schema = row[2]
refer_table_name = row[3]
refer_column_name = row[4]

fk_list = table_fk_map.get(base_table_name, [])
table_fk = TableForeignKey(db_schema_base=db_schema, table_base=base_table_name
, column_base=base_column_name
, schema_refer=refer_table_schema
, table_refer=refer_table_name
, column_refer=refer_column_name)

fk_list.append(table_fk)
table_fk_map.update({base_table_name: fk_list})
rows = cursor.fetchmany(fetch_size)

return table_fk_map

def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None,
db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]:
pass
fetch_size: int = 50
pk_map: Dict[str, Set[str]] = {}
with conn.get_conn().cursor() as cursor:
sql = "SELECT t.table_name,k.column_name " \
"FROM information_schema.table_constraints t " \
"JOIN information_schema.key_column_usage k " \
"USING(constraint_name,table_schema,table_name) " \
"WHERE t.constraint_type='PRIMARY KEY' " \
"AND t.table_schema='{db_name}';".format(db_name=db_schema.db.name)
cursor.execute(sql)
rows = cursor.fetchmany(fetch_size)

while rows is not None and len(rows) > 0:
for row in rows:
table_name = row[0]
column_name = row[1]

column_pk_set = pk_map.get(table_name, set())
column_pk_set.add(column_name)
pk_map.update({table_name: column_pk_set})
rows = cursor.fetchmany(fetch_size)

return pk_map
3 changes: 1 addition & 2 deletions metadata_extractor/extractor/rdbms/postgresql_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from metadata_extractor.connection.connection_abstract import RDBMSConnection
from metadata_extractor.builders.rdbms.rdbms_builder_abstract import RDBMSBuilder
from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema
from metadata_extractor.models.atlas_model.rdbms.table_info import Table
from metadata_extractor.models.atlas_model.rdbms.table_info import TableForeignKey
from metadata_extractor.models.atlas_model.rdbms.table_info import Table, TableForeignKey
from metadata_extractor.models.atlas_model.rdbms.column_info import Column
from typing import List, Dict, Set

Expand Down

0 comments on commit 253fced

Please sign in to comment.