Skip to content

Commit

Permalink
Merge pull request #4 from pongthep/feature/rdbms_table_relation
Browse files Browse the repository at this point in the history
Feature/rdbms table relation
  • Loading branch information
pongthep committed Apr 14, 2021
2 parents bb9931e + 253fced commit 814f10f
Show file tree
Hide file tree
Showing 13 changed files with 293 additions and 47 deletions.
20 changes: 20 additions & 0 deletions examples/rdbms/mysql_extractor_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from metadata_extractor.publisher.atlas_publisher import AtlasPublisher
from metadata_extractor.publisher.rdbms_publisher import RDBMSPublisher
from metadata_extractor.pipeline.rdbms_pipeline import RDBMSPipeline
from metadata_extractor.models.hosts.rdbms_host import RDBMSHost
from metadata_extractor.models.enum.db_engine_enum import DBEngine

if __name__ == "__main__":
host = ''
port = 3306
db_name = ''
db_user = ''
db_password = ''

atlas = AtlasPublisher(host='http://localhost:21000', password='')
rdbms_publisher = RDBMSPublisher(atlas_publisher=atlas)

rdbms_host = RDBMSHost(host=host, port=port, db_name=db_name, db_schema=db_name, db_user=db_user,
db_password=db_password)

RDBMSPipeline.sync_full_db(engine_name=DBEngine.mysql.name, rdbms_publisher=rdbms_publisher, rdbms_host=rdbms_host)
10 changes: 5 additions & 5 deletions examples/rdbms/postgresql_extractor_script.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from metadata_extractor.services.atlas_service import AtlasService
from metadata_extractor.services.rdbms_service import RDBMSService
from metadata_extractor.publisher.atlas_publisher import AtlasPublisher
from metadata_extractor.publisher.rdbms_publisher import RDBMSPublisher
from metadata_extractor.pipeline.rdbms_pipeline import RDBMSPipeline
from metadata_extractor.models.hosts.rdbms_host import RDBMSHost
from metadata_extractor.models.enum.db_engine_enum import DBEngine
Expand All @@ -12,10 +12,10 @@
db_user = ''
db_password = ''

atlas = AtlasService(host='http://localhost:21000', password='')
rdbms_service = RDBMSService(atlas_service=atlas)
atlas = AtlasPublisher(host='http://localhost:21000', password='')
rdbms_publisher = RDBMSPublisher(atlas_publisher=atlas)

rdbms_host = RDBMSHost(host=host, port=port, db_name=db_name, db_schema=db_schema, db_user=db_user,
db_password=db_password)

RDBMSPipeline.sync_full_db(image_name=DBEngine.postgresql.name, rdbms_service=rdbms_service, rdbms_host=rdbms_host)
RDBMSPipeline.sync_full_db(engine_name=DBEngine.postgresql.name, rdbms_publisher=rdbms_publisher, rdbms_host=rdbms_host)
8 changes: 4 additions & 4 deletions metadata_extractor/builders/rdbms/rdbms_builder_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ def build_instance(self, conn: RDBMSConnection = None) -> RDBMSInstance:

@staticmethod
def build_database(name: str = '', instance: RDBMSInstance = None) -> Database:
return Database(name, instance)
return Database(name=name, instance=instance)

@staticmethod
def build_database_schema(name: str = 'public', db: Database = None) -> DatabaseSchema:
return DatabaseSchema(name, db)
return DatabaseSchema(name=name, db=db)

@staticmethod
def build_table(table_name: str = '', desc: str = '', db_schema: DatabaseSchema = None) -> Table:
return Table(name=table_name, db_schema=db_schema, desc=desc)

@staticmethod
def build_column(column_name: str = '', data_type: str = '', length: str = '', desc: str = '',
def build_column(column_name: str = '', data_type: str = '', length: str = '', desc: str = '', is_pk: bool = False,
table: Table = None) -> Column:
return Column(column_name, data_type, length, desc, table)
return Column(name=column_name, data_type=data_type, length=length, desc=desc, is_pk=is_pk, table=table)
73 changes: 70 additions & 3 deletions metadata_extractor/extractor/rdbms/mysql_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from metadata_extractor.connection.connection_abstract import RDBMSConnection
from metadata_extractor.extractor.rdbms.rdbms_extractor_abstract import RDBMSExtractor
from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema
from metadata_extractor.models.atlas_model.rdbms.table_info import Table
from metadata_extractor.models.atlas_model.rdbms.table_info import Table, TableForeignKey
from metadata_extractor.models.atlas_model.rdbms.column_info import Column
from typing import List, Dict
from typing import List, Dict, Set


class MysqlExtractor(RDBMSExtractor):
Expand All @@ -15,6 +15,7 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder
fetch_size: int = 50
table_map: Dict[str, Table] = {}
column_map: Dict[str, List[Column]] = {}
pk_map: Dict[str, Set[str]] = self.extract_table_pk(conn=conn, builder=builder, db_schema=db_schema)
with conn.get_conn().cursor() as cursor:
sql = "SELECT lower(c.table_schema) AS \"schema\", " \
"lower(c.table_name) AS name, " \
Expand Down Expand Up @@ -46,8 +47,10 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder
db_schema=db_schema))
table_map.update({table_name: table_obj})

is_col_pk = col_name in pk_map.get(table_name, {})

col_obj: Column = builder.build_column(column_name=col_name, data_type=col_type, desc=col_desc,
table=table_obj)
is_pk=is_col_pk, table=table_obj)
column_list = column_map.get(table_name, [])
column_list.append(col_obj)
column_map.update({table_name: column_list})
Expand All @@ -72,3 +75,67 @@ def get_table_list(self, cursor: None, db_schema: str = 'public'):
table_list.append(table_name[0])

return table_list

def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None,
db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]:
fetch_size: int = 50
table_fk_map: Dict[str, List[TableForeignKey]] = {}
with conn.get_conn().cursor() as cursor:
sql = "SELECT TABLE_NAME, " \
"COLUMN_NAME, " \
"REFERENCED_TABLE_SCHEMA, " \
"REFERENCED_TABLE_NAME, " \
"REFERENCED_COLUMN_NAME " \
"FROM " \
"INFORMATION_SCHEMA.KEY_COLUMN_USAGE " \
"WHERE " \
" REFERENCED_TABLE_SCHEMA = '{db_name}';".format(db_name=db_schema.db.name)
cursor.execute(sql)
rows = cursor.fetchmany(fetch_size)

while rows is not None and len(rows) > 0:
for row in rows:
base_table_name = row[0]
base_column_name = row[1]
refer_table_schema = row[2]
refer_table_name = row[3]
refer_column_name = row[4]

fk_list = table_fk_map.get(base_table_name, [])
table_fk = TableForeignKey(db_schema_base=db_schema, table_base=base_table_name
, column_base=base_column_name
, schema_refer=refer_table_schema
, table_refer=refer_table_name
, column_refer=refer_column_name)

fk_list.append(table_fk)
table_fk_map.update({base_table_name: fk_list})
rows = cursor.fetchmany(fetch_size)

return table_fk_map

def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None,
db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]:
fetch_size: int = 50
pk_map: Dict[str, Set[str]] = {}
with conn.get_conn().cursor() as cursor:
sql = "SELECT t.table_name,k.column_name " \
"FROM information_schema.table_constraints t " \
"JOIN information_schema.key_column_usage k " \
"USING(constraint_name,table_schema,table_name) " \
"WHERE t.constraint_type='PRIMARY KEY' " \
"AND t.table_schema='{db_name}';".format(db_name=db_schema.db.name)
cursor.execute(sql)
rows = cursor.fetchmany(fetch_size)

while rows is not None and len(rows) > 0:
for row in rows:
table_name = row[0]
column_name = row[1]

column_pk_set = pk_map.get(table_name, set())
column_pk_set.add(column_name)
pk_map.update({table_name: column_pk_set})
rows = cursor.fetchmany(fetch_size)

return pk_map
85 changes: 80 additions & 5 deletions metadata_extractor/extractor/rdbms/postgresql_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
from metadata_extractor.connection.connection_abstract import RDBMSConnection
from metadata_extractor.builders.rdbms.rdbms_builder_abstract import RDBMSBuilder
from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema
from metadata_extractor.models.atlas_model.rdbms.table_info import Table
from metadata_extractor.models.atlas_model.rdbms .column_info import Column
from typing import List, Dict
from metadata_extractor.models.atlas_model.rdbms.table_info import Table, TableForeignKey
from metadata_extractor.models.atlas_model.rdbms.column_info import Column
from typing import List, Dict, Set


class PostgresqlExtractor(RDBMSExtractor):

# TODO need to be updated or deleted
@staticmethod
def get_column_key(conn: RDBMSConnection = None, db_schema: str = '', table_name: str = ''):
Expand Down Expand Up @@ -65,6 +64,7 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder
fetch_size: int = 50
table_map: Dict[str, Table] = {}
column_map: Dict[str, List[Column]] = {}
pk_map: Dict[str, Set[str]] = self.extract_table_pk(conn=conn, builder=builder, db_schema=db_schema)
with conn.get_conn().cursor() as cursor:
sql = "select c.table_schema as schema, c.table_name as name" \
", pgtd.description as description,c.column_name as col_name, c.data_type as col_type" \
Expand All @@ -91,8 +91,10 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder
db_schema=db_schema))
table_map.update({table_name: table_obj})

is_col_pk = col_name in pk_map.get(table_name, {})

col_obj: Column = builder.build_column(column_name=col_name, data_type=col_type, desc=col_desc,
table=table_obj)
is_pk=is_col_pk, table=table_obj)
column_list = column_map.get(table_name, [])
column_list.append(col_obj)
column_map.update({table_name: column_list})
Expand Down Expand Up @@ -127,3 +129,76 @@ def extract_column(self, conn: RDBMSConnection = None, db_schema: str = 'public'
column_schema = cursor.fetchall()
# TODO pack result into object
return column_schema

def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None,
db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]:
fetch_size: int = 50
pk_map: Dict[str, Set[str]] = {}
with conn.get_conn().cursor() as cursor:
sql = "SELECT tc.table_name,c.column_name,tc.constraint_type " \
"FROM information_schema.table_constraints tc " \
"JOIN information_schema.constraint_column_usage AS ccu USING (constraint_schema, constraint_name) " \
"JOIN information_schema.columns AS c ON c.table_schema = tc.constraint_schema " \
" AND tc.table_name = c.table_name AND ccu.column_name = c.column_name " \
" where c.table_schema = '{db_schema}' and tc.constraint_type = 'PRIMARY KEY';" \
.format(db_schema=db_schema.name)
cursor.execute(sql)
rows = cursor.fetchmany(fetch_size)

while rows is not None and len(rows) > 0:
for row in rows:
table_name = row[0]
column_name = row[1]

column_pk_set = pk_map.get(table_name, set())
column_pk_set.add(column_name)
pk_map.update({table_name: column_pk_set})
rows = cursor.fetchmany(fetch_size)

return pk_map

def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None,
db_schema: DatabaseSchema = None) -> Dict[str, List[TableForeignKey]]:

fetch_size: int = 50
table_fk_map: Dict[str, List[TableForeignKey]] = {}

with conn.get_conn().cursor() as cursor:
sql = "SELECT tc.table_name, " \
"kcu.column_name, " \
"ccu.table_schema AS foreign_table_schema, " \
"ccu.table_name AS foreign_table_name, " \
"ccu.column_name AS foreign_column_name " \
"FROM " \
"information_schema.table_constraints AS tc " \
"JOIN information_schema.key_column_usage AS kcu " \
"ON tc.constraint_name = kcu.constraint_name " \
"AND tc.table_schema = kcu.table_schema " \
"JOIN information_schema.constraint_column_usage AS ccu " \
"ON ccu.constraint_name = tc.constraint_name " \
"AND ccu.table_schema = tc.table_schema " \
"where tc.constraint_type = 'FOREIGN KEY' and tc.table_schema = '{db_schema}'"\
.format(db_schema=db_schema.name)
cursor.execute(sql)
rows = cursor.fetchmany(fetch_size)

while rows is not None and len(rows) > 0:
for row in rows:
base_table_name = row[0]
base_column_name = row[1]
refer_table_schema = row[2]
refer_table_name = row[3]
refer_column_name = row[4]

fk_list = table_fk_map.get(base_table_name, [])
table_fk = TableForeignKey(db_schema_base=db_schema, table_base=base_table_name
, column_base=base_column_name
, schema_refer=refer_table_schema
, table_refer=refer_table_name
, column_refer=refer_column_name)

fk_list.append(table_fk)
table_fk_map.update({base_table_name: fk_list})
rows = cursor.fetchmany(fetch_size)

return table_fk_map
12 changes: 11 additions & 1 deletion metadata_extractor/extractor/rdbms/rdbms_extractor_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from metadata_extractor.builders.rdbms.rdbms_builder_abstract import RDBMSBuilder
from metadata_extractor.models.atlas_model.rdbms.database_schema import DatabaseSchema
from metadata_extractor.models.atlas_model.rdbms.column_info import Column
from typing import List, Dict
from typing import List, Dict, Set


class RDBMSExtractor(ABC):
Expand All @@ -19,3 +19,13 @@ def extract_db_schema(self, conn: RDBMSConnection = None, builder: RDBMSBuilder
@abstractmethod
def extract_column(self, conn: RDBMSConnection = None, db_schema: str = '', table_name: str = ''):
pass

@abstractmethod
def extract_table_pk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None,
db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]:
pass

@abstractmethod
def extract_table_fk(self, conn: RDBMSConnection = None, builder: RDBMSBuilder = None,
db_schema: DatabaseSchema = None) -> Dict[str, Set[str]]:
pass
6 changes: 3 additions & 3 deletions metadata_extractor/factories/connection_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@

class RDBMSConnectionFactory:
@staticmethod
def create(image_name: str, host: str = '', port: int = 1111, db_name: str = '', user: str = '',
def create(engine_name: str, host: str = '', port: int = 1111, db_name: str = '', user: str = '',
password: str = '', **kwargs: {}) -> RDBMSConnection:
switcher = {
DBEngine.postgresql.name: PostgresqlConnection(),
DBEngine.mysql.name: MysqlConnection()
}

builder_obj = switcher.get(image_name, None)
builder_obj = switcher.get(engine_name, None)

if builder_obj is not None:
builder_obj.create_conn(host, port, db_name, user, password, **kwargs)
return builder_obj
else:
raise ValueError('Not found connection named \'%s\'' % image_name)
raise ValueError('Not found connection named \'%s\'' % engine_name)
16 changes: 10 additions & 6 deletions metadata_extractor/models/atlas_model/rdbms/column_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@ def get_delimiter():
return '.'


def get_qualified_name(db_schema: str, table_qn: str, col_name: str) -> str:
return '{db_schema}{delimiter}{table}{delimiter}{column}' \
def get_qualified_name(table_qn: str, col_name: str) -> str:
return '{table}{delimiter}{column}' \
.format(delimiter=get_delimiter()
, db_schema=db_schema
, table=table_qn
, column=col_name)


class Column:
def __init__(self, name: str, data_type: str, length: str, desc: str, table: Table):
def __init__(self, name: str = '',
data_type: str = '',
length: str = '',
desc: str = '',
is_pk: bool = False,
table: Table = None):
self.name: str = name
self.data_type: str = data_type
self.length: str = length
self.desc: str = desc
self.is_pk: str = is_pk
self.table: Table = table
self.qualified_name = get_qualified_name(db_schema=table.db_schema.name
, table_qn=table.qualified_name
self.qualified_name = get_qualified_name(table_qn=table.qualified_name
, col_name=name)
27 changes: 26 additions & 1 deletion metadata_extractor/models/atlas_model/rdbms/table_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,29 @@ def __init__(self, name: str = '', db_schema: DatabaseSchema = None, desc: str =
self.desc: str = desc
self.tags: List[str] = tags

self.qualified_name: str = get_qualified_name(db_schema=db_schema.name, table_name=name)
self.qualified_name: str = get_qualified_name(db_schema=db_schema.qualified_name, table_name=name)


class TableForeignKey:
def __init__(self,
db_schema_base: DatabaseSchema = None,
table_base: str = '',
column_base: str = '',
schema_refer: str = '',
table_refer: str = '',
column_refer: str = ''):
# Table that contain FK is base table
self.table_base_qn: str = get_qualified_name(db_schema=db_schema_base.qualified_name,
table_name=table_base)
self.column_base_qn: str = '{table_qn}{delimiter}{column_name}' \
.format(table_qn=self.table_base_qn, delimiter=get_delimiter(), column_name=column_base)

self.table_refer_qn: str = '{db_qn}{delimiter}{schema_refer_name}{delimiter}{table_table}' \
.format(db_qn=db_schema_base.db.name, delimiter=get_delimiter(), schema_refer_name=schema_refer,
table_table=table_refer)
self.column_refer_qn: str = '{table_qn}{delimiter}{column_name}' \
.format(table_qn=self.table_refer_qn, delimiter=get_delimiter(), column_name=column_refer)

self.qualified_name: str = '{base_table_column}#FK#{refer_table_column}' \
.format(base_table_column=self.column_base_qn,
refer_table_column=self.column_refer_qn)
Loading

0 comments on commit 814f10f

Please sign in to comment.