In [5]:
import os
import sys
import pandas as pd
from datetime import datetime
from pandas import json_normalize

In [6]:
# Reference-table record for stock.stock_symbols: a full JDBC extraction from
# the MySQL source into the PostgreSQL destination.
stock_symbol_dict = dict(
    insert_date=str(datetime.now()),
    source_connection='mysql',
    source_schema='stock',
    source_table='stock_symbols',
    key_fields='ticker_symbol, stock_name',
    extraction_method='jdbc',
    extraction_type='full',
    destination_connection='postgresql',
    destination_schema='stock',
    destination_table='stock_symbols',
    target_fields='ticker_symbol, stock_name',
)

# Reference-table record for stock.stock_values, same route and method as above.
# Each record takes its own insert_date timestamp at the moment it is built.
stock_values_dict = dict(
    insert_date=str(datetime.now()),
    source_connection='mysql',
    source_schema='stock',
    source_table='stock_values',
    key_fields='ticker_symbol, day_date, close_value, volume, open_value, high_value, low_value',
    extraction_method='jdbc',
    extraction_type='full',
    destination_connection='postgresql',
    destination_schema='stock',
    destination_table='stock_values',
    target_fields='ticker_symbol, day_date, close_value, volume, open_value, high_value, low_value',
)

In [7]:
# Gather both reference-table records into one list (one dict per table).
data = [stock_symbol_dict, stock_values_dict]

In [8]:
# Turn the list of record dicts into a DataFrame and display it as the cell
# output; the result is not assigned to a variable.
json_normalize(data)

Unnamed: 0,insert_date,source_connection,source_schema,source_table,key_fields,extraction_method,extraction_type,destination_connection,destination_schema,destination_table,target_fields
0,2023-07-03 21:26:58.237753,mysql,stock,stock_symbols,"ticker_symbol, stock_name",jdbc,full,postgresql,stock,stock_symbols,"ticker_symbol, stock_name"
1,2023-07-03 21:26:58.237794,mysql,stock,stock_values,"ticker_symbol, day_date, close_value, volume, ...",jdbc,full,postgresql,stock,stock_values,"ticker_symbol, day_date, close_value, volume, ..."


'/home/rcoronado/project/airflow_docker_labs'

In [20]:
# Locate the ./airflow/ directory relative to the current working directory and
# put it on the module search path so code under it can be imported.
PATH_BASENAME = os.path.abspath('')  # '' resolves to the current working directory
PATH_RELATIVE_ROOT = './airflow/'
PATH_ROOT = os.path.realpath(os.path.join(PATH_BASENAME, PATH_RELATIVE_ROOT))
# Guard the append so re-running this cell does not pile up duplicate entries.
if PATH_ROOT not in sys.path:
    sys.path.append(PATH_ROOT)

print('---------')
print(f'PATH_ROOT: {PATH_ROOT}')
print('---------')

---------
PATH_ROOT: /home/rcoronado/project/airflow_docker_labs/lab-01-databases/airflow
---------


In [25]:
from helpers.connections import Mysql, Postgresql
from pandas import DataFrame
import os
import pandas as pd

# Connection settings are taken from environment variables of the same name so
# real credentials never have to be committed with the notebook. The fallback
# literals are the values this notebook previously hard-coded, so behaviour is
# unchanged when no variable is set. Ports are kept as strings, as before.
POSTGRE_HOST = os.environ.get('POSTGRE_HOST', 'host.docker.internal')
POSTGRE_PORT = os.environ.get('POSTGRE_PORT', '5433')
POSTGRE_DB_NAME = os.environ.get('POSTGRE_DB_NAME', 'postgres_db')
POSTGRE_USER = os.environ.get('POSTGRE_USER', 'postgres')
POSTGRE_PASSWORD = os.environ.get('POSTGRE_PASSWORD', 'postgres')

MYSQL_HOST = os.environ.get('MYSQL_HOST', 'host.docker.internal')
MYSQL_PORT = os.environ.get('MYSQL_PORT', '3306')
MYSQL_DB_NAME = os.environ.get('MYSQL_DB_NAME', 'mysql_db')
MYSQL_USER = os.environ.get('MYSQL_USER', 'root')
MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'root_mysql')


In [26]:
def download_reference_table() -> DataFrame:
    """
    Description: Downloads reference table from PostgreSQL database.

    Builds a ``Postgresql`` connection object from the module-level POSTGRE_*
    settings, selects every row of
    ``etl_manager.database_flow_reference_table`` and closes the connection
    before returning, including when the query raises.

    Returns:
        Whatever ``execute_query(..., return_data=True)`` hands back; annotated
        as a pandas DataFrame.
    """
    conn_obj = Postgresql(
        host=POSTGRE_HOST, port=POSTGRE_PORT,
        db_name=POSTGRE_DB_NAME, user_name=POSTGRE_USER,
        password=POSTGRE_PASSWORD)
    query = """SELECT * FROM etl_manager.database_flow_reference_table"""
    try:
        return conn_obj.execute_query(query=query, return_data=True)
    finally:
        # Always release the PostgreSQL connection, even if the query failed.
        conn_obj.close_connection()

In [27]:
# Fetch the reference table from PostgreSQL and keep it for the cells below.
ref_table = download_reference_table()

Successfully connected to postgres_db!
Query successful
Connection is successfully terminated!


In [28]:
# Display the downloaded reference table.
ref_table

Unnamed: 0,insert_date,source_connection,source_schema,source_table,key_fields,extraction_method,extraction_type,destination_connection,destination_schema,destination_table,target_fields
0,2023-07-03 21:30:55.325096,mysql,stock,stock_symbols,"ticker_symbol, stock_name",jdbc,full,postgresql,stock,stock_symbols,"ticker_symbol, stock_name"
1,2023-07-03 21:30:55.325130,mysql,stock,stock_values,"ticker_symbol, day_date, close_value, volume, ...",jdbc,full,postgresql,stock,stock_values,"ticker_symbol, day_date, close_value, volume, ..."


In [29]:
# Inspect the source_schema column of the reference table.
ref_table.source_schema

0    stock
1    stock
Name: source_schema, dtype: object

In [38]:
# Distinct source schemas listed in the reference table, as a plain Python list.
schemas = ref_table['source_schema'].unique().tolist()
# Only the first schema in that list is used by the following cells.
source_schema = schemas[0]
source_schema

'stock'

In [39]:
# Rows of the reference table that belong to the selected source schema.
in_selected_schema = ref_table['source_schema'] == source_schema
# .copy() makes schema_tables independent of ref_table.
schema_tables = ref_table.loc[in_selected_schema].copy()
schema_tables

Unnamed: 0,insert_date,source_connection,source_schema,source_table,key_fields,extraction_method,extraction_type,destination_connection,destination_schema,destination_table,target_fields
0,2023-07-03 21:30:55.325096,mysql,stock,stock_symbols,"ticker_symbol, stock_name",jdbc,full,postgresql,stock,stock_symbols,"ticker_symbol, stock_name"
1,2023-07-03 21:30:55.325130,mysql,stock,stock_values,"ticker_symbol, day_date, close_value, volume, ...",jdbc,full,postgresql,stock,stock_values,"ticker_symbol, day_date, close_value, volume, ..."


In [40]:
# Print the extraction settings of every table registered for the selected
# schema, one block per reference-table row.
for idx, row in schema_tables.iterrows():
    source_connection, source_table, key_fields = row[
        ['source_connection', 'source_table', 'key_fields']
    ]
    # source_schema is the value chosen in an earlier cell, not read from the row.
    for label, value in (
        ('source_connection:', source_connection),
        ('source_schema:', source_schema),
        ('source_table:', source_table),
        ('key_fields:', key_fields),
    ):
        print(label, value)
    print('--------')

source_connection: mysql
source_schema: stock
source_table: stock_symbols
key_fields: ticker_symbol, stock_name
--------
source_connection: mysql
source_schema: stock
source_table: stock_values
key_fields: ticker_symbol, day_date, close_value, volume, open_value, high_value, low_value
--------
