# **Adidas Sales Data Analysis**

The objective of this project is to analyze the Adidas sales database for the years 2020 and 2021 and identify key insights that help improve sales performance and optimize business strategies.

By analyzing the sales data, we aim to understand the factors influencing sales, identify trends, and uncover opportunities for growth. The analysis is conducted in a Databricks notebook to provide an interactive and insightful dashboard.

## **Business Metrics Requirements**

Total Sales, Total Profit, Average Price per Unit, and Total Units Sold

Total sales by month

Total sales by state

Total sales by region

Total sales by product

Total sales by retailer

Units Sold by Product Category and Gender Type

Top Performing Cities by Profit

# **Data profiling & understanding schema**

In [0]:
df_raw = spark.read.table("workspace.default.adidas_us_sales_datasets")
# display(df_raw)
df_raw.printSchema()

root
 |-- Retailer: string (nullable = true)
 |-- Retailer ID: long (nullable = true)
 |-- Invoice Date: date (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Price per Unit: double (nullable = true)
 |-- Units Sold: string (nullable = true)
 |-- Total Sales: string (nullable = true)
 |-- Operating Profit: string (nullable = true)
 |-- Operating Margin: string (nullable = true)
 |-- Sales Method: string (nullable = true)



# **Bronze layer**
## **1.1 Data Cleaning**

In [0]:
from pyspark.sql.functions import col, regexp_replace

# Strip thousands separators and percent signs so the string columns can be cast to numbers
clean_df = df_raw.withColumn("Units Sold", regexp_replace(col("Units Sold"), ",", "")) \
    .withColumn("Total Sales", regexp_replace(col("Total Sales"), ",", "")) \
    .withColumn("Operating Profit", regexp_replace(col("Operating Profit"), ",", "")) \
    .withColumn("Operating Margin", regexp_replace(col("Operating Margin"), "%", ""))
# display(clean_df)

## **1.2 Schema Enforcement**

In [0]:
bronze_df = clean_df.withColumn("Units Sold", col("Units Sold").cast("int")) \
    .withColumn("Total Sales", col("Total Sales").cast("double")) \
    .withColumn("Operating Profit", col("Operating Profit").cast("double")) \
    .withColumn("Operating Margin", col("Operating Margin").cast("double"))
bronze_df.printSchema()

root
 |-- Retailer: string (nullable = true)
 |-- Retailer ID: long (nullable = true)
 |-- Invoice Date: date (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Price per Unit: double (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Total Sales: double (nullable = true)
 |-- Operating Profit: double (nullable = true)
 |-- Operating Margin: double (nullable = true)
 |-- Sales Method: string (nullable = true)
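The two bronze steps (strip formatting characters, then cast) can be illustrated on single values in plain Python. This is only a sketch of the logic, not the Spark code path: `to_number` is a hypothetical helper, the input strings are made-up examples, and returning `None` on a failed parse mirrors Spark producing null for an uncastable string when ANSI mode is off, which is an assumption about the cluster configuration.

```python
def to_number(raw, strip_chars, target):
    """Remove formatting characters, then convert; return None if conversion fails."""
    if raw is None:
        return None
    cleaned = raw
    for ch in strip_chars:
        cleaned = cleaned.replace(ch, "")
    try:
        return target(cleaned.strip())
    except ValueError:
        return None


# Hypothetical raw values in the style of the string columns above
print(to_number("1,200", ",", int))      # 1200
print(to_number("600,000", ",", float))  # 600000.0
print(to_number("50%", "%", float))      # 50.0
print(to_number("n/a", ",", int))        # None
```

A silent `None` is why a null check after casting is worthwhile: a value that fails to parse would otherwise disappear from sums without any error.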



# **Silver layer**
## **1.1 Column Standardization**

In [0]:
column_std_df = bronze_df.withColumnRenamed("Retailer", "retailer") \
    .withColumnRenamed("Retailer ID", "retailer_id") \
    .withColumnRenamed("Region", "region") \
    .withColumnRenamed("State", "state") \
    .withColumnRenamed("City", "city") \
    .withColumnRenamed("Product", "product") \
    .withColumnRenamed("Price per Unit", "price_per_unit") \
    .withColumnRenamed("Units Sold", "units_sold") \
    .withColumnRenamed("Total Sales", "total_sales") \
    .withColumnRenamed("Operating Profit", "operating_profit") \
    .withColumnRenamed("Operating Margin", "operating_margin") \
    .withColumnRenamed("Sales Method", "sales_method") \
    .withColumnRenamed("Invoice Date", "invoice_date")

column_std_df.printSchema()


root
 |-- retailer: string (nullable = true)
 |-- retailer_id: long (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- region: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- product: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- units_sold: integer (nullable = true)
 |-- total_sales: double (nullable = true)
 |-- operating_profit: double (nullable = true)
 |-- operating_margin: double (nullable = true)
 |-- sales_method: string (nullable = true)



## **2.1 Null Values**

In [0]:

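from pyspark.sql.functions import col, sum  # note: this shadows Python's built-in sum

# Count nulls per column: isNull() gives a boolean, cast to 0/1, then sum
df_null = column_std_df.select(
    [sum(col(c).isNull().cast("int")).alias(c) for c in column_std_df.columns]
)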
display(df_null)

retailer,retailer_id,invoice_date,region,state,city,product,price_per_unit,units_sold,total_sales,operating_profit,operating_margin,sales_method
0,0,0,0,0,0,0,0,0,0,0,0,0
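The null count works by turning each "is null" test into 0 or 1 and summing per column. A minimal plain-Python sketch of the same idea, using two hypothetical rows and only three of the columns:

```python
# Hypothetical rows; None stands in for a Spark null
rows = [
    {"invoice_date": "2021-06-05", "units_sold": 10, "total_sales": 500.0},
    {"invoice_date": None, "units_sold": 5, "total_sales": None},
]
columns = ["invoice_date", "units_sold", "total_sales"]

# For each column, add 1 for every row whose value is missing
null_counts = {c: sum(int(r[c] is None) for r in rows) for c in columns}

print(null_counts)  # {'invoice_date': 1, 'units_sold': 0, 'total_sales': 1}
```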


## **2.2 Drop Null Values**

In [0]:
df_no_null = column_std_df.dropna(subset=["invoice_date", "units_sold", "total_sales"])
# display(df_no_null)


In [0]:
print("Before null removal:", column_std_df.count())
print("After null removal :", df_no_null.count())


Before null removal: 9648
After null removal : 9648


## **3.1 Corrupt Data Condition**

In [0]:
from pyspark.sql.functions import col

corrupt_condition = ( (col("units_sold") <= 0) |
    (col("total_sales") <= 0) |
    (col("price_per_unit") <= 0) |
    (col("operating_margin") < 0) |
    (col("operating_margin") > 100))
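Read as a per-row rule, the condition flags a record when any single check fails. The plain-Python sketch below restates it for one row at a time; `violates_business_rules` and both sample rows are hypothetical and serve only to show which values trip the rule.

```python
def violates_business_rules(row):
    """Return True when any of the checks in the Spark condition fails."""
    return (
        row["units_sold"] <= 0
        or row["total_sales"] <= 0
        or row["price_per_unit"] <= 0
        or row["operating_margin"] < 0
        or row["operating_margin"] > 100
    )


# Hypothetical rows: one with zero units and zero sales, one valid
zero_units = {"units_sold": 0, "total_sales": 0.0, "price_per_unit": 35.0, "operating_margin": 40.0}
valid = {"units_sold": 120, "total_sales": 4200.0, "price_per_unit": 35.0, "operating_margin": 40.0}

print(violates_business_rules(zero_units))  # True
print(violates_business_rules(valid))       # False
```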
  


## **3.2 Checking Corrupt Records**

In [0]:
corrupt_df = df_no_null.filter(corrupt_condition)
print("Corrupt records:", corrupt_df.count())


Corrupt records: 4


## **3.3 Cleaning Corrupt Records & validating**

In [0]:
clean_corrupt_df = df_no_null.filter(~corrupt_condition)
# display(clean_corrupt_df)

# validation
print("After NULL removal :", df_no_null.count())
print("Corrupt records    :", corrupt_df.count())
print("Clean records      :", clean_corrupt_df.count())
print("Counts reconcile   :", df_no_null.count() == corrupt_df.count() + clean_corrupt_df.count())



After NULL removal : 9648
Corrupt records    : 4
Clean records      : 9644
Counts reconcile   : True
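The count check above is more than a formality because of SQL's three-valued logic: if a column used in the condition were null, the condition could evaluate to null, and `filter` would then keep the row in neither `corrupt_df` nor `clean_corrupt_df`. The sketch below imitates that behaviour in plain Python, with `None` standing in for null. The helpers `sql_or`, `sql_not`, `le_zero`, `is_corrupt` and the sample rows are hypothetical, and only two of the five checks are modelled.

```python
def sql_or(a, b):
    """SQL-style OR where None stands for NULL."""
    if a is True or b is True:
        return True
    if a is None or b is None:
        return None
    return False


def sql_not(a):
    """SQL-style NOT: NULL stays NULL."""
    return None if a is None else not a


def le_zero(value):
    """SQL-style `value <= 0`: comparing NULL yields NULL."""
    return None if value is None else value <= 0


def is_corrupt(row):
    return sql_or(le_zero(row["units_sold"]), le_zero(row["total_sales"]))


rows = [
    {"units_sold": 10, "total_sales": 500.0},  # clean
    {"units_sold": 0, "total_sales": 0.0},     # corrupt
    {"units_sold": 10, "total_sales": None},   # condition evaluates to NULL
]

# A filter keeps a row only when the predicate is exactly True
corrupt = [r for r in rows if is_corrupt(r) is True]
clean = [r for r in rows if sql_not(is_corrupt(r)) is True]

print(len(rows), len(corrupt), len(clean))     # 3 1 1
print(len(rows) == len(corrupt) + len(clean))  # False: the NULL row fell through
```

In this notebook the null check found no missing values, so the two filters partition the data exactly and the counts add up.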


## **3.4 Corrupt Record Storage**

In [0]:
from pyspark.sql.functions import lit, current_timestamp

# Adding metadata to corrupt records
corrupt_audit_df = corrupt_df.withColumn("corrupt_reason", lit("BUSINESS_RULE_VIOLATION")) \
    .withColumn("processed_at", current_timestamp())
display(corrupt_audit_df)


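# Save corrupt records. Because the write mode is "append", rerunning this cell
# adds the same rows again, so the table can hold more rows than the 4 flagged in this run.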
corrupt_audit_df.write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("workspace.default.adidas_corrupt_records")


retailer,retailer_id,invoice_date,region,state,city,product,price_per_unit,units_sold,total_sales,operating_profit,operating_margin,sales_method,corrupt_reason,processed_at
Foot Locker,1185732,2021-06-05,Midwest,Nebraska,Omaha,Women's Athletic Footwear,35.0,0,0.0,0.0,40.0,Outlet,BUSINESS_RULE_VIOLATION,2026-01-02T17:11:16.149Z
Foot Locker,1185732,2021-06-11,Midwest,Nebraska,Omaha,Women's Athletic Footwear,30.0,0,0.0,0.0,40.0,Outlet,BUSINESS_RULE_VIOLATION,2026-01-02T17:11:16.149Z
Foot Locker,1185732,2021-06-05,Midwest,Nebraska,Omaha,Women's Athletic Footwear,33.0,0,0.0,0.0,55.0,Online,BUSINESS_RULE_VIOLATION,2026-01-02T17:11:16.149Z
Foot Locker,1185732,2021-06-11,Midwest,Nebraska,Omaha,Women's Athletic Footwear,27.0,0,0.0,0.0,53.0,Online,BUSINESS_RULE_VIOLATION,2026-01-02T17:11:16.149Z


In [0]:
# display(spark.table("workspace.default.adidas_corrupt_records"))
spark.table("workspace.default.adidas_corrupt_records").count()


36

## **4.1 Duplicate Data Identification**

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

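# Rows sharing retailer_id, invoice_date, product, and city are treated as duplicates.
# Ordering by invoice_date (a partition column) does not break ties,
# so which row receives row_num == 1 within a group is arbitrary.
dup_window = Window.partitionBy("retailer_id","invoice_date","product","city").orderBy("invoice_date")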

dedup_df = clean_corrupt_df.withColumn("row_num",row_number().over(dup_window))

duplicate_df = dedup_df.filter(col("row_num") > 1)
# display(duplicate_df)



## **4.2 Non Duplicate Records**

In [0]:
non_duplicate_df = dedup_df.filter(col("row_num") == 1).drop("row_num")
# display(non_duplicate_df)

## **4.3 Validation**

In [0]:
print("Before deduplication :", clean_corrupt_df.count())
print("Duplicate records   :", duplicate_df.count())
print("After deduplication :", non_duplicate_df.count())

print("Validation :",clean_corrupt_df.count() == duplicate_df.count() + non_duplicate_df.count())


Before deduplication : 9644
Duplicate records   : 5768
After deduplication : 3876
Validation : True
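Whether a row counts as a duplicate depends entirely on the partition key. As an illustration in plain Python (not the Spark code), the sketch below reuses values from the four rows displayed in section 3.4, purely as sample data, and counts duplicates under two keys: the four-column key used above and a hypothetical wider key that also includes `sales_method`. The helper `count_duplicates` and both key functions are assumptions made for this example.

```python
from collections import Counter

# (retailer_id, invoice_date, product, city, sales_method, price_per_unit)
rows = [
    (1185732, "2021-06-05", "Women's Athletic Footwear", "Omaha", "Outlet", 35.0),
    (1185732, "2021-06-11", "Women's Athletic Footwear", "Omaha", "Outlet", 30.0),
    (1185732, "2021-06-05", "Women's Athletic Footwear", "Omaha", "Online", 33.0),
    (1185732, "2021-06-11", "Women's Athletic Footwear", "Omaha", "Online", 27.0),
]


def count_duplicates(rows, key):
    """Rows beyond the first in each key group, i.e. those with row_number > 1."""
    groups = Counter(key(r) for r in rows)
    return sum(n - 1 for n in groups.values())


def narrow_key(r):
    return r[:4]  # retailer_id, invoice_date, product, city


def wide_key(r):
    return r[:5]  # hypothetical: also sales_method


print(count_duplicates(rows, narrow_key))  # 2
print(count_duplicates(rows, wide_key))    # 0
```

Under the narrower key, rows that differ in price and sales method are still treated as duplicates of one another, which may help explain why more than half of the records were flagged. Whether that matches the intended business definition of a duplicate is a judgment for the analyst.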


## **4.4 Duplicate Records Store**

In [0]:
duplicate_df \
    .drop("row_num") \
    .write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("workspace.default.adidas_duplicate_records")


## **5. Final Silver Layer Data**

In [0]:
non_duplicate_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.default.silver_adidas_clean_sales")


# **Gold Layer**

## **Importing Silver Table**

In [0]:
silver_clean_df = spark.table("workspace.default.silver_adidas_clean_sales")


In [0]:
# Explicit imports instead of wildcards; note that round and sum shadow the Python built-ins
from pyspark.sql.functions import avg, col, round, sum, when

# Overall KPIs across the whole silver table
gold_kpi_df = silver_clean_df.agg(
    round(sum("total_sales"), 2).alias("total_sales"),
    round(sum("operating_profit"), 2).alias("total_profit"),
    sum("units_sold").alias("total_units_sold"),
    round(avg("price_per_unit"), 2).alias("avg_price_per_unit"),
)

# display(gold_kpi_df)
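`avg("price_per_unit")` is a simple mean over rows, so every transaction counts equally regardless of how many units it covers. A units-weighted average (total sales divided by total units) answers a slightly different question. The plain-Python sketch below shows the gap on two hypothetical rows, assuming for illustration that `total_sales = price_per_unit * units_sold`.

```python
# Hypothetical rows: (price_per_unit, units_sold)
rows = [(50.0, 100), (30.0, 900)]

# Simple mean: each row has the same weight
simple_avg = sum(price for price, _ in rows) / len(rows)

# Weighted mean: each unit sold has the same weight
total_sales = sum(price * units for price, units in rows)
total_units = sum(units for _, units in rows)
weighted_avg = total_sales / total_units

print(round(simple_avg, 2))    # 40.0
print(round(weighted_avg, 2))  # 32.0
```

Which of the two figures is the right "Average Price per Unit" depends on the business question; the KPI above reports the simple mean.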


In [0]:
from pyspark.sql.functions import year, month, sum, round

sales_by_month_df = silver_clean_df \
    .groupBy(year("invoice_date").alias("year"), month("invoice_date").alias("month")) \
    .agg(round(sum("total_sales"), 2).alias("monthly_total_sales"),
        round(sum("operating_profit"), 2).alias("monthly_total_profit"),
        sum("units_sold").alias("monthly_units_sold")) \
    .orderBy("year", "month")

# display(sales_by_month_df)


In [0]:
from pyspark.sql.functions import sum, round

sales_by_state_df = silver_clean_df.groupBy("state") \
    .agg(round(sum("total_sales"), 2).alias("total_sales"),
        round(sum("operating_profit"), 2).alias("total_profit"),
        sum("units_sold").alias("total_units_sold")) \
    .orderBy("state")

# display(sales_by_state_df)


In [0]:
sales_by_region_df = silver_clean_df.groupBy("region") \
    .agg(round(sum("total_sales"), 2).alias("total_sales"),
        round(sum("operating_profit"), 2).alias("total_profit"),
        sum("units_sold").alias("total_units_sold")) \
    .orderBy("region")

# display(sales_by_region_df)

In [0]:
sales_by_product_df = silver_clean_df.groupBy("product") \
    .agg(round(sum("total_sales"), 2).alias("total_sales"),
        round(sum("operating_profit"), 2).alias("total_profit"),
        sum("units_sold").alias("total_units_sold")) \
    .orderBy("total_sales", ascending=False)

# display(sales_by_product_df)

In [0]:

sales_by_retailer_df = silver_clean_df.groupBy("retailer_id", "retailer") \
    .agg(round(sum("total_sales"), 2).alias("total_sales"),
        round(sum("operating_profit"), 2).alias("total_profit"),
        sum("units_sold").alias("total_units_sold")) \
    .orderBy("total_sales", ascending=False)

# display(sales_by_retailer_df)


In [0]:

units_by_category_gender_df = silver_clean_df \
    .withColumn("gender",
        when(col("product").startswith("Men"), "Men")
        .when(col("product").startswith("Women"), "Women")
        .otherwise("Other")) \
    .withColumn("product_category",
        when(col("product").contains("Footwear"), "Footwear")
        .otherwise("Apparel")) \
    .groupBy("product_category", "gender") \
    .agg(sum("units_sold").alias("total_units_sold")) \
    .orderBy("product_category", "gender")

display(units_by_category_gender_df)


product_category,gender,total_units_sold
Apparel,Men,211025
Apparel,Women,299075
Footwear,Men,708680
Footwear,Women,486200



In [0]:
from pyspark.sql.functions import sum, round

top_cities_profit_df = silver_clean_df.groupBy("city") \
    .agg(round(sum("operating_profit"), 2).alias("total_profit")) \
    .orderBy("total_profit", ascending=False)

# Show the ten most profitable cities
display(top_cities_profit_df.limit(10))


city,total_profit
Charleston,15092348.0
New York,13094789.0
Miami,10283620.0
Portland,10123684.0
San Francisco,9638067.0
Charlotte,9436390.0
Houston,9278190.0
Albany,9121071.0
New Orleans,8880941.0
Birmingham,8643752.0
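Grouping by `city` alone merges cities that share a name across states. Portland, for instance, exists in both Oregon and Maine, so if the data contains both, their profits are summed into one row. The plain-Python sketch below uses hypothetical figures to show how grouping by `(state, city)` keeps them apart; it is an illustration of the effect, not a claim about what this dataset contains.

```python
from collections import defaultdict

# Hypothetical rows: (state, city, operating_profit)
rows = [
    ("Oregon", "Portland", 100.0),
    ("Maine", "Portland", 40.0),
    ("Florida", "Miami", 90.0),
]

by_city = defaultdict(float)
by_state_city = defaultdict(float)
for state, city, profit in rows:
    by_city[city] += profit
    by_state_city[(state, city)] += profit

top_city = max(by_city, key=by_city.get)
top_state_city = max(by_state_city, key=by_state_city.get)

print(top_city, by_city[top_city])                    # Portland 140.0
print(top_state_city, by_state_city[top_state_city])  # ('Oregon', 'Portland') 100.0
```

In Spark the equivalent change would be `groupBy("state", "city")`; the ranking above should be read with this possible merging in mind.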

