In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("spark_job") \
    .config("spark.sql.autoBroadcastJoinThreshold", '-1') \
    .config("spark.sql.bucketingEnabled", "true") \
    .getOrCreate()

24/12/07 18:18:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [47]:
match_details = spark.read.csv("../../data/match_details.csv", header=True, inferSchema=True)
matches = spark.read.csv("../../data/matches.csv", header=True, inferSchema=True)
medal_matches_players = spark.read.csv("../../data/medals_matches_players.csv", header=True, inferSchema=True)
medals = spark.read.csv("../../data/medals.csv", header=True, inferSchema=True)

### Crear las tablas bucketizadas

In [3]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.match_details_bucketed""")

DataFrame[]

In [4]:
match_details.select("match_id", "player_gamertag", "player_total_kills") \
    .write.mode("append") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.match_details_bucketed")

                                                                                

In [5]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.matches_bucketed""")

DataFrame[]

In [6]:
matches.select("match_id", "mapid", "playlist_id") \
    .write.mode("append") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.matches_bucketed")

                                                                                

In [7]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.medal_matches_players_bucketed""")

DataFrame[]

In [8]:
medal_matches_players.select("match_id", "player_gamertag", "medal_id", "count") \
    .write.mode("append") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.medal_matches_players_bucketed")

                                                                                

In [24]:
medal_matches_players.show()

+--------------------+---------------+----------+-----+
|            match_id|player_gamertag|  medal_id|count|
+--------------------+---------------+----------+-----+
|009fdac5-e15c-47c...|       EcZachly|3261908037|    7|
|009fdac5-e15c-47c...|       EcZachly| 824733727|    2|
|009fdac5-e15c-47c...|       EcZachly|2078758684|    2|
|009fdac5-e15c-47c...|       EcZachly|2782465081|    2|
|9169d1a3-955c-4ea...|       EcZachly|3001183151|    1|
|9169d1a3-955c-4ea...|       EcZachly|3565443938|    6|
|9169d1a3-955c-4ea...|       EcZachly|3491849182|    1|
|4a078b2f-65eb-4c6...|       EcZachly|3261908037|    8|
|9169d1a3-955c-4ea...|       EcZachly|2105198095|    6|
|9169d1a3-955c-4ea...|       EcZachly|2916014239|    3|
|9169d1a3-955c-4ea...|       EcZachly|3261908037|    6|
|9169d1a3-955c-4ea...|       EcZachly|1351381581|    2|
|9169d1a3-955c-4ea...|       EcZachly|2838259753|    1|
|9169d1a3-955c-4ea...|       EcZachly|3354395650|    1|
|9169d1a3-955c-4ea...|       EcZachly| 298813630

### Compare the bucketed vs non bucketed joins

In [9]:
joined_df = match_details.select("match_id", "player_gamertag", "player_total_kills").join(
    matches.select("match_id", "mapid", "playlist_id"),
    on=["match_id"]
)

In [11]:
match_details_bucketed = spark.read.table('bootcamp.match_details_bucketed')
matches_bucketed = spark.read.table('bootcamp.matches_bucketed')
medal_matches_players_bucketed = spark.read.table('bootcamp.medal_matches_players_bucketed')

In [38]:
match_full = match_details_bucketed.join(matches_bucketed, on=["match_id"], how="inner")

### Aggregate dfs to find metrics

In [32]:
import pyspark.sql.functions as F

avg_kills = match_full.groupBy("player_gamertag").agg(F.avg("player_total_kills").alias("avg_kills"))
avg_kills = avg_kills.orderBy("avg_kills", ascending=False)

In [33]:
avg_kills.show()



+---------------+---------+
|player_gamertag|avg_kills|
+---------------+---------+
|   gimpinator14|    109.0|
|  I Johann117 I|     96.0|
|BudgetLegendary|     83.0|
|      GsFurreal|     75.0|
|   Sexy is Back|     73.0|
|   killerguy789|     68.0|
|THC GUILTYSPARK|     67.0|
|PrimePromethean|     66.0|
|    HisLattice1|     66.0|
|     taurenmonk|     64.0|
|WhiteMountainDC|     63.0|
|   Dinosaur B0B|     63.0|
|     MONKEYBAKE|     62.0|
|       BlightNB|     62.0|
|        Darugis|     62.0|
|    ManicZ0mb1e|     61.0|
|  SiIentStriker|     61.0|
|    ohh Replxys|     60.0|
|LEGENDARY link0|     60.0|
|Lord Leonidamir|     60.0|
+---------------+---------+
only showing top 20 rows



                                                                                

In [35]:
playlist_count = match_full.groupBy("playlist_id").agg(F.count("player_total_kills").alias("playlist_count"))
playlist_count = playlist_count.orderBy("playlist_count", ascending=False)

In [36]:
playlist_count.show()



+--------------------+--------------+
|         playlist_id|playlist_count|
+--------------------+--------------+
|f72e0ef0-7c4a-430...|         58868|
|2323b76a-db98-4e0...|         23587|
|892189e9-d712-4bd...|         15079|
|c98949ae-60a8-43d...|         13943|
|0bcf2be1-3168-4e4...|          8487|
|780cc101-005c-4fc...|          8258|
|f27a65eb-2d11-496...|          5149|
|d0766624-dbd7-453...|          3558|
|bc0f8ad6-31e6-4a1...|          2542|
|355dc154-9809-4ed...|          2202|
|7b7e892c-d9b7-4b0...|          1683|
|5728f612-3f20-445...|          1519|
|7385b4a1-86bf-4ae...|          1375|
|f0c9ef9a-48bd-4b2...|          1161|
|4b12472e-2a06-423...|          1041|
|819eb188-1a1c-48b...|          1010|
|b5d5a242-ffa5-4d8...|           612|
|d21c8381-26f1-4d6...|           479|
|88b7de19-113c-4be...|           462|
|2e812e09-912f-458...|           329|
+--------------------+--------------+
only showing top 20 rows



                                                                                

In [37]:
map_count = match_full.groupBy("mapid").agg(F.count("player_total_kills").alias("map_count"))
map_count = map_count.orderBy("map_count", ascending=False)

In [64]:
matches_bucketed.show()

+--------------------+--------------------+--------------------+
|            match_id|               mapid|         playlist_id|
+--------------------+--------------------+--------------------+
|d78d2aae-36e4-48a...|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|
|9e079488-1355-4c6...|c74c9d0f-f206-11e...|0bcf2be1-3168-4e4...|
|ad4a5b9d-7127-404...|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|
|5650cad9-17c8-422...|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|
|26bf5a48-7cf8-46c...|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|
|b05b0b30-9387-448...|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|
|40f27b2b-2da3-4aa...|caacb800-f206-11e...|2323b76a-db98-4e0...|
|d312982c-ab8e-4c4...|cdb934b0-f206-11e...|2323b76a-db98-4e0...|
|8df94631-bad3-4da...|cdb934b0-f206-11e...|c98949ae-60a8-43d...|
|40717908-fec3-47e...|cebd854f-f206-11e...|892189e9-d712-4bd...|
|01b13615-02c8-459...|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|
|a21f113b-b562-4f6...|caacb800-f206-11e...|f27a65eb-2d11-496...|
|8e66d57b-24fb-4e4...|c7e

In [65]:
ks_medals = medal_matches_players_bucketed.join(
    F.broadcast(medals.select("medal_id", "name")),
    on=["medal_id"],
    how="inner",
).join(
    matches_bucketed,
    on=["match_id"],
    how="inner",
)

In [66]:
ks_medals = ks_medals.filter(F.col("name") == "Killing Spree")


In [69]:
ks_medals_count = ks_medals.groupBy("mapid").agg(F.count("medal_id").alias("map_count"))
ks_medals_count = ks_medals_count.orderBy("map_count", ascending=False)

In [70]:
ks_medals_count.show()

24/12/07 18:49:14 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression

+--------------------+---------+
|               mapid|map_count|
+--------------------+---------+
|c7edbf0f-f206-11e...|     6559|
|c74c9d0f-f206-11e...|     4346|
|c7805740-f206-11e...|     2617|
|cdb934b0-f206-11e...|     1996|
|ce1dc2de-f206-11e...|     1765|
|cb914b9e-f206-11e...|     1747|
|caacb800-f206-11e...|     1671|
|cebd854f-f206-11e...|     1653|
|cdee4e70-f206-11e...|     1558|
|cd844200-f206-11e...|     1551|
|cc040aa1-f206-11e...|     1451|
|ca737f8f-f206-11e...|      926|
|cbcea2c0-f206-11e...|      809|
|cc74f4e1-f206-11e...|      771|
|c7b7baf0-f206-11e...|      518|
|ce89a40f-f206-11e...|      312|
+--------------------+---------+



                                                                                