In [75]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col, when, sum, count, desc, min, countDistinct

In [76]:
# Get the active Spark session, or start one named for this homework.
spark = (
    SparkSession.builder
    .appName("spark-fundamentals-homework")
    .getOrCreate()
)

In [77]:
# Load the five homework CSV extracts. Every file is read with its header row;
# only matches and match_details request schema inference.

# Medals data
medals = (
    spark.read
    .options(header="true")
    .csv("/home/iceberg/data/medals.csv")
)

# Matches data (column types inferred)
matches = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/home/iceberg/data/matches.csv")
)

# Per-player match details (column types inferred)
match_details = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/home/iceberg/data/match_details.csv")
)

# Medals earned per player per match
medals_matches_players = (
    spark.read
    .options(header="true")
    .csv("/home/iceberg/data/medals_matches_players.csv")
)

# Maps data
maps = (
    spark.read
    .options(header="true")
    .csv("/home/iceberg/data/maps.csv")
)

In [78]:
# Disable automatic broadcast joins (threshold -1) so that only the joins
# explicitly wrapped in broadcast() are broadcast.
spark.conf.set(
    "spark.sql.autoBroadcastJoinThreshold",
    "-1",
)

In [79]:
# Join matches to maps on mapid, explicitly broadcasting the maps side.
matches_with_maps = matches.join(
    broadcast(maps),
    on=matches["mapid"] == maps["mapid"],
)

In [80]:
# Join the per-player medal rows to medals on medal_id, explicitly broadcasting
# the medals side.
medals_matches_players_with_medals = medals_matches_players.join(
    broadcast(medals),
    on=medals["medal_id"] == medals_matches_players["medal_id"],
)

In [81]:
# Row count of the matches DataFrame loaded from matches.csv.
matches.count()

24025

In [82]:
# DDL: Iceberg table for matches, clustered on match_id into 16 buckets.

# Create the target database first so the notebook also runs against a fresh
# catalog; all tables written by this notebook live in `homework`.
spark.sql("CREATE DATABASE IF NOT EXISTS homework")

# Drop any previous copy so that re-running the notebook starts from an empty
# table before the append in the next cell.
spark.sql("""DROP TABLE IF EXISTS homework.matches_bucketed""")
bucketedDDL = """
CREATE TABLE IF NOT EXISTS homework.matches_bucketed (
     match_id STRING,
     mapid STRING,
     is_team_game BOOLEAN,
     playlist_id STRING,
     completion_date TIMESTAMP
 )
 USING iceberg
 CLUSTERED BY (match_id) INTO 16 buckets;
"""
spark.sql(bucketedDDL)

DataFrame[]

In [84]:
# Persist the selected matches columns into homework.matches_bucketed, bucketing
# the write on match_id with the same bucket count (16) as the table DDL.
(
    matches
    .select("match_id", "mapid", "is_team_game", "playlist_id", "completion_date")
    .write
    .mode("append")
    .bucketBy(16, "match_id")
    .saveAsTable("homework.matches_bucketed")
)

In [85]:
# DDL: Iceberg table for match_details, clustered on match_id into 16 buckets.

# Start from a clean slate so the append in the next cell does not duplicate rows.
spark.sql("DROP TABLE IF EXISTS homework.match_details_bucketed")

match_details_bucketed_ddl = """
CREATE TABLE IF NOT EXISTS homework.match_details_bucketed (
    match_id STRING,
    player_gamertag STRING,
    team_id STRING,
    player_total_kills INTEGER,
    player_total_deaths INTEGER,
    did_win INTEGER
)
USING iceberg
CLUSTERED BY (match_id) INTO 16 buckets;
"""
spark.sql(match_details_bucketed_ddl)

DataFrame[]

In [86]:
# Persist the selected match_details columns into homework.match_details_bucketed,
# bucketing the write on match_id with the same bucket count (16) as the table DDL.
(
    match_details
    .select(
        "match_id",
        "player_gamertag",
        "team_id",
        "player_total_kills",
        "player_total_deaths",
        "did_win",
    )
    .write
    .mode("append")
    .bucketBy(16, "match_id")
    .saveAsTable("homework.match_details_bucketed")
)

In [87]:
# DDL: Iceberg table for medals_matches_players, clustered on match_id into 16 buckets.

# Start from a clean slate so the append in the next cell does not duplicate rows.
spark.sql("DROP TABLE IF EXISTS homework.medals_matches_players_bucketed")

# Note: `count` is declared STRING here.
medals_matches_players_bucketed_ddl = """
CREATE TABLE IF NOT EXISTS homework.medals_matches_players_bucketed (
    match_id STRING,
    player_gamertag STRING,
    medal_id STRING,
    count STRING
)
USING iceberg
CLUSTERED BY (match_id) INTO 16 buckets;
"""
spark.sql(medals_matches_players_bucketed_ddl)

DataFrame[]

In [88]:
# Persist the medals_matches_players columns into
# homework.medals_matches_players_bucketed, bucketing the write on match_id with
# the same bucket count (16) as the table DDL.
(
    medals_matches_players
    .select("match_id", "player_gamertag", "medal_id", "count")
    .write
    .mode("append")
    .bucketBy(16, "match_id")
    .saveAsTable("homework.medals_matches_players_bucketed")
)

In [89]:
# DDL: plain (non-bucketed) Iceberg table for the medals data.

# Start from a clean slate so the append in the next cell does not duplicate rows.
spark.sql("DROP TABLE IF EXISTS homework.medals")

medals_ddl = """
CREATE TABLE IF NOT EXISTS homework.medals (
    medal_id STRING,
    classification STRING,
    description STRING,
    name STRING
)
USING iceberg;
"""
spark.sql(medals_ddl)

DataFrame[]

In [90]:
# Persist the selected medals columns into homework.medals.
(
    medals
    .select("medal_id", "classification", "description", "name")
    .write
    .mode("append")
    .saveAsTable("homework.medals")
)

In [91]:
## Aggregate - Which player averages the most kills per game?

In [142]:
# Join the bucketed matches table to the bucketed match_details table on
# match_id. The LEFT JOIN keeps matches that have no detail rows; for those the
# player columns are NULL. joined_df is the input to the aggregations below.
join_sql = """
SELECT
    m.match_id,
    m.mapid as map_id,
    m.playlist_id,
    m_details.player_gamertag,
    m_details.player_total_kills
FROM homework.matches_bucketed m
LEFT JOIN homework.match_details_bucketed m_details ON m.match_id = m_details.match_id
"""
joined_df = spark.sql(join_sql)

In [144]:
# Which player averages the most kills per game?
# For each player: total kills divided by the number of joined match rows,
# ranked from highest to lowest.
# NOTE(review): the result column is aliased "player_total_kills" although it
# holds the per-player average, not a total.
match_player_kills_df = (
    joined_df
    .groupBy("player_gamertag")
    .agg(
        (sum("player_total_kills") / count("match_id")).alias("player_total_kills")
    )
    .orderBy(desc("player_total_kills"))
)
match_player_kills_df.head(1)

[Row(player_gamertag='gimpinator14', player_total_kills=109.0)]

In [148]:
# Which playlist gets played the most?
# Count distinct matches per playlist (the join yields several rows per match)
# and rank from most to fewest.
most_playlist_df = (
    joined_df
    .groupBy("playlist_id")
    .agg(countDistinct("match_id").alias("count_playlist"))
    .orderBy(desc("count_playlist"))
)
most_playlist_df.head(1)

[Row(playlist_id='f72e0ef0-7c4a-4307-af78-8e38dac3fdba', count_playlist=9350)]

In [149]:
# Which map gets played the most?
# Count distinct matches per map and rank from most to fewest.
most_played_map_df = (
    joined_df
    .groupBy("map_id")
    .agg(countDistinct("match_id").alias("count_played"))
    .orderBy(desc("count_played"))
)
most_played_map_df.head(1)

[Row(map_id='c7edbf0f-f206-11e4-aa52-24be05e24f7e', count_played=8587)]

In [135]:
# Which map do players get the most Killing Spree medals on?
# The query joins matches to player details, then LEFT JOINs the per-player
# medal rows (on match_id and player_gamertag) and the medal definitions.
join_sql = """
SELECT
    m.match_id,
    m.mapid as map_id,
    m.playlist_id,
    m_details.player_gamertag,
    m_details.player_total_kills,
    medals.classification as medal_classification,
    medal_players.count AS medal_count
FROM homework.matches_bucketed m
JOIN homework.match_details_bucketed m_details ON m.match_id = m_details.match_id
LEFT JOIN homework.medals_matches_players_bucketed medal_players ON medal_players.match_id = m.match_id AND medal_players.player_gamertag = m_details.player_gamertag
LEFT JOIN homework.medals ON medals.medal_id = medal_players.medal_id
"""
player_performance_df = spark.sql(join_sql)

# Keep only rows whose medal classification is "KillingSpree", total the medal
# counts per map, and rank maps from most to fewest.
# NOTE(review): this filters on medals.classification; confirm that is the
# intended definition of a "Killing Spree" medal (as opposed to medals.name).
most_map_has_killing_spree_df = (
    player_performance_df
    .filter(col("medal_classification") == "KillingSpree")
    .groupBy("map_id")
    .agg(sum("medal_count").alias("total_killingspree_medals"))
    .orderBy(desc("total_killingspree_medals"))
)
most_map_has_killing_spree_df.head(1)

                                                                                

[Row(map_id='c7edbf0f-f206-11e4-aa52-24be05e24f7e', total_killingspree_medals=6919.0)]

In [138]:
## Try out sortWithinPartitions

# Repartition the joined data into 9 partitions by playlist_id, then build a
# second version that is additionally sorted inside each partition.
start_df = player_performance_df.repartition(9, "playlist_id")
first_sort_df = start_df.sortWithinPartitions("map_id", "medal_classification")

# Write both versions out so their file sizes can be compared.
(
    start_df.write
    .mode("overwrite")
    .saveAsTable("homework.player_performance_unsorted")
)
(
    first_sort_df.write
    .mode("overwrite")
    .saveAsTable("homework.player_performance_sorted")
)


In [139]:
%%sql

-- Compare total file size and file count of the sorted vs. unsorted tables,
-- read from each table's `files` metadata table.
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM homework.player_performance_sorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM homework.player_performance_unsorted.files


25/01/23 08:12:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


size,num_files,sorted
4250030,9,sorted
4669744,9,unsorted


In [147]:
# Compare total file size and file count of the sorted vs. unsorted tables,
# read from each table's `files` metadata table. Both tables were written to
# the `homework` database, so that is the database queried here.
sql = """
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM homework.player_performance_sorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM homework.player_performance_unsorted.files
"""

spark.sql(sql).show()

+-------+---------+--------+
|   size|num_files|  sorted|
+-------+---------+--------+
|4250030|        9|  sorted|
|4669744|        9|unsorted|
+-------+---------+--------+

