Calculate Average User Session Duration

You have user activity logs in a PySpark DataFrame with three columns: user_id, timestamp, and action. The action column marks whether the user started or ended a session and takes the value 'start' or 'end'. The task is to calculate the average duration of user sessions, in seconds.

In [0]:
# Note: importing sum from pyspark.sql.functions shadows Python's built-in sum() in this notebook.
from pyspark.sql.functions import avg, col, lag, sum, unix_timestamp, when
from pyspark.sql.window import Window

In [0]:
data = [
    (1, "2022-01-01 10:00", "start"),
    (1, "2022-01-01 10:15", "end"),
    (2, "2022-01-01 11:00", "start"),
    (1, "2022-01-01 11:30", "start"),
    (2, "2022-01-01 11:45", "end"),
    (1, "2022-01-01 12:00", "end")
]

In [0]:
schema = ["user_id", "timestamp", "action"]

In [0]:
# 'spark' is assumed to be the SparkSession that the notebook environment predefines.
df = spark.createDataFrame(data, schema=schema)

In [0]:
df.show()

+-------+----------------+------+
|user_id|       timestamp|action|
+-------+----------------+------+
|      1|2022-01-01 10:00| start|
|      1|2022-01-01 10:15|   end|
|      2|2022-01-01 11:00| start|
|      1|2022-01-01 11:30| start|
|      2|2022-01-01 11:45|   end|
|      1|2022-01-01 12:00|   end|
+-------+----------------+------+



In [0]:
df=df.withColumn("unix_timestamp",unix_timestamp("timestamp","yyyy-MM-dd HH:mm"))

In [0]:
df.show()

+-------+----------------+------+--------------+
|user_id|       timestamp|action|unix_timestamp|
+-------+----------------+------+--------------+
|      1|2022-01-01 10:00| start|    1641031200|
|      1|2022-01-01 10:15|   end|    1641032100|
|      2|2022-01-01 11:00| start|    1641034800|
|      1|2022-01-01 11:30| start|    1641036600|
|      2|2022-01-01 11:45|   end|    1641037500|
|      1|2022-01-01 12:00|   end|    1641038400|
+-------+----------------+------+--------------+
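
unix_timestamp interprets the string in the Spark session time zone, so the epoch values depend on that setting. The values printed above match a UTC session. The sketch below is a plain-Python cross-check under that assumption; the helper name to_epoch_seconds is hypothetical and not part of the notebook.

```python
from datetime import datetime, timezone


def to_epoch_seconds(ts: str) -> int:
    """Parse a 'yyyy-MM-dd HH:mm' string as UTC and return seconds since the Unix epoch."""
    # Assumption: the Spark session time zone is UTC, which matches the output above.
    parsed = datetime.strptime(ts, "%Y-%m-%d %H:%M").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())


start = to_epoch_seconds("2022-01-01 10:00")
end = to_epoch_seconds("2022-01-01 10:15")

print(start)        # 1641031200
print(end)          # 1641032100
print(end - start)  # 900 seconds, i.e. 15 minutes
```

If the session runs in another time zone, the absolute values shift, but the difference between two timestamps in the same zone stays the same as long as no daylight-saving change falls between them.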



In [0]:
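# Calculate session duration.
# The window orders each user's events chronologically, so lag() returns that user's previous event.
window_spec = Window.partitionBy("user_id").orderBy("unix_timestamp")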

In [0]:
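df = df.withColumn(
    "session_duration",
    # Only 'end' rows get a duration: the end time minus the same user's previous event time.
    # All other rows stay null. This assumes every 'end' directly follows its matching 'start'.
    when(col("action") == "end", col("unix_timestamp") - lag(col("unix_timestamp")).over(window_spec))
)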

In [0]:
df.show()

+-------+----------------+------+--------------+----------------+
|user_id|       timestamp|action|unix_timestamp|session_duration|
+-------+----------------+------+--------------+----------------+
|      1|2022-01-01 10:00| start|    1641031200|            null|
|      1|2022-01-01 10:15|   end|    1641032100|             900|
|      1|2022-01-01 11:30| start|    1641036600|            null|
|      1|2022-01-01 12:00|   end|    1641038400|            1800|
|      2|2022-01-01 11:00| start|    1641034800|            null|
|      2|2022-01-01 11:45|   end|    1641037500|            2700|
+-------+----------------+------+--------------+----------------+
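
The lag-over-window step can be hard to picture, so here is a plain-Python sketch of the same idea: group the events by user, sort them by time, and subtract the previous event's time from each 'end'. The function name session_durations is hypothetical. Unlike the notebook code, this sketch also checks that the previous event is a 'start', which is a stricter rule than the one used above.

```python
from collections import defaultdict

# Same events as the DataFrame, with timestamps already converted to epoch seconds.
events = [
    (1, 1641031200, "start"),
    (1, 1641032100, "end"),
    (2, 1641034800, "start"),
    (1, 1641036600, "start"),
    (2, 1641037500, "end"),
    (1, 1641038400, "end"),
]


def session_durations(rows):
    """Return (user_id, seconds) for every 'end' that directly follows a 'start'."""
    by_user = defaultdict(list)
    for user_id, ts, action in rows:
        by_user[user_id].append((ts, action))

    durations = []
    for user_id in sorted(by_user):
        previous = None  # plays the role of lag() within one user's partition
        for ts, action in sorted(by_user[user_id]):
            if action == "end" and previous is not None and previous[1] == "start":
                durations.append((user_id, ts - previous[0]))
            previous = (ts, action)
    return durations


result = session_durations(events)
print(result)  # [(1, 900), (1, 1800), (2, 2700)]
```

The three durations agree with the non-null session_duration values in the table above.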



In [0]:
# Calculate each user's total session time in seconds; sum() skips the null rows.

In [0]:
user_session_duration=df.groupBy(col("user_id")).agg(sum("session_duration").alias("total_duration"))

In [0]:
user_session_duration.show()

+-------+--------------+
|user_id|total_duration|
+-------+--------------+
|      1|          2700|
|      2|          2700|
+-------+--------------+



In [0]:
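# Average the per-user totals. The result is the mean total session time per user,
# which differs from the mean length of a single session when users have several sessions.
average_duration = user_session_duration.agg(avg("total_duration").alias("avg_duration"))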

In [0]:
average_duration.show()

+------------+
|avg_duration|
+------------+
|      2700.0|
+------------+
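
The 2700.0 above is the average of the per-user totals. If the goal is the average length of an individual session, the three session durations should be averaged directly. The plain-Python sketch below contrasts the two readings using the durations from the session_duration table; the variable names are illustrative only.

```python
from statistics import mean

# (user_id, seconds) pairs taken from the non-null session_duration rows.
durations = [(1, 900), (1, 1800), (2, 2700)]

# Reading 1: total session time per user, then the mean of those totals.
totals = {}
for user_id, seconds in durations:
    totals[user_id] = totals.get(user_id, 0) + seconds
avg_total_per_user = mean(list(totals.values()))

# Reading 2: the mean over individual sessions.
avg_per_session = mean([seconds for _, seconds in durations])

print(totals)              # {1: 2700, 2: 2700}
print(avg_total_per_user)  # 2700
print(avg_per_session)     # 1800
```

In PySpark, the per-session figure should be obtainable with df.agg(avg("session_duration")), since avg ignores the null rows left on 'start' events.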

