# Setup


In [1]:
import os
from functools import reduce

import distributed
import kagglehub
from dask import dataframe as dd
from dotenv import load_dotenv
from tqdm import tqdm

# Block size used when dask splits each csv file into partitions.
# https://docs.dask.org/en/stable/generated/dask.dataframe.read_csv.html
# https://docs.dask.org/en/stable/dataframe-create.html#read-from-csv
DASK_BLOCKSIZE = "8MB"
# name of the environment variable stored in ./.env for your sql uri
ENV_SQL_URI = "URI_PG"
# chunksize for to_sql. reduce value if you run out of memory
CHUNKSIZE = 5000

# load environment variables from ./.env
# load_dotenv() returns False when it did not set any variable
is_dotenv = load_dotenv()
# Look the uri up directly instead of comparing against the string "None":
# this also rejects an empty value and accepts a uri that was already
# exported in the process environment without a .env file.
sql_uri = os.getenv(ENV_SQL_URI)
if not sql_uri:
    if not is_dotenv:  # remind user to create a .env
        raise Exception(f"Please create a .env and define the {ENV_SQL_URI} variable")
    raise Exception(f"Please define an sql uri on key {ENV_SQL_URI}")

In [2]:
# Start the dask distributed client with its default arguments.
# The client is the last expression of the cell, so the notebook renders its
# summary; click on the dashboard link in that output to monitor the workers.
distributed.Client()

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 24,Total memory: 31.93 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:55671,Workers: 6
Dashboard: http://127.0.0.1:8787/status,Total threads: 24
Started: Just now,Total memory: 31.93 GiB

0,1
Comm: tcp://127.0.0.1:55699,Total threads: 4
Dashboard: http://127.0.0.1:55703/status,Memory: 5.32 GiB
Nanny: tcp://127.0.0.1:55674,
Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-7npi_yfg,Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-7npi_yfg

0,1
Comm: tcp://127.0.0.1:55713,Total threads: 4
Dashboard: http://127.0.0.1:55714/status,Memory: 5.32 GiB
Nanny: tcp://127.0.0.1:55676,
Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-cbiemt1u,Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-cbiemt1u

0,1
Comm: tcp://127.0.0.1:55698,Total threads: 4
Dashboard: http://127.0.0.1:55700/status,Memory: 5.32 GiB
Nanny: tcp://127.0.0.1:55678,
Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-tv9ikvll,Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-tv9ikvll

0,1
Comm: tcp://127.0.0.1:55706,Total threads: 4
Dashboard: http://127.0.0.1:55707/status,Memory: 5.32 GiB
Nanny: tcp://127.0.0.1:55680,
Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-w444uvly,Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-w444uvly

0,1
Comm: tcp://127.0.0.1:55705,Total threads: 4
Dashboard: http://127.0.0.1:55708/status,Memory: 5.32 GiB
Nanny: tcp://127.0.0.1:55682,
Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-b2y864z7,Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-b2y864z7

0,1
Comm: tcp://127.0.0.1:55702,Total threads: 4
Dashboard: http://127.0.0.1:55709/status,Memory: 5.32 GiB
Nanny: tcp://127.0.0.1:55684,
Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-578uy010,Local directory: C:\Users\user\AppData\Local\Temp\dask-scratch-space\worker-578uy010


# Extract


## Download dataset


In [3]:
# Download dataset to project folder
# KAGGLEHUB_CACHE is set before the download call so the files end up under ./.kaggle
# https://github.com/Kaggle/kagglehub/issues/175
os.environ["KAGGLEHUB_CACHE"] = "./.kaggle"
# dataset_path is the directory that is later listed for csv files
dataset_path = kagglehub.dataset_download("martinellis/nhl-game-data")

## Define dtypes


In [4]:
# Define column dtypes for each file.
# `dtype_defs` maps a csv file name to a {column name: dtype} mapping that is
# passed as the `dtype` argument when that file is read.

# dtype shortcuts
DTYPE_BOOL = "bool[pyarrow]"
DTYPE_INT = "int64[pyarrow]"  # note that pyarrow ints are nullable
DTYPE_FLOAT = "float64[pyarrow]"
DTYPE_STRING = "string[pyarrow]"
DTYPE_DATETIME = "timestamp[ms][pyarrow]"

# full definitions
dtype_defs = {}
# game.csv (de-duplicated later on game_id)
dtype_defs["game.csv"] = {
    "game_id": DTYPE_INT,
    "season": DTYPE_INT,
    "type": DTYPE_STRING,
    "date_time_GMT": DTYPE_DATETIME,
    "away_team_id": DTYPE_INT,
    "home_team_id": DTYPE_INT,
    "away_goals": DTYPE_INT,
    "home_goals": DTYPE_INT,
    "outcome": DTYPE_STRING,
    "home_rink_side_start": DTYPE_STRING,
    "venue": DTYPE_STRING,
    "venue_link": DTYPE_STRING,
    "venue_time_zone_id": DTYPE_STRING,
    "venue_time_zone_offset": DTYPE_INT,
    "venue_time_zone_tz": DTYPE_STRING,
}
# game_goalie_stats.csv (rows with an unknown team_id are dropped later)
dtype_defs["game_goalie_stats.csv"] = {
    "game_id": DTYPE_INT,
    "player_id": DTYPE_INT,
    "team_id": DTYPE_INT,
    "timeOnIce": DTYPE_INT,
    "assists": DTYPE_INT,
    "goals": DTYPE_INT,
    "pim": DTYPE_INT,
    "shots": DTYPE_INT,
    "saves": DTYPE_INT,
    "powerPlaySaves": DTYPE_INT,
    "shortHandedSaves": DTYPE_INT,
    "evenSaves": DTYPE_INT,
    "shortHandedShotsAgainst": DTYPE_INT,
    "evenShotsAgainst": DTYPE_INT,
    "powerPlayShotsAgainst": DTYPE_INT,
    "decision": DTYPE_STRING,
    "savePercentage": DTYPE_FLOAT,
    "powerPlaySavePercentage": DTYPE_FLOAT,
    "evenStrengthSavePercentage": DTYPE_FLOAT,
}
# game_goals.csv
dtype_defs["game_goals.csv"] = {
    "play_id": DTYPE_STRING,
    "strength": DTYPE_STRING,
    "gameWinningGoal": DTYPE_BOOL,
    "emptyNet": DTYPE_BOOL,
}
# game_officials.csv
dtype_defs["game_officials.csv"] = {
    "game_id": DTYPE_INT,
    "official_name": DTYPE_STRING,
    "official_type": DTYPE_STRING,
}
# game_penalties.csv
dtype_defs["game_penalties.csv"] = {
    "play_id": DTYPE_STRING,
    "penaltySeverity": DTYPE_STRING,
    "penaltyMinutes": DTYPE_INT,
}
# game_plays.csv (de-duplicated later on play_id)
dtype_defs["game_plays.csv"] = {
    "play_id": DTYPE_STRING,
    "game_id": DTYPE_INT,
    "team_id_for": DTYPE_INT,
    "team_id_against": DTYPE_INT,
    "event": DTYPE_STRING,
    "secondaryType": DTYPE_STRING,
    "x": DTYPE_INT,
    "y": DTYPE_INT,
    "period": DTYPE_INT,
    "periodType": DTYPE_STRING,
    "periodTime": DTYPE_INT,
    "periodTimeRemaining": DTYPE_INT,
    "dateTime": DTYPE_DATETIME,
    "goals_away": DTYPE_INT,
    "goals_home": DTYPE_INT,
    "description": DTYPE_STRING,
    "st_x": DTYPE_INT,
    "st_y": DTYPE_INT,
}
# game_plays_players.csv (rows with an unknown play_id are dropped later)
dtype_defs["game_plays_players.csv"] = {
    "play_id": DTYPE_STRING,
    "game_id": DTYPE_INT,
    "player_id": DTYPE_INT,
    "playerType": DTYPE_STRING,
}
# game_scratches.csv
dtype_defs["game_scratches.csv"] = {
    "game_id": DTYPE_INT,
    "team_id": DTYPE_INT,
    "player_id": DTYPE_INT,
}
# game_shifts.csv
dtype_defs["game_shifts.csv"] = {
    "game_id": DTYPE_INT,
    "player_id": DTYPE_INT,
    "period": DTYPE_INT,
    "shift_start": DTYPE_INT,
    "shift_end": DTYPE_INT,
}
# game_skater_stats.csv (rows with an unknown team_id are dropped later)
dtype_defs["game_skater_stats.csv"] = {
    "game_id": DTYPE_INT,
    "player_id": DTYPE_INT,
    "team_id": DTYPE_INT,
    "timeOnIce": DTYPE_INT,
    "assists": DTYPE_INT,
    "goals": DTYPE_INT,
    "shots": DTYPE_INT,
    "hits": DTYPE_INT,
    "powerPlayGoals": DTYPE_INT,
    "powerPlayAssists": DTYPE_INT,
    "penaltyMinutes": DTYPE_INT,
    "faceOffWins": DTYPE_INT,
    "faceoffTaken": DTYPE_INT,
    "takeaways": DTYPE_INT,
    "giveaways": DTYPE_INT,
    "shortHandedGoals": DTYPE_INT,
    "shortHandedAssists": DTYPE_INT,
    "blocked": DTYPE_INT,
    "plusMinus": DTYPE_INT,
    "evenTimeOnIce": DTYPE_INT,
    "shortHandedTimeOnIce": DTYPE_INT,
    "powerPlayTimeOnIce": DTYPE_INT,
}
# game_teams_stats.csv (rows with an unknown team_id are dropped later)
dtype_defs["game_teams_stats.csv"] = {
    "game_id": DTYPE_INT,
    "team_id": DTYPE_INT,
    "HoA": DTYPE_STRING,
    # use the shared nullable pyarrow bool, like every other boolean column
    "won": DTYPE_BOOL,
    "settled_in": DTYPE_STRING,
    "head_coach": DTYPE_STRING,
    "goals": DTYPE_INT,
    "shots": DTYPE_INT,
    "hits": DTYPE_INT,
    "pim": DTYPE_INT,
    "powerPlayOpportunities": DTYPE_INT,
    "powerPlayGoals": DTYPE_INT,
    "faceOffWinPercentage": DTYPE_FLOAT,
    "giveaways": DTYPE_INT,
    "takeaways": DTYPE_INT,
    "blocked": DTYPE_INT,
    "startRinkSide": DTYPE_STRING,
}
# player_info.csv (de-duplicated later on player_id)
dtype_defs["player_info.csv"] = {
    "player_id": DTYPE_INT,
    "firstName": DTYPE_STRING,
    "lastName": DTYPE_STRING,
    "nationality": DTYPE_STRING,
    "birthCity": DTYPE_STRING,
    "primaryPosition": DTYPE_STRING,
    "birthDate": DTYPE_DATETIME,
    "birthStateProvince": DTYPE_STRING,
    "height": DTYPE_STRING,
    "height_cm": DTYPE_FLOAT,
    "weight": DTYPE_INT,
    "shootsCatches": DTYPE_STRING,
}
# team_info.csv (de-duplicated later on team_id)
dtype_defs["team_info.csv"] = {
    "team_id": DTYPE_INT,
    "franchiseId": DTYPE_INT,
    "shortName": DTYPE_STRING,
    "teamName": DTYPE_STRING,
    "abbreviation": DTYPE_STRING,
    "link": DTYPE_STRING,
}

## Read files


In [5]:
# Read all csv files from data directory into lazy dask dataframes.
# `dfs` maps the file name without its ".csv" extension to its dataframe.
dfs: dict[str, dd.DataFrame] = {}  # type: ignore # https://github.com/dask/dask/issues/9710
# os.listdir returns entries in arbitrary order; sorting keeps the read
# (and later load) order deterministic across machines
for n in sorted(os.listdir(dataset_path)):
    # splitext only strips the final extension, so "a.b.csv" keeps the key
    # "a.b" instead of being truncated to "a"
    table_name, ext = os.path.splitext(n)
    if ext != ".csv":
        continue
    fpath = os.path.join(dataset_path, n)
    if n in dtype_defs:
        print(f"✅ reading {n} with dtypes")
        dfs[table_name] = dd.read_csv(  # type: ignore
            fpath,
            blocksize=DASK_BLOCKSIZE,
            dtype=dtype_defs[n],
            engine="pyarrow",
            dtype_backend="pyarrow",
        )
    else:
        # no dtypes declared for this file: let the reader infer them, but
        # keep the same partition size as the typed files
        print(f"❌ reading {n} without dtypes")
        dfs[table_name] = dd.read_csv(fpath, blocksize=DASK_BLOCKSIZE)  # type: ignore

✅ reading game.csv with dtypes
✅ reading game_goalie_stats.csv with dtypes
✅ reading game_goals.csv with dtypes
✅ reading game_officials.csv with dtypes
✅ reading game_penalties.csv with dtypes
✅ reading game_plays.csv with dtypes
✅ reading game_plays_players.csv with dtypes
✅ reading game_scratches.csv with dtypes
✅ reading game_shifts.csv with dtypes
✅ reading game_skater_stats.csv with dtypes
✅ reading game_teams_stats.csv with dtypes
✅ reading player_info.csv with dtypes
✅ reading team_info.csv with dtypes


In [9]:
# Report how many partitions each dataframe was split into
for table_name, frame in dfs.items():
    print(f"✂️ npartitions {table_name}: {frame.npartitions}")

✂️ npartitions game: 1
✂️ npartitions game_goalie_stats: 1
✂️ npartitions game_goals: 1
✂️ npartitions game_officials: 1
✂️ npartitions game_penalties: 1
✂️ npartitions game_plays: 95
✂️ npartitions game_plays_players: 47
✂️ npartitions game_scratches: 1
✂️ npartitions game_shifts: 52
✂️ npartitions game_skater_stats: 8
✂️ npartitions game_teams_stats: 1
✂️ npartitions player_info: 1
✂️ npartitions team_info: 1


In [6]:
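# Optional check: compute every dataframe to verify that the declared dtypes
# are compatible with the data. It is left commented out because it does not
# need to run once the dtypes are finalized.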

# for n in dfs:
#     try:
#         dfs[n].compute()  # type: ignore
#         print(f"✅ {n} passed compute")
#     except Exception as e:
#         print(f"❌ {n} did not pass compute due to:")
#         print(e)

# Transform


## Drop duplicate rows


In [7]:
def drop_duplicates(df: dd.DataFrame, subset: str) -> dd.DataFrame:  # type: ignore
    """Drop duplicate rows and return the de-duplicated dataframe.

    Prints how many rows were removed.

    Args:
        df: dask dataframe to de-duplicate.
        subset: column used to decide whether two rows are duplicates.

    Returns:
        The lazy dataframe produced by ``df.drop_duplicates(subset=subset)``.
    """
    # local import: the notebook only imports `dask.dataframe` at the top
    from dask import compute

    deduplicated = df.drop_duplicates(subset=subset)
    # evaluate both row counts in a single compute call so the work they
    # share (producing `df`) is done once instead of twice
    rows_before, rows_after = compute(df.shape[0], deduplicated.shape[0])
    print(f"✅ dropped {rows_before-rows_after} rows using {subset}")
    return deduplicated


# The author verified that rows sharing the same key are duplicate rows,
# so each table below is de-duplicated on its key column.
for table_name, key_column in (
    ("game", "game_id"),
    ("game_plays", "play_id"),
    ("player_info", "player_id"),
    ("team_info", "team_id"),
):
    dfs[table_name] = drop_duplicates(dfs[table_name], key_column)



✅ dropped 2570 rows using game_id
✅ dropped 833466 rows using play_id
✅ dropped 0 rows using player_id
✅ dropped 0 rows using team_id


## Drop unreferenced data


In [8]:
# Drop unreferenced data that is unrecoverable
def drop_unreferenced(
    df_foreign: dd.DataFrame,  # type: ignore
    df_foreign_col: str,
    df_primary: dd.DataFrame,  # type: ignore
    df_primary_col: str,
) -> dd.DataFrame:  # type: ignore
    """Keep only the rows of ``df_foreign`` whose key exists in ``df_primary``.

    Prints how many rows were removed.

    Args:
        df_foreign: dataframe holding the referencing (foreign key) column.
        df_foreign_col: name of the foreign key column in ``df_foreign``.
        df_primary: dataframe holding the referenced (primary key) column.
        df_primary_col: name of the key column in ``df_primary``.

    Returns:
        The lazy, filtered version of ``df_foreign``.
    """
    # local import: the notebook only imports `dask.dataframe` at the top
    from dask import compute

    # select the key columns once instead of on every loop iteration
    foreign_keys = df_foreign[df_foreign_col]
    primary_keys = df_primary[df_primary_col]
    # reduce memory usage with partitions: test membership against one
    # partition of the primary keys at a time, then OR the masks together
    selections = [
        foreign_keys.isin(primary_keys.get_partition(i))
        for i in range(df_primary.npartitions)
    ]
    selector = reduce(lambda x, y: x | y, selections)
    filtered = df_foreign[selector]
    # evaluate both row counts in a single compute call so the work they
    # share (producing `df_foreign`) is done once instead of twice
    rows_before, rows_after = compute(df_foreign.shape[0], filtered.shape[0])
    print(f"✅ dropped {rows_before-rows_after} rows on {df_foreign_col}")
    return filtered


# (foreign table, foreign column, primary table, primary column)
for foreign_table, foreign_col, primary_table, primary_col in (
    ("game_goalie_stats", "team_id", "team_info", "team_id"),
    ("game_plays_players", "play_id", "game_plays", "play_id"),
    ("game_skater_stats", "team_id", "team_info", "team_id"),
    ("game_teams_stats", "team_id", "team_info", "team_id"),
):
    dfs[foreign_table] = drop_unreferenced(
        dfs[foreign_table], foreign_col, dfs[primary_table], primary_col
    )
# TODO: check for more instances of unreferenced data

# CHECKED COLUMNS:
# game_goals - play_id
# game_officials - game_id
# game_penalties - play_id
# game_plays - game_id
# game_plays_players - play_id, game_id, player_id
# game_scratches - game_id, team_id, player_id
# game_shifts - game_id, player_id
# game_skater_stats - game_id, player_id, team_id
# game_teams_stats - game_id, team_id


✅ dropped 40 rows on team_id
✅ dropped 23 rows on play_id
✅ dropped 180 rows on team_id
✅ dropped 20 rows on team_id


# Load


In [None]:
# Load the tables into an sql server
# note that it takes about an hour due to data size
def load_with_progress(df: dd.DataFrame, table_name: str, uri: str) -> None:  # type: ignore
    """Write ``df`` to the sql table ``table_name`` one partition at a time.

    A progress bar labelled with the table name advances once per partition.

    Args:
        df: dask dataframe to write.
        table_name: name of the destination table.
        uri: sql connection uri passed to ``to_sql``.
    """
    progress = tqdm(total=df.npartitions, desc=table_name)
    with progress:
        for partition_index in range(df.npartitions):
            # the first partition replaces any existing table, the rest append to it
            if partition_index == 0:
                write_mode = "replace"
            else:
                write_mode = "append"
            partition = df.get_partition(partition_index)
            partition.to_sql(
                table_name,
                uri,
                index=False,
                if_exists=write_mode,
                chunksize=CHUNKSIZE,
                method="multi",
            )
            progress.update(1)


# Load every dataframe into a table named after its key in `dfs`
for table_name, frame in dfs.items():
    load_with_progress(frame, table_name, sql_uri)