In [1]:
import sys
sys.path.insert(0, '/Users/pyro/github/HiveHelper_on_PySpark/hhop') 
# for running .ipynb files anywhere outside of a current dir using the module hhop

from functools import reduce
from importlib import reload
import pandas as pd

from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
import pyspark.sql.types as T

import shutil, os, time # working with FS
from glob import glob
from shutil import copy2
from pathlib import Path

import hhop # custom module
from hhop import DFExtender, SchemaManager, TablePartitionDescriber #main classes
import funs
from funs import read_table, write_table, union_all, deduplicate_df # useful functions
from spark_init import spark
from exceptions import HhopException
display(spark)

23/07/08 18:17:37 WARN Utils: Your hostname, Pavels-MacBook-Air.local resolves to a loopback address: 127.0.0.200; using 192.168.0.103 instead (on interface en0)
23/07/08 18:17:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/07/08 18:17:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
s1 = spark.read.csv('scd2_data/to_scd2.csv', sep=';', header=True)

In [3]:
s1.show(100, False)

+---+----+------+------+------+-----------+-------------------+
|pk1|pk2 |nonpk1|nonpk2|nonpk3|nonpk_extra|ts                 |
+---+----+------+------+------+-----------+-------------------+
|v1 |c1  |a1    |b1    |c1    |r1         |2023-05-01 10:00:00|
|v1 |c1  |a1    |b1    |c1    |r2         |2023-05-01 12:00:00|
|v1 |c1  |a1    |b1    |c1    |r3         |2023-05-02 12:00:00|
|v1 |c1  |a1    |b1    |c2    |null       |2023-05-03 12:00:00|
|v1 |c1  |a1    |b2    |c2    |null       |2023-05-03 15:00:00|
|v1 |c1  |null  |b2    |c2    |r3         |2023-05-05 15:00:00|
|v1 |c1  |null  |b2    |c2    |r3         |2023-05-06 15:00:00|
|v1 |c1  |null  |null  |c2    |r3         |2023-05-07 15:00:00|
|v1 |c1  |null  |null  |null  |null       |2023-05-10 15:00:00|
|v1 |c1  |null  |null  |c2    |r3         |2023-05-13 15:00:00|
|v1 |null|null  |null  |c2    |r3         |2023-05-07 15:00:00|
|v1 |null|null  |null  |null  |null       |2023-05-10 15:00:00|
|v1 |null|null  |fds   |null  |null     

In [4]:
def df_to_scd2(df, pk_cols, non_pk_cols, time_col):
    """Функция создания SCD2 историчности таблицы"""
    
    tech_col_names = { # для быстрой замены
        'row_hash': 'row_hash',
        'row_actual_from': 'row_actual_from',
        'row_actual_to': 'row_actual_to',
    }
    
    def concat_cols(*cols):
        return F.concat_ws('', *sorted(cols)) # сортировка для постоянного хэша
    
    df_cols = df.columns
    
    window_pk_asc = W.partitionBy(*pk_cols).orderBy(time_col)
    df_hash = (
        df
        .withColumn('row_hash', F.md5(concat_cols(*pk_cols, *non_pk_cols))) # хэш только по ключу и бизнес-атрибутам
        .withColumn('row_actual_from', col(time_col).cast('date'))
        .withColumn('version_num', F.count(F.when(F.lag('row_hash').over(window_pk_asc) != col('row_hash'), 1)).over(window_pk_asc))
    )
    df_ded_by_hash = deduplicate_df(
        df_hash,
        pk=[*pk_cols, 'version_num'],
        order_by_cols=[time_col], # нужна первая запись с хэшом с учетом последовательности изменений
    )
    df_ded_by_date = deduplicate_df(
        df_ded_by_hash,
        pk=[*pk_cols, 'row_actual_from'],
        order_by_cols=[F.desc(time_col)], # нужна последняя запись в разрезе дня
    )
    
    window_row_actual_to = W.partitionBy(*pk_cols).orderBy('row_actual_from')
    
    alias_tech_col_names = lambda x: col(x).alias(tech_col_names[x])
    
    df_result = (
        df_ded_by_date
        .withColumn(
            'row_actual_to', 
            F.coalesce(F.date_sub(F.lead('row_actual_from').over(window_row_actual_to), 1), F.lit('9999-12-31'))
        )
        .select(
            *df_cols, 
            alias_tech_col_names('row_hash'), 
            alias_tech_col_names('row_actual_from').cast('string'), 
            alias_tech_col_names('row_actual_to').cast('string'),
        )
    )
    
    return df_result

df_to_scd2(s1, ['pk1', 'pk2'], ['nonpk1', 'nonpk2', 'nonpk3'], 'ts').orderBy(['pk1', 'pk2', 'ts']).show(100, False)

+---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
|pk1|pk2 |nonpk1|nonpk2|nonpk3|nonpk_extra|ts                 |row_hash                        |row_actual_from|row_actual_to|
+---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
|v1 |null|null  |null  |c2    |r3         |2023-05-07 15:00:00|56e6807f4b745e20dffeb1b731e5a6d4|2023-05-07     |2023-05-09   |
|v1 |null|null  |null  |null  |null       |2023-05-10 15:00:00|6654c734ccab8f440ff0825eb443dc7f|2023-05-10     |2023-05-10   |
|v1 |null|null  |fds   |null  |null       |2023-05-11 15:00:00|2d2722576095dd7996570b307d777539|2023-05-11     |2023-05-11   |
|v1 |null|null  |fds   |asdf  |null       |2023-05-12 15:00:00|b08363345cd7c1cb14e6f4747ce1563d|2023-05-12     |9999-12-31   |
|v1 |c1  |a1    |b1    |c1    |r1         |2023-05-01 10:00:00|93e6cc4b8b0445cf261e9417106ae6f0|2023-05-01     

In [5]:
df1, df2 = [spark.read.csv(f'scd2_data/df_scd2_join_{i}.csv', sep=';', header=True) for i in range(1, 3)]

In [6]:
df2.show(10, False)

+---+---+--------+-------------------+
|pk1|pk2|phone_id|ts                 |
+---+---+--------+-------------------+
|v1 |c1 |e1      |2023-04-01 10:00:00|
|v1 |c1 |e2      |2023-05-06 12:00:00|
|v1 |c1 |e3      |2023-05-12 12:00:00|
|v1 |c1 |e1      |2023-05-13 12:00:00|
|v1 |c2 |e1      |2023-04-01 10:00:00|
|v1 |c2 |e2      |2023-05-06 12:00:00|
|v1 |c2 |e3      |2023-05-12 12:00:00|
|v1 |c2 |e1      |2023-05-13 12:00:00|
+---+---+--------+-------------------+



In [7]:
df1_scd2, df2_scd2 = [df_to_scd2(df, ['pk1', 'pk2'], [non_pk_col], 'ts').orderBy(['pk1', 'pk2', 'ts']).drop('ts') for df, non_pk_col in zip((df1, df2), ('email_id', 'phone_id'))]

In [8]:
df2_scd2.show(100, False)

+---+---+--------+--------------------------------+---------------+-------------+
|pk1|pk2|phone_id|row_hash                        |row_actual_from|row_actual_to|
+---+---+--------+--------------------------------+---------------+-------------+
|v1 |c1 |e1      |e14f0e80db49cd1501de87adf05f6022|2023-04-01     |2023-05-05   |
|v1 |c1 |e2      |9862c1fb9265b03695dc9a727406c43e|2023-05-06     |2023-05-11   |
|v1 |c1 |e3      |543b4e1fe15d3cd37fc7b9454156f4e1|2023-05-12     |2023-05-12   |
|v1 |c1 |e1      |e14f0e80db49cd1501de87adf05f6022|2023-05-13     |9999-12-31   |
|v1 |c2 |e1      |db078b8d7b629e8c3e11aeaf24952480|2023-04-01     |2023-05-05   |
|v1 |c2 |e2      |284ed4afc0045d818e840896714656ca|2023-05-06     |2023-05-11   |
|v1 |c2 |e3      |87795052bb06129a6007a0dfaad2efef|2023-05-12     |2023-05-12   |
|v1 |c2 |e1      |db078b8d7b629e8c3e11aeaf24952480|2023-05-13     |9999-12-31   |
+---+---+--------+--------------------------------+---------------+-------------+



In [9]:
def validate_df_scd2(df, pk, ): pass

In [10]:
def scd2_join(df1, df2, pk):
    df1, df2 = df1.alias('df1'), df2.alias('df2')
    
    tech_attr = {'row_actual_from', 'row_actual_to', 'row_hash'}
    def get_non_pk_attrs(df):
        all_attrs = set(df.columns)
        pk_attrs = set(pk)
        non_pk_attrs = all_attrs - tech_attr - pk_attrs
        return non_pk_attrs
    
    greatest_from = F.greatest(df1['row_actual_from'], df2['row_actual_from'])
    least_to = F.least(df1['row_actual_to'], df2['row_actual_to'])
    pk_cond_join = ' and '.join([f'df1.{pk_col} = df2.{pk_col}' for pk_col in pk])
    
    cond_scd2_join = F.expr(pk_cond_join) & (greatest_from <= least_to)
    df_joined = (
        df1
        .join(df2, on=cond_scd2_join, how='inner')
    )
    
    df_new_scd2 = (
        df_joined
        .select(
            *[f'df1.{pk_col}' for pk_col in pk], # из-за условия джоина нельзя просто так взять атрибуты из pk
            *get_non_pk_attrs(df1),
            *get_non_pk_attrs(df2),
            greatest_from.alias('row_actual_from'),
            least_to.alias('row_actual_to'),
        )
    )

    return df_new_scd2
scd2_join(df1_scd2, df2_scd2, ['pk1','pk2']).orderBy('pk1', 'pk2','row_actual_from').show(100, False)

+---+---+--------+--------+---------------+-------------+
|pk1|pk2|email_id|phone_id|row_actual_from|row_actual_to|
+---+---+--------+--------+---------------+-------------+
|v1 |c1 |e1      |e1      |2023-05-01     |2023-05-03   |
|v1 |c1 |e2      |e1      |2023-05-04     |2023-05-05   |
|v1 |c1 |e2      |e2      |2023-05-06     |2023-05-09   |
|v1 |c1 |e3      |e2      |2023-05-10     |2023-05-11   |
|v1 |c1 |e1      |e3      |2023-05-12     |2023-05-12   |
|v1 |c1 |e1      |e1      |2023-05-13     |9999-12-31   |
|v1 |c2 |e1      |e1      |2023-05-01     |2023-05-05   |
|v1 |c2 |e1      |e2      |2023-05-06     |2023-05-11   |
|v1 |c2 |e1      |e3      |2023-05-12     |2023-05-12   |
|v1 |c2 |e1      |e1      |2023-05-13     |9999-12-31   |
+---+---+--------+--------+---------------+-------------+



In [11]:
spark.stop()