# Halo Data Pipelines and Analysis
- The goal of this notebook is to provide a basic analysis of some of the data created for Halo games data.
- For this notebook, pyspark in databricks was utilized to read some of the data.
- The database was obtained from a Data Engineering Bootcamp.
## Tasks
- Create a Spark job that analyzes the medal counts of each player (one day at a time)
  - Create tables medals, 
    - medals
      - (doesn't need to be date partitioned since they don't change)
    - matches 
      - (should be date partitioned)
    - medals_matches_players 
      - (should be date partitioned)
  - Explicitly broadcast join medals to medals_matches_players (medals is a small table)
  - Create a DDL for a date partitioned table `daily_player_medal_counts`
    - This should have one row per player per medal per day
    - Play around with the sorting of this table to find what compressions best

## Creating SparkSession

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PlayerMedalCounts").getOrCreate()

## Import packages and Functions

In [None]:
from pyspark.sql.functions import to_date, col, broadcast, split, lit, count, sum



# Set the autoBroadcastJoinThreshold configuration
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10000000000")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")




# Postgres credentials
driver = "org.postgresql.Driver"
database_host = ""
database_port = "5432" # update if you use a non-default port
database_name = "" # eg. postgres
user = ""
password = ""
url = ""

tables = ['matches','medals','medals_matches_players']

for table in tables:
    remote_table = spark.read\
        .format("jdbc")\
        .option("driver", driver)\
        .option("url", url)\
        .option("dbtable", table)\
        .option("user", user)\
        .option("password", password)\
        .load()
    
    # Save matches table as a parquet table
    remote_table.write.format("parquet").mode("overwrite").saveAsTable(table)
    display(table)


spark.sql("show tables").show()


'matches''medals''medals_matches_players'+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|         device_data|      false|
| default|             devices|      false|
| default|          event_data|      false|
| default|              events|      false|
| default|             matches|      false|
| default|              medals|      false|
| default|medals_matches_pl...|      false|
| default|           user_data|      false|
+--------+--------------------+-----------+



In [None]:
# Import medals table
table = "medals"

medals = spark.table(table)

# medals.show(5)

In [None]:
# Import matches table
table = "matches"
matches = spark.table(table)
matches.printSchema()

matches = matches.repartition("completion_date")

# matches.explain()
# matches.show()

root
 |-- match_id: string (nullable = true)
 |-- mapid: string (nullable = true)
 |-- is_team_game: boolean (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- game_variant_id: string (nullable = true)
 |-- is_match_over: boolean (nullable = true)
 |-- completion_date: timestamp (nullable = true)
 |-- match_duration: string (nullable = true)
 |-- game_mode: string (nullable = true)
 |-- map_variant_id: string (nullable = true)



In [None]:

# Import medals_matches_players table
table = "medals_matches_players"
medals_matches_players = spark.table(table)


medals_matches_players.printSchema()
medals_matches_players.show(5)



root
 |-- match_id: string (nullable = true)
 |-- player_gamertag: string (nullable = true)
 |-- medal_id: string (nullable = true)
 |-- count: integer (nullable = true)

+--------------------+---------------+----------+-----+
|            match_id|player_gamertag|  medal_id|count|
+--------------------+---------------+----------+-----+
|009fdac5-e15c-47c...|       EcZachly|3261908037|    7|
|009fdac5-e15c-47c...|       EcZachly| 824733727|    2|
|009fdac5-e15c-47c...|       EcZachly|2078758684|    2|
|009fdac5-e15c-47c...|       EcZachly|2782465081|    2|
|9169d1a3-955c-4ea...|       EcZachly|3001183151|    1|
+--------------------+---------------+----------+-----+
only showing top 5 rows



In [None]:
# Join matches and medals_matches_players on match_id column and then partition by matches.completion_date
partitioned_medals_matches_players = medals_matches_players\
    .join(matches, "match_id")\
    .select(
            ["player_gamertag",
            "match_id",
            "medal_id",
            "count",
            "completion_date"]
            )\
    .repartition("completion_date")

# Save new combined table table as a parquet table
partitioned_medals_matches_players.write.format("parquet").mode("overwrite").saveAsTable("partitioned_medals_matches_players")
partitioned_medals_matches_players = spark.table("partitioned_medals_matches_players")

In [None]:
partitioned_medals_matches_players.show(5)

+---------------+--------------------+----------+-----+-------------------+
|player_gamertag|            match_id|  medal_id|count|    completion_date|
+---------------+--------------------+----------+-----+-------------------+
| Panda Bearsack|01e5f0bd-8382-4b2...|2287626681|    1|2015-12-27 00:00:00|
| Panda Bearsack|01e5f0bd-8382-4b2...|3261908037|    1|2015-12-27 00:00:00|
|          CJ700|01e5f0bd-8382-4b2...|3261908037|    9|2015-12-27 00:00:00|
|          CJ700|01e5f0bd-8382-4b2...|2078758684|    3|2015-12-27 00:00:00|
|          CJ700|01e5f0bd-8382-4b2...|2430242797|    1|2015-12-27 00:00:00|
+---------------+--------------------+----------+-----+-------------------+
only showing top 5 rows



In [None]:
joined_player_medal_counts = partitioned_medals_matches_players\
    .repartition(4)\
    .join(broadcast(medals), "medal_id")\
    .select(
        partitioned_medals_matches_players["*"],
        split(partitioned_medals_matches_players["completion_date"], " ").getItem(0).alias("ds")
    )
joined_player_medal_counts.write.format("parquet").mode("overwrite").saveAsTable("joined_player_medal_counts")


In [None]:
joined_player_medal_counts.printSchema()
joined_player_medal_counts.show(5)

root
 |-- player_gamertag: string (nullable = true)
 |-- match_id: string (nullable = true)
 |-- medal_id: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- completion_date: timestamp (nullable = true)
 |-- ds: string (nullable = true)

+---------------+--------------------+----------+-----+-------------------+----------+
|player_gamertag|            match_id|  medal_id|count|    completion_date|        ds|
+---------------+--------------------+----------+-----+-------------------+----------+
|Killswitch V7II|4d163705-306f-4a0...|2287626681|    1|2015-12-23 00:00:00|2015-12-23|
|blue devil 2121|7ed01b67-7916-4a0...|1351381581|    8|2015-12-02 00:00:00|2015-12-02|
|    Tanner Haze|0cea6755-a0a7-40b...| 824733727|    1|2016-03-15 00:00:00|2016-03-15|
|           BMMV|d44d2577-da35-413...|3261908037|    3|2016-04-12 00:00:00|2016-04-12|
|    MUFFINSCRUB|d7da9c6e-339a-450...|2287626681|    1|2016-01-23 00:00:00|2016-01-23|
+---------------+--------------------+----------+


### Create a DDL for a date partitioned table `daily_player_medal_counts`

- This should have one row per player per medal per day
- Play around with the sorting of this table to find what compressions best

In [None]:
tableName = "daily_player_medal_counts"

In [None]:
# Create a DDL for daily_player_medal_counts

spark.sql(f"DROP TABLE IF EXISTS {tableName}")

daily_player_medal_counts_ddl = f"""
    CREATE TABLE IF NOT EXISTS {tableName} (
       player_gamertag STRING,    
       medal_id STRING,
       medal_counts INT
    )
    USING PARQUET
    PARTITIONED BY (ds STRING)
    LOCATION '/{tableName}'
"""
spark.sql(daily_player_medal_counts_ddl)

joined_player_medal_counts = spark.table("joined_player_medal_counts")
# joined_player_medal_counts.show(5)

daily_player_medal_counts = joined_player_medal_counts\
            .groupBy(
               joined_player_medal_counts.player_gamertag,
               joined_player_medal_counts.medal_id,
               joined_player_medal_counts.ds
            )\
            .agg(
                sum(joined_player_medal_counts["count"]).alias("medal_counts") # use medals_matches_players.count ?
            )\
            .select(
                ["player_gamertag",
                "medal_id",
                "medal_counts",
                "ds"]
             )\
            .repartition("ds")\
            .sortWithinPartitions("player_gamertag","medal_id")

# Write the daily_player_medal_counts DataFrame, repartitioning to 1 file per partition
daily_player_medal_counts.repartition(1).write.format("parquet").mode("overwrite").insertInto(tableName)

daily_player_medal_counts.show(5)

+---------------+----------+------------+----------+
|player_gamertag|  medal_id|medal_counts|        ds|
+---------------+----------+------------+----------+
|   A 2tha nimal|2078758684|           1|2016-01-14|
|   A 2tha nimal|2287626681|           1|2016-01-14|
|   A 2tha nimal|3261908037|           4|2016-01-14|
|   A 2tha nimal|3491849182|           1|2016-01-14|
|   A 2tha nimal| 824733727|           1|2016-01-14|
+---------------+----------+------------+----------+
only showing top 5 rows



In [None]:
# Compute and update partition statistics for the table
spark.sql(f"ANALYZE TABLE {tableName} COMPUTE STATISTICS")

# Get the sizes of the table
spark.sql(f"""DESC EXTENDED {tableName} """).collect()

Out[12]: [Row(col_name='player_gamertag', data_type='string', comment=None),
 Row(col_name='medal_id', data_type='string', comment=None),
 Row(col_name='medal_counts', data_type='int', comment=None),
 Row(col_name='ds', data_type='string', comment=None),
 Row(col_name='# Partition Information', data_type='', comment=''),
 Row(col_name='# col_name', data_type='data_type', comment='comment'),
 Row(col_name='ds', data_type='string', comment=None),
 Row(col_name='', data_type='', comment=''),
 Row(col_name='# Detailed Table Information', data_type='', comment=''),
 Row(col_name='Database', data_type='default', comment=''),
 Row(col_name='Table', data_type='daily_player_medal_counts', comment=''),
 Row(col_name='Owner', data_type='root', comment=''),
 Row(col_name='Created Time', data_type='Tue May 30 21:35:53 UTC 2023', comment=''),
 Row(col_name='Last Access', data_type='UNKNOWN', comment=''),
 Row(col_name='Created By', data_type='Spark 3.3.0', comment=''),
 Row(col_name='Type', data_typ