In [None]:
from pyspark.sql.window import * 
from pyspark.sql.functions import *
import time
from datetime import datetime
import sys
import shutil
import os
import builtins

### Sources

In [None]:
# --- Source matrices (parquet) ---
matrix_ozon_path = '/mnt/output/write/generic/sales/dCom/offtakes/ozon/matrix/'
ecom_other_clients_matrix = '/mnt/output/write/distilled/Ecom/other_client_matrix'
matrix_samokat_path = '/mnt/output/write/distilled/sales/dcom/offtakes/samokat_matrix'

# --- Dimension tables (parquet) ---
dim_outlet_path = '/mnt/output/write/dataModel/Ecom/dimClientMapping'
dim_products_path = '/mnt/output/write/dataModel/Ecom/dimProduct'
dim_calendar_path = '/mnt/output/read/dataModel/Atlas/dim_calendar'

# --- Location the final matrix is written to ---
output_path = '/mnt/output/write/dataModel/Ecom/dim_matrix'

# Hive registration target: "<schema>.<table>".
# The schema name is not hard-coded; it is read from a Databricks secret.
table_name = 'ecom_dim_matrix'
russiaMwDataRoomHiveSchema = dbutils.secrets.get(
    scope='russiaMwDataRoomKeyVaultSecret',
    key='russiaMwDataroomHiveSchema',
)
hiveTargetTableName = f'{russiaMwDataRoomHiveSchema}.{table_name}'

In [None]:
# Load every input from parquet, keeping only the columns used further down.

# Outlet / client mapping dimension.
df_dim_outlet = spark.read.parquet(dim_outlet_path).select(
    'ParsingClientName',
    'OSADisplayName',
    'ParsingClientName&ParsingAddress',
    'Customer_ID',
    'Store_ID',
)

# Product dimension (customer product id -> EAN).
df_dim_product = spark.read.parquet(dim_products_path).select(
    'CustomerId',
    'CustomerProductId',
    'CustomerProductName',
    'EAN',
)

# Ozon matrix, keyed by SKU id, store id and a 'WeekYear' label.
df_ozon_matrix_generic = spark.read.parquet(matrix_ozon_path).select(
    'IDSKU',
    'StoreType',
    'StoreID',
    'WeekYear',
    'Customer_ID',
)

# The two remaining matrices share one column layout.
_matrix_columns = [
    'EAN',
    'ParsingClientName',
    'Customer_ID',
    'StoreType',
    'StartDate',
    'EndDate',
]

# Matrix of the other customers; rows with Customer_ID 21 are excluded
# (rows with a NULL Customer_ID do not pass the comparison either).
# NOTE(review): 21 is presumably the customer loaded separately just below - confirm.
df_matrix_of_all_customers = (
    spark.read.parquet(ecom_other_clients_matrix)
    .select(*_matrix_columns)
    .where(col("Customer_ID") != 21)
)

# Samokat matrix.
df_samokat_matrix = spark.read.parquet(matrix_samokat_path).select(*_matrix_columns)

# Calendar dimension: only the date column is needed.
df_calendar = spark.read.parquet(dim_calendar_path).select('GCALENDAR')

### Big Ozon. Customer_ID = 31

In [None]:
# One outlet row per Store_ID: rows are ranked by OSADisplayName inside each store
# and only the first one is kept. 'not available' store ids are ignored.
ozon_w_31 = Window.partitionBy('Store_ID').orderBy('OSADisplayName')

df_dim_outlet_distinct = (
    df_dim_outlet
    .where(col('Store_ID') != 'not available')
    .withColumn('row_num', row_number().over(ozon_w_31))
    .where(col('row_num') == 1)
    .drop('row_num')
)

# Plain alias of the generic Ozon matrix.
df_union_ozon_matrix = df_ozon_matrix_generic

# Left-join the matrix to the outlets on store id. The outlet's Customer_ID is
# exposed as 'Customer_ID_do' so it does not clash with the matrix's own column.
_store_id_match = df_dim_outlet_distinct['Store_ID'] == df_union_ozon_matrix['StoreID']

df_join_ozon_matrix_store_id = (
    df_union_ozon_matrix
    .join(df_dim_outlet_distinct, on=_store_id_match, how='left')
    .select(
        df_union_ozon_matrix['*'],
        df_dim_outlet_distinct['Customer_ID'].alias('Customer_ID_do'),
        df_dim_outlet_distinct['ParsingClientName'],
    )
)

# Rows that found an outlet: the outlet's Customer_ID replaces the matrix's one.
df_ozon_merged_store_id = (
    df_join_ozon_matrix_store_id
    .where(col('Customer_ID_do').isNotNull())
    .drop('Customer_ID')
    .withColumnRenamed('Customer_ID_do', 'Customer_ID')
)

### Ozon Fresh. Customer_ID = 10 

In [None]:
# Matrix rows that did NOT match an outlet by store id; the helper columns added by
# the store-id join are removed so only the original matrix columns remain.
df_ozon_not_merged_store_id = (
    df_join_ozon_matrix_store_id
    .where(col('Customer_ID_do').isNull())
    .drop('Customer_ID_do', 'ParsingClientName')
)

ozon_w_10 = Window.partitionBy('Customer_ID').orderBy('ParsingClientName&ParsingAddress')

# The single outlet row these rows are attached to: Customer_ID 10, 'ozon.ru_express'.
_is_ozon_express = (col('Customer_ID') == 10) & (col('ParsingClientName') == 'ozon.ru_express')

# Fail fast if the outlet dimension holds more than one such row.
df_ozon_express_check = df_dim_outlet.where(_is_ozon_express)
if df_ozon_express_check.count() > 1:
    raise ValueError("Duplicate records found for ParsingClientName: ozon.ru_express")

# Keep the first matching outlet row and suffix its columns with '_dot' so they
# cannot be confused with the matrix columns during the join.
df_dim_outlet_for_ozon_fresh = (
    df_ozon_express_check
    .withColumn('row_num', row_number().over(ozon_w_10))
    .where(col('row_num') == 1)
    .drop('row_num')
    .withColumnRenamed('Customer_ID', 'Customer_ID_dot')
    .withColumnRenamed('ParsingClientName', 'ParsingClientName_dot')
)

# Left-join on customer id, then let the outlet's values take over the
# 'Customer_ID' / 'ParsingClientName' names.
_customer_id_match = (
    df_dim_outlet_for_ozon_fresh['Customer_ID_dot'] == df_ozon_not_merged_store_id['Customer_ID']
)

df_join_ozon_matrix_cust_id_fresh = (
    df_ozon_not_merged_store_id
    .join(df_dim_outlet_for_ozon_fresh, on=_customer_id_match, how='left')
    .select(
        df_ozon_not_merged_store_id['*'],
        df_dim_outlet_for_ozon_fresh['Customer_ID_dot'],
        df_dim_outlet_for_ozon_fresh['ParsingClientName_dot'],
    )
    .drop('Customer_ID')
    .withColumnRenamed('Customer_ID_dot', 'Customer_ID')
    .withColumnRenamed('ParsingClientName_dot', 'ParsingClientName')
)

# Only rows that actually matched the outlet row are kept.
df_ozon_fresh_merged = df_join_ozon_matrix_cust_id_fresh.where(col('Customer_ID').isNotNull())

### Ozon Matrix

In [None]:
# Stack the store-id-matched Ozon rows and the Ozon Fresh rows; columns are aligned by name.
df_join_ozon_matrix_dim_store = (
    df_ozon_merged_store_id
    .unionByName(df_ozon_fresh_merged)
)

In [None]:
# Lookup from a 'WeekYear' label ("<week of year>_<year>") to the Monday of that week.
#
# The Monday is obtained with date_trunc('week', ...), which truncates to the Monday of
# the week. The previous implementation derived it from date_format(GCALENDAR, 'u');
# 'u' is a legacy SimpleDateFormat letter that is not a supported pattern letter in
# Spark 3's datetime patterns unless the LEGACY time-parser policy is enabled.
#
# NOTE(review): weekofyear() follows ISO week numbering while year() is the calendar
# year, so dates around New Year can share a label with a different week of the same
# calendar year. The row_number step that follows this block keeps the earliest Monday
# per label - confirm this matches how 'WeekYear' is produced in the Ozon matrix.
calendar_for_ozon_matrix = (
  df_calendar
  .select(
        concat_ws('_', weekofyear('GCALENDAR'), year('GCALENDAR')).alias('WeekYear')
      , to_date(date_trunc('week', col('GCALENDAR'))).alias('first_day_of_week')
        )
  # distinct is applied because these fields are not unique per calendar day, yet they
  # are exactly the fields needed for a correct join with the main table
  .distinct()
)

# Guarantee one row per 'WeekYear' label: if a label maps to several week starts,
# the earliest one wins.
windowSpec = Window.partitionBy('WeekYear').orderBy(col('first_day_of_week').asc())

uniq_calendar_for_ozon_matrix = (
    calendar_for_ozon_matrix
    .withColumn('row_num', row_number().over(windowSpec))
    .where(col('row_num') == 1)
    .drop('row_num')
)

# Translate each matrix row's 'WeekYear' label into a StartDate (first day of the week).
# Rows whose label is missing from the calendar keep a NULL StartDate (left join).
df_join_ozon_matrix_calendar = (
    df_join_ozon_matrix_dim_store
    .join(uniq_calendar_for_ozon_matrix, on='WeekYear', how='left')
    .select(
        df_join_ozon_matrix_dim_store['*'],
        uniq_calendar_for_ozon_matrix['first_day_of_week'].alias('StartDate'),
    )
    .drop('WeekYear')
)

# Kept so that any other cell referring to this name keeps working; it is no longer
# used for the computation below.
window_for_max_start_date = Window.orderBy()

# Global maximum StartDate as a one-row DataFrame. Computing it with an aggregate and
# broadcasting the single row avoids the un-partitioned window used before, which made
# Spark move every row into a single partition.
# ('max' here is pyspark.sql.functions.max, brought in by the wildcard import.)
df_max_start_date = df_join_ozon_matrix_calendar.agg(max('StartDate').alias('max_date'))

# EndDate rule:
#   * rows whose StartDate equals the global maximum StartDate -> 9999-12-31 (open-ended)
#   * every other row (including rows with a NULL StartDate)   -> max StartDate minus 1 day
# If no StartDate is populated at all, max_date is NULL and EndDate is NULL for all rows.
# NOTE(review): all non-latest rows receive the same EndDate regardless of their own
# StartDate - confirm this is the intended business rule.
df_end_date_creation = (
  df_join_ozon_matrix_calendar
  .crossJoin(broadcast(df_max_start_date))
  .withColumn('EndDate',
                        when(col('StartDate') == col('max_date'), to_date(lit('9999-12-31'), 'yyyy-MM-dd'))
                        .otherwise(col('max_date') - 1)
              )
  .drop('max_date')
)

In [None]:
# Product catalogue restricted to one customer id, used to translate Ozon SKU ids to EANs.
customer_id_ozon = 31
# The column is selected as 'CustomerId' when the product dimension is loaded; use the
# same spelling here so the filter does not depend on case-insensitive name resolution.
df_ozon_dim_product = df_dim_product.filter(col('CustomerId') == customer_id_ozon)

# Attach the EAN to every matrix row by matching IDSKU to CustomerProductId.
# Left join: rows without a catalogue match keep a NULL EAN.
# NOTE(review): the matrix at this point also contains the Ozon Fresh rows
# (Customer_ID 10), which are therefore matched against the customer 31 catalogue as
# well - confirm this is intended.
df_join_ozon_dim_product = (
  df_end_date_creation
  .join(df_ozon_dim_product, df_ozon_dim_product['CustomerProductId'] == df_end_date_creation['IDSKU'], how= 'left')
  .select(
            df_end_date_creation['*']
          , df_ozon_dim_product['EAN']
          )
  .drop('IDSKU')
)

# Final Ozon matrix: only rows with both a customer and an EAN, projected onto the
# column layout shared with the other customers' matrices.
df_ozon_matrix = (
    df_join_ozon_dim_product
    .where(col('Customer_ID').isNotNull())
    .where(col('EAN').isNotNull())
    .select(
        'EAN',
        'Customer_ID',
        'ParsingClientName',
        'StoreType',
        'StartDate',
        'EndDate',
    )
)

### Other customer matrix & union

### Union, deduplication & datatypes casting

In [None]:
# Combine the Ozon, other-customers and Samokat matrices; columns are aligned by name.
df_prefinal_matrix = (
    df_ozon_matrix
    .unionByName(df_matrix_of_all_customers)
    .unionByName(df_samokat_matrix)
)

In [None]:
# Deduplicate the combined matrix: one row per (EAN, ParsingClientName, StoreType).
#
# The window partitions on the three columns themselves. The earlier string key built
# with concat_ws('&', ...) skipped NULL parts and was ambiguous when a value contained
# '&', so different triples could end up in the same partition.
#
# Inside a partition the winner is the row with the earliest StartDate, then the latest
# EndDate. Customer_ID (non-NULL first) is the final tie-breaker so the selected row is
# deterministic between runs.
windowSpec_for_deduplication = (
  Window
  .partitionBy('EAN', 'ParsingClientName', 'StoreType')
  .orderBy(
            col("StartDate").cast('date')
          , col('EndDate').cast('date').desc()
          , col('Customer_ID').asc_nulls_last()
          )
)

key_for_deduplication = (
  df_prefinal_matrix
  .withColumn('row_num', row_number().over(windowSpec_for_deduplication))
  .filter(col('row_num') == 1)
  .drop('row_num')
  # Cast to the published data types; aliases keep the output column names explicit.
  .select(
          col('EAN').cast('long').alias('EAN')
        , col('Customer_ID').cast('int').alias('Customer_ID')
        , 'ParsingClientName'
        , 'StoreType'
        , col('StartDate').cast('date').alias('StartDate')
        , col('EndDate').cast('date').alias('EndDate')
  )
  )

### Write to DataModel

In [None]:
# Column order of the published table.
_final_columns = [
    'EAN',
    'Customer_ID',
    'ParsingClientName',
    'StoreType',
    'StartDate',
    'EndDate',
]

df_final_matrix = key_for_deduplication.select(*_final_columns)

In [None]:
# Overwrite the parquet files at output_path and register them as the Hive table
# named by hiveTargetTableName.
(
    df_final_matrix.write
    .mode('overwrite')
    .format('parquet')
    .option('path', output_path)
    .saveAsTable(hiveTargetTableName)
)

In [None]:
%sql
-- Sanity check: number of matrix rows per ParsingClientName in the published table.
-- NOTE(review): the schema name is hard-coded here, whereas the write cell takes it
-- from a secret - confirm both point to the same table.
SELECT
    ParsingClientName,
    count(*)
FROM russia_mw_dataroom.ecom_dim_matrix
GROUP BY ParsingClientName

ParsingClientName,count(1)
yandex_market_app,98
ozon.ru_krasnodar,99
ozon.ru,99
ozon.ru_ekb,99
ozon.ru_spb,99
sbermarket.ru_auchan,108
sbermarket.ru_metro,149
ozon.ru_nsk,99
ozon.ru_express,489
ozon.ru_kaliningrad,2
