In [None]:
import sys
# Install the code formatters isort and black with the pip of the interpreter
# running this kernel ({sys.executable}), into the user site (--user).
# Versions are not pinned (-U), so a re-run may pull newer releases.
# NOTE(review): nothing below imports isort or black, confirm this cell is still needed.
!{sys.executable} -m pip install -U isort --user
!{sys.executable} -m pip install -U black --user

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import expr, col, broadcast, avg, count, sum
import os

BASE_PATH = "/home/iceberg/data"

In [None]:
def create_spark_session(
    app_name="Spark-Iceberg-Homework",
    driver_memory="8g",
    executor_memory="8g",
    disable_auto_broadcast=True,
):
    """Create (or reuse) the SparkSession used throughout this notebook.

    ``SparkSession.builder.getOrCreate()`` returns the already-running session
    when one exists. Settings that Spark only reads at startup (such as the
    driver/executor memory) are then not changed, so restart the kernel to
    apply new values.

    Args:
        app_name: Spark application name.
        driver_memory: Value for ``spark.driver.memory`` (e.g. ``"8g"``).
        executor_memory: Value for ``spark.executor.memory``.
        disable_auto_broadcast: When True, set
            ``spark.sql.autoBroadcastJoinThreshold`` to ``-1`` so Spark never
            decides on its own to broadcast a join side; explicit
            ``broadcast()`` hints still apply.

    Returns:
        The active ``SparkSession``.
    """
    conf = SparkConf()
    conf.setAll([
        ("spark.driver.memory", driver_memory),
        ("spark.executor.memory", executor_memory),
    ])

    spark = (
        SparkSession.builder
        .config(conf=conf)
        .appName(app_name)
        .getOrCreate()
    )

    if disable_auto_broadcast:
        # Runtime SQL setting, so it is applied even when the session is reused.
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    return spark


spark = create_spark_session()

In [3]:
def load_data(spark, base_path=BASE_PATH, raise_on_error=False):
    """Read the five source CSV files into Spark DataFrames.

    Every file is read with a header row and ``inferSchema=True``. Schema
    inference costs an extra pass over each file to detect column types.

    Args:
        spark: Active SparkSession.
        base_path: Directory containing the CSV files.
        raise_on_error: If False (default), a file that cannot be read is
            reported with ``print`` and returned as ``None``. If True, the
            underlying exception is re-raised instead, so a bad path fails
            here rather than later with an ``AttributeError`` on ``None``.

    Returns:
        Tuple ``(match_details, matches, medal_matches_players, medals, maps)``.
    """
    def safe_read_csv(filename):
        # os.path.join (as in analyze_table_sizes) avoids a doubled slash
        # when base_path ends with "/".
        path = os.path.join(base_path, filename)
        try:
            return spark.read.csv(path, header=True, inferSchema=True)
        except Exception as e:
            if raise_on_error:
                raise
            print(f"Error reading {filename}: {e}")
            return None

    match_details = safe_read_csv("match_details.csv")
    matches = safe_read_csv("matches.csv")
    medal_matches_players = safe_read_csv("medals_matches_players.csv")
    medals = safe_read_csv("medals.csv")
    maps = safe_read_csv("maps.csv")

    return match_details, matches, medal_matches_players, medals, maps


In [None]:
# Load the five source tables; a file that fails to read comes back as None.
(match_details, matches, medal_matches_players,
 medals, maps) = load_data(spark, base_path=BASE_PATH)

In [None]:
# describe() only builds a lazy DataFrame of summary statistics. Without an
# action such as show(), nothing is computed or displayed.
match_details.describe().show()
matches.show()

In [None]:

def analyze_table_sizes(spark, base_path="/home/iceberg/data/", csv_files=None, show_columns=False):
    """Print the on-disk size and row count of each source CSV file.

    A file that fails (missing, unreadable) is reported and the remaining
    files are still processed.

    Args:
        spark: Active SparkSession used to read the files.
        base_path: Directory containing the CSVs. The size comes from
            ``os.path.getsize``, so this must be a local filesystem path.
        csv_files: File names to analyze. Defaults to the five files that
            ``load_data`` reads.
        show_columns: Also print each file's column count and names.
    """
    if csv_files is None:
        csv_files = [
            "match_details.csv",
            "matches.csv",
            "medals.csv",
            "medals_matches_players.csv",
            "maps.csv",
        ]

    print("Table Size and Row Count Analysis:")
    print("-" * 50)

    for filename in csv_files:
        full_path = os.path.join(base_path, filename)

        try:
            # Schema inference needs an extra pass over the file and does not
            # change the number of rows, so it is skipped for counting.
            df = spark.read.csv(full_path, header=True, inferSchema=False)
            file_size_mb = os.path.getsize(full_path) / (1024 * 1024)
            row_count = df.count()

            print(f"File: {filename}")
            print(f"  Size: {file_size_mb:.2f} MB")
            print(f"  Rows: {row_count}")
            if show_columns:
                print(f"  Columns: {len(df.columns)} - {df.columns}")
            print("-" * 50)

        except Exception as e:
            print(f"Error processing {filename}: {e}")
            print("-" * 50)

# Same directory as the default argument, passed explicitly via the config constant.
analyze_table_sizes(spark, base_path=BASE_PATH)

In [None]:
# Mark the small dimension tables for broadcast. This session sets
# autoBroadcastJoinThreshold to -1, so these explicit hints drive broadcasting.
medals_broadcast, maps_broadcast = broadcast(medals), broadcast(maps)

In [None]:
# Join medal rows on BOTH match_id and player_gamertag. Joining on match_id
# alone would pair every player's stats row with every other player's medal
# rows in the same match (row explosion) and would leave two ambiguous
# player_gamertag columns.
joined_df = (
    match_details.join(matches, "match_id", "inner")
    .join(medal_matches_players, ["match_id", "player_gamertag"], "inner")
    .repartition(16, "match_id")
)

In [None]:
# Attach each medal row to the match_details row of the same player and match,
# then hash-partition the result into 16 partitions by match_id.
joined_df = (
    match_details
    .join(matches, on="match_id", how="inner")
    .join(medal_matches_players, on=["match_id", "player_gamertag"], how="inner")
    .repartition(16, "match_id")
)

In [None]:
# 1. Which player averages the most kills per game?
player_kills_avg = (
    match_details
    .groupBy("player_gamertag")
    .agg(
        avg("player_total_kills").alias("avg_kills_per_game"),
        count("match_id").alias("total_matches"),
    )
    .orderBy(col("avg_kills_per_game").desc())
)

In [None]:
# The frame is ordered, so the top rows answer the question. Limit before
# collecting instead of pulling the whole per-player aggregate to the driver.
player_kills_avg.limit(20).toPandas()

In [None]:
# 2. Which playlist gets played the most?
playlist_popularity = (
    matches
    .groupBy("playlist_id")
    .agg(count("match_id").alias("match_count"))
    .orderBy(col("match_count").desc())
)

In [None]:
# Ordered by match_count, so the top rows answer the question. Limit before
# collecting to the driver.
playlist_popularity.limit(20).toPandas()

In [None]:
# 3. Which map gets played the most?
map_popularity = (
    matches
    .join(maps_broadcast, matches["mapid"] == maps_broadcast["mapid"])
    .groupBy(maps_broadcast["name"])
    .agg(count("match_id").alias("match_count"))
    .orderBy(col("match_count").desc())
)

In [None]:
# Ordered by match_count, so the top rows answer the question. Limit before
# collecting to the driver.
map_popularity.limit(20).toPandas()

In [None]:
# 4. Which map do players get the most Killing Spree medals on?
# Keep only medal rows whose medal is classified "KillingSpree", resolve the
# map name, and add up the per-row medal counts per map.
killing_spree_map = (
    joined_df
    .join(medals_broadcast, joined_df["medal_id"] == medals_broadcast["medal_id"])
    .filter(medals_broadcast["classification"] == "KillingSpree")
    .join(maps_broadcast, joined_df["mapid"] == maps_broadcast["mapid"])
    .groupBy(maps_broadcast["name"])
    .agg(sum("count").alias("killing_spree_count"))
    .orderBy(col("killing_spree_count").desc())
)

In [None]:
# Ordered by killing_spree_count, so the top rows answer the question. Limit
# before collecting to the driver.
killing_spree_map.limit(20).toPandas()

In [5]:
def perform_analysis(spark, match_details, matches, medal_matches_players, medals, maps):
    """Build the four homework result DataFrames (all lazily evaluated).

    Each question is aggregated at the grain it asks about:

    * Q1 (kills per game) uses ``match_details`` directly, and Q2/Q3 (matches
      per playlist / per map) use ``matches`` directly. ``joined_df`` also
      joins ``medal_matches_players``, which carries a ``medal_id`` per row
      (presumably one row per player, medal and match). Each player-match row
      is therefore repeated once per matching medal row. Aggregating Q1-Q3 on
      ``joined_df`` would weight averages by medal rows, count medal rows
      instead of matches, and drop player-matches that have no medal row
      (inner join).
    * Q4 (killing sprees per map) genuinely needs the medal rows, so it uses
      ``joined_df`` and sums the per-row ``count`` column.

    Args:
        spark: Active SparkSession. Not used in the body; kept so existing
            callers keep working.
        match_details: Player stats with ``match_id``, ``player_gamertag`` and
            ``player_total_kills`` (assumed one row per player per match).
        matches: Match table with ``match_id``, ``playlist_id`` and ``mapid``
            (assumed one row per match).
        medal_matches_players: Medal rows with ``match_id``,
            ``player_gamertag``, ``medal_id`` and ``count``.
        medals: Medal dimension with ``medal_id`` and ``classification``.
        maps: Map dimension with ``mapid`` and ``name``.

    Returns:
        ``(player_kills_avg, playlist_popularity, map_popularity,
        killing_spree_map)``.
    """
    # Explicit hints for the small dimension tables. This notebook's session
    # sets autoBroadcastJoinThreshold to -1, so Spark will not broadcast on its own.
    medals_broadcast = broadcast(medals)
    maps_broadcast = broadcast(maps)

    # Medal-level fact rows. Only the medal-based question (Q4) aggregates this.
    joined_df = match_details.join(matches, "match_id", "inner") \
        .join(medal_matches_players, ["match_id", "player_gamertag"], "inner") \
        .repartition(16, "match_id")

    # 1. Which player averages the most kills per game?
    player_kills_avg = match_details.groupBy("player_gamertag") \
        .agg(
            avg("player_total_kills").alias("avg_kills_per_game"),
            count("match_id").alias("total_matches")
        ).orderBy(col("avg_kills_per_game").desc())

    # 2. Which playlist gets played the most?
    playlist_popularity = matches.groupBy("playlist_id") \
        .agg(count("match_id").alias("match_count")) \
        .orderBy(col("match_count").desc())

    # 3. Which map gets played the most?
    map_popularity = matches.join(maps_broadcast, matches.mapid == maps_broadcast.mapid) \
        .groupBy(maps_broadcast.name) \
        .agg(count("match_id").alias("match_count")) \
        .orderBy(col("match_count").desc())

    # 4. Which map do players get the most Killing Spree medals on?
    killing_spree_map = joined_df.join(medals_broadcast,
        joined_df.medal_id == medals_broadcast.medal_id) \
        .filter(medals_broadcast.classification == "KillingSpree") \
        .join(maps_broadcast, joined_df.mapid == maps_broadcast.mapid) \
        .groupBy(maps_broadcast.name) \
        .agg(
            sum("count").alias("killing_spree_count")
        ) \
        .orderBy(col("killing_spree_count").desc())

    return player_kills_avg, playlist_popularity, map_popularity, killing_spree_map

In [6]:
# Reload the source tables, then build the four result DataFrames.
match_details, matches, medal_matches_players, medals, maps = load_data(spark)

(
    player_kills_avg,
    playlist_popularity,
    map_popularity,
    killing_spree_map,
) = perform_analysis(
    spark,
    match_details=match_details,
    matches=matches,
    medal_matches_players=medal_matches_players,
    medals=medals,
    maps=maps,
)

In [None]:
def check_dataframe_cardinalities(player_kills_avg, playlist_popularity, map_popularity, killing_spree_map, top_n=5):
    """Print the row count, grouping-key cardinality and top rows of each result.

    For every result DataFrame this prints the total row count and the number
    of distinct values in its grouping key. The two are equal when the key is
    unique. It then shows the first ``top_n`` rows via ``DataFrame.show``.

    Each ``count``, ``countDistinct`` and ``show`` call is a separate Spark
    action, so uncached inputs are recomputed for each of them.

    Args:
        player_kills_avg: Result grouped by ``player_gamertag``.
        playlist_popularity: Result grouped by ``playlist_id``.
        map_popularity: Result grouped by map ``name``.
        killing_spree_map: Result grouped by map ``name``.
        top_n: Number of rows shown per DataFrame (default 5).
    """
    from pyspark.sql.functions import countDistinct

    # (section title, DataFrame, grouping-key column, label for its distinct count)
    sections = [
        ("1. Player Kills Average", player_kills_avg, "player_gamertag", "Unique player gamertags"),
        ("2. Playlist Popularity", playlist_popularity, "playlist_id", "Unique playlist IDs"),
        ("3. Map Popularity", map_popularity, "name", "Unique map names"),
        ("4. Killing Spree Map", killing_spree_map, "name", "Unique map names"),
    ]

    print("Cardinality Analysis:")
    print("-" * 50)

    for index, (title, df, key_column, key_label) in enumerate(sections):
        print(f"{title}:")
        print(f"Total rows: {df.count()}")
        print(f"{key_label}: {df.select(countDistinct(key_column)).first()[0]}")
        print(f"Top {top_n} rows:")
        df.show(top_n)
        # Blank separator between sections, but not after the last one.
        if index < len(sections) - 1:
            print("\n")

    
# Report row counts and key cardinalities of the four analysis results.
check_dataframe_cardinalities(
    player_kills_avg=player_kills_avg,
    playlist_popularity=playlist_popularity,
    map_popularity=map_popularity,
    killing_spree_map=killing_spree_map,
)

In [None]:
# Experiment with sortWithinPartitions.
# DataFrames are immutable: sortWithinPartitions returns a NEW DataFrame and
# leaves the original untouched. The result must be assigned back for the
# writeTo(...) cells below to write the sorted data.
# Sorting by a low-cardinality column puts equal values next to each other
# inside each partition. This presumably helps run-length/dictionary encoding
# in the written files (verify by comparing table file sizes).

# player_gamertag has high cardinality (almost 1:1 ratio); NOT recommended for
# sort, so player_kills_avg is deliberately left unsorted.
# playlist_id is low cardinality
playlist_popularity = playlist_popularity.sortWithinPartitions("playlist_id")
# map name is low cardinality
map_popularity = map_popularity.sortWithinPartitions("name")
# killing_spree_map is also keyed by map name (low cardinality)
killing_spree_map = killing_spree_map.sortWithinPartitions("name")

In [None]:
%%sql

-- Namespace that holds the bootcamp result tables written by this notebook.
CREATE DATABASE IF NOT EXISTS bootcamp

In [None]:
%%sql

-- Only bootcamp.player_kills_avg is dropped here, the other bootcamp tables are left in place.
DROP TABLE IF EXISTS bootcamp.player_kills_avg;

In [None]:
# createOrReplace (rather than create) keeps this cell re-runnable: create()
# fails when the table already exists.
playlist_popularity.writeTo("bootcamp.playlist_popularity") \
    .partitionedBy("playlist_id") \
    .createOrReplace()

In [None]:
# createOrReplace (rather than create) keeps this cell re-runnable: create()
# fails when the table already exists.
map_popularity.writeTo("bootcamp.map_popularity") \
    .partitionedBy("name") \
    .createOrReplace()

In [14]:
# Unpartitioned: player_gamertag is nearly unique per row. createOrReplace
# (rather than create) keeps this cell re-runnable when the table exists.
player_kills_avg.writeTo("bootcamp.player_kills_avg") \
    .createOrReplace()

                                                                                

In [None]:
# createOrReplace (rather than create) keeps this cell re-runnable: create()
# fails when the table already exists.
killing_spree_map.writeTo("bootcamp.killing_spree_map") \
    .partitionedBy("name") \
    .createOrReplace()

In [None]:
%%sql

-- Inspect the per-map killing spree table written by the previous cell.
select * from bootcamp.killing_spree_map;

In [16]:
# Number of rows in player_kills_avg (an action, so this runs a Spark job).
player_kills_avg.count()

66061