## Loading necessary libraries and configuring Spark for ADLS Gen2

In [0]:
import pandas as pd
import numpy as np
from pyspark.sql.types import *
import pyspark.sql.functions as F

# Authenticate Spark against the ADLS Gen2 storage account with its access key.
# The key is fetched from a Databricks secret scope at run time instead of being
# embedded in the notebook, so it does not end up in source control or exports.
# NOTE(review): the scope and key names below are placeholders - create the
# secret (or point these at an existing one) before running, and rotate the
# account key that was previously stored here in plain text.
adls_account_key = dbutils.secrets.get(
    scope="adls-secrets",
    key="moviedatanalytics-account-key",
)
spark.conf.set(
    "fs.azure.account.key.moviedatanalytics.dfs.core.windows.net",
    adls_account_key,
)

## Creating Paths

In [0]:
# Root of the ADLS Gen2 container used for both the raw input and the processed output.
adls_source_path = 'abfss://amazonsalesanalyrics@moviedatanalytics.dfs.core.windows.net/'

# Raw CSV inputs.
source_path = f'{adls_source_path}raw_data/'
india_sales_path = f'{source_path}Amazon Sale Report.csv'
# NOTE(review): this is the same file as india_sales_path - confirm that is intended.
intl_sales_path = f'{source_path}Amazon Sale Report.csv'

# Output folders for the cleaned (l0) Delta tables.
sink_path = f'{adls_source_path}processed_data/'
retail_india_data_l0_path = f'{sink_path}retail_india_data_l0/'
retail_intl_data_l0_path = f'{sink_path}retail_intl_data_l0/'

## Loading and viewing Amazon India and Amazon Intl Sales Tables

In [0]:
# Read the India sales CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
retail_india_data = spark.read.csv(
    path=india_sales_path,
    header=True,
    inferSchema=True,
)

In [0]:
# Read the international sales CSV: the first row supplies the column names
# and Spark infers each column's type from the data.
retail_intl_data = spark.read.csv(
    path=intl_sales_path,
    header=True,
    inferSchema=True,
)

## Renaming, typecasting columns and standardizing Category and SKU columns

In [0]:
def to_camel_case(col_name):
    """Normalise a raw column header into an underscore-joined name.

    The header is split on single spaces, and any hyphenated token is split
    further on "-". Pieces are joined with "_". Despite the function's name the
    result is not camelCase: the first space-separated token and the first
    piece of every hyphenated token are lower-cased, while every other piece is
    title-cased (e.g. "Order ID" -> "order_Id").

    Args:
        col_name: The original column header.

    Returns:
        The converted name. Empty tokens (e.g. from a trailing space) are
        kept, so "Sales Channel " becomes "sales_Channel_".
    """
    def _convert_hyphenated(token):
        # First hyphen-separated piece is lower-cased, the rest title-cased.
        head, *tail = token.split("-")
        return "_".join([head.lower(), *(piece.title() for piece in tail)])

    converted = []
    for position, token in enumerate(col_name.split(" ")):
        if "-" in token:
            converted.append(_convert_hyphenated(token))
        elif position == 0:
            converted.append(token.lower())
        else:
            converted.append(token.title())
    return "_".join(converted)

# Columns that are removed before any renaming happens.
drop_cols = ['index', 'Unnamed: 22']
retail_india_data = retail_india_data.drop(*drop_cols)

# Pass every remaining header through to_camel_case.
retail_india_data = retail_india_data.select(
    [
        retail_india_data[original_name].alias(to_camel_case(original_name))
        for original_name in retail_india_data.columns
    ]
)

# to_camel_case leaves multi-word headers partly capitalised (e.g. 'order_Id'),
# so those are renamed explicitly to lower-case names; SKU and category values
# are then upper-cased.
retail_india_data = (
    retail_india_data
    .withColumnRenamed('order_Id', 'order_id')
    .withColumnRenamed('sales_Channel_', 'sales_channel')
    .withColumnRenamed('courier_Status', 'courier_status')
    .withColumnRenamed('ship_Service_Level', 'ship_service_level')
    .withColumnRenamed('ship_City', 'ship_city')
    .withColumnRenamed('ship_State', 'ship_state')
    .withColumnRenamed('ship_Postal_Code', 'ship_postal_code')
    .withColumnRenamed('ship_Country', 'ship_country')
    .withColumnRenamed('promotion_Ids', 'promotion_ids')
    .withColumnRenamed('fulfilled_By', 'fulfilled_by')
    .withColumn('sku', F.upper(F.col('sku')))
    .withColumn('category', F.upper(F.col('category')))
)

## Observing the Courier Status Column

In [0]:
# Show how many rows carry each courier_status value (nulls included).
(
    retail_india_data
    .groupBy('courier_status')
    .count()
    .display()
)

courier_status,count
Shipped,109487
,6872
Cancelled,5935
Unshipped,6681


## Fixing the Courier Status column (dropping rows where it is null) and observing the result

In [0]:
# Keep only the rows that have a courier_status value, i.e. drop the nulls.
has_courier_status = F.col("courier_status").isNotNull()
retail_india_data = retail_india_data.where(has_courier_status)

# Re-check the distribution after the filter.
(
    retail_india_data
    .groupBy('courier_status')
    .count()
    .display()
)

courier_status,count
Shipped,109487
Cancelled,5935
Unshipped,6681


## Imputing the Fulfilled By column and flagging Promotion IDs as Promotion / No Promotion

In [0]:
# Missing fulfilled_by values are filled with 'Not Easy Ship'.
retail_india_data = retail_india_data.fillna(
    {'fulfilled_by': 'Not Easy Ship'}
)

# Collapse promotion_ids into a two-value flag: rows with any promotion id
# become 'Promotion', rows with a null become 'No Promotion'.
retail_india_data = retail_india_data.withColumn(
    "promotion_ids",
    F.when(F.col('promotion_ids').isNotNull(), F.lit('Promotion'))
     .otherwise(F.lit('No Promotion')),
)

# Show the resulting split.
(
    retail_india_data
    .groupBy('promotion_ids')
    .count()
    .display()
)

promotion_ids,count
Promotion,79822
No Promotion,42281


## Performing the same operations on the International Sales Data

In [0]:
# Give the international columns their intl_* names and upper-case the
# SKU and category values.
retail_intl_data = (
    retail_intl_data
    .withColumnRenamed('SKU', 'sku')
    .withColumnRenamed('Status', 'intl_ship_status')
    .withColumnRenamed('ship-service-level', 'intl_ship_service_level')
    .withColumnRenamed('Qty', 'intl_qty')
    .withColumnRenamed('Amount', 'intl_amount')
    .withColumnRenamed('promotion-ids', 'intl_promotion')
    .withColumnRenamed('Order ID', 'intl_order_id')
    .withColumnRenamed('Fulfilment', 'intl_fulfilment')
    .withColumnRenamed('size', 'intl_size')
    .withColumnRenamed('B2B', 'intl_b2b')
    .withColumn('sku', F.upper(F.col('sku')))
    .withColumn('category', F.upper(F.col('category')))
)

# Keep only the columns that go into the international l0 table, in table order.
retail_intl_data = retail_intl_data.select(
    'intl_order_id',
    'date',
    'intl_ship_status',
    'intl_fulfilment',
    'intl_ship_service_level',
    'sku',
    'category',
    'intl_size',
    'intl_qty',
    'intl_amount',
    'intl_promotion',
    'intl_b2b',
)

# Collapse intl_promotion into a two-value flag: rows with any promotion id
# become 'Promotion', rows with a null become 'No Promotion'.
retail_intl_data = retail_intl_data.withColumn(
    "intl_promotion",
    F.when(F.col('intl_promotion').isNotNull(), F.lit('Promotion'))
     .otherwise(F.lit('No Promotion')),
)

## Creating a database for this project

In [0]:
# Create the project database on first run and make it the current database,
# so the unqualified table names used below resolve inside it.
project_database = "amazon_india_sales_analysis"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {project_database}")
spark.sql(f"USE {project_database}")

## Creating and Loading the cleaned India Sales Data to ADLS as a Delta table

In [0]:
# Register the India l0 Delta table (if it is not registered yet) at its ADLS
# location, then overwrite that location with the cleaned India DataFrame.
# NOTE(review): the DDL declares an `intl` column that no step in this notebook
# visibly creates - confirm it exists in the source data or is meant to be null.
india_l0_ddl = f"""
CREATE TABLE IF NOT EXISTS retail_india_data_l0(
    order_id string,
    date string,
    status string,
    fulfilment string,
    sales_channel string,
    ship_service_level string,
    style string,
    sku string,
    category string,
    size string,
    asin string,
    courier_status string,
    qty integer,
    currency string,
    amount double,
    ship_city string,
    ship_state string,
    ship_postal_code double,
    ship_country string,
    promotion_ids string,
    b2b boolean,
    fulfilled_by string,
    intl string
)
USING DELTA
LOCATION '{retail_india_data_l0_path}'
"""
spark.sql(india_l0_ddl)

(
    retail_india_data.write
    .format("delta")
    .mode("overwrite")
    .save(retail_india_data_l0_path)
)

## Creating and Loading the cleaned International Sales Data to ADLS as a Delta table

In [0]:
# Register the international l0 Delta table (if it is not registered yet) at
# its ADLS location, then overwrite that location with the cleaned
# international DataFrame.
intl_l0_ddl = f"""
CREATE TABLE IF NOT EXISTS retail_intl_data_l0(
    intl_order_id string,
    date string,
    intl_ship_status string,
    intl_fulfilment string,
    intl_ship_service_level string,
    sku string,
    category string,
    intl_size string,
    intl_qty integer,
    intl_amount double,
    intl_promotion string,
    intl_b2b boolean
)
USING DELTA
LOCATION '{retail_intl_data_l0_path}'
"""
spark.sql(intl_l0_ddl)

(
    retail_intl_data.write
    .format("delta")
    .mode("overwrite")
    .save(retail_intl_data_l0_path)
)