In [None]:
# Запуск/Остановка SparkSession
try:
    spark.stop()
    del spark
except:
    %run configuration.py

In [None]:
# Сборка витрины по расчётным счетам корпоративных клиентов
# Обработка данных из разных источников и сохранение результата
# в таблицу Hive (на кластер)
try:
    spark.sql('drop table stor.rko_showcase')
except:
    pass

# Префикс к таблицам
prx_epk: str = 'arn_model_party_organization_'

# БД
schema: dict = {
    'rko': 'with_db_collection_sch_rko_platform_rko',
    'epk': 'with_db_collection_sch_epk_platform_uckpb',
    'sbbol': 'with_db_collection_vnp_sbbol_birdabi'
}

# Таблицы
table: dict = {
    'currency': 'platform_gdp_model_jpa_currency',
    'rstatus': 'platform_gdp_model_jpa_rstatus',
    'rtype': 'platform_gdp_model_jpa_rtype',
    'register': 'platform_gdp_model_jpa_register',
    'mregister': 'platform_gdp_model_jpa_mregister',
    'sourcesystem': 'platform_gdp_model_jpa_sourcesystem',
    'ptarget': 'platform_gdp_model_jpa_ptarget',
    'product': 'platform_gdp_model_jpa_product',
    'ptype': 'platform_gdp_model_jpa_ptype',
    'mproduct': 'platform_gdp_model_jpa_mproduct',
    'segment': 'platform_gdp_model_jpa_segment',
    'priority': 'platform_gdp_model_jpa_priority',
    'country': 'platform_gdp_model_jpa_country',
    'org': 'platform_gdp_model_jpa_org',
    'orgn': 'platform_gdp_model_jpa_orgn',
    'crm': 'platform_gdp_model_jpa_crm',
    'ogrn': 'platform_gdp_model_jpa_ogrn',
    'inn': 'platform_gdp_model_jpa_inn'
}

# Валюта счёта
currency: DataFrame = (
    spark.table(f"{schema['rko']}.{table['country']}")
    .select(
        F.col('key').alias('currency_key'),
        F.col('name').alias('currency_name')
    )
)

# Статус счёта
register_status: DataFrame = (
    spark.table(f"{schema['rko']}.{table['rstatus']}")
    .select(
        F.col('key').alias('register_status_key'),
        F.col('name').alias('register_status_name')
    )
)

# Тип счёта
register_type: DataFrame = (
    spark.table(f"{schema['rko']}.{table['rtype']}")
    .select(
        F.col('key').alias('register_type_key'),
        F.col('name').alias('register_type_name')
    )
)

# Счета
register: DataFrame = (
    spark.table(f"{schema['rko']}.{['register']}")
    .filter(
        (F.to_date(F.col('begindate'))
        .between('2024-01-01 00:00:00:000000', '2024-12-31 23:59:60:999999'))
        & (F.col('currency_short_foreign_key').like('RUR'))
        & (F.col('status_short_foreign_key').isin('Working', 'Closed'))
        & (F.col('number').substr(1, 3).isin('401', '402', '404', '405',
                                             '406', '407', '408'))
        & (F.col('number').substr(1, 5).isin('40109', '40111', '40803',
                                             '40810', '40813', '40817',
                                             '40820', '40824') == False)
    )
    .select(
        F.col('ownerepkid_entityd').cast(T.StringType()).alias('epk_id'),
        F.col('begindate').cast(T.DateType()),
        F.col('enddate').cast(T.DateType()),
        F.col('lastchangedate').cast(T.DateType()),
        F.col('number').cast(T.StringType()),
        F.col('currency_short_foreign_key').alias('currency_key'),
        F.col('status_short_foreign_key').alias('register_status_key'),
        F.col('registertype_short_foreign_key').alias('register_type_key'),
        F.col('product_short_foreign_key').alias('product_key'),
        F.col('key').alias('register_key')
    )
    .withColumn(
        'diff',
        F.datediff(
            end=(F.col('enddate')),
            start=(F.col('startdate'))
        )
    )
    .filter((F.col('diff') >= F.lit(7)) | (F.col('enddate').isNull()))
    .drop('diff')
    .join(F.broadcast(currency), on='currency_key',
          how='left_outer')
    .drop('currency_key')
    .join(F.broadcast(register_status), on='register_status_key',
          how='left_outer')
    .drop('register_status_key')
    .join(F.broadcast(register_type), on='register_type_key',
        how='left_outer')
    .drop('register_type_key')
)

# Справочник систем источников
sourcesystem: DataFrame = (
    spark.table(f"{schema['rko']}.{table['sourcesystem']}")
    .select(
        F.col('objectid').alias('sourcesystem_key'),
        F.col('name').alias('sourcesystem_name')
    )
)

# Таблица миграции счетов
mregister: DataFrame = (
    spark.table(f"{schema['rko']}.{table['mregister']}")
    .select(
        F.col('eksid').cast(T.StringType()).alias('eks_id'),
        F.col('pprbid').cast(T.StringType()).alias('pprb_id'),
        F.col('register_entityid').alias('register_key'),
        F.col('sourcesystem_short_foreign_key').alias('sourcesystem_key'),
        F.col('migratetm').alias('migration_date')
    )
    .join(register, on='register_key', how='inner')
    .drop('register_key')
    .join(F.broadcast(sourcesystem), on='sourcesystem_key', how='inner')
    .drop('sourcesystem_key')
)

# Целевое назначение договора РКО
producttarget: DataFrame = (
    spark.table(f"{schema['rko']}.{table['ptarget']}")
    .select(
        F.col('description').alias('prod_target'),
        F.col('key').alias('key_target')
    )
)

# Вид договора РКО
producttype: DataFrame = (
    spark.table(f"{schema['rko']}.{table['ptype']}")
    .select(
        F.col('name').alias('ptype_name'),
        F.col('key').alias('ptype_key')
    )
)

# Договор расчётно-кассового обслуживания ЮЛ/ИП
product: DataFrame = (
    spark.table(f"{schema['rko']}.{table['product']}")
    .filter(
        F.col('target_short_foreign_key')
        .isin(
            'CorporateCreditAccount', 'IndividualElectionFunds',
            'ElectionCommissionAccount', 'MetalPreciousAccount',
            'NotaryDeposit', 'ElectoralAssociationAccount', 
            'LawEnforcementDepositAccount', 'SpecialBrokerageAccount',
            'SpecialDepositaryAccount', 'PrivatePensionReservesAccount',
            'PaymentIdentification', 'NominalGuardian48FZ',
            'PrivatePensionSavingsAccount', 'ClearingBankAccount',
            'CurrentBailiffsServiceDepositAccount', 'IndAccount',
            'FundsTemporarityDisposal', 'UFKvalyataAccount'
        ) == False
    )
    .select(
        F.col('key').alias('product_key'),
        F.col('target_short_foreign_key').alias('key_target'),
        F.col('num').cast(T.StringType()).alias('num_dog'),
        F.col('producttype_short_foreign_key').alias('ptype_key')
    )
    .join(F.broadcast(producttarget), on='key_target', how='inner')
    .drop('key_target')
    .join(F.broadcast(producttype), on='ptype_key', how='left_outer')
    .drop('ptype_key')
    .join(mregister, on='product_key', how='inner')
)

# Справочник подразделений банка
branch: DataFrame = (
    spark.table(f"{schema['sbbol']}.branch")
    .select(
        F.col('fullname').alias('division'),
        F.col('sclir').alias('divisionekscode')
    )
)

# Дополнительные атрибуты для РКО из устаревших систем
mproduct: DataFrame = (
    spark.table(f"{schema['rko']}.{table['mproduct']}")
    .select(
        'divisionekscode',
        F.col('product_entityid').cast(T.StringType()).alias('product_key')
    )
    .join(product, on='product_key', how='right_outer')
    .drop('product_key')
    .join(branch, on='divisionekscode', how='inner')
    .drop('divisionekscode')
)

# Справочник сегментов
dictionary_segment: DataFrame = (
    spark.table(f"{schema['epk']}.{table['segment']}")
    .select(
        F.col('name').alias('segment'),
        F.col('key').alias('key_segment')
    )
)

# Справочник приоритетов
dictionary_priority: DataFrame = (
    spark.table(f"{schema['epk']}.{table['priority']}")
    .select(
        F.col('name').alias('priority'),
        F.col('key').alias('key_priority')
    )
)

# Справочник стран
dictionary_country: DataFrame = (
    spark.table(f"{schema['epk']}.{table['country']}")
    .select(
        F.col('name').alias('country_name'),
        F.col('key').alias('key_country')
    )
)

# Организации
organization: DataFrame = (
    spark.table(f"{schema['epk']}.{table['org']}")
    .select(
        F.col('id').cast(T.StringType()).alias('epk_id'),
        F.col('key').alias('key_name'),
        F.col('countryresident_short_foreign_key').alias('key_country')
    )
    .join(mproduct, on='epk_id', how='inner')
)

# Наименование организаций
organizationname: DataFrame = (
    spark.table(f"{schema['epk']}.{prx_epk}{table['orgn']}")
    .filter(
        F.col('nametype_short_foreign_key').like('NameType_3')
    )
    .select(
        'key',
        F.col('name').alias('name_org')
    )
    .withColumn('key_name', F.split('key', '\.')[0])
    .join(organization, on='key_name', how='inner')
    .drop('key')
)

# CRM-атрибуты
crmattributes: DataFrame = (
    spark.table(f"{schema['epk']}.{prx_epk}{table['crm']}")
    .select(
        F.col('id').cast(T.StringType()).alias('epk_id'),
        F.col('segment_short_foreign_key').alias('key_segment'),
        F.col('priority_short_foreign_key').alias('key_priority')
    )
    .join(organizationname, on='epk_id', how='right_outer')
)

# ОГРН клиента
win_ogrn: any = (
    Window.partitionBy('key_name', 'ogrn').orderBy(F.desc('startdate'))
)
ogrn: DataFrame = (
    spark.table(f"{schema['epk']}.{prx_epk}{table['ogrn']}")
    .select(
        F.col('documentnumber').cast(T.StringType()).alias('ogrn'),
        'key',
        'startdate'
    )
    .witnColumn('key_name', F.split('key', '\.')[0])
    .withColumn('rn', F.row_number().over(win_ogrn))
    .filter(F.col('rn') == F.lit(1)).drop('rn')
    .join(crmattributes, on='key_name', how='right_outer')
    .drop('key', 'startdate')
)

# ИНН клиента
win_inn: any = (
    Window.partitionBy('key_name', 'inn').orderBy(F.desc('startdate'))
)
inn: DataFrame = (
    spark.table(f"{schema['epk']}.{table['inn']}")
    .select(
        F.col('documentnumber').cast(T.StringType()).alias('inn'),
        'key',
        'startdate'
    )
    .witnColumn('key_name', F.split('key', '\.')[0])
    .withColumn('rn', F.row_number().over(win_inn))
    .filter(F.col('rn') == F.lit(1)).drop('rn')
    .join(ogrn, on='key_name', how='right_outer')
    .drop('key', 'startdate', 'key_name')
)

assembling: DataFrame = (
    inn
    .withColumn('migration', F.concat_ws(' > ',
                                         'system_name',
                                         'migration_date')
    )
    .join(F.broadcast(dictionary_segment), on='key_segment',
          how='left_outer')
    .drop('key_segment')
    .join(F.broadcast(dictionary_priority), on='key_priority',
          how='left_outer')
    .drop('key_priority')
    .join(F.broadcast(dictionary_country), on='key_country',
          how='left_outer')
    .drop('key_country')
)

column_dict: dict = {
    'eks_id': 'Уникальный идентификатор ЕКС',
    'pprb_id': 'Уникальный идентификатор ППРБ',
    'epk_id': 'Уникальный идентификатор ЕПК',
    'name_org': 'Наименование клиента', 'inn': 'ИНН', 'ogrn': 'ОГРН',
    'country_name': 'Страна клиента', 'segment': 'Сегмент',
    'priority': 'Приоритет', 'number': 'Номер счёта',
    'num_dog': 'Номер договора', 'begindate': 'Дата открытия счёта',
    'lastchangedate': 'Дата последнего изменения',
    'enddate': 'Дата закрытия счёта',
    'prod_target': 'Целевое назначение договора',
    'ptype_name': 'Наименование вида договора',
    'currency_name': 'Валюта счёта',
    'register_status_name': 'Статус счёта', 'register_type_name': 'Тип счёта',
    'division': 'Подразделение',
    'migration': 'Система миграции и дата миграции'
}

createSchema(
    column_dict, assembling.select(*column_dict).orderBy('begindate')
).repartition(1).write.mode('overwrite').saveAsTable('stor.rko_showcase')