In [1]:
import datetime
import getpass
import zipfile
import os
import pandas as pd
import psycopg2
import sqlalchemy
import time
from pathlib import Path
from sqlalchemy import text

# 1. Исследовательский анализ данных

In [2]:
# Указываем входные данные
zip_folder = Path(input('В какой папке лежит архив?'))
unzip_folder = Path(input('В какую папку извлечь архив? - по умолчанию установлена папка, где лежит архив') or zip_folder)
zip_file = input('Как называется файл с архивом? - по умолчанию, файлы.zip') or 'файлы.zip'
login = input('Укажите логин для Postgresql: ')
passw = getpass.getpass(prompt='Укажите пароль для Postgresql: ')

В какой папке лежит архив? D:\Project
В какую папку извлечь архив? - по умолчанию установлена папка, где лежит архив D:\Project\report
Как называется файл с архивом? - по умолчанию, файлы.zip для формы.zip
Укажите логин для Postgresql:  postgres
Укажите пароль для Postgresql:  ········


In [3]:
# Распаковываем архив
with zipfile.ZipFile(os.path.join(zip_folder, zip_file), 'r') as zip_ref:
    zip_ref.extractall(unzip_folder)

Архив распакован. Последовательно откроем каждый файл с данными и проверим следующее:
1. Соответствие столбцов и типов данных в них требованиям задания.
2. Формат данных.
3. Наличие пропусков в данных и дубликатов.
4. Отсутствие ошибок в датах.

### Таблица ft_balance_f

In [4]:
# Изучаем таблицу с данными
ft_balance_f = pd.read_csv(os.path.join(unzip_folder, 'ft_balance_f.csv'), sep = ';')
print('\033[1m' + 'Общая характеристика таблицы' + '\033[0m')
ft_balance_f.info()
print()
print('****************************************')
print('\033[1m' + 'Первые 5 строк' + '\033[0m')
display(ft_balance_f.head())
print()

[1mОбщая характеристика таблицы[0m
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 114 entries, 0 to 113
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   ON_DATE      114 non-null    object 
 1   ACCOUNT_RK   114 non-null    int64  
 2   CURRENCY_RK  114 non-null    int64  
 3   BALANCE_OUT  114 non-null    float64
dtypes: float64(1), int64(2), object(1)
memory usage: 3.7+ KB

****************************************
[1mПервые 5 строк[0m


Unnamed: 0,ON_DATE,ACCOUNT_RK,CURRENCY_RK,BALANCE_OUT
0,31.12.2017,36237725,35,38318.13
1,31.12.2017,24656,35,80533.62
2,31.12.2017,18849846,34,63891.96
3,31.12.2017,1972647,34,5087732.1
4,31.12.2017,34157174,34,7097806.9





In [5]:
# Функция для поиска дублей по первичному ключу
def dubles(file, columns):
    if file.duplicated(subset = columns).sum() == 0:
        print('В таблице нет дублей по первичному ключу')
    else:
        print('В строках ниже указан дублирующий первичный ключ')
        display(file[file[columns].duplicated()])

In [6]:
# Ищем дубли в таблице ft_balance_f
columns = ['ON_DATE', 'ACCOUNT_RK']
dubles(ft_balance_f, columns)

В таблице нет дублей по первичному ключу


Далее проверим отсутствие ошибок в датах. Для этого переведем даты в формат дат, затем посмотрим характеристики распределения. Если в датах есть ошибки, то либо нам не удастся изменить формат, либо в характеристиках распределения встретятся аномально низкие либо высокие значения.

In [7]:
ft_balance_f['ON_DATE'] = pd.to_datetime(ft_balance_f['ON_DATE'], format = '%d.%m.%Y')
ft_balance_f['ON_DATE'].describe()

count                    114
mean     2017-12-31 00:00:00
min      2017-12-31 00:00:00
25%      2017-12-31 00:00:00
50%      2017-12-31 00:00:00
75%      2017-12-31 00:00:00
max      2017-12-31 00:00:00
Name: ON_DATE, dtype: object

In [8]:
# Пересохраним файл с измененным форматом даты
ft_balance_f.to_csv(os.path.join(unzip_folder, 'ft_balance_f.csv'), sep = ';', index = False)

### Таблица ft_posting_f

In [9]:
# Изучаем таблицу с данными
ft_posting_f = pd.read_csv(os.path.join(unzip_folder, 'ft_posting_f.csv'), sep = ';')
print('\033[1m' + 'Общая характеристика таблицы' + '\033[0m')
ft_posting_f.info()
print()
print('****************************************')
print('\033[1m' + 'Первые 5 строк' + '\033[0m')
display(ft_posting_f.head())
print()

[1mОбщая характеристика таблицы[0m
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 33892 entries, 0 to 33891
Data columns (total 5 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   OPER_DATE          33892 non-null  object 
 1   CREDIT_ACCOUNT_RK  33892 non-null  int64  
 2   DEBET_ACCOUNT_RK   33892 non-null  int64  
 3   CREDIT_AMOUNT      33892 non-null  float64
 4   DEBET_AMOUNT       33892 non-null  float64
dtypes: float64(2), int64(2), object(1)
memory usage: 1.3+ MB

****************************************
[1mПервые 5 строк[0m


Unnamed: 0,OPER_DATE,CREDIT_ACCOUNT_RK,DEBET_ACCOUNT_RK,CREDIT_AMOUNT,DEBET_AMOUNT
0,09-01-2018,13630,17436,94333.93,18337.76
1,09-01-2018,15698716,13630,68294.14,31542.06
2,09-01-2018,12048338,13630,2192.96,98734.33
3,09-01-2018,393808409,17244,44179.86,98544.65
4,09-01-2018,409685020,13630,18843.05,889.74





In [10]:
# Ищем дубли в таблице ft_posting_f. Как такового первичного ключа тут нет, поэтому в качестве него будут выступать все столбцы таблицы. 
# То есть ищем полные дубли строк
columns = ft_posting_f.columns.tolist()
dubles(ft_posting_f, columns)

В таблице нет дублей по первичному ключу


Дубли строк отсутствуют, но даже если бы их и нашли, это не свидетельствовало бы об ошибках в данных.  
Возможна ситуация, когда в одну и ту же дату с одного и того же счета на один и тот же счет произвдена транзакция на определенную сумму, при этом остаток после обеих проводок одинаковый.

In [11]:
# Проверяем даты на наличие ошибок
ft_posting_f['OPER_DATE'] = pd.to_datetime(ft_posting_f['OPER_DATE'], format = '%d-%m-%Y')
ft_posting_f['OPER_DATE'].describe()

count                            33892
mean     2018-01-19 06:37:13.164168448
min                2018-01-09 00:00:00
25%                2018-01-12 00:00:00
50%                2018-01-18 00:00:00
75%                2018-01-25 00:00:00
max                2018-01-31 00:00:00
Name: OPER_DATE, dtype: object

In [12]:
# Пересохраним файл с измененным форматом даты
ft_posting_f.to_csv(os.path.join(unzip_folder, 'ft_posting_f.csv'), sep = ';', index = False)

### Таблица md_account_d

In [13]:
# Изучаем таблицу с данными
md_account_d = pd.read_csv(os.path.join(unzip_folder, 'md_account_d.csv'), sep = ';')
print('\033[1m' + 'Общая характеристика таблицы' + '\033[0m')
md_account_d.info()
print()
print('****************************************')
print('\033[1m' + 'Первые 5 строк' + '\033[0m')
display(md_account_d.head())
print()

[1mОбщая характеристика таблицы[0m
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 112 entries, 0 to 111
Data columns (total 7 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   DATA_ACTUAL_DATE      112 non-null    object
 1   DATA_ACTUAL_END_DATE  112 non-null    object
 2   ACCOUNT_RK            112 non-null    int64 
 3   ACCOUNT_NUMBER        112 non-null    object
 4   CHAR_TYPE             112 non-null    object
 5   CURRENCY_RK           112 non-null    int64 
 6   CURRENCY_CODE         112 non-null    int64 
dtypes: int64(3), object(4)
memory usage: 6.3+ KB

****************************************
[1mПервые 5 строк[0m


Unnamed: 0,DATA_ACTUAL_DATE,DATA_ACTUAL_END_DATE,ACCOUNT_RK,ACCOUNT_NUMBER,CHAR_TYPE,CURRENCY_RK,CURRENCY_CODE
0,2018-01-01,2018-01-31,36237725,30425840700000583001,А,35,840
1,2018-01-01,2018-01-31,24656,30114840700000770002,А,35,840
2,2018-01-01,2018-01-31,18849846,30109810500000435003,П,34,643
3,2018-01-01,2018-01-31,1972647,30111810700000908001,П,34,643
4,2018-01-01,2018-01-31,34157174,30424810100000583001,А,34,643





In [14]:
# Ищем дубли
columns = ['DATA_ACTUAL_DATE', 'ACCOUNT_RK']
dubles(md_account_d, columns)

В таблице нет дублей по первичному ключу


In [15]:
# Проверяем даты на наличие ошибок
md_account_d['DATA_ACTUAL_DATE'] = pd.to_datetime(md_account_d['DATA_ACTUAL_DATE'], format = '%Y-%m-%d')
md_account_d['DATA_ACTUAL_END_DATE'] = pd.to_datetime(md_account_d['DATA_ACTUAL_END_DATE'], format = '%Y-%m-%d')
md_account_d[['DATA_ACTUAL_DATE', 'DATA_ACTUAL_END_DATE']].describe()

Unnamed: 0,DATA_ACTUAL_DATE,DATA_ACTUAL_END_DATE
count,112,112
mean,2018-01-01 00:00:00,2018-01-31 00:00:00
min,2018-01-01 00:00:00,2018-01-31 00:00:00
25%,2018-01-01 00:00:00,2018-01-31 00:00:00
50%,2018-01-01 00:00:00,2018-01-31 00:00:00
75%,2018-01-01 00:00:00,2018-01-31 00:00:00
max,2018-01-01 00:00:00,2018-01-31 00:00:00


In [16]:
# Пересохраним файл с измененным форматом даты
md_account_d.to_csv(os.path.join(unzip_folder, 'md_account_d.csv'), sep = ';', index = False)

### Таблица md_currency_d

In [17]:
# Изучаем таблицу с данными
md_currency_d = pd.read_csv(os.path.join(unzip_folder, 'md_currency_d.csv'), sep = ';', encoding='ansi')
print('\033[1m' + 'Общая характеристика таблицы' + '\033[0m')
md_currency_d.info()
print()
print('****************************************')
print('\033[1m' + 'Первые 5 строк' + '\033[0m')
display(md_currency_d.head())
print()

[1mОбщая характеристика таблицы[0m
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50 entries, 0 to 49
Data columns (total 5 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   CURRENCY_RK           50 non-null     int64  
 1   DATA_ACTUAL_DATE      50 non-null     object 
 2   DATA_ACTUAL_END_DATE  50 non-null     object 
 3   CURRENCY_CODE         49 non-null     float64
 4   CODE_ISO_CHAR         49 non-null     object 
dtypes: float64(1), int64(1), object(3)
memory usage: 2.1+ KB

****************************************
[1mПервые 5 строк[0m


Unnamed: 0,CURRENCY_RK,DATA_ACTUAL_DATE,DATA_ACTUAL_END_DATE,CURRENCY_CODE,CODE_ISO_CHAR
0,4586704,2011-09-06,2050-12-31,0.0,NON
1,50,2017-05-11,2050-12-31,356.0,INR
2,51,2017-05-11,2050-12-31,484.0,MXN
3,52,2017-05-11,2050-12-31,434.0,LYD
4,53,2017-05-11,2050-12-31,422.0,LBR





In [18]:
# Ищем дубли
columns = ['CURRENCY_RK', 'DATA_ACTUAL_DATE']
dubles(md_currency_d, columns)

В таблице нет дублей по первичному ключу


In [19]:
# Проверяем даты на наличие ошибок
md_currency_d['DATA_ACTUAL_DATE'] = pd.to_datetime(md_currency_d['DATA_ACTUAL_DATE'], format = '%Y-%m-%d')
md_currency_d['DATA_ACTUAL_END_DATE'] = pd.to_datetime(md_currency_d['DATA_ACTUAL_END_DATE'], format = '%Y-%m-%d')
md_currency_d[['DATA_ACTUAL_DATE', 'DATA_ACTUAL_END_DATE']].describe()

Unnamed: 0,DATA_ACTUAL_DATE,DATA_ACTUAL_END_DATE
count,50,50
mean,2017-04-08 02:52:48,2050-12-31 00:00:00
min,2011-09-06 00:00:00,2050-12-31 00:00:00
25%,2017-05-11 00:00:00,2050-12-31 00:00:00
50%,2017-05-11 00:00:00,2050-12-31 00:00:00
75%,2017-05-11 00:00:00,2050-12-31 00:00:00
max,2017-12-12 00:00:00,2050-12-31 00:00:00


In [20]:
# Избавимся от дробной части в столбце CURRENCY_CODE. Переведем данные столбца в строковый тип
md_currency_d['CURRENCY_CODE'] = md_currency_d['CURRENCY_CODE'].astype('str').apply(lambda x: x[:3] if x != 'NaN' and x != 'nan' else None)
md_currency_d.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50 entries, 0 to 49
Data columns (total 5 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   CURRENCY_RK           50 non-null     int64         
 1   DATA_ACTUAL_DATE      50 non-null     datetime64[ns]
 2   DATA_ACTUAL_END_DATE  50 non-null     datetime64[ns]
 3   CURRENCY_CODE         49 non-null     object        
 4   CODE_ISO_CHAR         49 non-null     object        
dtypes: datetime64[ns](2), int64(1), object(2)
memory usage: 2.1+ KB


In [21]:
# Проверим, есть ли в столбце CODE_ISO_CHAR данные, длина которых превышает 3 знака.
md_currency_d[md_currency_d['CODE_ISO_CHAR'].str.len() > 3]

Unnamed: 0,CURRENCY_RK,DATA_ACTUAL_DATE,DATA_ACTUAL_END_DATE,CURRENCY_CODE,CODE_ISO_CHAR


In [22]:
# Обрежем данные в столбце CODE_ISO_CHAR до 3 знаков
md_currency_d['CODE_ISO_CHAR'] = md_currency_d['CODE_ISO_CHAR'].astype('str').apply(lambda x: x[:3] if x != 'NaN' and x != 'nan' else None)
md_currency_d[md_currency_d['CODE_ISO_CHAR'].str.len() > 3]

Unnamed: 0,CURRENCY_RK,DATA_ACTUAL_DATE,DATA_ACTUAL_END_DATE,CURRENCY_CODE,CODE_ISO_CHAR


In [23]:
# Пересохраним файл с изменениями
md_currency_d.to_csv(os.path.join(unzip_folder, 'md_currency_d.csv'), sep = ';', index = False)

### Таблица md_exchange_rate_d

In [24]:
# Изучаем таблицу с данными
md_exchange_rate_d = pd.read_csv(os.path.join(unzip_folder, 'md_exchange_rate_d.csv'), sep = ';', encoding='ansi')
print('\033[1m' + 'Общая характеристика таблицы' + '\033[0m')
md_exchange_rate_d.info()
print()
print('****************************************')
print('\033[1m' + 'Первые 5 строк' + '\033[0m')
display(md_exchange_rate_d.head())
print()

[1mОбщая характеристика таблицы[0m
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 892 entries, 0 to 891
Data columns (total 5 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   DATA_ACTUAL_DATE      892 non-null    object 
 1   DATA_ACTUAL_END_DATE  892 non-null    object 
 2   CURRENCY_RK           892 non-null    int64  
 3   REDUCED_COURCE        892 non-null    float64
 4   CODE_ISO_NUM          892 non-null    int64  
dtypes: float64(1), int64(2), object(2)
memory usage: 35.0+ KB

****************************************
[1mПервые 5 строк[0m


Unnamed: 0,DATA_ACTUAL_DATE,DATA_ACTUAL_END_DATE,CURRENCY_RK,REDUCED_COURCE,CODE_ISO_NUM
0,2016-07-01,2050-12-31,38,31.8884,974
1,2018-01-31,2018-01-31,427870281,28.3798,933
2,2018-01-31,2018-01-31,29,7.11613,752
3,2018-01-31,2018-01-31,529511970,33.0543,944
4,2018-01-31,2018-01-31,62,8.87555,156





In [25]:
# Ищем дубли
columns = ['DATA_ACTUAL_DATE', 'CURRENCY_RK']
dubles(md_exchange_rate_d, columns)

В строках ниже указан дублирующий первичный ключ


Unnamed: 0,DATA_ACTUAL_DATE,DATA_ACTUAL_END_DATE,CURRENCY_RK,REDUCED_COURCE,CODE_ISO_NUM
82,2018-01-27,2018-01-29,427870281,28.339500,933
83,2018-01-27,2018-01-29,29,7.089730,752
84,2018-01-27,2018-01-29,529511970,32.782600,944
85,2018-01-27,2018-01-29,62,8.833810,156
86,2018-01-27,2018-01-29,205699733,2.743830,203
...,...,...,...,...,...
887,2017-12-30,2018-01-09,205699734,0.222052,348
888,2017-12-30,2018-01-09,205699733,2.693050,203
889,2017-12-30,2018-01-09,529511969,0.119009,51
890,2017-12-30,2018-01-09,529511970,33.822800,944


Обнаружены дубли. Исходя из описания данных, записи, дублирующиеся по первичным ключам, должны дублироваться по всем столбцам. Првоерим это.

In [26]:
print('Количество полных дублей строк:', md_exchange_rate_d.duplicated(subset = ['DATA_ACTUAL_DATE', 'CURRENCY_RK']).sum())

Количество полных дублей строк: 432


Количество полных дублей строк совпадает с количеством дублей по первичному ключу. В таблицу PSQL не удастся вставить такие данные при условии, что столбцы DATA_ACTUAL_DATE и CURRENCY_RK будут первичным ключом. Кроме того, дублирование данных записей не дает полезную информацию, а только создает избыточность в данных. В связи с чем предлагаю все дубли удалить.

In [27]:
# Удаляем дубли
md_exchange_rate_d = md_exchange_rate_d.drop_duplicates()
md_exchange_rate_d.info()

<class 'pandas.core.frame.DataFrame'>
Index: 460 entries, 0 to 621
Data columns (total 5 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   DATA_ACTUAL_DATE      460 non-null    object 
 1   DATA_ACTUAL_END_DATE  460 non-null    object 
 2   CURRENCY_RK           460 non-null    int64  
 3   REDUCED_COURCE        460 non-null    float64
 4   CODE_ISO_NUM          460 non-null    int64  
dtypes: float64(1), int64(2), object(2)
memory usage: 21.6+ KB


In [28]:
# Проверяем даты на наличие ошибок
md_exchange_rate_d['DATA_ACTUAL_DATE'] = pd.to_datetime(md_exchange_rate_d['DATA_ACTUAL_DATE'], format = '%Y-%m-%d')
md_exchange_rate_d['DATA_ACTUAL_END_DATE'] = pd.to_datetime(md_exchange_rate_d['DATA_ACTUAL_END_DATE'], format = '%Y-%m-%d')
md_exchange_rate_d[['DATA_ACTUAL_DATE', 'DATA_ACTUAL_END_DATE']].describe()

Unnamed: 0,DATA_ACTUAL_DATE,DATA_ACTUAL_END_DATE
count,460,460
mean,2018-01-17 15:35:59.999999744,2018-02-14 23:34:57.391304448
min,2016-07-01 00:00:00,2018-01-09 00:00:00
25%,2018-01-13 00:00:00,2018-01-15 00:00:00
50%,2018-01-19 00:00:00,2018-01-19 00:00:00
75%,2018-01-25 00:00:00,2018-01-25 00:00:00
max,2018-01-31 00:00:00,2050-12-31 00:00:00


In [29]:
# Пересохраним файл с внесенными изменениями
md_exchange_rate_d.to_csv(os.path.join(unzip_folder, 'md_exchange_rate_d.csv'), sep = ';', index = False)

### Таблица md_ledger_account_s

In [30]:
# Изучаем таблицу с данными
md_ledger_account_s = pd.read_csv(os.path.join(unzip_folder, 'md_ledger_account_s.csv'), sep = ';')
print('\033[1m' + 'Общая характеристика таблицы' + '\033[0m')
md_ledger_account_s.info()
print()
print('****************************************')
print('\033[1m' + 'Первые 5 строк' + '\033[0m')
display(md_ledger_account_s.head())
print()

[1mОбщая характеристика таблицы[0m
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18 entries, 0 to 17
Data columns (total 12 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   CHAPTER               18 non-null     object
 1   CHAPTER_NAME          18 non-null     object
 2   SECTION_NUMBER        18 non-null     int64 
 3   SECTION_NAME          18 non-null     object
 4   SUBSECTION_NAME       18 non-null     object
 5   LEDGER1_ACCOUNT       18 non-null     int64 
 6   LEDGER1_ACCOUNT_NAME  18 non-null     object
 7   LEDGER_ACCOUNT        18 non-null     int64 
 8   LEDGER_ACCOUNT_NAME   18 non-null     object
 9   CHARACTERISTIC        18 non-null     object
 10  START_DATE            18 non-null     object
 11  END_DATE              18 non-null     object
dtypes: int64(3), object(9)
memory usage: 1.8+ KB

****************************************
[1mПервые 5 строк[0m


Unnamed: 0,CHAPTER,CHAPTER_NAME,SECTION_NUMBER,SECTION_NAME,SUBSECTION_NAME,LEDGER1_ACCOUNT,LEDGER1_ACCOUNT_NAME,LEDGER_ACCOUNT,LEDGER_ACCOUNT_NAME,CHARACTERISTIC,START_DATE,END_DATE
0,А,Балансовые счета,3,МЕЖБАНКОВСКИЕ ОПЕРАЦИИ,МЕЖБАНКОВСКИЕ РАСЧЕТЫ,302,Счета кредитных организаций по другим операциям,30204,Обязательные резервы кредитных организаций по ...,А,2014-01-01,2050-12-31
1,А,Балансовые счета,3,МЕЖБАНКОВСКИЕ ОПЕРАЦИИ,МЕЖБАНКОВСКИЕ РАСЧЕТЫ,301,Корреспондентские счета,30109,Корреспондентские счета кредитных организаций ...,П,2008-01-01,2050-12-31
2,А,Балансовые счета,3,МЕЖБАНКОВСКИЕ ОПЕРАЦИИ,МЕЖБАНКОВСКИЕ РАСЧЕТЫ,301,Корреспондентские счета,30110,Корреспондентские счета в кредитных организаци...,А,2008-01-01,2050-12-31
3,А,Балансовые счета,3,МЕЖБАНКОВСКИЕ ОПЕРАЦИИ,МЕЖБАНКОВСКИЕ РАСЧЕТЫ,301,Корреспондентские счета,30111,Корреспондентские счета банков- нерезидентов,П,2008-01-01,2050-12-31
4,А,Балансовые счета,3,МЕЖБАНКОВСКИЕ ОПЕРАЦИИ,МЕЖБАНКОВСКИЕ РАСЧЕТЫ,301,Корреспондентские счета,30126,Резервы на возможные потери,П,2008-01-01,2050-12-31





В файле меньше столбцов, чем в описании таблиц. Однако, все обязательные поля заполнены.

In [31]:
# Ищем дубли
columns = ['LEDGER_ACCOUNT', 'START_DATE']
dubles(md_ledger_account_s, columns)

В таблице нет дублей по первичному ключу


In [32]:
# Проверяем даты на наличие ошибок
md_ledger_account_s['START_DATE'] = pd.to_datetime(md_ledger_account_s['START_DATE'], format = '%Y-%m-%d')
md_ledger_account_s['END_DATE'] = pd.to_datetime(md_ledger_account_s['END_DATE'], format = '%Y-%m-%d')
md_ledger_account_s[['START_DATE', 'END_DATE']].describe()

Unnamed: 0,START_DATE,END_DATE
count,18,18
mean,2010-08-11 22:40:00,2050-12-31 00:00:00
min,2008-01-01 00:00:00,2050-12-31 00:00:00
25%,2008-01-01 00:00:00,2050-12-31 00:00:00
50%,2008-01-01 00:00:00,2050-12-31 00:00:00
75%,2013-10-01 18:00:00,2050-12-31 00:00:00
max,2016-01-01 00:00:00,2050-12-31 00:00:00


In [33]:
# Пересохраним файл с внесенными изменениями
md_ledger_account_s.to_csv(os.path.join(unzip_folder, 'md_ledger_account_s.csv'), sep = ';', index = False)

### Промежуточный вывод

В ходе осмотра файлов с данными обнаружены следующие проблемы:  
1. Даты в таблицах сохранены в разном формате. Приведено к единому формату.
2. В таблице md_exchange_rate_d обнаружено 432 дублирующие строки, в том числе и по первичному ключу. Все дубли удалены.
3. В таблице md_currency_d обнаружено по 1 пропуску в столбцах currency_code и code_iso_char. Пропуски оставлены как есть, так как нет данных для их заполнения. Формат данных в столбцах приведен к строковому типу.
4. В таблице md_currency_d в столбце code_iso_char обнаружено значение, длина которого превышает 3 символа. Визуально данное значение аномально. Так как данный столбец не потребуется для дальнейших расчетов, а превышение длины не даст произвести вставку в таблицу, принято решение обрезать обнаруженное аномальное значение до 3 знаков.

# 2. Создание БД в PostgreSQL

In [34]:
# Создание базы данных
conn = psycopg2.connect(user=login, password=passw)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = conn.cursor()
cursor.execute('CREATE DATABASE form101')
cursor.close()

engine = sqlalchemy.create_engine(f'postgresql+psycopg2://{login}:{passw}@127.0.0.1:5432/form101')

In [35]:
# Функция для запросов
def create(engine, query):
    conn = engine.connect()
    conn.execute(text('commit'))
    conn.execute(text(query))
    conn.close()

### Создание схемы и таблиц для хранения данных

In [36]:
# Создание схемы DS
query = 'CREATE SCHEMA IF NOT EXISTS DS'
create(engine, query)

In [37]:
# Создание таблицы DS.FT_BALANCE_F
query = '''CREATE TABLE IF NOT EXISTS DS.FT_BALANCE_F
(on_date date not null,
account_rk integer not null,
currency_rk integer,
balance_out numeric,
CONSTRAINT balance_pkey PRIMARY KEY(on_date, account_rk))'''
create(engine, query)

In [38]:
# Реализация стратегии обновления UPSERT для таблицы DS.FT_BALANCE_F
query = '''CREATE OR REPLACE FUNCTION ds.upsert_ft_balance_f()
RETURNS TRIGGER AS $$
BEGIN
    UPDATE ds.ft_balance_f
    SET currency_rk = NEW.currency_rk, balance_out = NEW.balance_out
    WHERE on_date = NEW.on_date AND account_rk = NEW.account_rk;
    IF FOUND THEN
        RETURN NULL;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER trg_upsert_ft_balance_f
BEFORE INSERT ON ds.ft_balance_f
FOR EACH ROW EXECUTE FUNCTION ds.upsert_ft_balance_f();'''
create(engine, query)

In [39]:
# Создание таблицы DS.FT_POSTING_F
# Для этой таблицы стратегия UPSERT не применяется, так как в таблице нет первичного ключа
query = '''CREATE TABLE IF NOT EXISTS DS.FT_POSTING_F
(oper_date date not null,
credit_account_rk integer not null,
debet_account_rk integer not null,
credit_amount numeric,
debet_amount numeric)'''
create(engine, query)

In [40]:
# Создание таблицы DS.MD_ACCOUNT_D
query = '''CREATE TABLE IF NOT EXISTS DS.MD_ACCOUNT_D
(data_actual_date date not null,
data_actual_end_date date not null,
account_rk integer not null,
account_number varchar(20) not null,
char_type varchar(1) not null,
currency_rk integer not null,
currency_code varchar(3) not null,
CONSTRAINT md_account_d_pkey PRIMARY KEY(data_actual_date, account_rk))'''
create(engine, query)

In [41]:
# Реализация стратегии обновления UPSERT для таблицы DS.MD_ACCOUNT_D
query = '''CREATE OR REPLACE FUNCTION ds.upsert_md_account_d()
RETURNS TRIGGER AS $$
BEGIN
    UPDATE ds.md_account_d
    SET data_actual_end_date = NEW.data_actual_end_date,
    account_number = NEW.account_number,
    char_type = NEW.char_type,
    currency_rk = NEW.currency_rk,
    currency_code = NEW.currency_code
    WHERE data_actual_date = NEW.data_actual_date AND account_rk = NEW.account_rk;
    IF FOUND THEN
        RETURN NULL;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER trg_upsert_md_account_d
BEFORE INSERT ON ds.md_account_d
FOR EACH ROW EXECUTE FUNCTION ds.upsert_md_account_d();'''
create(engine, query)

In [42]:
# Создание таблицы DS.MD_CURRENCY_D
query = '''CREATE TABLE IF NOT EXISTS DS.MD_CURRENCY_D
(currency_rk integer not null,
data_actual_date date not null,
data_actual_end_date date,
currency_code varchar(3),
code_iso_char varchar(3),
CONSTRAINT md_currency_d_pkey PRIMARY KEY(currency_rk, data_actual_date))'''
create(engine, query)

In [43]:
# Реализация стратегии обновления UPSERT для таблицы DS.MD_CURRENCY_D
query = '''CREATE OR REPLACE FUNCTION ds.upsert_md_currency_d()
RETURNS TRIGGER AS $$
BEGIN
    UPDATE ds.md_currency_d
    SET data_actual_end_date = NEW.data_actual_end_date,
    currency_code = NEW.currency_code,
    code_iso_char = NEW.code_iso_char
    WHERE currency_rk = NEW.currency_rk AND data_actual_date = NEW.data_actual_date;
    IF FOUND THEN
        RETURN NULL;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER trg_upsert_md_currency_d
BEFORE INSERT ON ds.md_currency_d
FOR EACH ROW EXECUTE FUNCTION ds.upsert_md_currency_d();'''
create(engine, query)

In [44]:
# Создание таблицы DS.MD_EXCHANGE_RATE_D
query = '''CREATE TABLE IF NOT EXISTS DS.MD_EXCHANGE_RATE_D
(data_actual_date date not null,
data_actual_end_date date,
currency_rk integer not null,
reduced_cource numeric,
code_iso_num varchar(3),
CONSTRAINT md_exchange_rate_d_pkey PRIMARY KEY(data_actual_date, currency_rk))'''
create(engine, query)

In [45]:
# Реализация стратегии обновления UPSERT для таблицы DS.MD_EXCHANGE_RATE_D
query = '''CREATE OR REPLACE FUNCTION ds.upsert_md_exchange_rate_d()
RETURNS TRIGGER AS $$
BEGIN
    UPDATE ds.md_exchange_rate_d
    SET data_actual_end_date = NEW.data_actual_end_date,
    reduced_cource = NEW.reduced_cource,
    code_iso_num = NEW.code_iso_num
    WHERE data_actual_date = NEW.data_actual_date AND currency_rk = NEW.currency_rk;
    IF FOUND THEN
        RETURN NULL;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER trg_upsert_md_exchange_rate_d
BEFORE INSERT ON ds.md_exchange_rate_d
FOR EACH ROW EXECUTE FUNCTION ds.upsert_md_exchange_rate_d();'''
create(engine, query)

In [46]:
# Создание таблицы DS.MD_LEDGER_ACCOUNT_S
query = '''CREATE TABLE IF NOT EXISTS DS.MD_LEDGER_ACCOUNT_S
(chapter varchar(1),
chapter_name varchar(16),
section_number integer,
section_name varchar(22),
subsection_name varchar(22),
ledger1_account integer,
ledger1_account_name varchar(47),
ledger_account integer not null,
ledger_account_name varchar(153),
characteristic varchar(1),
is_resident integer,
is_reserve integer,
is_reserved integer,
is_loan integer,
is_reserved_assets integer,
is_overdue integer,
is_interest integer,
pair_account varchar(5),
start_date date not null,
end_date date,
is_rub_only integer,
min_term varchar(1),
min_term_measure varchar(1),
max_term varchar(1),
max_term_measure varchar(1),
ledger_acc_full_name_translit varchar(1),
is_revaluation varchar(1),
is_correct varchar(1),
CONSTRAINT md_ledger_account_s_pkey PRIMARY KEY(ledger_account, start_date))'''
create(engine, query)

In [47]:
# Реализация стратегии обновления UPSERT для таблицы DS.MD_LEDGER_ACCOUNT_S
query = '''CREATE OR REPLACE FUNCTION ds.upsert_md_ledger_account_s()
RETURNS TRIGGER AS $$
BEGIN
    UPDATE ds.md_ledger_account_s
    SET chapter = NEW.chapter,
    chapter_name = NEW.chapter_name,
    section_number = NEW.section_number,
    section_name = NEW.section_name,
    subsection_name = NEW.subsection_name,
    ledger1_account = NEW.ledger1_account,
    ledger1_account_name = NEW.ledger1_account_name,
    ledger_account_name = NEW.ledger_account_name,
    characteristic = NEW.characteristic,
    is_resident = NEW.is_resident,
    is_reserve = NEW.is_reserve,
    is_reserved = NEW.is_reserved,
    is_loan = NEW.is_loan,
    is_reserved_assets = NEW.is_reserved_assets,
    is_overdue = NEW.is_overdue,
    is_interest = NEW.is_interest,
    pair_account = NEW.pair_account,
    end_date = NEW.end_date,
    is_rub_only = NEW.is_rub_only,
    min_term = NEW.min_term,
    min_term_measure = NEW.min_term_measure,
    max_term = NEW.max_term,
    max_term_measure = NEW.max_term_measure,
    ledger_acc_full_name_translit = NEW.ledger_acc_full_name_translit,
    is_revaluation = NEW.is_revaluation,
    is_correct = NEW.is_correct
    WHERE ledger_account = NEW.ledger_account AND start_date = NEW.start_date;
    IF FOUND THEN
        RETURN NULL;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER trg_upsert_md_ledger_account_s
BEFORE INSERT ON ds.md_ledger_account_s
FOR EACH ROW EXECUTE FUNCTION ds.upsert_md_ledger_account_s();'''
create(engine, query)

### Создание схемы и таблицы для хранения логов

In [48]:
# Создание схемы LOGS
query = 'CREATE SCHEMA IF NOT EXISTS LOGS'
create(engine, query)

In [49]:
# Создание таблицы для хранения логов
query = '''CREATE TABLE IF NOT EXISTS LOGS.insert_logs
(log_id serial PRIMARY KEY,
transaction_id bigint,
table_name varchar(100) not null,
operation_type varchar(10) not null,
start_time timestamp with time zone not null,
end_time timestamp with time zone,
user_name varchar(100) not null,
rows_affected integer)'''
create(engine, query)

In [50]:
# Создание функций для автоматического заполнения таблицы логов
query = '''CREATE OR REPLACE FUNCTION LOGS.log_change_start()
RETURNS TRIGGER AS $$
BEGIN
    IF NOT EXISTS (
        SELECT 1 FROM LOGS.insert_logs 
        WHERE transaction_id = txid_current() 
        AND table_name = TG_TABLE_NAME
		AND operation_type = TG_OP
    ) THEN
        INSERT INTO LOGS.insert_logs (
            transaction_id,
            table_name,
            operation_type,
            start_time,
            user_name,
			rows_affected
        ) VALUES (
            txid_current(),
            TG_TABLE_NAME,
			TG_OP,
            CURRENT_TIMESTAMP,
            current_user,
			0
        );
    END IF;
    
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION LOGS.log_change_end()
RETURNS TRIGGER AS $$
BEGIN
    -- Обновляем только если запись существует и время окончания еще не установлено
    UPDATE LOGS.insert_logs 
    SET end_time = CURRENT_TIMESTAMP,
	    rows_affected = rows_affected + 1
    WHERE transaction_id = txid_current()
    AND table_name = TG_TABLE_NAME
	AND operation_type = TG_OP;

	PERFORM LOGS.cleanup_incomplete_logs();
    
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION LOGS.cleanup_incomplete_logs()
RETURNS VOID AS $$
BEGIN
    DELETE FROM LOGS.insert_logs
    WHERE end_time IS NULL;
END;
$$ LANGUAGE plpgsql;'''
create(engine, query)

In [51]:
# Создание триггеров заполнения логов для каждой таблицы
query = '''CREATE OR REPLACE TRIGGER log_start_ft_balance_f
BEFORE INSERT OR UPDATE ON DS.ft_balance_f
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_start();

CREATE OR REPLACE TRIGGER log_end_ft_balance_f
AFTER INSERT OR UPDATE ON DS.ft_balance_f
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_end();

CREATE OR REPLACE TRIGGER log_start_ft_posting_f
BEFORE INSERT OR UPDATE ON DS.ft_posting_f
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_start();

CREATE OR REPLACE TRIGGER log_end_ft_posting_f
AFTER INSERT OR UPDATE ON DS.ft_posting_f
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_end();

CREATE OR REPLACE TRIGGER log_start_md_account_d
BEFORE INSERT OR UPDATE ON DS.md_account_d
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_start();

CREATE OR REPLACE TRIGGER log_end_md_account_d
AFTER INSERT OR UPDATE ON DS.md_account_d
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_end();

CREATE OR REPLACE TRIGGER log_start_md_currency_d
BEFORE INSERT OR UPDATE ON DS.md_currency_d
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_start();

CREATE OR REPLACE TRIGGER log_end_md_currency_d
AFTER INSERT OR UPDATE ON DS.md_currency_d
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_end();

CREATE OR REPLACE TRIGGER log_start_md_exchange_rate_d
BEFORE INSERT OR UPDATE ON DS.md_exchange_rate_d
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_start();

CREATE OR REPLACE TRIGGER log_end_md_exchange_rate_d
AFTER INSERT OR UPDATE ON DS.md_exchange_rate_d
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_end();

CREATE OR REPLACE TRIGGER log_start_md_ledger_account_s
BEFORE INSERT OR UPDATE ON DS.md_ledger_account_s
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_start();

CREATE OR REPLACE TRIGGER log_end_md_ledger_account_s
AFTER INSERT OR UPDATE ON DS.md_ledger_account_s
FOR EACH ROW
EXECUTE FUNCTION LOGS.log_change_end();'''
create(engine, query)

# 3. Загрузка данных в созданную БД

In [52]:
# Универсальная функция для вставки данных в таблицы
def insert(engine, table, file):
    conn = engine.connect()
    file = os.path.join(unzip_folder, file)
    query = f'''CREATE TEMP TABLE temp (LIKE {table} INCLUDING DEFAULTS) ON COMMIT DROP;
    COPY temp FROM '{file}'
    WITH (FORMAT csv, HEADER true, DELIMITER ';');
    INSERT INTO {table}
    SELECT *
    FROM temp;
    '''
    conn.execute(text('commit'))
    conn.execute(text(query))
    #time.sleep(5)
    query = '''UPDATE LOGS.insert_logs
    SET end_time = CURRENT_TIMESTAMP
    WHERE transaction_id = (
        SELECT transaction_id
        FROM LOGS.insert_logs
        ORDER BY start_time DESC
        LIMIT 1)'''
    conn.execute(text(query))
    conn.close()

In [53]:
# Загрузка данных в таблицу DS.ft_balance_f
insert(engine, 'DS.ft_balance_f', 'ft_balance_f.csv')

In [54]:
# Загрузка данных в таблицу DS.ft_posting_f
insert(engine, 'DS.ft_posting_f', 'ft_posting_f.csv')

In [55]:
# Загрузка данных в таблицу DS.md_account_d
insert(engine, 'DS.md_account_d', 'md_account_d.csv')

In [56]:
# Загрузка данных в таблицу DS.md_currency_d
insert(engine, 'DS.md_currency_d', 'md_currency_d.csv')

In [57]:
# Загрузка данных в таблицу DS.md_exchange_rate_d
insert(engine, 'DS.md_exchange_rate_d', 'md_exchange_rate_d.csv')

In [58]:
# для таблицы md_ledger_account_s реализован отдельный алгоритм вставки данных
# Это связано с тем, что перед заливкой из файла таблица должна очищаться
# Кроме того, в исходном файле содержатся не все столбцы таблицы
conn = engine.connect()
file = os.path.join(unzip_folder, 'md_ledger_account_s.csv')
query = 'TRUNCATE TABLE DS.md_ledger_account_s'
conn.execute(text('commit'))
conn.execute(text(query))
query = f'''CREATE TEMP TABLE temp (LIKE DS.md_ledger_account_s INCLUDING DEFAULTS) ON COMMIT DROP;
COPY temp(chapter, chapter_name, section_number, section_name, subsection_name, ledger1_account,
ledger1_account_name, ledger_account, ledger_account_name, characteristic, start_date, end_date) FROM '{file}'
WITH (FORMAT csv, HEADER true, DELIMITER ';');
INSERT INTO DS.md_ledger_account_s
SELECT *
FROM temp;
'''
conn.execute(text(query))
#time.sleep(5)
query = '''UPDATE LOGS.insert_logs
SET end_time = CURRENT_TIMESTAMP
WHERE transaction_id = (
    SELECT transaction_id
    FROM LOGS.insert_logs
    ORDER BY start_time DESC
    LIMIT 1)'''
conn.execute(text(query))
conn.close()

# 4. Витрины оборотов

In [59]:
# Создание схемы DM
query = 'CREATE SCHEMA IF NOT EXISTS DM'
create(engine, query)

In [60]:
# Создание таблицы для хранения логов расчета витрин
query = '''CREATE TABLE IF NOT EXISTS LOGS.datamart_logs
(log_id serial PRIMARY KEY,
table_name varchar(100) not null,
mart_date date,
mart_type varchar(10) not null,
start_time timestamp with time zone not null,
end_time timestamp with time zone,
user_name varchar(100) not null)'''
create(engine, query)

In [61]:
# Создание процедуры расчета витрины оборотов на основании переданной даты
query = '''CREATE OR REPLACE PROCEDURE ds.fill_account_turnover_f(i_OnDate DATE)
LANGUAGE plpgsql
AS $$
DECLARE
    new_table_name TEXT;
BEGIN
    new_table_name := 'dm_account_turnover_f_' || TO_CHAR(i_OnDate, 'DDMMYY');
    
    IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'dm'
	           AND table_name = new_table_name) THEN
        EXECUTE format('TRUNCATE TABLE dm.%I', new_table_name);
    ELSE
        EXECUTE FORMAT('CREATE TABLE dm.%I
            (on_date date,
			account_rk integer,
			credit_amount numeric(23,8),
			credit_amount_rub numeric(23,8),
			debet_amount numeric(23,8),
			debet_amount_rub numeric(23,8))',
            new_table_name);
    END IF;

    INSERT INTO LOGS.datamart_logs(table_name, mart_date, mart_type, start_time, user_name)
	VALUES(new_table_name, i_OnDate, 'turnover', NOW(), current_user);
 
	EXECUTE FORMAT('INSERT INTO dm.%I
	WITH credits AS (SELECT oper_date, credit_account_rk, SUM(credit_amount) AS credit_amount,
                     SUM(COALESCE(credit_amount * reduced_cource, credit_amount)) AS credit_amount_rub
                     FROM DS.ft_posting_f f
                     LEFT JOIN DS.MD_ACCOUNT_D d ON credit_account_rk = account_rk
                     AND oper_date BETWEEN d.data_actual_date AND d.data_actual_end_date
                     LEFT JOIN DS.MD_EXCHANGE_RATE_D d2 ON d.currency_rk = d2.currency_rk
                     AND oper_date BETWEEN d2.data_actual_date AND d2.data_actual_end_date
                     WHERE oper_date = %L
                     GROUP BY oper_date, credit_account_rk, reduced_cource),

          debets AS (SELECT oper_date, debet_account_rk, SUM(debet_amount) AS debet_amount,
                     SUM(COALESCE(debet_amount * reduced_cource, debet_amount)) AS debet_amount_rub
                     FROM DS.ft_posting_f f
                     LEFT JOIN DS.MD_ACCOUNT_D d ON debet_account_rk = account_rk
                     AND oper_date BETWEEN d.data_actual_date AND d.data_actual_end_date
                     LEFT JOIN DS.MD_EXCHANGE_RATE_D d2 ON d.currency_rk = d2.currency_rk
                     AND oper_date BETWEEN d2.data_actual_date AND d2.data_actual_end_date
                     WHERE oper_date = %L
                     GROUP BY oper_date, debet_account_rk)

         SELECT COALESCE(c.oper_date, d.oper_date) AS on_date, COALESCE(credit_account_rk, debet_account_rk) AS account_rk, 
                COALESCE(credit_amount, 0) AS credit_amount, COALESCE(credit_amount_rub, 0) AS credit_amount_rub, 
                COALESCE(debet_amount, 0) AS debet_amount, COALESCE(debet_amount_rub, 0) AS debet_amount_rub
         FROM credits c
         FULL JOIN debets d ON credit_account_rk = debet_account_rk
         AND c.oper_date = d.oper_date', new_table_name, i_OnDate, i_OnDate);

	UPDATE LOGS.datamart_logs
	SET end_time = NOW()
	WHERE table_name = new_table_name
    AND log_id = (
       SELECT log_id
	   FROM LOGS.datamart_logs
	   WHERE table_name = new_table_name
	   ORDER BY log_id DESC
	   LIMIT 1);
END;
$$;'''
create(engine, query)

In [62]:
# Расчет витрин оборотов за каждый день с 1 по 31 января 2018 года
start = pd.to_datetime("01-01-2018").date()
end = pd.to_datetime("31-01-2018").date()
delta = datetime.timedelta(days=1)
with engine.connect() as connection:
    while start <= end:
        connection.execute(text(f'CALL ds.fill_account_turnover_f(:date)'), {'date': start})
        connection.commit()
        start += delta
    connection.close()

  end = pd.to_datetime("31-01-2018").date()


# 5. Витрины остатков

In [63]:
# Создание витрины остатков за 31.12.2017
query = '''CREATE TABLE IF NOT EXISTS DM.dm_account_balance_f_311217
(on_date date,
account_rk integer,
balance_out numeric(23,8),
balance_out_rub numeric(23,8));

TRUNCATE TABLE DM.dm_account_balance_f_311217;

INSERT INTO LOGS.datamart_logs(table_name, mart_date, mart_type, start_time, user_name)
VALUES('dm_account_balance_f_311217', '2017-12-31', 'balance', NOW(), current_user);
 
INSERT INTO DM.dm_account_balance_f_311217
SELECT on_date, account_rk, balance_out, 
COALESCE(balance_out * reduced_cource, balance_out) AS balance_out_rub
FROM DS.ft_balance_f f
LEFT JOIN DS.md_exchange_rate_d d ON f.currency_rk = d.currency_rk
AND on_date BETWEEN data_actual_date AND data_actual_end_date;

UPDATE LOGS.datamart_logs
SET end_time = NOW()
WHERE table_name = 'dm_account_balance_f_311217';
'''
create(engine, query)

In [64]:
# Создание процедуры расчета витрины остатков на основании переданной даты
query = '''CREATE OR REPLACE PROCEDURE ds.fill_account_balance_f(i_OnDate DATE)
LANGUAGE plpgsql
AS $$
DECLARE
    new_table_name TEXT;
	prev_table_name TEXT;
	turnover_table_name TEXT;
BEGIN
    new_table_name := 'dm_account_balance_f_' || TO_CHAR(i_OnDate, 'DDMMYY');
	prev_table_name := 'dm_account_balance_f_' || TO_CHAR(i_OnDate - INTERVAL '1 day', 'DDMMYY');
	turnover_table_name := 'dm_account_turnover_f_' || TO_CHAR(i_OnDate, 'DDMMYY');
    
    IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'dm'
	           AND table_name = new_table_name) THEN
        EXECUTE format('TRUNCATE TABLE dm.%I', new_table_name);
    ELSE
        EXECUTE FORMAT('CREATE TABLE dm.%I
            (on_date date,
            account_rk integer,
            balance_out numeric(23,8),
            balance_out_rub numeric(23,8))',
            new_table_name);
    END IF;

    INSERT INTO LOGS.datamart_logs(table_name, mart_date, mart_type, start_time, user_name)
	VALUES(new_table_name, i_OnDate, 'balance', NOW(), current_user);
 
	EXECUTE FORMAT('INSERT INTO dm.%I
	
    SELECT %L::date AS on_date, account_rk,
    COALESCE(COALESCE(balance_out, 0) + debet_amount - credit_amount, balance_out) AS balance_out,
    COALESCE(COALESCE(balance_out_rub, 0) + debet_amount_rub - credit_amount_rub, balance_out_rub) AS balance_out_rub
    FROM DS.md_account_d
    LEFT JOIN dm.%I USING(account_rk)
    LEFT JOIN dm.%I USING(account_rk)
    WHERE char_type = ''А''
    AND %L BETWEEN data_actual_date AND data_actual_end_date

    UNION

    SELECT %L::date AS on_date, account_rk,
    COALESCE(COALESCE(balance_out, 0) - debet_amount + credit_amount, balance_out) AS balance_out,
    COALESCE(COALESCE(balance_out_rub, 0) - debet_amount_rub + credit_amount_rub, balance_out_rub) AS balance_out_rub
    FROM DS.md_account_d
    LEFT JOIN dm.%I USING(account_rk)
    LEFT JOIN dm.%I USING(account_rk)
    WHERE char_type = ''П''
    AND %L BETWEEN data_actual_date AND data_actual_end_date
	
	UNION
	
	SELECT %L::date AS on_date, credit_account_rk AS account_rk, Null AS balance_out, Null AS balance_out_rub
    FROM DS.FT_POSTING_F
    WHERE credit_account_rk NOT IN (
        SELECT account_rk
        FROM DS.md_account_d
    )
    AND debet_account_rk NOT IN (
        SELECT account_rk
        FROM DS.md_account_d
    )

	UNION 

	SELECT %L::date AS on_date, debet_account_rk AS account_rk, Null AS balance_out, Null AS balance_out_rub
	FROM DS.ft_posting_f
	WHERE credit_account_rk NOT IN (
        SELECT account_rk
        FROM DS.md_account_d
    )
    AND debet_account_rk NOT IN (
        SELECT account_rk
        FROM DS.md_account_d
    )',
    new_table_name, 
    i_OnDate, prev_table_name, turnover_table_name, i_OnDate, 
    i_OnDate, prev_table_name, turnover_table_name, i_OnDate,
	i_OnDate, i_OnDate);

	UPDATE LOGS.datamart_logs
	SET end_time = NOW()
	WHERE table_name = new_table_name
    AND log_id = (
       SELECT log_id
	   FROM LOGS.datamart_logs
	   WHERE table_name = new_table_name
	   ORDER BY log_id DESC
	   LIMIT 1);
END;
$$;'''
create(engine, query)

In [65]:
# Расчет витрин остатков за каждый день с 1 по 31 января 2018 года
start = pd.to_datetime("01-01-2018").date()
end = pd.to_datetime("31-01-2018").date()
delta = datetime.timedelta(days=1)
with engine.connect() as connection:
    while start <= end:
        connection.execute(text(f'CALL ds.fill_account_balance_f(:date)'), {'date': start})
        connection.commit()
        start += delta
    connection.close()

  end = pd.to_datetime("31-01-2018").date()


# 6. Расчет формы 101

In [66]:
# Создание вспомогательной процедуры расчета остатков за день, предшествующий первому дню отчетного периода
query = '''CREATE OR REPLACE PROCEDURE dm.calculate_balance_in(i_OnDate DATE)
LANGUAGE plpgsql
AS $$
DECLARE
    balance_table_name TEXT;
BEGIN
    balance_table_name := 'dm_account_balance_f_' || TO_CHAR(i_OnDate, 'DDMMYY');
	
	CREATE TABLE dm.balance_in(
    ledger_account integer,
	balance_in_rub numeric(23,8),
	balance_in_val numeric(23,8),
	balance_in_total numeric(23,8)
	);

	EXECUTE FORMAT('INSERT INTO dm.balance_in
WITH rub AS (SELECT LEFT(account_number, 5)::integer AS ledger_account, 
SUM(balance_out_rub) AS balance_in_rub
FROM DS.md_account_d d
LEFT JOIN dm.%I USING(account_rk)
WHERE currency_code = ''810'' OR currency_code = ''643''
GROUP BY LEFT(account_number, 5)),

val AS (SELECT LEFT(account_number, 5)::integer AS ledger_account, 
SUM(balance_out) AS balance_in_val
FROM DS.md_account_d d
LEFT JOIN dm.%I USING(account_rk)
WHERE currency_code != ''810'' AND currency_code != ''643''
GROUP BY LEFT(account_number, 5)),

total AS (SELECT LEFT(account_number, 5)::integer AS ledger_account, 
SUM(balance_out_rub) AS balance_in_total
FROM DS.md_account_d d
LEFT JOIN dm.%I USING(account_rk)
GROUP BY LEFT(account_number, 5))

SELECT ledger_account, 
COALESCE(balance_in_rub, 0) AS balance_in_rub, 
COALESCE(balance_in_val, 0) AS balance_in_val,
balance_in_total
FROM total
LEFT JOIN rub USING(ledger_account)
LEFT JOIN val USING(ledger_account)',
balance_table_name, balance_table_name, balance_table_name);

END;
$$;'''
create(engine, query)

In [67]:
# Создание вспомогательной процедуры расчета оборотов за все дни отчетного периода
query = '''CREATE OR REPLACE PROCEDURE dm.calculate_turns(period_start DATE, period_end DATE)
LANGUAGE plpgsql
AS $$
DECLARE
    turnover_table TEXT;
	cur_date DATE;
BEGIN
    CREATE TABLE dm.turns(
    ledger_account integer,
	turn_deb_rub numeric(23,8),
	turn_deb_val numeric(23,8),
	turn_deb_total numeric(23,8),
	turn_cre_rub numeric(23,8),
	turn_cre_val numeric(23,8),
	turn_cre_total numeric(23,8)
	);

	CREATE TEMP TABLE temp (LIKE dm.turns INCLUDING DEFAULTS) ON COMMIT DROP;

    cur_date := period_start;
	
	WHILE cur_date <= period_end LOOP
    turnover_table := 'dm_account_turnover_f_' || TO_CHAR(cur_date, 'DDMMYY');

	EXECUTE FORMAT('INSERT INTO temp
	WITH rub AS (SELECT LEFT(account_number, 5)::integer AS ledger_account, 
	SUM(credit_amount) AS turn_cre_rub, SUM(debet_amount) AS turn_deb_rub
	FROM DS.md_account_d d
	LEFT JOIN dm.%I USING(account_rk)
	WHERE currency_code = ''810'' OR currency_code = ''643''
	GROUP BY LEFT(account_number, 5)),
	
	val AS (SELECT LEFT(account_number, 5)::integer AS ledger_account, 
	SUM(credit_amount) AS turn_cre_val, SUM(debet_amount) AS turn_deb_val
	FROM DS.md_account_d d
	LEFT JOIN dm.%I USING(account_rk)
	WHERE currency_code != ''810'' AND currency_code != ''643''
	GROUP BY LEFT(account_number, 5)),
	
	total AS (SELECT LEFT(account_number, 5)::integer AS ledger_account, 
	SUM(credit_amount_rub) AS turn_cre_total, SUM(debet_amount_rub) AS turn_deb_total
	FROM DS.md_account_d d
	LEFT JOIN dm.%I USING(account_rk)
	GROUP BY LEFT(account_number, 5))
	
	SELECT ledger_account, 
	COALESCE(turn_deb_rub, 0) AS turn_deb_rub, COALESCE(turn_deb_val, 0) AS turn_deb_val, 
	COALESCE(turn_deb_total, 0) AS turn_deb_total, COALESCE (turn_cre_rub , 0) AS turn_cre_rub,
	COALESCE(turn_cre_val, 0) AS turn_cre_val, COALESCE(turn_cre_total, 0) AS turn_cre_total
	FROM total
	LEFT JOIN rub USING(ledger_account)
	LEFT JOIN val USING(ledger_account)',
	turnover_table, turnover_table, turnover_table);
	
	cur_date := cur_date + INTERVAL '1 day';
	
END LOOP;
    INSERT INTO dm.turns
	SELECT ledger_account,
	SUM(turn_deb_rub) AS turn_deb_rub, SUM(turn_deb_val) AS turn_deb_val, 
	SUM(turn_deb_total) AS turn_deb_total, SUM(turn_cre_rub) AS turn_cre_rub,
	SUM(turn_cre_val) AS turn_cre_val, SUM(turn_cre_total) AS turn_cre_total
	FROM temp
	GROUP BY ledger_account;
END;
$$;'''
create(engine, query)

In [68]:
# Создание вспомогательной процедуры расчета остатков за последний день отчетного периода
query = '''CREATE OR REPLACE PROCEDURE dm.calculate_balance_out(i_OnDate DATE)
LANGUAGE plpgsql
AS $$
DECLARE
    balance_table_name TEXT;
BEGIN
    balance_table_name := 'dm_account_balance_f_' || TO_CHAR(i_OnDate, 'DDMMYY');
	
	CREATE TABLE dm.balance_out(
    ledger_account integer,
	balance_out_rub numeric(23,8),
	balance_out_val numeric(23,8),
	balance_out_total numeric(23,8)
	);

	EXECUTE FORMAT('INSERT INTO dm.balance_out
WITH rub AS (SELECT LEFT(account_number, 5)::integer AS ledger_account, 
SUM(balance_out_rub) AS balance_out_rub
FROM DS.md_account_d d
LEFT JOIN dm.%I USING(account_rk)
WHERE currency_code = ''810'' OR currency_code = ''643''
GROUP BY LEFT(account_number, 5)),

val AS (SELECT LEFT(account_number, 5)::integer AS ledger_account, 
SUM(balance_out) AS balance_out_val
FROM DS.md_account_d d
LEFT JOIN dm.%I USING(account_rk)
WHERE currency_code != ''810'' AND currency_code != ''643''
GROUP BY LEFT(account_number, 5)),

total AS (SELECT LEFT(account_number, 5)::integer AS ledger_account, 
SUM(balance_out_rub) AS balance_out_total
FROM DS.md_account_d d
LEFT JOIN dm.%I USING(account_rk)
GROUP BY LEFT(account_number, 5))

SELECT ledger_account, 
COALESCE(balance_out_rub, 0) AS balance_out_rub, 
COALESCE(balance_out_val, 0) AS balance_out_val,
balance_out_total
FROM total
LEFT JOIN rub USING(ledger_account)
LEFT JOIN val USING(ledger_account)',
balance_table_name, balance_table_name, balance_table_name);

END;
$$;'''
create(engine, query)

In [69]:
# Создание основной процедуры для расчеты формы 101
query = '''CREATE OR REPLACE PROCEDURE dm.fill_f101_round_f(i_OnDate DATE)
LANGUAGE plpgsql
AS $$
DECLARE
    from_date date;
	to_date date;
	prev_date date;
	new_table_name text;
BEGIN
    to_date := DATE_TRUNC('month', i_OnDate::date)::date - INTERVAL '1 day';
	from_date := DATE_TRUNC('month', to_date::date)::date;
	prev_date := DATE_TRUNC('month', from_date::date)::date - INTERVAL '1 day';

	new_table_name := 'dm_f101_round_f_' || TO_CHAR(to_date, 'MMYY');
    
    IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'dm'
	           AND table_name = new_table_name) THEN
        EXECUTE format('TRUNCATE TABLE dm.%I', new_table_name);
    ELSE
	   EXECUTE format('CREATE TABLE IF NOT EXISTS DM.%I(
                       from_date date,
                       to_date date,
                       chapter char(1),
                       ledger_account char(5),
                       characteristic char(1),
                       balance_in_rub numeric(23,8),
                       balance_in_val numeric(23,8),
                       balance_in_total numeric(23,8),
                       turn_deb_rub numeric(23,8),
                       turn_deb_val numeric(23,8),
                       turn_deb_total numeric(23,8),
                       turn_cre_rub numeric(23,8),
                       turn_cre_val numeric(23,8),
                       turn_cre_total numeric(23,8),
                       balance_out_rub numeric(23,8),
                       balance_out_val numeric(23,8),
                       balance_out_total numeric(23,8))', new_table_name);
    END IF;

	INSERT INTO LOGS.datamart_logs(table_name, mart_date, mart_type, start_time, user_name)
	VALUES(new_table_name, i_OnDate, 'form101', NOW(), current_user);

	CALL dm.calculate_balance_in(prev_date);
	CALL dm.calculate_turns(from_date, to_date);
	CALL dm.calculate_balance_out(to_date);

	EXECUTE format('INSERT INTO DM.%I
	SELECT %L::date AS from_date, %L::date AS to_date, chapter, ledger_account, characteristic,
	balance_in_rub, balance_in_val, balance_in_total,
	turn_deb_rub, turn_deb_val, turn_deb_total, turn_cre_rub, turn_cre_val, turn_cre_total,
	balance_out_rub, balance_out_val, balance_out_total
	FROM DS.md_ledger_account_s
	LEFT JOIN DS.md_account_d d ON ledger_account = LEFT(account_number, 5)::integer
	LEFT JOIN dm.balance_in USING(ledger_account)
	LEFT JOIN dm.turns USING(ledger_account)
	LEFT JOIN dm.balance_out USING(ledger_account)
	GROUP BY from_date, to_date, chapter, ledger_account, characteristic,
	balance_in_rub, balance_in_val, balance_in_total,
	turn_deb_rub, turn_deb_val, turn_deb_total, turn_cre_rub, turn_cre_val, turn_cre_total,
	balance_out_rub, balance_out_val, balance_out_total', 
	new_table_name, from_date, to_date);

	UPDATE LOGS.datamart_logs
	SET end_time = NOW()
	WHERE table_name = new_table_name
	AND log_id = (
       SELECT log_id
	   FROM LOGS.datamart_logs
	   WHERE table_name = new_table_name
	   ORDER BY log_id DESC
	   LIMIT 1);

	DROP TABLE IF EXISTS dm.balance_in;
	DROP TABLE IF EXISTS dm.turns;
	DROP TABLE IF EXISTS dm.balance_out;
	
END;
$$;'''
create(engine, query)

In [70]:
# Расчет формы 101 за январь 2018
report_date = pd.to_datetime('01-02-2018', dayfirst = True).date()
with engine.connect() as connection:
    connection.execute(text(f'CALL dm.fill_f101_round_f(:date)'), {'date': report_date})
    connection.commit()
    connection.close()