In [1]:
from pyspark.sql.functions import broadcast, split, lit
from pyspark.sql.functions import expr, col, avg, count, count_distinct
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bootcamp").getOrCreate()

25/01/31 00:43:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
matches = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/matches.csv")
match_details = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/match_details.csv")
medals_matches_players = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/medals_matches_players.csv")
medals = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/medals.csv")
maps = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/maps.csv")

                                                                                

In [3]:
matches_bucketed_DDL = """
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
     match_id STRING,
     mapid STRING,
     is_team_game BOOLEAN,
     playlist_id STRING,
     completion_date TIMESTAMP
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
"""

match_details_bucketed_DDL = """
CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INTEGER,
     player_total_deaths INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
"""

medals_matches_players_bucketed_DDL = """
CREATE TABLE IF NOT EXISTS bootcamp.medals_matches_players_bucketed (
    match_id STRING,
    player_gamertag STRING,
    medal_id BIGINT,
    count INTEGER
 )
 USING iceberg
 PARtITIONED BY (bucket(16, match_id));
"""

In [4]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.matches_bucketed""")
spark.sql(matches_bucketed_DDL)

spark.sql("""DROP TABLE IF EXISTS bootcamp.match_details_bucketed""")
spark.sql(match_details_bucketed_DDL)

spark.sql("""DROP TABLE IF EXISTS bootcamp.medals_matches_players_bucketed""")
spark.sql(medals_matches_players_bucketed_DDL)

DataFrame[]

In [5]:
matches.select(col("match_id"), col("mapid"), col("is_team_game"), col("playlist_id"), col("completion_date")) \
    .write.mode("append") \
    .bucketBy(16, "match_id").saveAsTable("bootcamp.matches_bucketed")

                                                                                

In [6]:
match_details.select(col("match_id"), col("player_gamertag"), col("player_total_kills"), col("player_total_deaths")) \
    .write.mode("append") \
    .bucketBy(16, "match_id").saveAsTable("bootcamp.match_details_bucketed")

                                                                                

In [7]:
medals_matches_players.select(col("match_id"), col("player_gamertag"), col("medal_id"), col("count")) \
    .write.mode("append") \
    .bucketBy(16, "match_id").saveAsTable("bootcamp.medals_matches_players_bucketed")

                                                                                

In [8]:
matches_bucketed = spark.read.table("bootcamp.matches_bucketed")
match_details_bucketed = spark.read.table("bootcamp.match_details_bucketed")
medals_matches_players_bucketed = spark.read.table("bootcamp.medals_matches_players_bucketed")

bucketed_join_df = matches_bucketed \
    .join(match_details_bucketed, "match_id", "left") \
    .join(medals_matches_players_bucketed, ["match_id", "player_gamertag"], "left")

In [9]:
bucketed_join_df.take(5)

                                                                                

[Row(match_id='467baba8-a360-459c-b508-c034d141acb9', player_gamertag='Stormbane321', mapid='cebd854f-f206-11e4-b46e-24be05e24f7e', is_team_game=None, playlist_id='892189e9-d712-4bdb-afa7-1ccab43fbed4', completion_date=datetime.datetime(2016, 1, 27, 0, 0), player_total_kills=16, player_total_deaths=5, medal_id=3001183151, count=1),
 Row(match_id='467baba8-a360-459c-b508-c034d141acb9', player_gamertag='Stormbane321', mapid='cebd854f-f206-11e4-b46e-24be05e24f7e', is_team_game=None, playlist_id='892189e9-d712-4bdb-afa7-1ccab43fbed4', completion_date=datetime.datetime(2016, 1, 27, 0, 0), player_total_kills=16, player_total_deaths=5, medal_id=3261908037, count=11),
 Row(match_id='467baba8-a360-459c-b508-c034d141acb9', player_gamertag='Stormbane321', mapid='cebd854f-f206-11e4-b46e-24be05e24f7e', is_team_game=None, playlist_id='892189e9-d712-4bdb-afa7-1ccab43fbed4', completion_date=datetime.datetime(2016, 1, 27, 0, 0), player_total_kills=16, player_total_deaths=5, medal_id=3653057799, count=1

In [10]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

In [11]:
final_df = bucketed_join_df \
    .join(broadcast(medals.select(col("medal_id"), col("sprite_uri"), col("classification"))), "medal_id", "inner") \
    .join(broadcast(maps.select(col("mapid"), col("name"))), "mapid", "inner")

In [12]:
player_avg_kills = final_df.groupBy("player_gamertag") \
    .agg(avg("player_total_kills").alias("avg_kills")) \
    .orderBy(col("avg_kills").desc())

playlist_count = final_df.groupBy("playlist_id") \
    .agg(count_distinct("match_id").alias("count")) \
    .orderBy(col("count").desc())

map_count = final_df.groupBy("mapid") \
    .agg(count_distinct("match_id").alias("count")) \
    .orderBy(col("count").desc())

killing_spree_count = final_df.filter(col("classification") == "KillingSpree") \
    .groupBy("mapid") \
    .agg(count("player_gamertag").alias("count")) \
    .orderBy(col("count").desc())

print(player_avg_kills.show(1))
print(playlist_count.show(1))
print(map_count.show(1))
print(killing_spree_count.show(1))

                                                                                

+---------------+---------+
|player_gamertag|avg_kills|
+---------------+---------+
|   gimpinator14|    109.0|
+---------------+---------+
only showing top 1 row

None


                                                                                

+--------------------+-----+
|         playlist_id|count|
+--------------------+-----+
|f72e0ef0-7c4a-430...| 7640|
+--------------------+-----+
only showing top 1 row

None


                                                                                

+--------------------+-----+
|               mapid|count|
+--------------------+-----+
|c7edbf0f-f206-11e...| 7032|
+--------------------+-----+
only showing top 1 row

None


[Stage 108:>                                                        (0 + 8) / 8]

+--------------------+-----+
|               mapid|count|
+--------------------+-----+
|c7edbf0f-f206-11e...| 6734|
+--------------------+-----+
only showing top 1 row

None


                                                                                

In [13]:
final_df.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("bootcamp.combined_data")

                                                                                

In [14]:
final_df.sortWithinPartitions(col("playlist_id"), col("mapid")) \
    .write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("bootcamp.combined_data_sorted")

                                                                                

In [15]:
query = """
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM demo.bootcamp.combined_data.files

UNION ALL

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.combined_data_sorted.files
"""

spark.sql(query).show()

+-------+---------+--------+
|   size|num_files|unsorted|
+-------+---------+--------+
|6619223|        8|unsorted|
|6020927|        8|  sorted|
+-------+---------+--------+

