In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count, min, max, mean, stddev, trim, regexp_replace, to_date
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [149]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Sales Data Preparation") \
    .getOrCreate()

### Load the data into DataFrame and have a glimse 

In [150]:
# Load the CSV file into a DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("data/raw/sales_data_sample.csv")

# Cache the DataFrame for better performance in subsequent operations
df.cache()

24/12/05 21:07:13 WARN CacheManager: Asked to cache already cached data.


DataFrame[ORDERNUMBER: int, QUANTITYORDERED: int, PRICEEACH: double, ORDERLINENUMBER: int, SALES: double, ORDERDATE: date, STATUS: string, QTR_ID: int, MONTH_ID: int, YEAR_ID: int, PRODUCTLINE: string, MSRP: int, PRODUCTCODE: string, CUSTOMERNAME: string, PHONE: string, ADDRESSLINE1: string, ADDRESSLINE2: string, CITY: string, STATE: string, POSTALCODE: string, COUNTRY: string, TERRITORY: string, CONTACTLASTNAME: string, CONTACTFIRSTNAME: string, DEALSIZE: string]

In [151]:
df.show(10)

+-----------+---------------+---------+---------------+-------+---------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+-----+----------+-------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|STATE|POSTALCODE|COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+---------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+-----+----------+-------+---------+---------------+----------------+--------+
|      10107|             30|     95.7|              2| 2871.0|     NULL|Shipped|     1|       2|   2003|M

In [152]:
df.describe().show()

+-------+------------------+-----------------+------------------+-----------------+------------------+---------+------------------+------------------+------------------+------------+------------------+-----------+-----------------+--------------------+--------------------+------------+------------+--------+------------------+---------+---------+---------------+----------------+--------+
|summary|       ORDERNUMBER|  QUANTITYORDERED|         PRICEEACH|  ORDERLINENUMBER|             SALES|   STATUS|            QTR_ID|          MONTH_ID|           YEAR_ID| PRODUCTLINE|              MSRP|PRODUCTCODE|     CUSTOMERNAME|               PHONE|        ADDRESSLINE1|ADDRESSLINE2|        CITY|   STATE|        POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-------+------------------+-----------------+------------------+-----------------+------------------+---------+------------------+------------------+------------------+------------+------------------+-----------+-------

In [153]:
# Show the first few rows and schema
print("DataFrame Schema:")
df.printSchema()

DataFrame Schema:
root
 |-- ORDERNUMBER: integer (nullable = true)
 |-- QUANTITYORDERED: integer (nullable = true)
 |-- PRICEEACH: double (nullable = true)
 |-- ORDERLINENUMBER: integer (nullable = true)
 |-- SALES: double (nullable = true)
 |-- ORDERDATE: date (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- QTR_ID: integer (nullable = true)
 |-- MONTH_ID: integer (nullable = true)
 |-- YEAR_ID: integer (nullable = true)
 |-- PRODUCTLINE: string (nullable = true)
 |-- MSRP: integer (nullable = true)
 |-- PRODUCTCODE: string (nullable = true)
 |-- CUSTOMERNAME: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- ADDRESSLINE1: string (nullable = true)
 |-- ADDRESSLINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTALCODE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- TERRITORY: string (nullable = true)
 |-- CONTACTLASTNAME: string (nullable = true)
 |-- CONTACTFIRSTNAME: strin

### Handle NULL values

In [154]:
# Get counts of null values for each column
null_counts = df.select([count(when(col(c).isNull() | (col(c) == ""), c)).alias(c) for c in df.columns])

# Convert to rows and filter only columns with null values
for item in null_counts.collect()[0].asDict().items():
    if item[1] > 0:
        print(f"{item[0]}: {item[1]} null values")

ORDERDATE: 2823 null values
ADDRESSLINE2: 2521 null values
STATE: 1486 null values
POSTALCODE: 76 null values


Because "ORDERDATE" columns all has NULL value and we dont have any clude to fill the null value, so we decie to drop it out

In [155]:
df = df.drop("ORDERDATE")
df.show(5)

+-----------+---------------+---------+---------------+-------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+-----+----------+-------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|STATE|POSTALCODE|COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+-----+----------+-------+---------+---------------+----------------+--------+
|      10107|             30|     95.7|              2| 2871.0|Shipped|     1|       2|   2003|Motorcycles|  95|   S10_1678|   Land of T

the "ADDRESSLINE2" column,first we need to check if the unique value come from one person for each address or not

In [156]:
df.filter((col("ADDRESSLINE2").isNotNull()) & (col("PHONE").isNotNull()) & (col("ADDRESSLINE2") != "")) \
  .groupBy("ADDRESSLINE2", "PHONE") \
  .count() \
  .orderBy("count", ascending=False) \
  .show(truncate=False)

+------------+---------------+-----+
|ADDRESSLINE2|PHONE          |count|
+------------+---------------+-----+
|Level 3     |03 9520 4555   |55   |
|Suite 400   |2125557413     |48   |
|Level 15    |02 9936 8555   |46   |
|Level 6     |+61 2 9495 8555|46   |
|2nd Floor   |+65 224 1555   |36   |
|Suite 101   |2125551500     |25   |
|Suite 750   |2125558493     |20   |
|Floor No. 4 |+353 1862 1555 |16   |
|Suite 200   |2125551957     |10   |
+------------+---------------+-----+



We replace the NULL value to "NOT PROVIDED" value

In [157]:
df = df.fillna("NOT PROVIDED", subset=["ADDRESSLINE2"])

#Verify the change
df.groupBy("ADDRESSLINE2").count().orderBy("count", ascending=False).show(truncate=False)

+------------+-----+
|ADDRESSLINE2|count|
+------------+-----+
|NOT PROVIDED|2521 |
|Level 3     |55   |
|Suite 400   |48   |
|Level 6     |46   |
|Level 15    |46   |
|2nd Floor   |36   |
|Suite 101   |25   |
|Suite 750   |20   |
|Floor No. 4 |16   |
|Suite 200   |10   |
+------------+-----+



With each state has NULL, we will replace by the equivalent CITY name 

In [158]:
df = df.withColumn("STATE", 
    when(col("STATE").isNull(), col("CITY"))
    .otherwise(col("STATE")))

# To verify the changes, let's see the STATE and CITY columns together
df.select("CITY", "STATE").show(truncate=False)

+-------------+--------+
|CITY         |STATE   |
+-------------+--------+
|NYC          |NY      |
|Reims        |Reims   |
|Paris        |Paris   |
|Pasadena     |CA      |
|San Francisco|CA      |
|Burlingame   |CA      |
|Lille        |Lille   |
|Bergen       |Bergen  |
|San Francisco|CA      |
|Paris        |Paris   |
|Melbourne    |Victoria|
|NYC          |NY      |
|Newark       |NJ      |
|Bridgewater  |CT      |
|Nantes       |Nantes  |
|Cambridge    |MA      |
|Helsinki     |Helsinki|
|Stavern      |Stavern |
|Allentown    |PA      |
|NYC          |NY      |
+-------------+--------+
only showing top 20 rows



In [159]:
# See how many null postal codes we have
df.filter(col("POSTALCODE").isNull()).count()

# See which cities have null postal codes
df.filter(col("POSTALCODE").isNull()).select("CITY", "COUNTRY").show()

+-------------+-------+
|         CITY|COUNTRY|
+-------------+-------+
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|  Los Angeles|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|San Francisco|    USA|
|  Los Angeles|    USA|
|San Francisco|    USA|
|  Los Angeles|    USA|
|San Francisco|    USA|
+-------------+-------+
only showing top 20 rows



In [160]:
# Create a window spec for each City-State combination
window_spec = Window.partitionBy('CITY', 'STATE')

In [161]:
# Find the most common postal code for each City-State combination
postal_mapping = df.filter(F.col('POSTALCODE').isNotNull()) \
    .groupBy('CITY', 'STATE', 'POSTALCODE') \
    .count() \
    .withColumn('rank', F.row_number().over(Window.partitionBy('CITY', 'STATE').orderBy(F.desc('count')))) \
    .filter(F.col('rank') == 1) \
    .select('CITY', 'STATE', F.col('POSTALCODE').alias('MAPPED_POSTALCODE'))

# Join the mapping back to original dataframe
df_filled = df.join(postal_mapping, ['CITY', 'STATE'], 'left')

# Use coalesce to keep original postal code if it exists, otherwise use the mapped one
df_filled = df_filled.withColumn(
    'POSTALCODE',
    F.when(F.col('POSTALCODE').isNotNull(), F.col('POSTALCODE'))
    .when(F.col('MAPPED_POSTALCODE').isNotNull(), F.col('MAPPED_POSTALCODE'))
    .otherwise(F.concat(F.coalesce(F.col('STATE'), F.lit('')), F.lit('00000')))
)

# Drop the extra postal code column
df_filled = df_filled.drop('MAPPED_POSTALCODE')
df=df_filled


In [162]:
# Show statistics
print("Sample of filled postal codes:")
df_filled.filter(df.POSTALCODE.isNull()).select('CITY', 'STATE', 'POSTALCODE').show(5)

# Count of filled values
null_count = df_filled.filter(df.POSTALCODE.isNull()).count()
print(f"\nNumber of remaining null postal codes: {null_count}")

Sample of filled postal codes:
+----+-----+----------+
|CITY|STATE|POSTALCODE|
+----+-----+----------+
+----+-----+----------+


Number of remaining null postal codes: 0


In [163]:
df_filled.write.mode('overwrite').csv('sales_data_updated.csv', header=True)

### HANDLE OUTLIERS IN NUMERIC COLUMNS

In [164]:
# Define numeric columns relevant for outlier analysis
numeric_cols = ['QUANTITYORDERED', 'PRICEEACH', 'SALES', 'MSRP']

In [165]:
# Calculate outliers for all numeric columns at once
for col_name in numeric_cols:
    # Calculate quartiles
    quartiles = df.approxQuantile(col_name, [0.25, 0.75], 0.01)
    Q1 = quartiles[0]
    Q3 = quartiles[1]
    IQR = Q3 - Q1
    
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    
    # Add outlier detection columns
    df = df.withColumn(
        f"{col_name}_is_outlier",
        (F.col(col_name) < lower_bound) | (F.col(col_name) > upper_bound)
    )
    
    # Print summary statistics
    print(f"\nOutlier Analysis for {col_name}")
    print(f"Lower Bound: {lower_bound:.2f}")
    print(f"Upper Bound: {upper_bound:.2f}")
    
    # Count outliers
    outlier_count = df.filter(F.col(f"{col_name}_is_outlier") == True).count()
    total_count = df.count()
    outlier_percentage = (outlier_count / total_count) * 100
    
    print(f"Number of outliers: {outlier_count}")
    print(f"Percentage of outliers: {outlier_percentage:.2f}%")
    
    # Show sample outliers
    print("\nSample outliers:")
    df.filter(F.col(f"{col_name}_is_outlier") == True) \
      .select('ORDERNUMBER', col_name, 'PRODUCTLINE', 'DEALSIZE') \
      .show(5)



Outlier Analysis for QUANTITYORDERED
Lower Bound: 3.00
Upper Bound: 67.00
Number of outliers: 8
Percentage of outliers: 0.28%

Sample outliers:
+-----------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|     PRODUCTLINE|DEALSIZE|
+-----------+---------------+----------------+--------+
|      10405|             97|    Classic Cars|   Large|
|      10407|             76|    Vintage Cars|   Large|
|      10412|             70|Trucks and Buses|   Large|
|      10407|             76|    Classic Cars|   Large|
|      10405|             76|    Classic Cars|   Large|
+-----------+---------------+----------------+--------+
only showing top 5 rows


Outlier Analysis for PRICEEACH
Lower Bound: 23.20
Upper Bound: 146.08
Number of outliers: 0
Percentage of outliers: 0.00%

Sample outliers:
+-----------+---------+-----------+--------+
|ORDERNUMBER|PRICEEACH|PRODUCTLINE|DEALSIZE|
+-----------+---------+-----------+--------+
+-----------+---------+-----------+--------+


Out

In [166]:
#Create a view of all records with any outlier
outlier_view = df.filter(
    F.array_contains(
        F.array(*[F.col(f"{col}_is_outlier") for col in numeric_cols]),
        True
    )
)

In [167]:
print("\nSummary of records with any outlier:")
outlier_view.select(
    'ORDERNUMBER',
    'QUANTITYORDERED',
    'PRICEEACH',
    'SALES',
    'MSRP',
    'PRODUCTLINE'
).show(5)


Summary of records with any outlier:
+-----------+---------------+---------+-------+----+------------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|  SALES|MSRP| PRODUCTLINE|
+-----------+---------------+---------+-------+----+------------+
|      10103|             26|    100.0|5404.62| 214|Classic Cars|
|      10112|             29|    100.0|7209.11| 214|Classic Cars|
|      10126|             38|    100.0|7329.06| 214|Classic Cars|
|      10140|             37|    100.0| 7374.1| 214|Classic Cars|
|      10150|             45|    100.0|10993.5| 214|Classic Cars|
+-----------+---------------+---------+-------+----+------------+
only showing top 5 rows



In [168]:
# Function to calculate bounds
def get_bounds(df, column):
    quartiles = df.approxQuantile(column, [0.25, 0.75], 0.01)
    Q1, Q3 = quartiles[0], quartiles[1]
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return lower_bound, upper_bound

In [169]:
#Capping Method (Winsorization)
def cap_outliers(df):
    numeric_cols = ['QUANTITYORDERED', 'PRICEEACH', 'SALES', 'MSRP']
    df_capped = df
    
    for col in numeric_cols:
        lower_bound, upper_bound = get_bounds(df, col)
        
        df_capped = df_capped.withColumn(
            f"{col}_capped",
            F.when(F.col(col) < lower_bound, lower_bound)
             .when(F.col(col) > upper_bound, upper_bound)
             .otherwise(F.col(col))
        )
    
    return df_capped

In [170]:
df_capped = cap_outliers(df)
df = df_capped

In [171]:
# Print summary statistics for each method
def print_summary(df_original, df_handled):
    print(f"Original count: {df_original.count()}")
    print(f"After handling: {df_handled.count()}")

    for col in ['QUANTITYORDERED', 'PRICEEACH', 'SALES', 'MSRP']:
        print(f"\n{col} statistics:")
        df_handled.select(F.mean(col_name), F.stddev(col_name)).show()

In [172]:
print_summary(df, df_capped)

Original count: 2823
After handling: 2823

QUANTITYORDERED statistics:
+------------------+-----------------+
|         avg(MSRP)|     stddev(MSRP)|
+------------------+-----------------+
|100.71555083244775|40.18791167720266|
+------------------+-----------------+


PRICEEACH statistics:
+------------------+-----------------+
|         avg(MSRP)|     stddev(MSRP)|
+------------------+-----------------+
|100.71555083244775|40.18791167720266|
+------------------+-----------------+


SALES statistics:
+------------------+-----------------+
|         avg(MSRP)|     stddev(MSRP)|
+------------------+-----------------+
|100.71555083244775|40.18791167720266|
+------------------+-----------------+


MSRP statistics:
+------------------+-----------------+
|         avg(MSRP)|     stddev(MSRP)|
+------------------+-----------------+
|100.71555083244775|40.18791167720266|
+------------------+-----------------+



In [173]:
df.write.mode('overwrite').csv('data/processed/sales_data_handled.csv', header=True)

In [175]:
df.select("ADDRESSLINE2").distinct().show()

+------------+
|ADDRESSLINE2|
+------------+
|   2nd Floor|
|     Level 6|
|   Suite 101|
|   Suite 750|
|     Level 3|
|   Suite 400|
| Floor No. 4|
|   Suite 200|
|    Level 15|
|NOT PROVIDED|
+------------+



In [176]:
# Run verification
verify_final_dataframe(df)

Missing Value Check:

Numeric Columns Statistics:
+-------------------+-------------------+--------------------+----------------------+-------------+-------------+-----------------+------------------+---------+---------+----------------+------------------+--------+--------+------------------+-----------------+
|QUANTITYORDERED_min|QUANTITYORDERED_max|QUANTITYORDERED_mean|QUANTITYORDERED_stddev|PRICEEACH_min|PRICEEACH_max|   PRICEEACH_mean|  PRICEEACH_stddev|SALES_min|SALES_max|      SALES_mean|      SALES_stddev|MSRP_min|MSRP_max|         MSRP_mean|      MSRP_stddev|
+-------------------+-------------------+--------------------+----------------------+-------------+-------------+-----------------+------------------+---------+---------+----------------+------------------+--------+--------+------------------+-----------------+
|                  6|                 97|   35.09280906836698|      9.74144273706958|        26.88|        100.0|83.65854410201929|20.174276527840536|   482.13|  14