In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Data Access

In [0]:
app_id = '18f0628e-d082-4ee4-83a1-21a3c22c4da7'
tenant_id = 'c3621375-72db-4931-8135-0fa380e9ba78'
secret_value = '******'

spark.conf.set("fs.azure.account.auth.type.fantasybookdatastorage.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.fantasybookdatastorage.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.fantasybookdatastorage.dfs.core.windows.net", app_id)
spark.conf.set("fs.azure.account.oauth2.client.secret.fantasybookdatastorage.dfs.core.windows.net", secret_value)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.fantasybookdatastorage.dfs.core.windows.net", "https://login.microsoftonline.com/c3621375-72db-4931-8135-0fa380e9ba78/oauth2/token")

# Creating Dimension

## Create Flag Parameter

In [0]:
dbutils.widgets.text("incremental_flag", "0")

incremental_flag = dbutils.widgets.get("incremental_flag")
print(incremental_flag)

1


## Authors Dimension Table

### Select Relevant Columns

In [0]:
df_authors_src = spark.sql('''
                select authorKey, name, birth_date, death_date, lastModifiedDate
                from parquet.`abfss://silver@fantasybookdatastorage.dfs.core.windows.net/authors`
                ''')

df_authors_src.display()

authorKey,name,birth_date,death_date,lastModifiedDate
OL2869173A,Alex Holder,,,2008-04-29T15:03:11.581851Z
OL1389492A,Kathleen Olmstead,,,2008-09-07T15:16:38.427017Z
OL3237537A,Mary E. Pearson,,,2008-04-30T09:38:13.731961Z
OL220579A,Norma Fox Mazer,1931,,2021-04-27T01:36:43.149269Z
OL7360388A,Nik Kerry,,,2016-11-17T19:56:19.563866Z
OL200181A,Max Velthuijs,1923,,2008-09-06T22:29:13.235957Z
OL1759717A,Barbara Softly,,,2010-12-05T02:28:43.891306Z
OL1221336A,Wilfried Blecher,,,2008-08-18T22:48:36.960933Z
OL15017001A,Ud Din,,,2025-02-21T21:05:19.661035Z
OL7524496A,朝霧カフカ,1984 March 17,,2025-04-10T21:33:49.094528Z


### Authors Dimension Table Sink - Initial and Incremental Load

In [0]:
gold_authors_path = 'abfss://gold@fantasybookdatastorage.dfs.core.windows.net/authors'
gold_authors_path_files = dbutils.fs.ls(gold_authors_path)

if len(gold_authors_path_files) == 0:
    df_authors_sink = spark.sql('''
                        select distinct(authorKey) as authorKey, name, birth_date, death_date, lastModifiedDate, 1 as dimAuthorKey
                        from parquet.`abfss://silver@fantasybookdatastorage.dfs.core.windows.net/authors`
                        where 1=0 
                        ''')
else:
    df_authors_sink = spark.sql('''
                        select distinct(authorKey) as authorKey, name, birth_date, death_date, lastModifiedDate, dimAuthorKey
                        from delta.`abfss://gold@fantasybookdatastorage.dfs.core.windows.net/authors`
                        ''')
    
df_authors_sink.display()

authorKey,name,birth_date,death_date,lastModifiedDate,dimAuthorKey


### Filtering between existing and new records

In [0]:
df_authors_filter = df_authors_src\
                    .join(df_authors_sink, df_authors_src["authorKey"] == df_authors_sink["authorKey"], 'left')\
                    .select(df_authors_src["authorKey"], df_authors_src["name"], df_authors_src["birth_date"], df_authors_src["death_date"], df_authors_src["lastModifiedDate"], df_authors_sink["dimAuthorKey"])

df_authors_filter_existing = df_authors_filter.filter(col("dimAuthorKey").isNotNull())
df_authors_filter_new = df_authors_filter.filter(col("dimAuthorKey").isNull())\
                        .select(df_authors_filter["authorKey"], df_authors_filter["name"], df_authors_filter["birth_date"], df_authors_filter["death_date"], df_authors_filter["lastModifiedDate"])

df_authors_filter_new.display()

authorKey,name,birth_date,death_date,lastModifiedDate
OL2869173A,Alex Holder,,,2008-04-29T15:03:11.581851Z
OL1389492A,Kathleen Olmstead,,,2008-09-07T15:16:38.427017Z
OL3237537A,Mary E. Pearson,,,2008-04-30T09:38:13.731961Z
OL220579A,Norma Fox Mazer,1931,,2021-04-27T01:36:43.149269Z
OL7360388A,Nik Kerry,,,2016-11-17T19:56:19.563866Z
OL200181A,Max Velthuijs,1923,,2008-09-06T22:29:13.235957Z
OL1759717A,Barbara Softly,,,2010-12-05T02:28:43.891306Z
OL1221336A,Wilfried Blecher,,,2008-08-18T22:48:36.960933Z
OL15017001A,Ud Din,,,2025-02-21T21:05:19.661035Z
OL7524496A,朝霧カフカ,1984 March 17,,2025-04-10T21:33:49.094528Z


### Create Surrogate/Dimension Key

In [0]:
if incremental_flag == '0':
    max_dim_key_value = 0
else:
    max_dim_key_value_df = spark.sql("select max(dimAuthorKey) from delta.`abfss://gold@fantasybookdatastorage.dfs.core.windows.net/authors`")
    max_dim_key_value = max_dim_key_value_df.collect()[0][0]

df_authors_filter_new = df_authors_filter_new.withColumn("dimAuthorKey", lit(max_dim_key_value + monotonically_increasing_id() + 1))
df_authors_filter_new.display()

authorKey,name,birth_date,death_date,lastModifiedDate,dimAuthorKey
OL2869173A,Alex Holder,,,2008-04-29T15:03:11.581851Z,1
OL1389492A,Kathleen Olmstead,,,2008-09-07T15:16:38.427017Z,2
OL3237537A,Mary E. Pearson,,,2008-04-30T09:38:13.731961Z,3
OL220579A,Norma Fox Mazer,1931,,2021-04-27T01:36:43.149269Z,4
OL7360388A,Nik Kerry,,,2016-11-17T19:56:19.563866Z,5
OL200181A,Max Velthuijs,1923,,2008-09-06T22:29:13.235957Z,6
OL1759717A,Barbara Softly,,,2010-12-05T02:28:43.891306Z,7
OL1221336A,Wilfried Blecher,,,2008-08-18T22:48:36.960933Z,8
OL15017001A,Ud Din,,,2025-02-21T21:05:19.661035Z,9
OL7524496A,朝霧カフカ,1984 March 17,,2025-04-10T21:33:49.094528Z,10


### Create New Final Authors Dimension Table

In [0]:
df_authors_final = df_authors_filter_existing.union(df_authors_filter_new)
df_authors_final.display()

authorKey,name,birth_date,death_date,lastModifiedDate,dimAuthorKey
OL2869173A,Alex Holder,,,2008-04-29T15:03:11.581851Z,1
OL1389492A,Kathleen Olmstead,,,2008-09-07T15:16:38.427017Z,2
OL3237537A,Mary E. Pearson,,,2008-04-30T09:38:13.731961Z,3
OL220579A,Norma Fox Mazer,1931,,2021-04-27T01:36:43.149269Z,4
OL7360388A,Nik Kerry,,,2016-11-17T19:56:19.563866Z,5
OL200181A,Max Velthuijs,1923,,2008-09-06T22:29:13.235957Z,6
OL1759717A,Barbara Softly,,,2010-12-05T02:28:43.891306Z,7
OL1221336A,Wilfried Blecher,,,2008-08-18T22:48:36.960933Z,8
OL15017001A,Ud Din,,,2025-02-21T21:05:19.661035Z,9
OL7524496A,朝霧カフカ,1984 March 17,,2025-04-10T21:33:49.094528Z,10


# Creating SCD Type 1 (Upsert)

In [0]:
from delta.tables import DeltaTable


## Authors Dimension Table

In [0]:
# Initial Run
if len(gold_authors_path_files) == 0:
    df_authors_final.write.format("delta")\
                .mode("overwrite")\
                .option("path", gold_authors_path)\
                .saveAsTable("authorsDimTable")

# Incremental Run
else:
    authorsDim_deltatable = DeltaTable.forPath(spark, gold_authors_path)
    authorsDim_deltatable.alias("authorsDimTable")\
                    .merge(df_authors_final.alias("updates"), "authorsDimTable.dimAuthorKey = updates.dimAuthorKey")\
                    .whenMatchedUpdateAll()\
                    .whenNotMatchedInsertAll()\
                    .execute()