In [2]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

# Load dataset (replace 'path_to_file' with the actual path)
df = spark.read.csv("Sales Data.csv", header=True, inferSchema=True)

# Display the loaded DataFrame
df.show()
# Verify column names and data types
df.printSchema()


+---+--------+--------------------+----------------+----------+----------------+--------------------+-----+------+--------------+----+
|_c0|Order ID|             Product|Quantity Ordered|Price Each|      Order Date|    Purchase Address|Month| Sales|          City|Hour|
+---+--------+--------------------+----------------+----------+----------------+--------------------+-----+------+--------------+----+
|  0|  295665|  Macbook Pro Laptop|               1|    1700.0|30-12-2019 00:01|136 Church St, Ne...|   12|1700.0| New York City|   0|
|  1|  295666|  LG Washing Machine|               1|     600.0|29-12-2019 07:03|562 2nd St, New Y...|   12| 600.0| New York City|   7|
|  2|  295667|USB-C Charging Cable|               1|     11.95|12-12-2019 18:21|277 Main St, New ...|   12| 11.95| New York City|  18|
|  3|  295668|    27in FHD Monitor|               1|    149.99|22-12-2019 15:13|410 6th St, San F...|   12|149.99| San Francisco|  15|
|  4|  295669|USB-C Charging Cable|               1|   

In [3]:
# Count null values in each column
from pyspark.sql.functions import col, isnan, when, count

df.select(
    [count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns]).show()

+---+--------+-------+----------------+----------+----------+----------------+-----+-----+----+----+
|_c0|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|Month|Sales|City|Hour|
+---+--------+-------+----------------+----------+----------+----------------+-----+-----+----+----+
|  0|       0|      0|               0|         0|         0|               0|    0|    3|   0|   0|
+---+--------+-------+----------------+----------+----------+----------------+-----+-----+----+----+



In [4]:
# Decide whether to fill or drop rows with missing values
# For numerical columns, replace nulls with mean values
from pyspark.sql.functions import mean

numerical_columns = ['Sales', 'Quantity Ordered']
for col_name in numerical_columns:
    mean_value = df.select(mean(col_name)).collect()[0][0]
    df = df.fillna({col_name: mean_value})

# Drop rows with null values in critical columns
df = df.dropna()


In [5]:
df = df.dropDuplicates()


In [6]:
# Cast numerical columns to appropriate types
df = df.withColumn("Sales", col("Sales").cast("float"))
df = df.withColumn("Quantity Ordered", col("Quantity Ordered").cast("integer"))
df = df.withColumn("Price Each", col("Price Each").cast("float"))

# Verify schema
df.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: float (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Sales: float (nullable = false)
 |-- City: string (nullable = true)
 |-- Hour: integer (nullable = true)



In [7]:
# Remove rows with negative values in specific columns
columns_to_check = ['Sales', 'Price Each', 'Quantity Ordered']
for col_name in columns_to_check:
    df = df.filter(col(col_name) >= 0)

In [8]:
# Calculate total sales per product
df.groupBy("Product").sum("Sales").withColumnRenamed("sum(Sales)", "Total Sales").show()

+--------------------+------------------+
|             Product|       Total Sales|
+--------------------+------------------+
|    Wired Headphones|246651.92929840088|
|  Macbook Pro Laptop|         8037600.0|
|Apple Airpods Hea...|         2349150.0|
|              iPhone|         4794300.0|
|Lightning Chargin...|347094.14547920227|
|Bose SoundSport H...|1345565.4012680054|
|USB-C Charging Cable| 286674.7890357971|
|AAA Batteries (4-...| 92740.83064889908|
|        20in Monitor| 454148.7011795044|
|    27in FHD Monitor|1132424.5414733887|
|     Vareebadd Phone|          827200.0|
|34in Ultrawide Mo...|2355557.9494628906|
|            LG Dryer|          387600.0|
|AA Batteries (4-p...|106300.05223083496|
|        Google Phone|         3319200.0|
|       Flatscreen TV|         1445700.0|
|  LG Washing Machine|          399600.0|
|27in 4K Gaming Mo...|2435097.4990234375|
|     ThinkPad Laptop|4129958.6596679688|
+--------------------+------------------+

