In [34]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import DateType
from pyspark.sql.window import Window

In [35]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Load the raw CSV; the first line holds the column names. No schema is
# supplied, so every column is read as a string.
df = spark.read.csv("../data/uncleaned_data.csv", header=True)

In [36]:
# Attach a consecutive 1..N surrogate key (`_kde_id`) to every row.
#
# row_number() requires an ordered window. Ordering by a constant literal makes
# every row a tie, so which row gets which number is undefined and may change
# between runs. Ordering by monotonically_increasing_id() gives Spark a real
# sort key that follows partition order and the record order inside each
# partition.
# NOTE(review): an un-partitioned window still pulls all rows into a single
# partition; acceptable for this exploration, revisit for larger inputs.
w = Window.orderBy(F.monotonically_increasing_id())

df.withColumn(
    "_kde_id",
    F.row_number().over(w)
).select("_kde_id", *df.columns).tail(10)

[Row(_kde_id=541900, InvoiceNo='581587', StockCode='22726', Description='ALARM CLOCK BAKELIKE GREEN', Quantity='4', InvoiceDate='12/9/2011 12:50', UnitPrice='3.75', CustomerID='12680', Country='France'),
 Row(_kde_id=541901, InvoiceNo='581587', StockCode='22730', Description='ALARM CLOCK BAKELIKE IVORY', Quantity='4', InvoiceDate='12/9/2011 12:50', UnitPrice='3.75', CustomerID='12680', Country='France'),
 Row(_kde_id=541902, InvoiceNo='581587', StockCode='22367', Description='CHILDRENS APRON SPACEBOY DESIGN', Quantity='8', InvoiceDate='12/9/2011 12:50', UnitPrice='1.95', CustomerID='12680', Country='France'),
 Row(_kde_id=541903, InvoiceNo='581587', StockCode='22629', Description='SPACEBOY LUNCH BOX ', Quantity='12', InvoiceDate='12/9/2011 12:50', UnitPrice='1.95', CustomerID='12680', Country='France'),
 Row(_kde_id=541904, InvoiceNo='581587', StockCode='23256', Description='CHILDRENS CUTLERY SPACEBOY ', Quantity='4', InvoiceDate='12/9/2011 12:50', UnitPrice='4.15', CustomerID='12680',

In [37]:
# Summary statistics (count / mean / stddev / min / max) for every column.
# The columns were loaded without a schema, so they are strings here.
df.describe().show()

+-------+------------------+------------------+--------------------+-----------------+---------------+-----------------+------------------+-----------+
|summary|         InvoiceNo|         StockCode|         Description|         Quantity|    InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+------------------+------------------+--------------------+-----------------+---------------+-----------------+------------------+-----------+
|  count|            541909|            541909|              540455|           541909|         541909|           541909|            406829|     541909|
|   mean|  559965.752026781|27623.240210938104|             20713.0| 9.55224954743324|           null|4.611113626089252|15287.690570239585|       null|
| stddev|13428.417280798243|16799.737628427687|                null|218.0811578502336|           null|96.75985306117948|1713.6003033215975|       null|
|    min|            536365|             10002| 4 PURPLE FLOCK D...|               -1|1/

In [38]:
# Total number of rows in the raw frame, duplicates included.
df.count()

541909

In [39]:
# Number of rows left once fully identical rows (all columns equal) are collapsed.
df.distinct().count()

536641

In [40]:
# List the rows that occur more than once, most frequently repeated first.
(
    df.groupBy(*df.columns)
    .count()
    .filter(F.col("count") > 1)
    .orderBy(F.col("count").desc())
    .show()
)

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+-----+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|count|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+-----+
|   555524|    22698|PINK REGENCY TEAC...|       1|  6/5/2011 11:37|     2.95|     16923|United Kingdom|   20|
|   555524|    22697|GREEN REGENCY TEA...|       1|  6/5/2011 11:37|     2.95|     16923|United Kingdom|   12|
|   572861|    22775|PURPLE DRAWERKNOB...|      12|10/26/2011 12:46|     1.25|     14102|United Kingdom|    8|
|   540524|    21756|BATH BUILDING BLO...|       1|  1/9/2011 12:53|     5.95|     16735|United Kingdom|    6|
|   541266|    21755|LOVE BUILDING BLO...|       1| 1/16/2011 16:25|     5.95|     15673|United Kingdom|    6|
|   541266|    21754|HOME BUILDING BLO...|       1| 1/16/2011 16:25|     5.95|     15673|United Kingdom|    6|
|

In [41]:
# Row count of `df` again; the cells above only derived new frames and never
# reassigned `df`.
df.count()

541909

In [42]:
# Tag each row with a generated id and look up the row whose id is exactly 49911.
indexed = df.withColumn("index_column", F.monotonically_increasing_id())
indexed.filter(F.col("index_column") == 49911).show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|index_column|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+------------+
|   540538|    20970|PINK FLORAL FELTC...|       1|1/9/2011 14:36|     3.75|     17841|United Kingdom|       49911|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+------------+



In [43]:
# Same generated id, but show everything from 49911 onwards. The ids are not
# consecutive: the displayed output jumps from 49911 straight to 8589934592.
indexed = df.withColumn("index_column", F.monotonically_increasing_id())
indexed.filter(F.col("index_column") >= 49911).show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|index_column|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+------------+
|   540538|    20970|PINK FLORAL FELTC...|       1|1/9/2011 14:36|     3.75|     17841|United Kingdom|       49911|
|   540538|    22274|FELTCRAFT DOLL EMILY|       1|1/9/2011 14:36|     2.95|     17841|United Kingdom|  8589934592|
|   540538|    22179|SET 10 LIGHTS NIG...|       2|1/9/2011 14:36|     6.75|     17841|United Kingdom|  8589934593|
|   540538|   15060B|FAIRY CAKE DESIGN...|       1|1/9/2011 14:36|     3.75|     17841|United Kingdom|  8589934594|
|   540538|    22212|FOUR HOOK  WHITE ...|       1|1/9/2011 14:36|      2.1|     17841|United Kingdom|  8589934595|
|   540538|    22726|ALARM CLOCK BAKEL...|       1|1/9/2011 14:36|     3

In [44]:
# Inspect one heavily duplicated invoice line in full width.
# InvoiceNo and StockCode are string columns (see df.printSchema()), and stock
# codes such as '15060B' are not numeric. Comparing them with integer literals
# makes Spark cast the column to a number first, which turns such values into
# null (or raises when ANSI mode is on). Compare against string literals so no
# implicit cast is involved.
df.where(
    (F.col("InvoiceNo") == "555524") & (F.col("StockCode") == "22698")
).show(truncate=False)

+---------+---------+------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                   |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+------------------------------+--------+--------------+---------+----------+--------------+
|555524   |22698    |PINK REGENCY TEACUP AND SAUCER|1       |6/5/2011 11:37|2.95     |16923     |United Kingdom|
|555524   |22698    |PINK REGENCY TEACUP AND SAUCER|1       |6/5/2011 11:37|2.95     |16923     |United Kingdom|
|555524   |22698    |PINK REGENCY TEACUP AND SAUCER|1       |6/5/2011 11:37|2.95     |16923     |United Kingdom|
|555524   |22698    |PINK REGENCY TEACUP AND SAUCER|1       |6/5/2011 11:37|2.95     |16923     |United Kingdom|
|555524   |22698    |PINK REGENCY TEACUP AND SAUCER|1       |6/5/2011 11:37|2.95     |16923     |United Kingdom|
|555524   |22698    |PINK REGENCY TEACUP AND SAUCER|1       |6/5/2011 11:37|2.95     |16923     

In [45]:
# Rows whose Quantity differs from 1.
df.filter(df["Quantity"] != 1).show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [46]:
# Append a whitespace-trimmed copy of InvoiceDate as `invoice_date`.
df.select("*", F.trim("InvoiceDate").alias("invoice_date")).show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|  invoice_date|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|12/1/2010 8:26|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|12/1/2010 8:26|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|12/1/2010 8:26|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|12/1/2010 8:26|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|12/1/2010 8:26|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/

In [47]:
# Append an untouched copy of InvoiceDate as `invoice_date`, for comparison
# with the trimmed version.
df.select("*", df["InvoiceDate"].alias("invoice_date")).show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|  invoice_date|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|12/1/2010 8:26|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|12/1/2010 8:26|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|12/1/2010 8:26|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|12/1/2010 8:26|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|12/1/2010 8:26|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/

In [48]:
# Append UnitPrice rounded to one decimal place as `unit_price`.
df.select("*", F.round("UnitPrice", 1).alias("unit_price")).show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|unit_price|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|       2.6|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|       3.4|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|       2.8|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|       3.4|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|       3.4|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|U

In [49]:
# Parse InvoiceDate ("M/d/yyyy H:mm" strings) into a date column and derive the
# year / month / day partition columns from it.
# to_date() already returns a date column, so the former
# date_format(...).cast(DateType()) round-trip through a string was redundant.
# The `_temp_date` column that was added and immediately dropped is gone too.
(
    df.withColumn("created_d", F.to_date("InvoiceDate", "M/d/yyyy H:mm"))
    .withColumn("ptn_yyyy", F.year("created_d"))
    .withColumn("ptn_mm", F.month("created_d"))
    .withColumn("ptn_dd", F.dayofmonth("created_d"))
    .show(10)
)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+--------+------+------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country| created_d|ptn_yyyy|ptn_mm|ptn_dd|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+--------+------+------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01|    2010|    12|     1|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|    2010|    12|     1|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01|    2010|    12|     1|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|    2010|    12|     1|
|   536365|   84029E|RED WOOLLY HOTTIE...

In [50]:
# Stamp every row with a fixed ingestion date and derive the year / month / day
# partition columns from that date.
fixed_ingestion_date = F.lit("2025-04-23").cast(DateType())
(
    df.withColumn("ingestion_date", fixed_ingestion_date)
    .withColumn("ptn_yyyy", F.year("ingestion_date"))
    .withColumn("ptn_mm", F.month("ingestion_date"))
    .withColumn("ptn_dd", F.dayofmonth("ingestion_date"))
    .show(10)
)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+--------------+--------+------+------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|ingestion_date|ptn_yyyy|ptn_mm|ptn_dd|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+--------------+--------+------+------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|    2025-04-23|    2025|     4|    23|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|    2025-04-23|    2025|     4|    23|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|    2025-04-23|    2025|     4|    23|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|    2025-04-23|    2025|     4|    23|
|   536365|  

In [51]:
# Check the resulting schema: created_d is a date and the partition columns are
# integers. to_date() yields a date column directly, so no date_format/cast
# round-trip through a string is needed.
(
    df.withColumn("created_d", F.to_date("InvoiceDate", "M/d/yyyy H:mm"))
    .withColumn("ptn_yyyy", F.year("created_d"))
    .withColumn("ptn_mm", F.month("created_d"))
    .withColumn("ptn_dd", F.dayofmonth("created_d"))
    .printSchema()
)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- created_d: date (nullable = true)
 |-- ptn_yyyy: integer (nullable = true)
 |-- ptn_mm: integer (nullable = true)
 |-- ptn_dd: integer (nullable = true)



In [52]:
# Prototype: pick the date column out of a list of column names.
sample = ["invoice_date", "country", "transaction_date"]

# Every column whose name contains the substring "date" (case-sensitive).
date_column = [column_name for column_name in sample if "date" in column_name]

# `date_col` is the single unambiguous date column, or None when there is no
# match or more than one. Previously it was left undefined for several matches
# and raised IndexError for none.
date_col = date_column[0] if len(date_column) == 1 else None
date_column

['invoice_date', 'transaction_date']

In [53]:
# Map column name -> Spark type name for the frame with the derived date and
# partition columns. `.dtypes` is a list of (name, type) pairs, so dict() builds
# the mapping directly and `df_type` is only ever bound to the dict.
# to_date() already returns a date column; no date_format/cast round-trip needed.
df_type = dict(
    df.withColumn("created_d", F.to_date("InvoiceDate", "M/d/yyyy H:mm"))
    .withColumn("ptn_yyyy", F.year("created_d"))
    .withColumn("ptn_mm", F.month("created_d"))
    .withColumn("ptn_dd", F.dayofmonth("created_d"))
    .dtypes
)

In [54]:
# Display the column-name -> type-name mapping built in the previous cell.
df_type

{'InvoiceNo': 'string',
 'StockCode': 'string',
 'Description': 'string',
 'Quantity': 'string',
 'InvoiceDate': 'string',
 'UnitPrice': 'string',
 'CustomerID': 'string',
 'Country': 'string',
 'created_d': 'date',
 'ptn_yyyy': 'int',
 'ptn_mm': 'int',
 'ptn_dd': 'int'}

In [55]:
# Parse InvoiceDate as a timestamp, then render it back as a
# "yyyy-MM-dd HH:mm:ss" string in the new `invoice_timestamp` column.
parsed_invoice_ts = F.to_timestamp("InvoiceDate", "M/d/yyyy H:mm")
df.withColumn(
    "invoice_timestamp",
    F.date_format(parsed_invoice_ts, "yyyy-MM-dd HH:mm:ss"),
).show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|  invoice_timestamp|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    2

In [56]:
# Schema of the raw frame as loaded from the CSV.
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [57]:
# First non-digit character found in InvoiceNo (empty string when there is none).
invoice_non_digit = F.regexp_extract("InvoiceNo", r'\D', 0)
df.select(invoice_non_digit.alias("is_contain_char")).show(5)

+---------------+
|is_contain_char|
+---------------+
|               |
|               |
|               |
|               |
|               |
+---------------+
only showing top 5 rows



In [58]:
# Look for InvoiceNo values whose first non-digit character is something other
# than the "C" / "A" prefixes; an empty result means there are none.
invoice_non_digit = F.regexp_extract("InvoiceNo", r'\D', 0)
(
    df.select(invoice_non_digit.alias("is_contain_char"))
    .filter(~F.col("is_contain_char").isin("", "C", "A"))
    .show()
)

+---------------+
|is_contain_char|
+---------------+
+---------------+



In [59]:
# Look for Quantity values whose first non-digit character is anything other
# than a minus sign; an empty result means there are none.
quantity_non_digit = F.regexp_extract("Quantity", r'\D', 0)
(
    df.select(quantity_non_digit.alias("is_contain_char"))
    .filter(~F.col("is_contain_char").isin("", "-"))
    .show()
)

+---------------+
|is_contain_char|
+---------------+
+---------------+



In [60]:
# Look for Quantity values containing a decimal point; an empty result means
# there are none.
quantity_dot = F.regexp_extract("Quantity", r'\.', 0)
(
    df.select(quantity_dot.alias("is_contain_char"))
    .filter(F.col("is_contain_char") != "")
    .show()
)

+---------------+
|is_contain_char|
+---------------+
+---------------+



In [61]:
# Show full rows whose CustomerID contains any non-digit character; an empty
# result means there are none.
customer_non_digit = F.regexp_extract("CustomerID", r"\D", 0)
(
    df.withColumn("is_contain", customer_non_digit)
    .filter(F.col("is_contain") != "")
    .show()
)

+---------+---------+-----------+--------+-----------+---------+----------+-------+----------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|is_contain|
+---------+---------+-----------+--------+-----------+---------+----------+-------+----------+
+---------+---------+-----------+--------+-----------+---------+----------+-------+----------+



In [62]:
# Ask the JVM behind the Spark context which Hadoop version it has loaded.
jvm = spark.sparkContext._jvm
jvm.org.apache.hadoop.util.VersionInfo.getVersion()

'3.3.4'

In [63]:
# Ask the JVM behind the Spark context whether Hadoop's native library is loaded.
jvm = spark.sparkContext._jvm
jvm.org.apache.hadoop.util.NativeCodeLoader.isNativeCodeLoaded()

True

In [64]:
# Shut down the Spark session now that the exploration is finished.
spark.stop()