In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
from pyspark.sql import functions as F

In [5]:
# Start (or reuse) the Spark session used throughout this notebook
spark = SparkSession.builder.appName("game").getOrCreate()


def load_csv(path):
    """Read the CSV file at `path`, with the reader's "header" option set to "true"."""
    return spark.read.option("header", "true").csv(path)


# Load the five source tables
match_details = load_csv("/home/iceberg/data/match_details.csv")
matches = load_csv("/home/iceberg/data/matches.csv")
medals_matches_players = load_csv("/home/iceberg/data/medals_matches_players.csv")
medals = load_csv("/home/iceberg/data/medals.csv")
maps = load_csv("/home/iceberg/data/maps.csv")

In [6]:
# Turn off automatic broadcast joins: a threshold of -1 disables them,
# so only joins that are explicitly marked for broadcast get broadcast.
broadcast_threshold_key = "spark.sql.autoBroadcastJoinThreshold"
spark.conf.set(broadcast_threshold_key, "-1")

In [7]:
# Row counts of (maps, medals), to see which of the two tables is the smaller one
tuple(table.count() for table in (maps, medals))

(40, 183)

In [8]:
# Explicitly broadcast join: mark maps for broadcast and cross join it with medals
maps_broadcast = broadcast(maps)
medals_and_maps = medals.crossJoin(maps_broadcast)

In [9]:
# Write bucketed tables: each source table is saved with 16 buckets on match_id,
# overwriting any existing table of the same name.
bucketed_targets = [
    (match_details, "bucketed_match_details.parquet"),
    (matches, "bucketed_matches.parquet"),
    (medals_matches_players, "bucketed_medal_matches_players.parquet"),
]

for source_df, table_name in bucketed_targets:
    bucketed_writer = (
        source_df.write
        .bucketBy(16, "match_id")
        .format("parquet")
        .mode("overwrite")
    )
    bucketed_writer.saveAsTable(table_name)

25/02/12 20:25:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [10]:
def load_iceberg_table(table_name):
    """Load the table called `table_name` through the "iceberg" data source."""
    return spark.read.format("iceberg").load(table_name)


# Read the three bucketed tables back in
bucketed_match_details = load_iceberg_table("bucketed_match_details.parquet")
bucketed_matches = load_iceberg_table("bucketed_matches.parquet")
bucketed_medal_matches_players = load_iceberg_table("bucketed_medal_matches_players.parquet")

In [11]:
# Bucket join
# Use 16 shuffle partitions, the same number as the buckets the tables were written with
spark.conf.set("spark.sql.shuffle.partitions", 16)

join_key = "match_id"

# Rename the gamertag column coming from match_details
# NOTE(review): presumably avoids a clash with a same-named column in another joined table - confirm
bucketed_match_details = bucketed_match_details.withColumnRenamed(
    "player_gamertag", "player_gamertag_md"
)

# Join match details to matches, then to the per-player medal rows, all on match_id
details_with_matches = bucketed_match_details.join(bucketed_matches, join_key)
aggregate = details_with_matches.join(bucketed_medal_matches_players, join_key)

In [12]:
# Count the distinct values in every column of the joined table (one Spark job per column)
unique_counts = {}
for column in aggregate.columns:
    distinct_values = aggregate.select(column).distinct()
    unique_counts[column] = distinct_values.count()

# Report the counts, one "column: count" line each
for col_name, count in unique_counts.items():
    print(f"{col_name}: {count}")

match_id: 18942
player_gamertag_md: 68835
previous_spartan_rank: 151
spartan_rank: 151
previous_total_xp: 146774
total_xp: 147084
previous_csr_tier: 7
previous_csr_designation: 8
previous_csr: 2332
previous_csr_percent_to_next_tier: 51
previous_csr_rank: 1150
current_csr_tier: 7
current_csr_designation: 8
current_csr: 2344
current_csr_percent_to_next_tier: 51
current_csr_rank: 1963
player_rank_on_team: 25
player_finished: 2
player_average_life: 150003
player_total_kills: 80
player_total_headshots: 41
player_total_weapon_damage: 127373
player_total_shots_landed: 542
player_total_melee_kills: 30
player_total_melee_damage: 24893
player_total_assassinations: 10
player_total_ground_pound_kills: 8
player_total_shoulder_bash_kills: 10
player_total_grenade_damage: 87338
player_total_power_weapon_damage: 35538
player_total_power_weapon_grabs: 9
player_total_deaths: 72
player_total_assists: 32
player_total_grenade_kills: 12
did_win: 2
team_id: 9
mapid: 16
is_team_game: 3
playlist_id: 23
game_var

In [None]:
# Save the joined table as-is (no repartitioning or sorting), replacing any existing copy
aggregate_writer = aggregate.write.mode("overwrite").format("parquet")
aggregate_writer.saveAsTable("aggregate.parquet")

In [31]:
# Repartition into 16 partitions by mapid and sort rows within each partition by mapid
sorted_by_mapid = (
    aggregate
    .repartition(16, "mapid")
    .sortWithinPartitions("mapid")
)

# Save the result, replacing any existing table of the same name
mapid_writer = sorted_by_mapid.write.mode("overwrite").format("parquet")
mapid_writer.saveAsTable("sorted_by_mapid.parquet")

                                                                                

In [32]:
# Repartition into 23 partitions by playlist_id and sort rows within each partition by playlist_id
sorted_by_playlist_id = (
    aggregate
    .repartition(23, "playlist_id")
    .sortWithinPartitions("playlist_id")
)

# Save the result, replacing any existing table of the same name
playlist_writer = sorted_by_playlist_id.write.mode("overwrite").format("parquet")
playlist_writer.saveAsTable("sorted_by_playlist_id.parquet")

                                                                                

In [30]:
# Observed sizes: aggregate = 19.7MB | sorted_by_mapid = 36.2MB | sorted_by_playlist_id = 35.8MB