In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast, count, avg, desc, split

spark = SparkSession.builder.appName("SparkFundamentalsHomework").getOrCreate()

# Disable automatic broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Load datasets
match_details = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/match_details.csv")
matches = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/matches.csv")
medals_matches_players = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/medals_matches_players.csv")
medals = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/medals.csv")

# Create temp views for SparkSQL
match_details.createOrReplaceTempView("match_details")
matches.createOrReplaceTempView("matches")
medals_matches_players.createOrReplaceTempView("medals_matches_players")
medals.createOrReplaceTempView("medals")

25/04/01 00:26:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/04/01 00:26:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [7]:
# Disable automatic broadcast joins

In [2]:
print(match_details)

DataFrame[match_id: string, player_gamertag: string, previous_spartan_rank: int, spartan_rank: int, previous_total_xp: int, total_xp: int, previous_csr_tier: int, previous_csr_designation: int, previous_csr: int, previous_csr_percent_to_next_tier: int, previous_csr_rank: int, current_csr_tier: int, current_csr_designation: int, current_csr: int, current_csr_percent_to_next_tier: int, current_csr_rank: int, player_rank_on_team: int, player_finished: boolean, player_average_life: string, player_total_kills: int, player_total_headshots: int, player_total_weapon_damage: double, player_total_shots_landed: int, player_total_melee_kills: int, player_total_melee_damage: double, player_total_assassinations: int, player_total_ground_pound_kills: int, player_total_shoulder_bash_kills: int, player_total_grenade_damage: double, player_total_power_weapon_damage: double, player_total_power_weapon_grabs: int, player_total_deaths: int, player_total_assists: int, player_total_grenade_kills: int, did_win

In [3]:
spark.sql("SELECT * FROM match_details LIMIT 5").show()
spark.sql("SELECT * FROM matches LIMIT 5").show()
spark.sql("SELECT * FROM medals_matches_players LIMIT 5").show()
spark.sql("SELECT * FROM medals LIMIT 5").show()



+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+
|            match_id|player_gamertag|previous_spartan_rank|spartan_rank|previous_total_xp|total_xp|previous_csr_tier|previous_csr_designation|previous_csr|previous_csr_percent_to_next_tier|previous_csr_rank|current_

In [4]:
spark.sql("SHOW VIEWS").show()


+---------+--------------------+-----------+
|namespace|            viewName|isTemporary|
+---------+--------------------+-----------+
|         |       match_details|       true|
|         |             matches|       true|
|         |              medals|       true|
|         |medals_matches_pl...|       true|
+---------+--------------------+-----------+



In [5]:
##2 . Explicitly broadcast JOINs medals and maps

In [8]:
matches.printSchema()


root
 |-- match_id: string (nullable = true)
 |-- mapid: string (nullable = true)
 |-- is_team_game: boolean (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- game_variant_id: string (nullable = true)
 |-- is_match_over: boolean (nullable = true)
 |-- completion_date: timestamp (nullable = true)
 |-- match_duration: string (nullable = true)
 |-- game_mode: string (nullable = true)
 |-- map_variant_id: string (nullable = true)



In [15]:
from pyspark.sql.functions import col

# Get row count
maps_row_count = matches.select("match_id", "mapid").distinct().count()

# Estimate average string length (assuming mapid is a string)
avg_mapid_length = matches.selectExpr("AVG(LENGTH(mapid)) as avg_length").collect()[0][0]

# Estimate total size (in bytes)
estimated_size_bytes = maps_row_count * avg_mapid_length * 2  # Assuming UTF-16 encoding

# Convert to MB
estimated_size_mb = estimated_size_bytes / (1024 * 1024)

print(f"üó∫Ô∏è Estimated Size of Maps Table: {estimated_size_mb:.2f} MB")


üó∫Ô∏è Estimated Size of Maps Table: 1.65 MB


In [18]:
medals_broadcasted = broadcast(medals)
maps_broadcasted = broadcast(matches.select("match_id", "mapid").distinct())  



In [6]:
# üîπ 1Ô∏è‚É£ Why Broadcast medals and maps?
# Broadcast joins are efficient when one table is small (usually <10MB).

# Avoids shuffle joins, which reduce performance.

# Prevents unnecessary network movement.

# ‚úÖ Key Takeaway: Broadcasting small tables reduces join execution time.

In [21]:
medal_matches_players_joined = medals_matches_players.join(
    medals_broadcasted, "medal_id", "inner"
)
maps_joined = matches.join(
    maps_broadcasted, "match_id", "inner"
)

# Explain the join strategy
medal_matches_players_joined.explain()
maps_joined.explain()


##BroadcastHashJoin

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [medal_id#145L, match_id#143, player_gamertag#144, count#146, sprite_uri#169, sprite_left#170, sprite_top#171, sprite_sheet_width#172, sprite_sheet_height#173, sprite_width#174, sprite_height#175, classification#176, description#177, name#178, difficulty#179]
   +- BroadcastHashJoin [medal_id#145L], [medal_id#168L], Inner, BuildRight, false
      :- Filter isnotnull(medal_id#145L)
      :  +- FileScan csv [match_id#143,player_gamertag#144,medal_id#145L,count#146] Batched: false, DataFilters: [isnotnull(medal_id#145L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/medals_matches_players.csv], PartitionFilters: [], PushedFilters: [IsNotNull(medal_id)], ReadSchema: struct<match_id:string,player_gamertag:string,medal_id:bigint,count:int>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=1030]
         +- Filter isnotnull(medal_id#168L)
        

In [22]:
## now move to use Bucket Joins

# Bucket joins reduce shuffling by pre-sorting and organizing data into fixed buckets.

# Unlike shuffle joins, bucket joins avoid expensive network transfer.

# Better scalability when dealing with large tables.

In [None]:
## use batch id for 16 partition 

In [23]:
# Drop existing bucketed tables if they exist
spark.sql("DROP TABLE IF EXISTS bootcamp.match_details_bucketed")
spark.sql("DROP TABLE IF EXISTS bootcamp.matches_bucketed")
spark.sql("DROP TABLE IF EXISTS bootcamp.medals_matches_players_bucketed")

# Create bucketed tables
spark.sql("""
    CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
        match_id STRING,
        player_gamertag STRING,
        player_total_kills INTEGER,
        player_total_deaths INTEGER
    )
    USING iceberg
    PARTITIONED BY (bucket(16, match_id))
""")

match_details.write.mode("overwrite").bucketBy(16, "match_id").saveAsTable("bootcamp.match_details_bucketed")

spark.sql("""
    CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
        match_id STRING,
        playlist_id STRING,
        mapid STRING,
        completion_date TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (bucket(16, match_id))
""")

matches.write.mode("overwrite").bucketBy(16, "match_id").saveAsTable("bootcamp.matches_bucketed")

spark.sql("""
    CREATE TABLE IF NOT EXISTS bootcamp.medals_matches_players_bucketed (
        match_id STRING,
        player_gamertag STRING,
        medal_id STRING
    )
    USING iceberg
    PARTITIONED BY (bucket(16, match_id))
""")

medals_matches_players.write.mode("overwrite").bucketBy(16, "match_id").saveAsTable("bootcamp.medals_matches_players_bucketed")


                                                                                

In [None]:
##Perform Bucket Join

In [33]:
bucketed_join_df = spark.sql("""
    SELECT md.match_id,   -- ‚úÖ Add match_id explicitly
           md.player_gamertag, 
           m.playlist_id, 
           m.mapid, 
           mmp.medal_id, 
           md.player_total_kills
    FROM bootcamp.match_details_bucketed md
    JOIN bootcamp.matches_bucketed m ON md.match_id = m.match_id
    JOIN bootcamp.medals_matches_players_bucketed mmp ON md.match_id = mmp.match_id
""")


In [34]:
bucketed_join_df.explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [match_id#1307, player_gamertag#1308, playlist_id#1346, mapid#1344, medal_id#1355L, player_total_kills#1326]
   +- SortMergeJoin [match_id#1307], [match_id#1353], Inner
      :- Project [match_id#1307, player_gamertag#1308, player_total_kills#1326, mapid#1344, playlist_id#1346]
      :  +- SortMergeJoin [match_id#1307], [match_id#1343], Inner
      :     :- Sort [match_id#1307 ASC NULLS FIRST], false, 0
      :     :  +- Exchange hashpartitioning(match_id#1307, 200), ENSURE_REQUIREMENTS, [plan_id=1566]
      :     :     +- BatchScan demo.bootcamp.match_details_bucketed[match_id#1307, player_gamertag#1308, player_total_kills#1326] demo.bootcamp.match_details_bucketed (branch=null) [filters=match_id IS NOT NULL, groupedBy=] RuntimeFilters: []
      :     +- Sort [match_id#1343 ASC NULLS FIRST], false, 0
      :        +- Exchange hashpartitioning(match_id#1343, 200), ENSURE_REQUIREMENTS, [plan_id=1567]
      :           +

In [26]:
# üîπ Key Takeaways from Bucket Join in HW
# Concept:	Why It Matters?
# Bucket Partitioning:	Reduces shuffle costs by pre-distributing data
# No Shuffling Needed:	Faster joins because data is pre-organized
# 16 Buckets on match_id:	Allows better load balancing across executors
# SortMergeJoin:	Confirms Spark is using the bucket join strategy

In [28]:
# STep4 :Aggregate the joined data frame 

# interesting question:
# Which player averages the most kills per game?
# Which playlist gets played the most?
# Which map gets played the most?
# Which map do players get the most Killing Spree medals on?


In [35]:
# Which player averages the most kills per game?
from pyspark.sql.functions import avg, desc

most_kills_per_game = bucketed_join_df.groupBy("player_gamertag") \
    .agg(avg("player_total_kills").alias("avg_kills")) \
    .orderBy(desc("avg_kills"))

most_kills_per_game.show(10)

[Stage 84:>                 (0 + 4) / 4][Stage 85:====>             (1 + 3) / 4]

+---------------+-----------------+
|player_gamertag|        avg_kills|
+---------------+-----------------+
|   gimpinator14|            109.0|
|  I Johann117 I|             96.0|
|BudgetLegendary|             83.0|
|      GsFurreal|             75.0|
|   TameablePoet|74.22429906542057|
|   Sexy is Back|             73.0|
|   killerguy789|             68.0|
|THC GUILTYSPARK|             67.0|
|    HisLattice1|             66.0|
|PrimePromethean|             66.0|
+---------------+-----------------+
only showing top 10 rows



                                                                                

In [36]:
bucketed_join_df.printSchema()


root
 |-- match_id: string (nullable = true)
 |-- player_gamertag: string (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- mapid: string (nullable = true)
 |-- medal_id: long (nullable = true)
 |-- player_total_kills: integer (nullable = true)



In [37]:
# Which playlist gets played the most?
from pyspark.sql.functions import count

most_played_playlist = bucketed_join_df.groupBy("playlist_id") \
    .agg(count("match_id").alias("match_count")) \
    .orderBy(desc("match_count"))

most_played_playlist.show(10)


+--------------------+-----------+
|         playlist_id|match_count|
+--------------------+-----------+
|f72e0ef0-7c4a-430...|    1565529|
|780cc101-005c-4fc...|    1116002|
|0bcf2be1-3168-4e4...|    1015496|
|c98949ae-60a8-43d...|     824932|
|2323b76a-db98-4e0...|     692342|
|892189e9-d712-4bd...|     667670|
|f27a65eb-2d11-496...|     167498|
|355dc154-9809-4ed...|     140006|
|d0766624-dbd7-453...|     138470|
|bc0f8ad6-31e6-4a1...|     111073|
+--------------------+-----------+
only showing top 10 rows



                                                                                

In [38]:
# Which map gets played the most?

most_played_map = bucketed_join_df.groupBy("mapid") \
    .agg(count("match_id").alias("match_count")) \
    .orderBy(desc("match_count"))

most_played_map.show(10)

+--------------------+-----------+
|               mapid|match_count|
+--------------------+-----------+
|c74c9d0f-f206-11e...|    1445545|
|c7edbf0f-f206-11e...|    1435048|
|c7805740-f206-11e...|     953278|
|cdb934b0-f206-11e...|     396305|
|cb914b9e-f206-11e...|     309045|
|ce1dc2de-f206-11e...|     299736|
|cebd854f-f206-11e...|     298891|
|caacb800-f206-11e...|     291540|
|cd844200-f206-11e...|     261162|
|cc040aa1-f206-11e...|     256966|
+--------------------+-----------+
only showing top 10 rows



In [39]:
 # Which Map Do Players Get the Most Killing Spree Medals On?
most_killing_spree_map = bucketed_join_df.filter(bucketed_join_df.medal_id == "Killing Spree") \
    .groupBy("mapid") \
    .agg(count("medal_id").alias("killing_spree_count")) \
    .orderBy(desc("killing_spree_count"))

most_killing_spree_map.show(10)


+-----+-------------------+
|mapid|killing_spree_count|
+-----+-------------------+
+-----+-------------------+



In [40]:
## key take away 

# ‚úî Aggregates bucketed data to find key gameplay trends.
# ‚úî Uses groupBy() + agg() for efficient Spark-based aggregation.
# ‚úî Uses .orderBy(desc(...)) to find top players, playlists, and maps.
# ‚úî Identifies player performance trends, popular maps, and gameplay styles.

In [44]:

# 5: üîπ Reference: Sorting in Lab Notebooks
# From Caching.ipynb, we see examples of sorting to optimize partitions:


# sorted_df = df.repartition(10, col("some_column")).sortWithinPartitions("some_column")


### we know that  playlists and maps are both very low cardinality

In [45]:
 # Step 1: Apply .sortWithinPartitions() on Different Columns

sorted_by_playlist = most_played_playlist.repartition(10, "playlist_id") \
    .sortWithinPartitions("playlist_id")


In [46]:
sorted_by_map = most_played_map.repartition(10, "mapid") \
    .sortWithinPartitions("mapid")


In [47]:
# Step 2: Compare Data Sizes

print(f"Size when sorted by Playlist: {sorted_by_playlist.count()} rows")
print(f"Size when sorted by Map: {sorted_by_map.count()} rows")


# Sorting by mapid (maps) resulted in fewer rows (16 rows) than sorting by playlist_id (23 rows).

# Since mapid has fewer unique values, sorting by mapid is likely more efficient.

# Sorting by mapid led to fewer partitions being touched, reducing shuffle and storage overhead.


# playlist_id	23 rows	‚úÖ Higher cardinality ‚Üí More partitions
# mapid	16 rows	‚úÖ Lower cardinality ‚Üí More efficient sorting

                                                                                

Size when sorted by Playlist: 23 rows
Size when sorted by Map: 16 rows


In [48]:
# Method	/Shuffling Cost	/Network Cost	/Best Use Case
# Direct orderBy()	/‚ùå High	/‚ùå High	/Only good for small datasets
# sortWithinPartitions() ‚Üí orderBy()	/‚úÖ Lower	/‚úÖ Lower	/Best for large datasets


In [49]:
print(f"Partitions used (sorted by Playlist): {sorted_by_playlist.rdd.getNumPartitions()}")
print(f"Partitions used (sorted by Map): {sorted_by_map.rdd.getNumPartitions()}")




Partitions used (sorted by Playlist): 10
Partitions used (sorted by Map): 10


In [50]:
from pyspark.sql.functions import spark_partition_id

sorted_by_playlist.groupBy(spark_partition_id()).count().show()
sorted_by_map.groupBy(spark_partition_id()).count().show()


                                                                                

+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|    4|
|                   2|    2|
|                   3|    3|
|                   4|    2|
|                   5|    2|
|                   7|    3|
|                   8|    4|
|                   9|    3|
+--------------------+-----+

+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|    2|
|                   1|    3|
|                   2|    2|
|                   3|    3|
|                   5|    1|
|                   6|    1|
|                   7|    1|
|                   8|    1|
|                   9|    2|
+--------------------+-----+



In [51]:
# üîπ 1Ô∏è‚É£ Key Observations from SPARK_PARTITION_ID() Counts
# Sorting Column	Partition IDs Used	Max Rows in a Partition	Total Rows
# playlist_id	8 active partitions (0,2,3,4,5,7,8,9)	Max = 4	23 rows
# mapid	9 active partitions (0,1,2,3,5,6,7,8,9)	Max = 3	16 rows

# üîπ 2Ô∏è‚É£ Why Does mapid Have Fewer Rows?
# Some partitions (5, 6, 7, 8) have only 1 row ‚Üí Suggests that mapid values are not evenly spread.

# Lower maximum row count per partition (only 3 rows at most).

# Total row count is lower because some partitions hold little to no data.

# üí° Even though mapid has more active partitions (9 vs 8), each partition holds fewer rows, leading to a lower total row count.

In [None]:
# üîπ 1Ô∏è‚É£ Sorting Performance: Low Cardinality Wins
# If sorting is the goal, then: ‚úÖ Using low cardinality in .sortWithinPartitions() ‚Üí Faster global sorting
# ‚ùå Using high cardinality in .sortWithinPartitions() ‚Üí Slower global sorting

# Why?
# Factor	Low Cardinality (mapid)	High Cardinality (playlist_id)
# Number of Unique Values	Fewer (mapid has less distinct values)	More (playlist_id has many unique values)
# Partition Row Count	‚úÖ Fewer rows per partition ‚Üí Faster sorting	‚ùå More rows per partition ‚Üí Slower sorting
# Global Sorting Efficiency	‚úÖ Faster (less data shuffling needed)	‚ùå Slower (more shuffling across partitions)
# ‚úÖ Thus, sorting using low-cardinality columns (mapid) first will improve sorting performance.

# üîπ 2Ô∏è‚É£ Aggregation Performance: High Cardinality Wins
# If aggregation (e.g., SUM(), COUNT()) is the goal, then: ‚úÖ Using high cardinality in .sortWithinPartitions() ‚Üí Faster aggregation
# ‚ùå Using low cardinality in .sortWithinPartitions() ‚Üí Slower aggregation

# Why?
# Factor	Low Cardinality (mapid)	High Cardinality (playlist_id)
# Number of Unique Groups	Fewer (mapid has less distinct values)	More (playlist_id has many unique values)
# Workload Distribution	‚ùå Less distributed (some partitions have more rows)	‚úÖ More distributed (partitions are more balanced)
# Parallel Aggregation Efficiency	‚ùå Less effective (some partitions do all the work)	‚úÖ More effective (workload is spread evenly)
# ‚úÖ Thus, aggregations like SUM(), AVG(), or COUNT() using high-cardinality columns (playlist_id) will be more efficient because data is more evenly distributed across partitions.

In [53]:
# another follow up question :

In [54]:
# ‚úÖ Partition + .sortWithinPartitions() + Aggregation vs. Global Sorting for Large Datasets
# Yes! Partitioning first, then sorting within partitions before aggregation, will generally perform better than globally sorting first for large datasets.

# üîπ 1Ô∏è‚É£ Why Partitioning + .sortWithinPartitions() Before Aggregation is Better
# If we first partition the data, then sort within partitions before aggregating, we get: ‚úÖ Better parallelism ‚Äì Each partition can process its own data independently.
# ‚úÖ Less shuffling ‚Äì Spark doesn‚Äôt need to move data across partitions as much.
# ‚úÖ Better compression & I/O efficiency ‚Äì Sorting within partitions improves read performance.

# Approach: Partition + .sortWithinPartitions() + Aggregation
# python
# Copy
# Edit
# df = df.repartition(10, "some_column") \
#        .sortWithinPartitions("some_column") \
#        .groupBy("some_column").sum("value")
# üí° This method avoids expensive global shuffling because each partition pre-sorts its data, reducing unnecessary data movement.

# üîπ 2Ô∏è‚É£ Why Global Sorting Before Aggregation is Worse
# If we sort everything globally first, we get: ‚ùå Massive shuffle cost ‚Äì Spark needs to move and reorder all data across partitions.
# ‚ùå Less parallelism ‚Äì Spark cannot process partitions independently since all data needs to be fully sorted first.
# ‚ùå High memory usage ‚Äì Sorting everything before aggregation requires sorting a much larger dataset in memory.

# Approach: Global Sorting Before Aggregation
# python
# Copy
# Edit
# df = df.orderBy("some_column").groupBy("some_column").sum("value")
# üí° This is inefficient because Spark must first fully shuffle and sort before starting the aggregation.