In [93]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
spark = SparkSession.builder.appName("Jupyter").getOrCreate() #Spark is a JVM lib so we have camel case instead of snake case
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
# Load matches
matches = spark.read.option(
    "header", "true"
).csv(
    "/home/iceberg/data/matches.csv"
).select(
    'match_id',
    'is_team_game',
    'playlist_id',
    'mapid',
    'completion_date'
)



In [94]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [95]:
# Bucketed matches
spark.sql("""DROP TABLE IF EXISTS bootcamp.hw2_matches_bucketed""")
ddl_query_matches = """
CREATE TABLE IF NOT EXISTS bootcamp.hw2_matches_bucketed (
     match_id STRING,
     is_team_game BOOLEAN,
     playlist_id STRING,
     mapid STRING,
     completion_date TIMESTAMP
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
"""
spark.sql(ddl_query_matches)

DataFrame[]

In [96]:
matches = matches.withColumn(
    'is_team_game',
    f.when(f.col('is_team_game')=='true',True).otherwise(False)
)

In [97]:
matches = matches.withColumn(
    'completion_date',
    f.to_timestamp("completion_date", "yyyy-MM-dd HH:mm:ss.SSSSSS")
)

In [98]:
matches = matches.sortWithinPartitions(f.col("completion_date"), f.col("playlist_id"), f.col('mapid')) # Lowest cardinality to the right

In [99]:
matches.write.mode(
    'overwrite'
).bucketBy(
    16,
    'match_id'
).saveAsTable('bootcamp.hw2_matches_bucketed')

                                                                                

In [100]:
df_files = spark.sql("""
    SELECT * FROM bootcamp.hw2_matches_bucketed.files
""")
    

In [85]:
# df_files.select(f.sum('file_size_in_bytes')).alias('sum').show() # Before was 606498

+-----------------------+
|sum(file_size_in_bytes)|
+-----------------------+
|                 606498|
+-----------------------+



In [101]:
df_files.select(f.sum('file_size_in_bytes')).alias('sum').show() 

+-----------------------+
|sum(file_size_in_bytes)|
+-----------------------+
|                 591839|
+-----------------------+



In [44]:
matches.unpersist()

DataFrame[match_id: string, is_team_game: boolean, playlist_id: string, mapid: string, completion_date: timestamp]

In [45]:
# Load match_details
match_details = spark.read.option(
    "header", "true"
).csv(
    "/home/iceberg/data/match_details.csv"
).select(
    'match_id',
    'player_gamertag',
    'player_total_kills',
    'player_total_deaths'
)

In [46]:
# Bucketed match_details
spark.sql("""DROP TABLE IF EXISTS bootcamp.hw2_match_details_bucketed""")
ddl_query_match_details = """
CREATE TABLE IF NOT EXISTS bootcamp.hw2_match_details_bucketed (
    match_id STRING,
    player_gamertag STRING,
    player_total_kills INTEGER,
    player_total_deaths INTEGER
)
USING iceberg
PARTITIONED BY (bucket(16, match_id));
"""
spark.sql(ddl_query_match_details)

DataFrame[]

In [47]:
match_details.write.mode(
    'overwrite'
).bucketBy(
    16,
    'match_id'
).saveAsTable('bootcamp.hw2_match_details_bucketed')

                                                                                

In [48]:
match_details.unpersist()

DataFrame[match_id: string, player_gamertag: string, player_total_kills: string, player_total_deaths: string]

In [49]:
# Load medaL_matches_players
medals_matches_players = spark.read.option(
    "header", "true"
).csv(
    "/home/iceberg/data/medals_matches_players.csv"
)

In [50]:
# Bucketed medal_matches_players
spark.sql("""DROP TABLE IF EXISTS bootcamp.hw2_medal_matches_players""")
ddl_query_match_details = """
CREATE TABLE IF NOT EXISTS bootcamp.hw2_medal_matches_players (
    match_id STRING,
    player_gamertag STRING,
    medal_id STRING,
    count STRING
)
USING iceberg
PARTITIONED BY (bucket(16, match_id));
"""
spark.sql(ddl_query_match_details)

DataFrame[]

In [51]:
medals_matches_players.write.mode(
    'overwrite'
).bucketBy(
    16,
    'match_id'
).saveAsTable('bootcamp.hw2_medal_matches_players')

                                                                                

In [52]:
medals_matches_players.unpersist()

DataFrame[match_id: string, player_gamertag: string, medal_id: string, count: string]

In [58]:
df_results = spark.sql("""
    SELECT
        m.match_id,
        m.is_team_game,
        m.playlist_id,
        m.completion_date,
        m.mapid,
        md.player_gamertag,
        md.player_total_kills,
        md.player_total_deaths,
        mm.player_gamertag AS medal_player_gametag,
        mm.medal_id,
        mm.count
    FROM bootcamp.hw2_matches_bucketed m
    INNER JOIN bootcamp.hw2_match_details_bucketed md
    ON m.match_id = md.match_id
    INNER JOIN bootcamp.hw2_medal_matches_players mm
    ON m.match_id = mm.match_id
""")

In [59]:
df_results.show()



+--------------------+------------+--------------------+-------------------+--------------------+---------------+------------------+-------------------+--------------------+----------+-----+
|            match_id|is_team_game|         playlist_id|    completion_date|               mapid|player_gamertag|player_total_kills|player_total_deaths|medal_player_gametag|  medal_id|count|
+--------------------+------------+--------------------+-------------------+--------------------+---------------+------------------+-------------------+--------------------+----------+-----+
|00169217-cca6-4b4...|        true|2323b76a-db98-4e0...|2016-03-13 00:00:00|cc040aa1-f206-11e...|  King Terror V|                14|                  7|       King Terror V|3261908037|   11|
|00169217-cca6-4b4...|        true|2323b76a-db98-4e0...|2016-03-13 00:00:00|cc040aa1-f206-11e...|  King Terror V|                14|                  7|       King Terror V|3001183151|    1|
|00169217-cca6-4b4...|        true|2323b76a-d

                                                                                

In [60]:
# Questions
# Which player averages the most kills per game? gimpinator14
df_kills = df_results.select(
    'match_id',
    'is_team_game',
    'completion_date',
    'player_gamertag',
    'player_total_kills',
    'player_total_deaths'
).dropDuplicates(
).groupBy(
    'player_gamertag'
).agg(
    f.avg(f.col('player_total_kills')).alias('avg_kills')
).orderBy(
    'avg_kills',
    ascending=False
).limit(1)
print(df_kills.show())



+---------------+---------+
|player_gamertag|avg_kills|
+---------------+---------+
|   gimpinator14|    109.0|
+---------------+---------+

None


                                                                                

In [61]:
# Which playlist gets played the most? f72e0ef0-7c4a-4307-af78-8e38dac3fdba
df_playlists = df_results.select(
    'playlist_id',
    'match_id',
).dropDuplicates(
).groupBy(
    'playlist_id'
).count(
).orderBy(
    'count',
    ascending=False
).limit(1)

print(df_playlists.show(truncate=False))



+------------------------------------+-----+
|playlist_id                         |count|
+------------------------------------+-----+
|f72e0ef0-7c4a-4307-af78-8e38dac3fdba|7640 |
+------------------------------------+-----+

None


                                                                                

In [62]:
# Which map gets played the most?
# Which playlist gets played the most? f72e0ef0-7c4a-4307-af78-8e38dac3fdba
df_maps = df_results.select(
    'mapid',
    'match_id',
).dropDuplicates(
).groupBy(
    'mapid'
).count(
).orderBy(
    'count',
    ascending=False
).limit(1)

print(df_maps.show(truncate=False))

[Stage 190:>                                                        (0 + 3) / 3]

+------------------------------------+-----+
|mapid                               |count|
+------------------------------------+-----+
|c7edbf0f-f206-11e4-aa52-24be05e24f7e|7032 |
+------------------------------------+-----+

None


                                                                                

In [65]:
# Which map do players get the most Killing Spree medals on?
# Load medals
medals = spark.read.option(
    "header", "true"
).csv(
    "/home/iceberg/data/medals.csv"
).filter(
    f.col('name').like('%Killing Spree%')
)

In [66]:
medals.show(truncate=False)

+----------+------------------------------------------------------------------------------------------------------------------------------+-----------+----------+------------------+-------------------+------------+-------------+--------------+-------------------------------+-------------+----------+
|medal_id  |sprite_uri                                                                                                                    |sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|classification|description                    |name         |difficulty|
+----------+------------------------------------------------------------------------------------------------------------------------------+-----------+----------+------------------+-------------------+------------+-------------+--------------+-------------------------------+-------------+----------+
|2430242797|https://content.halocdn.com/media/Default/games/halo-5-guardians/sprites/medalsprites

In [73]:
df_kp = df_results.select(
    'mapid',
    'medal_id',
    'count'
).join(
    medals,
    on='medal_id',
    how='inner'
).groupBy(
    'mapid'
).agg(
    f.sum('count').alias('medal_count')
).orderBy(
    'medal_count',
    ascending=False
).limit(1)

In [74]:
df_kp.show(truncate=False)

25/07/16 15:49:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 15:49:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 15:49:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 15:49:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 15:49:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 15:49:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 15:49:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 15:49:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 15:49:40 WARN RowBasedKeyValueBatch: Calling spill() on

+------------------------------------+-----------+
|mapid                               |medal_count|
+------------------------------------+-----------+
|c74c9d0f-f206-11e4-8330-24be05e24f7e|71863.0    |
+------------------------------------+-----------+



                                                                                

                                                                                

+-----------------------+
|sum(file_size_in_bytes)|
+-----------------------+
|                 591839|
+-----------------------+



                                                                                

+---------------+---------+
|player_gamertag|avg_kills|
+---------------+---------+
|   gimpinator14|    109.0|
+---------------+---------+

None


                                                                                

+------------------------------------+-----+
|playlist_id                         |count|
+------------------------------------+-----+
|f72e0ef0-7c4a-4307-af78-8e38dac3fdba|7640 |
+------------------------------------+-----+

None


                                                                                

+------------------------------------+-----+
|mapid                               |count|
+------------------------------------+-----+
|c7edbf0f-f206-11e4-aa52-24be05e24f7e|7032 |
+------------------------------------+-----+

None


25/07/16 16:07:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 16:07:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 16:07:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 16:07:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 16:07:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 16:07:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 16:07:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 16:07:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/16 16:07:11 WARN RowBasedKeyValueBatch: Calling spill() on

+------------------------------------+-----------+
|mapid                               |medal_count|
+------------------------------------+-----------+
|c74c9d0f-f206-11e4-8330-24be05e24f7e|71863.0    |
+------------------------------------+-----------+

None


                                                                                