# Парсинг данных и заливка в БД

## Единый реестр субъектов малого и среднего предпринимательства – получателей поддержки

### Задачи:

* распарсить xml-файлы с использованием xsd-схемы
* заполнить данными предварительно спроектированную БД postgres


### Импорт необходимых библиотек

In [1]:
import psycopg2
import xmlschema
import pandas as pd
from datetime import datetime

from os import listdir
from os.path import isfile, join

from IPython.display import clear_output

### Входные параметры

In [2]:
FILES_COUNT = 200 # количество файлов для парсинга и заливки в БД

### Словари для последующего перевода числовых атрибутов в удобочитаемый формат

In [3]:
# виды субъектов предпринимательства
recipient_kind_values = {
    '1' : 'юридическое лицо',
    '2' : 'индивидуальный предприниматель',
    '3' : 'физическое лицо, не являющееся индивидуальным предпринимателем и применяющее специальный налоговый режим «Налог на профессиональный доход»'
}

# категории субъектов предпринимательства
recipient_category_values = {
    '1' : 'микропредприятие',
    '2' : 'малое предприятие',
    '3' : 'среднее предприятие',
    '4' : 'отсутствует'
}

# единицы измерения поддержки
units_values = {
    '1' : 'рубль',
    '2' : 'квадратный метр',
    '3' : 'час',
    '4' : 'процент',
    '5' : 'единица'
}

# признак нарушения условий поддержки 
terms_violation_values = {
    '1' : True,
    '2' : False
}

# признак растраты
embezzlement_values = {
    '1' : True,
    '2' : False
}


## Вспомогательные функции

In [4]:
def item_exists(cursor, uid, uid_col_name, table_name):
    '''
    Возвращает признак присутствия значения uid в столбце uid_col_name таблицы table_name.

    Args:
        cursor (Cursor obj):Курсор для выполнения запросов к БД.
        uid (str):Проверяемый идентификатор.
        uid_col_name (str):Название столбца, в котором ожидается найти идентификатор.
        table_name (str):Название таблицы, в которой осуществляется поиск.

    Returns:
        is_exists(bool):Результат проверки присутствия записей с значением uid в столбце uid_col_name таблицы table_name.   
    '''
    
    cursor.execute(f"SELECT {uid_col_name} FROM {table_name} WHERE {uid_col_name} = %s", (uid,))
    is_exists = cursor.fetchone() is not None
    return is_exists


def check_key(dict_, key, not_exists_value=None):
    '''
    Возвращает значение ключа key в словаре dict_, если ключ присутствует в нём.
    В противном случае возвращается значение not_exists_value.

    Args:
        dict_ (dict):Словарь, в котором происходит проверка.
        key (str):Ключ, присутствие которого проверяется.
        not_exists_value (any type):Альтернативное значение, используется если ключ не найден.

    Returns:
            value(any type):Значение ключа. Возвращается, если ключ найден в словаре.
            not_exists_value(any type):Альтернативное значение. Возвращается, если ключ не найден в словаре.
    '''
    
    if key in dict_:
        value = dict_[key]
        return value
    return not_exists_value

## Функции парсинга и заполнения БД

In [5]:
def insert_support_recipients(cursor, xml_dict):
    '''
    Парсинг словаря, в который предварительно был преобразован исходный xml-файл,
    и заполнение данными таблицы support_recipients в БД.

    Args:
        cursor (Cursor Class):Курсор для выполнения запросов к БД.
        xml_dict (dict):Словарь, в который преобразован исходный xml-файл.

    Returns:
        None
    '''
    
    items = xml_dict['Документ']
    
    for item in items:
        if item['@ВидСуб'] == '1':
            inn = item['СвЮЛ']['@ИННЮЛ']
            recipient_name = item['СвЮЛ']['@НаимОрг']
            recipient_kind = recipient_kind_values[item['@ВидСуб']]
        elif item['@ВидСуб'] in ['2','3']:
            inn = item['СвФЛ']['@ИННФЛ']
            recipient_name = item['СвФЛ']['ФИО']['@Имя']
            recipient_kind = recipient_kind_values[item['@ВидСуб']]

        if item_exists(cursor, inn, 'inn', 'support_recipients'):
            continue
        else:
            cursor.execute(
                """
                INSERT INTO support_recipients (inn, name, recipient_kind)
                VALUES (%s, %s, %s);
                """,
                (inn, recipient_name, recipient_kind)
            )

In [6]:
def insert_support_forms(cursor, xml_dict):
    '''
    Парсинг словаря, в который предварительно был преобразован исходный xml-файл,
    и заполнение данными таблицы support_forms в БД.

    Args:
        cursor (Cursor Class):Курсор для выполнения запросов к БД.
        xml_dict (dict):Словарь, в который преобразован исходный xml-файл.

    Returns:
        None
    '''
    
    items = xml_dict['Документ']

    for item in items:
        
        cases = item['СвПредПод']
        
        for case in cases:
            code = case['ФормПод']['@КодФорм']
            name = case['ФормПод']['@НаимФорм']
            
            if item_exists(cursor, code, 'code', 'support_forms'):
                continue
            else:
                cursor.execute(
                    """
                    INSERT INTO support_forms (code, name)
                    VALUES (%s, %s);
                    """,
                    (code, name)
                )

In [7]:
def insert_support_issuers(cursor, xml_dict):
    '''
    Парсинг словаря, в который предварительно был преобразован исходный xml-файл,
    и заполнение данными таблицы support_issuers в БД.

    Args:
        cursor (Cursor Class):Курсор для выполнения запросов к БД.
        xml_dict (dict):Словарь, в который преобразован исходный xml-файл.

    Returns:
        None
    '''
    
    items = xml_dict['Документ']
    
    for item in items:

        cases = item['СвПредПод']
        
        for case in cases:
            name = case['@НаимОрг']
            inn = case['@ИННЮЛ']
        
            if item_exists(cursor, inn, 'inn', 'support_issuers'):
                continue
            else:
                cursor.execute(
                    """
                    INSERT INTO support_issuers (inn, name)
                    VALUES (%s, %s);
                    """,
                    (inn, name)
                )

In [8]:
def insert_support_kinds(cursor, xml_dict):
    '''
    Парсинг словаря, в который предварительно был преобразован исходный xml-файл,
    и заполнение данными таблицы support_kinds в БД.

    Args:
        cursor (Cursor Class):Курсор для выполнения запросов к БД.
        xml_dict (dict):Словарь, в который преобразован исходный xml-файл.

    Returns:
        None
    '''
    
    items = xml_dict['Документ']
    
    for item in items:

        support_count = len(item['СвПредПод'])
        
        for s in range(support_count):
            code = item['СвПредПод'][s]['ВидПод']['@КодВид']
            name = item['СвПредПод'][s]['ВидПод']['@НаимВид']

            if item_exists(cursor, code, 'code', 'support_kinds'):
                continue
            else:
                cursor.execute(
                    """
                    INSERT INTO support_kinds (code, name)
                    VALUES (%s, %s);
                    """,
                    (code, name)
                )

In [9]:
def insert_support_cases(cursor, xml_dict):
    '''
    Парсинг словаря, в который предварительно был преобразован исходный xml-файл,
    и заполнение данными таблицы support_cases в БД.

    Args:
        cursor (Cursor Class):Курсор для выполнения запросов к БД.
        xml_dict (dict):Словарь, в который преобразован исходный xml-файл.

    Returns:
        None
    '''       

    items = xml_dict['Документ']
    file_id = xml_dict['@ИдФайл']
    
    for item in items:
        cases = item['СвПредПод']
        
        doc_id = item['@ИдДок']
        
        if item['@ВидСуб'] == '1':
            support_recipient_inn = item['СвЮЛ']['@ИННЮЛ']
        elif item['@ВидСуб'] in ['2','3']:
            support_recipient_inn = item['СвФЛ']['@ИННФЛ']
            
        for case in cases:
            current_recipient_kind = recipient_kind_values[case['@ВидПП']]
            support_term = datetime.strptime(case['@СрокПод'], '%d.%m.%Y')
            support_decision_date = datetime.strptime(
                case['@ДатаПрин'], '%d.%m.%Y'
            )

            if '@ДатаПрекр' in case:
                stop_support_decision_date = datetime.strptime(case['@ДатаПрекр'], '%d.%m.%Y')
            else:
                stop_support_decision_date = None
                
            support_issuer_inn = case['@ИННЮЛ']
            current_recipient_category = recipient_category_values[case['@КатСуб']]
            support_form = case['ФормПод']['@КодФорм']
            support_kind = case['ВидПод']['@КодВид']
            terms_violation = terms_violation_values[case['ИнфНаруш']['@ИнфНаруш']]
            embezzlement = embezzlement_values[case['ИнфНаруш']['@ИнфНецел']]
            
            subcases = case['РазмПод']
            for subcase in subcases:
                support_amount = subcase['@РазмПод']
                support_unit = units_values[subcase['@ЕдПод']]

                cursor.execute(
                    """
                    INSERT INTO support_cases (
                        support_issuer_inn, support_recipient_inn, current_recipient_kind,
                        current_recipient_category, support_term, support_decision_date,
                        stop_support_decision_date, support_form, support_kind,
                        support_amount, support_unit, terms_violation,
                        embezzlement, file_id, doc_id
                    )
                    VALUES (
                        %s, %s, %s, 
                        %s, %s, %s,
                        %s, %s, %s, 
                        %s, %s, %s, 
                        %s, %s, %s
                    );
                    """,
                    (
                         support_issuer_inn, support_recipient_inn, current_recipient_kind,
                         current_recipient_category, support_term, support_decision_date,
                         stop_support_decision_date, support_form, support_kind,
                         support_amount, support_unit, terms_violation,
                         embezzlement, file_id, doc_id
                    )
                )
            
            

In [10]:
def insert_files_metadata(cursor, xml_dict):
    '''
    Парсинг словаря, в который предварительно был преобразован исходный xml-файл,
    и заполнение данными таблицы files_metadata в БД.

    Args:
        cursor (Cursor Class):Курсор для выполнения запросов к БД.
        xml_dict (dict):Словарь, в который преобразован исходный xml-файл.

    Returns:
        None
    '''
    
    file_id = xml_dict['@ИдФайл']
    docs_count = xml_dict['@КолДок']
    format_version = xml_dict['@ВерсФорм']
    info_type = xml_dict['@ТипИнф']
    software_version = check_key(xml_dict, '@ВерсПрог')

    resp_position = check_key(xml_dict['ИдОтпр'], '@ДолжОтв')
    phone = check_key(xml_dict['ИдОтпр'], '@Тлф')
    e_mail = check_key(xml_dict['ИдОтпр'], '@E-mail')
    
    resp_firstname = xml_dict['ИдОтпр']['ФИООтв']['@Имя']
    resp_lastname = xml_dict['ИдОтпр']['ФИООтв']['@Фамилия']
    resp_patronymic = check_key(
        xml_dict['ИдОтпр']['ФИООтв'],
        '@Отчество',
        not_exists_value=''
    )

    resp_fullname = ' '.join(
        [resp_lastname, resp_firstname, resp_patronymic]
    ).strip()
    cursor.execute(
        """
        INSERT INTO files_metadata (
            file_id, format_version, info_type,
            software_version, docs_count, resp_position,
            phone, "e-mail", resp_fullname
            )
        VALUES (
            %s, %s, %s,
            %s, %s, %s,
            %s, %s, %s
        );
        """,
        (
            file_id, format_version, info_type,
            software_version, docs_count, resp_position,
            phone, e_mail, resp_fullname
        )
    )

In [11]:
def insert_documents_metadata(cursor, xml_dict):
    '''
    Парсинг словаря, в который предварительно был преобразован исходный xml-файл,
    и заполнение данными таблицы files_metadata в БД.

    Args:
        cursor (Cursor Class):Курсор для выполнения запросов к БД.
        xml_dict (dict):Словарь, в который преобразован исходный xml-файл.

    Returns:
        None
    '''
    
    items = xml_dict['Документ']
 
    for item in items:
        doc_id = item['@ИдДок']
        creation_date = datetime.strptime(item['@ДатаСост'], '%d.%m.%Y')
              
        cursor.execute(
            """
            INSERT INTO documents_metadata (doc_id,creation_date) 
            VALUES (%s, %s);
            """,
            (doc_id,creation_date)
        )    

### Получение списка обрабатываемых файлов

In [12]:
# относительный путь до папки с xml-файлами
rel_dir_path = "../data/rsmppp_data/"

# массив с путями до всех файлов в указанной папке
xml_file_paths = [
                    rel_dir_path + f for f in listdir(rel_dir_path) 
                    if isfile(join(rel_dir_path, f))
                 ]

# выбор ограниченного количества файлов
file_paths_for_db = xml_file_paths[:FILES_COUNT]

### Соединение с БД

In [13]:
conn = psycopg2.connect(
                        dbname='rsmppp', 
                        user='postgres', 
                        host='localhost', 
                        port=5436
                       )
conn.autocommit = True

### Преобразование xml-файлов в словари и вызов функций для заполнения таблиц в БД

In [14]:
cursor = conn.cursor()

# считывание xml-схемы для последующего парсинга xml-файлов
xs = xmlschema.XMLSchema('../data/rsmppp_schema/structure-20201220.xsd')

# Представленная на сайте ФНС xml-схема не соответствовала xml-файлам, 
# в неё пришлось внести изменения.

# С помощью цикла обрабатываем каждый файл отдельно
for idx, file_path in enumerate(file_paths_for_db):
    clear_output(wait=True) # очистка вывода
    done_perc = int((idx + 1) * 100 / len(file_paths_for_db)) # процент обработанных файлов
    print(f'Выполнено {done_perc}%\nОбрабатываемый файл: {file_path}')
    
    xml_dict = xs.to_dict(file_path) # преобразование xml-файла в словарь
    
    # поочередный вызов функций для парсинга словаря и записи данных в БД
    insert_files_metadata(cursor, xml_dict)
    insert_documents_metadata(cursor, xml_dict)
    insert_support_recipients(cursor, xml_dict)
    insert_support_issuers(cursor, xml_dict)
    insert_support_forms(cursor, xml_dict)
    insert_support_kinds(cursor, xml_dict)
    insert_support_cases(cursor, xml_dict)


clear_output(wait=True)
print('Данные успешно залиты в БД rsmppp.')
cursor.close()

Данные успешно залиты в БД rsmppp.


## Дополнительные данные

## Реестр субъектов малого и среднего предпринимательства

## Задачи:

* распарсить xlsx-файлы из реестра субъектов малого и среднего предпринимательства, полученные с помощью пакетного поиска по ИНН
на сайте ФНС https://rmsp.nalog.ru/search.html?mode=inn-listс
* заполнить данными таблицу rsmp в БД

### Импорт необходимых библиотек

In [21]:
import pandas as pd
from sqlalchemy import create_engine

from datetime import datetime

### Входные параметры

In [22]:
# Дата выгрузки набора из Реестра субъектов малого и среднего предпринимательства

REGISTER_DOWNLOAD_DATE = datetime.strptime('17.01.2021', '%d.%m.%Y')

### Соединение с БД

In [23]:
engine = create_engine('postgresql://postgres:postgres@localhost:5436/rsmppp')
conn = engine.connect()

### Формирование списка путей до файлов для парсинга

In [24]:
# относительный путь до папки с xml-файлами
rel_dir_path = "../data/rmsp_data/"

# массив с путями до всех файлов в указанной папке
xlsx_file_paths = [
                    rel_dir_path + f for f in listdir(rel_dir_path) 
                    if isfile(join(rel_dir_path, f))
                 ]

### Поочерёдный парсинг файлов и запись данных в БД 

In [25]:
for path in xlsx_file_paths:
    # запишем таблицу в dataframe
    df = pd.read_excel(path, header=2)
    
    # выделим из датафрейма только необходимые поля
    df_for_db = df[['ИНН', 'ОГРН', 'Основной вид деятельности', 'Регион',
                'Район', 'Город', 'Дата исключения из реестра',
                'Среднесписочная численность работников за предшествующий календарный год',
                'Наличие заключенных договоров, контрактов',
                'Производство инновационной, высокотехнологичной продукции', 
                'Дата включения в реестр',
               ]].copy()
    
    # добавим поле с датой выгрузки файлов
    df_for_db['download_from_register_date'] = REGISTER_DOWNLOAD_DATE
    
    # переименуем столбцы так, чтобы они соответствовали названиям в таблице
    df_for_db.columns = ['inn','ogrn','okved','region','district',
                         'city','register_out_date',
                         'workers_mean_amount', 'contract','innovation',
                         'register_in_date', 'download_from_register_date'
                        ]
    
    # переименуем значения полей так, чтобы их тип соответствовал тому, что задан в БД
    df_for_db['contract'] = df_for_db['contract'].replace(['Нет', 'Да'], [False, True])
    df_for_db['innovation'] = df_for_db['innovation'].replace(['Нет', 'Да'], [False, True])
    
    # запишем данные в таблицу rsmp
    table_name = 'rsmp'
    df_for_db.to_sql(table_name, conn, if_exists='append', index=False)

print('Данные успешно залиты в БД rsmppp.')
conn.close()

Данные успешно залиты в БД rsmppp.
