In [2]:
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel


spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel


In [100]:
val matchDetailsB = spark.table("bootcamp.match_details_bucketed")
val matchesB = spark.table("bootcamp.matches_bucketed")
val medalMatchesPlayersB = spark.table("bootcamp.medal_matches_players_bucketed")


val medals = spark.table("bootcamp.medals_bucketed")
val maps = spark.table("bootcamp.maps_bucketed")


val joinedDF = matchDetailsB
  .join(matchesB, Seq("match_id")) // bucket join
  .join(medalMatchesPlayersB, Seq("match_id")) // bucket join
  .join(broadcast(medals), Seq("medal_id")) // explicit broadcast join
  .join(broadcast(maps), Seq("mapid")) // explicit broadcast join

joinedDF.show(10)

+--------------------+----------+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------+---------------+-----+--------------------+-----------+-

matchDetailsB: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 34 more fields]
matchesB: org.apache.spark.sql.DataFrame = [match_id: string, mapid: string ... 8 more fields]
medalMatchesPlayersB: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 2 more fields]
medals: org.apache.spark.sql.DataFrame = [medal_id: string, sprite_uri: string ... 10 more fields]
maps: org.apache.spark.sql.DataFrame = [mapid: string, name: string ... 1 more field]
joinedDF: org.apache.spark.sql.DataFrame = [mapid: string, medal_id: string ... 59 more fields]


In [3]:
val matches = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/matches.csv")
val matchDetails =  spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/match_details.csv")
val medalMatchesPlayers = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/medals_matches_players.csv")

val medals = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/medals.csv")

val maps = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/maps.csv")


matches: org.apache.spark.sql.DataFrame = [match_id: string, mapid: string ... 8 more fields]
matchDetails: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 34 more fields]
medalMatchesPlayers: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 2 more fields]
medals: org.apache.spark.sql.DataFrame = [medal_id: bigint, sprite_uri: string ... 10 more fields]
maps: org.apache.spark.sql.DataFrame = [mapid: string, name: string ... 1 more field]


In [4]:
spark.sql("DROP TABLE IF EXISTS bootcamp.matches_bucketed")
spark.sql("""
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
    match_id STRING,
    mapid STRING,
    is_team_game BOOLEAN,
    playlist_id STRING,
    game_variant_id STRING,
    is_match_over BOOLEAN,
    completion_date TIMESTAMP,
    match_duration STRING,
    game_mode STRING,
    map_variant_id STRING
)
USING iceberg
PARTITIONED BY (bucket(16, match_id))
""")

spark.sql("DROP TABLE IF EXISTS bootcamp.match_details_bucketed")
spark.sql("""
CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
    match_id STRING,
    player_gamertag STRING,
    previous_spartan_rank INT,
    spartan_rank INT,
    previous_total_xp INT,
    total_xp INT,
    previous_csr_tier INT,
    previous_csr_designation INT,
    previous_csr INT,
    previous_csr_percent_to_next_tier INT,
    previous_csr_rank INT,
    current_csr_tier INT,
    current_csr_designation INT,
    current_csr INT,
    current_csr_percent_to_next_tier INT,
    current_csr_rank INT,
    player_rank_on_team INT,
    player_finished BOOLEAN,
    player_average_life STRING,
    player_total_kills INT,
    player_total_headshots INT,
    player_total_weapon_damage INT,
    player_total_shots_landed INT,
    player_total_melee_kills INT,
    player_total_melee_damage INT,
    player_total_assassinations INT,
    player_total_ground_pound_kills INT,
    player_total_shoulder_bash_kills INT,
    player_total_grenade_damage INT,
    player_total_power_weapon_damage INT,
    player_total_power_weapon_grabs INT,
    player_total_deaths INT,
    player_total_assists INT,
    player_total_grenade_kills INT,
    did_win INT,
    team_id INT
)
USING iceberg
PARTITIONED BY (bucket(16, match_id))
""")

spark.sql("DROP TABLE IF EXISTS bootcamp.medal_matches_players_bucketed")
spark.sql("""
CREATE TABLE IF NOT EXISTS bootcamp.medal_matches_players_bucketed (
    match_id STRING,
    player_gamertag STRING,
    medal_id STRING,
    count INT
)
USING iceberg
PARTITIONED BY (bucket(16, match_id))
""")


res1: org.apache.spark.sql.DataFrame = []


In [5]:
spark.sql("DROP TABLE IF EXISTS bootcamp.medal_matches_players_bucketed")
spark.sql("""
CREATE TABLE IF NOT EXISTS bootcamp.medal_matches_players_bucketed (
    match_id STRING,
    player_gamertag STRING,
    medal_id STRING,
    count INT
)
USING iceberg
PARTITIONED BY (bucket(16, match_id))
""")

medalMatchesPlayers.writeTo("bootcamp.medal_matches_players_bucketed").append()

In [6]:
spark.sql("DROP TABLE IF EXISTS bootcamp.medals_bucketed")
spark.sql("""
CREATE TABLE IF NOT EXISTS bootcamp.medals_bucketed (
medal_id STRING,
sprite_uri STRING,
sprite_left INT,
sprite_top INT,
sprite_sheet_width INT,
sprite_sheet_height INT,
sprite_width INT,
sprite_height INT,
classification STRING,
description STRING,
name STRING,
difficulty INT
)
USING iceberg
PARTITIONED BY (bucket(16, medal_id))
""")

res3: org.apache.spark.sql.DataFrame = []


In [93]:
spark.sql("DROP TABLE IF EXISTS bootcamp.maps_bucketed")
spark.sql("""
CREATE TABLE IF NOT EXISTS bootcamp.maps_bucketed (
mapid STRING,
name STRING,
description STRING
)
USING iceberg
PARTITIONED BY (bucket(16, mapid))
""")

res71: org.apache.spark.sql.DataFrame = []


In [7]:

matches.writeTo("bootcamp.matches_bucketed").append()
matchDetails.writeTo("bootcamp.match_details_bucketed").append()
medalMatchesPlayers.writeTo("bootcamp.medal_matches_players_bucketed").append()
medals.writeTo("bootcamp.medals_bucketed").append()
maps.writeTo("bootcamp.maps_bucketed").append()

In [8]:
val joinedDF = spark.table("bootcamp.match_details_bucketed")
  .join(spark.table("bootcamp.matches_bucketed"), Seq("match_id"))
  .join(spark.table("bootcamp.medal_matches_players_bucketed"), Seq("match_id"))

joinedDF: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 46 more fields]


In [9]:
import org.apache.spark.sql.functions._
val killsPerGame = joinedDF
  .groupBy("`demo`.`bootcamp`.`match_details_bucketed`.`player_gamertag`")
  .agg(
    (avg("player_total_kills"))
      .alias("avg_kills_per_game")
  )
  .orderBy(desc("avg_kills_per_game"))
  killsPerGame.show(5)

+---------------+------------------+
|player_gamertag|avg_kills_per_game|
+---------------+------------------+
|   gimpinator14|             109.0|
|  I Johann117 I|              96.0|
|BudgetLegendary|              83.0|
|      GsFurreal|              75.0|
|   TameablePoet| 74.22429906542057|
+---------------+------------------+
only showing top 5 rows



import org.apache.spark.sql.functions._
killsPerGame: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [player_gamertag: string, avg_kills_per_game: double]


In [37]:
import org.apache.spark.sql.functions._
val most_played_playlist = joinedDF
  .groupBy("`playlist_id`")
  .agg(count("*").alias("played_times"))
  .orderBy(desc("played_times"))
  
  most_played_playlist.show(5)

+--------------------+------------+
|         playlist_id|played_times|
+--------------------+------------+
|f72e0ef0-7c4a-430...|     1565529|
|780cc101-005c-4fc...|     1116002|
|0bcf2be1-3168-4e4...|     1015496|
|c98949ae-60a8-43d...|      824932|
|2323b76a-db98-4e0...|      692342|
+--------------------+------------+
only showing top 5 rows



import org.apache.spark.sql.functions._
most_played_playlist: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [playlist_id: string, played_times: bigint]


In [48]:
val most_played_map = joinedDF
  .groupBy("`mapid`")
  .agg(count("*").alias("played_times"))
  .orderBy(desc("played_times"))
  
  most_played_map.show(5)

+--------------------+------------+
|               mapid|played_times|
+--------------------+------------+
|c74c9d0f-f206-11e...|     1445545|
|c7edbf0f-f206-11e...|     1435048|
|c7805740-f206-11e...|      953278|
|cdb934b0-f206-11e...|      396305|
|cb914b9e-f206-11e...|      309045|
+--------------------+------------+
only showing top 5 rows



most_played_map: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [mapid: string, played_times: bigint]


In [98]:
val killingSpreeDF = spark.table("bootcamp.medal_matches_players_bucketed")
  .join(spark.table("bootcamp.medals_bucketed"), Seq("medal_id"))
  .filter(col("classification") === "KillingSpree")

val spreeByMap = killingSpreeDF
  .join(spark.table("bootcamp.matches_bucketed"), Seq("match_id"))
  .groupBy("mapid")
  .agg(sum("count").alias("total_killing_sprees"))
  .orderBy(desc("total_killing_sprees"))

val spreeByMapName = spreeByMap
  .join(spark.table("bootcamp.maps_bucketed"), Seq("mapid"))
  .select("`demo`.`bootcamp`.`maps_bucketed`.`name`", "total_killing_sprees")
  .orderBy(desc("total_killing_sprees"))

spreeByMapName.show(10)

+--------------+--------------------+
|          name|total_killing_sprees|
+--------------+--------------------+
|Breakout Arena|               13850|
|        Alpine|               11830|
|       Glacier|                7574|
|        Empire|                4708|
|         Truth|                4356|
|       The Rig|                4234|
|      Coliseum|                4094|
|         Plaza|                4038|
|          Eden|                3730|
|        Regret|                3692|
+--------------+--------------------+
only showing top 10 rows



killingSpreeDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [medal_id: string, match_id: string ... 13 more fields]
spreeByMap: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [mapid: string, total_killing_sprees: bigint]
spreeByMapName: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, total_killing_sprees: bigint]


In [10]:
val aggDF = joinedDF
  .groupBy("playlist_id")
  .agg(sum("player_total_kills").alias("total_kills"))

aggDF: org.apache.spark.sql.DataFrame = [playlist_id: string, total_kills: bigint]


In [11]:
// Sort by playlist_id
val sortedByPlaylist = aggDF.sortWithinPartitions("playlist_id")
sortedByPlaylist.write.mode("overwrite").parquet("/tmp/sorted_playlist")

// Sort by mapid
val sortedByMap = aggDF.sortWithinPartitions("mapid")
sortedByMap.write.mode("overwrite").parquet("/tmp/sorted_map")

// Sort by total_kills (high cardinality, for comparison)
val sortedByKills = aggDF.sortWithinPartitions("total_kills")
sortedByKills.write.mode("overwrite").parquet("/tmp/sorted_kills")

org.apache.spark.sql.catalyst.ExtendedAnalysisException:  [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `mapid` cannot be resolved. Did you mean one of the following? [`total_kills`, `demo`.`bootcamp`.`matches_bucketed`.`playlist_id`].;

In [None]:
def folderSize(path: String): Long =
  dbutils.fs.ls(path).map(_.size).sum

println(s"Playlist sort size: ${folderSize("/tmp/sorted_playlist")}")
println(s"Map sort size: ${folderSize("/tmp/sorted_map")}")
println(s"Kills sort size: ${folderSize("/tmp/sorted_kills")}")