# DO NOT RUN THIS FILE

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# **A. Setup Environment**

In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import*
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import PCA
from pyspark.sql.functions import min as spark_min, max as spark_max
from pyspark.sql.functions import explode,  udf,  when, col, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql import functions as F

In [3]:
# Create a new Spark session with optimized configurations
spark = (SparkSession.builder
         .appName("CustomerSegmentationOptimized")
         .config("spark.executor.memory", "8g")          # Allocate 8 GB memory to each executor
         .config("spark.executor.cores", "4")            # Allocate 4 cores to each executor
         .config("spark.driver.memory", "4g")            # Allocate 4 GB memory to the driver
         .config("spark.sql.shuffle.partitions", "200")   # Set shuffle partitions to 200
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  # Use Kryo serializer for better performance
         .getOrCreate())

# Verify the new configuration
spark.conf.get("spark.executor.memory")       # Should return '8g'
spark.conf.get("spark.executor.cores")        # Should return '4'
spark.conf.get("spark.driver.memory")         # Should return '4g'
spark.conf.get("spark.sql.shuffle.partitions")# Should return '200'
spark.conf.get("spark.serializer")            # Should return 'org.apache.spark.serializer.KryoSerializer'

'org.apache.spark.serializer.KryoSerializer'

# **B. Data Ingestion**

In [4]:
file_path = "/content/drive/MyDrive/Course/Information system management/PHVN_DATest_Dataset.json"
df = spark.read.json(file_path)

df.show(5)

+----------+-------------------+-------------+----+--------------------+---------------+----------+--------+
|AppVersion|      EventDateTime|    EventName|Item|            Location|MobileBrandName| SessionID|  Source|
+----------+-------------------+-------------+----+--------------------+---------------+----------+--------+
|     2.1.7|28-08-2023 15:02:42|session_start|  []|{Bien Hoa, Vietna...|          Apple|1693234962|(direct)|
|     2.1.7|28-08-2023 06:35:51|    view_cart|  []|{Buon Ma Thuot, V...|          Apple|1693204441|(direct)|
|     2.1.8|28-08-2023 14:38:38|session_start|  []|{Ho Chi Minh City...|          Apple|1693233518|(direct)|
|     2.1.7|28-08-2023 06:29:58|session_start|  []|{Ho Chi Minh City...|          Apple|1693204198|(direct)|
|     2.1.7|28-08-2023 13:17:23|session_start|  []|{Ho Chi Minh City...|          Apple|1693228643|(direct)|
+----------+-------------------+-------------+----+--------------------+---------------+----------+--------+
only showing top 5 

In [5]:
df.printSchema()

root
 |-- AppVersion: string (nullable = true)
 |-- EventDateTime: string (nullable = true)
 |-- EventName: string (nullable = true)
 |-- Item: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ItemCategory: string (nullable = true)
 |    |    |-- ItemName: string (nullable = true)
 |-- Location: struct (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- Region: string (nullable = true)
 |-- MobileBrandName: string (nullable = true)
 |-- SessionID: string (nullable = true)
 |-- Source: string (nullable = true)



In [6]:
num_rows = df.count()
num_cols = len(df.columns)
print(f"Shape: ({num_rows}, {num_cols})")

Shape: (443711, 8)


# **C. Data Preprocessing**

### **Step 1:** Parse `EventDateTime`

In [7]:
from pyspark.sql.functions import to_timestamp
df = df.withColumn("EventDateTime", to_timestamp("EventDateTime", "dd-MM-yyyy HH:mm:ss"))
df.show()

+----------+-------------------+-------------+----+--------------------+---------------+----------+--------+
|AppVersion|      EventDateTime|    EventName|Item|            Location|MobileBrandName| SessionID|  Source|
+----------+-------------------+-------------+----+--------------------+---------------+----------+--------+
|     2.1.7|2023-08-28 15:02:42|session_start|  []|{Bien Hoa, Vietna...|          Apple|1693234962|(direct)|
|     2.1.7|2023-08-28 06:35:51|    view_cart|  []|{Buon Ma Thuot, V...|          Apple|1693204441|(direct)|
|     2.1.8|2023-08-28 14:38:38|session_start|  []|{Ho Chi Minh City...|          Apple|1693233518|(direct)|
|     2.1.7|2023-08-28 06:29:58|session_start|  []|{Ho Chi Minh City...|          Apple|1693204198|(direct)|
|     2.1.7|2023-08-28 13:17:23|session_start|  []|{Ho Chi Minh City...|          Apple|1693228643|(direct)|
|     2.1.7|2023-08-28 07:16:56|session_start|  []|{, Vietnam, Kien ...|          Apple|1693207016|(direct)|
|     2.1.7|2023-08

### **Step 2:** Normalize `Location`

In [8]:
df = df.selectExpr("AppVersion", "EventDateTime", "EventName", "Item",
                   "Location.City as City", "Location.Country as Country", "Location.Region as Region",
                   "MobileBrandName", "SessionID", "Source")
df.show(5)

+----------+-------------------+-------------+----+----------------+-------+----------------+---------------+----------+--------+
|AppVersion|      EventDateTime|    EventName|Item|            City|Country|          Region|MobileBrandName| SessionID|  Source|
+----------+-------------------+-------------+----+----------------+-------+----------------+---------------+----------+--------+
|     2.1.7|2023-08-28 15:02:42|session_start|  []|        Bien Hoa|Vietnam|        Dong Nai|          Apple|1693234962|(direct)|
|     2.1.7|2023-08-28 06:35:51|    view_cart|  []|   Buon Ma Thuot|Vietnam|         Dak Lak|          Apple|1693204441|(direct)|
|     2.1.8|2023-08-28 14:38:38|session_start|  []|Ho Chi Minh City|Vietnam|Ho Chi Minh City|          Apple|1693233518|(direct)|
|     2.1.7|2023-08-28 06:29:58|session_start|  []|Ho Chi Minh City|Vietnam|Ho Chi Minh City|          Apple|1693204198|(direct)|
|     2.1.7|2023-08-28 13:17:23|session_start|  []|Ho Chi Minh City|Vietnam|Ho Chi Minh Ci

### **Step 3:** Filter `Country` == "Vietnam"

In [9]:
df = df.filter(col("Country") == "Vietnam")
df.show(5)

+----------+-------------------+-------------+----+----------------+-------+----------------+---------------+----------+--------+
|AppVersion|      EventDateTime|    EventName|Item|            City|Country|          Region|MobileBrandName| SessionID|  Source|
+----------+-------------------+-------------+----+----------------+-------+----------------+---------------+----------+--------+
|     2.1.7|2023-08-28 15:02:42|session_start|  []|        Bien Hoa|Vietnam|        Dong Nai|          Apple|1693234962|(direct)|
|     2.1.7|2023-08-28 06:35:51|    view_cart|  []|   Buon Ma Thuot|Vietnam|         Dak Lak|          Apple|1693204441|(direct)|
|     2.1.8|2023-08-28 14:38:38|session_start|  []|Ho Chi Minh City|Vietnam|Ho Chi Minh City|          Apple|1693233518|(direct)|
|     2.1.7|2023-08-28 06:29:58|session_start|  []|Ho Chi Minh City|Vietnam|Ho Chi Minh City|          Apple|1693204198|(direct)|
|     2.1.7|2023-08-28 13:17:23|session_start|  []|Ho Chi Minh City|Vietnam|Ho Chi Minh Ci

In [10]:
# Filter out rows where Region is empty
df = df.filter(col("Region") != "")

# Drop the Country column
df = df.drop("Country")

In [11]:
num_rows = df.count()
num_cols = len(df.columns)
print(f"Shape: ({num_rows}, {num_cols})")

Shape: (440560, 9)


### **Step 4:** Check Missing Values

In [12]:
from pyspark.sql.functions import sum

# Count missing values (nulls) for each column
missing_values = df.select(
    [sum(col(c).isNull().cast("int")).alias(c + "_missing") for c in df.columns]
)

# Show the result
missing_values.show()

+------------------+---------------------+-----------------+------------+------------+--------------+-----------------------+-----------------+--------------+
|AppVersion_missing|EventDateTime_missing|EventName_missing|Item_missing|City_missing|Region_missing|MobileBrandName_missing|SessionID_missing|Source_missing|
+------------------+---------------------+-----------------+------------+------------+--------------+-----------------------+-----------------+--------------+
|                 0|                    0|                0|           0|           0|             0|                   6634|                0|             4|
+------------------+---------------------+-----------------+------------+------------+--------------+-----------------------+-----------------+--------------+



In [13]:
df = df.dropna(subset=["MobileBrandName", "Source"])

In [14]:
num_rows = df.count()
num_cols = len(df.columns)
print(f"Shape: ({num_rows}, {num_cols})")

Shape: (433922, 9)


In [15]:
# Count missing values (nulls) for each column
missing_values = df.select(
    [sum(col(c).isNull().cast("int")).alias(c + "_missing") for c in df.columns]
)

# Show the result
missing_values.show()

+------------------+---------------------+-----------------+------------+------------+--------------+-----------------------+-----------------+--------------+
|AppVersion_missing|EventDateTime_missing|EventName_missing|Item_missing|City_missing|Region_missing|MobileBrandName_missing|SessionID_missing|Source_missing|
+------------------+---------------------+-----------------+------------+------------+--------------+-----------------------+-----------------+--------------+
|                 0|                    0|                0|           0|           0|             0|                      0|                0|             0|
+------------------+---------------------+-----------------+------------+------------+--------------+-----------------------+-----------------+--------------+



### **Step 5:** Check Duplicates

In [16]:
duplicates = df.groupBy(df.columns).count().filter("count > 1")
print(duplicates.count())

18864


In [17]:
df = df.dropDuplicates()

In [18]:
num_rows = df.count()
num_cols = len(df.columns)
print(f"Shape: ({num_rows}, {num_cols})")

Shape: (413425, 9)


In [19]:
df.printSchema()

root
 |-- AppVersion: string (nullable = true)
 |-- EventDateTime: timestamp (nullable = true)
 |-- EventName: string (nullable = true)
 |-- Item: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ItemCategory: string (nullable = true)
 |    |    |-- ItemName: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- MobileBrandName: string (nullable = true)
 |-- SessionID: string (nullable = true)
 |-- Source: string (nullable = true)



In [20]:
df.show(5)

+----------+-------------------+-------------+----+----------------+----------------+---------------+----------+--------+
|AppVersion|      EventDateTime|    EventName|Item|            City|          Region|MobileBrandName| SessionID|  Source|
+----------+-------------------+-------------+----+----------------+----------------+---------------+----------+--------+
|     2.1.7|2023-08-28 13:07:14|session_start|  []|           Hanoi|           Hanoi|          Apple|1693228034|(direct)|
|     2.1.7|2023-08-28 03:50:44|session_start|  []|Ho Chi Minh City|Ho Chi Minh City|          Apple|1693194644|(direct)|
|   Website|2023-08-28 10:29:47|    page_view|  []|Ho Chi Minh City|Ho Chi Minh City|         Xiaomi|1693218578|FACEBOOK|
|   Website|2023-08-28 04:50:27|session_start|  []|Ho Chi Minh City|Ho Chi Minh City|          Apple|1693198227|     sms|
|     2.1.7|2023-08-28 05:04:53|session_start|  []|Ho Chi Minh City|Ho Chi Minh City|          Apple|1693199093|  google|
+----------+------------

In [21]:
# Define a function to extract item category
def get_item_category(item_list):
    if item_list and len(item_list) > 0:
        return item_list[0].asDict().get('ItemCategory')
    return None

# Define a UDF for item category
get_item_category_udf = udf(get_item_category, StringType())

# Define a function to extract item name
def get_item_name(item_list):
    if item_list and len(item_list) > 0:
        return item_list[0].asDict().get('ItemName')
    return None

# Define a UDF for item name
get_item_name_udf = udf(get_item_name, StringType())

# Apply the UDFs to create new columns
df = df.withColumn("ItemCategory", get_item_category_udf(col("Item"))) \
                   .withColumn("ItemName", get_item_name_udf(col("Item"))) \
                   .drop("Item") # Drop the original 'Item' column

# Show the transformed DataFrame
df.show()

+----------+-------------------+-------------+----------------+----------------+---------------+----------+--------+---------------+--------------------+
|AppVersion|      EventDateTime|    EventName|            City|          Region|MobileBrandName| SessionID|  Source|   ItemCategory|            ItemName|
+----------+-------------------+-------------+----------------+----------------+---------------+----------+--------+---------------+--------------------+
|     2.1.7|2023-08-28 13:07:14|session_start|           Hanoi|           Hanoi|          Apple|1693228034|(direct)|           NULL|                NULL|
|     2.1.7|2023-08-28 03:50:44|session_start|Ho Chi Minh City|Ho Chi Minh City|          Apple|1693194644|(direct)|           NULL|                NULL|
|   Website|2023-08-28 10:29:47|    page_view|Ho Chi Minh City|Ho Chi Minh City|         Xiaomi|1693218578|FACEBOOK|           NULL|                NULL|
|   Website|2023-08-28 04:50:27|session_start|Ho Chi Minh City|Ho Chi Minh C

# Create ItemID column for each Item

In [22]:
# Normalize names and create a unique key for each product
df = df.withColumn("ItemKey",
            F.concat_ws("_",
            F.lower(F.trim(F.col("ItemCategory"))),
            F.lower(F.trim(F.col("ItemName")))
        )
)
# Get the list of unique ItemKeys and assign sequential IDs
item_keys = df.select("ItemKey").where(F.col("ItemKey").isNotNull()).distinct().withColumn(
    "ItemID",
    F.row_number().over(Window.orderBy("ItemKey")) - 1  # Start from 0
)

# Join back to df to add ItemID
df = df.join(item_keys, on="ItemKey", how="left").drop("ItemKey")

# Handle mising value
df = df.fillna(" ", subset=["ItemCategory", "ItemName"])
df.show()

+----------+-------------------+-------------+----------------+----------------+---------------+----------+--------+---------------+--------------------+------+
|AppVersion|      EventDateTime|    EventName|            City|          Region|MobileBrandName| SessionID|  Source|   ItemCategory|            ItemName|ItemID|
+----------+-------------------+-------------+----------------+----------------+---------------+----------+--------+---------------+--------------------+------+
|     2.1.7|2023-08-28 13:07:14|session_start|           Hanoi|           Hanoi|          Apple|1693228034|(direct)|               |                    |     0|
|     2.1.7|2023-08-28 03:50:44|session_start|Ho Chi Minh City|Ho Chi Minh City|          Apple|1693194644|(direct)|               |                    |     0|
|   Website|2023-08-28 10:29:47|    page_view|Ho Chi Minh City|Ho Chi Minh City|         Xiaomi|1693218578|FACEBOOK|               |                    |     0|
|   Website|2023-08-28 04:50:27|se

In [None]:
# check duplicate
duplicates = df.groupBy(df.columns).count().filter("count > 1")
print(duplicates.count())

# drop duplicate
df= df.dropDuplicates()

In [32]:
from pyspark.sql.functions import min as spark_min, max as spark_max

# Calculate session start and end times
session_times = df.groupBy("SessionID").agg(
    spark_min("EventDateTime").alias("SessionStart"),
    spark_max("EventDateTime").alias("SessionEnd")
)
session_times.show()

+----------+-------------------+-------------------+
| SessionID|       SessionStart|         SessionEnd|
+----------+-------------------+-------------------+
|1693195720|2023-08-28 04:08:40|2023-08-28 04:08:40|
|1693214467|2023-08-28 09:21:08|2023-08-28 09:23:41|
|1693208955|2023-08-28 07:49:15|2023-08-28 07:49:21|
|1693197584|2023-08-28 04:39:44|2023-08-28 04:39:51|
|1693212993|2023-08-28 08:56:33|2023-08-28 08:59:45|
|1693187728|2023-08-28 01:55:29|2023-08-28 02:02:49|
|1693206179|2023-08-28 07:03:00|2023-08-28 07:03:49|
|1693230528|2023-08-28 13:48:49|2023-08-28 13:48:49|
|1693205559|2023-08-28 06:52:39|2023-08-28 06:59:01|
|1693221781|2023-08-28 11:23:01|2023-08-28 11:23:15|
|1693187031|2023-08-28 01:43:51|2023-08-28 01:43:51|
|1693217938|2023-08-28 10:18:58|2023-08-28 10:18:58|
|1693215492|2023-08-28 09:38:11|2023-08-28 09:38:21|
|1693217783|2023-08-28 10:16:23|2023-08-28 10:29:21|
|1693227405|2023-08-28 12:56:45|2023-08-28 12:58:25|
|1693224647|2023-08-28 12:10:47|2023-08-28 12:

In [33]:
#join back to df
temp = df.join(session_times, on="SessionID", how="left")
# check duplicate
duplicates = temp.groupBy(temp.columns).count().filter("count > 1")
print(duplicates.count())

0


In [34]:
# Convert the PySpark DataFrame to a Pandas DataFrame
df = temp.toPandas()

# Now you can use to_csv on the Pandas DataFrame
df.to_csv('clean_data.csv', index=False)

from google.colab import files
files.download('clean_data.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>