Slowly Changing Dimensions implementation with Azure Databricks Delta Lake

Installation

Install the dbxscd package in an Azure Databricks notebook:

%pip install dbxscd

Run Example

  • Import the example notebook into your Databricks workspace
  • Click the "Run All" button

What is a Slowly Changing Dimension

Slowly Changing Dimensions (SCD) are dimensions whose attribute values change over time; in a data warehouse we need to track those changes to keep reports accurate.

There are typically three types of SCD:

  • Type 1 (SCD1): no history preservation
  • Type 2 (SCD2): unlimited history preservation; changes are recorded as new rows
  • Type 3 (SCD3): limited history preservation

For example, suppose we have the following dataset:

ShortName   Fruit         Color    Price
FA          Fiji Apple    Red      3.6
BN          Banana        Yellow   1
GG          Green Grape   Green    2
RG          Red Grape     Red      2

If we change the price of "Fiji Apple" to 3.5, the dataset becomes:

with SCD1:

ShortName   Fruit         Color    Price
FA          Fiji Apple    Red      3.5
BN          Banana        Yellow   1
GG          Green Grape   Green    2
RG          Red Grape     Red      2

with SCD2:

ShortName   Fruit         Color    Price   is_last
FA          Fiji Apple    Red      3.5     Y
FA          Fiji Apple    Red      3.6     N
BN          Banana        Yellow   1       Y
GG          Green Grape   Green    2       Y
RG          Red Grape     Red      2       Y

with SCD3:

ShortName   Fruit         Color    Price   Color_old   Price_old
FA          Fiji Apple    Red      3.5     Red         3.6
BN          Banana        Yellow   1       NULL        NULL
GG          Green Grape   Green    2       NULL        NULL
RG          Red Grape     Red      2       NULL        NULL
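
All three variants map naturally onto Delta Lake's MERGE operation. As a point of reference, here is a minimal, hypothetical sketch of SCD1 as a plain upsert using the DeltaTable merge API; it illustrates the general mechanics rather than the actual dbxscd internals, and source_df stands in for the incoming data frame:

from delta.tables import DeltaTable

# SCD1 is a plain upsert against the dimension table.
target = DeltaTable.forName(spark, "default.table_scd1")
(target.alias("t")
    .merge(source_df.alias("s"),
           "t.ShortName = s.ShortName AND t.Fruit = s.Fruit")
    .whenMatchedUpdateAll()      # overwrite Color/Price in place: no history kept
    .whenNotMatchedInsertAll()   # brand-new business keys become new rows
    .execute())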

SCD implementation in Databricks

This repository provides implementations of SCD1, SCD2, and SCD3 in Python on top of Databricks Delta Lake.

  1. SCD1
dbxscd.SCD1(spark, df, target_table_name, target_partition_keys, key_cols, current_time)

Parameters:

  • spark: the SparkSession instance
  • df: the source data frame
  • target_table_name: name of the target Delta table
  • target_partition_keys: partition key(s) of the target table
  • key_cols: comma-separated business key columns that identify a row (e.g. "ShortName,Fruit")
  • current_time: the current timestamp (as a string)

Here is example code demonstrating SCD1:

import datetime

import dbxscd

# create sample dataset
df1 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.5)
                           ,('BN', 'Banana', 'Yellow', 1.0)
                           ,('GG', 'Green Grape', 'Green', 2.0)
                           ,('RG', 'Red Grape', 'Red', 2.0)], 
                           ['ShortName','Fruit', 'Color', 'Price'])
# prepare parameters
current_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
target_partition_keys = ['ShortName']
key_cols = "ShortName,Fruit"
target_table_name_scd1 = 'default.table_scd1'
# call the SCD1 function
dbxscd.SCD1(spark, df1, target_table_name_scd1, target_partition_keys, key_cols, current_time)
# display the result
display(spark.sql(f"select * from {target_table_name_scd1}"))

Image of SCD1

Change the price of "Fiji Apple" to 3.6 and run SCD1 again:

df2 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.6)
                           ,('BN', 'Banana', 'Yellow', 1.0)
                           ,('GG', 'Green Grape', 'Green', 2.0)
                           ,('RG', 'Red Grape', 'Red', 2.0)], 
                           ['ShortName','Fruit', 'Color', 'Price'])
# call the SCD1 function again
dbxscd.SCD1(spark, df2, target_table_name_scd1, target_partition_keys, key_cols, current_time)
display(spark.sql(f"select * from {target_table_name_scd1}"))

Image of SCD1

  2. SCD2
dbxscd.SCD2(spark, df, target_table_name, target_partition_keys, key_cols, current_time)

Parameters:

  • spark: the SparkSession instance
  • df: the source data frame
  • target_table_name: name of the target Delta table
  • target_partition_keys: partition key(s) of the target table
  • key_cols: comma-separated business key columns that identify a row (e.g. "ShortName,Fruit")
  • current_time: the current timestamp (as a string)

Here is example code demonstrating SCD2:

import datetime

import dbxscd

# create sample dataset
df1 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.5)
                           ,('BN', 'Banana', 'Yellow', 1.0)
                           ,('GG', 'Green Grape', 'Green', 2.0)
                           ,('RG', 'Red Grape', 'Red', 2.0)], 
                           ['ShortName','Fruit', 'Color', 'Price'])
# prepare parameters
current_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
target_partition_keys = ['ShortName']
key_cols = "ShortName,Fruit"
target_table_name_scd2 = 'default.table_scd2'
# call the SCD2 function
dbxscd.SCD2(spark, df1, target_table_name_scd2, target_partition_keys, key_cols, current_time)
# display the result
display(spark.sql(f"select * from {target_table_name_scd2}"))

Image of SCD2

Change the price of "Fiji Apple" to 3.6 and run SCD2 again:

df2 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.6)
                           ,('BN', 'Banana', 'Yellow', 1.0)
                           ,('GG', 'Green Grape', 'Green', 2.0)
                           ,('RG', 'Red Grape', 'Red', 2.0)], 
                           ['ShortName','Fruit', 'Color', 'Price'])
# call the SCD2 function again
dbxscd.SCD2(spark, df2, target_table_name_scd2, target_partition_keys, key_cols, current_time)
display(spark.sql(f"select * from {target_table_name_scd2}"))

Image of SCD2
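
For reference, an SCD2 write is typically a two-step operation: expire the current version of every changed row, then append the incoming rows as new current versions. Below is a minimal, hypothetical sketch following the is_last convention from the tables above, not the actual dbxscd internals; source_df stands in for the incoming data frame, and the sketch omits the timestamp bookkeeping implied by the current_time parameter:

from delta.tables import DeltaTable
from pyspark.sql import functions as F

target = DeltaTable.forName(spark, "default.table_scd2")

# Step 1: mark the current version of every changed row as no longer current.
(target.alias("t")
    .merge(source_df.alias("s"),
           "t.ShortName = s.ShortName AND t.Fruit = s.Fruit AND t.is_last = 'Y'")
    .whenMatchedUpdate(
        condition="t.Color <> s.Color OR t.Price <> s.Price",
        set={"is_last": "'N'"})
    .execute())

# Step 2: append incoming rows that no longer match a current version exactly.
current = spark.table("default.table_scd2").where("is_last = 'Y'")
(source_df
    .join(current, ["ShortName", "Fruit", "Color", "Price"], "left_anti")
    .withColumn("is_last", F.lit("Y"))
    .write.format("delta").mode("append")
    .saveAsTable("default.table_scd2"))

The ordering matters: once step 1 flags the old version as 'N', the anti-join in step 2 no longer sees it as current, so each changed row is appended exactly once as the new current version.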

  3. SCD3
dbxscd.SCD3(spark, df, target_table_name, target_partition_keys, key_cols, current_time)

Parameters:

  • spark: the SparkSession instance
  • df: the source data frame
  • target_table_name: name of the target Delta table
  • target_partition_keys: partition key(s) of the target table
  • key_cols: comma-separated business key columns that identify a row (e.g. "ShortName,Fruit")
  • current_time: the current timestamp (as a string)

Here is example code demonstrating SCD3:

import datetime

import dbxscd

# create sample dataset
df1 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.5)
                           ,('BN', 'Banana', 'Yellow', 1.0)
                           ,('GG', 'Green Grape', 'Green', 2.0)
                           ,('RG', 'Red Grape', 'Red', 2.0)], 
                           ['ShortName','Fruit', 'Color', 'Price'])
# prepare parameters
current_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
target_partition_keys = ['ShortName']
key_cols = "ShortName,Fruit"
target_table_name_scd3 = 'default.table_scd3'
# call the SCD3 function
dbxscd.SCD3(spark, df1, target_table_name_scd3, target_partition_keys, key_cols, current_time)
# display the result
display(spark.sql(f"select * from {target_table_name_scd3}"))

Image of SCD3

Change the price of "Fiji Apple" to 3.6 and run SCD3 again:

df2 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.6)
                           ,('BN', 'Banana', 'Yellow', 1.0)
                           ,('GG', 'Green Grape', 'Green', 2.0)
                           ,('RG', 'Red Grape', 'Red', 2.0)], 
                           ['ShortName','Fruit', 'Color', 'Price'])
# call the SCD3 function again
dbxscd.SCD3(spark, df2, target_table_name_scd3, target_partition_keys, key_cols, current_time)
display(spark.sql(f"select * from {target_table_name_scd3}"))

Image of SCD3
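
For reference, SCD3 keeps a single row per key and shifts the previous attribute values into the *_old columns. Below is a minimal, hypothetical sketch of that update using the DeltaTable merge API (source_df again stands in for the incoming data frame); it is an illustration, not the actual dbxscd internals:

from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "default.table_scd3")
(target.alias("t")
    .merge(source_df.alias("s"),
           "t.ShortName = s.ShortName AND t.Fruit = s.Fruit")
    .whenMatchedUpdate(
        condition="t.Color <> s.Color OR t.Price <> s.Price",
        set={
            "Color_old": "t.Color",   # shift current values into the history columns
            "Price_old": "t.Price",
            "Color": "s.Color",
            "Price": "s.Price",
        })
    .whenNotMatchedInsert(values={
        "ShortName": "s.ShortName",
        "Fruit": "s.Fruit",
        "Color": "s.Color",
        "Price": "s.Price",
        "Color_old": "null",          # new keys start with no history
        "Price_old": "null",
    })
    .execute())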
