# You have a DataFrame containing a list of events for each user. Your task is to calculate the duration (in hours) of each complete sequence from start to complete.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max, unix_timestamp, rand, expr, col, when, monotonically_increasing_id, countDistinct, count
from pyspark.sql.types import IntegerType, StringType, TimestampType

In [0]:
spark = SparkSession.builder.appName("101").getOrCreate()

num_rows = 1200000


df = spark.range(0, num_rows)
df = df.withColumn("event_id", (rand() * 400000).cast(IntegerType()) + 1)
df = df.withColumn("timestamp", expr("to_timestamp('2023-01-01 00:00:00') + interval 1 year * rand()"))

# Create event types
event_types = ["event_start", "event_process", "event_complete"]

df = df.withColumn("event_type", expr(f"array({', '.join(repr(t) for t in event_types)})[int(rand() * {len(event_types)})]"))

# Ensure each event_id has all three event types
df = df.withColumn("row_num", expr("row_number() over (partition by event_id order by timestamp)"))
df = df.withColumn("event_type", when(col("row_num") == 1, "event_start")
                                .when(col("row_num") == 2, "event_process")
                                .when(col("row_num") == 3, "event_complete"))

# Adjust timestamps to ensure correct order
df = df.withColumn("timestamp", 
                   when(col("event_type") == "event_start", col("timestamp"))
                   .when(col("event_type") == "event_process", expr("timestamp + interval 5 minutes"))
                   .when(col("event_type") == "event_complete", expr("timestamp + interval 15 minutes")))

# Select and order final columns
df = df.select("event_id", "event_type", "timestamp").orderBy("event_id", "timestamp")

In [0]:
df.filter(df.event_id == 34).show()

+--------+-----------+-------------------+
|event_id| event_type|          timestamp|
+--------+-----------+-------------------+
|      34|event_start|2023-10-01 00:00:00|
+--------+-----------+-------------------+



In [0]:
filtered_id = df.\
    filter(df.event_type.isin("event_start", "event_complete")).groupBy("event_id").agg(
    count("event_id").alias("event_count")
        ).filter(col("event_count") == 2)

In [0]:

df_aliased = df.alias("main")
filtered_id_aliased = filtered_id.alias("filtered")

filtered_df = df_aliased.\
    filter(df_aliased.timestamp.isNotNull()).\
    filter((df_aliased.event_type == "event_start") | (df_aliased.event_type == "event_complete")).\
    join(
    filtered_id_aliased,
    df_aliased.event_id == filtered_id_aliased.event_id,
    "inner"
).select(df_aliased.event_id, df_aliased.event_type, df_aliased.timestamp)
    

result = filtered_df.groupBy(filtered_df.event_id).agg(
    min(filtered_df.timestamp).alias("start_time"),
    max(filtered_df.timestamp).alias("end_time")
).withColumn("duration", (unix_timestamp("end_time") - unix_timestamp("start_time")) / 3600).\
select(filtered_df.event_id, "duration").show()

+--------+--------+
|event_id|duration|
+--------+--------+
|      27| 1464.25|
|      28| 2928.25|
|      31| 5088.25|
|      53| 4416.25|
|      78| 7296.25|
|      81| 4416.25|
|     101| 2208.25|
|     103| 7272.25|
|     108| 5088.25|
|     115| 1464.25|
|     126| 1416.25|
|     148| 5088.25|
|     155| 5856.25|
|     183| 1416.25|
|     210|  744.25|
|     211| 5808.25|
|     236| 3672.25|
|     243| 4416.25|
|     251| 2136.25|
|     253| 5856.25|
+--------+--------+
only showing top 20 rows

