In [1]:
# Initialisation and Creation of source DFs
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, split, lit, avg, col

spark = SparkSession.builder.appName("Assignment").getOrCreate()

spark

match_details = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/match_details.csv")

matches = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/matches.csv").withColumnRenamed('mapid', 'map_id')

maps = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/maps.csv").withColumnRenamed('mapid', 'map_id')

medals = spark.read.option("header", "true").csv("/home/iceberg/data/medals.csv")

medals_matches_players = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/medals_matches_players.csv")


24/12/25 07:39:29 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [2]:
# Create bootcamp database
spark.sql("""CREATE DATABASE IF NOT EXISTS bootcamp""")

DataFrame[]

In [3]:
# Enable Broadcast Join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1000000000000") 

# Broadcast Join of Map with Matches on map_id
matches_df = matches.join(broadcast(maps),"map_id", "inner")

# Broadcast Join of Medals with Medals_Matches_Players on medal_id
medals_matches_players_df = medals_matches_players.join(broadcast(medals), "medal_id", "inner")

# Disable Broadcast Join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

In [4]:
# Matches
spark.sql("""DROP TABLE IF EXISTS bootcamp.matches_bucketed""")

matches_bucketedDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
    match_id STRING,
    map_id STRING,
    name STRING,
    description STRING,
    is_team_game BOOLEAN,
    playlist_id STRING,
    completion_date DATE,
    match_duration STRING
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """

spark.sql(matches_bucketedDDL)

matches_df.select(col("match_id"), col("map_id"), col("name"), col("description"), col("is_team_game"), col("playlist_id"), col("completion_date"), col("match_duration")) \
        .write \
        .mode("append") \
        .bucketBy(16, "match_id") \
        .saveAsTable("bootcamp.matches_bucketed")




                                                                                

In [5]:
spark.sql("""SELECT * FROM bootcamp.matches_bucketed""").collect()

[Row(match_id='f44c9997-eb6f-4d62-bbd4-241351d84f5c', map_id='ce1dc2de-f206-11e4-a646-24be05e24f7e', name='Truth', description=None, is_team_game=True, playlist_id='0504ca3c-de41-48f3-b9c8-3aab534d69e5', completion_date=datetime.date(2016, 2, 28), match_duration=None),
 Row(match_id='f0f2daf2-52f3-4ff9-bb2d-ff348270cde2', map_id='cbcea2c0-f206-11e4-8c4a-24be05e24f7e', name='Riptide\xa0', description='The waters of this planet have receded', is_team_game=None, playlist_id='2323b76a-db98-4e03-aa37-e171cfbdd1a4', completion_date=datetime.date(2016, 2, 4), match_duration=None),
 Row(match_id='8aec419e-2bfa-4fc1-923e-5a7a4e0018b3', map_id='c7edbf0f-f206-11e4-aa52-24be05e24f7e', name='Breakout Arena', description='The broadcast of Breakout matches has proven immensely popular with the UNSC Infinity crew.', is_team_game=True, playlist_id='f72e0ef0-7c4a-4307-af78-8e38dac3fdba', completion_date=datetime.date(2016, 1, 7), match_duration=None),
 Row(match_id='c6f24b65-bb73-4890-8f35-ff13c7dffaaa'

In [6]:
# Match Details

spark.sql("""DROP TABLE IF EXISTS bootcamp.match_details_bucketed""")

match_details_bucketedDDL = """
 CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INTEGER,
     player_total_deaths INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
"""

spark.sql(match_details_bucketedDDL)

match_details.select(col("match_id"), col("player_gamertag"), col("player_total_kills"), col("player_total_deaths")) \
     .write.mode("append") \
     .bucketBy(16, "match_id").saveAsTable("bootcamp.match_details_bucketed")

In [7]:
spark.sql("""SELECT * FROM bootcamp.match_details_bucketed""").collect()

[Row(match_id='f8852913-2ccf-46f6-a546-5578036144a9', player_gamertag='OneWingKing', player_total_kills=7, player_total_deaths=6),
 Row(match_id='155cfd23-4f97-4f1b-b491-02cb5e7563b3', player_gamertag='BigChubSmith', player_total_kills=15, player_total_deaths=11),
 Row(match_id='155cfd23-4f97-4f1b-b491-02cb5e7563b3', player_gamertag='JakeWilson801', player_total_kills=18, player_total_deaths=9),
 Row(match_id='155cfd23-4f97-4f1b-b491-02cb5e7563b3', player_gamertag='taterbase', player_total_kills=1, player_total_deaths=12),
 Row(match_id='155cfd23-4f97-4f1b-b491-02cb5e7563b3', player_gamertag='BeyondHumanx39', player_total_kills=13, player_total_deaths=14),
 Row(match_id='155cfd23-4f97-4f1b-b491-02cb5e7563b3', player_gamertag='Twinsnakes05', player_total_kills=16, player_total_deaths=11),
 Row(match_id='155cfd23-4f97-4f1b-b491-02cb5e7563b3', player_gamertag='Maverick62011', player_total_kills=9, player_total_deaths=14),
 Row(match_id='155cfd23-4f97-4f1b-b491-02cb5e7563b3', player_gamert

In [8]:
# Medals Matches Players

spark.sql("""DROP TABLE IF EXISTS bootcamp.medals_matches_players_bucketed""")

medals_matches_players_bucketedDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.medals_matches_players_bucketed (
    match_id STRING,
    player_gamertag STRING,
    medal_id STRING,
    classification STRING,
    description STRING,
    difficulty STRING,
    count INTEGER
)
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
"""

spark.sql(medals_matches_players_bucketedDDL)

medals_matches_players_df.select(col("match_id"),col("player_gamertag"),col("medal_id"), col("classification"), col("description"), col("difficulty"), col("count")) \
                      .write.mode("append") \
                      .bucketBy(16, "match_id").saveAsTable("bootcamp.medals_matches_players_bucketed")

In [9]:
# Join medals_matches_players, match_details, matches on bucketed 16

md_df = spark.table("bootcamp.match_details_bucketed")
m_df = spark.table("bootcamp.matches_bucketed")
mmp_df = spark.table("bootcamp.medals_matches_players_bucketed")

result_df = (
    md_df.alias("mdb")
    .join(
        m_df.alias("md"),
        on=md_df.match_id == m_df.match_id,
    )
    .join(
        mmp_df.alias("mmp"),
        on=(
            (md_df.match_id == mmp_df.match_id)
            & (md_df.player_gamertag == mmp_df.player_gamertag)
        ),
    )
    # Select the required columns
    .select(
        "mdb.match_id",
        "mdb.player_gamertag",
        "mdb.player_total_kills",
        "mdb.player_total_deaths",
        "md.playlist_id",
        "md.map_id",
        "md.name",
        "md.completion_date",
        "mmp.medal_id",
        "mmp.classification",
        "mmp.count",
    )
)

result_df.show()


# spark.sql("""
#     SELECT 
#     mdb.match_id, mdb.player_gamertag, mdb.player_total_kills, mdb.player_total_deaths, md.playlist_id, md.completion_date, mmp.medal_id, mmp.count AS medal_count
#     FROM bootcamp.match_details_bucketed mdb
#     JOIN bootcamp.matches_bucketed md
#     ON mdb.match_id = md.match_id
#     JOIN bootcamp.medals_matches_players_bucketed mmp
#     ON mdb.match_id = mmp.match_id AND mdb.player_gamertag = mmp.player_gamertag
# """)

+--------------------+---------------+------------------+-------------------+--------------------+--------------------+-------+---------------+----------+-----------------+-----+
|            match_id|player_gamertag|player_total_kills|player_total_deaths|         playlist_id|              map_id|   name|completion_date|  medal_id|   classification|count|
+--------------------+---------------+------------------+-------------------+--------------------+--------------------+-------+---------------+----------+-----------------+-----+
|0001a1c4-83dc-4f4...|    ILLICIT 117|                23|                 28|780cc101-005c-4fc...|c7805740-f206-11e...|Glacier|     2016-01-06|3565443938|      Strongholds|    4|
|0001a1c4-83dc-4f4...|    ILLICIT 117|                23|                 28|780cc101-005c-4fc...|c7805740-f206-11e...|Glacier|     2016-01-06|3261908037|WeaponProficiency|    8|
|0001a1c4-83dc-4f4...|    ILLICIT 117|                23|                 28|780cc101-005c-4fc...|c780574

In [10]:
# Create aggregated table for reference

spark.sql("""DROP TABLE IF EXISTS bootcamp.aggregated_table""")

agg_table_DDL = """ 
CREATE TABLE IF NOT EXISTS bootcamp.aggregated_table (
    match_id STRING, 
    player_gamertag STRING,
    player_total_kills INTEGER,
    player_total_deaths INTEGER,
    playlist_id STRING,
    map_id STRING,
    name STRING,
    completion_date DATE,
    medal_id STRING,
    classification STRING,
    count INTEGER
)
"""

spark.sql(agg_table_DDL)

result_df.select("*") \
         .write.mode("append") \
         .saveAsTable("bootcamp.aggregated_table")


                                                                                

In [11]:
#Which player averages the most kills per game? 
most_kills_per_game = result_df.groupBy(["player_gamertag", "match_id"]) \
                               .agg({"player_total_kills": "avg"}) \
                               .orderBy("avg(player_total_kills)",ascending=False) \
                               .show(n=1)


+---------------+--------------------+-----------------------+
|player_gamertag|            match_id|avg(player_total_kills)|
+---------------+--------------------+-----------------------+
|   gimpinator14|acf0e47e-20ac-4b1...|                  109.0|
+---------------+--------------------+-----------------------+
only showing top 1 row



In [12]:
#Which playlist gets played the most?
most_played_playlist = result_df.groupBy(["playlist_id"]) \
                                .count() \
                                .orderBy("count",ascending=False) \
                                .show(n=1)

+--------------------+------+
|         playlist_id| count|
+--------------------+------+
|f72e0ef0-7c4a-430...|202489|
+--------------------+------+
only showing top 1 row



In [13]:
#Which map gets played the most?
most_played_map = result_df.groupBy(["map_id", "name"]) \
                           .count() \
                           .orderBy("count",ascending=False) \
                           .show(n=1)

+--------------------+--------------+------+
|              map_id|          name| count|
+--------------------+--------------+------+
|c7edbf0f-f206-11e...|Breakout Arena|186118|
+--------------------+--------------+------+
only showing top 1 row



In [14]:
#Which map do players get the most killing spree medals on? 
most_killing_spree = result_df.groupBy(["map_id", "name", "classification"]) \
                              .agg({"count": "sum"}) \
                              .filter(result_df.classification == "KillingSpree") \
                              .orderBy("sum(count)", ascending=False) \
                              .show(n=1)

# Corresponding SQL
# spark.sql(""" 
# SELECT 
# map_id, name, classification, SUM(count) AS total_KS_medals
# FROM 
# bootcamp.aggregated_table
# GROUP BY map_id, name, classification
# HAVING classification = 'KillingSpree'
# ORDER BY total_ks_medals DESC
# """).show()

24/12/25 07:39:42 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression


+--------------------+--------------+--------------+----------+
|              map_id|          name|classification|sum(count)|
+--------------------+--------------+--------------+----------+
|c7edbf0f-f206-11e...|Breakout Arena|  KillingSpree|      6919|
+--------------------+--------------+--------------+----------+
only showing top 1 row



In [15]:
#DDL for Unsorted Agg Table
spark.sql("""DROP TABLE IF EXISTS bootcamp.unsorted_agg_table""")

unsorted_agg_table = """ 
CREATE TABLE IF NOT EXISTS bootcamp.unsorted_agg_table (
    match_id STRING, 
    player_gamertag STRING,
    player_total_kills INTEGER,
    player_total_deaths INTEGER,
    playlist_id STRING,
    map_id STRING,
    name STRING,
    completion_date DATE,
    medal_id STRING,
    classification STRING,
    count INTEGER
)
"""

spark.sql(unsorted_agg_table)

result_df.select("*") \
         .write.mode("append") \
         .saveAsTable("bootcamp.unsorted_agg_table")

                                                                                

In [16]:
#DDL for Sorted Agg Table
spark.sql("""DROP TABLE IF EXISTS bootcamp.sorted_agg_table""")

sorted_agg_table = """ 
CREATE TABLE IF NOT EXISTS bootcamp.sorted_agg_table (
    match_id STRING, 
    player_gamertag STRING,
    player_total_kills INTEGER,
    player_total_deaths INTEGER,
    playlist_id STRING,
    map_id STRING,
    name STRING,
    completion_date DATE,
    medal_id STRING,
    classification STRING,
    count INTEGER
)
"""

spark.sql(sorted_agg_table)

result_df.select("*") \
         .write.mode("append") \
         .saveAsTable("bootcamp.sorted_agg_table")

In [18]:
#Different Partition
start_df = result_df.repartition(4, col("playlist_id"))
    
first_sort_df = start_df.sortWithinPartitions(col("playlist_id"), col("map_id"))

start_df.write.mode("overwrite").saveAsTable("bootcamp.unsorted_agg_table")
first_sort_df.write.mode("overwrite").saveAsTable("bootcamp.sorted_agg_table")

In [20]:
spark.sql("""SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.sorted_agg_table.files
UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM demo.bootcamp.unsorted_agg_table.files
""").show()

+-------+---------+--------+
|   size|num_files|  sorted|
+-------+---------+--------+
|4071612|        4|  sorted|
|4740813|        4|unsorted|
+-------+---------+--------+

