In [6]:
df = spark.table("coffee_sales_raw")
df.count()



StatementMeta(, 1fa31335-753c-40a2-83f7-edc215402246, 8, Finished, Available, Finished)

3547

In [7]:
# confirmed column and col-type

df.printSchema()
df.show(5, truncate=False)


StatementMeta(, 1fa31335-753c-40a2-83f7-edc215402246, 9, Finished, Available, Finished)

root
 |-- transaction_id: double (nullable = true)
 |-- hour_of_day: double (nullable = true)
 |-- cash_type: string (nullable = true)
 |-- money: double (nullable = true)
 |-- coffee_name: string (nullable = true)
 |-- time_of_day: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- time: timestamp (nullable = true)

+--------------+-----------+---------+-----+-----------+-----------+-------+----------+----------+-------------------+
|transaction_id|hour_of_day|cash_type|money|coffee_name|time_of_day|weekday|month_name|date      |time               |
+--------------+-----------+---------+-----+-----------+-----------+-------+----------+----------+-------------------+
|30.0          |14.0       |card     |24.0 |Espresso   |Afternoon  |Mon    |Mar       |04-03-2024|2025-12-09 14:04:38|
|56.0          |12.0       |card     |24.0 |Espresso   |Afternoon  |Sat    |Mar       |09-03-2024|2025-12-09 

In [9]:
# Load raw coffee sales data and perform initial cleaning and type standardization
from pyspark.sql.functions import col, trim, to_date
from pyspark.sql.types import IntegerType, DoubleType

df = spark.table("coffee_sales_raw")

df_clean = (
    df.dropDuplicates()
      .withColumn("transaction_id", col("transaction_id").cast(IntegerType()))
      .withColumn("hour_of_day", col("hour_of_day").cast(IntegerType()))
      .withColumn("money", col("money").cast(DoubleType()))
      .withColumn("cash_type", trim(col("cash_type")))
      .withColumn("coffee_name", trim(col("coffee_name")))
      .withColumn("time_of_day", trim(col("time_of_day")))
      .withColumn("weekday", trim(col("weekday")))
      .withColumn("month_name", trim(col("month_name")))
      .withColumn("date", to_date(col("date"), "dd-MM-yyyy"))
)


StatementMeta(, 1fa31335-753c-40a2-83f7-edc215402246, 11, Finished, Available, Finished)

In [10]:
from pyspark.sql.functions import col, concat_ws, date_format, to_timestamp

df_clean = (
    df_clean
    .withColumn(
        "order_ts",
        to_timestamp(
            concat_ws(
                " ",
                date_format(col("date"), "yyyy-MM-dd"),
                date_format(col("time"), "HH:mm:ss")
            ),
            "yyyy-MM-dd HH:mm:ss"
        )
    )
)



StatementMeta(, 1fa31335-753c-40a2-83f7-edc215402246, 12, Finished, Available, Finished)

In [11]:
print(df_clean.columns)


StatementMeta(, 1fa31335-753c-40a2-83f7-edc215402246, 13, Finished, Available, Finished)

['transaction_id', 'hour_of_day', 'cash_type', 'money', 'coffee_name', 'time_of_day', 'weekday', 'month_name', 'date', 'time', 'order_ts']


In [12]:
from pyspark.sql.functions import col, sum as _sum

df_clean.select(
    _sum(col("order_ts").isNull().cast("int")).alias("null_order_ts")
).show()




StatementMeta(, 1fa31335-753c-40a2-83f7-edc215402246, 14, Finished, Available, Finished)

+-------------+
|null_order_ts|
+-------------+
|            0|
+-------------+



In [13]:
# Create unified timestamp, remove redundant fields, and store final clean table
from pyspark.sql.functions import to_timestamp, concat_ws

df_clean = (
    df_clean
      .withColumn(
          "order_ts",
          to_timestamp(concat_ws(" ", col("date"), col("time")), "dd-MM-yyyy HH:mm:ss")
      )
      .drop("time")
)

df_clean.write.mode("overwrite").saveAsTable("coffee_sales_clean")


StatementMeta(, 1fa31335-753c-40a2-83f7-edc215402246, 15, Finished, Available, Finished)

In [14]:
# row count before and after duplicates

raw_cnt = spark.table("coffee_sales_raw").count()
clean_cnt = df_clean.count()
print(f"Raw rows: {raw_cnt}, After dropDuplicates: {clean_cnt}")


StatementMeta(, 1fa31335-753c-40a2-83f7-edc215402246, 16, Finished, Available, Finished)

Raw rows: 3547, After dropDuplicates: 3547
