# Homework Week 3 - Spark Fundamentals
### by Marina Tetzlaff | December 31, 2024

Start by loading the datasets from the iceberg data folder. The queries will use the following tables:
- match_details
- matches
- medals_matches_players
- medals
- maps


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

spark

24/12/30 13:46:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## 0. Import the data sources

In [4]:
match_details = spark.read.option("header", "true").csv("/home/iceberg/data/match_details.csv")
match_details.show(5)

24/12/30 13:46:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+
|            match_id|player_gamertag|previous_spartan_rank|spartan_rank|previous_total_xp|total_xp|previous_csr_tier|previous_csr_designation|previous_csr|previous_csr_percent_to_next_tier|previous_csr_rank|current_

In [5]:
null_count = match_details.filter(match_details.team_id.isNull()).count()
null_count

                                                                                

11

In [6]:
matches = spark.read.option("header", "true").csv("/home/iceberg/data/matches.csv")
matches.show(5)

+--------------------+--------------------+------------+--------------------+--------------------+-------------+--------------------+--------------+---------+--------------------+
|            match_id|               mapid|is_team_game|         playlist_id|     game_variant_id|is_match_over|     completion_date|match_duration|game_mode|      map_variant_id|
+--------------------+--------------------+------------+--------------------+--------------------+-------------+--------------------+--------------+---------+--------------------+
|11de1a94-8d07-416...|c7edbf0f-f206-11e...|        true|f72e0ef0-7c4a-430...|1e473914-46e4-408...|         true|2016-02-22 00:00:...|          NULL|     NULL|                NULL|
|d3643e71-3e51-43e...|cb914b9e-f206-11e...|       false|d0766624-dbd7-453...|257a305e-4dd3-41f...|         true|2016-02-14 00:00:...|          NULL|     NULL|                NULL|
|d78d2aae-36e4-48a...|c7edbf0f-f206-11e...|        true|f72e0ef0-7c4a-430...|1e473914-46e4-408...|  

In [7]:
medals_matches_players = spark.read.option("header", "true").csv("/home/iceberg/data/medals_matches_players.csv")
medals_matches_players.show(5)

+--------------------+---------------+----------+-----+
|            match_id|player_gamertag|  medal_id|count|
+--------------------+---------------+----------+-----+
|009fdac5-e15c-47c...|       EcZachly|3261908037|    7|
|009fdac5-e15c-47c...|       EcZachly| 824733727|    2|
|009fdac5-e15c-47c...|       EcZachly|2078758684|    2|
|009fdac5-e15c-47c...|       EcZachly|2782465081|    2|
|9169d1a3-955c-4ea...|       EcZachly|3001183151|    1|
+--------------------+---------------+----------+-----+
only showing top 5 rows



In [8]:
medals = spark.read.option("header", "true").csv("/home/iceberg/data/medals.csv")
medals.show(5)

+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+--------------+--------------------+--------------+----------+
|  medal_id|          sprite_uri|sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|classification|         description|          name|difficulty|
+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+--------------+--------------------+--------------+----------+
|2315448068|                NULL|       NULL|      NULL|              NULL|               NULL|        NULL|         NULL|          NULL|                NULL|          NULL|      NULL|
|3565441934|                NULL|       NULL|      NULL|              NULL|               NULL|        NULL|         NULL|          NULL|                NULL|          NULL|      NULL|
|4162659350|https://content.h...|        750|       750|                74|

Goal:
- Disable automatic broadcast joins with spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
- Explicitly broadcast the joins to medals and maps
- Bucket join match_details, matches, and medals_matches_players on match_id with 16 buckets
- Aggregate the joined data frame to determine the following: 
    - Player with highest kills per game average
    - Playlist with most plays
    - Map most played
    - Map with most players earning Killing Spree medals
- With the aggregated dataset, try different .sortWithinPartitions columns to see which yields the smallest data size (check playlists and maps)

Next, we need to create a database and load the data from these tables into it so that we can query the data with SQL.

In [9]:
%%sql
CREATE DATABASE IF NOT EXISTS bootcamp

In [10]:
%%sql
DROP TABLE IF EXISTS bootcamp.match_details

In [11]:
%%sql
DROP TABLE IF EXISTS bootcamp.matches

In [12]:
%%sql
DROP TABLE IF EXISTS bootcamp.medals_matches_players

In [13]:
%%sql
DROP TABLE IF EXISTS bootcamp.medals

In [14]:
%%sql
DROP TABLE IF EXISTS bootcamp.maps

In [15]:
match_details.dtypes

[('match_id', 'string'),
 ('player_gamertag', 'string'),
 ('previous_spartan_rank', 'string'),
 ('spartan_rank', 'string'),
 ('previous_total_xp', 'string'),
 ('total_xp', 'string'),
 ('previous_csr_tier', 'string'),
 ('previous_csr_designation', 'string'),
 ('previous_csr', 'string'),
 ('previous_csr_percent_to_next_tier', 'string'),
 ('previous_csr_rank', 'string'),
 ('current_csr_tier', 'string'),
 ('current_csr_designation', 'string'),
 ('current_csr', 'string'),
 ('current_csr_percent_to_next_tier', 'string'),
 ('current_csr_rank', 'string'),
 ('player_rank_on_team', 'string'),
 ('player_finished', 'string'),
 ('player_average_life', 'string'),
 ('player_total_kills', 'string'),
 ('player_total_headshots', 'string'),
 ('player_total_weapon_damage', 'string'),
 ('player_total_shots_landed', 'string'),
 ('player_total_melee_kills', 'string'),
 ('player_total_melee_damage', 'string'),
 ('player_total_assassinations', 'string'),
 ('player_total_ground_pound_kills', 'string'),
 ('playe

In [16]:
from pyspark.sql.functions import countDistinct
unique_players_count = match_details.select(countDistinct('player_gamertag')).collect()[0][0]

unique_players_count

                                                                                

69420

In [17]:
unique_teams = match_details.select(countDistinct('team_id')).collect()[0][0]
unique_teams

8

In [18]:
unique_matches = match_details.select(countDistinct('match_id')).collect()[0][0]
unique_matches

19050

In [19]:
match_details.count()

151761

In [20]:
%%sql
CREATE TABLE
  IF NOT EXISTS bootcamp.match_details (
    match_id STRING,
    player_gamertag STRING,
    previous_spartan_rank INT,
    spartan_rank INT,
    previous_total_xp BIGINT,
    total_xp BIGINT,
    previous_csr_tier INT,
    previous_csr_designation INT,
    previous_csr INT,
    previous_csr_percent_to_next_tier DOUBLE,
    previous_csr_rank INT,
    current_csr_tier INT,
    current_csr_designation INT,
    current_csr INT,
    current_csr_percent_to_next_tier DOUBLE,
    current_csr_rank INT,
    player_rank_on_team INT,
    player_finished BOOLEAN,
    player_average_life STRING,
    player_total_kills INT,
    player_total_headshots INT,
    player_total_weapon_damage DOUBLE,
    player_total_shots_landed INT,
    player_total_melee_kills INT,
    player_total_melee_damage DOUBLE,
    player_total_assassinations INT,
    player_total_ground_pound_kills INT,
    player_total_shoulder_bash_kills INT,
    player_total_grenade_damage DOUBLE,
    player_total_power_weapon_damage DOUBLE,
    player_total_power_weapon_grabs INT,
    player_total_deaths INT,
    player_total_assists INT,
    player_total_grenade_kills INT,
    did_win BOOLEAN,
    team_id INT
  ) USING iceberg PARTITIONED BY (team_id)

In [21]:
matches.dtypes

[('match_id', 'string'),
 ('mapid', 'string'),
 ('is_team_game', 'string'),
 ('playlist_id', 'string'),
 ('game_variant_id', 'string'),
 ('is_match_over', 'string'),
 ('completion_date', 'string'),
 ('match_duration', 'string'),
 ('game_mode', 'string'),
 ('map_variant_id', 'string')]

In [22]:
%%sql 
CREATE TABLE
  IF NOT EXISTS bootcamp.matches (
    match_id STRING,
    mapid STRING,
    is_team_game BOOLEAN,
    playlist_id STRING,
    game_variant_id STRING,
    is_match_over BOOLEAN,
    completion_date TIMESTAMP,
    match_duration STRING,
    game_mode STRING,
    map_variant_id STRING,
    partitioned_date DATE
  ) USING iceberg PARTITIONED BY (mapid)

In [23]:
medals_matches_players.dtypes

[('match_id', 'string'),
 ('player_gamertag', 'string'),
 ('medal_id', 'string'),
 ('count', 'string')]

In [24]:
%%sql
CREATE TABLE
  IF NOT EXISTS bootcamp.medals_matches_players (
    match_id STRING,
    player_gamertag STRING,
    medal_id BIGINT,
    COUNT INT
  ) USING iceberg PARTITIONED BY (medal_id)

In [25]:
medals_matches_players.count()

755229

In [26]:
medals.dtypes

[('medal_id', 'string'),
 ('sprite_uri', 'string'),
 ('sprite_left', 'string'),
 ('sprite_top', 'string'),
 ('sprite_sheet_width', 'string'),
 ('sprite_sheet_height', 'string'),
 ('sprite_width', 'string'),
 ('sprite_height', 'string'),
 ('classification', 'string'),
 ('description', 'string'),
 ('name', 'string'),
 ('difficulty', 'string')]

In [27]:
medals.count()

183

In [28]:
%%sql
CREATE TABLE
  IF NOT EXISTS bootcamp.medals (
    medal_id BIGINT,
    sprite_uri STRING,
    sprite_left INT,
    sprite_top INT,
    sprite_sheet_width INT,
    sprite_sheet_height INT,
    sprite_width INT,
    sprite_height INT,
    classification STRING,
    description STRING,
    name STRING,
    difficulty INT
  ) USING iceberg

In [29]:
maps = spark.read.option("header", "true").csv("/home/iceberg/data/maps.csv")
maps.show(5)

+--------------------+-------------------+--------------------+
|               mapid|               name|         description|
+--------------------+-------------------+--------------------+
|c93d708f-f206-11e...|              Urban|Andesia was the c...|
|cb251c51-f206-11e...|     Raid on Apex 7|This unbroken rin...|
|c854e54f-f206-11e...|March on Stormbreak|                NULL|
|c8d69870-f206-11e...| Escape from A.R.C.|Scientists flocke...|
|73ed1fd0-45e5-4bb...|             Osiris|                NULL|
+--------------------+-------------------+--------------------+
only showing top 5 rows



In [30]:
maps.count()

40

In [31]:
%%sql
CREATE TABLE
  IF NOT EXISTS bootcamp.maps (mapid STRING, name STRING, description STRING) USING iceberg

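## 1. Disable automatic broadcast join with spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Left to its defaults, Spark automatically uses a broadcast join whenever one side of a join is estimated to be smaller than this threshold (10 MB by default). Setting the threshold to -1 turns that behavior off.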

In [32]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

In [33]:
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

-1
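
As a rough illustration of what the threshold controls, the sketch below models the decision in plain Python. It is a simplification, not Spark's planner: the function name `would_auto_broadcast` and the example size are assumptions made for illustration, and only the 10 MB default comes from Spark's documented configuration.

```python
# Simplified model of the automatic broadcast decision (not Spark's real planner).
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024  # Spark's documented default: 10 MB


def would_auto_broadcast(estimated_size_bytes, threshold_bytes=DEFAULT_THRESHOLD_BYTES):
    """Broadcast only if the size estimate fits under a non-negative threshold."""
    return 0 <= estimated_size_bytes <= threshold_bytes


small_table_bytes = 50_000  # hypothetical size estimate for a small dimension table

print(would_auto_broadcast(small_table_bytes))      # True: under the default threshold
print(would_auto_broadcast(small_table_bytes, -1))  # False: automatic broadcast disabled
```

With the threshold at -1 nothing is broadcast automatically, so any broadcast join later in the notebook has to be requested explicitly with `broadcast(...)`.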


### Write the data to the database

In [34]:
match_details.write.mode("overwrite").saveAsTable("bootcamp.match_details")
matches.write.mode("overwrite").saveAsTable("bootcamp.matches")
medals_matches_players.write.mode("overwrite").saveAsTable("bootcamp.medals_matches_players")
medals.write.mode("overwrite").saveAsTable("bootcamp.medals")
maps.write.mode("overwrite").saveAsTable("bootcamp.maps")

                                                                                

In [35]:
%%sql
SELECT
  *
FROM
  bootcamp.match_details
LIMIT
  5

match_id,player_gamertag,previous_spartan_rank,spartan_rank,previous_total_xp,total_xp,previous_csr_tier,previous_csr_designation,previous_csr,previous_csr_percent_to_next_tier,previous_csr_rank,current_csr_tier,current_csr_designation,current_csr,current_csr_percent_to_next_tier,current_csr_rank,player_rank_on_team,player_finished,player_average_life,player_total_kills,player_total_headshots,player_total_weapon_damage,player_total_shots_landed,player_total_melee_kills,player_total_melee_damage,player_total_assassinations,player_total_ground_pound_kills,player_total_shoulder_bash_kills,player_total_grenade_damage,player_total_power_weapon_damage,player_total_power_weapon_grabs,player_total_deaths,player_total_assists,player_total_grenade_kills,did_win,team_id
71d79b23-4143-4359-a62e-489a27597b23,taterbase,5,5,12537,13383,1.0,3.0,0.0,98.0,,2.0,3.0,0.0,26.0,,4,False,PT14.81149S,6,4,255.0,28,0,0.0,0,0,0,0,0,0,13,1,0,1,1
71d79b23-4143-4359-a62e-489a27597b23,SuPeRSaYaInG0D,18,18,131943,132557,2.0,3.0,0.0,2.0,,1.0,3.0,0.0,76.0,,7,False,PT11.2990845S,7,3,350.5879230499268,49,1,45.0,0,0,0,0,0,0,18,2,0,0,0
71d79b23-4143-4359-a62e-489a27597b23,EcZachly,21,21,168811,169762,2.0,5.0,0.0,94.0,,3.0,5.0,0.0,24.0,,3,False,PT19.1357063S,12,12,625.0,43,0,0.0,0,0,0,0,0,0,10,4,0,1,1
71d79b23-4143-4359-a62e-489a27597b23,johnsnake04,14,14,64073,64639,,,,,,,,,,,6,False,PT21.1521599S,13,13,605.0,24,0,0.0,0,0,0,0,0,0,9,2,0,0,0
71d79b23-4143-4359-a62e-489a27597b23,Super Mac Bros,26,26,243425,244430,1.0,5.0,0.0,86.0,,2.0,5.0,0.0,8.0,,2,False,PT12.8373793S,13,12,595.0,32,1,20.00450134277344,0,0,0,0,0,0,15,2,0,1,1


In [36]:
%%sql
SELECT
  *
FROM
  bootcamp.matches
LIMIT
  5

match_id,mapid,is_team_game,playlist_id,game_variant_id,is_match_over,completion_date,match_duration,game_mode,map_variant_id
11de1a94-8d07-4162-9f5f-d3cc753c811c,c7edbf0f-f206-11e4-aa52-24be05e24f7e,True,f72e0ef0-7c4a-4307-af78-8e38dac3fdba,1e473914-46e4-408d-af26-178fb115de76,True,2016-02-22 00:00:00.000000,,,
d3643e71-3e51-43e6-a200-f4a7f306ac12,cb914b9e-f206-11e4-b447-24be05e24f7e,False,d0766624-dbd7-4536-ba39-2d890a6143a9,257a305e-4dd3-41f1-9824-dfe7e8bd59e1,True,2016-02-14 00:00:00.000000,,,
d78d2aae-36e4-48ac-a3b5-6d4d90f90ace,c7edbf0f-f206-11e4-aa52-24be05e24f7e,True,f72e0ef0-7c4a-4307-af78-8e38dac3fdba,1e473914-46e4-408d-af26-178fb115de76,True,2016-03-24 00:00:00.000000,,,55e5ee2e-88df-4657-b9ae-b6ec7ca64614
b440069e-ec5f-4f51-bdd1-bc0bc7fe1195,c7edbf0f-f206-11e4-aa52-24be05e24f7e,True,f72e0ef0-7c4a-4307-af78-8e38dac3fdba,1e473914-46e4-408d-af26-178fb115de76,True,2015-12-23 00:00:00.000000,,,ec3eef73-13e3-4d4b-a922-cc195109a842
1dd475fc-ee6b-4e1d-8140-c44d03812076,c93d708f-f206-11e4-a815-24be05e24f7e,True,0e39ead4-383b-4452-bbd4-babb7becd82e,42f97cca-2cb4-497a-a0fd-ceef1ba46bcc,True,2016-04-07 00:00:00.000000,,,


In [37]:
%%sql
SELECT
  *
FROM
  bootcamp.medals_matches_players
LIMIT
  5

match_id,player_gamertag,medal_id,count
009fdac5-e15c-47c6-a202-e18ff8800ce7,EcZachly,3261908037,7
009fdac5-e15c-47c6-a202-e18ff8800ce7,EcZachly,824733727,2
009fdac5-e15c-47c6-a202-e18ff8800ce7,EcZachly,2078758684,2
009fdac5-e15c-47c6-a202-e18ff8800ce7,EcZachly,2782465081,2
9169d1a3-955c-4ea9-a9a4-6d57da097660,EcZachly,3001183151,1


In [38]:
%%sql
SELECT
  *
FROM
  bootcamp.medals
LIMIT
  5

medal_id,sprite_uri,sprite_left,sprite_top,sprite_sheet_width,sprite_sheet_height,sprite_width,sprite_height,classification,description,name,difficulty
2315448068,,,,,,,,,,,
3565441934,,,,,,,,,,,
4162659350,https://content.halocdn.com/media/Default/games/halo-5-guardians/sprites/medalspritesheet-be288ea5c0994a4e9d36f43aee7bc631.png,750.0,750.0,74.0,74.0,1125.0,899.0,Breakout,Kill the last enemy within the last 10 seconds of a round.,Buzzer Beater,45.0
1573153198,https://content.halocdn.com/media/Default/games/halo-5-guardians/sprites/medalspritesheet-be288ea5c0994a4e9d36f43aee7bc631.png,0.0,300.0,74.0,74.0,1125.0,899.0,Breakout,Survive a one-on-one encounter.,Vanquisher,30.0
298813630,https://content.halocdn.com/media/Default/games/halo-5-guardians/sprites/medalspritesheet-be288ea5c0994a4e9d36f43aee7bc631.png,0.0,825.0,74.0,74.0,1125.0,899.0,Style,Kill an enemy with Spartan Charge.,Spartan Charge,135.0


In [39]:
%%sql
SELECT
  *
FROM
  bootcamp.maps
LIMIT
  5

mapid,name,description
c93d708f-f206-11e4-a815-24be05e24f7e,Urban,Andesia was the crucible for countless heroes and villains caught in the throes of seething rebellion and righteous excess.
cb251c51-f206-11e4-8541-24be05e24f7e,Raid on Apex 7,This unbroken ring is a symbol of the discovery that shook the galaxy and changed the course of both human and Covenant destiny.
c854e54f-f206-11e4-bddc-24be05e24f7e,March on Stormbreak,
c8d69870-f206-11e4-b477-24be05e24f7e,Escape from A.R.C.,Scientists flocked to this Forerunner excavation in search of new beginnings. What they unearthed will lead to their inevitable end.
73ed1fd0-45e5-4bb9-ab6a-d2852c04ea7d,Osiris,


## 2. Explicitly broadcast JOINs medals and maps

To broadcast the joins to medals and maps explicitly, we first need to understand how the tables relate to each other:

match_details ─┬─── matches (via match_id) ─── maps (via mapid)
               │
               └─── medals_matches_players (via match_id)
                         │
                         └─── medals (via medal_id)

Fact Tables:
- match_details
- medals_matches_players
- matches

Dimension Tables: 
- medals
- maps
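
To make the idea concrete, here is a minimal pure-Python sketch of what a broadcast hash join does: the small dimension table is turned into an in-memory lookup that every worker holds in full, and the large fact table is streamed past it without being shuffled. The two map rows come from the maps table shown earlier. The `match_id` values and the `unknown-map` row are made up for illustration, and `broadcast_hash_join` is a hypothetical helper, not a Spark API.

```python
# Small dimension table: cheap to copy in full to every worker.
maps_rows = [
    {"mapid": "c93d708f-f206-11e4-a815-24be05e24f7e", "name": "Urban"},
    {"mapid": "cb251c51-f206-11e4-8541-24be05e24f7e", "name": "Raid on Apex 7"},
]

# Larger fact table; the match_id values are invented for this example.
matches_rows = [
    {"match_id": "m1", "mapid": "c93d708f-f206-11e4-a815-24be05e24f7e"},
    {"match_id": "m2", "mapid": "cb251c51-f206-11e4-8541-24be05e24f7e"},
    {"match_id": "m3", "mapid": "c93d708f-f206-11e4-a815-24be05e24f7e"},
    {"match_id": "m4", "mapid": "unknown-map"},
]


def broadcast_hash_join(fact_rows, dim_rows, key):
    """Inner join: build a hash table from the small side, probe it with the large side."""
    lookup = {row[key]: row for row in dim_rows}
    joined = []
    for fact in fact_rows:
        dim = lookup.get(fact[key])
        if dim is not None:  # inner join drops fact rows without a match
            joined.append({**fact, **dim})
    return joined


joined = broadcast_hash_join(matches_rows, maps_rows, "mapid")
for row in joined:
    print(row["match_id"], row["name"])
```

This only pays off when the broadcast side is small, which is why medals (183 rows) and maps (40 rows) are the natural candidates here.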


### Check for duplicates
Before joining, we need to check for duplicates and, if any exist, dedupe the medals and maps tables (as well as the others).

In [40]:
duplicate_maps = maps.groupBy(maps.columns).count().filter("count >1")
duplicate_maps.show()

+-----+----+-----------+-----+
|mapid|name|description|count|
+-----+----+-----------+-----+
+-----+----+-----------+-----+



In [41]:
duplicate_medals = medals.groupBy(medals.columns).count().filter("count >1")
duplicate_medals.show()

+--------+----------+-----------+----------+------------------+-------------------+------------+-------------+--------------+-----------+----+----------+-----+
|medal_id|sprite_uri|sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|classification|description|name|difficulty|count|
+--------+----------+-----------+----------+------------------+-------------------+------------+-------------+--------------+-----------+----+----------+-----+
+--------+----------+-----------+----------+------------------+-------------------+------------+-------------+--------------+-----------+----+----------+-----+



In [42]:
duplicate_match_details = match_details.groupBy(match_details.columns).count().filter("count > 1")
duplicate_match_details.show()

                                                                                

+--------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+-----+
|match_id|player_gamertag|previous_spartan_rank|spartan_rank|previous_total_xp|total_xp|previous_csr_tier|previous_csr_designation|previous_csr|previous_csr_percent_to_next_tier|previous_csr_rank|current_csr_tier|current_c

In [43]:
duplicate_matches = matches.groupBy(matches.columns).count().filter("count > 1")
duplicate_matches.show()

+--------+-----+------------+-----------+---------------+-------------+---------------+--------------+---------+--------------+-----+
|match_id|mapid|is_team_game|playlist_id|game_variant_id|is_match_over|completion_date|match_duration|game_mode|map_variant_id|count|
+--------+-----+------------+-----------+---------------+-------------+---------------+--------------+---------+--------------+-----+
+--------+-----+------------+-----------+---------------+-------------+---------------+--------------+---------+--------------+-----+



In [44]:
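# Note: importing sum, max, and min from pyspark.sql.functions shadows
# Python's built-in functions of the same name for the rest of the notebook.
from pyspark.sql.functions import sum, avg, max, min, col, lit, count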

duplicate_medals_matches_players = medals_matches_players.groupBy("match_id", "player_gamertag", "medal_id") \
    .agg(count("*").alias("duplicate_count")) \
    .filter("duplicate_count > 1")

duplicate_medals_matches_players.show()

                                                                                

+--------+---------------+--------+---------------+
|match_id|player_gamertag|medal_id|duplicate_count|
+--------+---------------+--------+---------------+
+--------+---------------+--------+---------------+



There do not seem to be any duplicate rows in any of the tables. 
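
The check used in the cells above (group by the identifying columns, count, keep groups with a count above 1) can be sketched without Spark. The rows below are invented, with one deliberate duplicate so the check has something to find, and `find_duplicates` is a hypothetical helper name.

```python
from collections import Counter

# (match_id, player_gamertag, medal_id); values are illustrative only.
rows = [
    ("match-1", "EcZachly", "3261908037"),
    ("match-1", "EcZachly", "824733727"),
    ("match-1", "EcZachly", "3261908037"),  # deliberate duplicate
]


def find_duplicates(records):
    """Return each record that occurs more than once, with its occurrence count."""
    counts = Counter(records)
    return {record: n for record, n in counts.items() if n > 1}


duplicates = find_duplicates(rows)
print(duplicates)
```

An empty result, as in the notebook output above, means every grouped key occurs exactly once.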

## 3. Bucket join match_details, matches, and medals_matches_players on match_id with 16 buckets

In [45]:
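from pyspark.sql.functions import broadcast, col, lit, split
from pyspark.storagelevel import StorageLevel

# getOrCreate() returns the session that is already running, so (as the warning
# below notes) only runtime SQL settings such as spark.sql.shuffle.partitions and
# spark.sql.autoBroadcastJoinThreshold take effect here. Memory and dynamic
# allocation settings have to be supplied before the session first starts.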
spark = SparkSession.builder \
    .appName("IcebergTableManagement") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.files.maxPartitionBytes", "134217728") \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "50") \
    .getOrCreate()

24/12/30 13:46:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


The next step is to join all of the tables:
- medals and maps will be broadcast explicitly
- the fact tables will be bucketed into 16 buckets on match_id
- the fact tables will be joined on match_id
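
The reasoning behind bucketing can be sketched in plain Python: if every table assigns a row to a bucket using the same function of `match_id`, rows that share a `match_id` always land in the same bucket number, so the join can proceed bucket by bucket. This is a sketch under assumptions: `zlib.crc32` is only a stand-in (Iceberg's bucket transform uses its own hash function), the helper names are hypothetical, and the sample rows are illustrative.

```python
import zlib

NUM_BUCKETS = 16


def bucket_for(match_id, num_buckets=NUM_BUCKETS):
    # crc32 is a stand-in hash for illustration, not Iceberg's actual bucket function.
    return zlib.crc32(match_id.encode("utf-8")) % num_buckets


def bucketize(rows, num_buckets=NUM_BUCKETS):
    """Group rows into num_buckets lists keyed by the bucket of their match_id."""
    buckets = {b: [] for b in range(num_buckets)}
    for row in rows:
        buckets[bucket_for(row["match_id"], num_buckets)].append(row)
    return buckets


# Illustrative rows only.
details = [
    {"match_id": "71d79b23-4143-4359-a62e-489a27597b23", "player_gamertag": "taterbase"},
    {"match_id": "009fdac5-e15c-47c6-a202-e18ff8800ce7", "player_gamertag": "EcZachly"},
]
medal_rows = [
    {"match_id": "009fdac5-e15c-47c6-a202-e18ff8800ce7", "medal_id": 3261908037},
]

details_buckets = bucketize(details)
medal_buckets = bucketize(medal_rows)

# Join bucket by bucket: matching keys can only ever meet inside the same bucket.
joined = []
for b in range(NUM_BUCKETS):
    for d in details_buckets[b]:
        for m in medal_buckets[b]:
            if d["match_id"] == m["match_id"]:
                joined.append({**d, **m})

print(joined)
```

Because each bucket can be joined independently, an engine that knows both tables share this layout has no need to redistribute rows by `match_id` before joining.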

In [46]:
matches_bucketed = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/home/iceberg/data/matches.csv")

match_details_bucketed = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/home/iceberg/data/match_details.csv")

medals_matches_players_bucketed = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/home/iceberg/data/medals_matches_players.csv")

                                                                                

### 3.1.a. Create matches_bucketed table 🎮

In [47]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.matches_bucketed""")

# Create the table using Iceberg
bucketed_ddl = """
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
    match_id STRING,
    is_team_game BOOLEAN,
    playlist_id STRING,
    completion_date TIMESTAMP,
    mapid STRING
)
USING iceberg
PARTITIONED BY (completion_date, bucket(16, match_id))
"""
spark.sql(bucketed_ddl)

DataFrame[]

### 3.1.b. Populate matches_bucketed table

In [48]:
# Get distinct completion dates
distinct_dates = matches_bucketed.select("completion_date").distinct().collect()

# Process data in chunks based on completion_date
for row in distinct_dates:
    date = row["completion_date"]
    filtered_matches = matches_bucketed.filter(col("completion_date") == date)
    
    # Repartition by match_id and cache the slice while it is being written
    optimized_matches = filtered_matches \
        .select("match_id", "is_team_game", "playlist_id", "completion_date", "mapid") \
        .repartition(16, "match_id") \
        .persist(StorageLevel.MEMORY_AND_DISK)
    
    optimized_matches.write \
        .mode("append") \
        .bucketBy(16, "match_id") \
        .partitionBy("completion_date") \
        .saveAsTable("bootcamp.matches_bucketed")

    # Release the cached slice so persisted data does not pile up across iterations
    optimized_matches.unpersist()

# Verify the data in the table
result = spark.sql("SELECT * FROM bootcamp.matches_bucketed")
result.show(5)

+--------------------+------------+--------------------+-------------------+--------------------+
|            match_id|is_team_game|         playlist_id|    completion_date|               mapid|
+--------------------+------------+--------------------+-------------------+--------------------+
|6aab3241-2091-48a...|        true|0e39ead4-383b-445...|2016-05-25 00:00:00|c93d708f-f206-11e...|
|fa8f20c7-7b5c-4a3...|        true|f72e0ef0-7c4a-430...|2016-05-25 00:00:00|c7edbf0f-f206-11e...|
|de402bea-cf2a-483...|        true|f72e0ef0-7c4a-430...|2016-05-25 00:00:00|c7805740-f206-11e...|
|a07c4995-f198-4fc...|        true|c98949ae-60a8-43d...|2016-05-25 00:00:00|cebd854f-f206-11e...|
|9f336961-0f94-4e3...|        true|0e39ead4-383b-445...|2016-05-25 00:00:00|c89dae21-f206-11e...|
+--------------------+------------+--------------------+-------------------+--------------------+
only showing top 5 rows



### 3.2.a. Create the match_details_bucketed table 📝

In [49]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.match_details_bucketed""")

bucketed_details_ddl = """
CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
        match_id STRING,
        player_gamertag STRING,
        player_total_kills INTEGER,
        player_total_deaths INTEGER
 )
USING iceberg
PARTITIONED BY (bucket(16, match_id));
"""
spark.sql(bucketed_details_ddl)

DataFrame[]

### 3.2.b. Populate the match_details_bucketed table

In [50]:
match_details_bucketed.select(
    "match_id", "player_gamertag", "player_total_kills", "player_total_deaths"
).write \
    .format("iceberg") \
    .mode("append") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.match_details_bucketed")

# Verify the data in the table
result = spark.sql("SELECT * FROM bootcamp.match_details_bucketed")
result.show(5)

+--------------------+---------------+------------------+-------------------+
|            match_id|player_gamertag|player_total_kills|player_total_deaths|
+--------------------+---------------+------------------+-------------------+
|f8852913-2ccf-46f...|    OneWingKing|                 7|                  6|
|155cfd23-4f97-4f1...|   BigChubSmith|                15|                 11|
|155cfd23-4f97-4f1...|  JakeWilson801|                18|                  9|
|155cfd23-4f97-4f1...|      taterbase|                 1|                 12|
|155cfd23-4f97-4f1...| BeyondHumanx39|                13|                 14|
+--------------------+---------------+------------------+-------------------+
only showing top 5 rows



### 3.3.a. Create the bucketed medals_matches_players table 🎖️

In [51]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.medals_matches_players_bucketed""")

bucketed_medals_matches_players_ddl = """
CREATE TABLE IF NOT EXISTS bootcamp.medals_matches_players_bucketed (
    match_id STRING,
    player_gamertag STRING,
    medal_id BIGINT,
    COUNT INTEGER
 )
USING iceberg
PARTITIONED BY (bucket(16, match_id));
"""
spark.sql(bucketed_medals_matches_players_ddl)

DataFrame[]

### 3.3.b. Populate the bucketed medals_matches_players table

In [52]:
medals_matches_players_bucketed.select(
    "match_id", "player_gamertag", "medal_id", "COUNT"
).write \
    .format("iceberg") \
    .mode("append") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.medals_matches_players_bucketed")

# Verify the data in the table
result = spark.sql("SELECT * FROM bootcamp.medals_matches_players_bucketed")
result.show(5)

+--------------------+---------------+----------+-----+
|            match_id|player_gamertag|  medal_id|COUNT|
+--------------------+---------------+----------+-----+
|27d7c16b-b780-4f8...|       EcZachly| 824733727|    1|
|27d7c16b-b780-4f8...|       EcZachly|3261908037|    5|
|27d7c16b-b780-4f8...|       EcZachly|2078758684|    1|
|27d7c16b-b780-4f8...|       EcZachly|1573153198|    1|
|27d7c16b-b780-4f8...|       EcZachly|2782465081|    1|
+--------------------+---------------+----------+-----+
only showing top 5 rows



## 4. Aggregate the joined data frame to answer the following questions
- Which player averages the most kills per game?
- Which playlist gets played the most?
- Which map gets played the most?
- Which map do players get the most Killing Spree medals on?

In [53]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.joined_matches""")

joined_matches_ddl = """
CREATE TABLE IF NOT EXISTS bootcamp.joined_matches (
        match_id STRING,
        player_gamertag STRING,
        player_total_kills INTEGER,
        player_total_deaths INTEGER,
        is_team_game BOOLEAN,
        playlist_id STRING,
        completion_date TIMESTAMP,
        mapid STRING,
        medal_id BIGINT,
        COUNT INTEGER   
 )
USING iceberg
PARTITIONED BY (bucket(16, match_id));
"""
spark.sql(joined_matches_ddl)

DataFrame[]

In [54]:
spark.sql("""
    SELECT * 
    FROM bootcamp.match_details_bucketed mdb 
    JOIN bootcamp.matches_bucketed md 
        ON mdb.match_id = md.match_id
        AND md.completion_date = DATE('2016-01-01')
    JOIN bootcamp.medals_matches_players_bucketed mmp 
        ON mdb.match_id = mmp.match_id
""").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [match_id#47978], [match_id#47987], Inner
   :- SortMergeJoin [match_id#47978], [match_id#47982], Inner
   :  :- Sort [match_id#47978 ASC NULLS FIRST], false, 0
   :  :  +- Exchange hashpartitioning(match_id#47978, 200), ENSURE_REQUIREMENTS, [plan_id=17999]
   :  :     +- BatchScan demo.bootcamp.match_details_bucketed[match_id#47978, player_gamertag#47979, player_total_kills#47980, player_total_deaths#47981] demo.bootcamp.match_details_bucketed (branch=null) [filters=match_id IS NOT NULL, groupedBy=] RuntimeFilters: []
   :  +- Sort [match_id#47982 ASC NULLS FIRST], false, 0
   :     +- Exchange hashpartitioning(match_id#47982, 200), ENSURE_REQUIREMENTS, [plan_id=18000]
   :        +- BatchScan demo.bootcamp.matches_bucketed[match_id#47982, is_team_game#47983, playlist_id#47984, completion_date#47985, mapid#47986] demo.bootcamp.matches_bucketed (branch=null) [filters=completion_date IS NOT NULL, completion_date = 
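
The visible part of this plan still contains `Exchange hashpartitioning(match_id, 200)` nodes, meaning these scans are shuffled into 200 partitions before the sort-merge joins instead of being joined bucket to bucket. A small, hedged helper for spotting this in plan text is sketched below. `shuffle_exchanges` is a hypothetical name, and the plan excerpt is copied from the output above with the scan lines omitted.

```python
import re

# Excerpt of the physical plan printed above (scan lines omitted).
plan_text = """
+- SortMergeJoin [match_id#47978], [match_id#47987], Inner
   :- SortMergeJoin [match_id#47978], [match_id#47982], Inner
   :  :- Sort [match_id#47978 ASC NULLS FIRST], false, 0
   :  :  +- Exchange hashpartitioning(match_id#47978, 200), ENSURE_REQUIREMENTS, [plan_id=17999]
   :  +- Sort [match_id#47982 ASC NULLS FIRST], false, 0
   :     +- Exchange hashpartitioning(match_id#47982, 200), ENSURE_REQUIREMENTS, [plan_id=18000]
"""


def shuffle_exchanges(plan):
    """Return the arguments of every hash-partitioning Exchange found in the plan text."""
    return re.findall(r"Exchange hashpartitioning\(([^)]*)\)", plan)


exchanges = shuffle_exchanges(plan_text)
print(len(exchanges), exchanges)
```

Each match is one shuffle that bucketing was meant to avoid. For Iceberg tables, Spark 3.3 and later expose `spark.sql.sources.v2.bucketing.enabled` for storage-partitioned joins. Whether enabling it removes these exchanges in this environment has not been verified here.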

In [55]:
joined_matches_query = """
    SELECT 
        mdb.match_id,
        mdb.player_gamertag,
        mdb.player_total_kills,
        mdb.player_total_deaths,
        md.is_team_game,
        md.playlist_id,
        md.completion_date,
        md.mapid,
        mmp.medal_id,
        mmp.COUNT    
    FROM bootcamp.match_details_bucketed mdb 
    JOIN bootcamp.matches_bucketed md 
        ON mdb.match_id = md.match_id
    JOIN bootcamp.medals_matches_players_bucketed mmp 
        ON mdb.match_id = mmp.match_id
"""

joined_matches = spark.sql(joined_matches_query)
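
One thing worth checking in this query is the row count. Both match_details and medals_matches_players hold one row per player (and per medal), so joining them on `match_id` alone pairs every player in a match with every medal row from that match, including medals earned by other players. The sketch below shows the effect on invented data. Adding `player_gamertag` to the join condition is one possible remedy, offered as an assumption rather than as part of the assignment.

```python
# (match_id, player_gamertag, player_total_kills); invented rows.
details = [
    ("m1", "playerA", 10),
    ("m1", "playerB", 5),
]

# (match_id, player_gamertag, medal_id); invented rows.
medal_rows = [
    ("m1", "playerA", 111),
    ("m1", "playerA", 222),
    ("m1", "playerB", 333),
]

# Join on match_id only, as in the query above.
on_match_only = [(d, m) for d in details for m in medal_rows if d[0] == m[0]]

# Join on match_id and player_gamertag.
on_match_and_player = [
    (d, m) for d in details for m in medal_rows if d[0] == m[0] and d[1] == m[1]
]

# Rows where a medal is attached to a player who did not earn it.
misattributed = [(d, m) for d, m in on_match_only if d[1] != m[1]]

print(len(on_match_only), len(on_match_and_player), len(misattributed))  # 6 3 3
```

If the extra rows are kept, later aggregates such as medal counts per map need to account for them.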

In [56]:
joined_matches.select(
    "match_id", "player_gamertag", "player_total_kills", "player_total_deaths",
    "is_team_game", "playlist_id", "completion_date", "mapid", "medal_id",
    "COUNT"
).write \
    .format("iceberg") \
    .mode("append") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.joined_matches")

# Verify the data in the table
result = spark.sql("""SELECT * FROM bootcamp.joined_matches""")
result.show(5)



+--------------------+---------------+------------------+-------------------+------------+--------------------+-------------------+--------------------+----------+-----+
|            match_id|player_gamertag|player_total_kills|player_total_deaths|is_team_game|         playlist_id|    completion_date|               mapid|  medal_id|COUNT|
+--------------------+---------------+------------------+-------------------+------------+--------------------+-------------------+--------------------+----------+-----+
|0377e616-bf8b-4a4...|     Snipe Envy|                14|                  8|        true|bc0f8ad6-31e6-4a1...|2016-04-23 00:00:00|cdee4e70-f206-11e...|2078758684|    2|
|0377e616-bf8b-4a4...|     Snipe Envy|                14|                  8|        true|bc0f8ad6-31e6-4a1...|2016-04-23 00:00:00|cdee4e70-f206-11e...| 848240062|    1|
|0377e616-bf8b-4a4...|     Snipe Envy|                14|                  8|        true|bc0f8ad6-31e6-4a1...|2016-04-23 00:00:00|cdee4e70-f206-11e..
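
The `bucketBy(16, "match_id")` call above groups rows by a hash of `match_id`. As a rough illustration of why that helps joins on the same key, the sketch below assigns made-up rows to buckets in plain Python. The `bucket_for` helper and the CRC32 hash are stand-ins chosen for this example; Spark and Iceberg use their own hash functions, and the row values are invented.

```python
import zlib

NUM_BUCKETS = 16  # same bucket count as the bucketBy call above


def bucket_for(match_id: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Hypothetical helper: map a key to a bucket with a deterministic hash."""
    return zlib.crc32(match_id.encode("utf-8")) % num_buckets


# Made-up rows standing in for two tables keyed on match_id.
details_rows = [("m1", "player_a"), ("m2", "player_b"), ("m1", "player_c")]
matches_rows = [("m1", "map_x"), ("m2", "map_y")]

details_buckets = {}
for match_id, gamertag in details_rows:
    details_buckets.setdefault(bucket_for(match_id), []).append((match_id, gamertag))

matches_buckets = {}
for match_id, mapid in matches_rows:
    matches_buckets.setdefault(bucket_for(match_id), []).append((match_id, mapid))

# Equal keys always land in the same bucket number on both sides, so a join
# only has to compare bucket i of one table with bucket i of the other.
joined = []
for bucket, left_rows in details_buckets.items():
    right_lookup = dict(matches_buckets.get(bucket, []))
    for match_id, gamertag in left_rows:
        if match_id in right_lookup:
            joined.append((match_id, gamertag, right_lookup[match_id]))
```

Because matching keys are co-located by construction, the join can proceed bucket by bucket without first redistributing either table by key.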


## Broadcast JOINs with medals and maps

In [57]:
from pyspark.sql.functions import broadcast

In [59]:
# Broadcast matches with maps
matches_with_maps = joined_matches \
    .join(broadcast(maps), on="mapid", how="inner")
matches_with_maps.show(5)


+--------------------+--------------------+---------------+------------------+-------------------+------------+--------------------+-------------------+----------+-----+------+--------------------+
|               mapid|            match_id|player_gamertag|player_total_kills|player_total_deaths|is_team_game|         playlist_id|    completion_date|  medal_id|COUNT|  name|         description|
+--------------------+--------------------+---------------+------------------+-------------------+------------+--------------------+-------------------+----------+-----+------+--------------------+
|cc040aa1-f206-11e...|00169217-cca6-4b4...|  King Terror V|                14|                  7|        true|2323b76a-db98-4e0...|2016-03-13 00:00:00|3261908037|   11|Fathom|The UNSC explores...|
|cc040aa1-f206-11e...|00169217-cca6-4b4...|  King Terror V|                14|                  7|        true|2323b76a-db98-4e0...|2016-03-13 00:00:00|3001183151|    1|Fathom|The UNSC explores...|
|cc040aa1-
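
To illustrate what `broadcast(maps)` asks Spark to do, here is a minimal plain-Python sketch of a broadcast hash join: the small table is copied in full to every partition of the large table and probed as a hash map. The function name `broadcast_hash_join` and all row values are made up for this example; it is not Spark's implementation.

```python
# Small dimension table (made-up rows): mapid -> map name.
maps_small = {"map_1": "Fathom", "map_2": "Breakout Arena"}

# Larger fact table (made-up rows), processed one partition at a time.
fact_partitions = [
    [("match_a", "map_1"), ("match_b", "map_2")],
    [("match_c", "map_1"), ("match_d", "map_9")],  # map_9 has no match
]


def broadcast_hash_join(partitions, small_table):
    """Inner-join each partition against a full local copy of small_table."""
    result = []
    for partition in partitions:
        # Every partition sees the whole small table, so the large side
        # never has to be shuffled by the join key.
        for match_id, mapid in partition:
            if mapid in small_table:
                result.append((mapid, match_id, small_table[mapid]))
    return result


rows = broadcast_hash_join(fact_partitions, maps_small)
```

The approach only pays off when the broadcast side is small enough to fit comfortably in memory on every executor, which is the case for lookup tables such as `maps` and `medals`.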

In [60]:
# Broadcast matches with medals
matches_with_medals = joined_matches \
    .join(broadcast(medals), on="medal_id", how="inner")
matches_with_medals.show(5)


+----------+--------------------+---------------+------------------+-------------------+------------+--------------------+-------------------+--------------------+-----+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+--------------------+-------------+----------+
|  medal_id|            match_id|player_gamertag|player_total_kills|player_total_deaths|is_team_game|         playlist_id|    completion_date|               mapid|COUNT|          sprite_uri|sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|   classification|         description|         name|difficulty|
+----------+--------------------+---------------+------------------+-------------------+------------+--------------------+-------------------+--------------------+-----+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+---------------

### a. Player with max kills per game?

In [61]:
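# Caveat: joined_matches holds one row per joined medal row, so each player's
# player_total_kills value repeats within a match. The sum below is therefore
# taken over joined rows, not over distinct games.
max_kills_query = """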
with raw as (
    select player_gamertag
    , player_total_kills
    from  bootcamp.joined_matches jm
)

, total_kills_per_player as (
select player_gamertag
, sum(player_total_kills) as count_k
from raw
group by 1
)

select * from total_kills_per_player 
order by count_k desc 
limit 1
"""

max_kills = spark.sql(max_kills_query)
max_kills.show()

+---------------+-------+
|player_gamertag|count_k|
+---------------+-------+
|       EcZachly|1503498|
+---------------+-------+
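
Because `joined_matches` repeats each player's kill total once per joined medal row, a plain `sum` over it can rank players differently from a per-game average. The sketch below shows the effect on made-up rows in plain Python and one possible fix: reduce to one row per `(match_id, player_gamertag)` before aggregating. The data and variable names are illustrative assumptions, not values from the dataset.

```python
from collections import defaultdict

# Made-up rows shaped like joined_matches:
# (match_id, player_gamertag, player_total_kills, medal_id).
joined_rows = [
    ("m1", "alice", 10, 101),
    ("m1", "alice", 10, 102),
    ("m1", "alice", 10, 103),
    ("m2", "alice", 20, 101),
    ("m1", "bob", 18, 101),
]

# Naive sum over the joined rows counts alice's 10 kills in m1 three times.
naive_sum = defaultdict(int)
for _, gamertag, kills, _ in joined_rows:
    naive_sum[gamertag] += kills

# De-duplicate to one row per (match_id, player_gamertag) before aggregating.
per_game = {(m, g): k for m, g, k, _ in joined_rows}

totals = defaultdict(int)
games = defaultdict(int)
for (_, gamertag), kills in per_game.items():
    totals[gamertag] += kills
    games[gamertag] += 1

# Average kills per game, then the player with the highest average.
avg_kills = {g: totals[g] / games[g] for g in totals}
top_player = max(avg_kills, key=avg_kills.get)
```

In this toy data the naive sum favors `alice` (50 versus 18), while the per-game average favors `bob` (18.0 versus 15.0), so the choice of aggregation can change the answer.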



### b. Playlist played the most?

In [62]:
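# Note: count(match_id) counts joined rows, so count_m is the number of
# joined rows per playlist rather than the number of distinct matches.
max_plays_query = """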
with raw as (
    select playlist_id
    , match_id
    from  bootcamp.joined_matches jm
)

, total_matches_per_playlist as (
select playlist_id
, count(match_id) as count_m
from raw
group by 1
)

select * from total_matches_per_playlist 
order by count_m desc 
limit 1
"""

spark.sql(max_plays_query).show()

+--------------------+-------+
|         playlist_id|count_m|
+--------------------+-------+
|f72e0ef0-7c4a-430...|1565529|
+--------------------+-------+




### c. Map played the most?

In [63]:
# Step 1: Keep one row per (mapid, name, match_id) combination
raw_df = matches_with_maps.select("mapid", "name", "match_id").distinct()

# Step 2: Group by mapid and name, count distinct match IDs
total_matches_per_map_df = raw_df.groupBy("mapid", "name") \
    .agg({"match_id": "count"}) \
    .withColumnRenamed("count(match_id)", "count_m")

# Step 3: Get the map with the most unique matches
top_map_df = total_matches_per_map_df.orderBy("count_m", ascending=False).limit(1)

# Step 4: Display the result
top_map_df.show()



+--------------------+--------------+-------+
|               mapid|          name|count_m|
+--------------------+--------------+-------+
|c7edbf0f-f206-11e...|Breakout Arena|   7032|
+--------------------+--------------+-------+




### d. Which map do players get the most Killing Spree medals on?

In [64]:
from pyspark.sql.functions import col

# Step 1: Keep only the rows for the "Killing Spree" medal
killing_spree_df = matches_with_medals \
    .withColumnRenamed("name", "medal_name") \
    .filter(col("medal_name") == "Killing Spree")

# Step 2: Select distinct combinations of mapid and match_id
unique_killing_spree_matches_df = killing_spree_df.select("mapid", "medal_name", "match_id").distinct()

# Step 3: Group by mapid and medal_name, and count distinct match IDs
# (this counts matches with at least one Killing Spree, not medals awarded)
total_unique_matches_per_map_df = unique_killing_spree_matches_df.groupBy("mapid", "medal_name") \
    .agg({"match_id": "count"}) \
    .withColumnRenamed("count(match_id)", "unique_match_count")

# Step 4: Attach the map name with a broadcast join against maps
total_unique_matches_with_name_df = total_unique_matches_per_map_df \
    .join(broadcast(maps), on="mapid", how="inner") \
    .select("name", "mapid", "medal_name", "unique_match_count")

# Step 5: Get the map with the most unique match IDs for "Killing Spree" medals
top_map_df = total_unique_matches_with_name_df.orderBy("unique_match_count", ascending=False).limit(1)

# Step 6: Display the result
top_map_df.show()


+--------------+--------------------+-------------+------------------+
|          name|               mapid|   medal_name|unique_match_count|
+--------------+--------------------+-------------+------------------+
|Breakout Arena|c7edbf0f-f206-11e...|Killing Spree|              4917|
+--------------+--------------------+-------------+------------------+




## 5. Sort the aggregated dataset

In [65]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.matches_unsorted (
    match_id STRING,
    player_gamertag STRING,
    player_total_kills INTEGER,
    player_total_deaths INTEGER,
    is_team_game BOOLEAN,
    playlist_id STRING,
    completion_date TIMESTAMP,
    mapid STRING,
    medal_id BIGINT,
    COUNT INTEGER   
)
USING iceberg
PARTITIONED BY (year(completion_date));

24/12/30 13:54:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [66]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.matches_sorted (
    match_id STRING,
    player_gamertag STRING,
    player_total_kills INTEGER,
    player_total_deaths INTEGER,
    is_team_game BOOLEAN,
    playlist_id STRING,
    completion_date TIMESTAMP,
    mapid STRING,
    medal_id BIGINT,
    COUNT INTEGER   
)
USING iceberg
PARTITIONED BY (year(completion_date));

In [75]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.matches_sorted_v2 (
    match_id STRING,
    player_gamertag STRING,
    player_total_kills INTEGER,
    player_total_deaths INTEGER,
    is_team_game BOOLEAN,
    playlist_id STRING,
    completion_date TIMESTAMP,
    mapid STRING,
    medal_id BIGINT,
    COUNT INTEGER   
)
USING iceberg
PARTITIONED BY (year(completion_date));

In [71]:
# Unsorted data, repartitioned on completion_date
partition_df = joined_matches.repartition(10, col("completion_date"))
partition_df.explain()
partition_df.write.mode("overwrite").saveAsTable("bootcamp.matches_unsorted")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(completion_date#48039, 10), REPARTITION_BY_NUM, [plan_id=21213]
   +- Project [match_id#48032, player_gamertag#48033, player_total_kills#48034, player_total_deaths#48035, is_team_game#48037, playlist_id#48038, completion_date#48039, mapid#48040, medal_id#48043L, COUNT#48044]
      +- SortMergeJoin [match_id#48032], [match_id#48041], Inner
         :- Project [match_id#48032, player_gamertag#48033, player_total_kills#48034, player_total_deaths#48035, is_team_game#48037, playlist_id#48038, completion_date#48039, mapid#48040]
         :  +- SortMergeJoin [match_id#48032], [match_id#48036], Inner
         :     :- Sort [match_id#48032 ASC NULLS FIRST], false, 0
         :     :  +- Exchange hashpartitioning(match_id#48032, 200), ENSURE_REQUIREMENTS, [plan_id=21201]
         :     :     +- BatchScan demo.bootcamp.match_details_bucketed[match_id#48032, player_gamertag#48033, player_total_kills#48034, player_


In [72]:
# Sorted within partitions on completion_date and player_gamertag
sorted_df = joined_matches.repartition(10, col("completion_date"))\
    .sortWithinPartitions(col("completion_date"), col("player_gamertag"))
sorted_df.explain()
sorted_df.write.mode("overwrite").saveAsTable("bootcamp.matches_sorted")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [completion_date#48039 ASC NULLS FIRST, player_gamertag#48033 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(completion_date#48039, 10), REPARTITION_BY_NUM, [plan_id=21553]
      +- Project [match_id#48032, player_gamertag#48033, player_total_kills#48034, player_total_deaths#48035, is_team_game#48037, playlist_id#48038, completion_date#48039, mapid#48040, medal_id#48043L, COUNT#48044]
         +- SortMergeJoin [match_id#48032], [match_id#48041], Inner
            :- Project [match_id#48032, player_gamertag#48033, player_total_kills#48034, player_total_deaths#48035, is_team_game#48037, playlist_id#48038, completion_date#48039, mapid#48040]
            :  +- SortMergeJoin [match_id#48032], [match_id#48036], Inner
            :     :- Sort [match_id#48032 ASC NULLS FIRST], false, 0
            :     :  +- Exchange hashpartitioning(match_id#48032, 200), ENSURE_REQUIREMENTS, [plan_id=21541]
            :     :     +
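
The plan above shows a `Sort` sitting on top of an `Exchange hashpartitioning(completion_date, 10)`. A minimal plain-Python sketch of that two-step pattern follows: rows are first routed to partitions by a hash of `completion_date`, then each partition is sorted on its own. The `partition_of` helper, the CRC32 hash, and the rows are assumptions made for illustration; Spark's partitioner uses a different hash.

```python
import zlib

NUM_PARTITIONS = 3  # the notebook uses 10; 3 keeps the example small

# Made-up rows: (completion_date, player_gamertag).
rows = [
    ("2016-03-13", "zed"),
    ("2016-04-23", "amy"),
    ("2016-03-13", "bob"),
    ("2016-01-02", "cat"),
    ("2016-04-23", "abe"),
]


def partition_of(key: str) -> int:
    """Hypothetical stand-in for Spark's hash partitioner."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


# Step 1: repartition by completion_date (rows with equal dates stay together).
partitions = [[] for _ in range(NUM_PARTITIONS)]
for row in rows:
    partitions[partition_of(row[0])].append(row)

# Step 2: sort inside each partition only; no ordering is imposed across partitions.
sorted_partitions = [sorted(p) for p in partitions]
```

The result is locally ordered data files rather than one globally sorted dataset, which is enough to place equal values next to each other inside each written file.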


In [None]:
# Sorted from lowest to highest cardinality: completion_date, player_gamertag, match_id
sorted_df_2 = joined_matches.repartition(10, col("completion_date"))\
    .sortWithinPartitions(col("completion_date"), col("player_gamertag"), col("match_id"))
sorted_df_2.explain()
sorted_df_2.write.mode("overwrite").saveAsTable("bootcamp.matches_sorted_v2")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [completion_date#48039 ASC NULLS FIRST, player_gamertag#48033 ASC NULLS FIRST, match_id#48032 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(completion_date#48039, 10), REPARTITION_BY_NUM, [plan_id=22110]
      +- Project [match_id#48032, player_gamertag#48033, player_total_kills#48034, player_total_deaths#48035, is_team_game#48037, playlist_id#48038, completion_date#48039, mapid#48040, medal_id#48043L, COUNT#48044]
         +- SortMergeJoin [match_id#48032], [match_id#48041], Inner
            :- Project [match_id#48032, player_gamertag#48033, player_total_kills#48034, player_total_deaths#48035, is_team_game#48037, playlist_id#48038, completion_date#48039, mapid#48040]
            :  +- SortMergeJoin [match_id#48032], [match_id#48036], Inner
            :     :- Sort [match_id#48032 ASC NULLS FIRST], false, 0
            :     :  +- Exchange hashpartitioning(match_id#48032, 200), ENSURE_REQUIREMENTS, [plan_id=


In [None]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted v2' as table_name
FROM demo.bootcamp.matches_sorted_v2.files

UNION ALL

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' as table_name
FROM demo.bootcamp.matches_sorted.files

UNION ALL

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' as table_name
FROM demo.bootcamp.matches_unsorted.files
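
The file-size comparison rests on the idea that sorting places equal values next to each other, which run-length style encodings can store compactly. The sketch below counts runs of consecutive equal values in a made-up low-cardinality column before and after sorting. It is a simplified stand-in: the real file format applies more involved encodings and compression, and the `run_count` helper and generated data are assumptions for illustration.

```python
import random
from itertools import groupby


def run_count(values):
    """Number of runs of consecutive equal values (what run-length encoding stores)."""
    return sum(1 for _ in groupby(values))


random.seed(0)  # fixed seed so the sketch is repeatable

# Made-up low-cardinality column: 1,000 rows drawn from 5 distinct dates.
dates = [f"2016-0{random.randint(1, 5)}-01" for _ in range(1000)]

unsorted_runs = run_count(dates)
sorted_runs = run_count(sorted(dates))

print(f"unsorted runs: {unsorted_runs}, sorted runs: {sorted_runs}")
```

After sorting, the number of runs equals the number of distinct values, so the lower the cardinality of the leading sort column, the fewer runs there are to store.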