In [None]:
from pyspark.sql import functions as fn

### Task 1 - Create raw tables for each source dataset

In [None]:
# Source file for the raw Product data
file_location = "/FileStore/tables/Product.csv"
file_type = "csv"

# CSV parsing settings
infer_schema = True
first_row_is_header = True
delimiter = ","

# Read the file into a DataFrame, passing all reader options in a single call
df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

In [None]:
# Create the raw table: Product
permanent_table_name = "Product"
# Overwrite so this cell can be re-run (e.g. Restart & Run All); with the
# default save mode, saveAsTable raises if the table already exists.
df.write.format("parquet").mode("overwrite").saveAsTable(permanent_table_name)

In [None]:
# Source file for the raw Order data
file_location = "/FileStore/tables/Order.json"
file_type = "json"

# Read the JSON file with the 'multiline' reader option enabled
df = (
    spark.read.format(file_type)
    .option('multiline', True)
    .load(file_location)
)

In [None]:
# Create the raw table: Order
permanent_table_name = "Order"
# Overwrite so this cell can be re-run (e.g. Restart & Run All); with the
# default save mode, saveAsTable raises if the table already exists.
df.write.format("parquet").mode("overwrite").saveAsTable(permanent_table_name)

In [None]:
excel_file_path = "/FileStore/tables/Customer.xlsx"

# Install the com.crealytics.spark.excel library before running this cell.
# This note sits on its own line because a comment placed after a
# line-continuation backslash is a Python syntax error.
df = (
    spark.read
    .format("com.crealytics.spark.excel")
    .option("header", "true")
    .load(excel_file_path)
)

In [None]:
# Create the raw table: Customer
permanent_table_name = "Customer"
# Overwrite so this cell can be re-run (e.g. Restart & Run All); with the
# default save mode, saveAsTable raises if the table already exists.
df.write.format("parquet").mode("overwrite").saveAsTable(permanent_table_name)

##### Load Data

In [None]:
# Read the three raw tables back as DataFrames
product_df = spark.read.table('Product')
order_df = spark.read.table('Order')
customer_df = spark.read.table('Customer')
# Keep `df` bound to the Customer table DataFrame
df = customer_df

### Task 2 - Create an enriched table for customers and products

In [None]:
# Keep only the key columns that link an order to its customer and product
order_filtered_df = order_df.select(["Customer ID", "Product ID"])

In [None]:
# Inner-join customers to their order keys, then to the ordered products
customer_order_df = customer_df.join(order_filtered_df, "Customer ID", "inner")
enriched_df_1 = customer_order_df.join(product_df, "Product ID", "inner")
display(enriched_df_1)

### Task 3 - Enrich orders with customer and product details

In [None]:
# Column subsets taken from the customer and product tables
customer_columns = ["Customer ID", "Customer Name", "Country"]
product_columns = ["Product ID", "Category", "Sub-Category"]
customer_filtered_df = customer_df.select(*customer_columns)
product_filtered_df = product_df.select(*product_columns)

In [None]:
# Attach the selected customer columns to every order, then the product category columns
order_customer_df = order_df.join(customer_filtered_df, "Customer ID", "inner")
enriched_df_2 = order_customer_df.join(product_filtered_df, "Product ID", "inner")

In [None]:
# Round Profit to 2 decimal places, then show the enriched orders
rounded_profit = fn.round(fn.col("Profit"), 2)
enriched_df_2 = enriched_df_2.withColumn("Profit", rounded_profit)
display(enriched_df_2)

### Task 4 - Aggregate profit by year, category, sub-category and customer

In [None]:
# Orders inner-joined with the full customer and product tables
master_df = (
    order_df
    .join(customer_df, on="Customer ID", how="inner")
    .join(product_df, on="Product ID", how="inner")
)

In [None]:
# Staging table: Profit rounded to 2 decimals, "Order Date" parsed with the dd/MM/yyyy pattern
stg_table = (
    master_df
    .withColumn("Profit", fn.round("Profit", 2))
    .withColumn("Order Date", fn.to_date("Order Date", "dd/MM/yyyy"))
)
# Add the year extracted from the parsed order date
master_enriched_df = stg_table.withColumn("Year", fn.year("Order Date"))

In [None]:
# Total profit per year, category, sub-category and customer.
# The sum is rounded to 2 decimals so floating-point noise from adding the
# per-row Profit values does not show up in the totals; this matches the
# ROUND(SUM(Profit), 2) used by the SQL queries in this notebook.
# Aliasing the aggregate directly avoids depending on the auto-generated
# "sum(Profit)" column name.
aggregate_df = (
    master_enriched_df
    .groupBy("Year", "Category", "Sub-Category", "Customer ID")
    .agg(fn.round(fn.sum("Profit"), 2).alias("Total Profit"))
)

In [None]:
# Show the aggregated profit table
display(aggregate_df)

### Task 5 - Profit aggregates using SQL

In [None]:
# Register the enriched master DataFrame as a temp view so the SQL queries
# below can reference it as `master_enriched_table`
master_enriched_df.createOrReplaceTempView("master_enriched_table")

#### a) Profit by Year

In [None]:
# Total profit per year, rounded to 2 decimals and sorted by year
profit_by_year_query = """
    SELECT Year, ROUND(SUM(Profit), 2) AS Total_Profit
    FROM master_enriched_table
    GROUP BY Year
    ORDER BY Year
"""
profit_by_year = spark.sql(profit_by_year_query)

display(profit_by_year)

#### b) Profit by Year + Product Category

In [None]:
# Total profit per year and product category, rounded to 2 decimals
profit_by_year_category_query = """
    SELECT Year, Category, ROUND(SUM(Profit), 2) AS Total_Profit
    FROM master_enriched_table
    GROUP BY Year, Category
    ORDER BY Year, Category
"""
profit_by_year_category = spark.sql(profit_by_year_category_query)

display(profit_by_year_category)

#### c) Profit by Customer

In [None]:
# Total profit per customer, rounded to 2 decimals.
# An ORDER BY is included so the row order is deterministic, consistent with
# the other profit queries in this notebook.
profit_by_customer = spark.sql("""
    SELECT `Customer ID`, ROUND(SUM(Profit), 2) AS Total_Profit
    FROM master_enriched_table
    GROUP BY `Customer ID`
    ORDER BY `Customer ID`
""")

display(profit_by_customer)

#### d) Profit by Customer + Year

In [None]:
# Total profit per customer and year, rounded to 2 decimals
profit_by_customer_year_query = """
    SELECT Year, `Customer ID`, ROUND(SUM(Profit), 2) AS Total_Profit
    FROM master_enriched_table
    GROUP BY Year, `Customer ID`
    ORDER BY Year, `Customer ID`
"""
profit_by_customer_year = spark.sql(profit_by_customer_year_query)

display(profit_by_customer_year)