In [166]:
# Imports
# NOTE: On a notebook is preferable to have the imports first and then the Spark Session block
# so in case of adding more libraries to import, than can be executed any time, while the Session just once
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DoubleType,
)

from pyspark.sql.functions import (
    col,
    when,
    isnull,
    count,
    split,
    lit,
    abs,
    round,
    regexp_extract,
    regexp_replace,
    array_remove,
    explode,
    to_date,
)


In [167]:
# Create Spark Session
spark = SparkSession.builder \
    .appName("PySparkDataClean") \
    .getOrCreate()

# Set log level to ERROR to reduce verbosity
spark.sparkContext.setLogLevel("ERROR")

In [168]:
# Define schema for the dataset
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_details", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("product_category", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("price_per_unit", StringType(), True),
    StructField("tags", StringType(), True),
    StructField("items", StringType(), True)
])

# Load the dataset
df = spark.read.csv("../data/online_sales_data.csv", schema=schema, header=True)

# Display the dataset
print("Raw Dataset:")
df.show(10, truncate=False)

Raw Dataset:
+--------+-----------------------------------------+----------+----------------+--------+------------------+------------------+----------------------------+
|order_id|customer_details                         |order_date|product_category|quantity|price_per_unit    |tags              |items                       |
+--------+-----------------------------------------+----------+----------------+--------+------------------+------------------+----------------------------+
|ORD001  |Alice Johnson                            |NULL      |Electronics     |4       |15.769160684603047|['urgent', 'gift']|['Phone', 'Charger', 'Case']|
|ORD002  |Bob Smith | 584 Street Name, City 16     |2022-12-30|NULL            |-3      |fifty             |['bulk_order']    |['Book1', 'Book2']          |
|ORD003  |Charlie Brown | 598 Street Name, City 17 |2023-05-22|Books           |ten     |79.63563178465238 |NULL              |['Book1', 'Book2']          |
|ORD004  |David Wilson | 290 Street Name, Cit

In [169]:
df.describe().show()

+-------+--------+--------------------+----------+----------------+-----------------+------------------+--------------+--------------------+
|summary|order_id|    customer_details|order_date|product_category|         quantity|    price_per_unit|          tags|               items|
+-------+--------+--------------------+----------+----------------+-----------------+------------------+--------------+--------------------+
|  count|     100|                 100|        63|              70|               78|                62|            82|                  80|
|   mean|    NULL|                NULL|      NULL|            NULL|             1.68| 55.49113068483046|          NULL|                NULL|
| stddev|    NULL|                NULL|      NULL|            NULL|4.639845176868095|27.456621254927434|          NULL|                NULL|
|    min|  ORD001|       Alice Johnson|2022-12-30|           Books|               -1|15.769160684603047|['bulk_order']|  ['Book1', 'Book2']|
|    max|  OR

In [170]:
# Show all the NULLs in the dataframe, defining a function so it can be reused on the Notebook
# NOTE: .alias(c) to get the name of the column in the header

def get_all_nulls(df: SparkDataFrame) -> SparkDataFrame:
    """
    This function will return a DataFrame with all the Null
    values per column
    """
    result_df = df.select(
        [count(when(isnull(c), c)).alias(c) for c in df.columns]
    )

    return result_df

# Usage
all_nulls_df = get_all_nulls(df=df)
all_nulls_df.show()

+--------+----------------+----------+----------------+--------+--------------+----+-----+
|order_id|customer_details|order_date|product_category|quantity|price_per_unit|tags|items|
+--------+----------------+----------+----------------+--------+--------------+----+-----+
|       0|               0|        37|              30|      22|            38|  18|   20|
+--------+----------------+----------+----------------+--------+--------------+----+-----+



In [171]:
# Display the Schema
df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_details: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- price_per_unit: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- items: string (nullable = true)



In [172]:
# Detect rows with negative quantity or invalid price
df.filter(
    (col("quantity") < 0) |
    (col("quantity").rlike("^[^0-9]")) | 
    (col("price_per_unit").rlike("^[^0-9]"))
).select("quantity", "price_per_unit").show()

+--------+------------------+
|quantity|    price_per_unit|
+--------+------------------+
|      -3|             fifty|
|     ten| 79.63563178465238|
|      -4|             fifty|
|      10|             fifty|
|    NULL|             fifty|
|      -3|              NULL|
|    NULL|             fifty|
|      -4| 26.15555520916483|
|    NULL|             fifty|
|       3|             fifty|
|     ten|             fifty|
|      -3|             fifty|
|      -2| 19.55396781305644|
|       7|             fifty|
|      -4|             fifty|
|     ten|19.030281187517616|
|     ten|              NULL|
|     ten|             fifty|
|      -1|             fifty|
|      -2|              NULL|
+--------+------------------+
only showing top 20 rows



In [173]:
# As noted, both columns, "price_per_unit" and "quantity" have corrupted data, that imply NULL, Negative Numers and StringsType (Schema)
# So as a first step, lets try to identify all the wrong values we need to fix in "quantity"

unique_quantity_values = df.filter(
    (col("quantity") < 0) |
    (col("quantity").rlike(r"^[0-9]+$")) |
    (col("quantity").rlike(r"^[a-z]+$")) |
    (col("quantity").isNull())
).select("quantity").distinct().rdd.map(lambda x: x[0]).collect()

print(f"Unique quantity col values: {len(unique_quantity_values)}")
print(sorted(unique_quantity_values, key=str))

Unique quantity col values: 16
['-1', '-2', '-3', '-4', '1', '10', '2', '3', '4', '5', '6', '7', '8', '9', None, 'ten']


In [174]:
# Based on the result above we can see that if we cast() values we will loose any of the Null (None) and the "ten"
# So the basic rules to fix the data on this column could be:
#   - replace "ten" by "10"
#   - replace Null by "0"
#   - then cast the quantity column values to INT and replace negative for absolute values
df = df.withColumn("quantity", when(col("quantity").isNull(), 0).otherwise(col("quantity")))
df = df.withColumn("quantity", when(col("quantity") == "ten", 10).otherwise(col("quantity")))
df = df.withColumn("quantity", col("quantity").cast(IntegerType()))
df = df.withColumn("quantity", abs(df["quantity"]))

# Check the changes
unique_quantity_clean_values = df.select("quantity").distinct().rdd.map(lambda x: x[0]).collect()
print(f"Unique quantity col values: {len(unique_quantity_clean_values)}")
print(sorted(unique_quantity_clean_values, key=int))
df.select("quantity").show(10)

Unique quantity col values: 11
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+--------+
|quantity|
+--------+
|       4|
|       3|
|      10|
|       0|
|       4|
|      10|
|       0|
|       3|
|       0|
|       4|
+--------+
only showing top 10 rows



In [175]:
# Check again Nulls, now "quantity" Column has been fixed, based on the 22 original values
all_nulls_df = get_all_nulls(df=df)
all_nulls_df.select("quantity", "price_per_unit").show()

+--------+--------------+
|quantity|price_per_unit|
+--------+--------------+
|       0|            38|
+--------+--------------+



In [176]:
# We can apply the same principle but now on the "price_per_unit" column
# but in this case, besides replacing string values for integers, and negative for absolute
# will also imply replacing Nulls by the Median value of the column values as distribution, 50th percentile
# using the Median instead of the average is better in this cases as the Median is not impacted by edge/extreme values
# Data cleaning actions - continuing with the df DataFrame:
#   - Cast to double
#   - Round with 2 decimal digits
#   - Replace Null by the Median value, using ALL column values as the distribution 

# Lets get the unique values to know which literals to replace
unique_ppu_values = df.filter(
    (col("price_per_unit") < 0) |
    (col("price_per_unit").rlike(r"^[0-9]+$")) |
    (col("price_per_unit").rlike(r"^[a-z]+$")) |
    (col("price_per_unit").isNull())
).select("price_per_unit").distinct().rdd.map(lambda x: x[0]).collect()

print(f"Unique quantity col values: {len(unique_ppu_values)}")
print(sorted(unique_ppu_values, key=str))

Unique quantity col values: 2
[None, 'fifty']


In [177]:
# Lets check if besides Null values, there are "0" as value on "price_per_unit" Column
df.filter((col("price_per_unit") == "0")).select("price_per_unit").show()

# As there are no "0" values already, then its safe to replace each NULL for 0.0 given the column type in the context of the data

+--------------+
|price_per_unit|
+--------------+
+--------------+



In [178]:
# Replace literals and Null
df = df.withColumn(
    "price_per_unit", when(col("price_per_unit").isNull(), "0").otherwise(col("price_per_unit"))
)

df = df.withColumn(
    "price_per_unit", when(col("price_per_unit") == "fifty", "50.00").otherwise(col("price_per_unit"))
)

df = df.withColumn("price_per_unit", col("price_per_unit").cast(DoubleType()))
median_from_col_quantity = df.approxQuantile("price_per_unit", [0.5], 0.0)[0]
print(f"Median value of price_per_unit column: {median_from_col_quantity}")

df = df.withColumn(
    "price_per_unit",
    when(col("price_per_unit") == 0, median_from_col_quantity).otherwise(col("price_per_unit"))
)
# Round to 2 decimal places for "price_per_unit"
df = df.withColumn("price_per_unit", round(col("price_per_unit"), 2))

df.select("price_per_unit").show(10)

Median value of price_per_unit column: 43.861076396177964
+--------------+
|price_per_unit|
+--------------+
|         15.77|
|          50.0|
|         79.64|
|         27.56|
|          50.0|
|          50.0|
|          50.0|
|         43.86|
|          50.0|
|         37.53|
+--------------+
only showing top 10 rows



In [179]:
# Check two specific orders that should have now the rounded numbers
df.filter((col("order_id") == "ORD001") | (col("order_id") == "ORD003")).select("order_id", "price_per_unit").show()

+--------+--------------+
|order_id|price_per_unit|
+--------+--------------+
|  ORD001|         15.77|
|  ORD003|         79.64|
+--------+--------------+



In [180]:
# Check again Nulls, now "quantity" and "price_per_unit" Columns have been fixed
all_nulls_df = get_all_nulls(df=df)
all_nulls_df.select("quantity", "price_per_unit").show()

+--------+--------------+
|quantity|price_per_unit|
+--------+--------------+
|       0|             0|
+--------+--------------+



In [181]:
# Now lets fix "product_category" as it has 30 Nulls, so lets check that only NULL is what is missing and set for unknown_category
unique_prod_category_values = df.filter(
    (col("product_category").rlike(r"^[a-z]")) |
    (col("product_category").rlike(r"^[A-Z]")) |
    (col("product_category").isNull())
).select("product_category").distinct().rdd.map(lambda x: x[0]).collect()

print(f"Unique product_category col values: {len(unique_prod_category_values)}")
print(sorted(unique_prod_category_values, key=str))

# Replacing Nulls
df = df.withColumn(
    "product_category", when(col("product_category").isNull(), "unknown_category").otherwise(col("product_category"))
)

df.select("product_category").show(10)

Unique product_category col values: 4
['Books', 'Electronics', 'Home & Kitchen', None]
+----------------+
|product_category|
+----------------+
|     Electronics|
|unknown_category|
|           Books|
|           Books|
|unknown_category|
|unknown_category|
|           Books|
|unknown_category|
|unknown_category|
|     Electronics|
+----------------+
only showing top 10 rows



In [182]:
# Now based on the "customer_details" column, we can split such information by "|" and create the following:
#   - customer_name: split by | and get the first set of values (index 0)
#   - customer_address: split by | and get the second set of values (index 1)
#   - customer_address: replace NULL by "unknown_address"
df = df.withColumn("customer_name", split(col("customer_details"), "\\|")[0])
df = df.withColumn("customer_address", split(col("customer_details"), "\\|")[1])
df = df.withColumn(
    "customer_address", 
    when(col("customer_address").isNull(), lit("unknown_address")).otherwise(col("customer_address"))
)

df = df.drop("customer_details")
df.show()

+--------+----------+----------------+--------+--------------+------------------+--------------------+---------------+--------------------+
|order_id|order_date|product_category|quantity|price_per_unit|              tags|               items|  customer_name|    customer_address|
+--------+----------+----------------+--------+--------------+------------------+--------------------+---------------+--------------------+
|  ORD001|      NULL|     Electronics|       4|         15.77|['urgent', 'gift']|['Phone', 'Charge...|  Alice Johnson|     unknown_address|
|  ORD002|2022-12-30|unknown_category|       3|          50.0|    ['bulk_order']|  ['Book1', 'Book2']|     Bob Smith | 584 Street Name,...|
|  ORD003|2023-05-22|           Books|      10|         79.64|              NULL|  ['Book1', 'Book2']| Charlie Brown | 598 Street Name,...|
|  ORD004|2022-12-30|           Books|       0|         27.56|              NULL|['Laptop', 'Mouse...|  David Wilson | 290 Street Name,...|
|  ORD005|      NULL

In [183]:
# Check an example of customer_address, it contains Street Name and City, so we can create two more columns from it
df.filter(col("order_id") == "ORD002").select("customer_address").show(1, False)

+-------------------------+
|customer_address         |
+-------------------------+
| 584 Street Name, City 16|
+-------------------------+



In [184]:
# Based on above cell, extract components from customer_address:
#   - street_name: extract "Street Name"
#   - city: extract "City"
df = df.withColumn("street", regexp_extract(col('customer_address'), r'(\d+) Street Name', 1))
df = df.withColumn("city", regexp_extract(col('customer_address'), r'City (\d+)', 1))
df.select("customer_address", "street", "city").show(10, False)

+-------------------------+------+----+
|customer_address         |street|city|
+-------------------------+------+----+
|unknown_address          |      |    |
| 584 Street Name, City 16|584   |16  |
| 598 Street Name, City 17|598   |17  |
| 290 Street Name, City 12|290   |12  |
| 387 Street Name, City 10|387   |10  |
|unknown_address          |      |    |
| 869 Street Name, City 16|869   |16  |
| 661 Street Name, City 7 |661   |7   |
| 239 Street Name, City 3 |239   |3   |
| 760 Street Name, City 16|760   |16  |
+-------------------------+------+----+
only showing top 10 rows



In [185]:
# Add "unknown" for the empty string values on each column, and drop "customer_address" column
df = df.withColumn(
    "street", 
    when(col("street") == "", lit("unknown")).otherwise(col("street"))
) \
.withColumn(
    "city", 
    when(col("city") == "", lit("unknown")).otherwise(col("city"))
)

df = df.drop("customer_address")
df.show(10, False)

+--------+----------+----------------+--------+--------------+------------------+----------------------------+---------------+-------+-------+
|order_id|order_date|product_category|quantity|price_per_unit|tags              |items                       |customer_name  |street |city   |
+--------+----------+----------------+--------+--------------+------------------+----------------------------+---------------+-------+-------+
|ORD001  |NULL      |Electronics     |4       |15.77         |['urgent', 'gift']|['Phone', 'Charger', 'Case']|Alice Johnson  |unknown|unknown|
|ORD002  |2022-12-30|unknown_category|3       |50.0          |['bulk_order']    |['Book1', 'Book2']          |Bob Smith      |584    |16     |
|ORD003  |2023-05-22|Books           |10      |79.64         |NULL              |['Book1', 'Book2']          |Charlie Brown  |598    |17     |
|ORD004  |2022-12-30|Books           |0       |27.56         |NULL              |['Laptop', 'Mouse', None]   |David Wilson   |290    |12     |

In [186]:
# Now lets fix "items" and "tags" columns, to set column type to Array, to do this first we need to convert the literals
# "['value']" which is an string, into a real list, using the split function

# Create idem-potent function to reuse for both columns that needs similar transformation
def transform_col_to_array_type(df: SparkDataFrame, col_name: str) -> SparkDataFrame:
    """
    This function will transform a column with StringType into ArrayType(StringType)
    also replacing NULL for empty string, in an idem-potent way
    """
    if not dict(df.dtypes)[col_name] == "array<string>":
        df = df.withColumn(col_name, regexp_replace(col(col_name), r"^\[", "")) \
            .withColumn(col_name, regexp_replace(col(col_name), r"\]$", "")) \
            .withColumn(col_name, regexp_replace(col(col_name), r"'", ""))

        df = df.withColumn(
            col_name,
            when(col(col_name).isNull(), "").otherwise(col(col_name))
        )

        df = df.withColumn(col_name, split(col(col_name), ", "))

    return df

df = transform_col_to_array_type(df=df, col_name="tags")
df.select("tags").show(10, False)
df.printSchema()

+--------------+
|tags          |
+--------------+
|[urgent, gift]|
|[bulk_order]  |
|[]            |
|[]            |
|[bulk_order]  |
|[urgent,gift] |
|[bulk_order]  |
|[urgent,gift] |
|[urgent,gift] |
|[urgent,gift] |
+--------------+
only showing top 10 rows

root
 |-- order_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- items: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)



In [187]:
# Same transformation process for "items"
df = transform_col_to_array_type(df=df, col_name="items")
df.select("items").show(10, False)
df.printSchema()

+----------------------+
|items                 |
+----------------------+
|[Phone, Charger, Case]|
|[Book1, Book2]        |
|[Book1, Book2]        |
|[Laptop, Mouse, None] |
|[Phone, Charger, Case]|
|[Table, Chair, -1]    |
|[Table, Chair, -1]    |
|[Table, Chair, -1]    |
|[Book1, Book2]        |
|[]                    |
+----------------------+
only showing top 10 rows

root
 |-- order_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- items: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- customer_name: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)



In [188]:
# As we can see there is still work to be done on the "items" column as on each row, inside the arrays
# there are values like "-1", "None" that are also corrupting the data, so lets try to identify
# if besides those two there are other values that need to be removed

# Get a unique set of values by getting the set() of everything that is not "None" AND "-1"
# explode() and collect() used here
result = list()
for items in df.select("items", explode("items")).collect():
    result.extend([item for item in items[0] if not item == "None" and not item == "-1"])

list(set(result))

['',
 'Laptop',
 'Table',
 'Chair',
 'Book2',
 'Book1',
 'Charger',
 'Case',
 'Phone',
 'Mouse']

In [189]:
# So based on the result above, we are confident that only "None" and "-1" are the values
# to be removed for each array when present. Using array_remove() for this
df = df.withColumn("items", array_remove("items", "None")).withColumn("items", array_remove("items", "-1"))
df.select("items").show(truncate=False)

+----------------------+
|items                 |
+----------------------+
|[Phone, Charger, Case]|
|[Book1, Book2]        |
|[Book1, Book2]        |
|[Laptop, Mouse]       |
|[Phone, Charger, Case]|
|[Table, Chair]        |
|[Table, Chair]        |
|[Table, Chair]        |
|[Book1, Book2]        |
|[]                    |
|[]                    |
|[Book1, Book2]        |
|[Table, Chair]        |
|[Book1, Book2]        |
|[Phone, Charger, Case]|
|[]                    |
|[Laptop, Mouse]       |
|[Phone, Charger, Case]|
|[Table, Chair]        |
|[]                    |
+----------------------+
only showing top 20 rows



In [190]:
# Lets check how the DataSet is now afer the transformations
df.show(10, truncate=False)
df.printSchema()

+--------+----------+----------------+--------+--------------+--------------+----------------------+---------------+-------+-------+
|order_id|order_date|product_category|quantity|price_per_unit|tags          |items                 |customer_name  |street |city   |
+--------+----------+----------------+--------+--------------+--------------+----------------------+---------------+-------+-------+
|ORD001  |NULL      |Electronics     |4       |15.77         |[urgent, gift]|[Phone, Charger, Case]|Alice Johnson  |unknown|unknown|
|ORD002  |2022-12-30|unknown_category|3       |50.0          |[bulk_order]  |[Book1, Book2]        |Bob Smith      |584    |16     |
|ORD003  |2023-05-22|Books           |10      |79.64         |[]            |[Book1, Book2]        |Charlie Brown  |598    |17     |
|ORD004  |2022-12-30|Books           |0       |27.56         |[]            |[Laptop, Mouse]       |David Wilson   |290    |12     |
|ORD005  |NULL      |unknown_category|4       |50.0          |[bulk_o

In [191]:
# First lets fill NULLs for a default date of "1900-01-01", then set the column to_date()
df = df.withColumn("order_date", when(col("order_date").isNull(), lit("1900-01-01")).otherwise(col("order_date")))
df = df.withColumn("order_date", to_date("order_date", "yyyy-MM-dd"))
df.select("order_date").show(5, truncate=False)
df.printSchema()

+----------+
|order_date|
+----------+
|1900-01-01|
|2022-12-30|
|2023-05-22|
|2022-12-30|
|1900-01-01|
+----------+
only showing top 5 rows

root
 |-- order_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- product_category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- items: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- customer_name: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)



In [192]:
# End result
df.show(20, truncate=False)

+--------+----------+----------------+--------+--------------+--------------+----------------------+---------------+-------+-------+
|order_id|order_date|product_category|quantity|price_per_unit|tags          |items                 |customer_name  |street |city   |
+--------+----------+----------------+--------+--------------+--------------+----------------------+---------------+-------+-------+
|ORD001  |1900-01-01|Electronics     |4       |15.77         |[urgent, gift]|[Phone, Charger, Case]|Alice Johnson  |unknown|unknown|
|ORD002  |2022-12-30|unknown_category|3       |50.0          |[bulk_order]  |[Book1, Book2]        |Bob Smith      |584    |16     |
|ORD003  |2023-05-22|Books           |10      |79.64         |[]            |[Book1, Book2]        |Charlie Brown  |598    |17     |
|ORD004  |2022-12-30|Books           |0       |27.56         |[]            |[Laptop, Mouse]       |David Wilson   |290    |12     |
|ORD005  |1900-01-01|unknown_category|4       |50.0          |[bulk_o

### Notes:
- When initially loading sample data (CSV, JSON) to define its quality, using an Schema based on StringType is a safe approach
- Then by checking each column to infer which is the correct type to use, check which minimal transformation needs to be done
- Casting directly a column to the desired type might lead to loose data that could be trasformed prior infering the Type
- In some cases the context of the data in a column will allow to replace missing/corrupted data, like None, Null, -1, etc
- Dig into PySpark SQL Functions in order to interact with the data in a more performant and safer way

In [193]:
# Stop Spark Session
spark.stop()