## Навигация
- **[Функции обработки данных](#Функции-обработки-данных)**
- **[Трансформация данных](#Трансформация-данных)**
- **[Сборка для Импорт](#Сборка-для-Импорт)**
- **[Сборка для Экспорт](#Сборка-для-Экспорт)**
- **[Сборка основного датасета](#Сборка-основного-датасета)**
- **[Сохраняем данные в БД](#Сохраняем-данные-в-БД)**

In [1]:
import pandas as pd
import numpy as np
import os
from pathlib import Path 

from tqdm import tqdm
import warnings
warnings.simplefilter('ignore') 

# Для автоматического закрытия курсора
from contextlib import closing

import psycopg2
from psycopg2 import Error
from sqlalchemy import create_engine

import json
from datetime import datetime

from dotenv import load_dotenv

# Сброс ограничений на число столбцов
pd.set_option('display.max_columns', None)

In [2]:
dotenv_path = './.env'
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path)

In [3]:
dict_postgres_cred = {'user': os.getenv('USER_NAME_PG'),
                      'password': os.getenv('PASSWORD_PG'),
                      'host': os.getenv('HOST_PG'),
                      'port': os.getenv('PORT_PG'),
                      'database': os.getenv('DATABASE_PG')}

In [4]:
# Инициализация подключений для работы с БД
engine = psycopg2.connect(user=dict_postgres_cred['user'],
                          password=dict_postgres_cred['password'],
                          host=dict_postgres_cred['host'],
                          port=dict_postgres_cred['port'],
                          database=dict_postgres_cred['database'])
conn = create_engine('postgresql://{}:{}@{}:{}/{}'
                     .format(dict_postgres_cred['user'], dict_postgres_cred['password'], 
                             dict_postgres_cred['host'], dict_postgres_cred['port'], dict_postgres_cred['database']))


In [5]:
# Функция для переименования всех столбцов
def rep(name):
    return str(name) 

def rep_2(name):
    return str(name).replace('-', '_') 

def rep_3(name):
    return str(name).replace(' ', '_') 

In [6]:
# Словаврь для партнера
query_country_add = f"""

SELECT code, name_itc FROM add

"""
df_country_add = pd.read_sql(query_country_add, con=engine)

# Подготовим колонку для мержа
df_country_add['test_name'] = df_country_add.name_itc.apply(lambda x: x.replace(',', ''))

# Чистим данные
df_country_add.drop_duplicates(subset=['name_itc'], inplace=True)

In [7]:
# Словарь для присовения кодов Репортеру
dict_partner_code = {}
dct_itc_and_test_name = {}
for code_itc, name_test in zip(list(df_country_add.code), list(df_country_add.test_name)):
    if name_test not in dict_partner_code:
        dict_partner_code[name_test] = code_itc
    else:
        continue
# Для сопоставления наших названий стран и стран из itc
for itc_name, name_test in zip(list(df_country_add.name_itc), list(df_country_add.test_name)):
    if name_test not in dct_itc_and_test_name:
        dct_itc_and_test_name[name_test] = itc_name
    else:
        continue

In [8]:
def get_need_tnved_code() -> dict:
    """
    Возвращает набор необходимых кодов ТНВЭД

    :return: словарь с кодами (пока что на 6 знаках 'code_6')
    """
   
    dict_return = {'code_6': []}
    with conn.connect() as connection:
        rez_query = connection.execute("""SELECT DISTINCT(LEFT(code,6)) 
                                               FROM ed 
                                               WHERE type = 10 AND prod_type = 'apk' AND LEFT(code, 2)::int > 24""")
    for code in rez_query.fetchall():
        code_cleare = code[0]
        dict_return['code_6'].append(code_cleare)
    return dict_return

dict_need_tnved_code_apk = get_need_tnved_code()

# Функции обработки данных

In [9]:
def trade_value_build(path_values: str, type_operation: int) -> pd.DataFrame:
    """
    Собирает все файлы в один датафрейм для переданного направления торговли
    по переданному пути
    
    :param path_values: путь к файлам для TRADE_VALUE
    
    :param type_operation: тип операции (импорт - 1 или экспорт - 2)
    
    :return: очищенный датафрейм с данными по TRADE_VALUE
    """
    # Объект типа Path для TRADE_VALUE
    way_pah_values = Path(path_values)

    # Пустой датафрейм для сборки всех значений trade_value
    void_df_value = pd.DataFrame()

    # Сбор файлов trade_value
    for i in tqdm(way_pah_values.glob("**/Tra*Bil*.txt")):

        if flag_and_in_reporter_name:
            # Если в названии репортера присутствует _and_ : Antigua_and_Barbuda
            reporter = ' and '.join(str(i).split('_between_')[1].split('_and_')[:2]).replace('_', ' ').replace('  ', ' ')
            partner = ' and '.join(str(i).split('_between_')[1].split('_and_')[2:]).replace('.txt', '').replace('_', ' ').replace('  ', ' ')  
        else:
            # Если репортер без _and_ : Cabo_Verde
            reporter = str(i).split('_between_')[1].split('_and_')[0].replace('_', ' ').replace('  ', ' ')
            partner = ' and '.join(str(i).split('_between_')[1].split('_and_')[1:]).replace('.txt', '').replace('_', ' ').replace('  ', ' ')

        temp_df = pd.read_table(i, dtype={'Product code': 'str'})

        # Получаем нужное количество колонок
        need_max_year = max([int(i.split(' in ')[-1]) for i in temp_df.columns.tolist() if ' in ' in i])
        for temp in range(len(list(temp_df.columns))):
            if f'-Value in {need_max_year}' in list(temp_df.columns)[temp]:
                temp_number_columns = temp + 1
                break
        if partner == 'World':
            temp_df = temp_df.drop(columns=list(temp_df.iloc[:, 2:16].columns)).iloc[:, 0:14]
        else:
            temp_df = temp_df.iloc[:,0:temp_number_columns]

        # Расплавляем датасет
        temp_df = temp_df.melt(id_vars=['Product code', 'Product label'])
        temp_df.rename(columns = rep_3, inplace=True)
        
        # Отсекаем не нужное
        temp_df.fillna(0, inplace=True)
        temp_df = temp_df.query('value > 0 and Product_code != "TOTAL"')
        temp_df = temp_df[
            (temp_df['Product_code'].str[:2].astype(int) <= 24) |
            (temp_df['Product_code'].str[:2].astype(int) == 31) |
            (temp_df['Product_code'].str[:6].isin(dict_need_tnved_code_apk['code_6']))
            ]
        
       
        # Добавляем столбцы
        temp_df = temp_df.assign(reporter_country = reporter ,partner_country = partner, trade_flow_code = type_operation, 
                                classification='HS', update_date=datetime.now().strftime('%Y-%m-%d'))
        temp_df['year_transaction'] = temp_df.variable.apply(lambda x: x.split(' in ')[1])
        temp_df['period'] = temp_df.year_transaction.apply(lambda x: datetime.strptime('01-01-' + x, '%d-%m-%Y'))
        temp_df['aggregate_level'] = 6
        temp_df['flag'] = 0
        temp_df['plus'] = 0
        temp_df['load_mark'] = 1
        temp_df['value'] = temp_df.value.mul(1000)
        void_df_value = pd.concat((void_df_value, temp_df))

    # Создаем колонку для мержа
    void_df_value['test_name'] = void_df_value.partner_country.apply(lambda x: x)
    # Удаляем лишнее
    void_df_value.drop(columns='variable', inplace=True)

    # Мержим наш void_df_value с COUNTRY_ADD
    df_merge = void_df_value.merge(df_country_add, how='left',  on='test_name')

    df_merge.rename(columns={'code': 'partner_code'}, inplace=True)
    # В зависимости от страны проставляем код
    df_merge['reporter_code'] = df_merge.reporter_country.apply(lambda x: dict_partner_code[x])
    
    return df_merge

In [10]:
# Класс для проверки загрузки данных в БД и их очистку
class Check_zero_in_db_value:
    
    def __init__(self, reporter_code, engine_class, df):
        """
        reporter_code: код репортера
        engine_class: движок подключения к БД
        df: полученный датафрейм на этапе обработки данных
        """
        self.reporter_code = reporter_code
        self.engine_class = engine_class
        self.df = df
    
    def get_count_rows(self, need_year=None):
        """
        need_year: минимально максимальный год из ранее загруженных данных в БД
        return: количество строк по конкретному репортеру в БД
        """
        # Если такой год присутствует, то вернуть рассчет без его учета
        # Для корректной валидации
        if need_year:
            return pd.read_sql(f"""SELECT COUNT(year) as count_rows 
                                   FROM tl 
                                   WHERE reporter_code = {self.reporter_code} AND year > {need_year}""",
                            con=self.engine_class).count_rows[0]
        # Если данных нет или они полностью совпадают, то посчитать все строки
        else:
            return pd.read_sql(f"""SELECT COUNT(year) as count_rows 
                                   FROM tl 
                                   WHERE reporter_code = {self.reporter_code}""",
                            con=self.engine_class).count_rows[0]
    
    def get_min_year_in_db(self, need_year):
        """
        need_year: минимальный год из датафрейма
        return: максимальный год из всех годов, которые меньше need_year или need_year если таковых нет
        """
        year_list = pd.read_sql(f"""SELECT DISTINCT(year) AS year 
                                    FROM tl
                                    WHERE reporter_code = {self.reporter_code} AND year < {need_year}
                                    ORDER BY year""",
                        con=self.engine_class).year
        return need_year if year_list.shape[0]  == 0 else year_list.max()
    
    def get_min_year_in_df(self):
        """
        return: минимальный год из датафрейма
        """
        return min([int(y) for y in self.df.year.unique()])
        
    def delete_need_value(self):
        """
        return: True если очистка прошла успешно, иначе False
        """
        if (self.get_min_year_in_df() == self.get_min_year_in_db(self.get_min_year_in_df())) or \
            (self.get_min_year_in_db(self.get_min_year_in_df()) > self.get_min_year_in_df()):
                
            with self.engine_class.cursor() as cr:
                cr.execute(f"""DELETE FROM tl 
                               WHERE reporter_code = {self.reporter_code}""")
                self.engine_class.commit()
            return self.get_count_rows() == 0
        
        elif self.get_min_year_in_db(self.get_min_year_in_df()) < self.get_min_year_in_df():

            with self.engine_class.cursor() as cr:
                cr.execute(f"""DELETE FROM tl 
                               WHERE reporter_code = {self.reporter_code} 
                               AND year > {self.get_min_year_in_db(self.get_min_year_in_df())}""")
                self.engine_class.commit()
            return self.get_count_rows(self.get_min_year_in_db(self.get_min_year_in_df())) == 0
        
    def check_value(self):
        if self.get_count_rows() == 0:
            return True
        return self.delete_need_value()

# Трансформация данных

In [11]:
# Для формирования путей к нужным папкам с файлами
type_flow_import = 'Imports'
type_flow_export = 'Exports'
value = 'Values'

In [12]:
# "Kazakhstan", "Türkiye"

In [48]:
# Подставить нужного репортера
reporter_name = 'Türkiye'
# Флаг. Если в имени репортера присутствует and (Antigua_and_Barbuda) Присвоить значение True, иначе False
flag_and_in_reporter_name = False

In [49]:
# 'Imports' = 1 'Exports' = 2
# Ссылки для Импорта
path_values_import = os.path.join(os.getcwd(), f"""{reporter_name}_{type_flow_import}_{value}_Tariff""")

# Ссылки для Экспорта
path_values_export = os.path.join(os.getcwd(), f"""{reporter_name}_{type_flow_export}_{value}_Tariff""")


In [50]:
# Удаляем скаченные дубликаты, если такие есть
# Для trade_value Imports
for val in Path(path_values_import).glob("**/*(1)*.txt"):
    print(val)
    val.unlink()

In [51]:
# Удаляем скаченные дубликаты, если такие есть
# Для trade_value Exports
for val in Path(path_values_export).glob("**/*(1)*.txt"):
    print(val)
    val.unlink()

# Сборка для Импорт

In [52]:
# Датафрейм Импорт tarde_value
df_trade_value_import = trade_value_build(path_values_import, 1)

254it [00:42,  5.98it/s]


In [53]:
df_trade_value_import = df_trade_value_import[['Product_code', 'Product_label', 'value', 'reporter_country', 'partner_country', 'trade_flow_code',
                   'classification', 'update_date', 'year_transaction', 'period', 'aggregate_level', 'flag', 'plus', 'load_mark',
                   'partner_code', 'reporter_code', 'name_itc']]

df_trade_value_import.rename(columns={'Product_code': 'commodity_code', 'value': 'trade_value', 'year_transaction': 'year'}, 
                      inplace=True)

# Сборка для Экспорт

In [54]:
# Датафрейм Экспорт tarde_value
df_trade_value_export = trade_value_build(path_values_export, 2)

254it [00:47,  5.29it/s]


In [55]:
df_trade_value_export = df_trade_value_export[['Product_code', 'Product_label', 'value', 'reporter_country', 'partner_country', 'trade_flow_code',
                   'classification', 'update_date', 'year_transaction', 'period', 'aggregate_level', 'flag', 'plus', 'load_mark',
                   'partner_code', 'reporter_code', 'name_itc']]

df_trade_value_export.rename(columns={'Product_code': 'commodity_code', 'value': 'trade_value', 'year_transaction': 'year'}, 
                      inplace=True)

### [⬅ Навигация](#Навигация)

# Сборка основного датасета

In [56]:
# Собираем все данные в один датафрейм
df_all_data = pd.concat((df_trade_value_import, df_trade_value_export))

In [57]:
# Выбираем нужные столбцы в нужном порядке
df_all_data = df_all_data[['classification', 'year', 'period', 'aggregate_level', 'trade_flow_code', 'reporter_code', 
                       'partner_code', 'commodity_code', 'trade_value', 'flag', 'plus', 'load_mark', 'update_date']]

In [58]:
df_all_data['qty_unit_code'] = 0
df_all_data['qty'] = 0
df_all_data['netweight'] = 0
df_all_data['update_date'] = datetime.now().strftime('%Y-%m-%d')
df_all_data['customs_proc_code'] = 'C00'
df_all_data['mode_of_transport_code'] = '0'
df_all_data['partner_code_2nd'] = 0
df_all_data.shape

(547577, 19)

In [60]:
# ПОТОМ УДАЛИТЬ
df_all_data['region_code'] = 'NNNNN'

In [61]:
df_all_data.isna().sum()

classification            0
year                      0
period                    0
aggregate_level           0
trade_flow_code           0
reporter_code             0
partner_code              0
commodity_code            0
trade_value               0
flag                      0
plus                      0
load_mark                 0
update_date               0
qty_unit_code             0
qty                       0
netweight                 0
customs_proc_code         0
mode_of_transport_code    0
partner_code_2nd          0
region_code               0
dtype: int64

### [⬅ Навигация](#Навигация)

# Сохраняем данные в БД

In [64]:
# Получаем код репортера
reporter_code_for_check = df_all_data.reporter_code.unique()[0]

In [65]:
# Перед записью в БД проверим года
sorted(df_all_data.year.unique().tolist())

['2013',
 '2014',
 '2015',
 '2016',
 '2017',
 '2018',
 '2019',
 '2020',
 '2021',
 '2022',
 '2023',
 '2024']

In [66]:
# Запись в партиции БД
if Check_zero_in_db_value(reporter_code_for_check, engine, df_all_data).check_value():
    print(f'Загружаем репортера {reporter_code_for_check}')
    for year_cycle in df_all_data.year.unique().tolist():

        df_data_in_bd = df_all_data.query('year == @year_cycle')
        print(f'{year_cycle}, количество строк: {df_data_in_bd.shape[0]}')
        df_data_in_bd.to_sql(f'_{year_cycle}', 
                             con=conn, 
                             schema='', 
                             if_exists='append', index=False)
else:
    print('Старые данные не очищенны')

Загружаем репортера 792
2013, количество строк: 32558
2014, количество строк: 37319
2015, количество строк: 37233
2016, количество строк: 37649
2017, количество строк: 39745
2018, количество строк: 40740
2019, количество строк: 41200
2020, количество строк: 50149
2021, количество строк: 56349
2022, количество строк: 57380
2023, количество строк: 58048
2024, количество строк: 59207


In [67]:
Check_zero_in_db_value(reporter_code_for_check, engine, df_all_data).get_min_year_in_db(df_all_data.year.min())

'2013'

In [68]:
# Если данные уже были в базе передать в параметр get_count_rows год
# Пример, у нас в датафрейме минимальный год 2011, но в базе есть 2009 и 2010 года (их мы не обновляем)
# Для корретной валидации нужно передать 2010 год(максимальный год, который есть в базе, но нет в датафрейме)
df_all_data.shape[0] ==  Check_zero_in_db_value(reporter_code_for_check, engine, df_all_data).get_count_rows(2012)

True

-------

### [⬅ Навигация](#Навигация)