This end-to-end PySpark data pipeline covers:
- CSV ingestion
- Parquet conversion
- Data quality checks
- Data cleaning
- Feature engineering
- Aggregations

Platform: Databricks  
Language: PySpark

In [0]:
from pyspark.sql import SparkSession

# Obtain a SparkSession: getOrCreate() returns the already-active session if there is one,
# otherwise it builds a new one with this application name.
spark = (
    SparkSession.builder
    .appName("CSV to Parquet Conversion")
    .getOrCreate()
)

## Exploratory Data Analysis (EDA)

In [0]:
# Location of the raw products CSV (Databricks volume path) — change it to point at your own data
csv_file_path = "/Volumes/workspace/main_db/products/products.csv"


In [0]:
# Load the CSV: column names come from the header row, column types are inferred by Spark
# (inference needs an extra pass over the file).
df = spark.read.options(header=True, inferSchema=True).csv(csv_file_path)

In [0]:
# Preview the first 20 rows; long cell values are truncated for readability
df.show(n=20, truncate=True)

+-----+--------------------+--------------------+--------------------+--------------------+-----+--------+-----+----------+--------------+----------+-------------+-----------+
|Index|                Name|         Description|               Brand|            Category|Price|Currency|Stock|       EAN|         Color|      Size| Availability|Internal ID|
+-----+--------------------+--------------------+--------------------+--------------------+-----+--------+-----+----------+--------------+----------+-------------+-----------+
|    1|        Portable Fan|Person agency pra...|        Conley-Noble|      Men's Clothing|654.0|     USD|  752|  3.247E12|   NavajoWhite|  50x70 cm| out_of_stock|         46|
|    2|Mini Oven Speaker...|All contain becau...|Becker, Wiggins a...|              Makeup|739.0|     USD|  676|4.39562E12|          Navy|  12x18 in|     in_stock|         94|
|    3|       Wireless Lamp|Source your adult...|Bowman, Lee and H...|          Automotive| 52.0|     USD|  556|4.91898E

In [0]:
# Inspect the inferred schema and the overall dimensions of the raw DataFrame
df.printSchema()
n_rows, n_cols = df.count(), len(df.columns)
print(f"Number of rows:  {n_rows}")
print(f"Number of columns:  {n_cols}")

root
 |-- Index: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Stock: integer (nullable = true)
 |-- EAN: double (nullable = true)
 |-- Color: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Availability: string (nullable = true)
 |-- Internal ID: integer (nullable = true)

Number of rows:  1000000
Number of columns:  13


In [0]:
# Target directory for a Parquet copy of the data (columnar storage, faster to query than CSV)
parquet_output_path = "/Volumes/workspace/main_db/products/products1"

In [0]:
# Write the data as a single Parquet file (one partition), replacing any previous output.
# Caveat: coalesce(1) avoids a shuffle, so upstream work (the CSV scan) may also be squeezed
# into a single task; repartition(1) keeps that work parallel at the cost of a shuffle.
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .format("parquet")
    .save(parquet_output_path)
)


In [0]:
# Load the Parquet copy back and preview it to confirm the round trip
df1 = spark.read.format("parquet").load(parquet_output_path)
df1.show()

+-----+--------------------+--------------------+--------------------+--------------------+-----+--------+-----+----------+--------------+----------+-------------+-----------+
|Index|                Name|         Description|               Brand|            Category|Price|Currency|Stock|       EAN|         Color|      Size| Availability|Internal ID|
+-----+--------------------+--------------------+--------------------+--------------------+-----+--------+-----+----------+--------------+----------+-------------+-----------+
|    1|        Portable Fan|Person agency pra...|        Conley-Noble|      Men's Clothing|654.0|     USD|  752|  3.247E12|   NavajoWhite|  50x70 cm| out_of_stock|         46|
|    2|Mini Oven Speaker...|All contain becau...|Becker, Wiggins a...|              Makeup|739.0|     USD|  676|4.39562E12|          Navy|  12x18 in|     in_stock|         94|
|    3|       Wireless Lamp|Source your adult...|Bowman, Lee and H...|          Automotive| 52.0|     USD|  556|4.91898E

In [0]:
# Rich (untruncated) table view of the first 20 rows using the notebook display helper
display(df1.limit(20))

Index,Name,Description,Brand,Category,Price,Currency,Stock,EAN,Color,Size,Availability,Internal ID
1,Portable Fan,Person agency practice lose high book.,Conley-Noble,Men's Clothing,654.0,USD,752,3247000000000.0,NavajoWhite,50x70 cm,out_of_stock,46
2,Mini Oven Speaker Fan Automatic Max Pro,All contain because mind guess control around speak.,"Becker, Wiggins and Drake",Makeup,739.0,USD,676,4395620000000.0,Navy,12x18 in,in_stock,94
3,Wireless Lamp,Source your adult produce station smile recently.,"Bowman, Lee and Hampton",Automotive,52.0,USD,556,4918980000000.0,Snow,Small,out_of_stock,51
4,Camera Heater Webcam,Late possible garden.,"Farrell, Logan and Bush",Toys & Games,638.0,USD,249,6858450000000.0,OrangeRed,L,limited_stock,98
5,Ultra Cooker Mouse Bicycle,His early car turn machine say debate.,Vargas-Shaw,Home & Kitchen,372.0,USD,464,4764010000000.0,Magenta,15x20 cm,discontinued,99
6,Grill Tablet Fridge,Above despite support parent season.,"Delgado, Byrd and Hernandez",Cameras & Accessories,43.0,USD,48,5451480000000.0,MediumOrchid,Large,out_of_stock,47
7,Compact Thermostat Iron Ultra 360 Pro,Air happy ok power story expect decide.,Mullins LLC,Automotive,899.0,USD,813,8820490000000.0,SandyBrown,S,pre_order,63
8,Automatic Fridge Cooker,Then available guess others which pass.,"Valentine, Lara and Duran",Shoes & Footwear,86.0,USD,584,8187550000000.0,HotPink,L,backorder,38
9,Thermostat Printer,Group age firm record.,Malone Inc,Bedding & Bath,857.0,USD,862,9095890000000.0,Violet,L,out_of_stock,34
10,Smart Thermostat Trimmer Portable One Touch,Must finish nothing board customer else subject.,"Ware, Griffith and Banks",Cycling,613.0,USD,838,9180630000000.0,Orchid,100x200 mm,pre_order,54


In [0]:
# The Parquet copy should match the CSV: same schema, same row and column counts
df1.printSchema()
parquet_rows, parquet_cols = df1.count(), len(df1.columns)
print(f"Number of rows:  {parquet_rows}")
print(f"Number of columns:  {parquet_cols}")

root
 |-- Index: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Stock: integer (nullable = true)
 |-- EAN: double (nullable = true)
 |-- Color: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Availability: string (nullable = true)
 |-- Internal ID: integer (nullable = true)

Number of rows:  1000000
Number of columns:  13


In [0]:
# Statistical summary (count, mean, stddev, min, 25%/50%/75%, max) of the numeric measures.
# Limited to Price and Stock: the other columns are free text or ID-like values, for which
# these statistics are not meaningful, and summarising all 13 columns adds needless work.
df1.select("Price", "Stock").summary().show()

In [0]:
# Count nulls per column
from pyspark.sql.functions import col, sum

# For each column: cast the IS NULL flag to 0/1 and sum it, giving one row of per-column null counts
null_count_exprs = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df1.columns
]
df1.select(null_count_exprs).show()

+-----+----+-----------+-----+--------+-----+--------+-----+---+-----+----+------------+-----------+
|Index|Name|Description|Brand|Category|Price|Currency|Stock|EAN|Color|Size|Availability|Internal ID|
+-----+----+-----------+-----+--------+-----+--------+-----+---+-----+----+------------+-----------+
|    0|   0|          0|    0|       0|    0|       0|    1|  0|    0|   0|           0|          0|
+-----+----+-----------+-----+--------+-----+--------+-----+---+-----+----+------------+-----------+



In [0]:
# Peek at up to five distinct values of every column (one Spark job per column)
for column_name in df1.columns:
    print(f"Column: {column_name}")
    df1.select(column_name).distinct().show(n=5)

Column: Index
+-----+
|Index|
+-----+
| 1021|
| 2737|
| 2936|
| 3139|
| 3683|
+-----+
only showing top 5 rows
Column: Name
+--------------------+
|                Name|
+--------------------+
|Premium Thermosta...|
|      Advanced Drone|
|Advanced Lock Lam...|
|Automatic Scanner...|
|Compact Charger R...|
+--------------------+
only showing top 5 rows
Column: Description
+--------------------+
|         Description|
+--------------------+
|National economic...|
|Color cup then ar...|
|They accept month...|
|Mean upon give qu...|
|Read production s...|
+--------------------+
only showing top 5 rows
Column: Brand
+--------------------+
|               Brand|
+--------------------+
|Brock, Rice and C...|
|        Logan-Valdez|
|Garrett, Warner a...|
|        Chambers LLC|
|Herring, Cobb and...|
+--------------------+
only showing top 5 rows
Column: Category
+------------------+
|          Category|
+------------------+
|   Office Supplies|
|    Bedding & Bath|
|           Cycling|
|Clothi

In [0]:
# Detect Price outliers with Tukey's fences: values more than 1.5 * IQR below Q1 or above Q3.
# The last approxQuantile argument (0.05) is the allowed relative error of the estimates.
quantiles = df1.approxQuantile("Price", [0.25, 0.5, 0.75], 0.05)
Q1, median, Q3 = quantiles
IQR = Q3 - Q1
fence = 1.5 * IQR
lower_bound = Q1 - fence
upper_bound = Q3 + fence

# Outside the closed interval [lower_bound, upper_bound]; null prices are never selected
outliers = df1.filter(~col("Price").between(lower_bound, upper_bound))
print("Outlier count:", outliers.count())
outliers.show()

Outlier count: 0
+-----+----+-----------+-----+--------+-----+--------+-----+---+-----+----+------------+-----------+
|Index|Name|Description|Brand|Category|Price|Currency|Stock|EAN|Color|Size|Availability|Internal ID|
+-----+----+-----------+-----+--------+-----+--------+-----+---+-----+----+------------+-----------+
+-----+----+-----------+-----+--------+-----+--------+-----+---+-----+----+------------+-----------+



## Data Cleaning

In [0]:
from pyspark.sql.functions import col, trim, round, count, avg, stddev, aggregate

In [0]:
# Names of all string-typed columns (these are the ones the trim step cleans up)
string_cols = [
    column_name
    for column_name, spark_type in df1.dtypes
    if spark_type == "string"
]
string_cols

['Name',
 'Description',
 'Brand',
 'Category',
 'Currency',
 'Color',
 'Size',
 'Availability']

In [0]:
# Flag suspicious prices
from pyspark.sql.types import StringType

from pyspark.sql import functions as F

# Label each price as Missing (null), Invalid (<= 0) or Valid.
# Built from native Spark expressions instead of a Python UDF, so rows are not shipped to a
# Python worker, and the cell depends only on df1 (defined above) so it runs on a fresh kernel.
price_flag_expr = (
    F.when(F.col("Price").isNull(), "Missing")
     .when(F.col("Price") <= 0, "Invalid")
     .otherwise("Valid")
)

df_price_flagged = df1.withColumn("Price Flag", price_flag_expr)
df_price_flagged.select("Price", "Price Flag").show()


+-----+----------+
|Price|Price Flag|
+-----+----------+
|654.0|     Valid|
|739.0|     Valid|
| 52.0|     Valid|
|638.0|     Valid|
|372.0|     Valid|
| 43.0|     Valid|
|899.0|     Valid|
| 86.0|     Valid|
|857.0|     Valid|
|613.0|     Valid|
|872.0|     Valid|
| 54.0|     Valid|
|859.0|     Valid|
|181.0|     Valid|
|820.0|     Valid|
|367.0|     Valid|
|922.5|     Valid|
|931.0|     Valid|
|458.0|     Valid|
|647.0|     Valid|
+-----+----------+
only showing top 20 rows


In [0]:
# Categorize Products by Price Range - product distribution across Budget / Standard / Premium
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from pyspark.sql import functions as F

# Price-band thresholds (same currency as the Price column)
BUDGET_MAX_PRICE = 20     # Price < 20          -> "Budget"
STANDARD_MAX_PRICE = 100  # 20 <= Price < 100   -> "Standard"; Price >= 100 -> "Premium"

# Native CASE WHEN expression instead of a Python UDF: evaluated inside Spark, and the cell
# depends only on df1 (defined above) so it runs on a fresh kernel.
price_category_expr = (
    F.when(F.col("Price").isNull(), "Unknown")
     .when(F.col("Price") < BUDGET_MAX_PRICE, "Budget")
     .when(F.col("Price") < STANDARD_MAX_PRICE, "Standard")
     .otherwise("Premium")
)

df_price_category = df1.withColumn("Price Category", price_category_expr)
df_price_category.groupBy("Price Category").count().orderBy("count", ascending=False).show()



+--------------+------+
|Price Category| count|
+--------------+------+
|       Premium|901242|
|      Standard| 79603|
|        Budget| 19155|
+--------------+------+



In [0]:
# Strip leading/trailing spaces from every string column; all other columns pass through as-is
trimmed_exprs = [
    trim(col(column_name)).alias(column_name) if column_name in string_cols else col(column_name)
    for column_name in df1.columns
]
df3 = df1.select(*trimmed_exprs)
df3.show()

+-----+--------------------+--------------------+--------------------+--------------------+-----+--------+-----+----------+--------------+----------+-------------+-----------+
|Index|                Name|         Description|               Brand|            Category|Price|Currency|Stock|       EAN|         Color|      Size| Availability|Internal ID|
+-----+--------------------+--------------------+--------------------+--------------------+-----+--------+-----+----------+--------------+----------+-------------+-----------+
|    1|        Portable Fan|Person agency pra...|        Conley-Noble|      Men's Clothing|654.0|     USD|  752|  3.247E12|   NavajoWhite|  50x70 cm| out_of_stock|         46|
|    2|Mini Oven Speaker...|All contain becau...|Becker, Wiggins a...|              Makeup|739.0|     USD|  676|4.39562E12|          Navy|  12x18 in|     in_stock|         94|
|    3|       Wireless Lamp|Source your adult...|Bowman, Lee and H...|          Automotive| 52.0|     USD|  556|4.91898E

In [0]:
# Count duplicate records (df3 itself is not modified).
# "Index" is left out of the comparison: it looks like a per-row counter (1, 2, 3, ...),
# which would make every row unique and the check meaningless. NOTE(review): confirm that
# Index is a pure row number and not a business key.
dedup_keys = [column_name for column_name in df3.columns if column_name != "Index"]
total_rows = df3.count()
total_duplicates = total_rows - df3.dropDuplicates(dedup_keys).count()
print("Total duplicate rows:", total_duplicates)


Total duplicate rows: 0


## Data Transformation

In [0]:
#  Feature engineering: add a tax-inclusive price placed directly to the right of Price
from pyspark.sql import functions as F

TAX_RATE = 0.13  # NOTE(review): 13% rate was hard-coded; confirm the intended jurisdiction
tax_col = "Tax Inclusive Price"

# Column order with the new column inserted right after Price
cols = df3.columns
price_index = cols.index("Price")
new_cols = cols[:price_index + 1] + [tax_col] + cols[price_index + 1:]

df4 = (
    df3.withColumn(tax_col, F.round(F.col("Price") * (1 + TAX_RATE), 2))
       .select(new_cols)
)
df4.limit(20).display()

Index,Name,Description,Brand,Category,Price,Tax Inclusive Price,Currency,Stock,EAN,Color,Size,Availability,Internal ID
1,Portable Fan,Person agency practice lose high book.,Conley-Noble,Men's Clothing,654.0,739.02,USD,752,3247000000000.0,NavajoWhite,50x70 cm,out_of_stock,46
2,Mini Oven Speaker Fan Automatic Max Pro,All contain because mind guess control around speak.,"Becker, Wiggins and Drake",Makeup,739.0,835.07,USD,676,4395620000000.0,Navy,12x18 in,in_stock,94
3,Wireless Lamp,Source your adult produce station smile recently.,"Bowman, Lee and Hampton",Automotive,52.0,58.76,USD,556,4918980000000.0,Snow,Small,out_of_stock,51
4,Camera Heater Webcam,Late possible garden.,"Farrell, Logan and Bush",Toys & Games,638.0,720.94,USD,249,6858450000000.0,OrangeRed,L,limited_stock,98
5,Ultra Cooker Mouse Bicycle,His early car turn machine say debate.,Vargas-Shaw,Home & Kitchen,372.0,420.36,USD,464,4764010000000.0,Magenta,15x20 cm,discontinued,99
6,Grill Tablet Fridge,Above despite support parent season.,"Delgado, Byrd and Hernandez",Cameras & Accessories,43.0,48.59,USD,48,5451480000000.0,MediumOrchid,Large,out_of_stock,47
7,Compact Thermostat Iron Ultra 360 Pro,Air happy ok power story expect decide.,Mullins LLC,Automotive,899.0,1015.87,USD,813,8820490000000.0,SandyBrown,S,pre_order,63
8,Automatic Fridge Cooker,Then available guess others which pass.,"Valentine, Lara and Duran",Shoes & Footwear,86.0,97.18,USD,584,8187550000000.0,HotPink,L,backorder,38
9,Thermostat Printer,Group age firm record.,Malone Inc,Bedding & Bath,857.0,968.41,USD,862,9095890000000.0,Violet,L,out_of_stock,34
10,Smart Thermostat Trimmer Portable One Touch,Must finish nothing board customer else subject.,"Ware, Griffith and Banks",Cycling,613.0,692.69,USD,838,9180630000000.0,Orchid,100x200 mm,pre_order,54


In [0]:
# Number of products per category, largest categories first
category_counts = df4.groupBy("Category").count()
category_counts.sort(col("count").desc()).show()

+--------------------+-----+
|            Category|count|
+--------------------+-----+
|            Haircare|29732|
|    Women's Clothing|29681|
|   Fishing & Hunting|29663|
| Laptops & Computers|29653|
|      Home & Kitchen|29652|
|        Smartwatches|29583|
|              Makeup|29560|
|             Cycling|29550|
|Beauty & Personal...|29496|
|           Furniture|29475|
|      Bedding & Bath|29453|
|   Sports & Outdoors|29428|
|          Fragrances|29424|
|          Home Decor|29423|
|      Grooming Tools|29420|
|        Toys & Games|29413|
|   Health & Wellness|29406|
|Cameras & Accesso...|29402|
|          Automotive|29396|
|  Books & Stationery|29392|
+--------------------+-----+
only showing top 20 rows


In [0]:
# Average price by category
from pyspark.sql import functions as F

# Mean Price per category, rounded to 2 decimals (kept numeric), most expensive categories first
avg_price_expr = F.round(F.avg("Price"), 2).alias("avg_price")
avg_prices = df4.groupBy("Category").agg(avg_price_expr).orderBy(F.desc("avg_price"))

display(avg_prices)

Category,avg_price
Home & Kitchen,503.19
Laptops & Computers,502.9
Beauty & Personal Care,502.84
Shoes & Footwear,502.45
Haircare,502.12
Fitness Equipment,501.61
Home Decor,501.52
Cycling,501.32
Fragrances,501.3
Kitchen Appliances,500.9
