# Cleaning input raw data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from dateutil import parser
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType, StringType

In [2]:
spark = SparkSession.builder.appName("Data Cleaning").master("local[*]").getOrCreate()
spark

In [3]:
input_df = spark.read.option("header", "true").option("inferSchema", "true").format("csv").load("../input/messy_ecommerce_1K.csv")
input_df.show(10)

+--------------------+----------------+----------+------------+-------------+-----------+--------------------+-------------------+------------------+--------------+------------+---------------+--------------------------+-------------+--------------+----------+--------------------+----------------+-------------------+--------------+------------+------------------+------------+--------------+---------------+-----------------+----------------+----------+----------+---------------+----------------+-----------+--------------------+--------------+--------------+----------------+----------------------+----------------------+-------------+--------------+------------+----------------+---------------+
|            order_id|      order_date|order_time|order_status|shipping_cost|customer_id|      customer_email|customer_first_name|customer_last_name|customer_phone|customer_age|customer_gender|customer_registration_date|customer_city|customer_state|product_id|        product_name|product_category|p

In [4]:
print(f"Number of rows: {input_df.count()}")
print(f"Number of columns: {len(input_df.columns)}")
print(f"Column names: {input_df.columns}")

Number of rows: 1000
Number of columns: 43
Column names: ['order_id', 'order_date', 'order_time', 'order_status', 'shipping_cost', 'customer_id', 'customer_email', 'customer_first_name', 'customer_last_name', 'customer_phone', 'customer_age', 'customer_gender', 'customer_registration_date', 'customer_city', 'customer_state', 'product_id', 'product_name', 'product_category', 'product_subcategory', 'product_brand', 'product_cost', 'product_list_price', 'warehouse_id', 'warehouse_city', 'warehouse_state', 'warehouse_country', 'quantity_ordered', 'unit_price', 'line_total', 'discount_amount', 'discount_percent', 'coupon_code', 'payment_method', 'payment_status', 'order_returned', 'payment_refunded', 'shipping_address_line1', 'shipping_address_line2', 'shipping_city', 'shipping_state', 'shipping_zip', 'shipping_country', 'shipping_method']


### Cleaning order status

In [None]:
raw_order_status = input_df.select("order_status").distinct().rdd.flatMap(lambda x: x).collect()
print(raw_order_status)

order_status_lookup = spark.read.option("header", "true").option("inferSchema", True).format("csv").load("../lookup/order_status.csv")
# order_status_lookup.show()

clean_order_status_df = input_df.withColumn("order_status", trim(lower(col("order_status"))))

clean_order_status_df = clean_order_status_df.join(
    order_status_lookup, 
    on=clean_order_status_df.order_status==order_status_lookup.raw_status,
    how="left").withColumn("order_status_cleaned", coalesce(col("clean_status"), lit("pending"))).drop("order_status", "raw_status", "clean_status")

cleaned_order_status = clean_order_status_df.select("order_status_cleaned").distinct().rdd.flatMap(lambda x: x).collect()
print(cleaned_order_status)

clean_order_status_df.show()



['pending|pending_alt', 'cancelled   ', 'deliveryd', 'shipped', '  pending  ', '  shipped  ', 'cancelled', 'pxnding', 'cancelled|cancelled_alt', 'shipped   ', 'DELIVERED', 'cancel3ed', 'CANCELLED', 'delivered', 'pending', 'cancexled', 'shxpped', 'shippxd', 'pending   ', 'PENDING', 'pexding', None]
+--------------------+-----------------+--------------------+-------------+-----------+--------------------+-------------------+------------------+--------------+------------+---------------+--------------------------+-------------------+--------------+----------+--------------------+----------------+-------------------+--------------+------------+------------------+------------+--------------+---------------+-----------------+----------------+----------+----------+---------------+----------------+-----------------+--------------------+--------------+--------------+----------------+----------------------+----------------------+-------------+--------------+------------+----------------+-------

### Clean dates - transforming dates data to standard form

In [6]:
def transform_date_udf(val):
    if val is None:
        return None
    return parser.parse(val).date()

transform_date_udf = udf(transform_date_udf, DateType())

clean_dates_df = clean_order_status_df.withColumn(
    "order_date_cleaned",
    transform_date_udf(col("order_date"))
).withColumn(
    "customer_registration_date_cleaned", 
    transform_date_udf(col("customer_registration_date"))
    ).drop("order_date", "customer_registration_date")
clean_dates_df.show()

+--------------------+--------------------+-------------+-----------+--------------------+-------------------+------------------+--------------+------------+---------------+-------------------+--------------+----------+--------------------+----------------+-------------------+--------------+------------+------------------+------------+--------------+---------------+-----------------+----------------+----------+----------+---------------+----------------+-----------------+--------------------+--------------+--------------+----------------+----------------------+----------------------+-------------+--------------+------------+----------------+---------------+--------------------+------------------+----------------------------------+
|            order_id|          order_time|shipping_cost|customer_id|      customer_email|customer_first_name|customer_last_name|customer_phone|customer_age|customer_gender|      customer_city|customer_state|product_id|        product_name|product_category|pr

### Transform phone number to standard format

In [7]:
cleaned_phone_number_df = clean_dates_df.withColumn(
    "customer_phone_cleaned", regexp_replace(col("customer_phone") , r"[^0-9]", "")
).withColumn(
    "customer_phone_cleaned", when(length(col("customer_phone_cleaned"))>10, col("customer_phone_cleaned").substr(2, 10)).otherwise(col("customer_phone_cleaned"))
).withColumn(
    "customer_phone_cleaned", concat(lit("("), col("customer_phone_cleaned").substr(1, 3), lit(")"), lit(" "), col("customer_phone_cleaned").substr(4, 3), lit("-"), col("customer_phone_cleaned").substr(7, 4))
).drop("customer_phone")
cleaned_phone_number_df.show()

+--------------------+--------------------+-------------+-----------+--------------------+-------------------+------------------+------------+---------------+-------------------+--------------+----------+--------------------+----------------+-------------------+--------------+------------+------------------+------------+--------------+---------------+-----------------+----------------+----------+----------+---------------+----------------+-----------------+--------------------+--------------+--------------+----------------+----------------------+----------------------+-------------+--------------+------------+----------------+---------------+--------------------+------------------+----------------------------------+----------------------+
|            order_id|          order_time|shipping_cost|customer_id|      customer_email|customer_first_name|customer_last_name|customer_age|customer_gender|      customer_city|customer_state|product_id|        product_name|product_category|product_s

### Clean payment status - map each payment status to 'success', 'failed' or 'pending'

In [16]:
raw_payment_status = cleaned_phone_number_df.select("payment_status").distinct().rdd.flatMap(lambda x: x).collect()
print(raw_order_status)

payment_status_lookup = spark.read.option("header", "true").option("inferSchema", "true").format("csv").load("../lookup/payment_status.csv")
payment_status_lookup.show(truncate=False)

cleaned_phone_number_df = cleaned_phone_number_df.withColumn("payment_status", trim(col("payment_status")))

cleaned_payment_status_df = cleaned_phone_number_df.join(
    broadcast(payment_status_lookup), 
    on=cleaned_phone_number_df.payment_status==payment_status_lookup.raw_value,
    how="left"
    ).withColumnRenamed(
        "standard_value", "payment_status_cleaned"
        ).drop("raw_value", "payment_status")

cleaned_payment_status_df.show(truncate=False)

cleaned_payment_status = cleaned_payment_status_df.select("payment_status_cleaned").distinct().rdd.flatMap(lambda x: x).collect()
print(cleaned_payment_status)



['pending|pending_alt', 'cancelled   ', 'deliveryd', 'shipped', '  pending  ', '  shipped  ', 'cancelled', 'pxnding', 'cancelled|cancelled_alt', 'shipped   ', 'DELIVERED', 'cancel3ed', 'CANCELLED', 'delivered', 'pending', 'cancexled', 'shxpped', 'shippxd', 'pending   ', 'PENDING', 'pexding', None]
+-------------------+--------------+
|raw_value          |standard_value|
+-------------------+--------------+
|pending|pending_alt|pending       |
|pending            |pending       |
|pen@ing            |pending       |
|penying            |pending       |
|pen#ing            |pending       |
|pendyng            |pending       |
|pendxng            |pending       |
|PENDING            |pending       |
|success            |success       |
|SUCCESS            |success       |
|success|success_alt|success       |
|succ@ss            |success       |
|succexs            |success       |
|failed             |failed        |
|fail1d             |failed        |
|fa@led             |failed        

### Clean payment method