## 1. Set Up Spark Session & Configuration
This section initializes the Spark session, disables automatic broadcast joins by setting `spark.sql.autoBroadcastJoinThreshold` to `-1` (so that broadcasting happens only where it is requested explicitly), and creates the `halo` database if it doesn't already exist.

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col

# Initialize Spark session
spark = SparkSession.builder.appName("Halo").getOrCreate()

# Disable automatic broadcast joins (-1 turns the size threshold off)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Create a database for the Halo dataset
spark.sql("CREATE DATABASE IF NOT EXISTS halo")

DataFrame[]

## 2. Load and Bucket the Data
This section loads three CSV files into DataFrames and writes each one as an Iceberg table bucketed into 16 buckets on `match_id`, the key shared by all three tables, so that later joins on `match_id` can take advantage of the common layout.

In [8]:
# Load and bucket the 'matches' data
df_matches = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/matches.csv")
df_matches.write.mode("overwrite").format("iceberg").bucketBy(16, "match_id").saveAsTable("halo.matches_bucketed")

# Load and bucket the 'match_details' data
df_match_details = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/match_details.csv")
df_match_details.write.mode("overwrite").format("iceberg").bucketBy(16, "match_id").saveAsTable("halo.match_details_bucketed")

# Load and bucket the 'medals_matches_players' data
df_medals_matches_players = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/medals_matches_players.csv")
df_medals_matches_players.write.mode("overwrite").format("iceberg").bucketBy(16, "match_id").saveAsTable("halo.medals_matches_players_bucketed")
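
To make the idea of bucketing concrete, here is a minimal pure-Python sketch. It is not how Spark or Iceberg implement it: the hash is a stand-in (CRC32, whereas Spark and Iceberg use their own Murmur3-based hashes), and the rows and helper names (`bucket_for`, `bucketize`) are made up for illustration. The only point is that rows with the same `match_id` always land in the same bucket number in both tables, so a join can proceed bucket by bucket.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 16  # same bucket count as the tables above


def bucket_for(match_id: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a key to a bucket number with a stand-in hash (CRC32, not Murmur3)."""
    return zlib.crc32(match_id.encode("utf-8")) % num_buckets


def bucketize(rows, key_index=0):
    """Group rows by the bucket number of their key column."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_for(row[key_index])].append(row)
    return buckets


# Hypothetical rows: (match_id, other column)
matches = [("match-a", "map-1"), ("match-b", "map-2"), ("match-c", "map-1")]
match_details = [("match-a", "player-1"), ("match-a", "player-2"), ("match-c", "player-3")]

matches_buckets = bucketize(matches)
details_buckets = bucketize(match_details)

# Join bucket by bucket: only rows in the same bucket number can match.
joined = [
    (m_id, map_id, player)
    for b, m_rows in matches_buckets.items()
    for (m_id, map_id) in m_rows
    for (d_id, player) in details_buckets.get(b, [])
    if m_id == d_id
]
```

Because both sides use the same hash and the same bucket count, no row ever needs to be compared with a row from a different bucket.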


## 3. Bucketed Joins to Combine Data
This section joins the three bucketed tables: `matches` and `match_details` on `match_id`, and `medals_matches_players` on both `match_id` and `player_gamertag`. Each table is first registered as a temporary view so that the join can be written in SQL. The result, `df_joined`, has one row per match, player, and medal. The `vw_halo_data` view used by the later queries is created in Section 4, after the broadcast joins.

In [9]:
# Load bucketed tables for joining
df_matches_bucketed = spark.read.table("halo.matches_bucketed")
df_match_details_bucketed = spark.read.table("halo.match_details_bucketed")
df_medals_matches_players_bucketed = spark.read.table("halo.medals_matches_players_bucketed")

# Create temporary views for SQL queries
df_matches_bucketed.createOrReplaceTempView("vw_matches_bucketed")
df_match_details_bucketed.createOrReplaceTempView("vw_match_details_bucketed")
df_medals_matches_players_bucketed.createOrReplaceTempView("vw_medals_matches_players_bucketed")

# Join the bucketed tables to create a unified dataset
df_joined = spark.sql("""
    SELECT 
        m.match_id, 
        m.mapid, 
        m.is_team_game, 
        m.playlist_id, 
        m.game_variant_id, 
        m.is_match_over, 
        m.completion_date, 
        m.match_duration, 
        m.game_mode, 
        m.map_variant_id,
        md.player_gamertag,
        md.previous_spartan_rank,
        md.spartan_rank,
        md.previous_total_xp,
        md.total_xp,
        md.previous_csr_tier,
        md.previous_csr_designation,
        md.previous_csr,
        md.previous_csr_percent_to_next_tier,
        md.previous_csr_rank,
        md.current_csr_tier,
        md.current_csr_designation,
        md.current_csr,
        md.current_csr_percent_to_next_tier,
        md.current_csr_rank,
        md.player_rank_on_team,
        md.player_finished,
        md.player_average_life,
        md.player_total_kills,
        md.player_total_headshots,
        md.player_total_weapon_damage,
        md.player_total_shots_landed,
        md.player_total_melee_kills,
        md.player_total_melee_damage,
        md.player_total_assassinations,
        md.player_total_ground_pound_kills,
        md.player_total_shoulder_bash_kills,
        md.player_total_grenade_damage,
        md.player_total_power_weapon_damage,
        md.player_total_power_weapon_grabs,
        md.player_total_deaths,
        md.player_total_assists,
        md.player_total_grenade_kills,
        mp.medal_id,
        mp.count AS medal_count
    FROM vw_matches_bucketed m
    JOIN vw_match_details_bucketed md ON m.match_id = md.match_id
    JOIN vw_medals_matches_players_bucketed mp ON md.match_id = mp.match_id AND md.player_gamertag = mp.player_gamertag
""")

## 4. Broadcast Joins for Small Tables
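This section loads the small `maps` and `medals` tables and marks them with `broadcast()` so that Spark sends a full copy to each executor instead of shuffling the large `df_joined` DataFrame. The explicit hint is needed because automatic broadcasting was disabled in Section 1. The `name` and `description` columns are renamed first so that the two tables do not clash after the join. The result is registered as the temporary view `vw_halo_data` for the queries that follow.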

In [10]:
# Load 'maps' and 'medals' data and broadcast for efficient joins
df_maps = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/maps.csv")
df_maps = df_maps.withColumnRenamed("description", "map_description").withColumnRenamed("name", "map_name")
df_maps_broadcasted = broadcast(df_maps)

df_medals = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/medals.csv")
df_medals = df_medals.withColumnRenamed("description", "medal_description").withColumnRenamed("name", "medal_name")
df_medals_broadcasted = broadcast(df_medals)

# Join broadcasted tables with the main dataset
df_halo_data = df_joined.join(df_medals_broadcasted, on="medal_id", how="inner").join(df_maps_broadcasted, on="mapid", how="inner")
df_halo_data.createOrReplaceTempView("vw_halo_data")
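
As a rough mental model of what a broadcast join does, the following pure-Python sketch builds a lookup table from the small side and streams the large side past it. It is an illustration under stated assumptions, not Spark's implementation: the rows, the medal names, and the function `broadcast_hash_join` are hypothetical.

```python
# Hypothetical small dimension table: (medal_id, medal_name)
medals = [(1, "Headshot"), (2, "Killing Spree")]

# Hypothetical large fact rows: (match_id, player_gamertag, medal_id)
joined_rows = [
    ("match-a", "player-1", 1),
    ("match-a", "player-2", 2),
    ("match-b", "player-1", 3),  # no matching medal, so an inner join drops it
]


def broadcast_hash_join(large_rows, small_rows):
    """Inner join: build a lookup from the small side, then stream the large side."""
    lookup = {medal_id: name for medal_id, name in small_rows}  # the "broadcast" copy
    for match_id, player, medal_id in large_rows:
        if medal_id in lookup:
            yield (match_id, player, medal_id, lookup[medal_id])


result = list(broadcast_hash_join(joined_rows, medals))
```

The large side is never regrouped or sorted; each of its rows needs only one dictionary lookup, which is why this strategy suits a small table joined to a large one.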

## 5. Aggregated Queries
This section runs four aggregate queries against `vw_halo_data` using `spark.sql()`: the player with the highest average kills per game, the most played playlist, the most played map, and the map with the most Killing Spree medals.

In [18]:
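# Query 1: Player with the highest average kills per game
# Note: vw_halo_data has one row per (match, player, medal), so this average
# is taken over medal rows rather than over distinct games.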
most_kills_query = """
SELECT 
    player_gamertag,
    AVG(player_total_kills) AS avg_total_kills
FROM vw_halo_data
GROUP BY player_gamertag
ORDER BY avg_total_kills DESC
LIMIT 1
"""
# .show() prints the result and returns None, so keep the DataFrame separately
most_kills_result = spark.sql(most_kills_query)
most_kills_result.show()

# Query 2: Playlist played the most
most_played_playlist_query = """
WITH matches_playlists AS (
    SELECT 
        match_id,
        playlist_id
    FROM vw_halo_data
    GROUP BY match_id, playlist_id
)
SELECT 
    playlist_id,
    COUNT(playlist_id) AS times_played
FROM matches_playlists
GROUP BY playlist_id
ORDER BY times_played DESC
LIMIT 1
"""
most_played_playlist_result = spark.sql(most_played_playlist_query)
most_played_playlist_result.show()

# Query 3: Map played the most
most_played_map_query = """
WITH matches_maps AS (
    SELECT 
        match_id,
        mapid
    FROM vw_halo_data
    GROUP BY match_id, mapid
)
SELECT 
    mapid,
    COUNT(mapid) AS times_played
FROM matches_maps
GROUP BY mapid
ORDER BY times_played DESC
LIMIT 1
"""
most_played_map_result = spark.sql(most_played_map_query)
most_played_map_result.show()

# Query 4: Map with the most Killing Spree medals
# Note: COUNT counts (match, player, medal) rows; it does not add up medal_count.
killing_spree_query = """
SELECT 
    mapid,
    COUNT(classification) AS killing_spree_total
FROM vw_halo_data
WHERE classification = 'KillingSpree'
GROUP BY mapid
ORDER BY killing_spree_total DESC
LIMIT 1
"""
killing_spree_result = spark.sql(killing_spree_query)
killing_spree_result.show()

+---------------+---------------+
|player_gamertag|avg_total_kills|
+---------------+---------------+
|   gimpinator14|          109.0|
+---------------+---------------+

+--------------------+------------+
|         playlist_id|times_played|
+--------------------+------------+
|f72e0ef0-7c4a-430...|        7640|
+--------------------+------------+

+--------------------+------------+
|               mapid|times_played|
+--------------------+------------+
|c7edbf0f-f206-11e...|        7032|
+--------------------+------------+

+--------------------+-------------------+
|               mapid|killing_spree_total|
+--------------------+-------------------+
|c7edbf0f-f206-11e...|               6734|
+--------------------+-------------------+
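
One caveat about Query 1: because `vw_halo_data` has one row per match, player, and medal, `AVG(player_total_kills)` counts a match once for every distinct medal the player earned in it. The sketch below reproduces the effect on made-up data, using Python's built-in `sqlite3` as a stand-in for Spark SQL so that it runs without a cluster. The table contents are hypothetical, and whether the 109.0 reported above changes under de-duplication has not been checked here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE vw_halo_data ("
    "match_id TEXT, player_gamertag TEXT, player_total_kills INTEGER, medal_id INTEGER)"
)
# Hypothetical data: one row per (match, player, medal).
# player-1 earned 3 distinct medals in match-a (20 kills) and 1 in match-b (4 kills).
conn.executemany(
    "INSERT INTO vw_halo_data VALUES (?, ?, ?, ?)",
    [
        ("match-a", "player-1", 20, 1),
        ("match-a", "player-1", 20, 2),
        ("match-a", "player-1", 20, 3),
        ("match-b", "player-1", 4, 1),
    ],
)

# Averaging over the joined rows counts match-a three times.
row_weighted = conn.execute(
    "SELECT AVG(player_total_kills) FROM vw_halo_data GROUP BY player_gamertag"
).fetchone()[0]

# De-duplicating to one row per (match, player) first gives the per-game average.
per_game = conn.execute(
    """
    WITH player_matches AS (
        SELECT DISTINCT match_id, player_gamertag, player_total_kills
        FROM vw_halo_data
    )
    SELECT AVG(player_total_kills)
    FROM player_matches
    GROUP BY player_gamertag
    """
).fetchone()[0]

print(row_weighted, per_game)  # 16.0 12.0
```

The de-duplicating CTE follows the same pattern Queries 2 and 3 already use for playlists and maps, so it should carry over to `spark.sql()` with little change.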

## 6. Data Sorting
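This section sorts the data within each partition before writing, with the aim of reducing file size: grouping equal values together lets columnar file formats such as Parquet encode repeated values more compactly, which matters most for low-cardinality columns such as `playlist_id` and `mapid`. Note that the sort below leads with `match_id` and puts those low-cardinality columns last. The sorted and unsorted versions of the data are saved to separate tables for comparison.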

In [17]:
# Sort the data within partitions to optimize size
df_sorted = df_halo_data.sortWithinPartitions(
    col("match_id"), 
    col("player_gamertag"), 
    col("medal_id"),
    col("game_variant_id"), 
    col("playlist_id"), 
    col("mapid")
)

# Write both sorted and unsorted datasets to separate tables
df_halo_data.write.mode("overwrite").saveAsTable("halo.halo_data_unsorted")
df_sorted.write.mode("overwrite").saveAsTable("halo.halo_data_sorted")

## 7. Data Size Comparison
This section compares the total file size and the number of files of the sorted and unsorted tables by querying each table's Iceberg `files` metadata table. The difference shows how much sorting affected storage efficiency.

In [19]:
# Compare the file sizes of sorted and unsorted data
file_size_comparison_query = """
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'sorted' AS data_type
FROM halo.halo_data_sorted.files
UNION ALL
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'unsorted' AS data_type
FROM halo.halo_data_unsorted.files
"""
file_size_comparison_result = spark.sql(file_size_comparison_query)
file_size_comparison_result.show()

+--------+---------+---------+
|    size|num_files|data_type|
+--------+---------+---------+
|22187205|       13|   sorted|
|22253706|       13| unsorted|
+--------+---------+---------+
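
In the output above, the sorted table is 66,501 bytes smaller than the unsorted one (22,187,205 vs. 22,253,706), roughly 0.3%, with 13 files each. One possible factor, not verified here, is the sort order: it leads with `match_id`, so a run of a low-cardinality column such as `playlist_id` tends to be broken whenever the next match uses a different playlist. The pure-Python sketch below uses made-up rows and a hypothetical helper, `run_count`, to show how the leading sort key changes the number of runs that a run-length encoder would see. It assumes the files are stored in a format that benefits from such runs.

```python
from itertools import groupby


def run_count(values):
    """Number of runs of equal adjacent values (fewer runs suit run-length encoding)."""
    return sum(1 for _ in groupby(values))


# Hypothetical rows: (match_id, playlist_id), two rows per match.
rows = [
    ("m1", "p-a"), ("m2", "p-b"), ("m3", "p-a"), ("m4", "p-b"),
    ("m1", "p-a"), ("m2", "p-b"), ("m3", "p-a"), ("m4", "p-b"),
]

by_match_first = sorted(rows, key=lambda r: (r[0], r[1]))
by_playlist_first = sorted(rows, key=lambda r: (r[1], r[0]))

runs_unsorted = run_count([r[1] for r in rows])
runs_match_first = run_count([r[1] for r in by_match_first])
runs_playlist_first = run_count([r[1] for r in by_playlist_first])

print(runs_unsorted, runs_match_first, runs_playlist_first)  # 8 4 2
```

Whether putting `playlist_id` and `mapid` first in `sortWithinPartitions` would shrink the real tables noticeably would need to be measured with the same `files` query.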

