In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

spark
# Disable automatic broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")


match_details = spark.read.csv("/home/iceberg/data/match_details.csv", header=True, inferSchema=True)
matches = spark.read.csv("/home/iceberg/data/matches.csv", header=True, inferSchema=True)
medals_matches_players = spark.read.csv("/home/iceberg/data/medals_matches_players.csv", header=True, inferSchema=True)
medals = spark.read.csv("/home/iceberg/data/medals.csv", header=True, inferSchema=True)
maps = spark.read.csv("/home/iceberg/data/maps.csv", header=True, inferSchema=True)

maps.show()

In [45]:
from pyspark.sql.functions import broadcast

# Broadcast join

matches_maps_df = matches.join(broadcast(maps),"mapid")

matches_maps_df.show()

medals_matches_players_df = medals_matches_players.join(broadcast(medals),"medal_id")

matches_maps_df.show()

+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+--------------+--------------------+
|               mapid|            match_id|is_team_game|         playlist_id|     game_variant_id|is_match_over|    completion_date|match_duration|game_mode|      map_variant_id|          name|         description|
+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+--------------+--------------------+
|c7edbf0f-f206-11e...|11de1a94-8d07-416...|        true|f72e0ef0-7c4a-430...|1e473914-46e4-408...|         true|2016-02-22 00:00:00|          NULL|     NULL|                NULL|Breakout Arena|The broadcast of ...|
|cb914b9e-f206-11e...|d3643e71-3e51-43e...|       false|d0766624-dbd7-453...|257a305e-4dd3-41f...|         true|2016-02-14 00:00:00|        

In [46]:
# Bucketed join on match_id (16 buckets)
match_details.createOrReplaceTempView("match_details")
matches.createOrReplaceTempView("matches")
medals_matches_players.createOrReplaceTempView("medals_matches_players")

# Renaming columns in match_details to avoid ambiguity
#match_details_renamed = match_details.withColumnRenamed("player_total_kills", "match_details_player_total_kills")

# Renaming columns in medals_matches_players to avoid ambiguity
#medals_matches_players_renamed = medals_matches_players.withColumnRenamed("player_total_kills", "medals_matches_player_total_kills")

# Now perform the join with the renamed columns
bucketed_data = match_details.join(matches, ["match_id"], "inner") \
                                     .join(medals_matches_players, ["match_id","player_gamertag"], "inner") \
                                     .repartition(16, "match_id")


In [47]:

print(bucketed_data.columns)

['match_id', 'player_gamertag', 'previous_spartan_rank', 'spartan_rank', 'previous_total_xp', 'total_xp', 'previous_csr_tier', 'previous_csr_designation', 'previous_csr', 'previous_csr_percent_to_next_tier', 'previous_csr_rank', 'current_csr_tier', 'current_csr_designation', 'current_csr', 'current_csr_percent_to_next_tier', 'current_csr_rank', 'player_rank_on_team', 'player_finished', 'player_average_life', 'player_total_kills', 'player_total_headshots', 'player_total_weapon_damage', 'player_total_shots_landed', 'player_total_melee_kills', 'player_total_melee_damage', 'player_total_assassinations', 'player_total_ground_pound_kills', 'player_total_shoulder_bash_kills', 'player_total_grenade_damage', 'player_total_power_weapon_damage', 'player_total_power_weapon_grabs', 'player_total_deaths', 'player_total_assists', 'player_total_grenade_kills', 'did_win', 'team_id', 'mapid', 'is_team_game', 'playlist_id', 'game_variant_id', 'is_match_over', 'completion_date', 'match_duration', 'game_mo

In [52]:
# Aggregating data


agg_kills = bucketed_data.groupBy("match_id","player_gamertag").agg({"player_total_kills": "avg"}).withColumnRenamed("avg(player_total_kills)", "avg_kills")
agg_kills.show()


common_playlist = bucketed_data.groupBy("playlist_id").count().orderBy('count',ascending = False).take(1)

print(f"common_playlist count: {common_playlist}")

most_played_map = bucketed_data.groupBy("mapid").count().orderBy('count',ascending = False).take(1)

print(f"most_played_map count: {most_played_map}")

bucketed_data2= bucketed_data.join(broadcast(medals), 'medal_id')
killing_spree = bucketed_data2.filter(bucketed_data2.classification == "KillingSpree").groupBy("mapid").count().orderBy('count',ascending = False).take(1)
print(f"Killing spree count: {killing_spree}")
#killing_spree.show()




+--------------------+---------------+---------+
|            match_id|player_gamertag|avg_kills|
+--------------------+---------------+---------+
|00114a87-5696-421...|      Amplafied|     14.0|
|002d6369-161c-4ec...|       Attack3d|      5.0|
|002d6369-161c-4ec...|lX R3CLAIM3R Xl|      8.0|
|00fa6004-d709-410...|   HuNTy x BEaR|     12.0|
|01b59954-e15c-445...|  Chuck Chill33|      1.0|
|01b59954-e15c-445...|globalgoon78901|      1.0|
|023907c7-bf2f-4c5...|        Silhana|      9.0|
|028a0d83-c936-41f...|    ILLICIT 117|     12.0|
|02d4d2c8-7bce-46a...|     NWG StrAfe|     27.0|
|035f360e-989b-4ce...|   SpicyWings88|     14.0|
|035fe66c-70ac-4f0...|      My Regret|     10.0|
|045f959f-8df5-40d...|     REMIX WOLF|      1.0|
|046ae3fe-a44d-448...|      Dreamtage|      4.0|
|04a29ddf-df6a-40d...|      Ensecures|      5.0|
|04a29ddf-df6a-40d...| Seven6TwoX5One|      5.0|
|04a29ddf-df6a-40d...|        YohYeah|      2.0|
|04d7a744-4142-402...| landenNYgiants|      6.0|
|0558b50c-8ee5-441..

In [65]:

start_df = bucketed_data.repartition(4, "match_id")
    

first_sort_df = start_df.sortWithinPartitions("match_id")

sorted = bucketed_data.repartition(10, "match_id") \
        .sortWithinPartitions("match_id") \
# Create a temporary view of your DataFrame

start_df.write.mode("overwrite").saveAsTable(" bootcamp.matches_bucketed")
first_sort_df.write.mode("overwrite").saveAsTable("events_unsorted")

                                                                                

In [67]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM bootcamp.matches_bucketed.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM bootcamp.matches_bucketed.files





size,num_files,sorted
17775416,4,sorted
17775416,4,unsorted


In [68]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files FROM bootcamp.matches_bucketed.files;

size,num_files
17775416,4


In [69]:
%%sql 
SELECT COUNT(1) FROM bootcamp.matches_bucketed.files

count(1)
4
