In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, isnan
from pyspark.sql.types import IntegerType, FloatType, StringType, TimestampType
from pyspark.sql.window import Window

In [2]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("TestApp") \
    .master("local[*]") \
    .getOrCreate()
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# Verify SparkSession is created by checking version
print("Spark version: ", spark.version)

# Test Spark functionality
df = spark.read.csv('data/transaction_data.csv', header=True)

# Show the DataFrame
df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/27 23:11:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version:  3.5.5
+------+-------------+--------------------+--------+--------------------+----------------------+-----------+--------------+
|UserId|TransactionId|     TransactionTime|ItemCode|     ItemDescription|NumberOfItemsPurchased|CostPerItem|       Country|
+------+-------------+--------------------+--------+--------------------+----------------------+-----------+--------------+
|278166|      6355745|Sat Feb 02 12:50:...|  465549|FAMILY ALBUM WHIT...|                     6|      11.73|United Kingdom|
|337701|      6283376|Wed Dec 26 09:06:...|  482370|LONDON BUS COFFEE...|                     3|       3.52|United Kingdom|
|267099|      6385599|Fri Feb 15 09:45:...|  490728|SET 12 COLOUR PEN...|                    72|        0.9|        France|
|380478|      6044973|Fri Jun 22 07:14:...|  459186|UNION JACK FLAG L...|                     3|       1.73|United Kingdom|
|    -1|      6143225|Mon Sep 10 11:58:...| 1733592| WASHROOM METAL SIGN|                     3|        3.4|Un

## Q1. Clean data and filter Outliers

### Step 1: Handle Null or missing values

In [3]:
# Drop duplicated rows
print(f'Before: {df.count()} rows')
df = df.dropDuplicates()
print(f'After: {df.count()} rows')

total_rows = df.count()
for column in df.columns:
    missing_count = df.filter(col(column).isNull() | isnan(col(column))).count()
    missing_percentage = (missing_count / total_rows) * 100
    print(f"Column: {column}, Missing Value Percentage: {missing_percentage:.6f}% - {missing_count} rows")

                                                                                

Before: 1083818 rows


                                                                                

After: 536572 rows


                                                                                

Column: UserId, Missing Value Percentage: 0.000000% - 0 rows


                                                                                

Column: TransactionId, Missing Value Percentage: 0.000000% - 0 rows


                                                                                

Column: TransactionTime, Missing Value Percentage: 0.000000% - 0 rows


                                                                                

Column: ItemCode, Missing Value Percentage: 0.000000% - 0 rows


                                                                                

Column: ItemDescription, Missing Value Percentage: 0.270979% - 1454 rows


                                                                                

Column: NumberOfItemsPurchased, Missing Value Percentage: 0.000000% - 0 rows


                                                                                

Column: CostPerItem, Missing Value Percentage: 0.000000% - 0 rows


[Stage 41:>                                                         (0 + 8) / 8]

Column: Country, Missing Value Percentage: 0.000000% - 0 rows


                                                                                

`ItemDescription` (0.27%):

- The `ItemDescription` column has a small proportion of missing values (0.27% of the data). Since the dataset contains a relatively small number of missing rows, the decision is to fill these missing values with the placeholder 'N/A'.

- Filling missing values with 'N/A' is appropriate because:

    + Minimal Impact: The percentage of missing data is small, so it won’t drastically affect the dataset's overall integrity.

    + Maintaining Consistency: Filling with 'N/A' ensures that no rows are removed, preserving the dataset size for analysis.

    + Descriptive Placeholder: 'N/A' clearly indicates the absence of a description, making it easy to identify and address during analysis.

Other columns have no missing values -> No action required

In [4]:
df_cleaned = df.fillna({'ItemDescription': 'N/A'})

# Verify after cleaning
column = 'ItemDescription'
missing_count = df_cleaned.filter(col(column).isNull() | isnan(col(column))).count()
missing_percentage = (missing_count / total_rows) * 100
print(f"Column: {column}, Missing Value Percentage: {missing_percentage:.6f}% - {missing_count} rows")



Column: ItemDescription, Missing Value Percentage: 0.000000% - 0 rows


                                                                                

### Step 2: Correct data types
- UserId: string
- TransactionId: string
- TransactionTime: timestamp
- ItemCode: string
- ItemDescription: string
- NumberOfItemsPurchased: integer
- CostPerItem: float
- Country: string

In [5]:
# Print data type of each column
for column, dtype in df_cleaned.dtypes:
    print(f"Column: {column}, Data Type: {dtype}")

Column: UserId, Data Type: string
Column: TransactionId, Data Type: string
Column: TransactionTime, Data Type: string
Column: ItemCode, Data Type: string
Column: ItemDescription, Data Type: string
Column: NumberOfItemsPurchased, Data Type: string
Column: CostPerItem, Data Type: string
Column: Country, Data Type: string


In [6]:
# Cast columns to appropriate data types
datetime_format = 'EEE MMM dd HH:mm:ss zzzz yyyy'

# Cast the column to timestamp
df_casted = df_cleaned.withColumn('TransactionTime', F.to_timestamp('TransactionTime', datetime_format))
df_casted = df_casted.withColumn('NumberOfItemsPurchased', F.col('NumberOfItemsPurchased').cast(IntegerType()))
df_casted = df_casted.withColumn('CostPerItem', F.col('CostPerItem').cast(FloatType()))

In [7]:
# Print data type of each column
for column, dtype in df_casted.dtypes:
    print(f"Column: {column}, Data Type: {dtype}")
df_casted.show()

Column: UserId, Data Type: string
Column: TransactionId, Data Type: string
Column: TransactionTime, Data Type: timestamp
Column: ItemCode, Data Type: string
Column: ItemDescription, Data Type: string
Column: NumberOfItemsPurchased, Data Type: int
Column: CostPerItem, Data Type: float
Column: Country, Data Type: string




+------+-------------+-------------------+--------+--------------------+----------------------+-----------+--------------+
|UserId|TransactionId|    TransactionTime|ItemCode|     ItemDescription|NumberOfItemsPurchased|CostPerItem|       Country|
+------+-------------+-------------------+--------+--------------------+----------------------+-----------+--------------+
|    -1|      6143225|2018-09-10 16:58:00|  447867| SKULLS WRITING SET |                   120|       1.15|United Kingdom|
|313131|      6333261|2019-01-23 11:32:00|  464898|BAKING SET 9 PIEC...|                    18|       6.84|          EIRE|
|267708|      6360321|2019-02-04 12:47:00|  491232|SPACEBOY ROCKET L...|                     3|       2.88|United Kingdom|
|259917|      5968446|2018-04-11 16:45:00|  445326|STRAWBERRY HONEYC...|                    36|       2.28|   Switzerland|
|289107|      6240806|2018-12-01 16:53:00|  464394|HOT WATER BOTTLE ...|                    72|       5.18|United Kingdom|
|308175|      63

                                                                                

### Step 3: Filter and Treat Outliers

Outliers typically exist in numerical columns, and they can significantly impact the performance and accuracy of machine learning models. In our dataset, the following columns are numerical and could potentially contain outliers:

- `NumberOfItemsPurchased`: This column represents the total number of items purchased, which is a numeric value.

- `CostPerItem`: This column represents the cost of each item purchased, which is also numeric.

In [8]:
# Compute summary statistics
column = 'NumberOfItemsPurchased'
summary_df = df_casted.describe([column])
summary_df.show()

[Stage 52:>                                                         (0 + 8) / 9]

+-------+----------------------+
|summary|NumberOfItemsPurchased|
+-------+----------------------+
|  count|                536572|
|   mean|     28.86212661115377|
| stddev|     657.4326449645174|
|    min|               -242985|
|    max|                242985|
+-------+----------------------+



                                                                                

Issues Detected:
- Negative Values (min = -242,985)

    + Logically, `NumberOfItemsPurchased` should never be negative.

    + Possible causes: data entry errors, refunds/mistaken deductions recorded incorrectly.

- Extreme Outliers (max = 242,985 & stddev = 657.43)

    + Mean is only 28.66, but max is 242,985 → Suggests huge outliers.

    + High standard deviation (654) relative to the mean suggests extreme variation.

In [9]:
# Remove negative values
df_filtered = df_casted.filter(col(column) >= 0)
# Verify after filtering
summary_df = df_filtered.describe([column])
summary_df.show()

# Use IQR to remove outliers
q1, q3 = df_filtered.approxQuantile(column, [0.25, 0.75], 0.01)
iqr = q3 - q1
lower_bound = max(q1 - 1.5 * iqr, 0)  # Ensure no negatives
upper_bound = q3 + 1.5 * iqr
df_filtered = df_filtered.filter((col(column) >= lower_bound) & 
                        (col(column) <= upper_bound))
# Verify after filtering
summary_df = df_filtered.describe([column])
summary_df.show()

                                                                                

+-------+----------------------+
|summary|NumberOfItemsPurchased|
+-------+----------------------+
|  count|                525986|
|   mean|     32.19496336404391|
| stddev|     472.8059451987196|
|    min|                     3|
|    max|                242985|
+-------+----------------------+





+-------+----------------------+
|summary|NumberOfItemsPurchased|
+-------+----------------------+
|  count|                469463|
|   mean|    14.233025818861124|
| stddev|     13.34794069707312|
|    min|                     3|
|    max|                    69|
+-------+----------------------+



                                                                                

In [10]:
# Compute summary statistics
column = 'CostPerItem'
summary_df = df_filtered.describe([column])
summary_df.show()

[Stage 74:>                                                         (0 + 8) / 9]

+-------+-----------------+
|summary|      CostPerItem|
+-------+-----------------+
|  count|           469463|
|   mean|9.388288694970152|
| stddev|2476.452940245975|
|    min|        -15265.64|
|    max|        1696285.5|
+-------+-----------------+



                                                                                

Issues Detected:
- Negative Values (min = -15,265.64)

    + Logically, `CostPerItem` should never be negative.

- Extreme Outliers (max = 1,696,285.5)

    + The max value is much larger than the mean (9.39) and standard deviation (2,476.45).

In [11]:
# Remove negative values
df_filtered = df_filtered.filter(col(column) >= 0)
# Verify after filtering
summary_df = df_filtered.describe([column])
summary_df.show()

# Use IQR to remove outliers
q1, q3 = df_filtered.approxQuantile(column, [0.25, 0.75], 0.01)
iqr = q3 - q1
lower_bound = max(q1 - 1.5 * iqr, 0)  # Ensure no negatives
upper_bound = q3 + 1.5 * iqr
df_filtered = df_filtered.filter((col(column) >= lower_bound) & 
                        (col(column) <= upper_bound))
# Verify after filtering
summary_df = df_filtered.describe([column])
summary_df.show()

                                                                                

+-------+------------------+
|summary|       CostPerItem|
+-------+------------------+
|  count|            469461|
|   mean| 9.453363442125431|
| stddev|2476.2575123442357|
|    min|               0.0|
|    max|         1696285.5|
+-------+------------------+





+-------+------------------+
|summary|       CostPerItem|
+-------+------------------+
|  count|            438503|
|   mean| 3.848816805364888|
| stddev|2.8163446099997533|
|    min|               0.0|
|    max|             11.73|
+-------+------------------+



                                                                                

In [12]:
# Remove transactions with TransactionTime later than the current time
current_time = F.current_timestamp()
df_filtered = df_filtered.filter(F.col("TransactionTime") <= current_time)
print(f'Number of rows after filtering: {df_filtered.count()}')
df_filtered.show()

                                                                                

Number of rows after filtering: 436062




+------+-------------+-------------------+--------+--------------------+----------------------+-----------+--------------+
|UserId|TransactionId|    TransactionTime|ItemCode|     ItemDescription|NumberOfItemsPurchased|CostPerItem|       Country|
+------+-------------+-------------------+--------+--------------------+----------------------+-----------+--------------+
|313131|      6333261|2019-01-23 11:32:00|  464898|BAKING SET 9 PIEC...|                    18|       6.84|          EIRE|
|267708|      6360321|2019-02-04 12:47:00|  491232|SPACEBOY ROCKET L...|                     3|       2.88|United Kingdom|
|259917|      5968446|2018-04-11 16:45:00|  445326|STRAWBERRY HONEYC...|                    36|       2.28|   Switzerland|
|308175|      6360805|2019-02-04 13:58:00|  485520|PACK OF 6 SMALL F...|                     3|       0.58|United Kingdom|
|    -1|      6016439|2018-05-30 12:22:00| 1783866|ANTIQUE SILVER T-...|                     9|       1.73|United Kingdom|
|281442|      61

                                                                                

## Q2. Calculate the number of Items purchased and prices in each month

In [13]:
# Extract month, year from TransactionTime
df_temp = df_filtered.withColumn("TransactionMonth", F.month("TransactionTime"))
df_temp = df_temp.withColumn("TransactionYear", F.year("TransactionTime"))
result = df_temp.groupBy("TransactionYear", "TransactionMonth") \
           .agg(
               F.sum("NumberOfItemsPurchased").alias("TotalItemsPurchased"),
               F.round(F.sum(F.col("CostPerItem") * F.col("NumberOfItemsPurchased")), 2).alias("TotalPrice")
           )

result = result.orderBy("TransactionYear", "TransactionMonth")
result.show()

[Stage 105:>                                                        (0 + 8) / 8]

+---------------+----------------+-------------------+----------+
|TransactionYear|TransactionMonth|TotalItemsPurchased|TotalPrice|
+---------------+----------------+-------------------+----------+
|           2018|               2|             358755|1248032.27|
|           2018|               3|             261936| 815143.14|
|           2018|               4|             352611|1139592.33|
|           2018|               5|             392427|1235253.51|
|           2018|               6|             440889| 1330984.5|
|           2018|               7|             437631|1388303.55|
|           2018|               8|             430794|1332500.52|
|           2018|               9|             453969|1346345.55|
|           2018|              10|             475326|1411838.88|
|           2018|              11|             506772|1538477.25|
|           2018|              12|             797394|2418128.53|
|           2019|               1|             885156|2636481.46|
|         

                                                                                

In [14]:
# Save result to local
result.write \
    .mode('overwrite') \
    .option("header", "true") \
    .csv('data/total_by_month.csv')

                                                                                

## Q3. Calculate the number of items purchased for each userID in 30 days for each day

In [15]:
# Remove UserID = -1
df_filtered = df_filtered.filter(df_filtered["UserID"] != -1)
df_temp = df_filtered.withColumn("Date", F.to_date("TransactionTime"))
df_temp = df_temp.withColumn("Timestamp", F.unix_timestamp("Date"))

# Define the window specification to partition by userID, order by Timestamp
# The range is in seconds, so -30*24*60*60 gives the 30-day range in seconds
windowSpec = Window.partitionBy("UserID").orderBy("Timestamp").rangeBetween(-30*24*60*60, 0)

# Calculate the number of items purchased in the last 30 days for each user
result = df_temp.withColumn("NumberOfItemsPurchased_30days", F.sum("NumberOfItemsPurchased").over(windowSpec))
result.select('UserId','Timestamp', 'TransactionTime', 'NumberOfItemsPurchased', 'NumberOfItemsPurchased_30days').show(truncate=False)



+------+----------+-------------------+----------------------+-----------------------------+
|UserId|Timestamp |TransactionTime    |NumberOfItemsPurchased|NumberOfItemsPurchased_30days|
+------+----------+-------------------+----------------------+-----------------------------+
|259287|1523206800|2018-04-09 15:30:00|36                    |504                          |
|259287|1523206800|2018-04-09 15:30:00|30                    |504                          |
|259287|1523206800|2018-04-09 15:30:00|30                    |504                          |
|259287|1523206800|2018-04-09 15:30:00|12                    |504                          |
|259287|1523206800|2018-04-09 15:30:00|12                    |504                          |
|259287|1523206800|2018-04-09 15:30:00|12                    |504                          |
|259287|1523206800|2018-04-09 15:30:00|12                    |504                          |
|259287|1523206800|2018-04-09 15:30:00|18                    |504     

                                                                                

In [16]:
# Save result to local
result.write \
    .mode('overwrite') \
    .option("header", "true") \
    .csv('data/30days')

                                                                                

## Final Dataset for Recommendation Models

In [17]:
df_filtered.printSchema()

root
 |-- UserId: string (nullable = true)
 |-- TransactionId: string (nullable = true)
 |-- TransactionTime: timestamp (nullable = true)
 |-- ItemCode: string (nullable = true)
 |-- ItemDescription: string (nullable = false)
 |-- NumberOfItemsPurchased: integer (nullable = true)
 |-- CostPerItem: float (nullable = true)
 |-- Country: string (nullable = true)



In [19]:
final_df = df_filtered
print(f'Final dataset size: {final_df.count()}')
final_df.write \
    .mode('overwrite') \
    .option("header", "true") \
    .csv('data/cleaned_dataset')

                                                                                

Final dataset size: 320579


                                                                                

In [20]:
spark.stop()