### Imports and load csv dataframes

In [1]:
from pathlib import Path
from pyspark.sql import SparkSession

# Initialize Spark session
spark = (
    SparkSession.builder.appName("HomeWork3")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.sql.shuffle.partitions", "200") # Fine for large datasets
    .config("spark.sql.files.maxPartitionBytes", "134217728") # Optional: 128 MB is default
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") # Optional: Disable broadcast join
    .config("spark.dynamicAllocation.enabled", "true") # Helps with resource allocation
    .config("spark.dynamicAllocation.minExecutors", "1") # Ensure minimum resources
    .config("spark.dynamicAllocation.maxExecutors", "50") # Scalable resource allocation
    .getOrCreate()
    )


24/12/10 22:50:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Paths to your CSV files
data_folder = Path("/home/iceberg/data")

match_details_path = f"{data_folder}/match_details.csv"
matches_path = f"{data_folder}/matches.csv"
medals_matches_players_path = f"{data_folder}/medals_matches_players.csv"
medals_path = f"{data_folder}/medals.csv"
maps_path = f"{data_folder}/maps.csv"

In [3]:
# Reading CSV files into DataFrames
match_details_df = spark.read.csv(match_details_path, header=True, inferSchema=True)
matches_df = spark.read.csv(matches_path, header=True, inferSchema=True)
medals_matches_players_df = spark.read.csv(medals_matches_players_path, header=True, inferSchema=True)
medals_df = spark.read.csv(medals_path, header=True, inferSchema=True)
maps_df = spark.read.csv(maps_path, header=True, inferSchema=True)

In [4]:
# Register DataFrames as temporary views or write them to Delta tables
match_details_df.createOrReplaceTempView("match_details")
matches_df.createOrReplaceTempView("matches")
medals_matches_players_df.createOrReplaceTempView("medals_matches_players")
medals_df.createOrReplaceTempView("medals")
maps_df.createOrReplaceTempView("maps")

24/12/10 22:51:05 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [5]:
matches_df.show()

+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+
|            match_id|               mapid|is_team_game|         playlist_id|     game_variant_id|is_match_over|    completion_date|match_duration|game_mode|      map_variant_id|
+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+
|11de1a94-8d07-416...|c7edbf0f-f206-11e...|        true|f72e0ef0-7c4a-430...|1e473914-46e4-408...|         true|2016-02-22 00:00:00|          NULL|     NULL|                NULL|
|d3643e71-3e51-43e...|cb914b9e-f206-11e...|       false|d0766624-dbd7-453...|257a305e-4dd3-41f...|         true|2016-02-14 00:00:00|          NULL|     NULL|                NULL|
|d78d2aae-36e4-48a...|c7edbf0f-f206-11e...|        true|f72e0ef0-7c4a-430...|1e473914-46e4-408...|       

In [6]:
medals_df.show()

+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+--------------------+--------------+----------+
|  medal_id|          sprite_uri|sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|   classification|         description|          name|difficulty|
+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+--------------------+--------------+----------+
|2315448068|                NULL|       NULL|      NULL|              NULL|               NULL|        NULL|         NULL|             NULL|                NULL|          NULL|      NULL|
|3565441934|                NULL|       NULL|      NULL|              NULL|               NULL|        NULL|         NULL|             NULL|                NULL|          NULL|      NULL|
|4162659350|https://content.h...|        750|       750|    

In [7]:
maps_df.show()

+--------------------+-------------------+--------------------+
|               mapid|               name|         description|
+--------------------+-------------------+--------------------+
|c93d708f-f206-11e...|              Urban|Andesia was the c...|
|cb251c51-f206-11e...|     Raid on Apex 7|This unbroken rin...|
|c854e54f-f206-11e...|March on Stormbreak|                NULL|
|c8d69870-f206-11e...| Escape from A.R.C.|Scientists flocke...|
|73ed1fd0-45e5-4bb...|             Osiris|                NULL|
|96c3e3dd-7703-408...|          Blue Team|                NULL|
|1c4f8e19-b046-4f7...|            Glassed|                NULL|
|825065cf-df57-42e...|        Unconfirmed|                NULL|
|9a188f67-1664-4d7...|           Alliance|                NULL|
|2702ea83-2c3e-4fd...|   Before the Storm|                NULL|
|82f8471c-a2ef-408...|            Genesis|                NULL|
|fcd7caa4-37c9-436...|       The Breaking|                NULL|
|7dc80b62-dd39-41d...|          Guardian

In [8]:
medals_matches_players_df.show()

+--------------------+---------------+----------+-----+
|            match_id|player_gamertag|  medal_id|count|
+--------------------+---------------+----------+-----+
|009fdac5-e15c-47c...|       EcZachly|3261908037|    7|
|009fdac5-e15c-47c...|       EcZachly| 824733727|    2|
|009fdac5-e15c-47c...|       EcZachly|2078758684|    2|
|009fdac5-e15c-47c...|       EcZachly|2782465081|    2|
|9169d1a3-955c-4ea...|       EcZachly|3001183151|    1|
|9169d1a3-955c-4ea...|       EcZachly|3565443938|    6|
|9169d1a3-955c-4ea...|       EcZachly|3491849182|    1|
|4a078b2f-65eb-4c6...|       EcZachly|3261908037|    8|
|9169d1a3-955c-4ea...|       EcZachly|2105198095|    6|
|9169d1a3-955c-4ea...|       EcZachly|2916014239|    3|
|9169d1a3-955c-4ea...|       EcZachly|3261908037|    6|
|9169d1a3-955c-4ea...|       EcZachly|1351381581|    2|
|9169d1a3-955c-4ea...|       EcZachly|2838259753|    1|
|9169d1a3-955c-4ea...|       EcZachly|3354395650|    1|
|9169d1a3-955c-4ea...|       EcZachly| 298813630

match_details_df.show()

### 1. Explicitly broadcast JOINs `medals` and `maps`

In [9]:
# Check the size to know which table is smaller and to be used in the broadcast
medals_matches_players_df.count(), medals_df.count()

(755229, 183)

In [10]:
matches_df.count(), maps_df.count()

(24025, 40)

In [11]:
from pyspark.sql.functions import broadcast

In [12]:
joined_medals_with_players = medals_matches_players_df.join(broadcast(medals_df), medals_matches_players_df["medal_id"] == medals_df["medal_id"], "inner")

In [13]:
joined_medals_with_players.show()

+--------------------+---------------+----------+-----+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+--------------------+-------------------+----------+
|            match_id|player_gamertag|  medal_id|count|  medal_id|          sprite_uri|sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|   classification|         description|               name|difficulty|
+--------------------+---------------+----------+-----+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+--------------------+-------------------+----------+
|009fdac5-e15c-47c...|       EcZachly|3261908037|    7|3261908037|https://content.h...|        375|       525|                74|                 74|        1125|          899|WeaponProficiency|Kill an opponent ...|           Headshot|        60|
|009fdac5-e1

In [14]:
joined_matches_with_map = matches_df.join(broadcast(maps_df), matches_df["mapid"] == maps_df["mapid"], "inner")

In [15]:
joined_matches_with_map.show()

+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+--------------------+--------------+--------------------+
|            match_id|               mapid|is_team_game|         playlist_id|     game_variant_id|is_match_over|    completion_date|match_duration|game_mode|      map_variant_id|               mapid|          name|         description|
+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+--------------------+--------------+--------------------+
|11de1a94-8d07-416...|c7edbf0f-f206-11e...|        true|f72e0ef0-7c4a-430...|1e473914-46e4-408...|         true|2016-02-22 00:00:00|          NULL|     NULL|                NULL|c7edbf0f-f206-11e...|Breakout Arena|The broadcast of ...|
|d3643e71-3e51-43e...|cb914b9e-f206-11e...|       false|

In [16]:
# check the sizes
joined_medals_with_players.count(), joined_matches_with_map.count()

(755229, 24025)

In [17]:
joined_df = joined_medals_with_players.join(broadcast(joined_matches_with_map), joined_medals_with_players["match_id"] == joined_matches_with_map["match_id"], "inner")

### 2. Bucket join `match_details`, `matches`, and `medal_matches_players` on `match_id` with `16` buckets


In [18]:
# Save bucketed tables
match_details_df.write.format("parquet") \
    .bucketBy(16, "match_id") \
    .mode("overwrite") \
    .saveAsTable("bootcamp.bucketed_match_details")

matches_df.write.format("parquet") \
    .bucketBy(16, "match_id") \
    .mode("overwrite") \
    .saveAsTable("bootcamp.bucketed_matches")

medals_matches_players_df.write.format("parquet") \
    .bucketBy(16, "match_id") \
    .mode("overwrite") \
    .saveAsTable("bootcamp.bucketed_medals_matches_players")

In [19]:
# Load bucketed tables (if needed for future queries)
bucketed_match_details = spark.table("bootcamp.bucketed_match_details")
bucketed_matches = spark.table("bootcamp.bucketed_matches")
bucketed_medals_matches_players = spark.table("bootcamp.bucketed_medals_matches_players")

In [20]:
# Perform the join on bucketed tables
bucket_joined_df = bucketed_match_details.join(
    bucketed_matches, "match_id", "inner"
).join(
    bucketed_medals_matches_players, ["match_id", "player_gamertag"], "inner"
)

In [21]:
# Show the result of the join
bucket_joined_df.count()

751134

In [39]:
joined_df.count()

755229

In [22]:
bucket_joined_df.columns

['match_id',
 'player_gamertag',
 'previous_spartan_rank',
 'spartan_rank',
 'previous_total_xp',
 'total_xp',
 'previous_csr_tier',
 'previous_csr_designation',
 'previous_csr',
 'previous_csr_percent_to_next_tier',
 'previous_csr_rank',
 'current_csr_tier',
 'current_csr_designation',
 'current_csr',
 'current_csr_percent_to_next_tier',
 'current_csr_rank',
 'player_rank_on_team',
 'player_finished',
 'player_average_life',
 'player_total_kills',
 'player_total_headshots',
 'player_total_weapon_damage',
 'player_total_shots_landed',
 'player_total_melee_kills',
 'player_total_melee_damage',
 'player_total_assassinations',
 'player_total_ground_pound_kills',
 'player_total_shoulder_bash_kills',
 'player_total_grenade_damage',
 'player_total_power_weapon_damage',
 'player_total_power_weapon_grabs',
 'player_total_deaths',
 'player_total_assists',
 'player_total_grenade_kills',
 'did_win',
 'team_id',
 'mapid',
 'is_team_game',
 'playlist_id',
 'game_variant_id',
 'is_match_over',
 'com

In [40]:
joined_df.columns

['match_id',
 'player_gamertag',
 'medal_id',
 'count',
 'medal_id',
 'sprite_uri',
 'sprite_left',
 'sprite_top',
 'sprite_sheet_width',
 'sprite_sheet_height',
 'sprite_width',
 'sprite_height',
 'classification',
 'description',
 'name',
 'difficulty',
 'match_id',
 'mapid',
 'is_team_game',
 'playlist_id',
 'game_variant_id',
 'is_match_over',
 'completion_date',
 'match_duration',
 'game_mode',
 'map_variant_id',
 'mapid',
 'name',
 'description']

## Fact Data Analysis 

### 4. Which player averages the most kills per game?

In [33]:
from pyspark.sql.functions import col, avg, desc

# Aggregate to calculate the average kills per game for each player
player_avg_kills_df = bucket_joined_df.groupBy("player_gamertag") \
    .agg(avg(col("player_total_kills")).alias("avg_kills_per_game")) \
    .orderBy(desc("avg_kills_per_game"))

# Find the player with the maximum average kills per game
top_player = player_avg_kills_df.first()

print(f"Player with the most average kills per game: {top_player['player_gamertag']} ({top_player['avg_kills_per_game']} kills/game)")


Player with the most average kills per game: gimpinator14 (109.0 kills/game)


### 5. Which playlist gets played the most?

In [51]:
from pyspark.sql.functions import col, count, desc

# Aggregate to calculate the count of matches per playlist
playlist_count_df = bucket_joined_df.groupBy(["playlist_id"]) \
    .agg(count("*").alias("num_matches")) \
    .orderBy(desc("num_matches"))

# Find the playlist with the maximum number of matches
top_playlist = playlist_count_df.first()

print(f"Playlist with the most plays: {top_playlist['playlist_id']} ({top_playlist['num_matches']} matches)")


Playlist with the most plays: f72e0ef0-7c4a-4307-af78-8e38dac3fdba (202489 matches)


In [47]:
# Register the DataFrame as a temporary SQL view
bucket_joined_df.createOrReplaceTempView("bucket_joined_table")

# Use Spark SQL to find the playlist with the most matches
playlist_query = """
    SELECT
        playlist_id,
        COUNT(*) AS num_matches
    FROM
        bucket_joined_table
    GROUP BY
        playlist_id, match_id
    ORDER BY
        num_matches DESC
    LIMIT 1
"""

# Execute the query
top_playlist_df = spark.sql(playlist_query)

# Show the result
top_playlist_df.show()


+--------------------+-----------+
|         playlist_id|num_matches|
+--------------------+-----------+
|780cc101-005c-4fc...|        238|
+--------------------+-----------+



### 6. Which map gets played the most?

In [57]:
from pyspark.sql.functions import count, desc

# Aggregate to calculate the count of matches per map
map_count_df = bucket_joined_df.groupBy(["mapid"]) \
    .agg(count("*").alias("num_matches")) \
    .orderBy(desc("num_matches"))

# Find the map with the maximum number of matches
top_map = map_count_df.first()

print(f"Map played the most: {top_map['mapid']} ({top_map['num_matches']} matches)")


[Stage 593:>                                                        (0 + 5) / 5]

Map played the most: c7edbf0f-f206-11e4-aa52-24be05e24f7e (186118 matches)


                                                                                

In [58]:
# Register the DataFrame as a temporary SQL view
bucket_joined_df.createOrReplaceTempView("bucket_joined_table")
maps_df.createOrReplaceTempView("maps")

# SQL query to find the map with the most matches
map_query = """
    SELECT
        maps.name,
        COUNT(*) AS num_matches
    FROM
        bucket_joined_table
    LEFT JOIN
        maps
    ON
        bucket_joined_table.mapid = maps.mapid
    GROUP BY
        maps.name
    ORDER BY
        num_matches DESC
    LIMIT 1
"""

# Execute the query
top_map_df = spark.sql(map_query)

# Show the result
top_map_df.show()


+--------------+-----------+
|          name|num_matches|
+--------------+-----------+
|Breakout Arena|     186118|
+--------------+-----------+



### 7. Which map do players get the most Killing Spree medals on?

In [59]:
from pyspark.sql.functions import col, sum, desc

KILLING_SPREE_MEDAL_ID = 2430242797


# Filter for Killing Spree medals
killing_spree_df = bucket_joined_df.filter(col("medal_id") == KILLING_SPREE_MEDAL_ID)

# Aggregate to find the count of Killing Spree medals per map
killing_spree_by_map = killing_spree_df.groupBy("mapid") \
    .agg(sum("count").alias("total_killing_spree_medals"))

# Join with maps_df to get map names
killing_spree_with_names = killing_spree_by_map.join(
    maps_df, killing_spree_by_map.mapid == maps_df.mapid, "inner"
).select(maps_df.name, "total_killing_spree_medals") \
 .orderBy(desc("total_killing_spree_medals"))

# Find the map with the most Killing Spree medals
top_map = killing_spree_with_names.first()

print(f"Map with the most Killing Spree medals: {top_map['name']} ({top_map['total_killing_spree_medals']} medals)")



24/12/11 00:10:12 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression


Map with the most Killing Spree medals: Breakout Arena (6738 medals)


In [61]:
# Register the DataFrame as a temporary SQL view
bucket_joined_df.createOrReplaceTempView("bucket_joined_table")
maps_df.createOrReplaceTempView("maps")


KILLING_SPREE_MEDAL_ID = 2430242797

# SQL query to find the map with the most Killing Spree medals, including map name
killing_spree_query = f"""
    SELECT
        maps.name,
        SUM(bucket_joined_table.count) AS total_killing_spree_medals
    FROM
        bucket_joined_table
    JOIN
        maps
    ON
        bucket_joined_table.mapid = maps.mapid
    WHERE
        bucket_joined_table.medal_id = {KILLING_SPREE_MEDAL_ID}
    GROUP BY
        maps.name
    ORDER BY
        total_killing_spree_medals DESC
    LIMIT 1
"""

# Execute the query
top_killing_spree_map_df = spark.sql(killing_spree_query)

# Show the result
top_killing_spree_map_df.show()

24/12/11 00:10:31 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression


+--------------+--------------------------+
|          name|total_killing_spree_medals|
+--------------+--------------------------+
|Breakout Arena|                      6738|
+--------------+--------------------------+



In [62]:
from pyspark.sql.functions import col

# Rename duplicate columns after join
deduplicated_df = bucket_joined_df.toDF(*[f"{col}" if i == 0 else f"{col}_dup" for i, col in enumerate(bucket_joined_df.columns)])


# Sort by playlist_id within partitions
sorted_by_playlist = deduplicated_df.sortWithinPartitions("playlist_id")

# Sort by mapid within partitions
sorted_by_map = deduplicated_df.sortWithinPartitions("mapid")

In [63]:
# Write the sorted DataFrames to storage and compare file sizes
sorted_by_playlist.write.mode("overwrite").saveAsTable("bootcamp.sorted_by_playlist")
sorted_by_map.write.mode("overwrite").saveAsTable("bootcamp.sorted_by_map")

In [64]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'by_playlist' 
FROM bootcamp.sorted_by_playlist.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'by_map' 
FROM bootcamp.sorted_by_map.files

24/12/11 00:11:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


size,num_files,by_playlist
18588460,13,by_playlist
18888371,13,by_map
