# Penalty Measure of Foul Baiting using Poisson Distribution

## Project Outline

1. **Introduction**
   - Overview of the project
   - Goals and objectives

2. **Data Collection**
   - Import necessary libraries
   - Fetch player data from the NBA API

3. **Data Processing**
   - Load data into a Pandas DataFrame
   - Data cleaning and transformation

4. **Data Analysis**
   - Exploratory data analysis (EDA)
   - Visualizations

5. **Conclusion**
   - Summary of findings
   - Future work


## 0. Environment

### Python

This project is written in Python, which means that Python must be installed in your environment to run the project. The minimum supported version is 3.10.

#### Windows

You can use the Windows package manager `winget`, or the [installer](https://www.python.org/downloads/windows/) from the website.
```powershell
# you can change the version in the package name to your desired version
winget install Python.Python.3.12
```

#### macOS
Depending on your version of macOS, Python may already be available. If it is missing or older than the minimum supported version, you can use the [Homebrew](https://brew.sh/) package manager to install it, or the [installer](https://www.python.org/downloads/macos/) from the website.
```zsh
brew install python
```

#### Linux
Python is already installed by default on most distributions of Linux. If it isn't, you can use your distribution's package manager to install Python.

### Dependencies

This project uses [uv](https://github.com/astral-sh/uv) to manage its dependencies. Assuming the repository's `pyproject.toml` is present, you can install them with `uv sync`. To add a new dependency such as pandas, use `uv add`:

`uv add pandas`

If you don't want to use `uv`, a `requirements.txt` is also provided. You can install this using `pip`:

`pip install -r requirements.txt`

### Imports

In [None]:
import os
import time

import pandas as pd

### Environment Variables

We will load all our environment variables from a `.env` file, if one is provided.

If database information is provided, all dataframes used for analysis are uploaded to it. We use [Microsoft SQL Server](https://www.microsoft.com/en-us/sql-server/sql-server-downloads) by default but any kind of database is supported.

In [None]:
from dotenv import load_dotenv

load_dotenv()
DB_TYPE = os.getenv("DB_TYPE", "sqlserver")
DB_USER = os.getenv("DB_USER", "sqladmin")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "1433")
DB_NAME = os.getenv("DB_NAME", "dataframes")
DB_DRIVER = os.getenv("DB_DRIVER")  # some databases require a database driver

### Presentation

By default, Pandas dataframes are truncated when they are printed. We want to be able to view all of the data at once, so we embed the dataframe in a scrollable element.

In [None]:
from IPython.core.interactiveshell import InteractiveShell
from IPython.display import HTML, display


def custom_scrollable_display(df: pd.DataFrame, max_height=400):
    """
    Custom display function to render DataFrames as scrollable elements.

    Parameters:
    - df: The DataFrame to display.
    - max_height: The maximum height of the scrollable area in pixels.
    """
    style = f"""
    <style>
    .scrollable-dataframe {{
        display: inline-block;
        white-space: nowrap;
        overflow-x: scroll;
        max-height: {max_height}px;
        overflow-y: scroll;
    }}
    </style>
    """
    display(HTML(style + f'<div class="scrollable-dataframe">{df.to_html()}</div>'))


def custom_display_hook(df):
    custom_scrollable_display(df)
    return ""


# hook up the custom display function to the automatic printer
InteractiveShell.instance().display_formatter.formatters["text/html"].for_type(
    pd.DataFrame, custom_display_hook
);

### Pre-Commit Hooks (Developer Only)

This notebook uses `nbstripout` to strip notebook output from Git commits. If you are committing code, please run the cell at the end of this section to install the Git filter and the pre-commit hooks.

Poetry is required for the pre-commit hooks, so make sure it is installed before you commit code. You will also need to add the plugin `poetry-plugin-export` in order to run the export hook.
```bash
poetry self add poetry-plugin-export
```

In [None]:
!nbstripout --install
!pre-commit install

## 1. Data Collection

### Fetch Player Data from NBA API

`nba_api` provides static player and team information, which we will download here so that we can reuse it without requesting the API unnecessarily.

In [None]:
from nba_api.stats.static import players, teams

DATA_DIR = "../../data"

PLAYERS_LIST_FILE = "players_list.csv"
TEAMS_LIST_FILE = "teams_list.csv"

if os.path.exists(f"{DATA_DIR}/{PLAYERS_LIST_FILE}"):
    players_list = pd.read_csv(f"{DATA_DIR}/{PLAYERS_LIST_FILE}")
else:
    players_list = pd.DataFrame(players.get_players())
    # index=False keeps the row index out of the file, so reloading it
    # does not add a spurious "Unnamed: 0" column.
    players_list.to_csv(f"{DATA_DIR}/{PLAYERS_LIST_FILE}", index=False)

if os.path.exists(f"{DATA_DIR}/{TEAMS_LIST_FILE}"):
    teams_list = pd.read_csv(f"{DATA_DIR}/{TEAMS_LIST_FILE}")
else:
    teams_list = pd.DataFrame(teams.get_teams())
    teams_list.to_csv(f"{DATA_DIR}/{TEAMS_LIST_FILE}", index=False)
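
The load-or-download pattern used for these files can be factored into a small helper. The sketch below is only an illustration: `load_or_fetch` and `fake_fetch` are hypothetical names that do not appear in this notebook, and it uses the standard `csv` module instead of pandas so that it runs without the NBA API.

```python
import csv
import os
import tempfile
from typing import Callable


def load_or_fetch(path: str, fetch: Callable[[], list[dict]]) -> list[dict]:
    """Return cached rows from `path`, or call `fetch` once and cache the result."""
    if os.path.exists(path):
        with open(path, newline="") as f:
            return list(csv.DictReader(f))
    rows = fetch()
    # Create the data directory if it does not exist yet.
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return rows


calls = 0


def fake_fetch() -> list[dict]:
    """Hypothetical stand-in for an API call; counts how often it is invoked."""
    global calls
    calls += 1
    return [{"id": "1", "full_name": "Example Player"}]


with tempfile.TemporaryDirectory() as tmp:
    cache = os.path.join(tmp, "data", "players_list.csv")
    first = load_or_fetch(cache, fake_fetch)  # fetches and writes the cache
    second = load_or_fetch(cache, fake_fetch)  # reads the cache, no second fetch
```

The helper assumes `fetch` returns at least one row, because the CSV header is taken from the first row's keys.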

### Fetch Game Data



We're only interested in regular-season and playoff games. The first digit of `SEASON_ID` encodes the type of season and the remaining digits encode the season's start year, so we'll add an enum and a small helper class to decode it.

In [None]:
from enum import Enum


class SeasonType(Enum):
    PRESEASON = 1
    REGULAR_SEASON = 2
    ALL_STAR = 3
    PLAYOFFS = 4
    PLAY_IN = 5
    NBA_CUP = 6


class Season:
    def __init__(self, season_id: int) -> None:
        season_id_str = str(season_id)
        self.season_type = SeasonType(int(season_id_str[0]))
        self.season_year = int(season_id_str[1:])

In [None]:
from nba_api.stats.endpoints import leaguegamefinder
from nba_api.stats.library.parameters import LeagueIDNullable

START_SEASON = 2023
END_SEASON = 2024
GAMES_LIST_FILE = "games_list.csv"
if os.path.exists(f"{DATA_DIR}/{GAMES_LIST_FILE}"):
    games_list: pd.DataFrame = pd.read_csv(f"{DATA_DIR}/{GAMES_LIST_FILE}")
else:
    games_list = pd.DataFrame()
    for season in range(START_SEASON, END_SEASON + 1):
        # put season into the correct form e.g. 2023 -> 2023-24
        season_str = f"{season}-{str(season + 1)[2:]}"
        print(f"Fetching games for season: {season_str}", end="\r")
        gamefinder = leaguegamefinder.LeagueGameFinder(
            season_nullable=season_str, league_id_nullable=LeagueIDNullable.nba
        )
        games = gamefinder.get_data_frames()[0]
        games_list = pd.concat([games_list, games], ignore_index=True)
        time.sleep(0.6)
    games_list.to_csv(f"{DATA_DIR}/{GAMES_LIST_FILE}", index=False)
# games_list["SEASON_ID"].unique()
# games_list.loc[(games_list["TEAM_NAME"] == "San Antonio Spurs") & (games_list["SEASON_ID"] == 22023)]
games_list.head()

### Fetch Play by Plays

In [None]:
from nba_api.stats.endpoints import playbyplayv3
from requests.exceptions import ReadTimeout

# took 483 minutes to download up to 2012
PBP_LIST_FILE = "../../data/pbp_list.csv"
if os.path.exists(PBP_LIST_FILE):
    pbp_list = pd.read_csv(PBP_LIST_FILE)
else:
    unique_games_list = games_list.drop_duplicates(subset="GAME_ID")
    pbp_list = pd.DataFrame()
    for index, row in unique_games_list.iterrows():
        err = False
        game_id = row["GAME_ID"]
        game_date = row["GAME_DATE"]
        season_id = row["SEASON_ID"]
        season = Season(season_id)
        if (
            season.season_type != SeasonType.REGULAR_SEASON
            and season.season_type != SeasonType.PLAYOFFS
        ):
            continue
        print(f"Fetching play by play for game {game_id} on {game_date}", end="\r")
        while True:
            try:
                pbpfinder = playbyplayv3.PlayByPlayV3(f"{game_id:010}")
                break
            except ReadTimeout as e:
                print(f"{e}! Try again")
            except Exception:
                # Log to the same data directory that holds the cached CSV files.
                with open(f"{DATA_DIR}/err.log", "a") as f:
                    print(f"{game_id} does not have a play by play", file=f)
                err = True
                break
        if err:
            continue
        pbp = pbpfinder.get_data_frames()[0]
        pbp_list = pd.concat([pbp_list, pbp], ignore_index=True)
        time.sleep(0.6)
    pbp_list.to_csv(PBP_LIST_FILE, index=False)
pbp_list.head()
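
The retry loop above retries immediately and without limit when a request times out. One possible alternative is a bounded retry with exponential backoff, sketched below. `fetch_with_retry` and `flaky` are hypothetical names, and the built-in `TimeoutError` stands in for `requests.exceptions.ReadTimeout` so that the example runs without network access.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def fetch_with_retry(
    fetch: Callable[[], T],
    retry_on: type[Exception] = TimeoutError,
    max_attempts: int = 5,
    base_delay: float = 0.6,
) -> T:
    """Call `fetch`, retrying on `retry_on` with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except retry_on:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("max_attempts must be at least 1")


attempts = 0


def flaky() -> str:
    """Hypothetical endpoint that times out twice before succeeding."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


result = fetch_with_retry(flaky, base_delay=0.01)
```

In this notebook the callable would wrap the endpoint constructor, for example `lambda: playbyplayv3.PlayByPlayV3(game_id)`, with `retry_on=ReadTimeout`.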

### Fetch Box Scores

#### Player Track

In [None]:
from nba_api.stats.endpoints import boxscoreplayertrackv3
from requests.exceptions import ReadTimeout

# took 483 minutes to download up to 2012
BOXSCORE_PT_LIST_FILE = "../../data/boxscore_pt_list.csv"
if os.path.exists(BOXSCORE_PT_LIST_FILE):
    boxscore_pt_list = pd.read_csv(BOXSCORE_PT_LIST_FILE)
else:
    unique_games_list = games_list.drop_duplicates(subset="GAME_ID")
    boxscore_pt_list = pd.DataFrame()
    for index, row in unique_games_list.iterrows():
        err = False
        game_id = row["GAME_ID"]
        game_date = row["GAME_DATE"]
        season_id = row["SEASON_ID"]
        season = Season(season_id)
        if (
            season.season_type != SeasonType.REGULAR_SEASON
            and season.season_type != SeasonType.PLAYOFFS
        ):
            continue
        print(f"Fetching box score for game {game_id} on {game_date}", end="\r")
        while True:
            try:
                boxscorefinder = boxscoreplayertrackv3.BoxScorePlayerTrackV3(
                    f"{game_id:010}"
                )
                break
            except ReadTimeout as e:
                print(f"{e}! Try again")
            except Exception:
                # Log to the same data directory that holds the cached CSV files.
                with open(f"{DATA_DIR}/err.log", "a") as f:
                    print(f"{game_id} does not have a player track box score", file=f)
                err = True
                break
        if err:
            continue
        boxscore_pt = boxscorefinder.get_data_frames()[0]
        boxscore_pt_list = pd.concat([boxscore_pt_list, boxscore_pt], ignore_index=True)
        time.sleep(0.6)
    boxscore_pt_list.to_csv(BOXSCORE_PT_LIST_FILE, index=False)
boxscore_pt_list.head()

## 2. Data Processing

### Recency

Our analysis will consider only the last two years, so we'll get rid of data from before that.

In [None]:
season_year = games_list["SEASON_ID"].astype(str).str[1:].astype(int)
games_list["season_year"] = season_year
current_season_year = END_SEASON  # start year of the most recent season fetched above
games_list = games_list[
    games_list["season_year"].isin([current_season_year, current_season_year - 1])
]

In [None]:
pbp_list = pbp_list[pbp_list["gameId"].isin(games_list["GAME_ID"].unique())]

In [None]:
boxscore_pt_list = boxscore_pt_list[
    boxscore_pt_list["gameId"].isin(games_list["GAME_ID"].unique())
]

### Unnecessary Columns

Some of this data isn't useful to us, so we'll drop it to ignore the noise.

In [None]:
pbp_columns_to_drop = [
    "actionNumber",
    "pointsTotal",
    "videoAvailable",
    "actionId",
    "playerNameI",
    "teamTricode",
]
pbp_list.drop(
    columns=[col for col in pbp_columns_to_drop if col in pbp_list.columns],
    inplace=True,
)
pbp_list.head()

In [None]:
bspt_columns_to_drop = [
    "teamCity",
    "teamName",
    "teamTricode",
    "teamSlug",
    "playerNameI",
    "playerSlug",
    "jerseyNum",
]
boxscore_pt_list.drop(
    columns=[col for col in bspt_columns_to_drop if col in boxscore_pt_list.columns],
    inplace=True,
)
boxscore_pt_list.head()

### Categorization

To save on memory, we will also turn variables that can be understood as categorical variables into that type.

In [None]:
pbp_categorical_columns = [
    "gameId",
    "teamId",
    "shotResult",
    "isFieldGoal",
    "location",
    "actionType",
    "subType",
    "personId",
    "playerName",
]
pbp_list[pbp_categorical_columns] = pbp_list[pbp_categorical_columns].astype("category")
pbp_list.head()

In [None]:
bspt_categorical_columns = [
    "gameId",
    "teamId",
    "personId",
    "firstName",
    "familyName",
    "nameI",
    "position",
]
boxscore_pt_list[bspt_categorical_columns] = boxscore_pt_list[
    bspt_categorical_columns
].astype("category")
boxscore_pt_list.head()

### Clock

We'll transform the clock data from a string into the total number of seconds.

In [None]:
if pbp_list["clock"].dtype != "int64":
    pbp_list["clock"] = pbp_list["clock"].astype(str)
    pbp_list["minutes"] = pbp_list["clock"].str[2:4].astype(int)
    pbp_list["seconds"] = pbp_list["clock"].str[5:7].astype(int)
    pbp_list["clock"] = pbp_list["minutes"] * 60 + pbp_list["seconds"]
    pbp_list.drop(columns=["minutes", "seconds"], inplace=True)
pbp_list["clock"].head()
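
The fixed-position slicing above implies a clock string shaped like `PT11M34.00S`. That format is an assumption inferred from the slice offsets, not something stated in the data documentation. Under that assumption, a regular expression is a slightly more defensive way to parse a single value. `clock_to_seconds` is a hypothetical helper for illustration.

```python
import re

# Assumed format: "PT<minutes>M<seconds>S", e.g. "PT11M34.00S".
CLOCK_PATTERN = re.compile(r"^PT(\d+)M(\d+(?:\.\d+)?)S$")


def clock_to_seconds(clock: str) -> float:
    """Convert a clock string into the number of seconds left in the period."""
    match = CLOCK_PATTERN.match(clock)
    if match is None:
        raise ValueError(f"Unrecognized clock format: {clock!r}")
    minutes = int(match.group(1))
    seconds = float(match.group(2))
    return minutes * 60 + seconds


print(clock_to_seconds("PT11M34.00S"))  # 694.0
print(clock_to_seconds("PT00M05.50S"))  # 5.5
```

Unlike the slicing in the cell above, this version keeps fractional seconds and raises an error on unexpected input instead of silently producing a wrong number.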

In [None]:
if boxscore_pt_list["minutes"].dtype != "int64":
    boxscore_pt_list["minutes"] = boxscore_pt_list["minutes"].astype(str)
    boxscore_pt_list["mins"] = boxscore_pt_list["minutes"].str[:-3].astype(int)
    boxscore_pt_list["seconds"] = boxscore_pt_list["minutes"].str[-2:].astype(int)
    boxscore_pt_list["minutes"] = (
        boxscore_pt_list["mins"] * 60 + boxscore_pt_list["seconds"]
    )
    boxscore_pt_list.drop(columns=["mins", "seconds"], inplace=True)
boxscore_pt_list["minutes"].head()

## 3. Data Analysis

### Parse Data

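Extract the foul events from the play-by-play data. For each game, period, and team, we record the clock times of the first five fouls and count the fouls committed after the fifth.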

In [None]:
from typing import Any
import numpy as np
from scipy.stats import poisson


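# Step 1: Keep only the foul events.
fouls_df: pd.DataFrame = pbp_list[pbp_list["actionType"] == "Foul"].copy()

# Step 2: Group by game, period, and team.
# We use "gameId", "period", and "teamId" as grouping keys.
# observed=True skips key combinations that never occur, which matters
# because "gameId" and "teamId" were converted to categoricals earlier.
grouped = fouls_df.groupby(["gameId", "period", "teamId"], observed=True)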

# Prepare a list to collect the data.
records: list[dict[str, Any]] = []

# Process each group.
for (game_id, period, team_id), group in grouped:
    # Sort events in descending order by clock.
    # In many play-by-play datasets, the clock counts down, so higher values are earlier in the quarter.
    group_sorted = group.sort_values(by="clock", ascending=False)

    # Extract the clock times of the fouls.
    foul_times: list = group_sorted["clock"].tolist()

    # Pre-penalty fouls: the first five foul times.
    pre_penalty_times: list = foul_times[:5]

    # Ensure we have five columns (fill with np.nan if not enough fouls).
    pre_penalty_times += [np.nan] * (5 - len(pre_penalty_times))

    # Total fouls in the quarter.
    total_fouls: int = len(foul_times)

    # Post-penalty fouls: count after the first five.
    fouls_post_penalty: int = max(total_fouls - 5, 0)

    # Create a record.
    record = {
        "game_id": game_id,
        "team_id": team_id,
        "period": period,
        "foul_time_1": pre_penalty_times[0],
        "foul_time_2": pre_penalty_times[1],
        "foul_time_3": pre_penalty_times[2],
        "foul_time_4": pre_penalty_times[3],
        "foul_time_5": pre_penalty_times[4],
        "fouls_post_penalty": fouls_post_penalty,
        # Also store the count of pre-penalty fouls (could be useful later)
        "fouls_pre_penalty": min(total_fouls, 5),
    }
    records.append(record)

# Create the penalty_fouls DataFrame.
penalty_fouls: pd.DataFrame = pd.DataFrame(records)

# Step 4: Merge with team names from games_list.
# First, merge to get the team name for the team in question.
games_subset: pd.DataFrame = games_list[["GAME_ID", "TEAM_ID", "TEAM_NAME"]].copy()
games_subset.rename(
    columns={"GAME_ID": "game_id", "TEAM_ID": "team_id", "TEAM_NAME": "team_name"},
    inplace=True,
)

penalty_fouls = penalty_fouls.merge(games_subset, on=["game_id", "team_id"], how="left")

# Next, get the opponent's name for each game.
# For each record, the opponent is the team in games_list for the same game_id with a different team_id.
# We create a mapping from game_id to the two teams.
opponent_mapping: dict[str, dict[int, str]] = {}

# Create a dictionary that maps game_id to a dict of team_id: team_name.
for _, row in games_subset.iterrows():
    g_id = row["game_id"]
    t_id = row["team_id"]
    t_name = row["team_name"]
    if g_id not in opponent_mapping:
        opponent_mapping[g_id] = {}
    opponent_mapping[g_id][t_id] = t_name


# Define a helper function to get the opponent's name.
def get_opponent_name(game_id: str, team_id: int) -> str:
    teams: dict[int, str] = opponent_mapping.get(game_id, {})
    # The opponent is the team whose id is not team_id.
    for t_id, t_name in teams.items():
        if t_id != team_id:
            return t_name
    return np.nan  # In case there is no opponent found.


# Apply the function to create an "opponent_name" column.
penalty_fouls["opponent_name"] = penalty_fouls.apply(
    lambda row: get_opponent_name(row["game_id"], row["team_id"]), axis=1
)

# Optionally, drop the team_id column if not needed.
penalty_fouls.drop(columns=["team_id"], inplace=True)

# Rearranging the columns to match the desired order.
penalty_fouls = penalty_fouls[
    [
        "game_id",
        "team_name",
        "opponent_name",
        "foul_time_1",
        "foul_time_2",
        "foul_time_3",
        "foul_time_4",
        "foul_time_5",
        "period",
        "fouls_post_penalty",
        "fouls_pre_penalty",
    ]
]
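
To make the per-group logic concrete, here is a minimal sketch of the same split applied to two made-up lists of foul times. `split_fouls` is a hypothetical helper and the clock values are invented for illustration.

```python
import math


def split_fouls(clock_seconds: list[int], limit: int = 5) -> dict:
    """Summarize one team's fouls in one period."""
    # The clock counts down, so larger values happened earlier in the period.
    ordered = sorted(clock_seconds, reverse=True)
    pre = ordered[:limit]
    # Pad with NaN so every record has exactly `limit` foul-time slots.
    padded = pre + [math.nan] * (limit - len(pre))
    return {
        "foul_times": padded,
        "fouls_pre_penalty": len(pre),
        "fouls_post_penalty": max(len(ordered) - limit, 0),
    }


few = split_fouls([600, 410, 95])
many = split_fouls([30, 700, 650, 500, 420, 300, 120])

print(few["fouls_pre_penalty"], few["fouls_post_penalty"])  # 3 0
print(many["fouls_pre_penalty"], many["fouls_post_penalty"])  # 5 2
```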

### Empirical Data

Compute the empirical PMFs of the total, pre-penalty, and post-penalty foul counts per quarter.

In [None]:
# Compute the total fouls per quarter (pre + post)
penalty_fouls["total_fouls"] = (
    penalty_fouls["fouls_pre_penalty"] + penalty_fouls["fouls_post_penalty"]
)

# Compute the empirical PMF for total quarter fouls
total_counts: pd.Series = penalty_fouls["total_fouls"].value_counts().sort_index()
empirical_total: pd.Series = total_counts / total_counts.sum()

# Compute the empirical PMF for pre-penalty fouls
pre_counts: pd.Series = penalty_fouls["fouls_pre_penalty"].value_counts().sort_index()
empirical_pre: pd.Series = pre_counts / pre_counts.sum()

# Compute the empirical PMF for post-penalty fouls
post_counts: pd.Series = penalty_fouls["fouls_post_penalty"].value_counts().sort_index()
empirical_post: pd.Series = post_counts / post_counts.sum()

print("Empirical PMF for total quarter fouls (pre + post):")
print(empirical_total)
print("\nEmpirical PMF for pre-penalty fouls:")
print(empirical_pre)
print("\nEmpirical PMF for post-penalty fouls:")
print(empirical_post)

Empirical post-penalty pmf conditioned on the penalty occurring.

In [None]:
# Filter only rows where the fifth pre-penalty foul occurred
penalty_fouls_conditioned = penalty_fouls[penalty_fouls["foul_time_5"].notna()]

# Compute the empirical PMF for post-penalty fouls, given that five pre-penalty fouls occurred
post_counts_conditioned: pd.Series = (
    penalty_fouls_conditioned["fouls_post_penalty"].value_counts().sort_index()
)
empirical_post_conditioned: pd.Series = (
    post_counts_conditioned / post_counts_conditioned.sum()
)

print("Empirical PMF for post-penalty fouls (conditioned on 5 pre-penalty fouls):")
print(empirical_post_conditioned)
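
The difference between the unconditional and the conditioned PMF is easiest to see on a tiny made-up sample. The sketch below uses plain Python instead of pandas. `empirical_pmf` is a hypothetical helper and the `(pre, post)` pairs are invented.

```python
from collections import Counter


def empirical_pmf(counts: list[int]) -> dict[int, float]:
    """Relative frequency of each observed count, sorted by count."""
    tally = Counter(counts)
    n = len(counts)
    return {k: tally[k] / n for k in sorted(tally)}


# Each pair is (fouls_pre_penalty, fouls_post_penalty) for one team-quarter.
quarters = [(5, 0), (5, 2), (3, 0), (5, 2), (4, 0), (5, 1)]

unconditional = empirical_pmf([post for _, post in quarters])
conditional = empirical_pmf([post for pre, post in quarters if pre == 5])

print(unconditional[0])  # 0.5
print(conditional)  # {0: 0.25, 1: 0.25, 2: 0.5}
```

Quarters that never reach five fouls always contribute a zero to the unconditional PMF, which is why conditioning shifts probability mass away from zero.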

### Post-Penalty Distributions

Estimate λ for standard Poisson distribution of post-penalty fouls.

In [None]:
# Step 5: Estimate the Poisson distribution.
# The maximum-likelihood estimate of the Poisson rate is the sample mean,
# so we average the "fouls_post_penalty" counts computed earlier.
lambda_post: float = penalty_fouls["fouls_post_penalty"].mean()

print(
    "Estimated Poisson parameter (λ) for post-penalty fouls: {:.3f}".format(lambda_post)
)

# Additionally, you can compute the probability mass function (PMF) for a range of counts.
# For example, for counts from 0 to 10, using the estimated λ's.
x_vals = np.arange(0, 11)

pmf_post = poisson.pmf(x_vals, lambda_post)

print("\nPMF for post-penalty fouls (counts 0-10):")
for x, pmf in zip(x_vals, pmf_post):
    print("Fouls = {}: Probability = {:.4f}".format(x, pmf))
penalty_fouls["fouls_post_penalty"]
post_counts_conditioned

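Estimate λ for a standard Poisson distribution of post-penalty fouls, conditioned on the penalty occurring.

In [None]:
# Average the per-quarter foul counts of the conditioned rows.
# (post_counts_conditioned is a frequency table, so its mean is not the rate.)
lambda_post_conditional: float = penalty_fouls_conditioned["fouls_post_penalty"].mean()

print(
    "Estimated Poisson parameter (λ) for conditioned post-penalty fouls: {:.3f}".format(
        lambda_post_conditional
    )
)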

# Additionally, you can compute the probability mass function (PMF) for a range of counts.
# For example, for counts from 0 to 10, using the estimated λ's.
x_vals = np.arange(0, 11)

pmf_post_conditional = poisson.pmf(x_vals, lambda_post_conditional)

print("\nPMF for post-penalty fouls (counts 0-10):")
for x, pmf in zip(x_vals, pmf_post_conditional):
    print("Fouls = {}: Probability = {:.4f}".format(x, pmf))
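
For a Poisson model the maximum-likelihood estimate of λ is the mean of the observed counts. The sketch below, on made-up observations, shows how to get that mean either from the raw observations or from a frequency table, and why averaging the table's frequencies gives a different number. All names here are illustrative.

```python
import math
from collections import Counter

observations = [0, 0, 1, 2, 2, 2, 5, 5]
freq = Counter(observations)  # count value -> how often it occurred

# Sample mean of the raw observations.
lam_from_obs = sum(observations) / len(observations)

# Same estimate from the frequency table: weight each value by its frequency.
lam_from_table = sum(k * c for k, c in freq.items()) / sum(freq.values())

# Mean of the frequencies themselves: this is NOT an estimate of the rate.
mean_of_frequencies = sum(freq.values()) / len(freq)


def poisson_pmf(k: int, lam: float) -> float:
    """Poisson probability of observing exactly k events."""
    return math.exp(-lam) * lam**k / math.factorial(k)


print(lam_from_obs, lam_from_table, mean_of_frequencies)  # 2.125 2.125 2.0
```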

Graph post-penalty distributions.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import poisson

# lambda_post was estimated above. x_vals_pre is defined here and reused
# when the pre-penalty distribution is plotted later in the notebook.

# Define x-values:
# For pre-penalty, our observed values range from 0 to 5.
x_vals_pre: np.ndarray = np.arange(0, 6)
# For post-penalty, we can use a wider range (e.g., 0 to 10).
x_vals_post: np.ndarray = np.arange(0, 11)

# Convert empirical PMF series to arrays for plotting.
x_total: np.ndarray = empirical_total.index.to_numpy()
y_total: np.ndarray = empirical_total.values

x_post_empirical: np.ndarray = empirical_post.index.to_numpy()
y_post_empirical: np.ndarray = empirical_post.values

x_post_conditioned: np.ndarray = empirical_post_conditioned.index.to_numpy()
y_post_conditioned: np.ndarray = empirical_post_conditioned.values

# Calculate the standard Poisson PMF for post-penalty fouls.
pmf_post: np.ndarray = poisson.pmf(x_vals_post, lambda_post)

# Create the graph.
plt.figure(figsize=(10, 6))

# Plot empirical PMF for total quarter fouls (solid line with circles)
plt.plot(
    x_total,
    y_total,
    marker="o",
    linestyle="-",
    color="C3",
    label="Empirical Total Quarter Fouls",
)

# Plot empirical PMF for post-penalty fouls (dashed line with triangle markers)
plt.plot(
    x_post_empirical,
    y_post_empirical,
    marker="^",
    linestyle="--",
    color="C4",
    label="Empirical Post-Penalty Fouls",
)

plt.plot(
    x_post_conditioned,
    y_post_conditioned,
    marker="*",
    linestyle="solid",
    label="Empirical (Post-Penalty | 5 Pre Fouls)",
)

# Plot the standard Poisson PMF for post-penalty fouls as a solid line with markers.
plt.plot(
    x_vals_post,
    pmf_post,
    linestyle="-",
    marker="s",
    color="C1",
    label=f"Post-Penalty (λ = {lambda_post:.2f})",
)

plt.title("Foul Count Distributions")
plt.xlabel("Number of Fouls")
plt.ylabel("Probability")
plt.legend()
plt.grid(True)
plt.show()

### Pre-Penalty Distributions


Estimate λ for truncated Poisson distribution for pre-penalty.

In [None]:
import numpy as np
import pandas as pd
from scipy.optimize import minimize_scalar
from scipy.stats import poisson


def neg_log_likelihood(lam: float, data: np.ndarray, max_val: int) -> float:
    """
    Compute the negative log-likelihood for a truncated Poisson distribution.

    The PMF for a truncated Poisson (truncated at max_val) is:
      f(x; lam) = [e^(-lam) lam^x / x!] / (sum_{j=0}^{max_val} e^(-lam) lam^j / j!)

    Parameters:
        lam (float): Poisson rate parameter (lambda).
        data (np.ndarray): Array of observed counts (0 to max_val).
        max_val (int): The truncation point.

    Returns:
        float: Negative log-likelihood value.
    """
    if lam <= 0:
        return np.inf  # Return a large value if lambda is non-positive.

    # Normalization constant for the truncated Poisson.
    norm_const: float = np.sum(poisson.pmf(np.arange(0, max_val + 1), lam))

    # Compute the log-likelihood, vectorized over all observations:
    #   sum_i log Poisson(x_i; lam) - n * log(norm_const)
    # poisson.logpmf replaces the np.math.factorial call, because the
    # np.math alias is not available in recent NumPy releases.
    log_likelihood: float = float(
        np.sum(poisson.logpmf(data, lam)) - len(data) * np.log(norm_const)
    )

    return -log_likelihood  # We return the negative log-likelihood.


# Extract the observed pre-penalty foul counts from the DataFrame.
data_pre: np.ndarray = penalty_fouls["fouls_pre_penalty"].to_numpy()
max_val: int = 5  # Since the pre-penalty counts are truncated at 5.

# Optimize the negative log-likelihood to estimate lambda.
result = minimize_scalar(
    neg_log_likelihood, bounds=(1e-5, 20), args=(data_pre, max_val), method="bounded"
)
lambda_pre: float = result.x

print(f"Estimated truncated Poisson lambda for pre-penalty fouls: {lambda_pre:.4f}")
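
For reference, the quantity being minimized can be written out explicitly. Here $m$ denotes the truncation point (`max_val`) and $x_1, \dots, x_n$ the observed pre-penalty counts. These symbols are introduced only for this note. The truncated PMF is

$$
f(x; \lambda) = \frac{e^{-\lambda} \lambda^{x} / x!}{\sum_{j=0}^{m} e^{-\lambda} \lambda^{j} / j!}, \qquad x = 0, 1, \dots, m,
$$

so the negative log-likelihood is

$$
-\ell(\lambda) = \sum_{i=1}^{n} \left[ \lambda - x_i \log \lambda + \log (x_i!) \right] + n \log \left( \sum_{j=0}^{m} \frac{e^{-\lambda} \lambda^{j}}{j!} \right).
$$

The last term is what distinguishes this from the ordinary Poisson likelihood: it removes the probability mass that the untruncated model would place on counts above $m$.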

Create function for calculating truncated Poisson pmf.

In [None]:
import numpy as np
from scipy.stats import poisson
from typing import Union


def truncated_poisson_pmf(
    k: Union[int, np.ndarray], lam: float, max_val: int
) -> Union[float, np.ndarray]:
    """
    Calculate the PMF of a Poisson distribution truncated at max_val.

    Parameters:
        k (int or np.ndarray): The value(s) for which to compute the PMF (should be between 0 and max_val).
        lam (float): The lambda (rate) parameter of the underlying Poisson distribution.
        max_val (int): The maximum possible value (truncation point).

    Returns:
        float or np.ndarray: The truncated PMF evaluated at k.
    """
    # Compute the normalization constant: the sum of Poisson PMFs from 0 to max_val.
    norm_const: float = np.sum(poisson.pmf(np.arange(0, max_val + 1), lam))
    # Compute the truncated PMF.
    return poisson.pmf(k, lam) / norm_const


# Example usage for demonstration:
lambda_pre_example: float = 3.0  # Replace with your actual estimated lambda_pre
x_vals_truncated: np.ndarray = np.arange(0, 6)  # Valid values: 0 to 5 (inclusive)
pmf_truncated: np.ndarray = truncated_poisson_pmf(
    x_vals_truncated, lambda_pre_example, 5
)

print("Truncated Poisson PMF for pre-penalty fouls (0 to 5):")
for x, prob in zip(x_vals_truncated, pmf_truncated):
    print(f"Fouls = {x}: Probability = {prob:.4f}")
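
A useful sanity check on a truncated PMF is that it sums to one over its support and is never smaller than the untruncated Poisson PMF at the same point. The sketch below re-implements the idea with only the standard library so that it runs on its own. Unlike the notebook function, this hypothetical version also returns zero outside the support, which is a design choice of the example.

```python
import math


def poisson_pmf(k: int, lam: float) -> float:
    """Poisson probability of observing exactly k events."""
    return math.exp(-lam) * lam**k / math.factorial(k)


def truncated_pmf(k: int, lam: float, max_val: int) -> float:
    """Poisson PMF renormalized to the support 0..max_val."""
    if not 0 <= k <= max_val:
        return 0.0  # outside the truncated support
    norm_const = sum(poisson_pmf(j, lam) for j in range(max_val + 1))
    return poisson_pmf(k, lam) / norm_const


lam_example = 3.0  # illustrative rate, not an estimate from the data
probs = [truncated_pmf(k, lam_example, 5) for k in range(6)]
total = sum(probs)

print(abs(total - 1.0) < 1e-12)  # True
```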

In [None]:
# Calculate the truncated Poisson PMF for pre-penalty fouls using
# truncated_poisson_pmf, defined in the previous cell.
pmf_truncated: np.ndarray = truncated_poisson_pmf(x_vals_pre, lambda_pre, 5)

# Create the graph.
plt.figure(figsize=(10, 6))

# Plot the truncated PMF for pre-penalty fouls as a line with circle markers.
plt.plot(
    x_vals_pre,
    pmf_truncated,
    linestyle="-",
    marker="o",
    label=f"Truncated Pre-Penalty (λ = {lambda_pre:.2f})",
)

plt.title("Foul Count Distributions")
plt.xlabel("Number of Fouls")
plt.ylabel("Probability")
plt.legend()
plt.grid(True)
plt.show()