In [1]:
from pyspark.sql import SparkSession
# NOTE: `sum`, `round` and `max` below shadow the Python builtins of the same
# name for the rest of the notebook; later cells rely on the Spark versions.
from pyspark.sql.functions import year, expr, broadcast, split, lit, col, avg, sum, count, when, round, max

# getOrCreate() hands back the already-running session when there is one,
# otherwise it starts a new session named "Jupyter".
spark = SparkSession.builder.appName("Jupyter").getOrCreate()
spark


23/12/22 21:06:56 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


#### Query 1 - disable the default behavior of broadcast joins

In [28]:
# A threshold of -1 switches off Spark's automatic broadcast joins, so a table
# is only broadcast when a join asks for it explicitly via broadcast().
spark.conf.set(
    "spark.sql.autoBroadcastJoinThreshold",
    "-1",
)

### Reading Data from CSV

In [30]:
# Directory holding the raw CSV exports; change it here if the data moves.
DATA_DIR = "/home/iceberg/data"


def load_csv(file_name):
    """Read one CSV file from DATA_DIR, using its header row and inferring column types."""
    return (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(f"{DATA_DIR}/{file_name}")
    )


# NOTE: despite the "Bucketed" suffix these are the raw, un-bucketed source
# frames; the names are kept because later cells refer to them.
# completion_year is derived here so the matches table can be partitioned by year.
matchesBucketed = load_csv("matches.csv").withColumn(
    "completion_year", year(col("completion_date"))
)
matchDetailsBucketed = load_csv("match_details.csv")
medalsMatchesPlayersBucketed = load_csv("medals_matches_players.csv")

# Small lookup tables, broadcast explicitly in the join further down.
maps = load_csv("maps.csv")
medals = load_csv("medals.csv")

                                                                                

### Creating bucketed Tables
##### The matches table is partitioned by completion year instead of completion date, as that helped me resolve the Java OOM error.

In [31]:
%%sql
-- Namespace for the bucketed tables created in the following cells; a no-op if it already exists.
CREATE DATABASE IF NOT EXISTS bootcamp

In [32]:

# DDL for the three Iceberg tables. Each one is bucketed into 16 buckets on
# match_id; matches is additionally partitioned by completion_year.
bucketedDDL = """
 CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
    match_id STRING,
     is_team_game BOOLEAN,
     playlist_id STRING,
     mapid STRING,
     completion_date TIMESTAMP,
     completion_year INTEGER
 )
 USING iceberg
 PARTITIONED BY (completion_year, bucket(16, match_id));
 """

bucketedDetailsDDL = """
 CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INTEGER,
     player_total_deaths INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """

bucketedMedalMatchesDDL = """
 CREATE TABLE IF NOT EXISTS bootcamp.medals_matches_players_bucketed (
     match_id STRING,
     player_gamertag STRING,
     medal_id BIGINT,
     count INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """

# Run the statements in declaration order; the last call stays the final
# expression of the cell so its result is what the notebook displays.
spark.sql(bucketedDDL)
spark.sql(bucketedDetailsDDL)
spark.sql(bucketedMedalMatchesDDL)

DataFrame[]

#### Writing data from dataframes to tables

In [35]:
# Matches: project the table's columns (completion_date cast to timestamp),
# then overwrite the table partitioned by year and bucketed on match_id.
_matches_out = matchesBucketed.select(
    "match_id",
    "is_team_game",
    "playlist_id",
    "mapid",
    col("completion_date").cast("timestamp"),
    "completion_year",
)
(
    _matches_out.write
    .mode("overwrite")
    .partitionBy("completion_year")
    .bucketBy(16, "match_id")
    .saveAsTable("bootcamp.matches_bucketed")
)

# Match details: per-player kill/death totals, bucketed on match_id.
_match_details_out = matchDetailsBucketed.select(
    "match_id", "player_gamertag", "player_total_kills", "player_total_deaths"
)
(
    _match_details_out.write
    .mode("overwrite")
    .bucketBy(16, "match_id")
    .saveAsTable("bootcamp.match_details_bucketed")
)

# Medals per player per match, bucketed on match_id like the other two tables.
_medals_players_out = medalsMatchesPlayersBucketed.select(
    "match_id", "player_gamertag", "medal_id", "count"
)
(
    _medals_players_out.write
    .mode("overwrite")
    .bucketBy(16, "match_id")
    .saveAsTable("bootcamp.medals_matches_players_bucketed")
)



#### Reading data from tables

In [36]:
# Make `bootcamp` the current database so the unqualified table names used
# when reading the tables back resolve against it.
spark.sql("USE bootcamp")

DataFrame[]

In [37]:
# Read the three bucketed tables back. The names are qualified with the
# database (as in the write cell) so this cell does not depend on which
# database happens to be current in the session.
matchesBucketedDF = spark.read.table("bootcamp.matches_bucketed")
matchDetailsBucketedDF = spark.read.table("bootcamp.match_details_bucketed")
medalsMatchesPlayersBucketedDF = spark.read.table("bootcamp.medals_matches_players_bucketed")

#### Query 2: join the medals and maps tables with an explicitly specified broadcast join
#### Query 3: join the match_details, matches and medal_matches_players using a bucket join on match_id with 16 buckets

#### Bucket Join

In [38]:
# Join conditions, named for readability.
# Aliases: mdb = match details, mb = matches, mmb = medals per match and player.
_same_match = col("mb.match_id") == col("mdb.match_id")
_same_match_and_player = (col("mb.match_id") == col("mmb.match_id")) & (
    col("mmb.player_gamertag") == col("mdb.player_gamertag")
)

_details_with_matches = matchDetailsBucketedDF.alias("mdb").join(
    matchesBucketedDF.alias("mb"), _same_match
)
_with_medals = _details_with_matches.join(
    medalsMatchesPlayersBucketedDF.alias("mmb"), _same_match_and_player
)

# Result grain: one row per (match, player, medal).
bucketedJoinResults = _with_medals.select(
    "mb.*",
    "mdb.player_total_deaths",
    "mdb.player_total_kills",
    "mdb.player_gamertag",
    "mmb.count",
    "mmb.medal_id",
)


In [39]:
# Peek at five joined rows to sanity-check the join output.
bucketedJoinResults.limit(5).collect()

                                                                                

[Row(match_id='0001a1c4-83dc-4f40-a97e-7910a765c96a', is_team_game=True, playlist_id='780cc101-005c-4fca-8ce7-6f36d7156ffe', mapid='c7805740-f206-11e4-982c-24be05e24f7e', completion_date=datetime.datetime(2016, 1, 6, 0, 0), completion_year=2016, player_total_deaths=28, player_total_kills=23, player_gamertag='ILLICIT 117', count=4, medal_id=3565443938),
 Row(match_id='0001a1c4-83dc-4f40-a97e-7910a765c96a', is_team_game=True, playlist_id='780cc101-005c-4fca-8ce7-6f36d7156ffe', mapid='c7805740-f206-11e4-982c-24be05e24f7e', completion_date=datetime.datetime(2016, 1, 6, 0, 0), completion_year=2016, player_total_deaths=28, player_total_kills=23, player_gamertag='ILLICIT 117', count=8, medal_id=3261908037),
 Row(match_id='0001a1c4-83dc-4f40-a97e-7910a765c96a', is_team_game=True, playlist_id='780cc101-005c-4fca-8ce7-6f36d7156ffe', mapid='c7805740-f206-11e4-982c-24be05e24f7e', completion_date=datetime.datetime(2016, 1, 6, 0, 0), completion_year=2016, player_total_deaths=28, player_total_kills=2

#### Explicit Broadcast Join

In [40]:
# medals and maps are wrapped in broadcast() so these two joins are broadcast
# joins even though automatic broadcasting was disabled earlier.
_medal_match = col("bk.medal_id") == col("md.medal_id")
_map_match = col("bk.mapid") == col("mp.mapid")

_with_medal_info = bucketedJoinResults.alias("bk").join(
    broadcast(medals).alias("md"), _medal_match
)
_with_map_info = _with_medal_info.join(broadcast(maps).alias("mp"), _map_match)

# Keep every column of the bucket-join result and attach readable names and
# descriptions for the medal and the map.
explicitBroadcast = _with_map_info.select(
    col("bk.*"),
    col("md.name").alias("medal_name"),
    col("md.description").alias("medal_description"),
    col("mp.name").alias("map_name"),
    col("mp.description").alias("map_description"),
)


#### Dropping Duplicates

In [41]:
# Remove rows that are exact duplicates across all columns.
finalDF = explicitBroadcast.distinct()

In [42]:
# Peek at five rows of the de-duplicated, fully joined frame.
finalDF.limit(5).collect()

                                                                                

[Row(match_id='0001a1c4-83dc-4f40-a97e-7910a765c96a', is_team_game=True, playlist_id='780cc101-005c-4fca-8ce7-6f36d7156ffe', mapid='c7805740-f206-11e4-982c-24be05e24f7e', completion_date=datetime.datetime(2016, 1, 6, 0, 0), completion_year=2016, player_total_deaths=28, player_total_kills=23, player_gamertag='ILLICIT 117', count=4, medal_id=3565443938, medal_name='Stronghold Captured', medal_description='Capture a Stronghold.', map_name='Glacier', map_description="Each of Halo's microclimates can be controlled with exacting precision."),
 Row(match_id='0001a1c4-83dc-4f40-a97e-7910a765c96a', is_team_game=True, playlist_id='780cc101-005c-4fca-8ce7-6f36d7156ffe', mapid='c7805740-f206-11e4-982c-24be05e24f7e', completion_date=datetime.datetime(2016, 1, 6, 0, 0), completion_year=2016, player_total_deaths=28, player_total_kills=23, player_gamertag='ILLICIT 117', count=8, medal_id=3261908037, medal_name='Headshot', medal_description='Kill an opponent by shooting them in the head.', map_name='Gl

#### Query 4a 
which player has the highest average kills per game? 

In [43]:

# finalDF holds one row per (match, player, medal), so a player's kill total
# for a match is repeated once for every medal type earned in that match.
# Collapse to one row per (match, player) first; otherwise the average is
# weighted by how many different medals the player earned in each game.
# NOTE(review): players who earned no medals in a match are absent from
# finalDF (inner join) and therefore from this average - confirm that is acceptable.
playerMatchKills = finalDF.select(
    "match_id", "player_gamertag", "player_total_kills"
).distinct()

# Average kills per game for each player, rounded to two decimals.
roundedAvg = playerMatchKills.groupBy("player_gamertag") \
                             .agg(round(avg(col("player_total_kills")), 2).alias("rounded_avg_kills"))

# Find the maximum rounded average value
maxAvgValue = roundedAvg.agg({'rounded_avg_kills': 'max'}).collect()[0][0]

# Keep every player that reaches the maximum, so ties are all reported
recordsWithMaxAvg = roundedAvg.filter(col("rounded_avg_kills") == maxAvgValue)

# Show all records with the maximum rounded average value
recordsWithMaxAvg.show()



23/12/22 21:50:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/22 21:50:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/22 21:50:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/22 21:50:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/22 21:50:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/22 21:50:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/22 21:50:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/22 21:50:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/22 21:50:05 WARN RowBasedKeyValueBatch: Calling spill() on

+---------------+-----------------+
|player_gamertag|rounded_avg_kills|
+---------------+-----------------+
|   gimpinator14|            109.0|
+---------------+-----------------+



                                                                                

#### Query 4b 
which playlist has received the most plays?

In [None]:
# A "play" is one match. finalDF has one row per (match, player, medal), so
# counting its rows directly would inflate playlists whose matches have more
# players or more medals. Reduce to distinct (playlist, match) pairs first.
playlistMatches = finalDF.select("playlist_id", "match_id").distinct()

# Number of matches played per playlist
countPlays = playlistMatches.groupBy("playlist_id") \
                            .agg(count("*").alias("no_of_plays"))

# Find the maximum count of plays
maxCountValue = countPlays.agg({'no_of_plays': 'max'}).collect()[0][0]

# Keep every playlist that reaches the maximum, so ties are all reported
maxCountPlaylist = countPlays.filter(col("no_of_plays") == maxCountValue)

# Show all playlist_ids with the maximum count of plays
maxCountPlaylist.show()


23/12/22 21:50:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/22 21:50:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/22 21:50:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.


#### Query 4c
Which map was played the most?

In [None]:
# A "play" is one match. finalDF has one row per (match, player, medal), so
# counting its rows directly would inflate maps whose matches have more
# players or more medals. Reduce to distinct (map, match) pairs first.
mapMatches = finalDF.select("mapid", "map_name", "match_id").distinct()

# Number of matches played per map
countPlaysMap = mapMatches.groupBy("mapid", "map_name") \
                          .agg(count("*").alias("no_of_plays_map"))

# Find the maximum count of plays
maxCountValueMap = countPlaysMap.agg({'no_of_plays_map': 'max'}).collect()[0][0]

# Keep every map that reaches the maximum, so ties are all reported
maxCountMap = countPlaysMap.filter(col("no_of_plays_map") == maxCountValueMap)

# Show all maps with the maximum count of plays
maxCountMap.show()

#### Query 4d
Which map do players receive the highest number of Killing Spree medals?

In [None]:
# `sum` and `max` here are the Spark column functions imported at the top of
# the notebook, not the Python builtins.
# Per-row contribution: the medal count for "Killing Spree" rows, 0 for any other medal.
_killing_spree_count = when(col("medal_name") == "Killing Spree", col("count")).otherwise(0)

# Total "Killing Spree" medals per map.
result = finalDF.groupBy("mapid", "map_name").agg(
    sum(_killing_spree_count).alias("sumOfKillingSpree")
)

# Largest per-map total (the global aggregate yields exactly one row).
maxSum = result.agg(max("sumOfKillingSpree").alias("maxSum")).first()["maxSum"]

# Keep every map that reaches that total, so ties are all shown.
tiedRecordsForMax = result.filter(col("sumOfKillingSpree") == maxSum)

tiedRecordsForMax.show()

#### Query 5
Try at least 3 different versions of partitioned tables, and use .sortWithinPartitions to get the smallest footprint possible (hint: playlists and maps are both very low cardinality)

In [None]:
# Baseline layout: 4 partitions keyed on completion_date, no sorting inside them.
start_df = finalDF.repartition(4, col("completion_date"))

# Persist the baseline so the footprint comparison further down, which reads
# demo.bootcamp.events_unsorted.files, works after a fresh Restart & Run All.
start_df.write.mode("overwrite").saveAsTable("demo.bootcamp.events_unsorted")


#### Query 5a

In [None]:
# Variant 1: within each partition sort by completion_date, then mapid, then playlist_id.
first_sort_df = start_df.sortWithinPartitions(col("completion_date"), col("mapid"), col("playlist_id"))

# Persist it so the footprint comparison, which reads
# demo.bootcamp.events_sorted.files (labelled 'sorted1'), has a table to inspect.
first_sort_df.write.mode("overwrite").saveAsTable("demo.bootcamp.events_sorted")


#### Query 5b

In [None]:
# Variant 2: within each partition sort by completion_date, then playlist_id, then mapid.
second_sort_df = start_df.sortWithinPartitions(col("completion_date"), col("playlist_id"), col("mapid"))

# Persist it so the footprint comparison, which reads
# demo.bootcamp.events_sorted_second.files (labelled 'sorted2'), has a table to inspect.
second_sort_df.write.mode("overwrite").saveAsTable("demo.bootcamp.events_sorted_second")


#### Query 5c

In [None]:
# Variant 3: within each partition sort by completion_date, then medal_id, then mapid.
third_sort_df = start_df.sortWithinPartitions(col("completion_date"), col("medal_id"), col("mapid"))

# Persist it so the footprint comparison, which reads
# demo.bootcamp.events_sorted_third.files (labelled 'sorted3'), has a table to inspect.
third_sort_df.write.mode("overwrite").saveAsTable("demo.bootcamp.events_sorted_third")

#### Compare Footprint

In [142]:
%%sql

-- Total data-file size and file count for each layout, read from each table's
-- `files` metadata table. The label column is aliased so the result header
-- reads `layout` instead of taking its name from the first literal.
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'sorted1' AS layout
FROM demo.bootcamp.events_sorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'sorted2' AS layout
FROM demo.bootcamp.events_sorted_second.files

UNION ALL
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'sorted3' AS layout
FROM demo.bootcamp.events_sorted_third.files

UNION ALL
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'unsorted' AS layout
FROM demo.bootcamp.events_unsorted.files


size,num_files,sorted1
4736316,4,sorted1
4737511,4,sorted2
5355285,4,sorted3
5927163,4,unsorted


#### Observation
Setting the number of partitions to 4 and sorting within each partition by completion_date, mapid, and playlist_id gives the smallest footprint (4,736,316 bytes, versus 5,927,163 bytes unsorted).