In [0]:
%sql
-- Preview the raw session event table: every column, every row.
SELECT
    *
FROM `workspace`.`default`.`session`;

player_id,session_id,login,logout,purchase,play,amount_spent,timestamp
P01,S001,1,0,0,0,0,8/2/2025 10:00
P01,S001,0,0,1,0,150,8/2/2025 10:25
P01,S001,0,1,0,0,0,8/2/2025 11:00
P05,S002,1,0,0,0,0,8/3/2025 9:10
P05,S002,0,0,0,1,0,8/3/2025 9:40
P05,S002,0,1,0,0,0,8/3/2025 10:05
P12,S003,1,0,0,0,0,8/4/2025 15:00
P12,S003,0,0,1,0,220,8/4/2025 15:35
P12,S003,0,1,0,0,0,8/4/2025 16:10
P03,S004,1,0,0,0,0,8/5/2025 14:00


In [0]:
# Read the players table into a Spark DataFrame and render it.
players_table = "players_dataset_1"
df1 = spark.table(players_table)
display(df1)

player_id,player_name,country
P01,Rajesh,Canada
P02,Pooja,UK
P03,Alok,UK
P04,Suresh,India
P05,Arjun,UK
P06,Pooja,Germany
P07,Meena,India
P08,Alok,India
P09,Sneha,UK
P10,Sunita,Canada


In [0]:
# Read the session event table into a Spark DataFrame and render it.
sessions_table = "session"
df2 = spark.table(sessions_table)
display(df2)


player_id,session_id,login,logout,purchase,play,amount_spent,timestamp
P01,S001,1,0,0,0,0,8/2/2025 10:00
P01,S001,0,0,1,0,150,8/2/2025 10:25
P01,S001,0,1,0,0,0,8/2/2025 11:00
P05,S002,1,0,0,0,0,8/3/2025 9:10
P05,S002,0,0,0,1,0,8/3/2025 9:40
P05,S002,0,1,0,0,0,8/3/2025 10:05
P12,S003,1,0,0,0,0,8/4/2025 15:00
P12,S003,0,0,1,0,220,8/4/2025 15:35
P12,S003,0,1,0,0,0,8/4/2025 16:10
P03,S004,1,0,0,0,0,8/5/2025 14:00


In [0]:

# Load the two source tables (the same tables the earlier cells displayed).
df1 = spark.table("players_dataset_1")
df2 = spark.table("session")

# Inner join of session events to player attributes on player_id.
# Because the condition is a column expression, the result keeps the
# player_id column from both sides (display() shows the second one as
# "player_id.1").
same_player = df2["player_id"] == df1["player_id"]
joined_df = df2.join(df1, on=same_player, how="inner")

display(joined_df)

player_id,session_id,login,logout,purchase,play,amount_spent,timestamp,player_id.1,player_name,country
P01,S001,1,0,0,0,0,8/2/2025 10:00,P01,Rajesh,Canada
P01,S001,0,0,1,0,150,8/2/2025 10:25,P01,Rajesh,Canada
P01,S001,0,1,0,0,0,8/2/2025 11:00,P01,Rajesh,Canada
P05,S002,1,0,0,0,0,8/3/2025 9:10,P05,Arjun,UK
P05,S002,0,0,0,1,0,8/3/2025 9:40,P05,Arjun,UK
P05,S002,0,1,0,0,0,8/3/2025 10:05,P05,Arjun,UK
P12,S003,1,0,0,0,0,8/4/2025 15:00,P12,Neha,Germany
P12,S003,0,0,1,0,220,8/4/2025 15:35,P12,Neha,Germany
P12,S003,0,1,0,0,0,8/4/2025 16:10,P12,Neha,Germany
P03,S004,1,0,0,0,0,8/5/2025 14:00,P03,Alok,UK


In [0]:
from pyspark.sql import functions as F

# Sum the purchase column (0/1 in the sample rows) per player and list the
# largest totals first.
# 'session.player_id' is table-qualified because joined_df carries a
# player_id column from both source tables.
player_keys = ['session.player_id', 'player_name', 'country']

purchase_rows = joined_df.select(*player_keys, 'purchase')
purchases_by_player = purchase_rows.groupBy(*player_keys).agg(
    F.sum("purchase").alias("total_purchases")
)
total_purchases_df = purchases_by_player.orderBy(F.col("total_purchases").desc())

display(total_purchases_df)


player_id,player_name,country,total_purchases
P15,Amit,India,1
P12,Neha,Germany,1
P19,Ravi,USA,1
P04,Suresh,India,1
P08,Alok,India,1
P14,Neha,Germany,1
P01,Rajesh,Canada,1
P13,Neha,India,1
P02,Pooja,UK,1
P09,Sneha,UK,1


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql import functions as F

# Keep a single player_id (the players-table copy) and parse the event time
# string, e.g. "8/2/2025 10:25", into a Spark timestamp.
event_ts = F.to_timestamp(df2["timestamp"], "M/d/yyyy H:mm").alias("timestamp")
clean_df = joined_df.select(
    df1["player_id"],
    df1["player_name"],
    df1["country"],
    df2["session_id"],
    event_ts,
)

# Steps 1 + 2: earliest and latest event per session, then the gap between
# them expressed in minutes.
# NOTE(review): login_time / logout_time are the min / max timestamp of ANY
# event in the session; the login and logout flag columns are not consulted.
seconds_in_session = F.unix_timestamp("logout_time") - F.unix_timestamp("login_time")
session_times = (
    clean_df.groupBy("player_id", "session_id")
    .agg(
        F.min("timestamp").alias("login_time"),
        F.max("timestamp").alias("logout_time"),
    )
    .withColumn("session_length_min", seconds_in_session / 60)
)

# Step 3: longest session per player.
max_session_per_player = session_times.groupBy("player_id").agg(
    F.max("session_length_min").alias("max_session_length_min")
)

# Step 4: attach player name and country from the players table.
player_info = df1.select("player_id", "player_name", "country")
result_df = max_session_per_player.join(player_info, on="player_id", how="inner")

display(result_df)


player_id,max_session_length_min,player_name,country
P01,60.0,Rajesh,Canada
P02,70.0,Pooja,UK
P03,55.0,Alok,UK
P04,75.0,Suresh,India
P05,55.0,Arjun,UK
P06,65.0,Pooja,Germany
P07,70.0,Meena,India
P08,55.0,Alok,India
P09,55.0,Sneha,UK
P10,65.0,Sunita,Canada


Databricks visualization. Run in Databricks to view.

In [0]:
# overwrite joined_df with clean version (only one player_id)
clean_joined = joined_df.select(
    df1["player_id"].alias("player_id"),
    df1["player_name"],
    df1["country"],
    df2["session_id"],
    df2["amount_spent"],
    df2["timestamp"]
)

# create temp view
clean_joined.createOrReplaceTempView("joined_df")


In [0]:
# Top 5 players by total amount spent, read from the "joined_df" temp view.
# RANK() gives players with equal totals the same rank. player_id is used as
# a secondary sort key so that the row order -- and which rows survive
# LIMIT 5 when totals tie at the cut-off -- is the same on every run.
top5_spenders = spark.sql("""
    SELECT 
        player_id,
        player_name,
        country,
        SUM(amount_spent) AS total_spent,
        RANK() OVER (ORDER BY SUM(amount_spent) DESC) AS rank
    FROM joined_df
    GROUP BY player_id, player_name, country
    ORDER BY total_spent DESC, player_id
    LIMIT 5
""")

display(top5_spenders)


player_id,player_name,country,total_spent,rank
P19,Ravi,USA,400,1
P09,Sneha,UK,300,2
P13,Neha,India,300,2
P02,Pooja,UK,250,4
P12,Neha,Germany,220,5


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- One row per row of joined_df: the player columns, the event time parsed
-- from its 'M/d/yyyy H:mm' string form, and that time truncated to the month.
CREATE OR REPLACE TEMP VIEW player_activity AS
SELECT
    player_id,
    player_name,
    country,
    activity_time,
    DATE_TRUNC('month', activity_time) AS activity_month
FROM (
    -- Parse the timestamp string once and reuse it above.
    SELECT
        player_id,
        player_name,
        country,
        TO_TIMESTAMP(timestamp, 'M/d/yyyy H:mm') AS activity_time
    FROM joined_df
) parsed_events;


In [0]:
%sql
-- One row per (player, month) combination present in player_activity.
CREATE OR REPLACE TEMP VIEW active_players AS
SELECT
    player_id,
    player_name,
    country,
    activity_month
FROM player_activity
GROUP BY
    player_id,
    player_name,
    country,
    activity_month;


In [0]:
%sql
-- Month-over-month churn: players active in the previous calendar month who
-- have no activity in the current calendar month.
-- Both months are derived from CURRENT_DATE, so the result depends on the
-- day this cell is run.
WITH last_month AS (
    SELECT DISTINCT player_id 
    FROM active_players
    WHERE activity_month = ADD_MONTHS(DATE_TRUNC('month', CURRENT_DATE), -1)
),
this_month AS (
    SELECT DISTINCT player_id 
    FROM active_players
    WHERE activity_month = DATE_TRUNC('month', CURRENT_DATE)
),
churned AS (
    -- Players in last_month with no matching row in this_month.
    SELECT l.player_id
    FROM last_month l
    LEFT ANTI JOIN this_month t
    ON l.player_id = t.player_id
)

-- Churn rate
SELECT
    COUNT(c.player_id) AS churn_players,
    (SELECT COUNT(*) FROM last_month) AS players_last_month,
    -- NULLIF turns a zero denominator into NULL, so the rate is NULL (rather
    -- than a divide-by-zero error under ANSI mode) when nobody was active
    -- last month.
    (COUNT(c.player_id) * 100.0 / NULLIF((SELECT COUNT(*) FROM last_month), 0)) AS churn_rate_percentage
FROM churned c;


churn_players,players_last_month,churn_rate_percentage
10,10,100.0


Databricks visualization. Run in Databricks to view.