diff --git a/examples/rdbms/mysql_extractor_script.py b/examples/rdbms/mysql_extractor_script.py new file mode 100644 index 0000000..e5ea8c2 --- /dev/null +++ b/examples/rdbms/mysql_extractor_script.py @@ -0,0 +1,20 @@ +from metadata_extractor.publisher.atlas_publisher import AtlasPublisher +from metadata_extractor.publisher.rdbms_publisher import RDBMSPublisher +from metadata_extractor.pipeline.rdbms_pipeline import RDBMSPipeline +from metadata_extractor.models.hosts.rdbms_host import RDBMSHost +from metadata_extractor.models.enum.db_engine_enum import DBEngine + +if __name__ == "__main__": + host = '' + port = 3306 + db_name = '' + db_user = '' + db_password = '' + + atlas = AtlasPublisher(host='http://localhost:21000', password='') + rdbms_publisher = RDBMSPublisher(atlas_publisher=atlas) + + rdbms_host = RDBMSHost(host=host, port=port, db_name=db_name, db_schema=db_name, db_user=db_user, + db_password=db_password) + + RDBMSPipeline.sync_full_db(engine_name=DBEngine.mysql.name, rdbms_publisher=rdbms_publisher, rdbms_host=rdbms_host) diff --git a/examples/rdbms/postgresql_extractor_script.py b/examples/rdbms/postgresql_extractor_script.py index 491466d..f8360a6 100644 --- a/examples/rdbms/postgresql_extractor_script.py +++ b/examples/rdbms/postgresql_extractor_script.py @@ -1,5 +1,5 @@ -from metadata_extractor.services.atlas_service import AtlasService -from metadata_extractor.services.rdbms_service import RDBMSService +from metadata_extractor.publisher.atlas_publisher import AtlasPublisher +from metadata_extractor.publisher.rdbms_publisher import RDBMSPublisher from metadata_extractor.pipeline.rdbms_pipeline import RDBMSPipeline from metadata_extractor.models.hosts.rdbms_host import RDBMSHost from metadata_extractor.models.enum.db_engine_enum import DBEngine @@ -12,10 +12,10 @@ db_user = '' db_password = '' - atlas = AtlasService(host='http://localhost:21000', password='') - rdbms_service = RDBMSService(atlas_service=atlas) + atlas = 
AtlasPublisher(host='http://localhost:21000', password='') + rdbms_publisher = RDBMSPublisher(atlas_publisher=atlas) rdbms_host = RDBMSHost(host=host, port=port, db_name=db_name, db_schema=db_schema, db_user=db_user, db_password=db_password) - RDBMSPipeline.sync_full_db(image_name=DBEngine.postgresql.name, rdbms_service=rdbms_service, rdbms_host=rdbms_host) + RDBMSPipeline.sync_full_db(engine_name=DBEngine.postgresql.name, rdbms_publisher=rdbms_publisher, rdbms_host=rdbms_host) diff --git a/metadata_extractor/builders/rdbms/rdbms_builder_abstract.py b/metadata_extractor/builders/rdbms/rdbms_builder_abstract.py index 2bd08c5..b82dc2f 100644 --- a/metadata_extractor/builders/rdbms/rdbms_builder_abstract.py +++ b/metadata_extractor/builders/rdbms/rdbms_builder_abstract.py @@ -18,17 +18,17 @@ def build_instance(self, conn: RDBMSConnection = None) -> RDBMSInstance: @staticmethod def build_database(name: str = '', instance: RDBMSInstance = None) -> Database: - return Database(name, instance) + return Database(name=name, instance=instance) @staticmethod def build_database_schema(name: str = 'public', db: Database = None) -> DatabaseSchema: - return DatabaseSchema(name, db) + return DatabaseSchema(name=name, db=db) @staticmethod def build_table(table_name: str = '', desc: str = '', db_schema: DatabaseSchema = None) -> Table: return Table(name=table_name, db_schema=db_schema, desc=desc) @staticmethod - def build_column(column_name: str = '', data_type: str = '', length: str = '', desc: str = '', + def build_column(column_name: str = '', data_type: str = '', length: str = '', desc: str = '', is_pk: bool = False, table: Table = None) -> Column: - return Column(column_name, data_type, length, desc, table) + return Column(name=column_name, data_type=data_type, length=length, desc=desc, is_pk=is_pk, table=table) diff --git a/metadata_extractor/extractor/rdbms/mysql_extractor.py b/metadata_extractor/extractor/rdbms/mysql_extractor.py index 46c7545..8b91a4b 100644 --- 
a/metadata_extractor/extractor/rdbms/mysql_extractor.py +++ b/metadata_extractor/extractor/rdbms/mysql_extractor.py @@ -4,9 +4,9 @@ from metadata_extractor.connection.connection_abstract import RDBMSConnection from metadata_extractor.extractor.rdbms.rdbms_extractor_abstract import RDBMSExtractor from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema -from metadata_extractor.models.atlas_model.rdbms.table_info import Table +from metadata_extractor.models.atlas_model.rdbms.table_info import Table, TableForeignKey from metadata_extractor.models.atlas_model.rdbms.column_info import Column -from typing import List, Dict +from typing import List, Dict, Set class MysqlExtractor(RDBMSExtractor): @@ -15,6 +15,7 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder fetch_size: int = 50 table_map: Dict[str, Table] = {} column_map: Dict[str, List[Column]] = {} + pk_map: Dict[str, Set[str]] = self.extract_table_pk(conn=conn, builder=builder, db_schema=db_schema) with conn.get_conn().cursor() as cursor: sql = "SELECT lower(c.table_schema) AS \"schema\", " \ "lower(c.table_name) AS name, " \ @@ -46,8 +47,10 @@ db_schema=db_schema)) table_map.update({table_name: table_obj}) + is_col_pk = col_name in pk_map.get(table_name, {}) + col_obj: Column = builder.build_column(column_name=col_name, data_type=col_type, desc=col_desc, - table=table_obj) + is_pk=is_col_pk, table=table_obj) column_list = column_map.get(table_name, []) column_list.append(col_obj) column_map.update({table_name: column_list}) @@ -72,3 +75,67 @@ table_list.append(table_name[0]) return table_list + + def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, List[TableForeignKey]]: + fetch_size: int = 50 + table_fk_map: Dict[str, 
List[TableForeignKey]] = {} + with conn.get_conn().cursor() as cursor: + sql = "SELECT TABLE_NAME, " \ + "COLUMN_NAME, " \ + "REFERENCED_TABLE_SCHEMA, " \ + "REFERENCED_TABLE_NAME, " \ + "REFERENCED_COLUMN_NAME " \ + "FROM " \ + "INFORMATION_SCHEMA.KEY_COLUMN_USAGE " \ + "WHERE " \ + " REFERENCED_TABLE_SCHEMA = '{db_name}';".format(db_name=db_schema.db.name) + cursor.execute(sql) + rows = cursor.fetchmany(fetch_size) + + while rows is not None and len(rows) > 0: + for row in rows: + base_table_name = row[0] + base_column_name = row[1] + refer_table_schema = row[2] + refer_table_name = row[3] + refer_column_name = row[4] + + fk_list = table_fk_map.get(base_table_name, []) + table_fk = TableForeignKey(db_schema_base=db_schema, table_base=base_table_name + , column_base=base_column_name + , schema_refer=refer_table_schema + , table_refer=refer_table_name + , column_refer=refer_column_name) + + fk_list.append(table_fk) + table_fk_map.update({base_table_name: fk_list}) + rows = cursor.fetchmany(fetch_size) + + return table_fk_map + + def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: + fetch_size: int = 50 + pk_map: Dict[str, Set[str]] = {} + with conn.get_conn().cursor() as cursor: + sql = "SELECT t.table_name,k.column_name " \ + "FROM information_schema.table_constraints t " \ + "JOIN information_schema.key_column_usage k " \ + "USING(constraint_name,table_schema,table_name) " \ + "WHERE t.constraint_type='PRIMARY KEY' " \ + "AND t.table_schema='{db_name}';".format(db_name=db_schema.db.name) + cursor.execute(sql) + rows = cursor.fetchmany(fetch_size) + + while rows is not None and len(rows) > 0: + for row in rows: + table_name = row[0] + column_name = row[1] + + column_pk_set = pk_map.get(table_name, set()) + column_pk_set.add(column_name) + pk_map.update({table_name: column_pk_set}) + rows = cursor.fetchmany(fetch_size) + + return pk_map diff --git 
a/metadata_extractor/extractor/rdbms/postgresql_extractor.py b/metadata_extractor/extractor/rdbms/postgresql_extractor.py index 978dfc3..3651162 100644 --- a/metadata_extractor/extractor/rdbms/postgresql_extractor.py +++ b/metadata_extractor/extractor/rdbms/postgresql_extractor.py @@ -8,13 +8,12 @@ from metadata_extractor.connection.connection_abstract import RDBMSConnection from metadata_extractor.builders.rdbms.rdbms_builder_abstract import RDBMSBuilder from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema -from metadata_extractor.models.atlas_model.rdbms.table_info import Table -from metadata_extractor.models.atlas_model.rdbms .column_info import Column -from typing import List, Dict +from metadata_extractor.models.atlas_model.rdbms.table_info import Table, TableForeignKey +from metadata_extractor.models.atlas_model.rdbms.column_info import Column +from typing import List, Dict, Set class PostgresqlExtractor(RDBMSExtractor): - # TODO need to be updated or deleted @staticmethod def get_column_key(conn: RDBMSConnection = None, db_schema: str = '', table_name: str = ''): @@ -65,6 +64,7 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder fetch_size: int = 50 table_map: Dict[str, Table] = {} column_map: Dict[str, List[Column]] = {} + pk_map: Dict[str, Set[str]] = self.extract_table_pk(conn=conn, builder=builder, db_schema=db_schema) with conn.get_conn().cursor() as cursor: sql = "select c.table_schema as schema, c.table_name as name" \ ", pgtd.description as description,c.column_name as col_name, c.data_type as col_type" \ @@ -91,8 +91,10 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder db_schema=db_schema)) table_map.update({table_name: table_obj}) + is_col_pk = col_name in pk_map.get(table_name, {}) + col_obj: Column = builder.build_column(column_name=col_name, data_type=col_type, desc=col_desc, - table=table_obj) + is_pk=is_col_pk, table=table_obj) column_list = 
column_map.get(table_name, []) column_list.append(col_obj) column_map.update({table_name: column_list}) @@ -127,3 +129,76 @@ def extract_column(self, conn: RDBMSConnection = None, db_schema: str = 'public' column_schema = cursor.fetchall() # TODO pack result into object return column_schema + + def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: + fetch_size: int = 50 + pk_map: Dict[str, Set[str]] = {} + with conn.get_conn().cursor() as cursor: + sql = "SELECT tc.table_name,c.column_name,tc.constraint_type " \ + "FROM information_schema.table_constraints tc " \ + "JOIN information_schema.constraint_column_usage AS ccu USING (constraint_schema, constraint_name) " \ + "JOIN information_schema.columns AS c ON c.table_schema = tc.constraint_schema " \ + " AND tc.table_name = c.table_name AND ccu.column_name = c.column_name " \ + " where c.table_schema = '{db_schema}' and tc.constraint_type = 'PRIMARY KEY';" \ + .format(db_schema=db_schema.name) + cursor.execute(sql) + rows = cursor.fetchmany(fetch_size) + + while rows is not None and len(rows) > 0: + for row in rows: + table_name = row[0] + column_name = row[1] + + column_pk_set = pk_map.get(table_name, set()) + column_pk_set.add(column_name) + pk_map.update({table_name: column_pk_set}) + rows = cursor.fetchmany(fetch_size) + + return pk_map + + def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, List[TableForeignKey]]: + + fetch_size: int = 50 + table_fk_map: Dict[str, List[TableForeignKey]] = {} + + with conn.get_conn().cursor() as cursor: + sql = "SELECT tc.table_name, " \ + "kcu.column_name, " \ + "ccu.table_schema AS foreign_table_schema, " \ + "ccu.table_name AS foreign_table_name, " \ + "ccu.column_name AS foreign_column_name " \ + "FROM " \ + "information_schema.table_constraints AS tc " \ + "JOIN information_schema.key_column_usage AS 
kcu " \ + "ON tc.constraint_name = kcu.constraint_name " \ + "AND tc.table_schema = kcu.table_schema " \ + "JOIN information_schema.constraint_column_usage AS ccu " \ + "ON ccu.constraint_name = tc.constraint_name " \ + "AND ccu.table_schema = tc.table_schema " \ + "where tc.constraint_type = 'FOREIGN KEY' and tc.table_schema = '{db_schema}'"\ + .format(db_schema=db_schema.name) + cursor.execute(sql) + rows = cursor.fetchmany(fetch_size) + + while rows is not None and len(rows) > 0: + for row in rows: + base_table_name = row[0] + base_column_name = row[1] + refer_table_schema = row[2] + refer_table_name = row[3] + refer_column_name = row[4] + + fk_list = table_fk_map.get(base_table_name, []) + table_fk = TableForeignKey(db_schema_base=db_schema, table_base=base_table_name + , column_base=base_column_name + , schema_refer=refer_table_schema + , table_refer=refer_table_name + , column_refer=refer_column_name) + + fk_list.append(table_fk) + table_fk_map.update({base_table_name: fk_list}) + rows = cursor.fetchmany(fetch_size) + + return table_fk_map diff --git a/metadata_extractor/extractor/rdbms/rdbms_extractor_abstract.py b/metadata_extractor/extractor/rdbms/rdbms_extractor_abstract.py index 3604d0f..f7db2f6 100644 --- a/metadata_extractor/extractor/rdbms/rdbms_extractor_abstract.py +++ b/metadata_extractor/extractor/rdbms/rdbms_extractor_abstract.py @@ -3,7 +3,7 @@ from metadata_extractor.builders.rdbms.rdbms_builder_abstract import RDBMSBuilder from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema from metadata_extractor.models.atlas_model.rdbms.column_info import Column -from typing import List, Dict +from typing import List, Dict, Set class RDBMSExtractor(ABC): @@ -19,3 +19,13 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder @abstractmethod def extract_column(self, conn: RDBMSConnection = None, db_schema: str = '', table_name: str = ''): pass + + @abstractmethod + def extract_table_pk(self, conn: 
RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: + pass + + @abstractmethod + def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None, + db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]: + pass diff --git a/metadata_extractor/factories/connection_factory.py b/metadata_extractor/factories/connection_factory.py index b4ba7fe..3675503 100644 --- a/metadata_extractor/factories/connection_factory.py +++ b/metadata_extractor/factories/connection_factory.py @@ -6,17 +6,17 @@ class RDBMSConnectionFactory: @staticmethod - def create(image_name: str, host: str = '', port: int = 1111, db_name: str = '', user: str = '', + def create(engine_name: str, host: str = '', port: int = 1111, db_name: str = '', user: str = '', password: str = '', **kwargs: {}) -> RDBMSConnection: switcher = { DBEngine.postgresql.name: PostgresqlConnection(), DBEngine.mysql.name: MysqlConnection() } - builder_obj = switcher.get(image_name, None) + builder_obj = switcher.get(engine_name, None) if builder_obj is not None: builder_obj.create_conn(host, port, db_name, user, password, **kwargs) return builder_obj else: - raise ValueError('Not found connection named \'%s\'' % image_name) + raise ValueError('Not found connection named \'%s\'' % engine_name) diff --git a/metadata_extractor/models/atlas_model/rdbms/column_info.py b/metadata_extractor/models/atlas_model/rdbms/column_info.py index 9b4fd7c..70a8252 100644 --- a/metadata_extractor/models/atlas_model/rdbms/column_info.py +++ b/metadata_extractor/models/atlas_model/rdbms/column_info.py @@ -5,21 +5,25 @@ def get_delimiter(): return '.' 
-def get_qualified_name(db_schema: str, table_qn: str, col_name: str) -> str: - return '{db_schema}{delimiter}{table}{delimiter}{column}' \ +def get_qualified_name(table_qn: str, col_name: str) -> str: + return '{table}{delimiter}{column}' \ .format(delimiter=get_delimiter() - , db_schema=db_schema , table=table_qn , column=col_name) class Column: - def __init__(self, name: str, data_type: str, length: str, desc: str, table: Table): + def __init__(self, name: str = '', + data_type: str = '', + length: str = '', + desc: str = '', + is_pk: bool = False, + table: Table = None): self.name: str = name self.data_type: str = data_type self.length: str = length self.desc: str = desc + self.is_pk: bool = is_pk self.table: Table = table - self.qualified_name = get_qualified_name(db_schema=table.db_schema.name - , table_qn=table.qualified_name + self.qualified_name = get_qualified_name(table_qn=table.qualified_name , col_name=name) diff --git a/metadata_extractor/models/atlas_model/rdbms/table_info.py b/metadata_extractor/models/atlas_model/rdbms/table_info.py index dcb6a35..c62112d 100644 --- a/metadata_extractor/models/atlas_model/rdbms/table_info.py +++ b/metadata_extractor/models/atlas_model/rdbms/table_info.py @@ -19,4 +19,29 @@ def __init__(self, name: str = '', db_schema: DatabaseSchema = None, desc: str = self.desc: str = desc self.tags: List[str] = tags - self.qualified_name: str = get_qualified_name(db_schema=db_schema.name, table_name=name) + self.qualified_name: str = get_qualified_name(db_schema=db_schema.qualified_name, table_name=name) + + +class TableForeignKey: + def __init__(self, + db_schema_base: DatabaseSchema = None, + table_base: str = '', + column_base: str = '', + schema_refer: str = '', + table_refer: str = '', + column_refer: str = ''): + # Table that contain FK is base table + self.table_base_qn: str = get_qualified_name(db_schema=db_schema_base.qualified_name, + table_name=table_base) + self.column_base_qn: str = 
'{table_qn}{delimiter}{column_name}' \ + .format(table_qn=self.table_base_qn, delimiter=get_delimiter(), column_name=column_base) + + self.table_refer_qn: str = '{db_qn}{delimiter}{schema_refer_name}{delimiter}{table_table}' \ + .format(db_qn=db_schema_base.db.name, delimiter=get_delimiter(), schema_refer_name=schema_refer, + table_table=table_refer) + self.column_refer_qn: str = '{table_qn}{delimiter}{column_name}' \ + .format(table_qn=self.table_refer_qn, delimiter=get_delimiter(), column_name=column_refer) + + self.qualified_name: str = '{base_table_column}#FK#{refer_table_column}' \ + .format(base_table_column=self.column_base_qn, + refer_table_column=self.column_refer_qn) diff --git a/metadata_extractor/pipeline/rdbms_pipeline.py b/metadata_extractor/pipeline/rdbms_pipeline.py index 12289ad..da9c0c2 100644 --- a/metadata_extractor/pipeline/rdbms_pipeline.py +++ b/metadata_extractor/pipeline/rdbms_pipeline.py @@ -1,6 +1,6 @@ -from typing import List, Dict +from typing import List, Dict, Set from metadata_extractor.models.hosts.rdbms_host import RDBMSHost -from metadata_extractor.services.rdbms_service import RDBMSService +from metadata_extractor.publisher.rdbms_publisher import RDBMSPublisher from metadata_extractor.connection.connection_abstract import RDBMSConnection from metadata_extractor.extractor.rdbms.rdbms_extractor_abstract import RDBMSExtractor from metadata_extractor.builders.rdbms.rdbms_builder_abstract import RDBMSBuilder @@ -10,25 +10,27 @@ from metadata_extractor.models.atlas_model.rdbms.rdbms_instance import RDBMSInstance from metadata_extractor.models.atlas_model.rdbms.database_info import Database from metadata_extractor.models.atlas_model.rdbms.column_info import Column +from metadata_extractor.models.atlas_model.rdbms.table_info import TableForeignKey class RDBMSPipeline: @staticmethod - def sync_full_db(image_name: str, rdbms_service: RDBMSService, rdbms_host: RDBMSHost): - conn: RDBMSConnection = 
RDBMSConnectionFactory.create(image_name=image_name, + def sync_full_db(engine_name: str, rdbms_publisher: RDBMSPublisher, rdbms_host: RDBMSHost): + conn: RDBMSConnection = RDBMSConnectionFactory.create(engine_name=engine_name, host=rdbms_host.host, port=rdbms_host.port, db_name=rdbms_host.db_name, user=rdbms_host.db_user, password=rdbms_host.db_password) - extractor: RDBMSExtractor = RDBMSExtractorFactory.create(image_name) - builder: RDBMSBuilder = RDBMSBuilderFactory.create(image_name) + # create extractor + extractor: RDBMSExtractor = RDBMSExtractorFactory.create(engine_name) + builder: RDBMSBuilder = RDBMSBuilderFactory.create(engine_name) instance: RDBMSInstance = builder.build_instance(conn) - rdbms_service.publish_instance(instance=instance, db_name=conn.db_name) + rdbms_publisher.publish_instance(instance=instance, db_name=conn.db_name) db: Database = builder.build_database(conn.db_name, instance) - rdbms_service.publish_database(instance_qualified_name=instance.qualified_name, db=db) + rdbms_publisher.publish_database(instance_qualified_name=instance.qualified_name, db=db) db_schema = builder.build_database_schema(name=rdbms_host.db_schema, db=db) table_dict: Dict[str, List[Column]] = extractor.extract_db_schema(conn=conn, builder=builder, @@ -37,9 +39,15 @@ def sync_full_db(image_name: str, rdbms_service: RDBMSService, rdbms_host: RDBMS for table_name in table_dict: column_list = table_dict.get(table_name) table = column_list[0].table - rdbms_service.publish_table(db_qualified_name=db.qualified_name, table=table) + rdbms_publisher.publish_table(db_qualified_name=db.qualified_name, table=table) for column in column_list: - rdbms_service.publish_column(table_qualified_name=table.qualified_name, column=column) + rdbms_publisher.publish_column(table_qualified_name=table.qualified_name, column=column) + + table_fk_dict: Dict[str, List[TableForeignKey]] = extractor.extract_table_fk(conn=conn, builder=builder, + db_schema=db_schema) + for table_name, 
table_fk_list in table_fk_dict.items(): + for table_fk_obj in table_fk_list: + rdbms_publisher.publish_table_foreign_key(table_fk_obj) conn.close_connection() diff --git a/metadata_extractor/services/__init__.py b/metadata_extractor/publisher/__init__.py similarity index 100% rename from metadata_extractor/services/__init__.py rename to metadata_extractor/publisher/__init__.py diff --git a/metadata_extractor/services/atlas_service.py b/metadata_extractor/publisher/atlas_publisher.py similarity index 99% rename from metadata_extractor/services/atlas_service.py rename to metadata_extractor/publisher/atlas_publisher.py index 16b3afa..5d29140 100644 --- a/metadata_extractor/services/atlas_service.py +++ b/metadata_extractor/publisher/atlas_publisher.py @@ -2,7 +2,7 @@ import time -class AtlasService: +class AtlasPublisher: def __init__(self, host: str = 'http://localhost:21000', user: str = 'admin', password: str = 'admin'): # TODO Change to env var self.host: str = host diff --git a/metadata_extractor/services/rdbms_service.py b/metadata_extractor/publisher/rdbms_publisher.py similarity index 86% rename from metadata_extractor/services/rdbms_service.py rename to metadata_extractor/publisher/rdbms_publisher.py index 165d8af..2781430 100644 --- a/metadata_extractor/services/rdbms_service.py +++ b/metadata_extractor/publisher/rdbms_publisher.py @@ -1,19 +1,19 @@ from metadata_extractor.models.atlas_model.rdbms.rdbms_instance import RDBMSInstance from metadata_extractor.models.atlas_model.rdbms.database_info import Database -from metadata_extractor.models.atlas_model.rdbms.table_info import Table +from metadata_extractor.models.atlas_model.rdbms.table_info import Table, TableForeignKey from metadata_extractor.models.atlas_model.rdbms.column_info import Column -from metadata_extractor.services.atlas_service import AtlasService +from metadata_extractor.publisher.atlas_publisher import AtlasPublisher import time from metadata_extractor.models.atlas_model.rdbms.database_info 
import get_qualified_name as db_get_qualified_name from metadata_extractor.models.atlas_model.rdbms.database_info import get_delimiter as db_get_delimiter -class RDBMSService: - def __init__(self, atlas_service: AtlasService = None): - if not atlas_service: - self.__atlas = AtlasService() +class RDBMSPublisher: + def __init__(self, atlas_publisher: AtlasPublisher = None): + if not atlas_publisher: + self.__atlas = AtlasPublisher() else: - self.__atlas = atlas_service + self.__atlas = atlas_publisher self.__default_create_by = 'metadata-extractor' def publish_instance(self, instance: RDBMSInstance, db_name: str): @@ -105,7 +105,8 @@ def publish_column(self, table_qualified_name: str, column: Column): "description": column.desc, "createTime": int(time.time()), "name": column.name, - "data_type": column.data_type + "data_type": column.data_type, + "isPrimaryKey": column.is_pk }, "status": "ACTIVE", "createdBy": self.__default_create_by, @@ -123,6 +124,42 @@ def publish_column(self, table_qualified_name: str, column: Column): self.__atlas.publish_entity(column_json) + def publish_table_foreign_key(self, table_fk: TableForeignKey): + table_fk_json = { + "entity": { + "typeName": "rdbms_foreign_key", + "attributes": { + "name": "{column_base_qn}_foreign_key".format(column_base_qn=table_fk.column_base_qn), + "qualifiedName": table_fk.qualified_name, + }, + "status": "ACTIVE", + "createdBy": self.__default_create_by, + "updatedBy": self.__default_create_by, + "createTime": int(time.time()), + "updateTime": int(time.time()), + "relationshipAttributes": { + "table": { + "uniqueAttributes": {"qualifiedName": table_fk.table_base_qn}, + "typeName": "rdbms_table" + }, + "key_columns": [{ + "uniqueAttributes": {"qualifiedName": table_fk.column_base_qn}, + "typeName": "rdbms_column" + }], + "references_table": { + "uniqueAttributes": {"qualifiedName": table_fk.table_refer_qn}, + "typeName": "rdbms_table" + }, + "references_columns": [{ + "uniqueAttributes": {"qualifiedName": 
table_fk.column_refer_qn}, + "typeName": "rdbms_column" + }], + } + } + } + + self.__atlas.publish_entity(table_fk_json) + def get_table_meta_data_missing(self, host_name: str = None , database_name: str = None):