This example reads snapshot data and creates a slowly changing dimension from it using hashing and window functions.

In [1]:
from pyspark.sql import SparkSession

# Local Spark session for this notebook. The extra jar is put on the
# classpath so the JVM-side functions under `spark.udaf` can be reached later.
session_builder = (
    SparkSession.builder
    .appName("SvnLocalSpark")
    .config("spark.sql.warehouse.dir", "../spark-data-tmp")
    .config("spark.jars", "../scala-udaf/target/scala-2.13/svn-local-spark_2.13-0.1.0-SNAPSHOT.jar")
    .master("local")
)
spark = session_builder.getOrCreate()

# Show the Spark version and the address of the Spark UI for this session.
print(f"spark {spark.version} {spark.sparkContext.uiWebUrl}")

spark 3.5.4 http://DESKTOP-4GOMK6M:4041


In [2]:
# Register the snapshot CSV as an external table in the `landing` schema.
spark.sql("CREATE SCHEMA IF NOT EXISTS landing")

# createTable raises if the table is already registered, which breaks
# re-running this cell in a live session, so only create it when it is missing.
# NOTE: an existing definition is reused as-is; drop the table first to pick
# up changed options or a different path.
if not spark.catalog.tableExists("landing.commercial_properties"):
    spark.catalog.createTable(
        tableName="landing.commercial_properties",
        source="csv",
        description="property values",
        header="true",
        delimiter=",",
        path="../../resources/sourcedata/commercial_property_snapshots_100_M39.csv",
        inferSchema="true",
    )

# Snapshot rows as a DataFrame; the downstream cells read from this.
raw = spark.table("landing.commercial_properties")

In [3]:
# Register the JVM-side functions from the `spark.udaf` package with this
# session so they can be called from SQL expressions.
# NOTE(review): presumably these provide `lead_unequal` and
# `lead_unequal_date_string` used below; confirm against the Scala sources.
udaf_pkg = spark.sparkContext._jvm.spark.udaf
jvm_session = spark._jsparkSession

java_catalyst_udf = udaf_pkg.SvnFunctionRegistration.registerFunctions(jvm_session)
java_udf1 = udaf_pkg.LeadUnequalDateString.register(jvm_session)

In [4]:
from pyspark.sql.functions import *
from pyspark.sql.window import *

# Column roles in the snapshot table and in the resulting dimension.
key_cols = ["property_id"]
snapshot_colname = "date"
from_colname = "valid_from"
to_colname = "valid_to"

# Every column that is neither a key nor the snapshot date counts as payload.
inp_cols = [*key_cols, snapshot_colname]
data_cols = [name for name in raw.columns if name not in inp_cols]

# Per-key window ordered by snapshot date.
sdw = Window.partitionBy(*key_cols).orderBy(snapshot_colname)

# Target schema for the dimension tables written by the following cells.
spark.sql("CREATE SCHEMA IF NOT EXISTS integration")

DataFrame[]

In [5]:
# Variant 1: build the slowly changing dimension with built-in window functions.
#
# Change detection hashes all payload columns. A plain concat() yields NULL as
# soon as one input is NULL, which makes the hash NULL and lets the
# `hash<>prevHash` filter silently drop that version; without a separator it
# also cannot tell ("ab", "c") from ("a", "bc"). Casting to string, replacing
# NULL with a marker and joining with a separator avoids both problems.
row_hash = sha2(
    concat_ws(
        "\u001f",
        *[coalesce(col(c).cast("string"), lit("\u001e")) for c in data_cols],
    ),
    512,
)

scdv1 = (
    raw.withColumn("hash", row_hash)
    .withColumn("prevHash", lag("hash").over(sdw))
    # Keep the first snapshot per key and every snapshot whose payload changed.
    .where("hash<>prevHash or prevHash IS NULL")
    # Evaluated after the filter, so lead() returns the date of the next kept
    # version; the last version per key stays open-ended.
    .withColumn(to_colname, coalesce(lead(snapshot_colname).over(sdw), to_date(lit("9999-12-31"))))
    .select(key_cols + [col(snapshot_colname).alias(from_colname)] + [to_colname] + data_cols)
)

scdv1.write.mode('overwrite').format("parquet").saveAsTable("integration.properties_lead")

In [6]:
# Variant 2: same dimension, but `valid_to` comes from the custom
# `lead_unequal` window function, evaluated over all snapshots before filtering.
# NOTE(review): presumably lead_unequal returns the date of the next row whose
# hash differs; it is defined in the Scala jar, confirm there.
#
# NULL-safe, delimited hash input: plain concat() turns the whole hash NULL when
# any payload column is NULL (the row would then be dropped by the filter) and
# cannot tell ("ab", "c") from ("a", "bc").
row_hash = sha2(
    concat_ws(
        "\u001f",
        *[coalesce(col(c).cast("string"), lit("\u001e")) for c in data_cols],
    ),
    512,
)

# Build the window clause from the configured column names instead of
# hard-coding them, so it stays in sync with key_cols / snapshot_colname.
partition_sql = ", ".join(f"`{k}`" for k in key_cols)
valid_to_sql = (
    f'coalesce(lead_unequal(`{snapshot_colname}`,`hash`) '
    f'over(partition by {partition_sql} ORDER BY `{snapshot_colname}`), to_date("9999-12-31"))'
)

scdv2 = (
    raw.withColumn("hash", row_hash)
    .withColumn("prevHash", lag("hash").over(sdw))
    .withColumn(to_colname, expr(valid_to_sql))
    # Keep the first snapshot per key and every snapshot whose payload changed.
    .where("hash<>prevHash or prevHash IS NULL")
    .select(key_cols + [col(snapshot_colname).alias(from_colname)] + [to_colname] + data_cols)
)

scdv2.write.mode('overwrite').format("parquet").saveAsTable("integration.properties_lead_uneqal")

In [7]:
# Variant 3: same dimension, but `valid_to` comes from the custom
# `lead_unequal_date_string` function over an explicit forward-looking frame,
# evaluated over all snapshots before filtering.
# NOTE(review): presumably it returns the date of the next row whose hash
# differs; it is defined in the Scala jar, confirm there.
#
# NULL-safe, delimited hash input: plain concat() turns the whole hash NULL when
# any payload column is NULL (the row would then be dropped by the filter) and
# cannot tell ("ab", "c") from ("a", "bc").
row_hash = sha2(
    concat_ws(
        "\u001f",
        *[coalesce(col(c).cast("string"), lit("\u001e")) for c in data_cols],
    ),
    512,
)

# Build the window clause from the configured column names instead of
# hard-coding them, so it stays in sync with key_cols / snapshot_colname.
partition_sql = ", ".join(f"`{k}`" for k in key_cols)
valid_to_sql = (
    f'coalesce(lead_unequal_date_string(`{snapshot_colname}`,`hash`) '
    f'over(partition by {partition_sql} ORDER BY `{snapshot_colname}`  '
    f'ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING), to_date("9999-12-31"))'
)

scdv3 = (
    raw.withColumn("hash", row_hash)
    .withColumn("prevHash", lag("hash").over(sdw))
    .withColumn(to_colname, expr(valid_to_sql))
    # Keep the first snapshot per key and every snapshot whose payload changed.
    .where("hash<>prevHash or prevHash IS NULL")
    .select(key_cols + [col(snapshot_colname).alias(from_colname)] + [to_colname] + data_cols)
)

scdv3.write.mode('overwrite').format("parquet").saveAsTable("integration.properties_lead_uneqal_ds")