# Read curated sales data

In [0]:
# Load the curated sales dataset (parquet) that every table below is derived from.
df_sales = (
    spark.read
    .format("parquet")
    .load("s3://ecommerce-data-pipeline-sanket/curated/sales/")
)

In [0]:
%python
# Inspect the schema exactly as loaded, before any of the type fixes below run.
# In the captured output, Quantity and Discount are strings, and the typed
# Order_Date / Ship_Date columns sit alongside the raw "Order Date" / "Ship Date" strings.
df_sales.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Ship_Date: date (nullable = true)



In [0]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

### Fix data types for date, discount, and quantity columns


In [0]:
# Ensure typed date columns exist for the order and ship dates.
#
# The curated dataset already carries `Order_Date` / `Ship_Date` as DATE columns
# (see the printed schema). Re-deriving them unconditionally from the raw strings
# with a format-less `to_date` would replace good values with NULL whenever the
# raw text is not in the default ISO layout. So the existing typed value is kept,
# and parsing the raw string is only a fallback for rows where it is missing.
# If the typed column is absent from the input, it is created from the raw string.
for raw_name, typed_name in (("Order Date", "Order_Date"), ("Ship Date", "Ship_Date")):
    date_expr = to_date(col(raw_name))
    if typed_name in df_sales.columns:
        # coalesce returns the first non-null argument, so existing dates win.
        date_expr = coalesce(col(typed_name), date_expr)
    df_sales = df_sales.withColumn(typed_name, date_expr)

In [0]:
# Quantity and Discount are loaded as strings (see the printed schema);
# replace each with a numeric column of the same name.
quantity_as_int = col("Quantity").cast("int")
discount_as_double = col("Discount").cast("double")

df_sales = df_sales.withColumn("Quantity", quantity_as_int)
df_sales = df_sales.withColumn("Discount", discount_as_double)

### Dim_Customer

In [0]:
# Customer dimension: one row per customer_id with descriptive attributes.
# NOTE(review): when a customer appears with more than one Country/Region value,
# dropDuplicates on the key alone keeps just one of those rows. Confirm that an
# arbitrary pick is acceptable here.
dim_customer = (
    df_sales
    .select("Customer ID", "Customer Name", "Segment", "Country", "Region")
    .withColumnRenamed("Customer ID", "customer_id")
    .withColumnRenamed("Customer Name", "customer_name")
    .dropDuplicates(["customer_id"])
)

### Dim_Product

In [0]:
# Product dimension: one row per product_id with its name and category hierarchy.
product_columns = [
    df_sales["Product ID"].alias("product_id"),
    df_sales["Product Name"].alias("product_name"),
    df_sales["Category"],
    df_sales["Sub-Category"].alias("sub_category"),
]
dim_product = df_sales.select(*product_columns).dropDuplicates(["product_id"])

### Dim_Region

In [0]:
# Region dimension: one row per (Region, State, City) combination.
# The postal code is carried along as an attribute and is not part of the key.
region_key = ["Region", "State", "City"]
dim_region = (
    df_sales
    .select(*region_key, "Postal Code")
    .toDF(*region_key, "postal_code")
    .dropDuplicates(region_key)
)

### Dim_Date

In [0]:
# Date dimension: the distinct, non-null order dates plus calendar parts
# derived from each date.
distinct_order_dates = (
    df_sales
    .select(col("Order_Date").alias("order_date"))
    .where(col("order_date").isNotNull())
    .distinct()
)
dim_date = distinct_order_dates.select(
    "order_date",
    year("order_date").alias("year"),
    month("order_date").alias("month"),
    dayofmonth("order_date").alias("day"),
    weekofyear("order_date").alias("week"),
)

### Fact Table

In [0]:
# Sales fact table: one row per input sales row, with the keys needed to join
# each dimension plus the numeric measures.
#
# The region dimension is de-duplicated on (Region, State, City), so `Region`
# alone cannot identify a single dim_region row. `State` and `City` are
# appended after the existing columns (existing column order is unchanged) so
# the fact can be joined to dim_region on the dimension's full key without fan-out.
fact_sales = (df_sales
    .select(
        # Dimension keys
        col("Order ID").alias("order_id"),
        col("Customer ID").alias("customer_id"),
        col("Product ID").alias("product_id"),
        col("Region"),
        col("Order_Date"),
        col("Ship_Date"),
        col("Ship Mode").alias("ship_mode"),
        # Measures
        col("Sales"),
        col("Quantity"),
        col("Discount"),
        col("Profit"),
        # Remaining parts of the dim_region key
        col("State"),
        col("City")
    )
)

### Write Tables to S3

In [0]:
# Destination prefixes for the warehouse layer
dim_path = "s3://ecommerce-data-pipeline-sanket/warehouse/dimensions/"
fact_path = "s3://ecommerce-data-pipeline-sanket/warehouse/facts/"

# (DataFrame, destination) pairs in write order: the four dimensions, then the fact.
write_plan = [
    (dim_customer, dim_path + "dim_customer/"),
    (dim_product, dim_path + "dim_product/"),
    (dim_region, dim_path + "dim_region/"),
    (dim_date, dim_path + "dim_date/"),
    (fact_sales, fact_path + "fact_sales/"),
]

# Each table is written as parquet, replacing whatever is already at its destination.
for table_df, destination in write_plan:
    table_df.write.mode("overwrite").parquet(destination)