In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import os

def create_spark_session():
    # Create a SparkSession
    spark = SparkSession.builder \
        .master("local[*]") \
        .appName("SessionizationExample") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    return spark

In [16]:
spark = create_spark_session()

# Step 1: Read the input data
curr_dir = os.getcwd()
print(curr_dir)
file_path = "dataset1.txt"  # Adjust the path to match your environment
print(file_path)
df = spark.read.option("header", "true").csv(file_path)

# Show the initial DataFrame
df.show(truncate=False)

e:\Job & Interview Kit\Revision Material\PySpark\Practise
dataset1.txt
+--------------------+-------+
|Timestamp           |User_id|
+--------------------+-------+
|2021-05-01T10:00:00Z|u1     |
|2021-05-01T10:10:00Z|u1     |
|2021-05-01T11:50:00Z|u1     |
|2021-05-01T12:05:00Z|u1     |
|2021-05-01T13:00:00Z|u1     |
|2021-05-01T13:30:00Z|u1     |
|2021-05-01T14:30:00Z|u1     |
|2021-05-01T15:50:00Z|u1     |
|2021-05-01T17:00:00Z|u1     |
|2021-05-02T09:00:00Z|u2     |
|2021-05-02T09:45:00Z|u2     |
|2021-05-02T11:00:00Z|u2     |
|2021-05-02T11:30:00Z|u2     |
|2021-05-02T13:05:00Z|u2     |
|2021-05-02T14:10:00Z|u2     |
+--------------------+-------+



In [18]:
# Step 2: Convert the Timestamp column to a proper timestamp data type
df = df.withColumn("timestamp", F.col("Timestamp").cast("timestamp"))
df.show(truncate=False)

+-------------------+-------+
|timestamp          |User_id|
+-------------------+-------+
|2021-05-01 15:30:00|u1     |
|2021-05-01 15:40:00|u1     |
|2021-05-01 17:20:00|u1     |
|2021-05-01 17:35:00|u1     |
|2021-05-01 18:30:00|u1     |
|2021-05-01 19:00:00|u1     |
|2021-05-01 20:00:00|u1     |
|2021-05-01 21:20:00|u1     |
|2021-05-01 22:30:00|u1     |
|2021-05-02 14:30:00|u2     |
|2021-05-02 15:15:00|u2     |
|2021-05-02 16:30:00|u2     |
|2021-05-02 17:00:00|u2     |
|2021-05-02 18:35:00|u2     |
|2021-05-02 19:40:00|u2     |
+-------------------+-------+



In [19]:
# Step 3: Define a window specification for partitioning by User_id and ordering by timestamp
window_spec = Window.partitionBy("User_id").orderBy("timestamp")
window_spec

<pyspark.sql.window.WindowSpec at 0x226d675d8b0>

In [20]:
# Step 4: Calculate the time difference between consecutive events for each user
df = df.withColumn("prev_timestamp", F.lag("timestamp").over(window_spec))
df.show(truncate=False)

+-------------------+-------+-------------------+
|timestamp          |User_id|prev_timestamp     |
+-------------------+-------+-------------------+
|2021-05-01 15:30:00|u1     |NULL               |
|2021-05-01 15:40:00|u1     |2021-05-01 15:30:00|
|2021-05-01 17:20:00|u1     |2021-05-01 15:40:00|
|2021-05-01 17:35:00|u1     |2021-05-01 17:20:00|
|2021-05-01 18:30:00|u1     |2021-05-01 17:35:00|
|2021-05-01 19:00:00|u1     |2021-05-01 18:30:00|
|2021-05-01 20:00:00|u1     |2021-05-01 19:00:00|
|2021-05-01 21:20:00|u1     |2021-05-01 20:00:00|
|2021-05-01 22:30:00|u1     |2021-05-01 21:20:00|
|2021-05-02 14:30:00|u2     |NULL               |
|2021-05-02 15:15:00|u2     |2021-05-02 14:30:00|
|2021-05-02 16:30:00|u2     |2021-05-02 15:15:00|
|2021-05-02 17:00:00|u2     |2021-05-02 16:30:00|
|2021-05-02 18:35:00|u2     |2021-05-02 17:00:00|
|2021-05-02 19:40:00|u2     |2021-05-02 18:35:00|
+-------------------+-------+-------------------+

+-------------------+-------+-------------------+

In [21]:
df = df.withColumn("time_diff", F.col("timestamp").cast("long") - F.col("prev_timestamp").cast("long"))
df.show(truncate=False)

+-------------------+-------+-------------------+---------+
|timestamp          |User_id|prev_timestamp     |time_diff|
+-------------------+-------+-------------------+---------+
|2021-05-01 15:30:00|u1     |NULL               |NULL     |
|2021-05-01 15:40:00|u1     |2021-05-01 15:30:00|600      |
|2021-05-01 17:20:00|u1     |2021-05-01 15:40:00|6000     |
|2021-05-01 17:35:00|u1     |2021-05-01 17:20:00|900      |
|2021-05-01 18:30:00|u1     |2021-05-01 17:35:00|3300     |
|2021-05-01 19:00:00|u1     |2021-05-01 18:30:00|1800     |
|2021-05-01 20:00:00|u1     |2021-05-01 19:00:00|3600     |
|2021-05-01 21:20:00|u1     |2021-05-01 20:00:00|4800     |
|2021-05-01 22:30:00|u1     |2021-05-01 21:20:00|4200     |
|2021-05-02 14:30:00|u2     |NULL               |NULL     |
|2021-05-02 15:15:00|u2     |2021-05-02 14:30:00|2700     |
|2021-05-02 16:30:00|u2     |2021-05-02 15:15:00|4500     |
|2021-05-02 17:00:00|u2     |2021-05-02 16:30:00|1800     |
|2021-05-02 18:35:00|u2     |2021-05-02 

In [22]:
# Step 5: Determine when a new session should start
session_start_condition = (F.col("time_diff").isNull() | 
                            (F.col("time_diff") > 1800) |  # 30 minutes timeout
                            (F.sum("time_diff").over(window_spec) > 7200))  # 2 hours max session
session_start_condition

Column<'(((time_diff IS NULL) OR (time_diff > 1800)) OR (sum(time_diff) OVER (PARTITION BY User_id ORDER BY timestamp ASC NULLS FIRST unspecifiedframe$()) > 7200))'>

In [23]:
df = df.withColumn("new_session", F.when(session_start_condition, 1).otherwise(0))
df.show(truncate=False)

+-------------------+-------+-------------------+---------+-----------+
|timestamp          |User_id|prev_timestamp     |time_diff|new_session|
+-------------------+-------+-------------------+---------+-----------+
|2021-05-01 15:30:00|u1     |NULL               |NULL     |1          |
|2021-05-01 15:40:00|u1     |2021-05-01 15:30:00|600      |0          |
|2021-05-01 17:20:00|u1     |2021-05-01 15:40:00|6000     |1          |
|2021-05-01 17:35:00|u1     |2021-05-01 17:20:00|900      |1          |
|2021-05-01 18:30:00|u1     |2021-05-01 17:35:00|3300     |1          |
|2021-05-01 19:00:00|u1     |2021-05-01 18:30:00|1800     |1          |
|2021-05-01 20:00:00|u1     |2021-05-01 19:00:00|3600     |1          |
|2021-05-01 21:20:00|u1     |2021-05-01 20:00:00|4800     |1          |
|2021-05-01 22:30:00|u1     |2021-05-01 21:20:00|4200     |1          |
|2021-05-02 14:30:00|u2     |NULL               |NULL     |1          |
|2021-05-02 15:15:00|u2     |2021-05-02 14:30:00|2700     |1    

In [25]:
# Step 6: Create a running session count
df = df.withColumn("session_number", F.sum("new_session").over(window_spec))
df.show()

+-------------------+-------+-------------------+---------+-----------+--------------+
|          timestamp|User_id|     prev_timestamp|time_diff|new_session|session_number|
+-------------------+-------+-------------------+---------+-----------+--------------+
|2021-05-01 15:30:00|     u1|               NULL|     NULL|          1|             1|
|2021-05-01 15:40:00|     u1|2021-05-01 15:30:00|      600|          0|             1|
|2021-05-01 17:20:00|     u1|2021-05-01 15:40:00|     6000|          1|             2|
|2021-05-01 17:35:00|     u1|2021-05-01 17:20:00|      900|          1|             3|
|2021-05-01 18:30:00|     u1|2021-05-01 17:35:00|     3300|          1|             4|
|2021-05-01 19:00:00|     u1|2021-05-01 18:30:00|     1800|          1|             5|
|2021-05-01 20:00:00|     u1|2021-05-01 19:00:00|     3600|          1|             6|
|2021-05-01 21:20:00|     u1|2021-05-01 20:00:00|     4800|          1|             7|
|2021-05-01 22:30:00|     u1|2021-05-01 21:

In [26]:
# Step 7: Generate the session_id
df = df.withColumn("session_id", F.concat(F.col("User_id"), F.lit("_s"), F.col("session_number")))
df.show()

+-------------------+-------+-------------------+---------+-----------+--------------+----------+
|          timestamp|User_id|     prev_timestamp|time_diff|new_session|session_number|session_id|
+-------------------+-------+-------------------+---------+-----------+--------------+----------+
|2021-05-01 15:30:00|     u1|               NULL|     NULL|          1|             1|     u1_s1|
|2021-05-01 15:40:00|     u1|2021-05-01 15:30:00|      600|          0|             1|     u1_s1|
|2021-05-01 17:20:00|     u1|2021-05-01 15:40:00|     6000|          1|             2|     u1_s2|
|2021-05-01 17:35:00|     u1|2021-05-01 17:20:00|      900|          1|             3|     u1_s3|
|2021-05-01 18:30:00|     u1|2021-05-01 17:35:00|     3300|          1|             4|     u1_s4|
|2021-05-01 19:00:00|     u1|2021-05-01 18:30:00|     1800|          1|             5|     u1_s5|
|2021-05-01 20:00:00|     u1|2021-05-01 19:00:00|     3600|          1|             6|     u1_s6|
|2021-05-01 21:20:00

In [27]:
# Intermediate Result: Show the DataFrame after sessionization
df.select("Timestamp", "User_id", "session_id").show(truncate=False)

+-------------------+-------+----------+
|Timestamp          |User_id|session_id|
+-------------------+-------+----------+
|2021-05-01 15:30:00|u1     |u1_s1     |
|2021-05-01 15:40:00|u1     |u1_s1     |
|2021-05-01 17:20:00|u1     |u1_s2     |
|2021-05-01 17:35:00|u1     |u1_s3     |
|2021-05-01 18:30:00|u1     |u1_s4     |
|2021-05-01 19:00:00|u1     |u1_s5     |
|2021-05-01 20:00:00|u1     |u1_s6     |
|2021-05-01 21:20:00|u1     |u1_s7     |
|2021-05-01 22:30:00|u1     |u1_s8     |
|2021-05-02 14:30:00|u2     |u2_s1     |
|2021-05-02 15:15:00|u2     |u2_s2     |
|2021-05-02 16:30:00|u2     |u2_s3     |
|2021-05-02 17:00:00|u2     |u2_s4     |
|2021-05-02 18:35:00|u2     |u2_s5     |
|2021-05-02 19:40:00|u2     |u2_s6     |
+-------------------+-------+----------+



In [28]:
# Step 8: Save the resultant DataFrame to a Parquet file
# df.select("Timestamp", "User_id", "session_id").write.mode("overwrite").parquet("sessionized_data.parquet")

# Stop the SparkSession
spark.stop()