In [1]:
# TASK 1 - Turn off automatic broadcast joins.
# With the threshold set to -1, broadcast joins in this notebook only come
# from explicit broadcast hints.
spark.conf.set(
    "spark.sql.autoBroadcastJoinThreshold",
    "-1",
)

In [2]:
%%sql
/* Create the bootcamp database used by the tables in this notebook; does nothing if it already exists. */
CREATE DATABASE IF NOT EXISTS bootcamp

24/12/20 11:24:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# TASK 2 - Explicit broadcast join for both medals and maps
import pyspark.sql.functions as f


def _load_csv(path):
    """Read a CSV file with a header row, letting Spark infer the column types."""
    reader = spark.read.option("header", "true").option("inferSchema", "true")
    return reader.csv(path)


# Medal metadata, and the medal rows recorded per match and player.
medals = _load_csv("/home/iceberg/data/medals.csv")
medals_matches_players = _load_csv("/home/iceberg/data/medals_matches_players.csv")

# The left join keeps every medals_matches_players row; the broadcast hint on
# `medals` is explicit because automatic broadcasting is disabled above.
medals_matches_players_medal_info = medals_matches_players.join(
    f.broadcast(medals),
    on=["medal_id"],
    how="left",
)

medals_matches_players_medal_info.show(5, False)

                                                                                

+----------+------------------------------------+---------------+-----+------------------------------------------------------------------------------------------------------------------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+-------------------------------------------------+------------+----------+
|medal_id  |match_id                            |player_gamertag|count|sprite_uri                                                                                                                    |sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|classification   |description                                      |name        |difficulty|
+----------+------------------------------------+---------------+-----+------------------------------------------------------------------------------------------------------------------------------+-----------+----------+------------------+

In [4]:
# Both CSVs have a header row; column types are inferred by Spark.
csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
maps = csv_reader.csv("/home/iceberg/data/maps.csv")
matches = csv_reader.csv("/home/iceberg/data/matches.csv")

# Left join keeps every match; `maps` is broadcast explicitly.
matches_with_maps = matches.join(
    f.broadcast(maps),
    on=["mapid"],
    how="left",
)

# Give the map's generic `name` / `description` columns explicit map_ prefixes.
matches_maps_info = (
    matches_with_maps
    .withColumnRenamed("name", "map_name")
    .withColumnRenamed("description", "map_description")
)

# To preview the result: matches_maps_info.show(5, False)

                                                                                

In [5]:
# TASK 3 - Bucket join match_details - matches - medal_matches_players on match_id with 16 buckets
# Switch on the settings used for Storage Partitioned Joins (SPJ) on the
# Iceberg tables, so the bucket layout can be reused by the join.
for conf_key in (
    "spark.sql.sources.v2.bucketing.enabled",
    "spark.sql.iceberg.planning.preserve-data-grouping",
):
    spark.conf.set(conf_key, "true")

In [6]:
# DDLs to create bucketed tables
# DDL match details: Iceberg table partitioned into 16 buckets on match_id.
# IF NOT EXISTS means an already existing table is left untouched, so this
# cell can be re-run safely.
bucketed_match_details_ddl = """
    CREATE TABLE IF NOT EXISTS bootcamp.bucketed_match_details(
        match_id STRING,
        player_gamertag STRING,
        player_total_kills INTEGER,
        spartan_rank INTEGER,
        total_xp INTEGER
    )
    USING iceberg
    PARTITIONED BY (bucket(16, match_id));
"""

# Run the DDL on the current Spark session.
spark.sql(bucketed_match_details_ddl)

DataFrame[]

In [7]:
# Load data into bootcamp.bucketed_match_details
# Data presents 1 match - player per row
# The table must already exist (see the DDL cell above); its 16-bucket layout
# on match_id comes from that DDL. A dynamic partition overwrite is used
# instead of an append so that re-running this cell replaces the buckets it
# writes rather than duplicating every row.
(
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv("/home/iceberg/data/match_details.csv")
    .select(
        "match_id",
        "player_gamertag",
        "player_total_kills",
        "spartan_rank",
        "total_xp"
    )
    .writeTo("bootcamp.bucketed_match_details")
    .overwritePartitions()
)

                                                                                

In [8]:
# DDL matches: Iceberg table partitioned into 16 buckets on match_id.
# map_name / map_description are the renamed map columns produced by the
# matches-maps join earlier in the notebook.
# IF NOT EXISTS means an already existing table is left untouched.
bucketed_matches_ddl = """
    CREATE TABLE IF NOT EXISTS bootcamp.bucketed_matches (
        match_id STRING,
        mapid STRING,
        playlist_id STRING,
        map_name STRING,
        map_description STRING,
        is_team_game BOOLEAN,
        is_match_over BOOLEAN,
        completion_date TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (bucket(16, match_id));
"""

# Run the DDL on the current Spark session.
spark.sql(bucketed_matches_ddl)

DataFrame[]

In [9]:
# Load data into bootcamp.bucketed_matches from the matches-maps broadcast join
# Data presents 1 match per row
# The table must already exist (see the DDL cell above); its 16-bucket layout
# on match_id comes from that DDL. A dynamic partition overwrite is used
# instead of an append so that re-running this cell replaces the buckets it
# writes rather than duplicating every row.
(
    matches_maps_info
    .select(
        "match_id",
        "mapid",
        "playlist_id",
        "map_name",
        "map_description",
        "is_team_game",
        "is_match_over",
        "completion_date"
    )
    .writeTo("bootcamp.bucketed_matches")
    .overwritePartitions()
)

                                                                                

In [10]:
# DDL medal_matches_players: Iceberg table partitioned into 16 buckets on match_id.
# The medal_* and num_medals columns are the renamed columns produced by the
# load cell that follows this DDL.
# IF NOT EXISTS means an already existing table is left untouched.
bucketed_medals_matches_players_ddl = """
CREATE TABLE IF NOT EXISTS bootcamp.bucketed_medals_matches_players (
    match_id STRING,
    player_gamertag STRING,
    medal_id STRING,
    classification STRING,
    medal_description STRING,
    medal_difficulty INTEGER,
    num_medals INTEGER
)
USING iceberg
PARTITIONED BY (bucket(16, match_id));
"""

# Run the DDL on the current Spark session.
spark.sql(bucketed_medals_matches_players_ddl)

DataFrame[]

In [11]:
# Load data into bootcamp.bucketed_medals_matches_players from the
# medals_matches_players - medals broadcast join
# Data presents 1 medal per player per match per row
# The table must already exist (see the DDL cell above); its 16-bucket layout
# on match_id comes from that DDL. A dynamic partition overwrite is used
# instead of an append so that re-running this cell replaces the buckets it
# writes rather than duplicating every row.
(
    medals_matches_players_medal_info
    # Align the column names with the table schema
    .withColumnRenamed("difficulty", "medal_difficulty")
    .withColumnRenamed("description", "medal_description")
    .withColumnRenamed("count", "num_medals")
    .select(
        "match_id",
        "player_gamertag",
        "medal_id",
        "classification",
        "medal_description",
        "medal_difficulty",
        "num_medals"
    )
    .writeTo("bootcamp.bucketed_medals_matches_players")
    .overwritePartitions()
)

                                                                                

In [12]:
# Sanity check: read the three bucketed tables back and report their row counts.
match_details_bucketed = spark.table("bootcamp.bucketed_match_details")
matches_bucketed = spark.table("bootcamp.bucketed_matches")
medals_matches_players_bucketed = spark.table("bootcamp.bucketed_medals_matches_players")

print(f"count match_details_bucketed: {match_details_bucketed.count()}")
print(f"count matches_bucketed: {matches_bucketed.count()}")
print(f"count medals_matches_players_bucketed: {medals_matches_players_bucketed.count()}")

count match_details_bucketed: 151761
count matches_bucketed: 24025
count medals_matches_players_bucketed: 755229


In [15]:
# Bucket join match_details_bucketed - matches_bucketed - medals_matches_players_bucketed
from datetime import datetime

start = datetime.now()

# player_gamertag exists in both match_details and medals_matches_players; the
# match_details copy gets an _md suffix so both columns survive the join.
match_details_bucketed = (
    spark.table("bootcamp.bucketed_match_details")
    .withColumnRenamed("player_gamertag", "player_gamertag_md")
)
matches_bucketed = spark.table("bootcamp.bucketed_matches")
medals_matches_players_bucketed = spark.table("bootcamp.bucketed_medals_matches_players")

# NOTE(review): an earlier comment here said the plan contains an Exchange,
# while the explain() cell that follows expects none - confirm against the plan.
# First join: one row per player per match.
matches_with_players = matches_bucketed.join(
    match_details_bucketed,
    on=["match_id"],
    how="left",
)
# Second join is on match_id only, so every player row of a match is paired
# with every medal row of that match (medals of all players in the match).
matches_info = matches_with_players.join(
    medals_matches_players_bucketed,
    on=["match_id"],
    how="left",
)

# Author's note: at this data volume a shuffle is still cheaper than a sort with
# filter, but that is not expected to hold for huge volumes of data.
# show() is the action that actually runs the join; the elapsed time is displayed.
matches_info.show(10)
execution_time = datetime.now() - start
execution_time

[Stage 32:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------+--------------------+------------+-------------+-------------------+------------------+------------------+------------+--------+---------------+----------+-----------------+--------------------+----------------+----------+
|            match_id|               mapid|         playlist_id|      map_name|     map_description|is_team_game|is_match_over|    completion_date|player_gamertag_md|player_total_kills|spartan_rank|total_xp|player_gamertag|  medal_id|   classification|   medal_description|medal_difficulty|num_medals|
+--------------------+--------------------+--------------------+--------------+--------------------+------------+-------------+-------------------+------------------+------------------+------------+--------+---------------+----------+-----------------+--------------------+----------------+----------+
|03964845-247e-4f0...|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|Breakout Arena|The broadcast o

                                                                                

datetime.timedelta(seconds=3, microseconds=589049)

In [16]:
# No shuffles when joining (no Exchange operation)
# Print the physical plan of the joined frame so the claim above can be checked:
# look for Exchange nodes underneath the SortMergeJoin operators.
matches_info.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [match_id#589, mapid#590, playlist_id#591, map_name#592, map_description#593, is_team_game#594, is_match_over#595, completion_date#596, player_gamertag_md#578, player_total_kills#570, spartan_rank#571, total_xp#572, player_gamertag#606, medal_id#607, classification#608, medal_description#609, medal_difficulty#610, num_medals#611]
   +- SortMergeJoin [match_id#589], [match_id#605], LeftOuter
      :- Project [match_id#589, mapid#590, playlist_id#591, map_name#592, map_description#593, is_team_game#594, is_match_over#595, completion_date#596, player_gamertag_md#578, player_total_kills#570, spartan_rank#571, total_xp#572]
      :  +- SortMergeJoin [match_id#589], [match_id#568], LeftOuter
      :     :- Sort [match_id#589 ASC NULLS FIRST], false, 0
      :     :  +- BatchScan demo.bootcamp.bucketed_matches[match_id#589, mapid#590, playlist_id#591, map_name#592, map_description#593, is_team_game#594, is_match_over#595, comp

In [17]:
# TASK 4 - Player with highest average kills per game
# matches_info repeats each (match, player) row once for every medal row of
# that match, so averaging it directly would weight each game by its number of
# medal rows. Reduce to one row per player per match before averaging.
player_max_avg_kills = (
    matches_info
    # Matches without match_details rows come out of the left join with a null player
    .filter(f.col("player_gamertag_md").isNotNull())
    .select("match_id", "player_gamertag_md", "player_total_kills")
    .dropDuplicates(["match_id", "player_gamertag_md"])
    .groupBy("player_gamertag_md")
    .agg(f.avg("player_total_kills").alias("avg_kills"))
    .orderBy(f.desc("avg_kills"))
    .limit(1)
)

player_max_avg_kills.show()



+------------------+---------+
|player_gamertag_md|avg_kills|
+------------------+---------+
|      gimpinator14|    109.0|
+------------------+---------+



                                                                                

In [18]:
# TASK 5 - Playlist which is played the most
# In the joined frame a single match appears once per (player, medal row)
# combination: e.g. 5 players with 10 medal rows each give 50 rows for that
# match, all carrying the same playlist_id (one playlist per match).
# Counting distinct match_ids therefore counts each played match exactly once.
plays_per_playlist = matches_info.groupBy("playlist_id").agg(
    f.countDistinct("match_id").alias("num_plays")
)
playlist_most_played = (
    plays_per_playlist
    .orderBy(f.col("num_plays").desc())
    .limit(1)
)

playlist_most_played.show(truncate=False)



+------------------------------------+---------+
|playlist_id                         |num_plays|
+------------------------------------+---------+
|f72e0ef0-7c4a-4307-af78-8e38dac3fdba|9350     |
+------------------------------------+---------+



                                                                                

In [19]:
# TASK 6 - Map which is played the most
# Distinct match_ids are counted so that the repeated rows of the joined
# frame do not inflate the number of plays per map.
plays_per_map = matches_info.groupBy("mapid", "map_name").agg(
    f.countDistinct("match_id").alias("num_plays_in_map")
)
map_most_played = (
    plays_per_map
    .orderBy(f.col("num_plays_in_map").desc())
    .limit(1)
)

map_most_played.show(truncate=False)



+------------------------------------+--------------+----------------+
|mapid                               |map_name      |num_plays_in_map|
+------------------------------------+--------------+----------------+
|c7edbf0f-f206-11e4-aa52-24be05e24f7e|Breakout Arena|8587            |
+------------------------------------+--------------+----------------+



                                                                                

In [20]:
# TASK 7 - Map where players get the most Killing Spree medals
# Keep only Killing Spree medals (case-insensitive match on the classification).
is_killing_spree = f.lower(f.col("classification")) == 'killingspree'
# The medal row must carry a player: medals_matches_players is where the
# number of medals won by a player is recorded.
has_medal_player = f.col("player_gamertag").isNotNull()
# Keep a medal row only next to the player who earned it, not next to the
# other players of the same match_id.
is_own_medal = f.col("player_gamertag") == f.col("player_gamertag_md")

killing_spree_rows = matches_info.filter(
    is_killing_spree & has_medal_player & is_own_medal
)

map_most_killing_spree_medals = (
    killing_spree_rows
    .groupBy("mapid", "map_name")
    .agg(f.sum("num_medals").alias("num_medals_in_map"))
    .orderBy(f.col("num_medals_in_map").desc())
    .limit(1)
)

map_most_killing_spree_medals.show()

24/12/20 11:27:00 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression

+--------------------+--------------+-----------------+
|               mapid|      map_name|num_medals_in_map|
+--------------------+--------------+-----------------+
|c7edbf0f-f206-11e...|Breakout Arena|             6919|
+--------------------+--------------+-----------------+



                                                                                

In [21]:
# TASK 8 - Try different sortWithinPartitions with the aggregated dataset to check which of them get the best compression

# Baseline: the joined dataset written as-is, without any extra sorting.
matches_info.write.mode("overwrite").saveAsTable("bootcamp.matches_info_unsorted")

# Sort keys to try, listed from lowest to highest cardinality column,
# keyed by the table each sorted variant is written to.
sort_experiments = {
    "bootcamp.matches_info_sorted_map_playlist": ["mapid", "playlist_id"],
    "bootcamp.matches_info_sorted_map_playlist_medal": ["mapid", "playlist_id", "medal_id"],
    "bootcamp.matches_info_sorted_map": ["mapid"],
    "bootcamp.matches_info_sorted_playlist": ["playlist_id"],
}

# Each variant overwrites its own table, so the cell can be re-run.
for table_name, sort_columns in sort_experiments.items():
    (
        matches_info
        .sortWithinPartitions(*sort_columns)
        .write.mode("overwrite")
        .saveAsTable(table_name)
    )

                                                                                

In [23]:
%%sql

/* Total data-file size and file count per written variant, smallest first,
   read from each table's `files` metadata table. */
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'unsorted' AS table_name
FROM demo.bootcamp.matches_info_unsorted.files
UNION ALL
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'sorted_map_playlist' AS table_name
FROM demo.bootcamp.matches_info_sorted_map_playlist.files
UNION ALL
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'sorted_map_playlist_medal' AS table_name
FROM demo.bootcamp.matches_info_sorted_map_playlist_medal.files
UNION ALL
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'sorted_map' AS table_name
FROM demo.bootcamp.matches_info_sorted_map.files
UNION ALL
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'sorted_playlist' AS table_name
FROM demo.bootcamp.matches_info_sorted_playlist.files
ORDER BY size;

size,num_files,table_name
16117820,16,unsorted
28625096,16,sorted_map_playlist
29628103,16,sorted_playlist
30087637,16,sorted_map
37622437,16,sorted_map_playlist_medal


24/12/20 12:38:31 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1514907 ms exceeds timeout 120000 ms
24/12/20 12:38:31 WARN SparkContext: Killing executors is not supported by current scheduler.
24/12/20 12:38:39 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [None]:
%%sql

/* Per-file metadata of the playlist-sorted variant.
   The table name previously used here (matches_info_rep_sorted_playlist) is not
   created by any cell of this notebook; matches_info_sorted_playlist is the
   table written in TASK 8. */
SELECT *
FROM demo.bootcamp.matches_info_sorted_playlist.files