In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('BroadcastJoin').getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")


medals_df = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/medals.csv")
                                                       
maps_df = spark.read.option('header', 'true').option("inferSchema", "true").csv('/home/iceberg/data/maps.csv')

matches_df = spark.read.option('header', 'true').option("inferSchema", "true").csv("/home/iceberg/data/matches.csv")

matchDetails_df =  spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/match_details.csv")

medal_match_df =  spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/medals_matches_players.csv")

# Creating matches bucketed table
matches_bucketed_ddl = """
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
    match_id STRING,
    mapid STRING,
    playlist_id STRING,
    is_match_over BOOLEAN,
    is_team_game BOOLEAN,
    completion_date TIMESTAMP
)
USING iceberg
PARTITIONED BY (completion_date, bucket(16, match_id))
"""

# Execute the DDL statement
spark.sql("""DROP TABLE IF EXISTS bootcamp.matches_bucketed""")
spark.sql(matches_bucketed_ddl)

matches_df.select(
     F.col("match_id"), F.col("mapid"),F.col("playlist_id"),\
       F.col("is_match_over"), F.col("is_team_game"),  F.col("completion_date")
    ).writeTo("bootcamp.matches_bucketed") \
 .option("write.distribution-mode", "hash") \
 .createOrReplace()

spark.sql("select * from bootcamp.matches_bucketed ")


24/12/16 16:05:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

root
 |-- match_id: string (nullable = true)
 |-- player_gamertag: string (nullable = true)
 |-- medal_id: long (nullable = true)
 |-- count: integer (nullable = true)



                                                                                

In [34]:
maps_df.printSchema()
maps_df.show()

root
 |-- mapid: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)

+--------------------+-------------------+--------------------+
|               mapid|               name|         description|
+--------------------+-------------------+--------------------+
|c93d708f-f206-11e...|              Urban|Andesia was the c...|
|cb251c51-f206-11e...|     Raid on Apex 7|This unbroken rin...|
|c854e54f-f206-11e...|March on Stormbreak|                NULL|
|c8d69870-f206-11e...| Escape from A.R.C.|Scientists flocke...|
|73ed1fd0-45e5-4bb...|             Osiris|                NULL|
|96c3e3dd-7703-408...|          Blue Team|                NULL|
|1c4f8e19-b046-4f7...|            Glassed|                NULL|
|825065cf-df57-42e...|        Unconfirmed|                NULL|
|9a188f67-1664-4d7...|           Alliance|                NULL|
|2702ea83-2c3e-4fd...|   Before the Storm|                NULL|
|82f8471c-a2ef-408...|            Genesis|    

In [20]:
medals_df.printSchema()
medals_df.show()

root
 |-- medal_id: long (nullable = true)
 |-- sprite_uri: string (nullable = true)
 |-- sprite_left: integer (nullable = true)
 |-- sprite_top: integer (nullable = true)
 |-- sprite_sheet_width: integer (nullable = true)
 |-- sprite_sheet_height: integer (nullable = true)
 |-- sprite_width: integer (nullable = true)
 |-- sprite_height: integer (nullable = true)
 |-- classification: string (nullable = true)
 |-- description: string (nullable = true)
 |-- name: string (nullable = true)
 |-- difficulty: integer (nullable = true)

+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+--------------------+--------------+----------+
|  medal_id|          sprite_uri|sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|   classification|         description|          name|difficulty|
+----------+--------------------+-----------+----------+------------------+--------------

In [16]:
matches_df.printSchema()

root
 |-- match_id: string (nullable = true)
 |-- mapid: string (nullable = true)
 |-- is_team_game: boolean (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- game_variant_id: string (nullable = true)
 |-- is_match_over: boolean (nullable = true)
 |-- completion_date: timestamp (nullable = true)
 |-- match_duration: string (nullable = true)
 |-- game_mode: string (nullable = true)
 |-- map_variant_id: string (nullable = true)



In [19]:
medal_match_df.printSchema()
medal_match_df.show()

root
 |-- match_id: string (nullable = true)
 |-- player_gamertag: string (nullable = true)
 |-- medal_id: long (nullable = true)
 |-- count: integer (nullable = true)

+--------------------+---------------+----------+-----+
|            match_id|player_gamertag|  medal_id|count|
+--------------------+---------------+----------+-----+
|009fdac5-e15c-47c...|       EcZachly|3261908037|    7|
|009fdac5-e15c-47c...|       EcZachly| 824733727|    2|
|009fdac5-e15c-47c...|       EcZachly|2078758684|    2|
|009fdac5-e15c-47c...|       EcZachly|2782465081|    2|
|9169d1a3-955c-4ea...|       EcZachly|3001183151|    1|
|9169d1a3-955c-4ea...|       EcZachly|3565443938|    6|
|9169d1a3-955c-4ea...|       EcZachly|3491849182|    1|
|4a078b2f-65eb-4c6...|       EcZachly|3261908037|    8|
|9169d1a3-955c-4ea...|       EcZachly|2105198095|    6|
|9169d1a3-955c-4ea...|       EcZachly|2916014239|    3|
|9169d1a3-955c-4ea...|       EcZachly|3261908037|    6|
|9169d1a3-955c-4ea...|       EcZachly|135138158

In [65]:
matchDetails_df.show()
matchDetails_df.printSchema()

+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+
|            match_id|player_gamertag|previous_spartan_rank|spartan_rank|previous_total_xp|total_xp|previous_csr_tier|previous_csr_designation|previous_csr|previous_csr_percent_to_next_tier|previous_csr_rank|current_

In [3]:
# Creating matches_details bucketed table

match_details_bucketed_DDL = """
 CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INTEGER,
     player_total_deaths INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """
spark.sql("""DROP TABLE IF EXISTS bootcamp.match_details_bucketed""")
spark.sql(match_details_bucketed_DDL)

matchDetails_df.select(
     F.col("match_id"), F.col("player_gamertag"),F.col("player_total_kills"),\
       F.col("player_total_deaths")
    ).writeTo("bootcamp.match_details_bucketed") \
 .option("write.distribution-mode", "hash") \
 .createOrReplace()



In [None]:
%%sql 
SELECT
  *
FROM
  bootcamp.match_details_bucketed;

In [5]:
medals_matches_players_bucketed_ddl = """
 CREATE TABLE IF NOT EXISTS bootcamp.medals_matches_players_bucketed (
     match_id STRING,
     player_gamertag STRING,
     medal_id LONG,
     count INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """

spark.sql("""DROP TABLE IF EXISTS bootcamp.medals_matches_players_bucketed""")
spark.sql(medals_matches_players_bucketed_ddl)
medal_match_df.select(
     F.col("match_id"), F.col("player_gamertag"),F.col("medal_id"),\
       F.col("count")
    ).writeTo("bootcamp.medals_matches_players_bucketed") \
 .option("write.distribution-mode", "hash") \
 .createOrReplace()



In [67]:
matches_df = spark.table("bootcamp.matches_bucketed")
match_details_df = spark.table("bootcamp.match_details_bucketed")
medals_matches_df = spark.table("bootcamp.medals_matches_players_bucketed")

query = "select mb.match_id, mb.mapid, mb.playlist_id, mb.completion_date, mdb.player_gamertag, mmpd.medal_id, mmpd.count \
    from bootcamp.matches_bucketed mb join bootcamp.match_details_bucketed mdb \
    on mb.match_id = mdb.match_id\
    join bootcamp.medals_matches_players_bucketed mmpd on mb.match_id = mmpd.match_id"
result_df = spark.sql(query)
result_df.show()

+--------------------+--------------------+--------------------+-------------------+---------------+----------+-----+
|            match_id|               mapid|         playlist_id|    completion_date|player_gamertag|  medal_id|count|
+--------------------+--------------------+--------------------+-------------------+---------------+----------+-----+
|00169217-cca6-4b4...|cc040aa1-f206-11e...|2323b76a-db98-4e0...|2016-03-13 00:00:00|  King Terror V|3261908037|   11|
|00169217-cca6-4b4...|cc040aa1-f206-11e...|2323b76a-db98-4e0...|2016-03-13 00:00:00|  King Terror V|3001183151|    1|
|00169217-cca6-4b4...|cc040aa1-f206-11e...|2323b76a-db98-4e0...|2016-03-13 00:00:00|  King Terror V| 824733727|    3|
|00169217-cca6-4b4...|cc040aa1-f206-11e...|2323b76a-db98-4e0...|2016-03-13 00:00:00|  King Terror V|2078758684|    3|
|00169217-cca6-4b4...|cc040aa1-f206-11e...|2323b76a-db98-4e0...|2016-03-13 00:00:00|  King Terror V|2430242797|    1|
|00169217-cca6-4b4...|cc040aa1-f206-11e...|2323b76a-db98

In [66]:
# Which player averages the most kills per game?
query = """
SELECT 
    player_gamertag,
    SUM(player_total_kills) AS total_kills,
    COUNT(DISTINCT match_id) AS total_games,
    SUM(player_total_kills) / COUNT(DISTINCT match_id) AS avg_kills_per_game
FROM bootcamp.match_details_bucketed
GROUP BY player_gamertag
ORDER BY avg_kills_per_game DESC
LIMIT 1
"""

# Execute the query
most_kills_player = spark.sql(query)

# Show the result
most_kills_player.show()


+---------------+-----------+-----------+------------------+
|player_gamertag|total_kills|total_games|avg_kills_per_game|
+---------------+-----------+-----------+------------------+
|   gimpinator14|        109|          1|             109.0|
+---------------+-----------+-----------+------------------+



In [64]:
# Reading the bucketed tables
matches_bucketed_df = spark.table("bootcamp.matches_bucketed")
medals_matches_bucketed_df = spark.table("bootcamp.medals_matches_players_bucketed")

# Explicit bucket join on match_id
bucket_join_df = matches_bucketed_df.join(
    medals_matches_bucketed_df,
    on="match_id",  # Join on bucketed column
    how="inner"
)

# Example aggregation to show the result
result_df = bucket_join_df.groupBy("playlist_id").count().orderBy(F.col("count").desc())

# Show the result of the bucket join
result_df.show()

+--------------------+------+
|         playlist_id| count|
+--------------------+------+
|f72e0ef0-7c4a-430...|202710|
|c98949ae-60a8-43d...|107654|
|2323b76a-db98-4e0...| 92282|
|892189e9-d712-4bd...| 87012|
|0bcf2be1-3168-4e4...| 67149|
|780cc101-005c-4fc...| 66081|
|d0766624-dbd7-453...| 23969|
|f27a65eb-2d11-496...| 22032|
|355dc154-9809-4ed...| 17194|
|bc0f8ad6-31e6-4a1...| 11162|
|7b7e892c-d9b7-4b0...| 10685|
|7385b4a1-86bf-4ae...|  9718|
|5728f612-3f20-445...|  8991|
|819eb188-1a1c-48b...|  5926|
|f0c9ef9a-48bd-4b2...|  4849|
|4b12472e-2a06-423...|  3930|
|b5d5a242-ffa5-4d8...|  3778|
|d21c8381-26f1-4d6...|  3617|
|0504ca3c-de41-48f...|  2254|
|88b7de19-113c-4be...|  2116|
+--------------------+------+
only showing top 20 rows



In [7]:
# Which playlist gets played the most
query = """
SELECT 
    playlist_id,
    COUNT(match_id) AS total_matches
FROM bootcamp.matches_bucketed
GROUP BY playlist_id
ORDER BY total_matches DESC
LIMIT 1
"""

# Execute the query
most_played_playlist = spark.sql(query)

# Show the result
most_played_playlist.show()


+--------------------+-------------+
|         playlist_id|total_matches|
+--------------------+-------------+
|f72e0ef0-7c4a-430...|         9350|
+--------------------+-------------+



In [13]:
#Which map gets played the most?
# Broadcast join - maps being broadcasted

from pyspark.sql.functions import broadcast
matches_bucketed_df = spark.table("bootcamp.matches_bucketed")

# Perform a broadcast join between matches and maps
result_df = matches_bucketed_df.join(
    broadcast(maps_df.alias("maps")),
    matches_bucketed_df.mapid == maps_df.mapid, 
    "inner"
)

# Aggregate to find the most played map
most_played_map = result_df.groupBy("maps.mapid").count().orderBy("count", ascending=False).limit(1)

# Show the result
most_played_map.show()

+--------------------+-----+
|               mapid|count|
+--------------------+-----+
|c7edbf0f-f206-11e...| 8587|
+--------------------+-----+



In [40]:
#Which map do players get the most Killing Spree medals on?

medals_matches_bucketed_df = spark.table("bootcamp.medals_matches_players_bucketed")

killing_spree_medals_df = medals_df.filter(F.col("classification") == "KillingSpree")


# Perform a broadcast join between medals_matches and medals
killing_spree_matches_df = medals_matches_bucketed_df.join(
    broadcast(killing_spree_medals_df.alias("medals")),
    medals_matches_bucketed_df.medal_id == medals_df.medal_id, 
    "inner"
)

# join with matches_df to get the map_id
killing_spree_map_df = killing_spree_matches_df.join(
    matches_df,
    on="match_id",
    how="inner"
)

# join with maps to get map_name
killing_spree_map_df = killing_spree_map_df.join(
    broadcast(maps_df.alias("maps")),
    killing_spree_map_df.mapid == maps_df.mapid,
    "inner")

killing_spree_count_df = killing_spree_map_df.groupBy("maps.mapid", "maps.name") \
    .count() \
    .orderBy(F.col("count").desc())

killing_spree_count_df.show(1)


+--------------------+--------------+-----+
|               mapid|          name|count|
+--------------------+--------------+-----+
|c7edbf0f-f206-11e...|Breakout Arena| 6740|
+--------------------+--------------+-----+
only showing top 1 row



In [None]:
%%sql 
SELECT
  *
FROM
  bootcamp.medals_matches_players_bucketed.files;

In [49]:
%%sql
create table if not exists bootcamp.result_unsorted(
    match_id STRING,
    mapid STRING,
    playlist_id STRING,
    completion_date TIMESTAMP,
    player_gamertag STRING,
    medal_id STRING,
    count INTEGER
)
USING iceberg
PARTITIONED BY (year(completion_date));

In [50]:
%%sql
create table if not exists bootcamp.result_sorted(
    match_id STRING,
    mapid STRING,
    playlist_id STRING,
    completion_date TIMESTAMP,
    player_gamertag STRING,
    medal_id STRING,
    count INTEGER
)
USING iceberg
PARTITIONED BY (year(completion_date));

In [62]:
# joining matches, match_details and medals_matches_players
# applying sortpartition with this


matches_df = spark.table("bootcamp.matches_bucketed")
match_details_df = spark.table("bootcamp.match_details_bucketed")
medals_matches_df = spark.table("bootcamp.medals_matches_players_bucketed")

query = "select mb.match_id, mb.mapid, mb.playlist_id, mb.completion_date, mdb.player_gamertag, mmpd.medal_id, mmpd.count \
    from bootcamp.matches_bucketed mb join bootcamp.match_details_bucketed mdb \
    on mb.match_id = mdb.match_id\
    join bootcamp.medals_matches_players_bucketed mmpd on mb.match_id = mmpd.match_id"
result_df = spark.sql(query)
#result_df.show()

start_df = result_df.repartition(4, F.col("completion_date"))
sort_df = start_df.sortWithinPartitions(F.col("playlist_id"))

start_df.write.mode("overwrite").saveAsTable("bootcamp.result_unsorted")
sort_df.write.mode("overwrite").saveAsTable("bootcamp.result_sorted")

                                                                                

In [63]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM demo.bootcamp.result_unsorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.result_sorted.files

size,num_files,unsorted
4312755,4,unsorted
9243655,4,sorted


In [24]:
# Bucket join `match_details`, `matches`, and `medal_matches_players` on `match_id` with `16` buckets 

spark.sql("""
    SELECT * FROM bootcamp.match_details_bucketed mdb JOIN bootcamp.matches_bucketed md 
    ON mdb.match_id = md.match_id   
""").explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [match_id#1099], [match_id#1103], Inner
   :- Sort [match_id#1099 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(match_id#1099, 200), ENSURE_REQUIREMENTS, [plan_id=535]
   :     +- Filter isnotnull(match_id#1099)
   :        +- BatchScan demo.bootcamp.match_details_bucketed[match_id#1099, player_gamertag#1100, player_total_kills#1101, player_total_deaths#1102] demo.bootcamp.match_details_bucketed (branch=null) [filters=match_id IS NOT NULL, groupedBy=] RuntimeFilters: []
   +- Sort [match_id#1103 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(match_id#1103, 200), ENSURE_REQUIREMENTS, [plan_id=536]
         +- Filter isnotnull(match_id#1103)
            +- BatchScan demo.bootcamp.matches_bucketed[match_id#1103, mapid#1104, playlist_id#1105, is_match_over#1106, is_team_game#1107, completion_date#1108] demo.bootcamp.matches_bucketed (branch=null) [filters=match_id IS NOT NULL, groupedBy=

In [25]:
spark.sql("""
    SELECT * FROM bootcamp.match_details_bucketed mdb JOIN bootcamp.matches_bucketed md 
    ON mdb.match_id = md.match_id
    join bootcamp.medals_matches_players_bucketed mmpd
    on mdb.match_id = mmpd.match_id
""").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [match_id#1139], [match_id#1149], Inner
   :- SortMergeJoin [match_id#1139], [match_id#1143], Inner
   :  :- Sort [match_id#1139 ASC NULLS FIRST], false, 0
   :  :  +- Exchange hashpartitioning(match_id#1139, 200), ENSURE_REQUIREMENTS, [plan_id=575]
   :  :     +- Filter isnotnull(match_id#1139)
   :  :        +- BatchScan demo.bootcamp.match_details_bucketed[match_id#1139, player_gamertag#1140, player_total_kills#1141, player_total_deaths#1142] demo.bootcamp.match_details_bucketed (branch=null) [filters=match_id IS NOT NULL, groupedBy=] RuntimeFilters: []
   :  +- Sort [match_id#1143 ASC NULLS FIRST], false, 0
   :     +- Exchange hashpartitioning(match_id#1143, 200), ENSURE_REQUIREMENTS, [plan_id=576]
   :        +- Filter isnotnull(match_id#1143)
   :           +- BatchScan demo.bootcamp.matches_bucketed[match_id#1143, mapid#1144, playlist_id#1145, is_match_over#1146, is_team_game#1147, completion_date#1148] de