# Source of the dataset:

- URL: https://www.kaggle.com/datasets/thedevastator/unlock-profits-with-e-commerce-sales-data?select=Sale+Report.csv

# Source of the postal code API:

- URL: http://www.postalpincode.in (the notebook queries `https://api.postalpincode.in/pincode/<PIN>`)

In [0]:
import pyspark.sql.functions as F
import pyspark.sql.window as W
from datetime import datetime
import requests
import pandas as pd
from pyspark.sql.types import StringType
import delta



In [0]:
# Load the raw Amazon sale report; the first line of the CSV is the header.
df = (
    spark.read
    .options(header="true")
    .format("csv")
    .load("dbfs:/FileStore/shared_uploads/youraccount@yourdomain.com/Amazon_Sale_Report-1.csv")
)

In [0]:
# Normalise every header: trim it, turn spaces and hyphens into underscores
# and upper-case it (e.g. " Order ID" -> "ORDER_ID"). One toDF() call renames
# all columns positionally in a single projection.
df = df.toDF(
    *[c.strip().replace(" ", "_").replace("-", "_").lower().upper() for c in df.columns]
)
# The trailing column of the report is not used downstream, so drop it.
df_ren = df.drop(df.columns[-1])

In [0]:
# Split order lines into first occurrences (df_trs) and repeats
# (df_duplicates). A repeat is any further row with the same ORDER_ID and SKU.
# PK keeps its original concatenated form for downstream use, but the window
# partitions on the two columns themselves: F.concat returns NULL as soon as
# SKU is NULL, which would lump NULL-SKU rows of *different* orders into one
# group (and flag all but one as duplicates), and plain concatenation can
# collide ("12" + "34" == "1" + "234").
df_t = df_ren.withColumn("PK", F.concat("ORDER_ID", "SKU"))
df_transformed = df_t.withColumn(
    "row_number",
    # NOTE(review): ordering by ORDER_ID is constant inside each partition,
    # so which copy is kept as row 1 is arbitrary; add a real tiebreaker
    # column if the choice matters.
    F.row_number().over(W.Window.partitionBy("ORDER_ID", "SKU").orderBy("ORDER_ID")),
)
df_trs = df_transformed.filter(F.col("row_number") == 1).drop("row_number")
df_duplicates = df_transformed.filter(F.col("row_number") > 1).drop("row_number")

In [0]:
# Capture the run timestamp once, so the log folder date and the file
# timestamp (datettm is also reused as *_UPDATEDTT on every dimension)
# always describe the same instant, even across midnight.
run_started_at = datetime.now()
datettm = run_started_at.strftime("%Y%m%d%H%M%S")
date = run_started_at.strftime("%Y-%m-%d")
# Persist the rejected duplicate rows as CSV for auditing. The save mode must
# be set with .mode(); .option("mode", ...) is not a save mode, and a
# preceding .format("delta") is overridden by .csv() anyway.
df_duplicates.write.mode("overwrite").csv(
    path="/mnt/logging/{0}/ecommerce_duplicated_records_{1}".format(date, datettm)
)

In [0]:
# Product dimension: one row per distinct (STYLE, CATEGORY, SIZE, SKU)
# combination, with a generated surrogate key, SKU as business key and the
# run timestamp.
df_prd = (
    df_trs.select("STYLE", "CATEGORY", "SIZE", "SKU")
    .distinct()
    .select(
        "*",
        F.monotonically_increasing_id().alias("PRD_SK"),
        F.col("SKU").alias("PRD_BK"),
        F.lit(datettm).alias("PRD_UPDATEDTT"),
    )
)
df_prd.write.saveAsTable("dim_product", format="delta", mode="overwrite")

In [0]:
# Order-status dimension: each distinct STATUS value gets a surrogate key;
# the status text itself is the business key.
df_status = (
    df_trs.select("STATUS")
    .distinct()
    .select(
        "*",
        F.monotonically_increasing_id().alias("STATUS_SK"),
        F.col("STATUS").alias("STATUS_BK"),
        F.lit(datettm).alias("STATUS_UPDATEDTT"),
    )
)
df_status.write.saveAsTable("dim_status", format="delta", mode="overwrite")

In [0]:
# Sales-channel dimension: each distinct SALES_CHANNEL value gets a surrogate
# key; the channel text itself is the business key.
df_s_channel = (
    df_trs.select("SALES_CHANNEL")
    .distinct()
    .select(
        "*",
        F.monotonically_increasing_id().alias("SALES_CHANNEL_SK"),
        F.col("SALES_CHANNEL").alias("SALES_CHANNEL_BK"),
        F.lit(datettm).alias("SALES_CHANNEL_UPDATEDTT"),
    )
)
df_s_channel.write.saveAsTable("dim_sales_channel", format="delta", mode="overwrite")

In [0]:
# Courier-status dimension. Missing statuses are stored as "N.A." so they get
# a dimension row of their own.
df_courier = (
    df_trs.select("COURIER_STATUS")
    .na.fill("N.A.")
    .distinct()
    .select(
        "*",
        F.monotonically_increasing_id().alias("COURIER_STATUS_SK"),
        F.col("COURIER_STATUS").alias("COURIER_STATUS_BK"),
        F.lit(datettm).alias("COURIER_STATUS_UPDATEDTT"),
    )
)
df_courier.write.saveAsTable("dim_courier", format="delta", mode="overwrite")

In [0]:
# Currency dimension. Missing currencies are stored as "N.A." so they get a
# dimension row of their own.
df_currency = (
    df_trs.select("CURRENCY")
    .na.fill("N.A.")
    .distinct()
    .select(
        "*",
        F.monotonically_increasing_id().alias("CURRENCY_SK"),
        F.col("CURRENCY").alias("CURRENCY_BK"),
        F.lit(datettm).alias("CURRENCY_UPDATEDTT"),
    )
)
df_currency.write.saveAsTable("dim_currency", format="delta", mode="overwrite")

In [0]:
# Location dimension. City/state/country are upper-cased *before* distinct()
# so spelling variants that differ only in case ("Mumbai" / "MUMBAI")
# collapse into one row. Upper-casing after distinct() produced several rows
# with the same LOCATION_BK but different LOCATION_SKs, which duplicated
# transactions when the fact table joins on LOCATION_BK.
df_location = (
    df_trs.select(
        "SHIP_POSTAL_CODE",
        F.upper("SHIP_CITY").alias("SHIP_CITY"),
        F.upper("SHIP_STATE").alias("SHIP_STATE"),
        F.upper("SHIP_COUNTRY").alias("SHIP_COUNTRY"),
    )
    .distinct()
    .withColumn("LOCATION_SK", F.monotonically_increasing_id())
    .withColumn("LOCATION_UPDATEDTT", F.lit(datettm))
    # Business key "POSTAL|CITY|STATE|COUNTRY".
    # NOTE(review): F.concat yields NULL when any part is NULL, so locations
    # with a missing component get a NULL business key.
    .withColumn(
        "LOCATION_BK",
        F.concat(
            F.col("SHIP_POSTAL_CODE"), F.lit("|"),
            F.col("SHIP_CITY"), F.lit("|"),
            F.col("SHIP_STATE"), F.lit("|"),
            F.col("SHIP_COUNTRY"),
        ),
    )
)

# Adjust location (CITY) with postal-code information.
def get_district_postal_code(postalcode, timeout=10):
    """Look up the district for an Indian PIN code via api.postalpincode.in.

    Args:
        postalcode: PIN code as read from the data. A float-like string such
            as "110001.0" is accepted (everything after the first "." is
            dropped). ``None`` short-circuits to "N/A" without a request.
        timeout: Seconds to wait for the HTTP response. Without a timeout a
            stalled connection would hang the calling Spark task forever.

    Returns:
        The upper-cased ``District`` of the first post office in the API
        response when its ``Status`` is "Success", otherwise "N/A". Network
        errors, undecodable bodies and unexpected response shapes are all
        treated as "no result" (best-effort enrichment).
    """
    dist = "N/A"
    if postalcode is None:
        return dist
    pin = str(postalcode).split(".")[0]
    url = "https://api.postalpincode.in/pincode/{0}".format(pin)
    try:
        ret = requests.get(url, timeout=timeout)
        payload = ret.json()  # parse the body once
        if payload[0]["Status"] == "Success":
            dist = payload[0]["PostOffice"][0]["District"].upper()
    except (requests.RequestException, ValueError, LookupError, TypeError, AttributeError):
        # Best-effort: any transport/parse/shape failure keeps the "N/A" default.
        pass
    return dist
  
# Replace SHIP_CITY with the district returned by the postal-code API and
# persist the location dimension partitioned by state. The explicit
# "overwrite" keeps the notebook re-runnable like the other dimension tables;
# the default save mode (errorifexists) fails once dim_local exists.
udf_get_district_postal_code = F.udf(get_district_postal_code, StringType())
df_location_adj = df_location.withColumn(
    "SHIP_CITY", udf_get_district_postal_code(F.col("SHIP_POSTAL_CODE"))
)
(
    df_location_adj.write.format("delta")
    .mode("overwrite")
    .partitionBy("SHIP_STATE")
    .saveAsTable("dim_local")
)

In [0]:
# Fact table: one row per deduplicated order line, with descriptive attributes
# replaced by the surrogate keys of their dimension rows.
#
# Dimensions are read back from the tables persisted above instead of reusing
# df_prd, df_status, ... Their *_SK columns come from
# F.monotonically_increasing_id(), which is non-deterministic, so lazily
# recomputing those DataFrames here could yield SKs that differ from the ones
# stored in the dimension tables.
#
# Join keys are normalised exactly the way each dimension built its BK:
#   * COURIER_STATUS / CURRENCY: NULL -> "N.A." (the dims use fillna("N.A.")),
#     otherwise those transactions are silently dropped by the inner join;
#   * location: city/state/country upper-cased, as in LOCATION_BK.
bk_location = F.concat(
    F.col("trs.SHIP_POSTAL_CODE"), F.lit("|"),
    F.upper(F.col("trs.SHIP_CITY")), F.lit("|"),
    F.upper(F.col("trs.SHIP_STATE")), F.lit("|"),
    F.upper(F.col("trs.SHIP_COUNTRY")),
)
fact = (
    df_trs.alias("trs")
    # NOTE(review): dim_product is distinct on (STYLE, CATEGORY, SIZE, SKU);
    # a SKU with several style/category combinations fans out this join.
    .join(spark.table("dim_product").alias("prd"),
          on=[F.col("trs.SKU") == F.col("prd.PRD_BK")], how="inner")
    .join(spark.table("dim_status").alias("sts"),
          on=[F.col("trs.STATUS") == F.col("sts.STATUS_BK")], how="inner")
    .join(spark.table("dim_sales_channel").alias("sc"),
          on=[F.col("trs.SALES_CHANNEL") == F.col("sc.SALES_CHANNEL_BK")], how="inner")
    .join(spark.table("dim_courier").alias("cs"),
          on=[F.coalesce(F.col("trs.COURIER_STATUS"), F.lit("N.A."))
              == F.col("cs.COURIER_STATUS_BK")],
          how="inner")
    .join(spark.table("dim_currency").alias("cur"),
          on=[F.coalesce(F.col("trs.CURRENCY"), F.lit("N.A."))
              == F.col("cur.CURRENCY_BK")],
          how="inner")
    .join(spark.table("dim_local").alias("loc"),
          on=[bk_location == F.col("loc.LOCATION_BK")], how="inner")
    .select(
        "trs.ORDER_ID", "trs.DATE",
        "prd.PRD_SK", "sts.STATUS_SK", "sc.SALES_CHANNEL_SK", "cs.COURIER_STATUS_SK",
        # NOTE(review): every other dimension contributes its *_SK, currency
        # contributes its business key. Kept to preserve the table schema —
        # confirm whether CURRENCY_SK was intended.
        "cur.CURRENCY_BK",
        "loc.LOCATION_SK",
        "trs.FULFILMENT", "trs.SHIP_SERVICE_LEVEL", "trs.ASIN",
        "trs.QTY", "trs.AMOUNT", "trs.B2B", "trs.FULFILLED_BY",
    )
    .withColumn("QTY", F.col("QTY").cast("integer"))
    .withColumn("AMOUNT", F.col("AMOUNT").cast("double"))
)
# "overwrite" mirrors the dimension tables and keeps the notebook re-runnable.
fact.write.format("delta").mode("overwrite").saveAsTable("fct_transaction_sales")