## CREATE CATALOG

In [0]:
%sql
-- Create the project catalog. IF NOT EXISTS keeps the cell re-runnable:
-- without it, a second run fails because the catalog already exists.
CREATE CATALOG IF NOT EXISTS CARS_CATALOG;

In [0]:
%sql
-- Make cars_catalog the current catalog for the statements that follow.
USE CATALOG cars_catalog;

## CREATE SCHEMA


In [0]:
%sql
-- Create the silver and gold schemas inside cars_catalog.
-- IF NOT EXISTS keeps the cell re-runnable once the schemas are in place.
CREATE SCHEMA IF NOT EXISTS CARS_CATALOG.SILVER;
CREATE SCHEMA IF NOT EXISTS CARS_CATALOG.GOLD;

## Data Loading

In [0]:
# Load all CSV files found (recursively) under the bronze path into df_sales.
# The first row of each file is used as the header and column types are inferred.
df_sales = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True, recursiveFileLookup=True)
    .load('abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/bronze')
)

In [0]:
# Preview the data that was just read from the bronze path.
df_sales.display()

## Data transformation

In [0]:
from pyspark.sql.functions import *

# model_category is the part of Model_ID that precedes the first '-'.
model_id_parts = split(col('Model_ID'), '-')
df_sales = df_sales.withColumn('model_category', model_id_parts.getItem(0))

In [0]:
from pyspark.sql.types import *

# Force Revenue to a double so later arithmetic works on a numeric column.
revenue_as_double = col('Revenue').cast(DoubleType())
df_sales = df_sales.withColumn('Revenue', revenue_as_double)

In [0]:
# Preview df_sales after the model_category and Revenue transformations.
df_sales.display()

In [0]:
# 'Cost per unit' is computed as Revenue divided by Units_Sold.
# NOTE(review): the value is revenue per unit, so the column name may be misleading - confirm intent.
revenue_over_units = col('Revenue') / col('Units_Sold')
df_sales = df_sales.withColumn('Cost per unit', revenue_over_units)

In [0]:
# Ad-hoc check: total units sold per (Year, BranchName), sorted by both columns.
# `sum` here is the PySpark aggregate brought in by the star import, not the Python builtin.
# The keyword is `ascending`; the previous misspelling was silently ignored by orderBy.
(
    df_sales
    .groupBy(col('Year'), col('BranchName'))
    .agg(sum('Units_Sold').alias('Total Units'))
    .orderBy([col('Year'), col('BranchName')], ascending=[True, True])
    .display()
)

Databricks visualization. Run in Databricks to view.

## Data Writing

In [0]:
# Persist the transformed sales data to the silver path as Parquet.
# NOTE: 'append' adds to whatever is already stored there, so re-running
# this cell writes the same rows again.
silver_writer = df_sales.write.format('parquet').mode('append')
silver_writer.save('abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/silver')

## Querying Silver Tables

In [0]:
%sql
-- Read the silver-layer Parquet files directly by path to check what was written.
SELECT * FROM parquet.`abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/silver`;

## Gold Layer

#### CREATE FLAG PARAMETER

In [0]:
# Create a text widget named 'incremental_flag' with default value '0'.
# Later cells treat '0' as an initial load and any other value as an incremental load.
dbutils.widgets.text('incremental_flag','0')

In [0]:
# Read the widget's current value; later cells compare it against the string '0'.
incremental_flag= dbutils.widgets.get('incremental_flag')

In [0]:
# Show which load mode this run will use.
print(incremental_flag)

### Creating Dimension model

In [0]:
%sql
-- Peek at a few silver rows before building the dimension from them.
SELECT * FROM parquet.`abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/silver`
LIMIT 5;

#### Fetch relevant columns

In [0]:
# Source side of the model dimension: the distinct (MODEL_ID, MODEL_CATEGORY)
# pairs currently present in the silver Parquet data.
df_src=spark.sql('''
SELECT DISTINCT MODEL_ID,MODEL_CATEGORY 
FROM parquet.`abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/silver`
''')

In [0]:
# Preview the first five source rows.
df_src.limit(5).display()

#### Dim model sink (Initial and Incremental)

In [0]:
# Build the "sink" side of the comparison: the models that already have a
# surrogate key in the gold dimension.
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    # Incremental run: the existing keys live in the gold table itself.
    # The silver data written by this notebook is never given a dim_model_key
    # column, so selecting that column from the silver path cannot resolve.
    df_sink = spark.sql('''
    SELECT dim_model_key,Model_ID,model_category
    FROM cars_catalog.gold.dim_model
    ''')
else:
    # Initial run: no gold table yet. WHERE 1=0 yields an empty frame that
    # still has the expected columns, so the downstream join works unchanged.
    df_sink = spark.sql('''
    SELECT null as dim_model_key,Model_ID,model_category
    FROM PARQUET.`abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/silver/`
    WHERE 1=0
    ''')

In [0]:
# Inspect the sink frame built by the previous cell.
df_sink.display()

## Filtering new and old records

In [0]:
# Left-join source models to the sink on Model_Id. Rows without a match keep
# a null dim_model_key, which is how new models are told apart from known ones.
model_id_matches = df_src['Model_Id'] == df_sink['Model_Id']
df_filter = (
    df_src
    .join(df_sink, model_id_matches, how='left')
    .select(df_src['Model_Id'], df_src['model_category'], 'dim_model_key')
)

**df_filter_old**

In [0]:
%python
from pyspark.sql.functions import *


In [0]:
# Rows that matched the sink, i.e. models that already carry a surrogate key.
has_model_key = col('dim_model_key').isNotNull()
df_filter_old = df_filter.where(has_model_key)
df_filter_old.display()

**df_filter_new**

In [0]:
# Rows with no match in the sink. The null key column is dropped here because
# a fresh key is assigned in a later cell.
lacks_model_key = col('dim_model_key').isNull()
df_filter_new = df_filter.where(lacks_model_key).select('Model_Id', 'model_category')
df_filter_new.display()

### Create surrogate key

#### Fetch the max surrogate key value from existing table

In [0]:
# Decide where surrogate-key numbering starts for the new model rows.
# max_value is later ADDED to monotonically_increasing_id(), which starts at 0,
# so it must be the first free key, not the highest key already in use.
# Using the existing maximum as-is would give the first new row a key that
# collides with an existing dimension row.
if incremental_flag == '0':   # initial load: keys start at 1
    max_value = 1
else:                         # incremental load: continue after the current maximum
    max_value_df = spark.sql('''select max(dim_model_key) from cars_catalog.gold.dim_model''')  # single-row DataFrame
    current_max = max_value_df.collect()[0][0]
    # max() is NULL when the table has no rows; fall back to 0 so keys start at 1.
    max_value = (current_max or 0) + 1

#### Create surrogate key column and add the max surrogate key

In [0]:
# Give every new model a surrogate key: the offset computed above plus a
# per-row increasing id.
new_model_key = max_value + monotonically_increasing_id()
df_filter_new = df_filter_new.withColumn('dim_model_key', new_model_key)
df_filter_new.display()

#### Create final df

In [0]:
# Stack existing and new model rows into one frame for the upsert.
# union() matches columns by position; both inputs are ordered
# (Model_Id, model_category, dim_model_key).
df_final=df_filter_old.union(df_filter_new)

In [0]:
# Inspect the combined rows before writing them to gold.
df_final.display()

## SCD TYPE-1 (Upsert)

UPSERT = UPDATE + INSERT: rows whose key already exists in the target are updated, and all other rows are inserted.

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type-1 upsert of the model dimension into gold.
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    # Incremental run: merge into the existing Delta table.
    # The path must name the same storage account the table was written to.
    delta_tbl = DeltaTable.forPath(
        spark,
        'abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/gold/dim_model',
    )
    # The merge condition is a SQL string over the two aliases; `target` and
    # `source` are not Python variables, so attribute access on them would raise NameError.
    (
        delta_tbl.alias('target')
        .merge(df_final.alias('source'), 'target.dim_model_key = source.dim_model_key')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: create the table at the gold path and register it in the catalog.
    (
        df_final.write.format('delta')
        .mode('overwrite')
        .option("path", 'abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/gold/dim_model')
        .saveAsTable('cars_catalog.gold.dim_model')
    )
    

#### Dim Branch sink (Initial and Incremental)

In [0]:
# Source side of the branch dimension: the distinct (BRANCH_ID, BRANCHNAME)
# pairs in the silver Parquet data. This rebinds df_src, which earlier held
# the model source rows.
df_src=spark.sql('''
SELECT DISTINCT BRANCH_ID,BRANCHNAME 
FROM parquet.`abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/silver`
''')

In [0]:
# Build the "sink" side of the comparison: the branches that already have a
# surrogate key in the gold dimension.
if spark.catalog.tableExists('cars_catalog.gold.dim_branch'):
    # Incremental run: the existing keys live in the gold table itself.
    # The silver data written by this notebook is never given a dim_branch_key
    # column, so selecting that column from the silver path cannot resolve.
    df_branch_sink = spark.sql('''
    SELECT dim_branch_key,Branch_ID,BranchName
    FROM cars_catalog.gold.dim_branch
    ''')
else:
    # Initial run: no gold table yet. WHERE 1=0 yields an empty frame that
    # still has the expected columns, so the downstream join works unchanged.
    df_branch_sink = spark.sql('''
    SELECT null as dim_branch_key,Branch_ID,BranchName
    FROM PARQUET.`abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/silver/`
    WHERE 1=0
    ''')

## Filtering new and old records

In [0]:
# Left-join source branches to the sink on Branch_Id. Rows without a match keep
# a null dim_branch_key, which is how new branches are told apart from known ones.
branch_id_matches = df_src['Branch_Id'] == df_branch_sink['Branch_Id']
df_filter = (
    df_src
    .join(df_branch_sink, branch_id_matches, how='left')
    .select(df_src['Branch_Id'], df_src['BranchName'], 'dim_branch_key')
)

**df_filter_old**

In [0]:
# Rows that matched the sink, i.e. branches that already carry a surrogate key.
has_branch_key = col('dim_branch_key').isNotNull()
df_filter_old = df_filter.where(has_branch_key)
df_filter_old.display()


**df_filter_new**

In [0]:
# Rows with no match in the sink. The null key column is dropped here because
# a fresh key is assigned in a later cell.
lacks_branch_key = col('dim_branch_key').isNull()
df_filter_new = df_filter.where(lacks_branch_key).select('Branch_Id', 'BranchName')
df_filter_new.display()

### Create surrogate key

#### Fetch the max surrogate key value from existing table

In [0]:
# Decide where surrogate-key numbering starts for the new branch rows.
# max_value is later ADDED to monotonically_increasing_id(), which starts at 0,
# so it must be the first free key, not the highest key already in use.
# Using the existing maximum as-is would give the first new row a key that
# collides with an existing dimension row.
if incremental_flag == '0':   # initial load: keys start at 1
    max_value = 1
else:                         # incremental load: continue after the current maximum
    max_value_df = spark.sql('''select max(dim_branch_key) from cars_catalog.gold.dim_branch''')  # single-row DataFrame
    current_max = max_value_df.collect()[0][0]
    # max() is NULL when the table has no rows; fall back to 0 so keys start at 1.
    max_value = (current_max or 0) + 1

#### Create surrogate key column and add the max surrogate key

In [0]:
# Give every new branch a surrogate key: the offset computed above plus a
# per-row increasing id.
new_branch_key = max_value + monotonically_increasing_id()
df_filter_new = df_filter_new.withColumn('dim_branch_key', new_branch_key)
df_filter_new.display()

#### Create final df

In [0]:
# Stack existing and new branch rows into one frame for the upsert.
# union() matches columns by position; both inputs are ordered
# (Branch_Id, BranchName, dim_branch_key).
df_final=df_filter_old.union(df_filter_new)
df_final.display()

## SCD TYPE-1 (Upsert)

In [0]:
# SCD Type-1 upsert of the branch dimension into gold.
if spark.catalog.tableExists('cars_catalog.gold.dim_branch'):
    # Incremental run: merge into the existing Delta table.
    # The path must name the same storage account the table was written to.
    delta_tbl_branch = DeltaTable.forPath(
        spark,
        'abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/gold/dim_branch',
    )
    # The merge condition is a SQL string over the two aliases; `target` and
    # `source` are not Python variables, so attribute access on them would raise NameError.
    (
        delta_tbl_branch.alias('target')
        .merge(df_final.alias('source'), 'target.dim_branch_key = source.dim_branch_key')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: create the table at the gold path and register it in the catalog.
    (
        df_final.write.format('delta')
        .mode('overwrite')
        .option("path", 'abfss://car-sales-project@mystorageaccountrachit.dfs.core.windows.net/gold/dim_branch')
        .saveAsTable('cars_catalog.gold.dim_branch')
    )
    

In [0]:
%sql
-- Check the contents of the branch dimension after the upsert.
SELECT * FROM cars_catalog.gold.dim_branch;