In [None]:
import pandas as pd
import numpy as np
import time
import os
import sys
import warnings

In [None]:
warnings.filterwarnings("ignore")

In [None]:
# Make the project root (parent of the notebook's working directory) importable
# so the `src.utils` imports below resolve. os.path.dirname is separator-aware,
# unlike rsplit('\\'), which only split Windows paths. The membership check keeps
# re-runs of this cell from stacking duplicate entries onto sys.path.
project_dir = os.path.dirname(os.getcwd())
if project_dir not in sys.path:
    sys.path.insert(0, project_dir)

from src.utils.database import Database
from src.utils.logger import Logger
from src.utils.constants import *

In [None]:
# Connection shared by every loader below; it is closed in the notebook's last cell.
admin_db = Database('admin')
etl_conn = admin_db.create_connection()

In [None]:
def balance(connection, filename):
    """Reload ``ds.ft_balance_f`` from ``./source_data/<filename>``.

    The ';'-separated CSV is read with day-first ``ON_DATE`` values and its
    first column dropped. The remaining columns are inserted positionally as
    (on_date, account_rk, currency_rk, balance_out). TRUNCATE and INSERT run
    in one transaction, which is committed on success and rolled back on any
    error. A file with no data rows therefore leaves the table empty.

    Parameters
    ----------
    connection :
        Open DB-API connection. Values are bound with ``%s`` placeholders,
        which assumes a 'format'-paramstyle driver such as psycopg2
        (TODO confirm against ``Database.create_connection``).
    filename : str
        Name of the CSV file inside ``./source_data``.

    Reports start/end to the project ``Logger`` and prints ``'OK'`` or
    ``'FAILED'``. On an ordinary exception the error is printed and not
    re-raised; KeyboardInterrupt and similar are rolled back and re-raised.
    """
    df = pd.read_csv(
        f'./source_data/{filename}',
        sep=';',
        dtype={'ACCOUNT_RK': 'int64', 'CURRENCY_RK': 'int64', 'BALANCE_OUT': 'float64'},
        parse_dates=['ON_DATE'],
        dayfirst=True,
    ).iloc[:, 1:]
    df['ON_DATE'] = pd.to_datetime(df['ON_DATE']).dt.strftime("%Y-%m-%d")

    # Bind values as query parameters instead of splicing str(tuple) into the
    # SQL text. Splicing broke on quotes, rendered NaN as a bare `nan` token,
    # and was open to injection. NaN/NaT become None, i.e. SQL NULL.
    rows = [
        tuple(None if pd.isna(value) else value for value in record)
        for record in df.itertuples(index=False, name=None)
    ]
    insert_sql = '''
        INSERT INTO ds.ft_balance_f (on_date, account_rk, currency_rk, balance_out)
        VALUES (%s, %s, %s, %s)
        '''

    Logger().upload_start(filename)

    status = 'FAILED'
    try:
        cursor = connection.cursor()
        try:
            cursor.execute('TRUNCATE TABLE ds.ft_balance_f')
            cursor.executemany(insert_sql, rows)
        finally:
            cursor.close()
        connection.commit()
        status = 'OK'
        # Fixed pause kept from the original. It now happens after COMMIT (as in
        # the other loaders) so the transaction is not held open while waiting.
        time.sleep(5)
    except Exception as exc:
        connection.rollback()
        print(f'{filename}: {exc!r}')  # report the cause instead of swallowing it
    except BaseException:
        # e.g. KeyboardInterrupt: undo the partial load, then let it propagate.
        connection.rollback()
        raise
    finally:
        Logger().upload_end(filename, len(df), status)
        print(status)

In [None]:
def posting(connection, filename):
    """Reload ``ds.ft_posting_f`` from ``./source_data/<filename>``.

    The ';'-separated CSV is read with day-first ``OPER_DATE`` values and its
    first column dropped. Rows that share (OPER_DATE, CREDIT_ACCOUNT_RK,
    DEBET_ACCOUNT_RK) are collapsed by summing CREDIT_AMOUNT and DEBET_AMOUNT
    (presumably to satisfy a key on those columns -- confirm). TRUNCATE and
    INSERT run in one transaction, committed on success and rolled back on any
    error.

    Parameters
    ----------
    connection :
        Open DB-API connection. Values are bound with ``%s`` placeholders
        (assumes a 'format'-paramstyle driver such as psycopg2 -- TODO confirm).
    filename : str
        Name of the CSV file inside ``./source_data``.

    Reports start/end to the project ``Logger`` and prints ``'OK'`` or
    ``'FAILED'``. On an ordinary exception the error is printed and not
    re-raised; KeyboardInterrupt and similar are rolled back and re-raised.
    """
    df = pd.read_csv(
        f'./source_data/{filename}',
        sep=';',
        dtype={'CREDIT_ACCOUNT_RK': 'int64', 'DEBET_ACCOUNT_RK': 'int64',
               'CREDIT_AMOUNT': 'float64', 'DEBET_AMOUNT': 'float64'},
        parse_dates=['OPER_DATE'],
        dayfirst=True,
    ).iloc[:, 1:]
    df['OPER_DATE'] = pd.to_datetime(df['OPER_DATE']).dt.strftime("%Y-%m-%d")

    columns = ['OPER_DATE', 'CREDIT_ACCOUNT_RK', 'DEBET_ACCOUNT_RK', 'CREDIT_AMOUNT', 'DEBET_AMOUNT']
    df = (
        df.pivot_table(index=columns[:3], values=columns[3:], aggfunc='sum')
        .reset_index()
        # Pin the column order to the INSERT list below rather than relying on
        # pivot_table's output order.
        .reindex(columns=columns)
    )

    # Bound parameters instead of str(tuple) splicing; NaN becomes SQL NULL.
    rows = [
        tuple(None if pd.isna(value) else value for value in record)
        for record in df.itertuples(index=False, name=None)
    ]
    insert_sql = '''
        INSERT INTO ds.ft_posting_f (oper_date, credit_account_rk, debet_account_rk, credit_amount, debet_amount)
        VALUES (%s, %s, %s, %s, %s)
        '''

    Logger().upload_start(filename)

    status = 'FAILED'
    try:
        cursor = connection.cursor()
        try:
            cursor.execute('TRUNCATE TABLE ds.ft_posting_f')
            cursor.executemany(insert_sql, rows)
        finally:
            cursor.close()
        connection.commit()
        status = 'OK'
        # Fixed pause kept from the original (purpose not visible here).
        time.sleep(5)
    except Exception as exc:
        connection.rollback()
        print(f'{filename}: {exc!r}')  # report the cause instead of swallowing it
    except BaseException:
        # e.g. KeyboardInterrupt: undo the partial load, then let it propagate.
        connection.rollback()
        raise
    finally:
        Logger().upload_end(filename, len(df), status)
        print(status)

In [None]:
def account(connection, filename):
    """Reload ``ds.md_account_d`` from ``./source_data/<filename>``.

    The IBM866-encoded, ';'-separated CSV is read with day-first dates, with
    '   ' and 'NON' treated as missing, and with its first column dropped.
    The remaining columns are inserted positionally as
    (data_actual_date, data_actual_end_date, account_rk, account_number,
    char_type, currency_rk, currency_code). TRUNCATE and INSERT run in one
    transaction, committed on success and rolled back on any error.

    Parameters
    ----------
    connection :
        Open DB-API connection. Values are bound with ``%s`` placeholders
        (assumes a 'format'-paramstyle driver such as psycopg2 -- TODO confirm).
    filename : str
        Name of the CSV file inside ``./source_data``.

    Reports start/end to the project ``Logger`` and prints ``'OK'`` or
    ``'FAILED'``. On an ordinary exception the error is printed and not
    re-raised; KeyboardInterrupt and similar are rolled back and re-raised.
    """
    df = pd.read_csv(
        f'./source_data/{filename}',
        sep=';',
        encoding='IBM866',
        # The original dict listed 'CURRENCY_RK' twice; the first entry was
        # evidently meant for ACCOUNT_RK, which was never given a dtype.
        dtype={'ACCOUNT_RK': 'int64', 'ACCOUNT_NUMBER': 'str', 'CHAR_TYPE': 'str',
               'CURRENCY_RK': 'int64', 'CURRENCY_CODE': 'str'},
        parse_dates=['DATA_ACTUAL_DATE', 'DATA_ACTUAL_END_DATE'],
        dayfirst=True,
        na_values=['   ', 'NON'],
    ).iloc[:, 1:]
    df['DATA_ACTUAL_DATE'] = pd.to_datetime(df['DATA_ACTUAL_DATE']).dt.strftime("%Y-%m-%d")
    df['DATA_ACTUAL_END_DATE'] = pd.to_datetime(df['DATA_ACTUAL_END_DATE']).dt.strftime("%Y-%m-%d")

    # Bound parameters instead of str(tuple) splicing. Values that na_values
    # turned into NaN are now sent as SQL NULL instead of an invalid `nan` token.
    rows = [
        tuple(None if pd.isna(value) else value for value in record)
        for record in df.itertuples(index=False, name=None)
    ]
    insert_sql = '''
        INSERT INTO ds.md_account_d (data_actual_date,data_actual_end_date,account_rk,account_number,char_type,currency_rk,currency_code)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        '''

    Logger().upload_start(filename)

    status = 'FAILED'
    try:
        cursor = connection.cursor()
        try:
            cursor.execute('TRUNCATE TABLE ds.md_account_d')
            cursor.executemany(insert_sql, rows)
        finally:
            cursor.close()
        connection.commit()
        status = 'OK'
        # Fixed pause kept from the original (purpose not visible here).
        time.sleep(5)
    except Exception as exc:
        connection.rollback()
        print(f'{filename}: {exc!r}')  # report the cause instead of swallowing it
    except BaseException:
        # e.g. KeyboardInterrupt: undo the partial load, then let it propagate.
        connection.rollback()
        raise
    finally:
        Logger().upload_end(filename, len(df), status)
        print(status)

In [None]:
def currency(connection, filename):
    """Reload ``ds.md_currency_d`` from ``./source_data/<filename>``.

    The IBM866-encoded, ';'-separated CSV is read with day-first dates, with
    '   ' and 'NON' treated as missing, and with its first column dropped.
    Each row is inserted with only its non-missing columns (names taken from
    the CSV header), so missing fields are omitted from the INSERT rather
    than sent as values. Rows with every field missing are skipped.

    The TRUNCATE and all inserts form a single transaction, committed once at
    the end and rolled back on any error. Previously a commit after every row
    left a partially loaded table behind when a later row failed.

    Parameters
    ----------
    connection :
        Open DB-API connection. Values are bound with ``%s`` placeholders
        (assumes a 'format'-paramstyle driver such as psycopg2 -- TODO confirm).
    filename : str
        Name of the CSV file inside ``./source_data``.

    Reports start/end to the project ``Logger`` and prints ``'OK'`` or
    ``'FAILED'``. On an ordinary exception the error is printed and not
    re-raised; KeyboardInterrupt and similar are rolled back and re-raised.
    """
    df = pd.read_csv(
        f'./source_data/{filename}',
        sep=';',
        encoding='IBM866',
        dtype={'CURRENCY_RK': 'int64', 'CURRENCY_CODE': 'str', 'CODE_ISO_CHAR': 'str'},
        parse_dates=['DATA_ACTUAL_DATE', 'DATA_ACTUAL_END_DATE'],
        dayfirst=True,
        na_values=['   ', 'NON'],
    ).iloc[:, 1:]
    df['DATA_ACTUAL_DATE'] = pd.to_datetime(df['DATA_ACTUAL_DATE']).dt.strftime("%Y-%m-%d")
    df['DATA_ACTUAL_END_DATE'] = pd.to_datetime(df['DATA_ACTUAL_END_DATE']).dt.strftime("%Y-%m-%d")

    Logger().upload_start(filename)

    status = 'FAILED'
    try:
        cursor = connection.cursor()
        try:
            # Inside the try so a TRUNCATE failure is logged and rolled back too.
            cursor.execute('TRUNCATE TABLE ds.md_currency_d')
            header = list(df.columns)
            for record in df.itertuples(index=False, name=None):
                present = [(col, val) for col, val in zip(header, record) if not pd.isna(val)]
                if not present:
                    continue
                # Column identifiers come from the CSV header and cannot be bound
                # as parameters. The values are bound, which fixes quoting and the
                # invalid "VALUES ('x',)" that str(tuple) produced for one-field rows.
                column_list = ','.join(col for col, _ in present)
                placeholders = ','.join(['%s'] * len(present))
                cursor.execute(
                    f'INSERT INTO ds.md_currency_d ({column_list}) VALUES ({placeholders})',
                    tuple(val for _, val in present),
                )
        finally:
            cursor.close()
        connection.commit()
        status = 'OK'
        # Fixed pause kept from the original (purpose not visible here).
        time.sleep(5)
    except Exception as exc:
        connection.rollback()
        print(f'{filename}: {exc!r}')  # report the cause instead of swallowing it
    except BaseException:
        # e.g. KeyboardInterrupt: undo the partial load, then let it propagate.
        connection.rollback()
        raise
    finally:
        Logger().upload_end(filename, len(df), status)
        print(status)

In [None]:
def exchange(connection, filename):
    """Reload ``ds.md_exchange_rate_d`` from ``./source_data/<filename>``.

    The ';'-separated CSV is read with day-first dates and its first column
    dropped. Duplicate (DATA_ACTUAL_DATE, CURRENCY_RK) pairs are dropped,
    keeping the first occurrence. The remaining columns are inserted
    positionally as (data_actual_date, data_actual_end_date, currency_rk,
    reduced_cource, code_iso_num). TRUNCATE and INSERT run in one
    transaction, committed on success and rolled back on any error.

    Parameters
    ----------
    connection :
        Open DB-API connection. Values are bound with ``%s`` placeholders
        (assumes a 'format'-paramstyle driver such as psycopg2 -- TODO confirm).
    filename : str
        Name of the CSV file inside ``./source_data``.

    Reports start/end to the project ``Logger`` and prints ``'OK'`` or
    ``'FAILED'``. On an ordinary exception the error is printed and not
    re-raised; KeyboardInterrupt and similar are rolled back and re-raised.
    """
    df = pd.read_csv(
        f'./source_data/{filename}',
        sep=';',
        # 'REDUCED_COURCE' previously carried a stray trailing tab, so its dtype
        # was never applied.
        # NOTE(review): the INSERT targets code_iso_num while the dtype names
        # CODE_ISO_CHAR -- TODO confirm the CSV header.
        dtype={'CURRENCY_RK': 'int64', 'REDUCED_COURCE': 'float64', 'CODE_ISO_CHAR': 'str'},
        parse_dates=['DATA_ACTUAL_DATE', 'DATA_ACTUAL_END_DATE'],
        dayfirst=True,
    ).iloc[:, 1:]
    df = df.drop_duplicates(subset=['DATA_ACTUAL_DATE', 'CURRENCY_RK'])
    df['DATA_ACTUAL_DATE'] = pd.to_datetime(df['DATA_ACTUAL_DATE']).dt.strftime("%Y-%m-%d")
    df['DATA_ACTUAL_END_DATE'] = pd.to_datetime(df['DATA_ACTUAL_END_DATE']).dt.strftime("%Y-%m-%d")

    # Bound parameters instead of str(tuple) splicing; NaN/NaT become SQL NULL.
    rows = [
        tuple(None if pd.isna(value) else value for value in record)
        for record in df.itertuples(index=False, name=None)
    ]
    insert_sql = '''
        INSERT INTO ds.md_exchange_rate_d (data_actual_date,data_actual_end_date,currency_rk,reduced_cource,code_iso_num)
        VALUES (%s, %s, %s, %s, %s)
        '''

    Logger().upload_start(filename)

    status = 'FAILED'
    try:
        cursor = connection.cursor()
        try:
            cursor.execute('TRUNCATE TABLE ds.md_exchange_rate_d')
            cursor.executemany(insert_sql, rows)
        finally:
            cursor.close()
        connection.commit()
        status = 'OK'
        # Fixed pause kept from the original (purpose not visible here).
        time.sleep(5)
    except Exception as exc:
        connection.rollback()
        print(f'{filename}: {exc!r}')  # report the cause instead of swallowing it
    except BaseException:
        # e.g. KeyboardInterrupt: undo the partial load, then let it propagate.
        connection.rollback()
        raise
    finally:
        Logger().upload_end(filename, len(df), status)
        print(status)

In [None]:
def ledger(connection, filename):
    """Reload ``ds.md_ledger_account_s`` from ``./source_data/<filename>``.

    The IBM866-encoded, ';'-separated CSV is read with PAIR_ACCOUNT kept as
    text and its first column dropped. Each row is inserted with only its
    non-missing columns (names taken from the CSV header), so missing fields
    are omitted from the INSERT rather than sent as values. Rows with every
    field missing are skipped.

    The TRUNCATE and all inserts form a single transaction, committed once at
    the end and rolled back on any error. Previously a commit after every row
    left a partially loaded table behind when a later row failed.

    Parameters
    ----------
    connection :
        Open DB-API connection. Values are bound with ``%s`` placeholders
        (assumes a 'format'-paramstyle driver such as psycopg2 -- TODO confirm).
    filename : str
        Name of the CSV file inside ``./source_data``.

    Reports start/end to the project ``Logger`` and prints ``'OK'`` or
    ``'FAILED'``. On an ordinary exception the error is printed and not
    re-raised; KeyboardInterrupt and similar are rolled back and re-raised.
    """
    # NOTE(review): unlike the other loaders, dates here are parsed without
    # dayfirst=True -- TODO confirm the file's date format.
    df = pd.read_csv(
        f'./source_data/{filename}',
        sep=';',
        encoding='IBM866',
        dtype={'PAIR_ACCOUNT': 'str'},
        parse_dates=['START_DATE', 'END_DATE'],
    ).iloc[:, 1:]
    df['START_DATE'] = pd.to_datetime(df['START_DATE']).dt.strftime("%Y-%m-%d")
    df['END_DATE'] = pd.to_datetime(df['END_DATE']).dt.strftime("%Y-%m-%d")

    Logger().upload_start(filename)

    status = 'FAILED'
    try:
        cursor = connection.cursor()
        try:
            # Inside the try so a TRUNCATE failure is logged and rolled back too.
            cursor.execute('TRUNCATE TABLE ds.md_ledger_account_s')
            header = list(df.columns)
            for record in df.itertuples(index=False, name=None):
                present = [(col, val) for col, val in zip(header, record) if not pd.isna(val)]
                if not present:
                    continue
                # Column identifiers come from the CSV header and cannot be bound
                # as parameters. The values are bound, which fixes quoting and the
                # invalid "VALUES ('x',)" that str(tuple) produced for one-field rows.
                column_list = ','.join(col for col, _ in present)
                placeholders = ','.join(['%s'] * len(present))
                cursor.execute(
                    f'INSERT INTO ds.md_ledger_account_s ({column_list}) VALUES ({placeholders})',
                    tuple(val for _, val in present),
                )
        finally:
            cursor.close()
        connection.commit()
        status = 'OK'
        # Fixed pause kept from the original (purpose not visible here).
        time.sleep(5)
    except Exception as exc:
        connection.rollback()
        print(f'{filename}: {exc!r}')  # report the cause instead of swallowing it
    except BaseException:
        # e.g. KeyboardInterrupt: undo the partial load, then let it propagate.
        connection.rollback()
        raise
    finally:
        Logger().upload_end(filename, len(df), status)
        print(status)

In [None]:
# Truncate and reload ds.ft_balance_f.
balance(connection=etl_conn, filename='ft_balance_f.csv')

In [None]:
# Truncate and reload ds.ft_posting_f.
posting(connection=etl_conn, filename='ft_posting_f.csv')

In [None]:
# Truncate and reload ds.md_account_d.
account(connection=etl_conn, filename='md_account_d.csv')

In [None]:
# Truncate and reload ds.md_currency_d.
currency(connection=etl_conn, filename='md_currency_d.csv')

In [None]:
# Truncate and reload ds.md_exchange_rate_d.
exchange(connection=etl_conn, filename='md_exchange_rate_d.csv')

In [None]:
# Truncate and reload ds.md_ledger_account_s.
ledger(connection=etl_conn, filename='md_ledger_account_s.csv')

In [None]:
# Release the ETL connection opened near the top of the notebook; run only
# after all loader cells have finished.
etl_conn.close()