### Ingest products.csv file
##### The products file is meant for a dimension table
##### We are following SCD - Type 1, whereby the product name will be updated in the dimension table every time there is a change in the source data

##### Let's pass on Data Source name as parameter - It can be passed from Azure Data Factory as well

In [0]:
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

##### Let's pass on Ingestion Date as parameter - It can be passed from Azure Data Factory as well

In [0]:
dbutils.widgets.text("p_file_date", "2021-03-28")
v_file_date = dbutils.widgets.get("p_file_date")

The configuration_ecom.py file contains the folder paths of the data landing zone

In [0]:
%run "./configuration_ecom"

The common_function_ecom.py file contains various functions to handle the routine ETL matters, like adding an ingestion_date column to a dataframe, re-arranging the columns sequence if the columns are not in desired sequence, functions to check if a table exists or not, & last but not least the Merge function to handle insert/update at the same time ;)

In [0]:
%run "./common_functions_ecom"

##### Step 1 - Read the CSV file using the spark dataframe reader API

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [0]:
products_schema = StructType(fields=[StructField("product_Id", IntegerType(), False),
                                    StructField("created_at", DateType(), True),
                                    StructField("product_name", StringType(), True)])

In [0]:
results_df = spark.read \
.schema(products_schema).option("header",True)\
.csv(f"{raw_folder_path}/products/{v_file_date}/products.csv")

##### Step 2 - Rename columns and add new columns

In [0]:
from pyspark.sql.functions import lit

In [0]:
results_with_columns_df = results_df.withColumn("data_source", lit(v_data_source)) \
                                    .withColumn("ingestion_date", lit(v_file_date))
#results_with_columns_df = results_df.withColumnRenamed("created_at", "sourced_at") \

In [0]:
results_with_ingestion_date_df = add_ingestion_date(results_with_columns_df)

##### Step 3 - Drop the unwanted columns

In [0]:
from pyspark.sql.functions import col

In [0]:
results_final_df = results_with_ingestion_date_df.drop(col("data_source"))
#results_final_df.show(10,False)

De-dupe the dataframe

In [0]:
results_deduped_df = results_final_df.dropDuplicates(['product_Id'])
#results_deduped_df = results_final_df.dropDuplicates(['product_Id', 'created_at'])

##### Step 4 - Write to Dataframe to designated Azure Data Lake container/directory in parquet format

The function 'merge_delta_data' will create a table it if does not exist

##### Lets use Merge Function (merge_delta_data) using Python 
Databricks DeltaLake supports ACID & Merge Function
Let's pass on below arguments to Merge Function (located in common_functions_ecom.py)

  Input DataFrame,  
  
  Database Name, 
  
  Table Name, 
  
  Folder Path (Target Table's path), 
  
  Merge Condition 
  
  and Partition Column (not using at the moment due to ir-relevance, 
  
  <b>
Note: products is a small dimension table & does not need partition. The dataframe cache feature will server the purpose to avoid expensive data shuffle.

In [0]:
merge_condition = "tgt.product_Id = src.product_Id"
merge_delta_data(results_deduped_df, 'ecom_growth', 'products', processed_folder_path, merge_condition, 'product_Id')

In [0]:
dbutils.notebook.exit("Success")

Success

In [0]:
%sql
SELECT count(*)
  FROM ecom_growth.products;

count(1)
4


<b> SQL version of Merge Statement

Note:  we have to register a Temporary View named 'temp_view' from DataFrame results_deduped_df
  
%sql
  
MERGE INTO ecom_growth.products

USING temp_view

ON ecom_growth.products.product_Id = temp_view.product_Id

WHEN MATCHED THEN

  UPDATE SET ecom_growth.products.product_name = temp_view.product_name
  
WHEN NOT MATCHED

  THEN INSERT (product_id,created_at,product_name,ingestion_date) VALUES (product_id,created_at,product_name,ingestion_date)