In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, ArrayType
from pyspark.sql.window import Window
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import col, lag, when, sum as spark_sum


In [0]:
# Location settings for the clickstream data. The values mirror the segments
# of the /Volumes/<catalog>/<schema>/<volume>/<folder>/ path used when reading.
catalog_name, schema_name = "practice_db_catalog", "clickstream"
volume_name, folder_name = "data_volume", "clickstream"

In [0]:
# Read the raw clickstream JSON files.
# The path is assembled from the catalog/schema/volume/folder settings defined
# in the configuration cell instead of repeating them as a hard-coded string.
raw_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/{folder_name}/raw/"
df = spark.read.json(raw_path)

In [0]:
# End-to-end sessionization: assign each event a per-user session number,
# starting a new session whenever the gap to the user's previous event
# exceeds the timeout below.

# Gap between two consecutive events of one user (in minutes) above which the
# later event is treated as the start of a new session.
SESSION_TIMEOUT_MINUTES = 30

# Cast event_time to a timestamp so it can be ordered and subtracted.
df_cast = df.withColumn("event_time", F.col("event_time").cast(TimestampType()))

# Window over each user's events in chronological order.
window_spec = Window.partitionBy("user_id").orderBy("event_time")

# Previous event time for the same user (null for the user's first event).
# No global orderBy is applied before the window functions: the window already
# orders rows inside each user partition, and a sort done before it costs an
# extra shuffle without guaranteeing the order of the final result.
df_order = df_cast.withColumn(
    "prev_event_time",
    F.lag("event_time").over(window_spec),
)

# Minutes elapsed since the user's previous event (null when there is none).
df_time_diff = df_order.withColumn(
    "time_diff_minutes",
    (F.unix_timestamp("event_time") - F.unix_timestamp("prev_event_time")) / 60,
)

# 1 when the event opens a new session (first event of the user, or the gap
# is larger than the timeout), otherwise 0.
df_session = df_time_diff.withColumn(
    "new_session_flag",
    F.when(
        F.col("prev_event_time").isNull()
        | (F.col("time_diff_minutes") > SESSION_TIMEOUT_MINUTES),
        1,
    ).otherwise(0),
)

# Running total of the flags inside each user's window gives a session number
# that starts at 1 for every user.
df_aggr_session = df_session.withColumn(
    "session_id",
    F.sum("new_session_flag").over(window_spec),
)

# Sort only at the end, for a deterministic presentation order.
df_aggr_session.orderBy("user_id", "event_time").display()

event_time,event_type,user_id,prev_event_time,time_diff_minutes,new_session_flag,session_id
2025-01-05T07:46:00.000Z,view,U001,,,1,1
2025-01-07T05:50:00.000Z,purchase,U001,2025-01-05T07:46:00.000Z,2764.0,1,2
2025-01-07T12:50:00.000Z,purchase,U001,2025-01-07T05:50:00.000Z,420.0,1,3
2025-01-08T03:31:00.000Z,click,U001,2025-01-07T12:50:00.000Z,881.0,1,4
2025-01-08T06:12:00.000Z,view,U001,2025-01-08T03:31:00.000Z,161.0,1,5
2025-01-14T04:47:00.000Z,add_to_cart,U001,2025-01-08T06:12:00.000Z,8555.0,1,6
2025-01-20T08:10:00.000Z,click,U001,2025-01-14T04:47:00.000Z,8843.0,1,7
2025-01-22T22:29:00.000Z,purchase,U001,2025-01-20T08:10:00.000Z,3739.0,1,8
2025-01-24T08:43:00.000Z,click,U001,2025-01-22T22:29:00.000Z,2054.0,1,9
2025-01-26T18:40:00.000Z,click,U001,2025-01-24T08:43:00.000Z,3477.0,1,10


In [0]:

# Step 1: sort the events by user and then chronologically (both ascending).
df_order = df_cast.orderBy(["user_id", "event_time"], ascending=[True, True])
df_order.display()

event_time,event_type,user_id
2025-01-05T07:46:00.000Z,view,U001
2025-01-07T05:50:00.000Z,purchase,U001
2025-01-07T12:50:00.000Z,purchase,U001
2025-01-08T03:31:00.000Z,click,U001
2025-01-08T06:12:00.000Z,view,U001
2025-01-14T04:47:00.000Z,add_to_cart,U001
2025-01-20T08:10:00.000Z,click,U001
2025-01-22T22:29:00.000Z,purchase,U001
2025-01-24T08:43:00.000Z,click,U001
2025-01-26T18:40:00.000Z,click,U001


In [0]:
# Step 2: window covering each user's events, ordered by ascending event time.
window_spec = (
    Window
    .partitionBy(F.col("user_id"))
    .orderBy(F.col("event_time").asc())
)

In [0]:
# Step 3: attach the time of the same user's previous event; the first event
# of each user has no predecessor and therefore gets null.
previous_event_time = F.lag(F.col("event_time"), 1).over(window_spec)
df_order = df_order.withColumn("prev_event_time", previous_event_time)
df_order.display()

event_time,event_type,user_id,prev_event_time
2025-01-05T07:46:00.000Z,view,U001,
2025-01-07T05:50:00.000Z,purchase,U001,2025-01-05T07:46:00.000Z
2025-01-07T12:50:00.000Z,purchase,U001,2025-01-07T05:50:00.000Z
2025-01-08T03:31:00.000Z,click,U001,2025-01-07T12:50:00.000Z
2025-01-08T06:12:00.000Z,view,U001,2025-01-08T03:31:00.000Z
2025-01-14T04:47:00.000Z,add_to_cart,U001,2025-01-08T06:12:00.000Z
2025-01-20T08:10:00.000Z,click,U001,2025-01-14T04:47:00.000Z
2025-01-22T22:29:00.000Z,purchase,U001,2025-01-20T08:10:00.000Z
2025-01-24T08:43:00.000Z,click,U001,2025-01-22T22:29:00.000Z
2025-01-26T18:40:00.000Z,click,U001,2025-01-24T08:43:00.000Z


In [0]:
# Step 4: minutes elapsed between each event and the user's previous event.
# unix_timestamp yields seconds, so the difference is divided by 60; the value
# is null for a user's first event because prev_event_time is null there.
# (The function is reached through the `F` namespace imported in the first
# cell, so no additional import is needed in this cell.)
seconds_since_previous = (
    F.unix_timestamp("event_time") - F.unix_timestamp("prev_event_time")
)
df_time_diff = df_order.withColumn("time_diff_minutes", seconds_since_previous / 60)
df_time_diff.display()

event_time,event_type,user_id,prev_event_time,time_diff_minutes
2025-01-05T07:46:00.000Z,view,U001,,
2025-01-07T05:50:00.000Z,purchase,U001,2025-01-05T07:46:00.000Z,2764.0
2025-01-07T12:50:00.000Z,purchase,U001,2025-01-07T05:50:00.000Z,420.0
2025-01-08T03:31:00.000Z,click,U001,2025-01-07T12:50:00.000Z,881.0
2025-01-08T06:12:00.000Z,view,U001,2025-01-08T03:31:00.000Z,161.0
2025-01-14T04:47:00.000Z,add_to_cart,U001,2025-01-08T06:12:00.000Z,8555.0
2025-01-20T08:10:00.000Z,click,U001,2025-01-14T04:47:00.000Z,8843.0
2025-01-22T22:29:00.000Z,purchase,U001,2025-01-20T08:10:00.000Z,3739.0
2025-01-24T08:43:00.000Z,click,U001,2025-01-22T22:29:00.000Z,2054.0
2025-01-26T18:40:00.000Z,click,U001,2025-01-24T08:43:00.000Z,3477.0


In [0]:
# Step 5: flag the events that open a new session.
# Gap between two consecutive events of one user (in minutes) above which the
# later event starts a new session.
SESSION_TIMEOUT_MINUTES = 30

# An event opens a session when the user has no earlier event, or when the gap
# to the previous event is strictly greater than the timeout.
starts_new_session = (
    F.col("prev_event_time").isNull()
    | (F.col("time_diff_minutes") > SESSION_TIMEOUT_MINUTES)
)
df_session = df_time_diff.withColumn(
    "new_session_flag",
    F.when(starts_new_session, 1).otherwise(0),
)
df_session.display()

event_time,event_type,user_id,prev_event_time,time_diff_minutes,new_session_flag
2025-01-05T07:46:00.000Z,view,U001,,,1
2025-01-07T05:50:00.000Z,purchase,U001,2025-01-05T07:46:00.000Z,2764.0,1
2025-01-07T12:50:00.000Z,purchase,U001,2025-01-07T05:50:00.000Z,420.0,1
2025-01-08T03:31:00.000Z,click,U001,2025-01-07T12:50:00.000Z,881.0,1
2025-01-08T06:12:00.000Z,view,U001,2025-01-08T03:31:00.000Z,161.0,1
2025-01-14T04:47:00.000Z,add_to_cart,U001,2025-01-08T06:12:00.000Z,8555.0,1
2025-01-20T08:10:00.000Z,click,U001,2025-01-14T04:47:00.000Z,8843.0,1
2025-01-22T22:29:00.000Z,purchase,U001,2025-01-20T08:10:00.000Z,3739.0,1
2025-01-24T08:43:00.000Z,click,U001,2025-01-22T22:29:00.000Z,2054.0,1
2025-01-26T18:40:00.000Z,click,U001,2025-01-24T08:43:00.000Z,3477.0,1


In [0]:
# Step 6: turn the flags into session numbers. Summing new_session_flag over
# the per-user, time-ordered window yields a counter that increases by one at
# every event flagged as a session start.
running_session_count = F.sum(F.col("new_session_flag")).over(window_spec)
df_aggr_session = df_session.withColumn("session_id", running_session_count)
df_aggr_session.display()

event_time,event_type,user_id,prev_event_time,time_diff_minutes,new_session_flag,session_id
2025-01-05T07:46:00.000Z,view,U001,,,1,1
2025-01-07T05:50:00.000Z,purchase,U001,2025-01-05T07:46:00.000Z,2764.0,1,2
2025-01-07T12:50:00.000Z,purchase,U001,2025-01-07T05:50:00.000Z,420.0,1,3
2025-01-08T03:31:00.000Z,click,U001,2025-01-07T12:50:00.000Z,881.0,1,4
2025-01-08T06:12:00.000Z,view,U001,2025-01-08T03:31:00.000Z,161.0,1,5
2025-01-14T04:47:00.000Z,add_to_cart,U001,2025-01-08T06:12:00.000Z,8555.0,1,6
2025-01-20T08:10:00.000Z,click,U001,2025-01-14T04:47:00.000Z,8843.0,1,7
2025-01-22T22:29:00.000Z,purchase,U001,2025-01-20T08:10:00.000Z,3739.0,1,8
2025-01-24T08:43:00.000Z,click,U001,2025-01-22T22:29:00.000Z,2054.0,1,9
2025-01-26T18:40:00.000Z,click,U001,2025-01-24T08:43:00.000Z,3477.0,1,10
