In [156]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, sum, countDistinct, col, avg, count, max, row_number
from pyspark.sql.window import Window

In [83]:
spark = SparkSession.builder.appName("Spark Homework").getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

match_details = spark.read.option("header", "true") \
                    .option("inferSchema", "true") \
                    .csv("/home/iceberg/data/match_details.csv")
matches = spark.read.option("header", "true") \
                    .option("inferSchema", "true") \
                    .csv("/home/iceberg/data/matches.csv")
medals_matches_players = spark.read.option("header", "true") \
                    .option("inferSchema", "true") \
                    .csv("/home/iceberg/data/medals_matches_players.csv")
medals = spark.read.option("header", "true") \
                    .option("inferSchema", "true") \
                    .csv("/home/iceberg/data/medals.csv")
maps = spark.read.option("header", "true") \
                    .option("inferSchema", "true") \
                    .csv("/home/iceberg/data/maps.csv")

                                                                                

In [6]:
matches.show(5)

+--------------------+--------------------+------------+--------------------+--------------------+-------------+--------------------+--------------+---------+--------------------+
|            match_id|               mapid|is_team_game|         playlist_id|     game_variant_id|is_match_over|     completion_date|match_duration|game_mode|      map_variant_id|
+--------------------+--------------------+------------+--------------------+--------------------+-------------+--------------------+--------------+---------+--------------------+
|11de1a94-8d07-416...|c7edbf0f-f206-11e...|        true|f72e0ef0-7c4a-430...|1e473914-46e4-408...|         true|2016-02-22 00:00:...|          NULL|     NULL|                NULL|
|d3643e71-3e51-43e...|cb914b9e-f206-11e...|       false|d0766624-dbd7-453...|257a305e-4dd3-41f...|         true|2016-02-14 00:00:...|          NULL|     NULL|                NULL|
|d78d2aae-36e4-48a...|c7edbf0f-f206-11e...|        true|f72e0ef0-7c4a-430...|1e473914-46e4-408...|  

In [9]:
medals_matches_players.show(5)

+--------------------+---------------+----------+-----+
|            match_id|player_gamertag|  medal_id|count|
+--------------------+---------------+----------+-----+
|009fdac5-e15c-47c...|       EcZachly|3261908037|    7|
|009fdac5-e15c-47c...|       EcZachly| 824733727|    2|
|009fdac5-e15c-47c...|       EcZachly|2078758684|    2|
|009fdac5-e15c-47c...|       EcZachly|2782465081|    2|
|9169d1a3-955c-4ea...|       EcZachly|3001183151|    1|
+--------------------+---------------+----------+-----+
only showing top 5 rows



In [10]:
medals.show(5)

+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+--------------+--------------------+--------------+----------+
|  medal_id|          sprite_uri|sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|classification|         description|          name|difficulty|
+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+--------------+--------------------+--------------+----------+
|2315448068|                NULL|       NULL|      NULL|              NULL|               NULL|        NULL|         NULL|          NULL|                NULL|          NULL|      NULL|
|3565441934|                NULL|       NULL|      NULL|              NULL|               NULL|        NULL|         NULL|          NULL|                NULL|          NULL|      NULL|
|4162659350|https://content.h...|        750|       750|                74|

In [5]:
match_details.show(5)

25/01/16 11:04:16 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+
|            match_id|player_gamertag|previous_spartan_rank|spartan_rank|previous_total_xp|total_xp|previous_csr_tier|previous_csr_designation|previous_csr|previous_csr_percent_to_next_tier|previous_csr_rank|current_

In [84]:
maps.show(5)

+--------------------+-------------------+--------------------+
|               mapid|               name|         description|
+--------------------+-------------------+--------------------+
|c93d708f-f206-11e...|              Urban|Andesia was the c...|
|cb251c51-f206-11e...|     Raid on Apex 7|This unbroken rin...|
|c854e54f-f206-11e...|March on Stormbreak|                NULL|
|c8d69870-f206-11e...| Escape from A.R.C.|Scientists flocke...|
|73ed1fd0-45e5-4bb...|             Osiris|                NULL|
+--------------------+-------------------+--------------------+
only showing top 5 rows



In [88]:
# Broadcast JOIN medals and maps
broadcasted_medals = broadcast(medals.alias("med"))
broadcasted_maps = broadcast(maps.alias("map"))

In [86]:
spark.sql("DROP TABLE bootcamp.bucketed_matches")
spark.sql("DROP TABLE bootcamp.bucketed_match_details")
spark.sql("DROP TABLE bootcamp.bucketed_medals_matches_players")

DataFrame[]

In [87]:
# Bucket match_details, matches, and medals_matches_players on match_id with 16 buckets
matches.write.bucketBy(16, "match_id").saveAsTable("bootcamp.bucketed_matches")
match_details.write.bucketBy(16, "match_id").saveAsTable("bootcamp.bucketed_match_details")
medals_matches_players.write.bucketBy(16, "match_id").saveAsTable("bootcamp.bucketed_medals_matches_players")

                                                                                

In [89]:
bucketed_matches = spark.read.table("bootcamp.bucketed_matches")
bucketed_match_details = spark.read.table("bootcamp.bucketed_match_details")
bucketed_medals_matches_players = spark.read.table("bootcamp.bucketed_medals_matches_players")

df_final = bucketed_medals_matches_players.alias("mp") \
    .join(bucketed_matches.alias("m"), col("mp.match_id") == col("m.match_id"), "left") \
    .join(bucketed_match_details.alias("md"), col("mp.match_id") == col("md.match_id"), "left") \
    .join(broadcasted_medals, col("mp.medal_id") == col("med.medal_id"), "left") \
    .join(broadcasted_maps, col("m.mapid") == col("map.mapid"), "left")

In [90]:
df_final.show(5)

+--------------------+---------------+---------+-----+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+---

In [94]:
# Which player averages the most kills per game?
most_kills_per_game = df_final.groupBy("md.player_gamertag").agg(
    avg("md.player_total_kills").alias("avg_kills_per_game")
).orderBy(col("avg_kills_per_game").desc())
most_kills_per_game.show(1)



+---------------+------------------+
|player_gamertag|avg_kills_per_game|
+---------------+------------------+
|   gimpinator14|             109.0|
+---------------+------------------+
only showing top 1 row



                                                                                

In [95]:
# Which playlist gets played the most?
most_played_playlist = df_final.groupBy("m.playlist_id").agg(
    countDistinct("m.match_id").alias("matches_played")
).orderBy(col("matches_played").desc())
most_played_playlist.show(1)

+--------------------+--------------+
|         playlist_id|matches_played|
+--------------------+--------------+
|f72e0ef0-7c4a-430...|          7640|
+--------------------+--------------+
only showing top 1 row



                                                                                

In [109]:
# Which map gets played the most?
most_played_map = df_final.groupBy("m.mapid", "map.name").agg(
    countDistinct("m.match_id").alias("matches_played")
).orderBy(col("matches_played").desc())
most_played_map.show(1)



+--------------------+--------------+--------------+
|               mapid|          name|matches_played|
+--------------------+--------------+--------------+
|c7edbf0f-f206-11e...|Breakout Arena|          7032|
+--------------------+--------------+--------------+
only showing top 1 row



                                                                                

In [158]:
# Which map do players get the most Killing Spree medals on?
killing_spree_medal_id = broadcasted_medals.filter(col("name") == "Killing Spree").select("medal_id").first()["medal_id"]
window_spec = Window.partitionBy("map.mapid", "map.name", "m.match_id").orderBy(col("mp.count").desc())

df_with_max = df_final.filter(col("med.medal_id") == killing_spree_medal_id).withColumn(
    "rank", row_number().over(window_spec)
)

result = df_with_max.filter(col("rank") == 1).drop("rank")

most_killing_spree_medals = result.filter(col("med.medal_id") == killing_spree_medal_id) \
.groupBy("map.mapid", "map.name").agg(
    sum("mp.count").alias("killing_spree_count")
).orderBy(col("killing_spree_count").desc())
most_killing_spree_medals.show(1)

25/01/17 12:36:50 WARN HintErrorLogger: A join hint (strategy=broadcast) is specified but it is not part of a join relation.
25/01/17 12:36:51 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression
25/01/17 12:36:51 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression
                                                                                

+--------------------+--------------+-------------------+
|               mapid|          name|killing_spree_count|
+--------------------+--------------+-------------------+
|c7edbf0f-f206-11e...|Breakout Arena|               5100|
+--------------------+--------------+-------------------+
only showing top 1 row



In [160]:
# Sort within partitions
playlist_sorted_df = df_final.repartition(10, col("m.playlist_id"))\
    .sortWithinPartitions(col("m.playlist_id"))
playlist_sorted_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [playlist_id#5182 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(playlist_id#5182, 10), REPARTITION_BY_NUM, [plan_id=37364]
      +- BroadcastHashJoin [mapid#5180], [mapid#5003], LeftOuter, BuildRight, false
         :- BroadcastHashJoin [medal_id#5273L], [medal_id#4962L], LeftOuter, BuildRight, false
         :  :- SortMergeJoin [match_id#5271], [match_id#5199], LeftOuter
         :  :  :- SortMergeJoin [match_id#5271], [match_id#5179], LeftOuter
         :  :  :  :- Sort [match_id#5271 ASC NULLS FIRST], false, 0
         :  :  :  :  +- Exchange hashpartitioning(match_id#5271, 200), ENSURE_REQUIREMENTS, [plan_id=37348]
         :  :  :  :     +- BatchScan demo.bootcamp.bucketed_medals_matches_players[match_id#5271, player_gamertag#5272, medal_id#5273L, count#5274] demo.bootcamp.bucketed_medals_matches_players (branch=null) [filters=, groupedBy=] RuntimeFilters: []
         :  :  :  +- Sort [match_id#5179 ASC N

In [161]:
map_sorted_df = df_final.repartition(10, col("map.mapid"))\
    .sortWithinPartitions(col("map.mapid"))

# Save results
map_sorted_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [mapid#5003 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(mapid#5003, 10), REPARTITION_BY_NUM, [plan_id=37439]
      +- BroadcastHashJoin [mapid#5180], [mapid#5003], LeftOuter, BuildRight, false
         :- BroadcastHashJoin [medal_id#5273L], [medal_id#4962L], LeftOuter, BuildRight, false
         :  :- SortMergeJoin [match_id#5271], [match_id#5199], LeftOuter
         :  :  :- SortMergeJoin [match_id#5271], [match_id#5179], LeftOuter
         :  :  :  :- Sort [match_id#5271 ASC NULLS FIRST], false, 0
         :  :  :  :  +- Exchange hashpartitioning(match_id#5271, 200), ENSURE_REQUIREMENTS, [plan_id=37423]
         :  :  :  :     +- BatchScan demo.bootcamp.bucketed_medals_matches_players[match_id#5271, player_gamertag#5272, medal_id#5273L, count#5274] demo.bootcamp.bucketed_medals_matches_players (branch=null) [filters=, groupedBy=] RuntimeFilters: []
         :  :  :  +- Sort [match_id#5179 ASC NULLS FIRST],

In [163]:
playlist_global_sort = df_final.repartition(10, col("map.mapid"))\
    .sort(col("m.playlist_id"))
playlist_global_sort.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [playlist_id#5182 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(playlist_id#5182 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=37516]
      +- Exchange hashpartitioning(mapid#5003, 10), REPARTITION_BY_NUM, [plan_id=37514]
         +- BroadcastHashJoin [mapid#5180], [mapid#5003], LeftOuter, BuildRight, false
            :- BroadcastHashJoin [medal_id#5273L], [medal_id#4962L], LeftOuter, BuildRight, false
            :  :- SortMergeJoin [match_id#5271], [match_id#5199], LeftOuter
            :  :  :- SortMergeJoin [match_id#5271], [match_id#5179], LeftOuter
            :  :  :  :- Sort [match_id#5271 ASC NULLS FIRST], false, 0
            :  :  :  :  +- Exchange hashpartitioning(match_id#5271, 200), ENSURE_REQUIREMENTS, [plan_id=37498]
            :  :  :  :     +- BatchScan demo.bootcamp.bucketed_medals_matches_players[match_id#5271, player_gamertag#5272, medal_id#5273L, count#5274] demo.bootcamp.buckete

In [164]:
map_global_sort = df_final.repartition(10, col("map.mapid"))\
    .sort(col("map.mapid"))
map_global_sort.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [mapid#5003 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(mapid#5003 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=37593]
      +- Exchange hashpartitioning(mapid#5003, 10), REPARTITION_BY_NUM, [plan_id=37591]
         +- BroadcastHashJoin [mapid#5180], [mapid#5003], LeftOuter, BuildRight, false
            :- BroadcastHashJoin [medal_id#5273L], [medal_id#4962L], LeftOuter, BuildRight, false
            :  :- SortMergeJoin [match_id#5271], [match_id#5199], LeftOuter
            :  :  :- SortMergeJoin [match_id#5271], [match_id#5179], LeftOuter
            :  :  :  :- Sort [match_id#5271 ASC NULLS FIRST], false, 0
            :  :  :  :  +- Exchange hashpartitioning(match_id#5271, 200), ENSURE_REQUIREMENTS, [plan_id=37575]
            :  :  :  :     +- BatchScan demo.bootcamp.bucketed_medals_matches_players[match_id#5271, player_gamertag#5272, medal_id#5273L, count#5274] demo.bootcamp.bucketed_medals_mat