# STEP 1: Mount the AWS S3 Bucket named "my-databricks-project" to the Databricks with the access key and secret access key.

In [0]:
ACCESS_KEY = ""      #fill out the saved access key 
SECRET_KEY = ""      #fill out the saved secret access key
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")

AWS_BUCKET_NAME = "my-databricks-project"
MOUNT_NAME = "/mnt/dataco_smart_supply"

dbutils.fs.mount(
  source = f"s3a://{ACCESS_KEY}:{ENCODED_SECRET_KEY}@{AWS_BUCKET_NAME}",
  mount_point = MOUNT_NAME
)


### Verify Mounting of AWS S3 Bucket

In [0]:
display(dbutils.fs.ls("/mnt/dataco_smart_supply"))

path,name,size,modificationTime
dbfs:/mnt/dataco_smart_supply/DataCoSupplyChainDataset.csv,DataCoSupplyChainDataset.csv,95910149,1741358507000


### Read the dataset from the AWS S3 Blob Storage

In [0]:
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/mnt/dataco_smart_supply/DataCoSupplyChainDataset.csv")

# Show Dataset Sample
df.show(5)


+--------+------------------------+-----------------------------+-----------------+------------------+----------------+------------------+-----------+--------------+-------------+----------------+--------------+--------------+-----------+--------------+-----------------+----------------+--------------+--------------------+----------------+-------------+---------------+-----------+------------+------------+----------+-------------+-----------------+-----------------------+--------+----------------------+-------------------+------------------------+-------------+------------------------+-----------------------+-------------------+------+----------------+----------------------+--------------+---------------+---------------+-------------+---------------+-------------------+-------------------+--------------------+------------+-------------+--------------+--------------------------+--------------+
|    Type|Days for shipping (real)|Days for shipment (scheduled)|Benefit per order|Sales per c

### Check the Names of the columns in the dataset along with their stored datatype

In [0]:
df.printSchema()

root
 |-- Type: string (nullable = true)
 |-- Days for shipping (real): integer (nullable = true)
 |-- Days for shipment (scheduled): integer (nullable = true)
 |-- Benefit per order: double (nullable = true)
 |-- Sales per customer: double (nullable = true)
 |-- Delivery Status: string (nullable = true)
 |-- Late_delivery_risk: integer (nullable = true)
 |-- Category Id: integer (nullable = true)
 |-- Category Name: string (nullable = true)
 |-- Customer City: string (nullable = true)
 |-- Customer Country: string (nullable = true)
 |-- Customer Email: string (nullable = true)
 |-- Customer Fname: string (nullable = true)
 |-- Customer Id: integer (nullable = true)
 |-- Customer Lname: string (nullable = true)
 |-- Customer Password: string (nullable = true)
 |-- Customer Segment: string (nullable = true)
 |-- Customer State: string (nullable = true)
 |-- Customer Street: string (nullable = true)
 |-- Customer Zipcode: integer (nullable = true)
 |-- Department Id: integer (nullable = 

## STEP 2: Data Cleaning

### Remove Missing Values

In [0]:
df_clean = df.na.drop()

### Remove Duplicate Rows

In [0]:
df_clean = df_clean.dropDuplicates()

### Convert Date Columns to Proper Format

In [0]:
from pyspark.sql.functions import to_date

df_clean = df_clean.withColumn("order date (DateOrders)", to_date(df_clean["order date (DateOrders)"], "yyyy-MM-dd"))
df_clean = df_clean.withColumn("shipping date (DateOrders)", to_date(df_clean["shipping date (DateOrders)"], "yyyy-MM-dd"))

## STEP 3: Data Transformation for the Meaningful Insights

### Top 10 Product Categories

In [0]:
from pyspark.sql.functions import col, sum

top_categories = df.groupBy("Category Name") \
                   .agg(sum("Sales").alias("Total_Sales")) \
                   .orderBy(col("Total_Sales").desc()) \
                   .limit(10)

top_categories.show()


+--------------------+------------------+
|       Category Name|       Total_Sales|
+--------------------+------------------+
|             Fishing| 6929653.690574931|
|              Cleats| 4431942.783185065|
|    Camping & Hiking| 4118425.571018985|
|    Cardio Equipment|3694843.1966299606|
|     Women's Apparel|         3147800.0|
|        Water Sports| 3113844.684857893|
|      Men's Footwear|2891757.6623528693|
|Indoor/Outdoor Games|2888993.9135054224|
|       Shop By Sport|1309522.0414546824|
|           Computers|          663000.0|
+--------------------+------------------+



### Top 10 Most Frequent Customers

In [0]:
from pyspark.sql.functions import count

top_customers = df.groupBy("Customer Fname", "Customer Lname") \
                  .agg(count("Order Id").alias("Total_Orders")) \
                  .orderBy(col("Total_Orders").desc()) \
                  .limit(10)

top_customers.show()


+--------------+--------------+------------+
|Customer Fname|Customer Lname|Total_Orders|
+--------------+--------------+------------+
|          Mary|         Smith|       23884|
|        Robert|         Smith|         744|
|         James|         Smith|         726|
|         David|         Smith|         723|
|          John|         Smith|         645|
|       William|         Smith|         525|
|     Elizabeth|         Smith|         476|
|          Mary|         Jones|         465|
|       Michael|         Smith|         460|
|   Christopher|         Smith|         428|
+--------------+--------------+------------+



### Delivery Performance Analysis (Delay V/S Average Profit Per Order)

In [0]:
from pyspark.sql.functions import col, when

df = df.withColumn("Delay", col("Days for shipping (real)") - col("Days for shipment (scheduled)"))

# Analyzing the impact of delay on profitability
delay_profit_analysis = df.groupBy("Delay") \
    .agg({"Order Profit Per Order": "avg"}) \
    .orderBy(col("avg(Order Profit Per Order)").desc())

delay_profit_analysis.show()


+-----+---------------------------+
|Delay|avg(Order Profit Per Order)|
+-----+---------------------------+
|   -2|          23.42352811273813|
|    0|          22.53202148402705|
|    1|         21.940581587680732|
|    4|          21.56120150330889|
|   -1|          21.51699729013785|
|    2|          21.24623859510574|
|    3|         19.941130174914466|
+-----+---------------------------+



## STEP 4: MYSQL Server JDBC Connection and storing dataframes

### Connect the Databricks notebook with the MYSQL username and password 

In [0]:
# MySQL Connection Configuration
jdbc_url = "jdbc:mysql://localhost:3306/dataco_supply_chain"

connection_properties = {
    "user": "root",
    "password": "123",
    "driver": "com.mysql.cj.jdbc.Driver"
}
print("Server Connected")


Server Connected


### Save Top 10 Product Categories to MYSQL

In [0]:
top_10_product_categories.write.jdbc(
    url=jdbc_url,
    table="top_10_product_categories",
    mode="overwrite",
    properties=connection_properties
)
print("Top 10 Product Categories stored successfully!")

### Save Top 10 Frequent Customers to MYSQL

In [0]:
top_10_frequent_customers.write.jdbc(
    url=jdbc_url,
    table="top_10_frequent_customers",
    mode="overwrite",
    properties=connection_properties
)
print("Top 10 Frequent Customers stored successfully!")


### Save Delivery Performance Analysis to MYSQL

In [0]:
delivery_performance_analysis.write.jdbc(
    url=jdbc_url,
    table="delivery_performance_analysis",
    mode="overwrite",
    properties=connection_properties
)
print("Delivery Performance Analysis stored successfully!")
