## In this notebook

- Generate situation time data.

In [3]:
import dotenv
import json
import os
from enum import Enum
from typing import List, Tuple
from pathlib import Path

import boto3
import pandas as pd

from tqdm.notebook import tqdm

In [4]:
# Load variables from a .env file (if any) into the process environment
# before the S3 settings are read.
dotenv.load_dotenv()

# Each setting is None when its environment variable is not defined.
S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY, S3_BUCKET_NAME = (
    os.getenv(variable_name)
    for variable_name in ("S3_ACCESS_KEY_ID", "S3_SECRET_ACCESS_KEY", "S3_BUCKET_NAME")
)

## Dependencies

In [3]:
class TeamType(Enum):
    """Whether a team is the home or the away side of a game."""

    HOME = "home"
    AWAY = "away"


class SituationType(Enum):
    """Situation labels ("5v5", "5v4", "4v5", "other") derived from raw situation codes."""

    FIVE_ON_FIVE = "5v5"
    FIVE_ON_FOUR = "5v4"
    FOUR_ON_FIVE = "4v5"
    OTHER = "other"

    @staticmethod
    def from_situation_code_and_team_type(situation_code: str, team_type: TeamType) -> str:
        """Label a raw situation code from the point of view of one team.

        Parameters:
        -----------
        situation_code : str
            Raw situation code of a play, e.g. "1551", "1451" or "1541".
        team_type : TeamType
            The team (home or away) the label should be relative to.

        Returns:
        --------
        str
            The ``value`` of the matching ``SituationType`` member; "other" for
            every code / team combination that is not listed below.
        """
        # "1551" gets the same label regardless of the team
        if situation_code == "1551":
            return SituationType.FIVE_ON_FIVE.value

        # (situation code, team, label): the same code is an advantage for one
        # team and a disadvantage for the other
        # NOTE(review): presumably the two middle digits are the away and home
        # skater counts - confirm against the data source
        labelling_rules = (
            ("1451", TeamType.HOME, SituationType.FIVE_ON_FOUR),
            ("1451", TeamType.AWAY, SituationType.FOUR_ON_FIVE),
            ("1541", TeamType.HOME, SituationType.FOUR_ON_FIVE),
            ("1541", TeamType.AWAY, SituationType.FIVE_ON_FOUR),
        )
        for code, side, label in labelling_rules:
            if situation_code == code and team_type == side:
                return label.value

        return SituationType.OTHER.value

In [4]:
def get_general_game_features(game: dict) -> dict:
    """Collect the game-level identifiers of a raw game dictionary.

    Parameters:
    -----------
    game : dict
        A dictionary containing raw game information. Keys that are missing
        result in ``None`` values.

    Returns:
    --------
    dict
        A dictionary with the keys ``game_id``, ``game_date``,
        ``away_team_id`` and ``home_team_id``.
    """
    away_team = game.get("awayTeam", {})
    home_team = game.get("homeTeam", {})

    return dict(
        game_id=game.get("id"),
        game_date=game.get("gameDate"),
        away_team_id=away_team.get("id"),
        home_team_id=home_team.get("id"),
    )


def get_situation_time_base(game: dict) -> List[dict]:
    """Build one record per team and situation code of a game.

    Parameters:
    -----------
    game : dict
        A dictionary containing game information.

    Returns:
    --------
    List[dict]
        One dictionary for every combination of team (home first, then away)
        and situation code. Each holds the general game features plus
        ``situation_team_id``, ``situation_code``, ``situation_type`` (the
        code labelled from that team's point of view) and ``situation_time``
        (the value computed by ``get_situation_code_to_time``). The list is
        empty when no situation times are available.
    """
    home_team_id = game.get("homeTeam", {}).get("id")
    away_team_id = game.get("awayTeam", {}).get("id")
    situation_code_to_time = get_situation_code_to_time(game=game)

    # the game-level features are the same for every record, so extract them
    # once instead of once per row
    general_game_features = get_general_game_features(game=game)

    return [
        {
            **general_game_features,
            "situation_team_id": team_id,
            "situation_code": situation_code,
            "situation_type": SituationType.from_situation_code_and_team_type(
                situation_code=situation_code,
                team_type=team_type,
            ),
            "situation_time": time,
        }
        for team_id, team_type in [(home_team_id, TeamType.HOME), (away_team_id, TeamType.AWAY)]
        for situation_code, time in situation_code_to_time.items()
    ]


def get_situation_code_to_time(game: dict) -> dict:
    """Compute the total time spent in each game situation.

    Consecutive plays with the same situation code form one situation. A
    situation lasts from its first to its last play; when the next situation
    starts later than the previous one ended, the gap is split evenly between
    the two.

    Parameters:
    -----------
    game : dict
        A dictionary containing game data, including plays with timestamps and situation codes.

    Returns:
    --------
    dict
        Maps each situation code to the total number of seconds spent in it.
        Empty when the game has no plays.
    """
    plays = game.get("plays") or []
    if not plays:
        # a frame built from no plays has no columns, so the pipeline below
        # would fail on the first column access
        return {}

    return (
        pd.DataFrame(
            [
                {
                    "game_id": game.get("id"),
                    "type": play.get("typeDescKey"),
                    "period": play.get("periodDescriptor", {}).get("number"),
                    "time_in_period": play.get("timeInPeriod"),
                    "situation_code": play.get("situationCode"),
                }
                for play in plays
            ]
        )
        # compute time in game and categorize situations
        # NOTE(review): every period is assumed to last 20 minutes - confirm
        # this is acceptable for overtime / shootout periods
        .assign(
            minutes_in_period=lambda _df: _df.time_in_period.str.split(":").str[0].astype(int),
            seconds_in_period=lambda _df: _df.time_in_period.str.split(":").str[1].astype(int),
            time_in_game=lambda _df: 60 * 20 * (_df.period - 1) + _df.minutes_in_period * 60 + _df.seconds_in_period,
            # a new situation starts whenever the code differs from the previous play
            is_previous_situation_different=lambda _df: _df.situation_code.ne(_df.situation_code.shift()),
            situation_order_id=lambda _df: _df.is_previous_situation_different.cumsum(),
        )
        # determine the start and end time of each situation
        .groupby(["situation_order_id", "situation_code"])
        .agg(
            situation_start=("time_in_game", "min"),
            situation_end=("time_in_game", "max"),
        )
        # adjust for gaps in time between situations: half of each gap goes to
        # the earlier situation and half to the later one (both corrections
        # are computed before the start / end columns are overwritten)
        .assign(
            situation_start_correction=lambda _df: (_df.situation_start - _df.situation_end.shift()) / 2,
            situation_end_correction=lambda _df: (_df.situation_start.shift(-1) - _df.situation_end) / 2,
            # the first situation has no predecessor and the last one no successor
            situation_start=lambda _df: _df.situation_start - _df.situation_start_correction.fillna(0),
            situation_end=lambda _df: _df.situation_end + _df.situation_end_correction.fillna(0),
            time_on_ice=lambda _df: _df.situation_end - _df.situation_start,
        )
        # aggregate total time spent in each situation
        .groupby("situation_code")
        .agg(
            time_on_ice=("time_on_ice", "sum"),
        )
        .time_on_ice.to_dict()
    )


## Read data from S3, process it, and save the results back

In [5]:
# S3 resource handle used by the cells below, authenticated with the
# credentials read from the environment.
s3 = boto3.resource(
    service_name="s3",
    aws_access_key_id=S3_ACCESS_KEY_ID,
    aws_secret_access_key=S3_SECRET_ACCESS_KEY,
)

In [13]:
class SeasonType(Enum):
    """Season types, keyed by the numeric code read from a game id."""

    PRESEASON = 1
    REGULAR = 2
    PLAYOFF = 3
    ALLSTAR = 4


def extract_info_from(key: str) -> Tuple[str, str, str]:
    """Derive the game id, season and season type from an S3 object key.

    Parameters:
    -----------
    key : str
        Key of a file in the S3 bucket. The file name without its extension
        is taken as the game id.

    Returns:
    --------
    Tuple[str, str, str]
        ``(game_id, season, season_type)`` where ``season`` is the first four
        characters of the game id and ``season_type`` is the lower-cased name
        of the ``SeasonType`` member selected by the sixth character.
    """
    game_id = Path(key).stem

    # the sixth character of the game id is the numeric season type code
    season_type_code = int(game_id[5])

    return (
        game_id,
        game_id[:4],
        SeasonType(season_type_code).name.lower(),
    )

In [6]:
def read_json_from_s3(bucket_name: str, key: str) -> dict:
    """Download an object from S3 and parse its content as JSON.

    Parameters:
    -----------
    bucket_name : str
        Name of the S3 bucket that holds the object.
    key : str
        Key of the object inside the bucket.

    Returns:
    --------
    dict
        The parsed JSON document. It is annotated as ``dict`` because the
        caller treats the result as a game dictionary; ``json.loads`` itself
        returns whatever top-level JSON value the file contains.
    """
    body = s3.Object(bucket_name=bucket_name, key=key).get()["Body"]
    return json.loads(body.read().decode("utf-8"))

In [27]:
bucket_raw = s3.Bucket("frozen-facts-center-raw")
bucket_base = s3.Bucket("frozen-facts-center-base")

# List the raw bucket once: the same listing feeds the loop and gives the
# progress bar its total, instead of listing the bucket a second time just
# to count the objects.
raw_objects = list(bucket_raw.objects.all())

for obj_raw in tqdm(raw_objects):

    # only the JSON game files are processed
    if not obj_raw.key.endswith(".json"):
        continue

    # read game data
    game_id, season, season_type = extract_info_from(key=obj_raw.key)
    game = read_json_from_s3(bucket_name=bucket_raw.name, key=obj_raw.key)

    # generate situation time data
    situation_time_base = get_situation_time_base(game=game)
    if not situation_time_base:
        # a frame built from no records has no "situation_time" column to
        # filter on, and there is nothing to store for this game
        continue
    df = pd.DataFrame(situation_time_base).query("situation_time > 0")

    # save data into PARQUET file
    # NOTE(review): the s3:// write does not receive the S3_* credentials used
    # for reading - confirm the environment provides credentials for it
    parquet_file_key = f"situation-time/{season}/{season_type}/{game_id}.parquet"
    parquet_file_path = f"s3://{bucket_base.name}/{parquet_file_key}"
    df.to_parquet(path=parquet_file_path, index=False)


  0%|          | 0/6303 [00:00<?, ?it/s]