In [20]:
import datetime
import os
import pandas as pd
import psycopg2
from sqlalchemy import text
import sqlalchemy
import time
import threading
import xml.etree.ElementTree as ET
from typing import Callable
from loguru import logger
from functools import wraps


#### Подключение к БД

In [21]:
# Database connection. Set SF_DB_CONNECTION_STRING in the environment to avoid
# keeping credentials in the notebook.
# FIXME(security): the fallback below still embeds a password; remove it once
# every environment provides SF_DB_CONNECTION_STRING.
connection_string = os.environ.get(
    'SF_DB_CONNECTION_STRING',
    'postgresql+psycopg2://postgres:5555@db.mpkazantsev.ru/demo',
)

engine = sqlalchemy.create_engine(connection_string)


#### Константы

In [4]:
# Single source of truth: XML tag -> target table name (without schema/prefix).
# Insertion order matters: TAGS and TABLE_NAMES below are derived from it.
TAGS_TO_TABLE_NAMES_MAPPING = {
    'MONTHLY_DETAIL': 'monthlydetailtype',
    'LOANS_OVERVIEW': 'loansoverviewtype',
    'LOAN': 'loanstype',
    'MAIN': 'maintype',
    'NAME': 'nametype',
    'SCORE': 'scoretype',
    'FRAUD': 'fraudtype',
}

# XML tags that are parsed into their own tables.
TAGS = list(TAGS_TO_TABLE_NAMES_MAPPING)

# Every table handled by recreate_tables(): the root 'singleformattype' table
# followed by one table per tag.
TABLE_NAMES = ['singleformattype', *TAGS_TO_TABLE_NAMES_MAPPING.values()]

# Leaf fields (normalized: lowercase, underscores removed) that are never loaded.
FIELD_NAMES_TO_EXCLUDE = ['cbtypecode', 'nextpmtprincipal']

# Table name -> column that receives the parsed file's hjid.
# 'hjid' means the file hjid is the row's own id. Any other value names the
# column that gets the file hjid (for monthlydetailtype: the current loan's id),
# while the row's own 'hjid' is a per-table running counter
# (see get_row_from_sf_item).
TABLES_HJID = {
    'monthlydetailtype': 'loan_id',
    'loansoverviewtype': 'hjid',
    'loanstype': 'loanstypes_loan_hjid',
    'maintype': 'hjid',
    'nametype': 'nametypes_name__hjid',
    'scoretype': 'scoretypes_score_hjid',
    'fraudtype': 'fraudtypes_fraud_hjid',
}


def get_table_name_by_tag(tag: str):
    """Return the target table name (without prefix) for an XML tag.

    Raises KeyError for a tag missing from TAGS_TO_TABLE_NAMES_MAPPING.
    """
    table_name = TAGS_TO_TABLE_NAMES_MAPPING[tag]
    return table_name


def get_tag_to_table_types_dict(path_to_xsd: str = './resources/SingleFormat.xsd',
                                tags=None):
    """Build ``{tag: {normalized_field_name: xsd_type}}`` from the SingleFormat XSD.

    For each tag, the first ``xs:element`` named after it gives the complexType.
    The ``xs:element`` children of that type's ``xs:sequence`` are then collected.

    Field names are normalized like parsed XML fields (lowercase, underscores
    removed). Types are lowercased with the ``xs:`` prefix stripped, e.g.
    ``xs:float`` -> ``float`` and ``MoneyValueType`` -> ``moneyvaluetype``.

    Args:
        path_to_xsd: path to the XSD file.
        tags: tags to describe; defaults to TAGS.

    Returns:
        dict mapping each tag to its ``{field_name: type}`` dict.

    Raises:
        ValueError: if a tag has no typed ``xs:element`` declaration in the schema.
    """
    if tags is None:
        tags = TAGS
    namespaces = {'xs': 'http://www.w3.org/2001/XMLSchema'}
    xml_root = ET.parse(path_to_xsd).getroot()
    result_dict = {}
    for tag in tags:
        tag_elements = xml_root.findall(f".//xs:element[@name='{tag}']", namespaces)
        table_type = tag_elements[0].get('type') if tag_elements else None
        if table_type is None:
            raise ValueError(f"No typed xs:element named {tag!r} in {path_to_xsd}")
        table_types_dict = {}
        field_elements = xml_root.findall(
            f".//xs:complexType[@name='{table_type}']/xs:sequence/xs:element", namespaces)
        for element in field_elements:
            element_name = element.get('name')
            element_type = element.get('type')
            # ref= elements and elements with inline anonymous types carry no
            # name/type pair, so there is nothing to map; skip them.
            if element_name is None or element_type is None:
                continue
            if element_type.startswith('xs:'):
                element_type = element_type[3:]
            table_types_dict[element_name.lower().replace('_', '')] = element_type.lower()
        result_dict[tag] = table_types_dict
    return result_dict


# {tag: {normalized_field_name: xsd_type}}, read from the XSD once when this cell runs.
TAG_TO_TABLE_TYPES_DICT: dict = get_tag_to_table_types_dict()


#### Возвращает словарь, в котором ключ — тег, а значение — словарь, сопоставляющий имена столбцов, полученные при парсинге, с именами столбцов в БД

In [5]:
def get_tag_to_row_names_to_db_names_dict(engine: sqlalchemy.engine.base.Engine):
    """Map each tag to ``{normalized_column_name: db_column_name}``.

    Column names are read from ``information_schema.columns`` for the unprefixed
    table name returned by get_table_name_by_tag(). The key is the column name
    lowercased with underscores removed, which is the normalization applied to
    XML field names while parsing.

    Columns are fetched in ``ordinal_position`` order. Without an ORDER BY the
    database may return them in any order, and the key order of this dict is
    used as the column order of the generated INSERT statements.

    NOTE(review): the lookup is not restricted to a schema. If several schemas
    contain a table with this name, their columns are merged; confirm this is
    intended.
    """
    result_dict = {}
    for tag in TAGS:
        table_name = get_table_name_by_tag(tag)
        describe_query = ("SELECT column_name "
                          "FROM information_schema.columns "
                          f"WHERE table_name = '{table_name}' "
                          "ORDER BY ordinal_position;")
        columns_df = pd.read_sql_query(describe_query, engine)
        result_dict[tag] = {
            column_name.lower().replace('_', ''): column_name
            for column_name in columns_df['column_name']
        }
    return result_dict

# {tag: {normalized_column_name: db_column_name}}, queried from the database once.
TAG_TO_ROW_NAMES_TO_DB_NAMES_DICT: dict = get_tag_to_row_names_to_db_names_dict(engine=engine)


#### Вспомогательные функции

In [6]:
def timeit(func: Callable, args_to_print=None):
    """Wrap ``func`` so that every call logs its wall-clock duration.

    Only keyword arguments whose names appear in ``args_to_print`` are shown in
    the log line. Positional arguments are never shown.

    Args:
        func: function to wrap.
        args_to_print: names of keyword arguments to include in the log (default: none).

    Returns:
        A wrapper carrying ``func``'s name and docstring that returns ``func``'s result.
    """
    names_to_log = [] if args_to_print is None else args_to_print

    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        started = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - started
        logged_kwargs = {}
        for name, value in kwargs.items():
            if name in names_to_log:
                logged_kwargs[name] = value
        logger.info(f'Function {func.__name__} called with {logged_kwargs} took {elapsed:.4f} seconds')
        return result

    return timeit_wrapper


def get_dict_from_lists(keys_list: list, values_list: list):
    """Pair keys with values by position and return them as a dict.

    Extra items in the longer list are ignored (zip semantics). For a repeated
    key, the last paired value wins.
    """
    return dict(zip(keys_list, values_list))


#### Алгоритм парсинга

In [7]:
def add_row(table_name_to_rows_dict: dict, table_name: str, row_to_append: dict):
    """Append one row dict to the list accumulated for ``table_name`` (in place).

    Raises KeyError if no list is registered for ``table_name``.
    """
    rows = table_name_to_rows_dict[table_name]
    rows.append(row_to_append)


def get_field_value(expected_type: str, field_name: str, str_value: str):
    """Convert the raw text of an XML leaf element into a Python value.

    Args:
        expected_type: lowercased type name (e.g. 'int', 'float', 'moneyvaluetype');
            ``None`` means the field is unknown and is skipped.
        field_name: normalized field name (lowercase, no underscores).
        str_value: element text; may be ``None`` for an empty element.

    Returns:
        ``None`` when the field should be skipped: unknown type, missing text, or
        blank text for a numeric field. Otherwise an ``int``, a ``float``, or the
        original string.
    """
    if expected_type is None or str_value is None:
        return None
    is_numeric = (expected_type in ('int', 'float', 'moneyvaluetype')
                  or field_name in ('recentlegalupdatedate', 'interestrate'))
    # A blank numeric field cannot be converted; treat it as missing instead of crashing.
    if is_numeric and not str_value.strip():
        return None
    if expected_type == 'int' or field_name == 'recentlegalupdatedate':
        return int(str_value)
    if field_name == 'interestrate':
        # Workaround: interestrate is fractional in the XML, but the DB column is
        # currently int, so only the integer part is kept.
        return int(str_value.split('.')[0])
    if expected_type in ('float', 'moneyvaluetype'):
        return float(str_value)
    return str_value


def get_row_from_sf_item(sf_item: ET.Element,
                         table_name: str,
                         hjid: int,
                         tables_current_hjid: dict):
    """Turn one XML element (e.g. a LOAN) into a row dict of SQL literal values.

    Every key is a normalized column name (lowercase, underscores removed), the
    same form used for the DB column lookup. Values can be spliced directly into
    an INSERT: numbers as-is, text as quoted SQL string literals with embedded
    single quotes doubled.

    Only leaf children are used. Children listed in FIELD_NAMES_TO_EXCLUDE or
    without a matching DB column are skipped. Each field's expected type comes
    from the XSD (TAG_TO_TABLE_TYPES_DICT); fields the XSD does not describe are
    kept as text.

    Besides the parsed fields, the row gets:
      * hdp_datetime: the current local time;
      * hjid: a per-table running counter (tables whose TABLES_HJID entry is not 'hjid');
      * the parent link: the file ``hjid`` in the TABLES_HJID[table_name] column,
        or, for monthlydetailtype, the current loanstype counter in loan_id.

    Args:
        sf_item: XML element whose tag is one of TAGS.
        table_name: target table name (no prefix).
        hjid: id of the XML file being parsed.
        tables_current_hjid: counters '<table_name>_hjid' -> last id used; updated in place.
    """
    def normalize(name: str) -> str:
        # Same normalization as used for parsed field names and DB column keys.
        return name.lower().replace('_', '')

    tag = sf_item.tag
    db_names = TAG_TO_ROW_NAMES_TO_DB_NAMES_DICT[tag]
    xsd_types = TAG_TO_TABLE_TYPES_DICT.get(tag, {})
    field_names = []
    field_values = []
    for sf_subitem in sf_item:
        field_name = normalize(sf_subitem.tag)
        if len(sf_subitem) > 0 \
                or field_name in FIELD_NAMES_TO_EXCLUDE \
                or db_names.get(field_name) is None:
            continue
        # The expected type must come from the XSD type map. The DB map's values
        # are column names, never 'int'/'float'/'moneyvaluetype'. Fields unknown
        # to the XSD fall back to text rather than being dropped.
        field_value = get_field_value(expected_type=xsd_types.get(field_name, 'string'),
                                      field_name=field_name,
                                      str_value=sf_subitem.text)
        if field_value is None:
            continue
        field_names.append(field_name)
        if isinstance(field_value, (int, float)):
            field_values.append(field_value)
        else:
            # Double embedded quotes so values like O'Neil do not break the INSERT.
            field_values.append("'" + field_value.replace("'", "''") + "'")

    field_names.append(normalize('hdp_datetime'))
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    field_values.append("'" + current_time + "'")

    parent_column = TABLES_HJID[table_name]
    if parent_column != 'hjid':
        counter_key = table_name + "_hjid"
        tables_current_hjid[counter_key] = tables_current_hjid.get(counter_key, 0) + 1
        field_names.append('hjid')
        field_values.append(tables_current_hjid[counter_key])

    if table_name != 'monthlydetailtype':
        field_names.append(normalize(parent_column))
        field_values.append(hjid)
    else:
        field_names.append(normalize('loan_id'))
        field_values.append(tables_current_hjid.get('loanstype_hjid'))
    return get_dict_from_lists(keys_list=field_names, values_list=field_values)


def parse_tag_in_xml_file(xml_root: ET.Element,
                          hjid: int,
                          table_name_to_rows_dict: dict,
                          tables_current_hjid: dict,
                          tag: str):
    """Parse every ``tag`` element under ``xml_root`` into rows of its table.

    Rows are appended to ``table_name_to_rows_dict``. For LOAN elements, the
    nested MONTHLY_DETAIL elements are parsed right after their parent loan's
    row, so they read that loan's counter from ``tables_current_hjid``.
    """
    table_name = get_table_name_by_tag(tag)
    for sf_item in xml_root.findall('.//' + tag):
        item_row = get_row_from_sf_item(sf_item, table_name, hjid, tables_current_hjid)
        add_row(table_name_to_rows_dict, table_name, item_row)
        if tag != 'LOAN':
            continue
        for md_sf_item in sf_item.findall(".//MONTHLY_DETAIL"):
            md_table_name = get_table_name_by_tag(md_sf_item.tag)
            md_row = get_row_from_sf_item(md_sf_item, md_table_name, hjid, tables_current_hjid)
            add_row(table_name_to_rows_dict, md_table_name, md_row)


def parse_xml_file(path_to_xml_file: str,
                   hjid: int,
                   table_name_to_df_dict: dict,
                   tables_current_hjid: dict):
    """Parse one XML file and append its rows to the per-table accumulators.

    ``table_name_to_df_dict`` maps table names to lists of row dicts; despite
    the name, it holds no DataFrames. MONTHLY_DETAIL is not parsed on its own:
    it is handled while its parent LOAN elements are parsed.
    """
    xml_root = ET.parse(path_to_xml_file).getroot()
    tags_to_parse = [tag for tag in TAGS if tag != 'MONTHLY_DETAIL']
    for tag in tags_to_parse:
        parse_tag_in_xml_file(xml_root, hjid, table_name_to_df_dict, tables_current_hjid, tag)


##### Функция, пересоздающая таблицы в БД

In [18]:
def recreate_tables(prefix_for_tables_to_create: str,
                    prefix_for_tables_from_which_to_create: str,
                    engine: sqlalchemy.engine.base.Engine):
    """Drop and re-create every table in TABLE_NAMES as an empty copy of a template.

    For each name, ``<prefix_for_tables_to_create><name>`` is dropped if present.
    It is then re-created from ``SELECT *`` on
    ``<prefix_for_tables_from_which_to_create><name>`` with an always-false
    filter (``WHERE 1<>1``), so no rows are copied. All statements run inside
    one ``engine.begin()`` block.
    """
    with engine.begin() as conn:
        for table_name in TABLE_NAMES:
            target = f"{prefix_for_tables_to_create}{table_name}"
            source = f"{prefix_for_tables_from_which_to_create}{table_name}"
            conn.execute(text(f"DROP TABLE IF EXISTS {target}"))
            conn.execute(text(f"CREATE TABLE IF NOT EXISTS {target} AS "
                              f"SELECT * FROM {source} WHERE 1<>1"))
            logger.info(f"{target} was recreated.")


##### Функция, сохраняющая в БД список строк из словаря

In [9]:
def execute_insert_query(engine, insert_query, table_name_with_prefix):
    """Run one INSERT statement in its own transaction and log the affected row count.

    Used as a thread target by the save functions. An exception raised in a
    thread does not propagate to the thread that joins it, so failures are
    logged with their traceback here before being re-raised. The success
    message is written only after the transaction has been committed.
    """
    try:
        with engine.begin() as conn:
            query_result = conn.execute(text(insert_query))
    except Exception:
        logger.exception(f"Insert into {table_name_with_prefix} table failed.")
        raise
    logger.info(f"{query_result.rowcount} rows were inserted into {table_name_with_prefix} table.")


def save_rows_to_db(list_of_rows: list,
                    tag: str,
                    table_name_with_prefix: str,
                    engine: sqlalchemy.engine.base.Engine,
                    threads: list):
    """Build one multi-row INSERT for ``list_of_rows`` and run it in a background thread.

    Columns come from TAG_TO_ROW_NAMES_TO_DB_NAMES_DICT[tag] and are listed
    explicitly (quoted) in the INSERT. The statement therefore does not depend
    on the physical column order of the target table.

    For each column, the row dict is looked up by the normalized column name
    first and then by the real DB column name. Missing or ``None`` values
    become NULL. Row values must already be SQL literals.

    The started thread is appended to ``threads`` so the caller can join it.
    An empty ``list_of_rows`` does nothing.
    """
    if not list_of_rows:
        return
    column_map = TAG_TO_ROW_NAMES_TO_DB_NAMES_DICT[tag]
    columns_sql = ','.join('"' + db_name.replace('"', '""') + '"' for db_name in column_map.values())

    def to_sql_value(row_dict, normalized_name, db_name):
        value = row_dict.get(normalized_name)
        if value is None:
            # Rows may be keyed by the raw DB column name (e.g. 'hdp_datetime').
            value = row_dict.get(db_name)
        return 'Null' if value is None else str(value)

    values_sql = ','.join(
        "(" + ','.join(to_sql_value(row_dict, normalized_name, db_name)
                       for normalized_name, db_name in column_map.items()) + ")"
        for row_dict in list_of_rows
    )
    insert_query = f"INSERT INTO {table_name_with_prefix} ({columns_sql}) VALUES {values_sql}"
    thread = threading.Thread(target=execute_insert_query, args=(engine, insert_query, table_name_with_prefix))
    thread.start()
    threads.append(thread)


##### Функция, сохраняющая singleformattype в БД

In [10]:
def save_singleformattype_to_db(hjids: list,
                                prefix: str,
                                engine: sqlalchemy.engine.base.Engine,
                                threads: list):
    """Insert one ``singleformattype`` row per file id in a background thread.

    All eight listed columns of each row are set to the file's hjid. The
    started thread is appended to ``threads``; an empty ``hjids`` is a no-op.
    """
    if not hjids:
        return
    column_count = 8
    values_sql = ','.join("(" + ",".join([str(hj)] * column_count) + ")" for hj in hjids)
    target_table = prefix + 'singleformattype'
    insert_query = (f"INSERT INTO {prefix}singleformattype"
                    "(hjid, names_, loansoverview, loans, frauds, documents, scores, main)"
                    f"VALUES {values_sql};")
    worker = threading.Thread(target=execute_insert_query,
                              args=(engine, insert_query, target_table))
    worker.start()
    threads.append(worker)
  

##### Функция, сохраняющая в БД накопленные строки по всем тегам и singleformattype

In [11]:
def save_rows_and_singleformattype_to_db(table_name_to_rows_dict: dict,
                                         hjids: list,
                                         prefix: str,
                                         engine: sqlalchemy.engine.base.Engine,
                                         threads: list):
    """Dispatch background inserts for every tag's accumulated rows, then for singleformattype.

    Each started thread is appended to ``threads``; the caller is responsible for joining them.
    """
    for tag in TAGS:
        table_name = get_table_name_by_tag(tag)
        save_rows_to_db(list_of_rows=table_name_to_rows_dict[table_name],
                        tag=tag,
                        table_name_with_prefix=f"{prefix}{table_name}",
                        engine=engine,
                        threads=threads)
    save_singleformattype_to_db(hjids=hjids, prefix=prefix, engine=engine, threads=threads)


##### Основная функция с логикой загрузки



In [12]:
def _join_threads(threads: list):
    """Join every thread in ``threads``, then empty the list."""
    for thread in threads:
        thread.join()
    threads.clear()


def make_load(engine: sqlalchemy.engine.base.Engine,
              prefix: str,
              path_to_folder_with_xml_files: str,
              loading_size: int):
    """Parse every ``*.xml`` file in a folder and load the rows into prefixed tables.

    The part of each file name before the first '.' must be an integer; it
    becomes the file's ``hjid``. Rows are accumulated in memory and flushed
    every ``loading_size`` files, plus a final flush for the remainder.

    Inserts run in background threads, so the next batch can be parsed in the
    meantime. However, the threads of the previous batch are joined before a
    new batch is dispatched, so at most one batch of inserts is in flight. This
    keeps the number of simultaneous connections requested from the engine's
    pool bounded.

    Args:
        engine: SQLAlchemy engine for the target database.
        prefix: prefix of the target tables, e.g. 'adm.ad_sf_'.
        path_to_folder_with_xml_files: folder containing the XML files.
        loading_size: number of files per flushed batch.
    """
    # Per-table id counters ('<table_name>_hjid' -> last id used). They are kept
    # across batches so ids keep increasing over the whole load.
    tables_current_hjid = {}
    table_name_to_rows_dict = {}
    for tag in TAGS:
        table_name = get_table_name_by_tag(tag)
        table_name_to_rows_dict[table_name] = []
        tables_current_hjid[table_name + "_hjid"] = 0

    threads = []
    hjids = []
    for filename in os.listdir(path_to_folder_with_xml_files):
        if not filename.endswith(".xml"):
            continue
        path_to_xml_file = os.path.join(path_to_folder_with_xml_files, filename)
        hjid = int(filename.split('.')[0])
        hjids.append(hjid)
        parse_xml_file(path_to_xml_file, hjid, table_name_to_rows_dict, tables_current_hjid)
        if len(hjids) == loading_size:
            _join_threads(threads)  # keep at most one batch of inserts in flight
            save_rows_and_singleformattype_to_db(table_name_to_rows_dict, hjids, prefix, engine, threads)
            for tag in TAGS:
                table_name_to_rows_dict[get_table_name_by_tag(tag)] = []
            hjids = []

    # Flush the remainder; the save functions do nothing for empty lists.
    _join_threads(threads)
    save_rows_and_singleformattype_to_db(table_name_to_rows_dict, hjids, prefix, engine, threads)
    _join_threads(threads)

    logger.info(f'All files in {path_to_folder_with_xml_files} have been loaded.')


## **Скрипт запуска парсинга**

In [22]:
# Load parameters
prefix_for_tables_to_create = 'adm.ad_sf_'  # prefix of the tables that are recreated and loaded
prefix_for_tables_from_which_to_create = 'adm.sf_'  # prefix of the template tables whose structure is copied
path_to_folder_with_xml_files = './xml_out'  # folder that contains the xml files
loading_size = 500  # rows are saved in batches of this many files

recreate_tables(prefix_for_tables_to_create=prefix_for_tables_to_create,
                prefix_for_tables_from_which_to_create=prefix_for_tables_from_which_to_create,
                engine=engine)

make_load_timed = timeit(make_load)
make_load_timed(engine=engine,
                prefix=prefix_for_tables_to_create,
                path_to_folder_with_xml_files=path_to_folder_with_xml_files,
                loading_size=loading_size)


[32m2023-05-15 23:37:46.315[0m | [1mINFO    [0m | [36m__main__[0m:[36mrecreate_tables[0m:[36m11[0m - [1madm.ad_sf_singleformattype was recreated.[0m
[32m2023-05-15 23:37:46.387[0m | [1mINFO    [0m | [36m__main__[0m:[36mrecreate_tables[0m:[36m11[0m - [1madm.ad_sf_monthlydetailtype was recreated.[0m
[32m2023-05-15 23:37:46.470[0m | [1mINFO    [0m | [36m__main__[0m:[36mrecreate_tables[0m:[36m11[0m - [1madm.ad_sf_loansoverviewtype was recreated.[0m
[32m2023-05-15 23:37:46.554[0m | [1mINFO    [0m | [36m__main__[0m:[36mrecreate_tables[0m:[36m11[0m - [1madm.ad_sf_loanstype was recreated.[0m
[32m2023-05-15 23:37:46.626[0m | [1mINFO    [0m | [36m__main__[0m:[36mrecreate_tables[0m:[36m11[0m - [1madm.ad_sf_maintype was recreated.[0m
[32m2023-05-15 23:37:46.696[0m | [1mINFO    [0m | [36m__main__[0m:[36mrecreate_tables[0m:[36m11[0m - [1madm.ad_sf_nametype was recreated.[0m
[32m2023-05-15 23:37:46.767[0m | [1mINFO    [0m | 