In [73]:
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.dialects.postgresql import insert
import pandas as pd
import numpy as np
from datetime import datetime

In [72]:
engine = create_engine("postgresql+psycopg2://neoflex_user:neoflex_user@localhost:5432/neo_bank_1")

In [3]:
''' Загрузка полученных df в БД '''
def load_to_db(df, table_name):
    metadata_obj = MetaData(schema = 'ds')
    table = Table(table_name, metadata_obj, autoload_with=engine)
    
    insert_statement = insert(table).values(df.values.tolist())
    upsert_statement = insert_statement.on_conflict_do_update(
        constraint=table.primary_key,
        set_=dict(insert_statement.excluded),
    )
    engine.execute(upsert_statement)

In [209]:
#''' Загрузка (сносит ключи, если что-то не так) '''
#def load_to_db(df, table_name):
#    df.columns = df.columns.str.lower()
    
#    df.to_sql(
#        name = table_name,
#        con = engine,
#        schema = 'ds',
 #       if_exists = 'replace',
  #      index = False,
   #     method = 'multi'
    #)

In [4]:
''' Сбор данных из ft_balance_f.csv, преобразование и загрузка в neo_bank_1  '''
def etl_ft_balance_f():
    df = pd.read_csv(filepath_or_buffer='../../../src/1/1/ft_balance_f.csv', 
        header = 'infer', 
        sep = ';',
        usecols = range(1,5),
        parse_dates = ['ON_DATE', ],
        dayfirst = True
    )
    
    load_to_db(df, 'ft_balance_f')

In [53]:
''' Сбор данных из et_md_account_d.csv, преобразование и загрузка в neo_bank_1  '''
def etl_md_account_d():
    df = pd.read_csv(
        filepath_or_buffer='../../../src/1/1/md_account_d.csv', 
        header='infer', 
        sep=';',
        keep_default_na=False,
        encoding='UTF-8'
    )
    df = df.iloc[: , 1:]
    
    #load_to_db(df, 'md_account_d')
    return df
#etl_md_account_d()

In [20]:
''' Сбор данных из ft_posting_f.csv, преобразование и загрузка в neo_bank_1  '''
def etl_ft_posting_f():
    df = pd.read_csv(
        filepath_or_buffer='../../../src/1/1/ft_posting_f.csv', 
        header='infer', 
        sep=';', 
        keep_default_na=False,
        encoding='UTF-8'
    )

    df = df.iloc[: , 1:]
    df = df.groupby(['OPER_DATE', 'CREDIT_ACCOUNT_RK', 'DEBET_ACCOUNT_RK'], as_index=False).sum()
    #df = df.drop_duplicates(subset=['OPER_DATE', 'CREDIT_ACCOUNT_RK', 'DEBET_ACCOUNT_RK'])
    
    load_to_db(df, 'ft_posting_f')
    #return df2
etl_ft_posting_f()

In [7]:
''' Сбор данных из md_currency_d.csv, преобразование и загрузка в neo_bank_1  '''
def etl_md_currency_d():
    df = pd.read_csv(
        filepath_or_buffer='../../../src/1/1/md_currency_d.csv', 
        header='infer', 
        sep=';',
        encoding = 'latin1',
        usecols=range(1,6),
        parse_dates=['DATA_ACTUAL_DATE', 'DATA_ACTUAL_END_DATE'],
    )
    load_to_db(df, 'md_currency_d')

In [8]:
''' Сбор данных из md_currency_d.csv, преобразование и загрузка в neo_bank_1  '''
def etl_md_exchange_rate_d():
    df = pd.read_csv(
        filepath_or_buffer='../../../src/1/1/md_exchange_rate_d.csv', 
        header='infer', 
        sep=';',
        usecols=range(1,6),
        parse_dates=['DATA_ACTUAL_DATE', 'DATA_ACTUAL_END_DATE'],
    )
    df = df.drop_duplicates()
    
    load_to_db(df, 'md_exchange_rate_d')

In [9]:
''' Сбор данных из md_ledger_account_s.csv, преобразование и загрузка в neo_bank_1  '''
def etl_md_ledger_account_s():
    df = pd.read_csv(
        filepath_or_buffer='../../../src/1/1/md_ledger_account_s.csv', 
        header='infer', 
        sep=';',
        encoding = 'IBM866',
        parse_dates=['START_DATE', 'END_DATE'],
        usecols=range(1,29),
        dtype = {
            'PAIR_ACCOUNT': str,
            'MIN_TERM': str,
            'MAX_TERM': str,
            'MAX_TERM_MEASURE': str,
            'LEDGER_ACC_FULL_NAME_TRANSLIT': str,
            'IS_REVALUATION': str,
            'IS_CORRECT': str
        }
    )
    df = df.replace(np.nan, None)
    load_to_db(df, 'md_ledger_account_s')

In [33]:
def set_logs(status_messange):
    columns = ['action_date', 'status']
    data = list(zip([datetime.now(), ], [status_messange, ]))

    pd.DataFrame(data=data, columns = columns) \
        .to_sql(
            name = 'logs_info_etl_11_process',
            con = engine,
            schema = 'logs',
            if_exists = 'append',
            index = False
        )

In [41]:
set_logs('START')
etl_ft_balance_f()
etl_md_account_d()
etl_ft_posting_f()
etl_md_currency_d()
etl_md_exchange_rate_d()
etl_md_ledger_account_s()
set_logs('END')

In [282]:
TMP_PATH_SAVE_FILES = '/home/grigorii/airflow/dag_src'
columns_type_str = [
        'PAIR_ACCOUNT', 'MIN_TERM',
        'MAX_TERM', 'MAX_TERM_MEASURE',
        'LEDGER_ACC_FULL_NAME_TRANSLIT',
        'IS_REVALUATION', 'IS_CORRECT'
    ]

def read_tmp(fname):
    df = pd.read_csv(
        filepath_or_buffer=f'{TMP_PATH_SAVE_FILES}/{fname}.csv', 
            header='infer', keep_default_na=False
    )
    return df


In [181]:
def save_tmp(df, fname):
    os.makedirs(TMP_PATH_SAVE_FILES, exist_ok=True)
    df.to_csv(
        f'{TMP_PATH_SAVE_FILES}/{fname}.csv', 
        index=False, 
        encoding='UTF-8'
    )

In [None]:
def extract_csv(name_file_csv, encode_type):
    df = pd.read_csv(
        filepath_or_buffer=f'{PATH_TO_FILES_CSV}/{name_file_csv}.csv', 
        header='infer', 
        sep=';',
        encoding ='CP866',
        keep_default_na=False
    )
    return df
    #save_tmp(df, name_file_csv)
df = extract_csv('ft_balance_f', 'UTF-8')


df

In [264]:
name_df = 'md_currency_d'

columns_type_date = ['DATA_ACTUAL_DATE', 'DATA_ACTUAL_END_DATE']

pd_df = read_tmp(name_df)
    
pd_df[columns_type_date] = pd_df[columns_type_date].apply(pd.to_datetime)
pd_df = pd_df.iloc[: , 1:]

save_tmp(pd_df, name_df)

In [248]:
def load_postgres(table_name):
    metadata_obj = MetaData(schema = 'ds')
    table = Table(table_name, metadata_obj, autoload_with=engine)
    
    df = read_tmp(table_name)
    insert_statement = insert(table).values(df.values.tolist())
    upsert_statement = insert_statement.on_conflict_do_update(
        constraint=table.primary_key,
        set_=dict(insert_statement.excluded),
    )
    engine.execute(upsert_statement)

In [88]:
d = datetime.now()
engine.execute(f'''
    INSERT INTO logs.logs_info_etl_11_process 
        (action_date, status)
    VALUES ('{d}','start')
''')

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc73743d4b0>

In [95]:
from dateutil import tz

timestamp=1690592864.334429
from_zone = tz.gettz('UTC')

dt = datetime.fromtimestamp(timestamp, from_zone)

engine.execute('''
    INSERT INTO logs.log_table 
    VALUES ('md_ledger_account_s', '2023-07-29 01:44:22.320746',
                'start', 1)
''')

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc7369ff490>