In [None]:
# Read the user-behavior CSV from DBFS. The first line is treated as the header
# and column types are inferred from the data rather than declared up front.
df = spark.read.csv(
    "/FileStore/tables/user_behavior.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Print the column names and data types of df (types were inferred at load time)
df.printSchema()

In [None]:
# Preview the first 10 rows
df.show(n=10)

In [None]:
# Total number of rows in df; as the cell's last expression, the result is displayed
df.count()

In [None]:
# Number of distinct user_id values
(
    df.select("user_id")
    .dropDuplicates()
    .count()
)

In [None]:
# Event types ranked by how often they occur, most frequent first
(
    df.groupBy("event_type")
    .count()
    .sort("count", ascending=False)
    .show()
)

In [None]:
# Screens ranked by number of events recorded on them, most frequent first
(
    df.groupBy("screen_name")
    .count()
    .sort("count", ascending=False)
    .show()
)

In [None]:
from pyspark.sql.functions import col, to_timestamp

# Replace 'event_time' with its timestamp-typed equivalent so it can be sorted
# and aggregated chronologically
df = df.withColumn(
    "event_time",
    to_timestamp("event_time"),
)

In [None]:
# Preview events ordered by user, session, and time.
# The sorted frame is only displayed, not assigned back to df: the later
# aggregations do not depend on row order and the window functions declare
# their own ordering, so df stays unsorted instead of carrying a global sort
# into every downstream query.
df.orderBy("user_id", "session_id", "event_time").show(30)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Per-session start/end timestamps and duration.
# The aggregates are reached through the F namespace: importing max/min by
# name would shadow Python's built-in max()/min() for every later cell.
session_duration = (
    df.groupBy("user_id", "session_id")
    .agg(
        F.min("event_time").alias("session_start"),
        F.max("event_time").alias("session_end"),
    )
    # Both timestamps are cast to long before subtracting, so the difference
    # is a whole number of seconds; dividing by 60 converts it to minutes.
    .withColumn(
        "duration_minutes",
        (col("session_end").cast("long") - col("session_start").cast("long")) / 60,
    )
)

In [None]:
# Display the 15 sessions with the greatest duration, longest first
(
    session_duration
    .sort(col("duration_minutes").desc())
    .show(15)
)

In [None]:
# Top 10 screens by event count (same aggregation as the earlier
# "most visited screens" cell, limited to 10 rows)
screen_visits = df.groupBy("screen_name").count()
screen_visits.sort("count", ascending=False).show(10)

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Window over each (user, session) pair with the newest event first
w = (
    Window
    .partitionBy("user_id", "session_id")
    .orderBy(col("event_time").desc())
)

# Row number 1 in that window is the session's final event; tally which
# screens those final events happened on, most common first
last_events = (
    df.withColumn("rank", row_number().over(w))
    .where(col("rank") == 1)
    .groupBy("screen_name")
    .count()
    .sort("count", ascending=False)
)

last_events.show(10)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Number the events of every (user, session) pair chronologically: step 1 is
# the earliest event of the session.
# NOTE(review): events sharing an event_time within a session have no
# tie-breaker here - confirm whether their relative order matters.
w = (
    Window
    .partitionBy("user_id", "session_id")
    .orderBy(F.col("event_time").asc())
)
df_with_order = df.withColumn(
    "step",
    F.row_number().over(w),
)

In [None]:
# Derive screen-to-screen transitions within each session.
# A window LEAD over the per-session step order pairs every event with its
# successor in a single pass. This is used instead of self-joining
# df_with_order on step + 1: it avoids the join and evaluating the numbered
# frame twice, and it does not rely on two separately evaluated copies
# assigning identical step numbers to events with tied timestamps.
next_in_session = Window.partitionBy("user_id", "session_id").orderBy("step")

transitions = (
    df_with_order
    # Rows with a NULL user_id or session_id cannot belong to an identifiable
    # session (an equality join on these keys would never match them either).
    .filter(F.col("user_id").isNotNull() & F.col("session_id").isNotNull())
    .select(
        F.col("screen_name").alias("from_screen"),
        F.lead("screen_name").over(next_in_session).alias("to_screen"),
        # The successor's step is kept so "no successor" can be told apart
        # from "successor whose screen_name is NULL".
        F.lead("step").over(next_in_session).alias("next_step"),
    )
    # The last event of each session has no successor and yields no transition.
    .filter(F.col("next_step").isNotNull())
    .select("from_screen", "to_screen")
)

In [None]:
# Frequency of each (from_screen, to_screen) pair, most common first
transition_counts = (
    transitions
    .groupBy("from_screen", "to_screen")
    .count()
    .sort(F.desc("count"))
)
transition_counts.show(10)

In [None]:
# Per-user summary built from the per-session table: how many sessions the
# user has and their mean session duration in minutes
user_stats = (
    session_duration
    .groupBy("user_id")
    .agg(
        F.count(F.col("session_id")).alias("num_sessions"),
        F.avg(F.col("duration_minutes")).alias("avg_duration"),
    )
)

In [None]:
# Total number of events per user, taken from the raw event table
event_counts = df.groupBy("user_id").agg(F.count("*").alias("total_events"))

# Attach the event totals to the per-user session statistics (inner join on user_id)
user_features = user_stats.join(event_counts, on=["user_id"], how="inner")

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Pack the three per-user metrics into a single vector column named "features"
# NOTE(review): presumably none of these columns contain NULLs - confirm, since
# a NULL avg_duration would reach the assembler unchanged.
assembler = VectorAssembler(
    inputCols=["num_sessions", "avg_duration", "total_events"],
    outputCol="features",
)
dataset = assembler.transform(user_features)

In [None]:
# Fit a 3-cluster KMeans model with a fixed seed, then label every user with
# its cluster in the "prediction" column
# NOTE(review): the features are clustered on their raw scales (no scaling step
# precedes this cell) - confirm that is intended.
kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(dataset)
clusters = model.transform(dataset)

clusters.select("user_id", "prediction").show(10)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Bring the per-user metrics and cluster labels to the driver as a pandas frame
pandas_df = clusters.select("num_sessions", "avg_duration", "total_events", "prediction").toPandas()

# Scatter of average session duration vs. total events, colored by cluster
fig, ax = plt.subplots()
ax.scatter(
    pandas_df['avg_duration'],
    pandas_df['total_events'],
    c=pandas_df['prediction'],
    cmap='viridis',
)
ax.set_xlabel("Avg Session Duration")
ax.set_ylabel("Total Events")
ax.set_title("User Segments")
ax.grid(True)
plt.show()

In [None]:
# The assembled vector column is removed first so only the plain columns are exported
clusters_cleaned = clusters.drop("features")

# Collect the remaining columns to the driver as a pandas DataFrame
pandas_df = clusters_cleaned.toPandas()

# Write the result to the driver's local disk, without the pandas index
pandas_df.to_csv(
    "/tmp/user_behavior.csv",
    index=False,
)

In [None]:
# Move the exported CSV from the driver's local /tmp directory into DBFS under
# /FileStore (intended as a web-accessible location). This is a move, so the
# local copy no longer exists afterwards.
dbutils.fs.mv("file:/tmp/user_behavior.csv", "dbfs:/FileStore/user_behavior.csv")