In [1]:
# Loading the raw SCM text files from the bronze layer as CSV

scm_data_loc = "Files/bronze_scm/*.txt"

# The first line of each file is treated as the header; column types are inferred by Spark.
csv_reader = spark.read.format("csv").options(
    header=True,
    delimiter=",",
    inferSchema=True,
)
scm_df = csv_reader.load(scm_data_loc)

# Preview the first five rows
display(scm_df.head(5))

StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ef89bc64-bbd6-4299-875d-f264833c3df7)

In [2]:
# Listing the column names of the raw DataFrame (scm_df) as loaded from bronze,
# before any columns are added, dropped or renamed.

scm_df.columns

StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 4, Finished, Available, Finished)

['Type',
 'Days_for_shipping_(real)',
 'Days_for_shipment_(scheduled)',
 'Sales_per_customer',
 'Delivery_Status',
 'Late_delivery_risk',
 'Category_Id',
 'Category_Name',
 'Customer_City',
 'Customer_Fname',
 'Customer_Id',
 'Customer_Lname',
 'Customer_Segment',
 'Customer_State',
 'Department_Name',
 'Latitude',
 'Longitude',
 'Market',
 'Order_City',
 'Order_Country',
 'order_date_(DateOrders)',
 'Order_Id',
 'Order_Item_Discount',
 'Order_Item_Discount_Rate',
 'Order_Item_Product_Price',
 'Order_Item_Quantity',
 'Order_Item_Total',
 'Order_Profit_Per_Order',
 'Order_Region',
 'Order_State',
 'Order_Status',
 'Product_Category_Id',
 'Product_Name',
 'Product_Price',
 'shipping_date_(DateOrders)',
 'Shipping_Mode']

In [3]:
# Stamping every row with creation and modification timestamps

from pyspark.sql.functions import current_timestamp

# refined_df starts from the raw scm_df; both audit columns are filled with the current timestamp.
refined_df = (
    scm_df
    .withColumn("created_ts", current_timestamp())
    .withColumn("modified_ts", current_timestamp())
)

StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 5, Finished, Available, Finished)

In [4]:
# Building a single full-name column from the first-name and last-name columns

from pyspark.sql.functions import concat_ws

# First and last name joined with a single space as the separator.
full_name = concat_ws(" ", refined_df["Customer_Fname"], refined_df["Customer_Lname"])
refined_df = refined_df.withColumn("customer_fullname", full_name)

StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 6, Finished, Available, Finished)

In [5]:
# Dropping the separate first-name and last-name columns from refined_df

name_parts = ["Customer_Fname", "Customer_Lname"]
refined_df = refined_df.drop(*name_parts)

StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 7, Finished, Available, Finished)

In [6]:
# Listing the column names of refined_df to check the columns added and dropped so far.
refined_df.columns

StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 8, Finished, Available, Finished)

['Type',
 'Days_for_shipping_(real)',
 'Days_for_shipment_(scheduled)',
 'Sales_per_customer',
 'Delivery_Status',
 'Late_delivery_risk',
 'Category_Id',
 'Category_Name',
 'Customer_City',
 'Customer_Id',
 'Customer_Segment',
 'Customer_State',
 'Department_Name',
 'Latitude',
 'Longitude',
 'Market',
 'Order_City',
 'Order_Country',
 'order_date_(DateOrders)',
 'Order_Id',
 'Order_Item_Discount',
 'Order_Item_Discount_Rate',
 'Order_Item_Product_Price',
 'Order_Item_Quantity',
 'Order_Item_Total',
 'Order_Profit_Per_Order',
 'Order_Region',
 'Order_State',
 'Order_Status',
 'Product_Category_Id',
 'Product_Name',
 'Product_Price',
 'shipping_date_(DateOrders)',
 'Shipping_Mode',
 'created_ts',
 'modified_ts',
 'customer_fullname']

In [7]:
# Renaming the source columns to the lower-case names used from here on

# Source column name -> new column name; applied in the order listed.
column_renames = {
    'Type': 'type',
    'Days_for_shipping_(real)': 'shipping_days_actual',
    'Days_for_shipment_(scheduled)': 'shipping_days_scheduled',
    'Sales_per_customer': 'sales_actual',
    'Delivery_Status': 'delivery_status',
    'Late_delivery_risk': 'late_delivery_risk',
    'Category_Id': 'category_id',
    'Category_Name': 'category_name',
    'Customer_City': 'customer_city',
    'Customer_Id': 'customer_id',
    'Customer_Segment': 'customer_segment',
    'Customer_State': 'customer_state',
    'Department_Name': 'department_name',
    'Latitude': 'latitude',
    'Longitude': 'longitude',
    'Market': 'market',
    'Order_City': 'order_city',
    'Order_Country': 'order_country',
    'order_date_(DateOrders)': 'order_date',
    'Order_Id': 'order_id',
    'Order_Item_Discount': 'order_dis',
    'Order_Item_Discount_Rate': 'order_dis_rate',
    'Order_Item_Product_Price': 'order_price',
    'Order_Item_Quantity': 'order_qty',
    'Order_Item_Total': 'order_item_total',
    'Order_Profit_Per_Order': 'order_profit',
    'Order_Region': 'order_region',
    'Order_State': 'order_state',
    'Order_Status': 'order_status',
    # NOTE(review): the category id column becomes 'product_id' -- confirm this naming is intended.
    'Product_Category_Id': 'product_id',
    'Product_Name': 'product_name',
    'Product_Price': 'product_price',
    'shipping_date_(DateOrders)': 'shipping_date',
    'Shipping_Mode': 'shipping_mode',
}

for old_name, new_name in column_renames.items():
    refined_df = refined_df.withColumnRenamed(old_name, new_name)

StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 9, Finished, Available, Finished)

In [8]:
# Counting null values per column (diagnostic only; refined_df itself is not changed)

from pyspark.sql.functions import col, when
# Imported under an alias so that Python's built-in sum() is not shadowed
# for the rest of the notebook session.
from pyspark.sql.functions import sum as spark_sum

# One output column per input column: each null contributes 1, every other value 0,
# so the aggregated value is the number of nulls in that column.
null_counts = refined_df.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in refined_df.columns
])

null_counts.show()

StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 10, Finished, Available, Finished)

+----+--------------------+-----------------------+------------+---------------+------------------+-----------+-------------+-------------+-----------+----------------+--------------+---------------+--------+---------+------+----------+-------------+----------+--------+---------+--------------+-----------+---------+----------------+------------+------------+-----------+------------+----------+------------+-------------+-------------+-------------+----------+-----------+-----------------+
|type|shipping_days_actual|shipping_days_scheduled|sales_actual|delivery_status|late_delivery_risk|category_id|category_name|customer_city|customer_id|customer_segment|customer_state|department_name|latitude|longitude|market|order_city|order_country|order_date|order_id|order_dis|order_dis_rate|order_price|order_qty|order_item_total|order_profit|order_region|order_state|order_status|product_id|product_name|product_price|shipping_date|shipping_mode|created_ts|modified_ts|customer_fullname|
+----+--------

In [9]:
# Checking for fully duplicated records (report only; no rows are removed by this cell)

from pyspark.sql.functions import count

# Grouping on every column puts only rows that are identical in all fields into the same group.
row_groups = refined_df.groupBy(refined_df.columns).count()

# Groups with more than one member are exact duplicates.
duplicate_rows = row_groups.filter("count > 1")
duplicate_rows.show()

StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 11, Finished, Available, Finished)

+----+--------------------+-----------------------+------------+---------------+------------------+-----------+-------------+-------------+-----------+----------------+--------------+---------------+--------+---------+------+----------+-------------+----------+--------+---------+--------------+-----------+---------+----------------+------------+------------+-----------+------------+----------+------------+-------------+-------------+-------------+----------+-----------+-----------------+-----+
|type|shipping_days_actual|shipping_days_scheduled|sales_actual|delivery_status|late_delivery_risk|category_id|category_name|customer_city|customer_id|customer_segment|customer_state|department_name|latitude|longitude|market|order_city|order_country|order_date|order_id|order_dis|order_dis_rate|order_price|order_qty|order_item_total|order_profit|order_region|order_state|order_status|product_id|product_name|product_price|shipping_date|shipping_mode|created_ts|modified_ts|customer_fullname|count|
+-

In [10]:
# Defining the schema for the silver_scm table

# Explicit imports instead of `import *`, so it is clear where each name comes from.
from pyspark.sql.types import (
    DateType,
    DoubleType,
    FloatType,
    IntegerType,
    StringType,
    TimestampType,
)
from delta.tables import DeltaTable

# created_ts / modified_ts are filled from current_timestamp(), so they are declared as
# TimestampType; declaring them as DateType would discard the time-of-day on write.
# createIfNotExists does not modify a table that already exists, so a silver_scm table
# created earlier with DateType audit columns keeps its old schema until it is recreated.
# NOTE(review): order_date and shipping_date are declared as DateType -- confirm that the
# values inferred from the source files convert cleanly to dates.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("silver_scm")
    .addColumn("type", StringType())
    .addColumn("shipping_days_actual", IntegerType())
    .addColumn("shipping_days_scheduled", IntegerType())
    .addColumn("sales_actual", FloatType())
    .addColumn("delivery_status", StringType())
    .addColumn("late_delivery_risk", StringType())
    .addColumn("category_id", StringType())
    .addColumn("category_name", StringType())
    .addColumn("customer_city", StringType())
    .addColumn("customer_id", StringType())
    .addColumn("customer_segment", StringType())
    .addColumn("customer_state", StringType())
    .addColumn("department_name", StringType())
    .addColumn("latitude", DoubleType())
    .addColumn("longitude", DoubleType())
    .addColumn("market", StringType())
    .addColumn("order_city", StringType())
    .addColumn("order_country", StringType())
    .addColumn("order_date", DateType())
    .addColumn("order_id", StringType())
    .addColumn("order_dis", FloatType())
    .addColumn("order_dis_rate", FloatType())
    .addColumn("order_price", FloatType())
    .addColumn("order_qty", IntegerType())
    .addColumn("order_item_total", FloatType())
    .addColumn("order_profit", FloatType())
    .addColumn("order_region", StringType())
    .addColumn("order_state", StringType())
    .addColumn("order_status", StringType())
    .addColumn("product_id", StringType())
    .addColumn("product_name", StringType())
    .addColumn("product_price", FloatType())
    .addColumn("shipping_date", DateType())
    .addColumn("shipping_mode", StringType())
    .addColumn("created_ts", TimestampType())
    .addColumn("modified_ts", TimestampType())
    .addColumn("customer_fullname", StringType())
    .execute()
)


StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 12, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x75d06413f700>

In [11]:
# Merging refined_df into the silver_scm Delta table (insert-only)

deltaTable = DeltaTable.forPath(spark, 'Tables/silver_scm')
dfUpdates = refined_df

# Columns compared between target ('silver') and source ('updates') to decide
# whether a source row is already present in the table.
merge_keys = ["customer_id", "order_date", "order_id", "product_id", "category_id"]
merge_condition = " and ".join(f"silver.{key} = updates.{key}" for key in merge_keys)

# Every source column is written to the target column of the same name.
insert_values = {column: f"updates.{column}" for column in dfUpdates.columns}

# The former `whenMatchedUpdate(set = {})` clause changed no column values, so it is
# dropped: matched rows are simply left as they are and only unmatched source rows are
# inserted. Without a matched clause the merge also no longer fails when several source
# rows match the same target row.
# NOTE(review): if matched rows are meant to be refreshed (e.g. modified_ts), add an
# explicit whenMatchedUpdate(set={...}) with the columns to overwrite.
(
    deltaTable.alias('silver')
    .merge(dfUpdates.alias('updates'), merge_condition)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)

StatementMeta(, c5adf2fb-3250-4804-8779-5bd75d30f381, 13, Finished, Available, Finished)