In [1]:
import os
import json
from sqlalchemy import create_engine
from airflow.hooks.base import BaseHook
from airflow.providers.oracle.hooks.oracle import OracleHook
import pandas as pd
from datetime import date
import json

In [13]:
with open("/home/airflow/airflow/dags/cuong/declare/dim/danh_sach_cong_doan.json") as f:
    config = json.load(f)

In [14]:
class DIM_DANH_SACH_CONG_DOAN(object):

    def __init__(self, config):
        self.config = config

    def get_engine(self, connection):
        engine = None
        if connection == "dwh_etl":
            conn = BaseHook.get_connection("dwh_etl")
            engine = create_engine(f'postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}')
        elif connection == "dwh_oracle":
            conn = OracleHook.get_connection("dwh_oracle")
            engine = create_engine(f'oracle+cx_oracle://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}')
        return engine

    def get_df(self, table_config):
        engine = self.get_engine(table_config["CONNECTION"])
        query = """ SELECT * FROM "{}"."{}" """.format(table_config["SCHEMA"], table_config["TABLE"])
        df = pd.read_sql(query, con=engine)
        self.upper_columns(df)
        return df

    def upper_columns(self, df):
        df.columns = [col.upper() for col in df.columns]

    def get_source_df(self):
        df = self.get_df(self.config["source"][0])
        for table_config in self.config["source"][1:]:
            dft = self.get_df(table_config)
            df[table_config["JOIN_COL"][0]]  = df[table_config["JOIN_COL"][0]].str.upper()
            dft[table_config["JOIN_COL"][1]] = dft[table_config["JOIN_COL"][1]].str.upper()
            df = pd.merge(df, dft, how="left", 
                          left_on=table_config["JOIN_COL"][0], 
                          right_on=table_config["JOIN_COL"][1],
                          suffixes=["", "_y"])
        col_map = {}
        for cm in self.config["source"]:
            col_map.update(cm["COLUMNS"])
        df = df[col_map.keys()]
        df.columns = [col_map[col] for col in df.columns]
        return df

    def insert_flow(self):
        key_col = self.config["target"]["KEY_COL"]
        df_target = self.get_df(self.config["target"])
        df_source = self.get_source_df()
        df_source = df_source[~df_source[key_col].isin(df_target[key_col])]
        df_source["TRANG_THAI_BG"] = "A"
        df_source["NGAY_HL_BG"] = date.today()
        df_source["NGAY_HH_BG"] = date(2099, 1, 1)
        df_source.to_sql(self.config["target"]["TABLE"], 
                         con=self.get_engine(self.config["target"]["CONNECTION"]), 
                         schema=self.get_engine(self.config["target"]["SCHEMA"]), 
                         if_exists="append", index=False)
    
    def update_flow(self):
        dst_df = self.get_df(self.config["target"])
        src_df = self.get_source_df()
        

In [15]:
ds_cong_doan = DIM_DANH_SACH_CONG_DOAN(config)

In [16]:
ds_cong_doan.get_source_df()

[[34m2022-12-07 10:54:40,269[0m] {[34mbase.py:[0m68} INFO[0m - Using connection ID 'dwh_oracle' for task execution.[0m
[[34m2022-12-07 10:54:40,313[0m] {[34mbase.py:[0m68} INFO[0m - Using connection ID 'dwh_oracle' for task execution.[0m


Unnamed: 0,MA_CONG_DOAN_CS,TEN_CONG_DOAN_CS,DIA_CHI,MA_LIEN_DOAN_CT
0,CD00132,CĐCS NHNN Chi nhánh tỉnh Bạc Liêu (Đại diện CĐ...,"Số 02 Trần Huỳnh, TX. Bạc Liêu, tỉnh Bạc Liêu",
1,CD00133,CĐCS NHNN Chi nhánh tỉnh Bắc Ninh (Đại diện CĐ...,"Số 20 Lý Thái Tổ, TP. Bắc Ninh, tỉnh Bắc ...",
2,CD00134,CĐCS NHNN Chi nhánh tỉnh Bắc Giang (Đại diện C...,"Số 47 Nguyễn Văn Cừ, TP. Bắc Giang, tỉnh B...",
3,CD00135,CĐCS NHNN Chi nhánh tỉnh Bến Tre (Đại diện CĐN...,"Số 100P Nguyễn Văn Tư, TX. Bến Tre, tỉnh Bến...",
4,CD00136,CĐCS NHNN Chi nhánh tỉnh Bình Dương (Đại diện ...,"Số 161Phú Lợi, P. Phú Lợi, TX. Thủ Dầu Một,...",
...,...,...,...,...
279,CD00280,CĐ Cục Quản lý xây dựng và Chất lượng công trì...,"80 Trần Hưng Đạo, Q.Hoàn Kiếm, Hà Nội",
280,CD00281,CĐ Công ty CP Đầu tư Khai thác Cảng,"Tầng 8, Số 39A Ngô Quyền, Q.Hoàn Kiếm, Hà Nội",
281,CD00282,CĐ Công ty CP Đầu tư Thành Mỹ,"Cụm kho DC 1, Cụm Khu Công nghiệp Duyên Thái, ...",
282,CD00283,CĐ Cơ quan Công đoàn GTVT Việt Nam,"1B Ngô Quyền, Q.Hoàn Kiếm, HN",


In [6]:
ds_cong_doan.insert_flow()

[[34m2022-12-07 10:30:05,214[0m] {[34mbase.py:[0m68} INFO[0m - Using connection ID 'dwh_oracle' for task execution.[0m
[[34m2022-12-07 10:30:05,287[0m] {[34mbase.py:[0m68} INFO[0m - Using connection ID 'dwh_oracle' for task execution.[0m
[[34m2022-12-07 10:30:05,359[0m] {[34mbase.py:[0m68} INFO[0m - Using connection ID 'dwh_oracle' for task execution.[0m


Unnamed: 0,MA_CONG_DOAN_CS,TEN_CONG_DOAN_CS,DIA_CHI,MA_LIEN_DOAN,TRANG_THAI_BG,NGAY_HL_BG,NGAY_HH_BG
0,CD00132,CĐCS NHNN Chi nhánh tỉnh Bạc Liêu (Đại diện CĐ...,"Số 02 Trần Huỳnh, TX. Bạc Liêu, tỉnh Bạc Liêu",,A,2022-12-07,2099-01-01
1,CD00133,CĐCS NHNN Chi nhánh tỉnh Bắc Ninh (Đại diện CĐ...,"Số 20 Lý Thái Tổ, TP. Bắc Ninh, tỉnh Bắc ...",,A,2022-12-07,2099-01-01
2,CD00134,CĐCS NHNN Chi nhánh tỉnh Bắc Giang (Đại diện C...,"Số 47 Nguyễn Văn Cừ, TP. Bắc Giang, tỉnh B...",,A,2022-12-07,2099-01-01
3,CD00135,CĐCS NHNN Chi nhánh tỉnh Bến Tre (Đại diện CĐN...,"Số 100P Nguyễn Văn Tư, TX. Bến Tre, tỉnh Bến...",,A,2022-12-07,2099-01-01
4,CD00136,CĐCS NHNN Chi nhánh tỉnh Bình Dương (Đại diện ...,"Số 161Phú Lợi, P. Phú Lợi, TX. Thủ Dầu Một,...",,A,2022-12-07,2099-01-01
...,...,...,...,...,...,...,...
279,CD00280,CĐ Cục Quản lý xây dựng và Chất lượng công trì...,"80 Trần Hưng Đạo, Q.Hoàn Kiếm, Hà Nội",,A,2022-12-07,2099-01-01
280,CD00281,CĐ Công ty CP Đầu tư Khai thác Cảng,"Tầng 8, Số 39A Ngô Quyền, Q.Hoàn Kiếm, Hà Nội",,A,2022-12-07,2099-01-01
281,CD00282,CĐ Công ty CP Đầu tư Thành Mỹ,"Cụm kho DC 1, Cụm Khu Công nghiệp Duyên Thái, ...",,A,2022-12-07,2099-01-01
282,CD00283,CĐ Cơ quan Công đoàn GTVT Việt Nam,"1B Ngô Quyền, Q.Hoàn Kiếm, HN",,A,2022-12-07,2099-01-01
