## 🧠 HaloScop

This project develops a performance-optimized Spark pipeline to analyze multiplayer game data using four core datasets (`matches`, `match_details`, `medals_matches_players`, and `medals`) plus a small `maps` lookup table.

The analysis focuses on answering key gameplay questions:
- Which players average the most kills per game?
- Which playlists are played most frequently?
- Which maps host the most matches?
- Where are 'Killing Spree' medals most commonly earned?

To ensure scalability and efficiency, the pipeline applies Spark best practices:
- Disables automatic broadcast joins to retain full control over join strategies
- Broadcasts small dimension tables explicitly to reduce shuffle
- Repartitions large fact tables to simulate bucketed join behavior
- Experiments with `.sortWithinPartitions()` to compare storage efficiency based on sort keys


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col, avg, desc

## 🔧 Spark Session Initialization

We begin by initializing a Spark session named `"SparkFundamentalsHomework"`. To keep join strategies under explicit control, we **disable automatic broadcast joins** by setting the size threshold to `-1`:

```python
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```

In [4]:
spark = SparkSession.builder.appName("SparkFundamentalsHomework").getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")


## 📁 Reading Input Data

We read five CSV files representing different aspects of gameplay:

- `matches.csv`: Match-level metadata
- `match_details.csv`: Player-level stats per match
- `medals_matches_players.csv`: Medals earned by players per match
- `medals.csv`: Lookup for medal names and categories
- `maps.csv`: Lookup for map names and descriptions

The small dimension tables (`medals`, `maps`) are explicitly marked for **broadcast** with Spark's `broadcast()` function, so that later joins against them do not have to shuffle the large tables.
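
As a rough illustration of what an explicit broadcast buys, the sketch below imitates a broadcast hash join in plain Python: the small lookup table is copied to every partition of the large table as a dictionary, so each partition is joined locally and no fact rows move. The function name `broadcast_hash_join` and the sample rows are hypothetical, and Spark's real implementation differs in many details.

```python
from typing import Dict, Iterable, List, Tuple

def broadcast_hash_join(
    fact_partitions: Iterable[List[Tuple[str, str]]],
    dim_rows: List[Tuple[str, str]],
) -> List[List[Tuple[str, str, str]]]:
    """Inner-join each partition of (match_id, mapid) rows against a small
    (mapid, name) table without moving fact rows between partitions."""
    # The "broadcast": one hash map built from the small table, shared by all partitions.
    lookup: Dict[str, str] = dict(dim_rows)
    joined = []
    for partition in fact_partitions:
        # Each partition probes the map locally; unmatched rows are dropped (inner join).
        joined.append(
            [(match_id, mapid, lookup[mapid])
             for match_id, mapid in partition
             if mapid in lookup]
        )
    return joined

# Hypothetical sample data, not taken from the real CSV files.
maps_small = [("m1", "Alpine"), ("m2", "Empire")]
matches_parts = [
    [("match-a", "m1"), ("match-b", "m2")],
    [("match-c", "m1"), ("match-d", "m9")],  # "m9" has no map entry
]
result = broadcast_hash_join(matches_parts, maps_small)
print(result)
```

The partition boundaries of the large table are unchanged in the result, which is the property that makes this strategy attractive when one side of the join is small.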


In [5]:
# Read the large fact tables.
# No schema is supplied, so every column is read as a string.
matches_df = spark.read.option("header", "true").csv("/home/iceberg/data/matches.csv")
match_details_df = spark.read.option("header", "true").csv("/home/iceberg/data/match_details.csv")
medals_matches_players_df = spark.read.option("header", "true").csv("/home/iceberg/data/medals_matches_players.csv")
# Read the small dimension tables and mark them for explicit broadcast joins.
medals_df = broadcast(spark.read.option("header", "true").csv("/home/iceberg/data/medals.csv"))
maps_df = broadcast(spark.read.option("header", "true").csv("/home/iceberg/data/maps.csv"))

In [26]:
matches_df.printSchema()
matches_df.show(5)

root
 |-- match_id: string (nullable = true)
 |-- mapid: string (nullable = true)
 |-- is_team_game: string (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- game_variant_id: string (nullable = true)
 |-- is_match_over: string (nullable = true)
 |-- completion_date: string (nullable = true)
 |-- match_duration: string (nullable = true)
 |-- game_mode: string (nullable = true)
 |-- map_variant_id: string (nullable = true)

+--------------------+--------------------+------------+--------------------+--------------------+-------------+--------------------+--------------+---------+--------------------+
|            match_id|               mapid|is_team_game|         playlist_id|     game_variant_id|is_match_over|     completion_date|match_duration|game_mode|      map_variant_id|
+--------------------+--------------------+------------+--------------------+--------------------+-------------+--------------------+--------------+---------+--------------------+
|11de1a94-8d07-416

In [15]:
maps_df.printSchema()
maps_df.show(5)

root
 |-- mapid: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)

+--------------------+-------------------+--------------------+
|               mapid|               name|         description|
+--------------------+-------------------+--------------------+
|c93d708f-f206-11e...|              Urban|Andesia was the c...|
|cb251c51-f206-11e...|     Raid on Apex 7|This unbroken rin...|
|c854e54f-f206-11e...|March on Stormbreak|                NULL|
|c8d69870-f206-11e...| Escape from A.R.C.|Scientists flocke...|
|73ed1fd0-45e5-4bb...|             Osiris|                NULL|
+--------------------+-------------------+--------------------+
only showing top 5 rows



25/07/10 18:17:46 WARN HintErrorLogger: A join hint (strategy=broadcast) is specified but it is not part of a join relation.


## 🪣 Bucketing for Optimized Joins

To approximate bucketed joins in Spark, each large fact table is repartitioned on the `match_id` column into 16 partitions. Because all three tables use the same key and the same partition count, rows with the same `match_id` land in the same partition number in every table, so the join itself should not need a further shuffle.

Unlike true bucketed tables, this layout is not persisted: the `repartition` call is itself a shuffle, paid at query time rather than once at write time. The approach mimics how bucketed joins line up their inputs without requiring Hive-style bucketed tables.
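
The colocation idea described above can be sketched without Spark. The example below hashes a key into one of 16 partitions for two small, hypothetical tables and checks that matching keys end up under the same partition number. CRC32 is used only as a stand-in hash; Spark uses its own hash function internally, and the helper names here are assumptions for illustration.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 16  # same partition count the notebook uses

def partition_for(match_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # CRC32 is a stand-in hash for illustration; Spark uses a different one.
    return zlib.crc32(match_id.encode("utf-8")) % num_partitions

def hash_partition(rows, key_index=0, num_partitions=NUM_PARTITIONS):
    """Group rows into partitions by hashing the key column."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[partition_for(row[key_index], num_partitions)].append(row)
    return partitions

# Hypothetical rows: (match_id, mapid) and (match_id, player_gamertag).
matches = [("match-a", "m1"), ("match-b", "m2"), ("match-c", "m1")]
details = [("match-a", "p1"), ("match-a", "p2"), ("match-c", "p3")]

matches_parts = hash_partition(matches)
details_parts = hash_partition(details)

# Every detail row sits under the same partition number as its match row,
# so a join only has to compare rows partition by partition.
for match_id, _player in details:
    pid = partition_for(match_id)
    assert any(m[0] == match_id for m in matches_parts[pid])
```

The guarantee depends on both tables using the same hash function, the same key, and the same partition count, which is why the notebook repartitions every fact table with identical arguments.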


In [6]:
bucketed_matches = matches_df.repartition(16, "match_id")
bucketed_match_details = match_details_df.repartition(16, "match_id")
bucketed_medals_matches_players = medals_matches_players_df.repartition(16, "match_id")

## 🎯 Players with the Highest Average Kills per Game

- Joins player-level match details with match records on `match_id`
- Groups results by `player_gamertag`
- Calculates the average number of kills per player
- Sorts players by descending average kills
- Displays the top 10 players by average kills per game


In [27]:


kills_per_game = (
    bucketed_match_details
    .join(bucketed_matches, "match_id")
    .groupBy("player_gamertag")
    .agg(avg(col("player_total_kills")).alias("avg_kills_per_game"))
    .orderBy(desc("avg_kills_per_game"))
)
kills_per_game.show(10)



+---------------+------------------+
|player_gamertag|avg_kills_per_game|
+---------------+------------------+
|   gimpinator14|             109.0|
|  I Johann117 I|              96.0|
|BudgetLegendary|              83.0|
|      GsFurreal|              75.0|
|   Sexy is Back|              73.0|
|   killerguy789|              68.0|
|THC GUILTYSPARK|              67.0|
|PrimePromethean|              66.0|
|    HisLattice1|              66.0|
|     taurenmonk|              64.0|
+---------------+------------------+
only showing top 10 rows


## 🗂️ Most Frequently Played Playlists

- Analyzes `bucketed_matches` to identify high-volume playlists
- Groups the data by `playlist_id`
- Counts the number of matches associated with each playlist
- Sorts the result in descending order of match count
- Highlights the top 10 most played playlists


In [8]:
# Which playlist gets played the most?
# Count the number of matches per playlist using bucketed_matches.
most_played_playlist = (
    bucketed_matches
    .groupBy("playlist_id")
    .count()
    .orderBy(desc("count"))
)
most_played_playlist.show(10)

+--------------------+-----+
|         playlist_id|count|
+--------------------+-----+
|f72e0ef0-7c4a-430...| 9350|
|2323b76a-db98-4e0...| 3244|
|892189e9-d712-4bd...| 2159|
|c98949ae-60a8-43d...| 1984|
|b50c4dc2-6c86-4d7...| 1462|
|0e39ead4-383b-445...|  909|
|f27a65eb-2d11-496...|  701|
|d0766624-dbd7-453...|  643|
|0bcf2be1-3168-4e4...|  564|
|780cc101-005c-4fc...|  527|
+--------------------+-----+
only showing top 10 rows



## 🗺️ Most Frequently Played Maps

- Joins `bucketed_matches` with the broadcast `maps` table on `mapid`
- Groups the results by map `name`
- Counts how often each map appears across matches
- Sorts by descending frequency
- Displays the top 10 most played maps based on match volume


In [29]:
# Which map gets played the most?
# Join bucketed_matches with maps to get map names, then count.
most_played_map = (
    bucketed_matches
    .join(maps_df, bucketed_matches["mapid"] == maps_df["mapid"])
    .groupBy(maps_df["name"])
    .count()
    .orderBy(desc("count"))
)
most_played_map.show(10)

+--------------+-----+
|          name|count|
+--------------+-----+
|Breakout Arena| 8587|
|        Empire| 1489|
|        Alpine| 1461|
|       The Rig| 1088|
|       Glacier| 1052|
|         Truth| 1036|
|         Plaza|  996|
|        Regret|  982|
|      Coliseum|  971|
|          Eden|  915|
+--------------+-----+
only showing top 10 rows



## 🏅 Maps with the Most Killing Spree Medals

- Joins the medal-player data with the broadcast `medals` table on `medal_id` to attach medal names
- Filters to records whose medal `name` is "Killing Spree"
- Joins the filtered data with `bucketed_matches` on `match_id`
- Joins the result with the broadcast `maps` table on `mapid`
- Groups by map `name` and counts the Killing Spree medal records per map
- Displays the top 10 maps by that count


In [22]:
killing_spree = (
    bucketed_medals_matches_players
    .join(medals_df, "medal_id")
    .filter(col("name") == "Killing Spree")
    .join(bucketed_matches, "match_id")
    .join(maps_df, bucketed_matches["mapid"] == maps_df["mapid"])
    .groupBy(maps_df["name"])
    .count()
    .orderBy(desc("count"))
)
killing_spree.show(10)

+--------------+-----+
|          name|count|
+--------------+-----+
|Breakout Arena| 6559|
|        Alpine| 4346|
|       Glacier| 2617|
|        Empire| 1996|
|         Truth| 1765|
|       The Rig| 1747|
|         Plaza| 1671|
|      Coliseum| 1653|
|        Regret| 1558|
|          Eden| 1551|
+--------------+-----+
only showing top 10 rows



## 🧪 Partition Sorting for Storage Optimization

- Applies `.sortWithinPartitions()` to `bucketed_matches` once per candidate sort key:
  - `playlist_id`
  - `mapid`
  - `completion_date`
- Writes each sorted DataFrame to its own Parquet output directory under `/tmp`
- Goal: compare the on-disk size of the three outputs to see which sort key yields the smallest footprint
- The result informs which sort key to use when writing this table for downstream use
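
To see why sort order can change file size at all, the sketch below compresses the same hypothetical low-cardinality column twice, once in random order and once sorted. It uses `zlib` purely as a stand-in; Parquet applies its own encodings and compression codecs, so the numbers are not comparable to real Parquet output, only the direction of the effect is.

```python
import random
import zlib

random.seed(0)  # fixed seed so the comparison is repeatable

# Hypothetical column: 10,000 rows drawn from 10 distinct playlist-like IDs.
distinct_ids = [f"playlist-{i:04d}" for i in range(10)]
column = [random.choice(distinct_ids) for _ in range(10_000)]

def compressed_size(values):
    """Size in bytes of the newline-joined values after zlib compression."""
    return len(zlib.compress("\n".join(values).encode("utf-8")))

unsorted_size = compressed_size(column)
sorted_size = compressed_size(sorted(column))

print(f"unsorted: {unsorted_size} bytes, sorted: {sorted_size} bytes")
```

Sorting places identical values next to each other, which gives the compressor long repeated runs to exploit. A sort key with few distinct values is therefore a plausible candidate for the smallest output, though only measuring the real files can confirm it.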


In [24]:
sorted_by_playlist = bucketed_matches.sortWithinPartitions("playlist_id")
sorted_by_playlist.write.mode("overwrite").parquet("/tmp/sorted_by_playlist")

sorted_by_map = bucketed_matches.sortWithinPartitions("mapid")
sorted_by_map.write.mode("overwrite").parquet("/tmp/sorted_by_map")

sorted_by_date = bucketed_matches.sortWithinPartitions("completion_date")
sorted_by_date.write.mode("overwrite").parquet("/tmp/sorted_by_date")

25/07/10 18:22:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/07/10 18:22:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/07/10 18:22:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
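
One way to carry out the size comparison is to sum the file sizes under each output directory. The helper below is a minimal sketch using only the standard library; the function names are hypothetical, and it assumes the three `/tmp` paths written above are on a local filesystem readable from the Python process.

```python
import os

def directory_size_bytes(path: str) -> int:
    """Total size of all regular files under `path`, in bytes."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total

def compare_outputs(paths):
    """Return (path, size) pairs for existing directories, smallest first."""
    sizes = [(p, directory_size_bytes(p)) for p in paths if os.path.isdir(p)]
    return sorted(sizes, key=lambda pair: pair[1])

if __name__ == "__main__":
    for path, size in compare_outputs(
        ["/tmp/sorted_by_playlist", "/tmp/sorted_by_map", "/tmp/sorted_by_date"]
    ):
        print(f"{path}: {size} bytes")
```

The totals include every file in each directory, not just the Parquet data files, so small marker or checksum files written alongside the data add a roughly constant overhead to each result.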


In [None]:
# Clean up
spark.stop()