From 4083e67d704d3190083805a643262ced42577b5f Mon Sep 17 00:00:00 2001 From: "Paul\\Home" Date: Wed, 14 Apr 2021 14:17:22 +0700 Subject: [PATCH 1/5] publish fk relation --- metadata_extractor/{services => publish}/__init__.py | 0 metadata_extractor/{services => publish}/atlas_service.py | 0 metadata_extractor/{services => publish}/rdbms_service.py | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename metadata_extractor/{services => publish}/__init__.py (100%) rename metadata_extractor/{services => publish}/atlas_service.py (100%) rename metadata_extractor/{services => publish}/rdbms_service.py (100%) diff --git a/metadata_extractor/services/__init__.py b/metadata_extractor/publish/__init__.py similarity index 100% rename from metadata_extractor/services/__init__.py rename to metadata_extractor/publish/__init__.py diff --git a/metadata_extractor/services/atlas_service.py b/metadata_extractor/publish/atlas_service.py similarity index 100% rename from metadata_extractor/services/atlas_service.py rename to metadata_extractor/publish/atlas_service.py diff --git a/metadata_extractor/services/rdbms_service.py b/metadata_extractor/publish/rdbms_service.py similarity index 100% rename from metadata_extractor/services/rdbms_service.py rename to metadata_extractor/publish/rdbms_service.py From fd6840b8e2a931433d2de9e5f11612cea29473f5 Mon Sep 17 00:00:00 2001 From: "Paul\\Home" Date: Wed, 14 Apr 2021 14:17:44 +0700 Subject: [PATCH 2/5] finish fk link --- examples/rdbms/postgresql_extractor_script.py | 22 ++--- .../builders/rdbms/rdbms_builder_abstract.py | 8 +- .../extractor/rdbms/mysql_extractor.py | 11 ++- .../extractor/rdbms/postgresql_extractor.py | 84 ++++++++++++++++++- .../rdbms/rdbms_extractor_abstract.py | 12 ++- .../factories/connection_factory.py | 6 +- .../models/atlas_model/rdbms/column_info.py | 16 ++-- .../models/atlas_model/rdbms/table_info.py | 27 +++++- metadata_extractor/pipeline/rdbms_pipeline.py | 28 ++++--- .../{publish => publisher}/__init__.py | 0 .../atlas_publisher.py} | 2 +- .../rdbms_publisher.py} | 53 ++++++++++-- 12 files changed, 219 insertions(+), 50 deletions(-) rename metadata_extractor/{publish => publisher}/__init__.py (100%) rename metadata_extractor/{publish/atlas_service.py => publisher/atlas_publisher.py} (99%) rename metadata_extractor/{publish/rdbms_service.py => publisher/rdbms_publisher.py} (86%) diff --git a/examples/rdbms/postgresql_extractor_script.py b/examples/rdbms/postgresql_extractor_script.py index 491466d..8d25b92 100644 --- a/examples/rdbms/postgresql_extractor_script.py +++ b/examples/rdbms/postgresql_extractor_script.py @@ -1,21 +1,21 @@ -from metadata_extractor.services.atlas_service import AtlasService -from metadata_extractor.services.rdbms_service import RDBMSService +from metadata_extractor.publisher.atlas_publisher import AtlasPublisher +from metadata_extractor.publisher.rdbms_publisher import RDBMSPublisher from metadata_extractor.pipeline.rdbms_pipeline import RDBMSPipeline from metadata_extractor.models.hosts.rdbms_host import RDBMSHost from metadata_extractor.models.enum.db_engine_enum import DBEngine if __name__ == "__main__": - host = '' - port = 5432 - db_name = '' - db_schema = '' - db_user = '' - db_password = '' + host = '192.168.1.131' + port = 55001 + db_name = 'postgres' + db_schema = 'public' + db_user = 'postgres' + db_password = 'q1w2e3r4' - atlas = AtlasService(host='http://localhost:21000', password='') - rdbms_service = RDBMSService(atlas_service=atlas) + atlas = AtlasPublisher(host='http://192.168.1.131:21000', password='admin') + rdbms_publisher = RDBMSPublisher(atlas_publisher=atlas) rdbms_host = RDBMSHost(host=host, port=port, db_name=db_name, db_schema=db_schema, db_user=db_user, db_password=db_password) - RDBMSPipeline.sync_full_db(image_name=DBEngine.postgresql.name, rdbms_service=rdbms_service, rdbms_host=rdbms_host) + RDBMSPipeline.sync_full_db(engine_name=DBEngine.postgresql.name, rdbms_publisher=rdbms_publisher, rdbms_host=rdbms_host) diff --git a/metadata_extractor/builders/rdbms/rdbms_builder_abstract.py b/metadata_extractor/builders/rdbms/rdbms_builder_abstract.py index 2bd08c5..b82dc2f 100644 --- a/metadata_extractor/builders/rdbms/rdbms_builder_abstract.py +++ b/metadata_extractor/builders/rdbms/rdbms_builder_abstract.py @@ -18,17 +18,17 @@ def build_instance(self, conn: RDBMSConnection = None) -> RDBMSInstance: @staticmethod def build_database(name: str = '', instance: RDBMSInstance = None) -> Database: - return Database(name, instance) + return Database(name=name, instance=instance) @staticmethod def build_database_schema(name: str = 'public', db: Database = None) -> DatabaseSchema: - return DatabaseSchema(name, db) + return DatabaseSchema(name=name, db=db) @staticmethod def build_table(table_name: str = '', desc: str = '', db_schema: DatabaseSchema = None) -> Table: return Table(name=table_name, db_schema=db_schema, desc=desc) @staticmethod - def build_column(column_name: str = '', data_type: str = '', length: str = '', desc: str = '', + def build_column(column_name: str = '', data_type: str = '', length: str = '', desc: str = '', is_pk: bool = False, table: Table = None) -> Column: - return Column(column_name, data_type, length, desc, table) + return Column(name=column_name, data_type=data_type, length=length, desc=desc, is_pk=is_pk, table=table) diff --git a/metadata_extractor/extractor/rdbms/mysql_extractor.py b/metadata_extractor/extractor/rdbms/mysql_extractor.py index 46c7545..a9997e7 100644 --- a/metadata_extractor/extractor/rdbms/mysql_extractor.py +++ b/metadata_extractor/extractor/rdbms/mysql_extractor.py @@ -6,7 +6,7 @@ from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema from metadata_extractor.models.atlas_model.rdbms.table_info import Table from metadata_extractor.models.atlas_model.rdbms.column_info import Column -from typing import List, Dict +from typing import List, Dict, Set class MysqlExtractor(RDBMSExtractor): @@ -72,3 +72,12 @@ def get_table_list(self, cursor: None, db_schema: str = 'public'): table_list.append(table_name[0]) return table_list + + def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: + pass + + def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: + pass + diff --git a/metadata_extractor/extractor/rdbms/postgresql_extractor.py b/metadata_extractor/extractor/rdbms/postgresql_extractor.py index 978dfc3..9e6cf46 100644 --- a/metadata_extractor/extractor/rdbms/postgresql_extractor.py +++ b/metadata_extractor/extractor/rdbms/postgresql_extractor.py @@ -9,12 +9,12 @@ from metadata_extractor.builders.rdbms.rdbms_builder_abstract import RDBMSBuilder from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema from metadata_extractor.models.atlas_model.rdbms.table_info import Table -from metadata_extractor.models.atlas_model.rdbms .column_info import Column -from typing import List, Dict +from metadata_extractor.models.atlas_model.rdbms.table_info import TableForeignKey +from metadata_extractor.models.atlas_model.rdbms.column_info import Column +from typing import List, Dict, Set class PostgresqlExtractor(RDBMSExtractor): - # TODO need to be updated or deleted @staticmethod def get_column_key(conn: RDBMSConnection = None, db_schema: str = '', table_name: str = ''): @@ -65,6 +65,7 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder fetch_size: int = 50 table_map: Dict[str, Table] = {} column_map: Dict[str, List[Column]] = {} + pk_map: Dict[str, Set[str]] = self.extract_table_pk(conn=conn, builder=builder, db_schema=db_schema) with conn.get_conn().cursor() as cursor: sql = "select c.table_schema as schema, c.table_name as name" \ ", pgtd.description as description,c.column_name as col_name, c.data_type as col_type" \ @@ -91,8 +92,10 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder db_schema=db_schema)) table_map.update({table_name: table_obj}) + is_col_pk = col_name in pk_map.get(table_name, {}) + col_obj: Column = builder.build_column(column_name=col_name, data_type=col_type, desc=col_desc, - table=table_obj) + is_pk=is_col_pk, table=table_obj) column_list = column_map.get(table_name, []) column_list.append(col_obj) column_map.update({table_name: column_list}) @@ -127,3 +130,76 @@ def extract_column(self, conn: RDBMSConnection = None, db_schema: str = 'public' column_schema = cursor.fetchall() # TODO pack result into object return column_schema + + def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: + fetch_size: int = 50 + pk_map: Dict[str, Set[str]] = {} + with conn.get_conn().cursor() as cursor: + sql = "SELECT tc.table_name,c.column_name,tc.constraint_type " \ + "FROM information_schema.table_constraints tc " \ + "JOIN information_schema.constraint_column_usage AS ccu USING (constraint_schema, constraint_name) " \ + "JOIN information_schema.columns AS c ON c.table_schema = tc.constraint_schema " \ + " AND tc.table_name = c.table_name AND ccu.column_name = c.column_name " \ + " where c.table_schema = '{db_schema}' and tc.constraint_type = 'PRIMARY KEY';" \ + .format(db_schema=db_schema.name) + cursor.execute(sql) + rows = cursor.fetchmany(fetch_size) + + while rows is not None and len(rows) > 0: + for row in rows: + table_name = row[1] + column_name = row[2] + + column_pk_set = pk_map.get(table_name, set()) + column_pk_set.add(column_name) + pk_map.update({table_name: column_pk_set}) + rows = cursor.fetchmany(fetch_size) + + return pk_map + + def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, List[TableForeignKey]]: + + fetch_size: int = 50 + table_fk_map: Dict[str, List[TableForeignKey]] = {} + + with conn.get_conn().cursor() as cursor: + sql = "SELECT tc.table_name, " \ + "kcu.column_name, " \ + "ccu.table_schema AS foreign_table_schema, " \ + "ccu.table_name AS foreign_table_name, " \ + "ccu.column_name AS foreign_column_name " \ + "FROM " \ + "information_schema.table_constraints AS tc " \ + "JOIN information_schema.key_column_usage AS kcu " \ + "ON tc.constraint_name = kcu.constraint_name " \ + "AND tc.table_schema = kcu.table_schema " \ + "JOIN information_schema.constraint_column_usage AS ccu " \ + "ON ccu.constraint_name = tc.constraint_name " \ + "AND ccu.table_schema = tc.table_schema " \ + "where tc.constraint_type = 'FOREIGN KEY' and tc.table_schema = '{db_schema}'"\ + .format(db_schema=db_schema.name) + cursor.execute(sql) + rows = cursor.fetchmany(fetch_size) + + while rows is not None and len(rows) > 0: + for row in rows: + base_table_name = row[0] + base_column_name = row[1] + refer_table_schema = row[2] + refer_table_name = row[3] + refer_column_name = row[4] + + fk_list = table_fk_map.get(base_table_name, []) + table_fk = TableForeignKey(db_schema_base=db_schema, table_base=base_table_name + , column_base=base_column_name + , schema_refer=refer_table_schema + , table_refer=refer_table_name + , column_refer=refer_column_name) + + fk_list.append(table_fk) + table_fk_map.update({base_table_name: fk_list}) + rows = cursor.fetchmany(fetch_size) + + return table_fk_map diff --git a/metadata_extractor/extractor/rdbms/rdbms_extractor_abstract.py b/metadata_extractor/extractor/rdbms/rdbms_extractor_abstract.py index 3604d0f..f7db2f6 100644 --- a/metadata_extractor/extractor/rdbms/rdbms_extractor_abstract.py +++ b/metadata_extractor/extractor/rdbms/rdbms_extractor_abstract.py @@ -3,7 +3,7 @@ from metadata_extractor.builders.rdbms.rdbms_builder_abstract import RDBMSBuilder from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema from metadata_extractor.models.atlas_model.rdbms.column_info import Column -from typing import List, Dict +from typing import List, Dict, Set class RDBMSExtractor(ABC): @@ -19,3 +19,13 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder @abstractmethod def extract_column(self, conn: RDBMSConnection = None, db_schema: str = '', table_name: str = ''): pass + + @abstractmethod + def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: + pass + + @abstractmethod + def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: + pass diff --git a/metadata_extractor/factories/connection_factory.py b/metadata_extractor/factories/connection_factory.py index b4ba7fe..3675503 100644 --- a/metadata_extractor/factories/connection_factory.py +++ b/metadata_extractor/factories/connection_factory.py @@ -6,17 +6,17 @@ class RDBMSConnectionFactory: @staticmethod - def create(image_name: str, host: str = '', port: int = 1111, db_name: str = '', user: str = '', + def create(engine_name: str, host: str = '', port: int = 1111, db_name: str = '', user: str = '', password: str = '', **kwargs: {}) -> RDBMSConnection: switcher = { DBEngine.postgresql.name: PostgresqlConnection(), DBEngine.mysql.name: MysqlConnection() } - builder_obj = switcher.get(image_name, None) + builder_obj = switcher.get(engine_name, None) if builder_obj is not None: builder_obj.create_conn(host, port, db_name, user, password, **kwargs) return builder_obj else: - raise ValueError('Not found connection named \'%s\'' % image_name) + raise ValueError('Not found connection named \'%s\'' % engine_name) diff --git a/metadata_extractor/models/atlas_model/rdbms/column_info.py b/metadata_extractor/models/atlas_model/rdbms/column_info.py index 9b4fd7c..70a8252 100644 --- a/metadata_extractor/models/atlas_model/rdbms/column_info.py +++ b/metadata_extractor/models/atlas_model/rdbms/column_info.py @@ -5,21 +5,25 @@ def get_delimiter(): return '.' -def get_qualified_name(db_schema: str, table_qn: str, col_name: str) -> str: - return '{db_schema}{delimiter}{table}{delimiter}{column}' \ +def get_qualified_name(table_qn: str, col_name: str) -> str: + return '{table}{delimiter}{column}' \ .format(delimiter=get_delimiter() - , db_schema=db_schema , table=table_qn , column=col_name) class Column: - def __init__(self, name: str, data_type: str, length: str, desc: str, table: Table): + def __init__(self, name: str = '', + data_type: str = '', + length: str = '', + desc: str = '', + is_pk: bool = False, + table: Table = None): self.name: str = name self.data_type: str = data_type self.length: str = length self.desc: str = desc + self.is_pk: str = is_pk self.table: Table = table - self.qualified_name = get_qualified_name(db_schema=table.db_schema.name - , table_qn=table.qualified_name + self.qualified_name = get_qualified_name(table_qn=table.qualified_name , col_name=name) diff --git a/metadata_extractor/models/atlas_model/rdbms/table_info.py b/metadata_extractor/models/atlas_model/rdbms/table_info.py index dcb6a35..c62112d 100644 --- a/metadata_extractor/models/atlas_model/rdbms/table_info.py +++ b/metadata_extractor/models/atlas_model/rdbms/table_info.py @@ -19,4 +19,29 @@ def __init__(self, name: str = '', db_schema: DatabaseSchema = None, desc: str = self.desc: str = desc self.tags: List[str] = tags - self.qualified_name: str = get_qualified_name(db_schema=db_schema.name, table_name=name) + self.qualified_name: str = get_qualified_name(db_schema=db_schema.qualified_name, table_name=name) + + +class TableForeignKey: + def __init__(self, + db_schema_base: DatabaseSchema = None, + table_base: str = '', + column_base: str = '', + schema_refer: str = '', + table_refer: str = '', + column_refer: str = ''): + # Table that contain FK is base table + self.table_base_qn: str = get_qualified_name(db_schema=db_schema_base.qualified_name, + table_name=table_base) + self.column_base_qn: str = '{table_qn}{delimiter}{column_name}' \ + .format(table_qn=self.table_base_qn, delimiter=get_delimiter(), column_name=column_base) + + self.table_refer_qn: str = '{db_qn}{delimiter}{schema_refer_name}{delimiter}{table_table}' \ + .format(db_qn=db_schema_base.db.name, delimiter=get_delimiter(), schema_refer_name=schema_refer, + table_table=table_refer) + self.column_refer_qn: str = '{table_qn}{delimiter}{column_name}' \ + .format(table_qn=self.table_refer_qn, delimiter=get_delimiter(), column_name=column_refer) + + self.qualified_name: str = '{base_table_column}#FK#{refer_table_column}' \ + .format(base_table_column=self.column_base_qn, + refer_table_column=self.column_refer_qn) diff --git a/metadata_extractor/pipeline/rdbms_pipeline.py b/metadata_extractor/pipeline/rdbms_pipeline.py index 12289ad..da9c0c2 100644 --- a/metadata_extractor/pipeline/rdbms_pipeline.py +++ b/metadata_extractor/pipeline/rdbms_pipeline.py @@ -1,6 +1,6 @@ -from typing import List, Dict +from typing import List, Dict, Set from metadata_extractor.models.hosts.rdbms_host import RDBMSHost -from metadata_extractor.services.rdbms_service import RDBMSService +from metadata_extractor.publisher.rdbms_publisher import RDBMSPublisher from metadata_extractor.connection.connection_abstract import RDBMSConnection from metadata_extractor.extractor.rdbms.rdbms_extractor_abstract import RDBMSExtractor from metadata_extractor.builders.rdbms.rdbms_builder_abstract import RDBMSBuilder @@ -10,25 +10,27 @@ from metadata_extractor.models.atlas_model.rdbms.rdbms_instance import RDBMSInstance from metadata_extractor.models.atlas_model.rdbms.database_info import Database from metadata_extractor.models.atlas_model.rdbms.column_info import Column +from metadata_extractor.models.atlas_model.rdbms.table_info import TableForeignKey class RDBMSPipeline: @staticmethod - def sync_full_db(image_name: str, rdbms_service: RDBMSService, rdbms_host: RDBMSHost): - conn: RDBMSConnection = RDBMSConnectionFactory.create(image_name=image_name, + def sync_full_db(engine_name: str, rdbms_publisher: RDBMSPublisher, rdbms_host: RDBMSHost): + conn: RDBMSConnection = RDBMSConnectionFactory.create(engine_name=engine_name, host=rdbms_host.host, port=rdbms_host.port, db_name=rdbms_host.db_name, user=rdbms_host.db_user, password=rdbms_host.db_password) - extractor: RDBMSExtractor = RDBMSExtractorFactory.create(image_name) - builder: RDBMSBuilder = RDBMSBuilderFactory.create(image_name) + # create extractor + extractor: RDBMSExtractor = RDBMSExtractorFactory.create(engine_name) + builder: RDBMSBuilder = RDBMSBuilderFactory.create(engine_name) instance: RDBMSInstance = builder.build_instance(conn) - rdbms_service.publish_instance(instance=instance, db_name=conn.db_name) + rdbms_publisher.publish_instance(instance=instance, db_name=conn.db_name) db: Database = builder.build_database(conn.db_name, instance) - rdbms_service.publish_database(instance_qualified_name=instance.qualified_name, db=db) + rdbms_publisher.publish_database(instance_qualified_name=instance.qualified_name, db=db) db_schema = builder.build_database_schema(name=rdbms_host.db_schema, db=db) table_dict: Dict[str, List[Column]] = extractor.extract_db_schema(conn=conn, builder=builder, @@ -37,9 +39,15 @@ def sync_full_db(image_name: str, rdbms_service: RDBMSService, rdbms_host: RDBMS for table_name in table_dict: column_list = table_dict.get(table_name) table = column_list[0].table - rdbms_service.publish_table(db_qualified_name=db.qualified_name, table=table) + rdbms_publisher.publish_table(db_qualified_name=db.qualified_name, table=table) for column in column_list: - rdbms_service.publish_column(table_qualified_name=table.qualified_name, column=column) + rdbms_publisher.publish_column(table_qualified_name=table.qualified_name, column=column) + + table_fk_dict: Dict[str, List[TableForeignKey]] = extractor.extract_table_fk(conn=conn, builder=builder, + db_schema=db_schema) + for table_name, table_fk_list in table_fk_dict.items(): + for table_fk_obj in table_fk_list: + rdbms_publisher.publish_table_foreign_key(table_fk_obj) conn.close_connection() diff --git a/metadata_extractor/publish/__init__.py b/metadata_extractor/publisher/__init__.py similarity index 100% rename from metadata_extractor/publish/__init__.py rename to metadata_extractor/publisher/__init__.py diff --git a/metadata_extractor/publish/atlas_service.py b/metadata_extractor/publisher/atlas_publisher.py similarity index 99% rename from metadata_extractor/publish/atlas_service.py rename to metadata_extractor/publisher/atlas_publisher.py index 16b3afa..5d29140 100644 --- a/metadata_extractor/publish/atlas_service.py +++ b/metadata_extractor/publisher/atlas_publisher.py @@ -2,7 +2,7 @@ import time -class AtlasService: +class AtlasPublisher: def __init__(self, host: str = 'http://localhost:21000', user: str = 'admin', password: str = 'admin'): # TODO Change to env var self.host: str = host diff --git a/metadata_extractor/publish/rdbms_service.py b/metadata_extractor/publisher/rdbms_publisher.py similarity index 86% rename from metadata_extractor/publish/rdbms_service.py rename to metadata_extractor/publisher/rdbms_publisher.py index 165d8af..2781430 100644 --- a/metadata_extractor/publish/rdbms_service.py +++ b/metadata_extractor/publisher/rdbms_publisher.py @@ -1,19 +1,19 @@ from metadata_extractor.models.atlas_model.rdbms.rdbms_instance import RDBMSInstance from metadata_extractor.models.atlas_model.rdbms.database_info import Database -from metadata_extractor.models.atlas_model.rdbms.table_info import Table +from metadata_extractor.models.atlas_model.rdbms.table_info import Table, TableForeignKey from metadata_extractor.models.atlas_model.rdbms.column_info import Column -from metadata_extractor.services.atlas_service import AtlasService +from metadata_extractor.publisher.atlas_publisher import AtlasPublisher import time from metadata_extractor.models.atlas_model.rdbms.database_info import get_qualified_name as db_get_qualified_name from metadata_extractor.models.atlas_model.rdbms.database_info import get_delimiter as db_get_delimiter -class RDBMSService: - def __init__(self, atlas_service: AtlasService = None): - if not atlas_service: - self.__atlas = AtlasService() +class RDBMSPublisher: + def __init__(self, atlas_publisher: AtlasPublisher = None): + if not atlas_publisher: + self.__atlas = AtlasPublisher() else: - self.__atlas = atlas_service + self.__atlas = atlas_publisher self.__default_create_by = 'metadata-extractor' def publish_instance(self, instance: RDBMSInstance, db_name: str): @@ -105,7 +105,8 @@ def publish_column(self, table_qualified_name: str, column: Column): "description": column.desc, "createTime": int(time.time()), "name": column.name, - "data_type": column.data_type + "data_type": column.data_type, + "isPrimaryKey": column.is_pk }, "status": "ACTIVE", "createdBy": self.__default_create_by, @@ -123,6 +124,42 @@ def publish_column(self, table_qualified_name: str, column: Column): self.__atlas.publish_entity(column_json) + def publish_table_foreign_key(self, table_fk: TableForeignKey): + table_fk_json = { + "entity": { + "typeName": "rdbms_foreign_key", + "attributes": { + "name": "{column_base_qn}_foreign_key".format(column_base_qn=table_fk.column_base_qn), + "qualifiedName": table_fk.qualified_name, + }, + "status": "ACTIVE", + "createdBy": self.__default_create_by, + "updatedBy": self.__default_create_by, + "createTime": int(time.time()), + "updateTime": int(time.time()), + "relationshipAttributes": { + "table": { + "uniqueAttributes": {"qualifiedName": table_fk.table_base_qn}, + "typeName": "rdbms_table" + }, + "key_columns": [{ + "uniqueAttributes": {"qualifiedName": table_fk.column_base_qn}, + "typeName": "rdbms_column" + }], + "references_table": { + "uniqueAttributes": {"qualifiedName": table_fk.table_refer_qn}, + "typeName": "rdbms_table" + }, + "references_columns": [{ + "uniqueAttributes": {"qualifiedName": table_fk.column_refer_qn}, + "typeName": "rdbms_column" + }], + } + } + } + + self.__atlas.publish_entity(table_fk_json) + def get_table_meta_data_missing(self, host_name: str = None , database_name: str = None): From 7111ec6c16168dfaaf8d4775d7aa9d650c0e4d52 Mon Sep 17 00:00:00 2001 From: "Paul\\Home" Date: Wed, 14 Apr 2021 14:36:14 +0700 Subject: [PATCH 3/5] fix isPrimaryKey --- metadata_extractor/extractor/rdbms/postgresql_extractor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata_extractor/extractor/rdbms/postgresql_extractor.py b/metadata_extractor/extractor/rdbms/postgresql_extractor.py index 9e6cf46..0282752 100644 --- a/metadata_extractor/extractor/rdbms/postgresql_extractor.py +++ b/metadata_extractor/extractor/rdbms/postgresql_extractor.py @@ -148,9 +148,9 @@ def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = while rows is not None and len(rows) > 0: for row in rows: - table_name = row[1] - column_name = row[2] - + table_name = row[0] + column_name = row[1] + print(column_name) column_pk_set = pk_map.get(table_name, set()) column_pk_set.add(column_name) pk_map.update({table_name: column_pk_set}) From 7a98c753e0f7ad5599ae87553a16b3fb0f2039f0 Mon Sep 17 00:00:00 2001 From: "Paul\\Home" Date: Wed, 14 Apr 2021 14:39:26 +0700 Subject: [PATCH 4/5] clean code postgresql sync --- examples/rdbms/postgresql_extractor_script.py | 14 +++++++------- .../extractor/rdbms/postgresql_extractor.py | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/rdbms/postgresql_extractor_script.py b/examples/rdbms/postgresql_extractor_script.py index 8d25b92..f8360a6 100644 --- a/examples/rdbms/postgresql_extractor_script.py +++ b/examples/rdbms/postgresql_extractor_script.py @@ -5,14 +5,14 @@ from metadata_extractor.models.enum.db_engine_enum import DBEngine if __name__ == "__main__": - host = '192.168.1.131' - port = 55001 - db_name = 'postgres' - db_schema = 'public' - db_user = 'postgres' - db_password = 'q1w2e3r4' + host = '' + port = 5432 + db_name = '' + db_schema = '' + db_user = '' + db_password = '' - atlas = AtlasPublisher(host='http://192.168.1.131:21000', password='admin') + atlas = AtlasPublisher(host='http://localhost:21000', password='') rdbms_publisher = RDBMSPublisher(atlas_publisher=atlas) rdbms_host = RDBMSHost(host=host, port=port, db_name=db_name, db_schema=db_schema, db_user=db_user, diff --git a/metadata_extractor/extractor/rdbms/postgresql_extractor.py b/metadata_extractor/extractor/rdbms/postgresql_extractor.py index 0282752..37a7397 100644 --- a/metadata_extractor/extractor/rdbms/postgresql_extractor.py +++ b/metadata_extractor/extractor/rdbms/postgresql_extractor.py @@ -150,7 +150,7 @@ def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = for row in rows: table_name = row[0] column_name = row[1] - print(column_name) + column_pk_set = pk_map.get(table_name, set()) column_pk_set.add(column_name) pk_map.update({table_name: column_pk_set}) From 253fced45777fb3ab619171f013e5336c8ac673b Mon Sep 17 00:00:00 2001 From: "Paul\\Home" Date: Wed, 14 Apr 2021 15:06:29 +0700 Subject: [PATCH 5/5] finish sync PK and FK for mysql --- examples/rdbms/mysql_extractor_script.py | 20 ++++++ .../extractor/rdbms/mysql_extractor.py | 66 +++++++++++++++++-- .../extractor/rdbms/postgresql_extractor.py | 3 +- 3 files changed, 83 insertions(+), 6 deletions(-) create mode 100644 examples/rdbms/mysql_extractor_script.py diff --git a/examples/rdbms/mysql_extractor_script.py b/examples/rdbms/mysql_extractor_script.py new file mode 100644 index 0000000..e5ea8c2 --- /dev/null +++ b/examples/rdbms/mysql_extractor_script.py @@ -0,0 +1,20 @@ +from metadata_extractor.publisher.atlas_publisher import AtlasPublisher +from metadata_extractor.publisher.rdbms_publisher import RDBMSPublisher +from metadata_extractor.pipeline.rdbms_pipeline import RDBMSPipeline +from metadata_extractor.models.hosts.rdbms_host import RDBMSHost +from metadata_extractor.models.enum.db_engine_enum import DBEngine + +if __name__ == "__main__": + host = '' + port = 3306 + db_name = '' + db_user = '' + db_password = '' + + atlas = AtlasPublisher(host='http://localhost:21000', password='') + rdbms_publisher = RDBMSPublisher(atlas_publisher=atlas) + + rdbms_host = RDBMSHost(host=host, port=port, db_name=db_name, db_schema=db_name, db_user=db_user, + db_password=db_password) + + RDBMSPipeline.sync_full_db(engine_name=DBEngine.mysql.name, rdbms_publisher=rdbms_publisher, rdbms_host=rdbms_host) diff --git a/metadata_extractor/extractor/rdbms/mysql_extractor.py b/metadata_extractor/extractor/rdbms/mysql_extractor.py index a9997e7..8b91a4b 100644 --- a/metadata_extractor/extractor/rdbms/mysql_extractor.py +++ b/metadata_extractor/extractor/rdbms/mysql_extractor.py @@ -4,7 +4,7 @@ from metadata_extractor.connection.connection_abstract import RDBMSConnection from metadata_extractor.extractor.rdbms.rdbms_extractor_abstract import RDBMSExtractor from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema -from metadata_extractor.models.atlas_model.rdbms.table_info import Table +from metadata_extractor.models.atlas_model.rdbms.table_info import Table, TableForeignKey from metadata_extractor.models.atlas_model.rdbms.column_info import Column from typing import List, Dict, Set @@ -15,6 +15,7 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder fetch_size: int = 50 table_map: Dict[str, Table] = {} column_map: Dict[str, List[Column]] = {} + pk_map: Dict[str, Set[str]] = self.extract_table_pk(conn=conn, builder=builder, db_schema=db_schema) with conn.get_conn().cursor() as cursor: sql = "SELECT lower(c.table_schema) AS \"schema\", " \ "lower(c.table_name) AS name, " \ @@ -46,8 +47,10 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder db_schema=db_schema)) table_map.update({table_name: table_obj}) + is_col_pk = col_name in pk_map.get(table_name, {}) + col_obj: Column = builder.build_column(column_name=col_name, data_type=col_type, desc=col_desc, - table=table_obj) + is_pk=is_col_pk, table=table_obj) column_list = column_map.get(table_name, []) column_list.append(col_obj) column_map.update({table_name: column_list}) @@ -75,9 +78,64 @@ def get_table_list(self, cursor: None, db_schema: str = 'public'): def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: - pass + fetch_size: int = 50 + table_fk_map: Dict[str, List[TableForeignKey]] = {} + with conn.get_conn().cursor() as cursor: + sql = "SELECT TABLE_NAME, " \ + "COLUMN_NAME, " \ + "REFERENCED_TABLE_SCHEMA, " \ + "REFERENCED_TABLE_NAME, " \ + "REFERENCED_COLUMN_NAME " \ + "FROM " \ + "INFORMATION_SCHEMA.KEY_COLUMN_USAGE " \ + "WHERE " \ + " REFERENCED_TABLE_SCHEMA = '{db_name}';".format(db_name=db_schema.db.name) + cursor.execute(sql) + rows = cursor.fetchmany(fetch_size) + + while rows is not None and len(rows) > 0: + for row in rows: + base_table_name = row[0] + base_column_name = row[1] + refer_table_schema = row[2] + refer_table_name = row[3] + refer_column_name = row[4] + + fk_list = table_fk_map.get(base_table_name, []) + table_fk = TableForeignKey(db_schema_base=db_schema, table_base=base_table_name + , column_base=base_column_name + , schema_refer=refer_table_schema + , table_refer=refer_table_name + , column_refer=refer_column_name) + + fk_list.append(table_fk) + table_fk_map.update({base_table_name: fk_list}) + rows = cursor.fetchmany(fetch_size) + + return table_fk_map def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: - pass + fetch_size: int = 50 + pk_map: Dict[str, Set[str]] = {} + with conn.get_conn().cursor() as cursor: + sql = "SELECT t.table_name,k.column_name " \ + "FROM information_schema.table_constraints t " \ + "JOIN information_schema.key_column_usage k " \ + "USING(constraint_name,table_schema,table_name) " \ + "WHERE t.constraint_type='PRIMARY KEY' " \ + "AND t.table_schema='{db_name}';".format(db_name=db_schema.db.name) + cursor.execute(sql) + rows = cursor.fetchmany(fetch_size) + + while rows is not None and len(rows) > 0: + for row in rows: + table_name = row[0] + column_name = row[1] + + column_pk_set = pk_map.get(table_name, set()) + column_pk_set.add(column_name) + pk_map.update({table_name: column_pk_set}) + rows = cursor.fetchmany(fetch_size) + return pk_map diff --git a/metadata_extractor/extractor/rdbms/postgresql_extractor.py b/metadata_extractor/extractor/rdbms/postgresql_extractor.py index 37a7397..3651162 100644 --- a/metadata_extractor/extractor/rdbms/postgresql_extractor.py +++ b/metadata_extractor/extractor/rdbms/postgresql_extractor.py @@ -8,8 +8,7 @@ from metadata_extractor.connection.connection_abstract import RDBMSConnection from metadata_extractor.builders.rdbms.rdbms_builder_abstract import RDBMSBuilder from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema -from metadata_extractor.models.atlas_model.rdbms.table_info import Table -from metadata_extractor.models.atlas_model.rdbms.table_info import TableForeignKey +from metadata_extractor.models.atlas_model.rdbms.table_info import Table, TableForeignKey from metadata_extractor.models.atlas_model.rdbms.column_info import Column from typing import List, Dict, Set