In [0]:
%run "./Assignment - utilities"

### Import libraries

In [0]:
import pyspark.sql.functions as F

### Pull data from raw tables

In [0]:
# Read the product, customer and orders tables from the "raw" schema,
# in that order, using the fetchTableData helper.
schemaName = "raw"
product_raw_df, customer_raw_df, orders_raw_df = [
    fetchTableData(schemaName, rawTableName)
    for rawTableName in ("raw_product", "raw_customer", "raw_orders")
]

### Process Product Data

In [0]:
### Add rounded price column
# price_per_product rounded to 3 decimal places, held in a temporary "price" column.
roundedPriceExpr = F.round(F.col("price_per_product"), 3)
product_columns_added_df = addColumns(product_raw_df, {"price": roundedPriceExpr})

### Drop columns not required
# The unrounded source column goes away so the rounded one can take over its name.
product_columns_dropped_df = dropColumns(product_columns_added_df, ["price_per_product"])

### Rename columns
product_cleaned_df = renameColumns(product_columns_dropped_df, {"price": "price_per_product"})

### Enrich product table
schemaName = "enriched"
tableName = "enriched_product"
path = "dbfs:/user/enriched/enriched_product"
saveDataInTable(product_cleaned_df, path, schemaName, tableName)

### Process Customer data

In [0]:
### Add necessary columns
# The cleaned name is built in three regex passes and then trimmed:
#   1. remove every character that is not an ASCII letter or a space,
#   2. collapse runs of spaces into a single space,
#   3. remove a space that directly precedes a lowercase letter
#      (e.g. "Mc donald" -> "Mcdonald").
lettersOnlyName = F.regexp_replace("customer_name", "[^a-zA-Z ]", "")
singleSpacedName = F.regexp_replace(lettersOnlyName, " +", " ")
rejoinedName = F.regexp_replace(singleSpacedName, r" (\p{Ll})", "$1")

customerColumnsTobeBeAdded = {"cleaned_customer_name": F.trim(rejoinedName)}
customer_columns_added_df = addColumns(customer_raw_df, customerColumnsTobeBeAdded)

### Drop columns not required
# The raw customer_name is dropped here and re-created below from the cleaned value.
customerColumnsToBeRemoved = ["customer_name"]
customer_columns_dropped_df = dropColumns(customer_columns_added_df, customerColumnsToBeRemoved)

### Handle null or blank customer_name
# A raw name consisting only of digits, punctuation or whitespace is cleaned down
# to "" rather than NULL, so a plain coalesce would let blank names through.
# when() only matches non-empty values; NULL and "" both fall to otherwise().
cleanedNameCol = F.col("cleaned_customer_name")
replaceCustomerName = {
    "customer_name": F.when(cleanedNameCol != "", cleanedNameCol)
                      .otherwise(F.lit("Customer Name Unavailable"))
}
customer_cleaned_final_df = addColumns(customer_columns_dropped_df, replaceCustomerName)

### Enrich customer table
# NOTE(review): cleaned_customer_name is still present in the saved frame alongside
# customer_name — confirm whether it is meant to be persisted.
path = "dbfs:/user/enriched/enriched_customer"
schemaName = "enriched"
tableName = "enriched_customer"
saveDataInTable(customer_cleaned_final_df, path, schemaName, tableName)

### Process Orders data

In [0]:
### Round off profit column
# Round profit to 2 decimal places into a temporary column, drop the unrounded
# column, then rename the rounded one back to "profit".
newProfitColumn = {"profit_rounded": F.round(F.col("profit"), 2)}
orders_rounded_df = addColumns(orders_raw_df, newProfitColumn)

ordersColumnDropList = ["profit"]
orders_cleaned_df = dropColumns(orders_rounded_df, ordersColumnDropList)

ordersColumnsRename = {"profit_rounded": "profit"}
orders_cleaned_final_df = renameColumns(orders_cleaned_df, ordersColumnsRename)

# Alias kept so any other cell that refers to orders_enriched_df still resolves;
# it now points at the helper-built frame instead of a second, hand-rolled copy.
orders_enriched_df = orders_cleaned_final_df

### Prepare Customer and Product data to join with Orders
schemaName = "enriched"
customer_enriched_df = fetchTableData(schemaName, "enriched_customer")
product_enriched_df = fetchTableData(schemaName, "enriched_product")

customer_selected_df = customer_enriched_df.select("customer_id", "customer_name", "country")

# De-duplicate on exactly the columns that are kept for the join.
# NOTE(review): a product_id that maps to more than one (category, sub_category)
# pair would still multiply order rows in the left join below — verify the data.
product_selected_df = (
    product_enriched_df
    .dropDuplicates(["product_id", "category", "sub_category"])
    .select("product_id", "category", "sub_category")
)

### Join Orders, Customer and Product data
# No join type is given, so this is an inner join: orders whose trimmed customer_id
# has no match in the customer table are dropped.
# NOTE(review): the product join below is a left join — confirm the asymmetry is intended.
orders_joined_customer_df = (
    orders_cleaned_final_df
    .join(
        customer_selected_df,
        F.trim(customer_selected_df.customer_id) == F.trim(orders_cleaned_final_df.customer_id),
    )
    .select(
        orders_cleaned_final_df["*"],
        customer_selected_df["customer_name"],
        customer_selected_df["country"],
    )
    .drop("ingestion_date")
)

# Left join: every order row is kept; category / sub_category are NULL when the
# product_id has no match.
orders_customer_product_joined_df = (
    orders_joined_customer_df
    .join(
        product_selected_df,
        orders_joined_customer_df.product_id == product_selected_df.product_id,
        "left",
    )
    .select(
        orders_joined_customer_df["*"],
        product_selected_df["category"],
        product_selected_df["sub_category"],
    )
    .drop("state", "price_per_unit")
)

### Enrich Orders Table
path = "dbfs:/user/enriched/enriched_orders"
schemaName = "enriched"
tableName = "enriched_orders"
saveDataInTable(orders_customer_product_joined_df, path, schemaName, tableName)
