In [22]:
import datetime
import duckdb
import enum
import json
import ollama
import os
import pandas as pd
import pydantic
import requests
import time
import tqdm
from typing import Dict, List

In [23]:
# Base URLs of the two Sleeper endpoints used by the fetch helpers in this cell:
# the REST API (player list, per-player weekly stats) and the GraphQL API
# (player news, weekly scores).
SLEEPER_REST_API = "https://api.sleeper.com"
SLEEPER_GRAPHQL_API = "https://sleeper.com/graphql"


def get_player_reports(player_id: str, limit: int, timeout: float = 30.0) -> List[Dict]:
    """Fetch recent news items for one NFL player from the Sleeper GraphQL API.

    Args:
        player_id: Sleeper player id; interpolated into the query as a string literal.
        limit: Maximum number of news items to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook indefinitely.

    Returns:
        The list found under ``data.get_player_news`` in the response, or an
        empty list when that path is missing or ``null``.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    operation_name = "get_player_news"
    query = """
        query {operation_name} {{
            get_player_news(sport: "nfl", player_id: "{player_id}", limit: {limit}){{
                metadata
                player_id
                published
                source
                source_key
                sport
            }}
        }}
    """.format(operation_name=operation_name, player_id=player_id, limit=limit)
    body = {
        "operationName": operation_name,
        "query": query,
        "variables": {},
    }
    # Short pause before every call; presumably to avoid hammering the API.
    time.sleep(0.1)
    response = requests.post(SLEEPER_GRAPHQL_API, json=body, timeout=timeout)
    payload = response.json() or {}
    # A GraphQL error response can carry `"data": null` (or a null field), so
    # `.get(key, default)` alone is not enough: the key exists, the value is None.
    data = payload.get("data") or {}
    return data.get(operation_name) or []


def transform_reports(raw_reports: List[Dict]) -> List[Dict]:
    """Flatten raw news items into report rows, keeping only "prospective" ones.

    A report is "retrospective" when its metadata carries a non-null
    ``analysis`` and "prospective" otherwise; retrospective reports are
    dropped. Items whose ``metadata`` is missing or ``null`` are treated as
    having no title, description or analysis instead of raising.

    Args:
        raw_reports: Items as returned by ``get_player_reports``.

    Returns:
        One dict per prospective report with the keys ``report_id``,
        ``player_id``, ``source``, ``category``, ``published_at``, ``title``,
        ``description`` and ``analysis`` (always ``None`` for kept rows).

    Raises:
        TypeError: If a kept report lacks ``source`` or ``source_key``; both are
            needed to build ``report_id``.
    """
    output = []
    for raw in raw_reports:
        metadata = raw.get("metadata") or {}
        analysis = metadata.get("analysis")
        if analysis is not None:
            # Retrospective report: filtered out of the result.
            continue
        output.append(
            {
                "report_id": raw.get("source") + "_" + raw.get("source_key"),
                "player_id": raw.get("player_id"),
                "source": raw.get("source"),
                "category": "prospective",
                "published_at": raw.get("published"),
                "title": metadata.get("title"),
                "description": metadata.get("description"),
                "analysis": analysis,
            }
        )
    return output


def get_games_for_week(season: int, week: int, timeout: float = 30.0) -> List[Dict]:
    """Fetch the regular-season NFL games of one week from the Sleeper GraphQL API.

    Args:
        season: Season year; sent to the API as a quoted string.
        week: Week number within the regular season.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook indefinitely.

    Returns:
        The list found under ``data.scores`` in the response, or an empty list
        when that path is missing or ``null``.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    operation_name = "scores"
    query = """
        query {operation_name} {{
            scores(sport: "nfl", season_type: "regular", season: "{season}", week: {week}){{
              date
              game_id
              metadata
              season
              season_type
              sport
              status
              week
              start_time
            }}
        }}
    """.format(operation_name=operation_name, season=season, week=week)
    body = {
        "operationName": operation_name,
        "query": query,
        "variables": {},
    }
    # Short pause before every call; presumably to avoid hammering the API.
    time.sleep(0.1)
    response = requests.post(SLEEPER_GRAPHQL_API, json=body, timeout=timeout)
    payload = response.json() or {}
    # A GraphQL error response can carry `"data": null` (or a null field), so
    # `.get(key, default)` alone is not enough: the key exists, the value is None.
    data = payload.get("data") or {}
    return data.get(operation_name) or []


def get_games_for_season(season: int, max_weeks: int) -> List[Dict]:
    """Gather the games of weeks 1 through ``max_weeks`` (inclusive) into one flat list.

    Weeks are fetched one at a time, in ascending order, via
    ``get_games_for_week``; a ``max_weeks`` below 1 yields an empty list.
    """
    return [
        game
        for week_number in range(1, max_weeks + 1)
        for game in get_games_for_week(season, week_number)
    ]


def transform_games(raw_games: List[Dict]) -> List[Dict]:
    """Flatten raw game records into one row per game.

    Team names and scores are read from each record's ``metadata``. A missing
    or ``null`` ``metadata``, and a missing, ``null`` or empty score, all
    produce a score of 0 instead of raising.

    Args:
        raw_games: Records as returned by ``get_games_for_week``.

    Returns:
        One dict per game with the keys ``game_id``, ``season``, ``week``,
        ``date``, ``started_at``, ``home_team``, ``away_team``, ``home_score``
        and ``away_score``.
    """

    def _score(metadata: Dict, key: str) -> int:
        # Treat absent/null/empty scores as 0, matching the original default.
        value = metadata.get(key)
        return 0 if value in (None, "") else int(value)

    games = []
    for raw in raw_games:
        metadata = raw.get("metadata") or {}
        games.append(
            {
                "game_id": raw.get("game_id"),
                "season": raw.get("season"),
                "week": raw.get("week"),
                "date": raw.get("date"),
                "started_at": raw.get("start_time"),
                "home_team": metadata.get("home_team"),
                "away_team": metadata.get("away_team"),
                "home_score": _score(metadata, "home_score"),
                "away_score": _score(metadata, "away_score"),
            }
        )
    return games


def get_all_players(timeout: float = 30.0) -> List[Dict]:
    """Download the full NFL player list from the Sleeper REST API.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook indefinitely.

    Returns:
        One dict per player with the keys ``player_id``, ``name``,
        ``position``, ``current_team`` and ``jersey_number`` (``None`` when the
        API reports no number).

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    url = f"{SLEEPER_REST_API}/v1/players/nfl"
    response = requests.get(url, timeout=timeout)
    # The endpoint returns a mapping keyed by player id; only the values are used.
    players_by_id = response.json()
    players = []
    for player in players_by_id.values():
        number = player.get("number")
        players.append(
            {
                "player_id": player.get("player_id"),
                "name": player.get("full_name"),
                "position": player.get("position"),
                "current_team": player.get("team"),
                "jersey_number": int(number) if number is not None else None,
            }
        )
    return players


def get_player_week_stats(player_id: str, season: int, timeout: float = 30.0) -> List[Dict]:
    """Fetch one player's regular-season stats, grouped by week, from the Sleeper REST API.

    Args:
        player_id: Sleeper player id.
        season: Season year.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook indefinitely.

    Returns:
        The non-null values of the response mapping, in response order. A
        ``null`` response body yields an empty list.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    url = f"{SLEEPER_REST_API}/stats/nfl/player/{player_id}?season_type=regular&season={season}&grouping=week"
    # Short pause before every call; presumably to avoid hammering the API.
    time.sleep(0.1)
    response = requests.get(url, timeout=timeout)
    stats_by_week = response.json() or {}
    # Entries whose value is null are skipped.
    return [entry for entry in stats_by_week.values() if entry is not None]


def transform_week_stats(raw_week_stats: List[Dict]) -> List[Dict]:
    """Flatten raw weekly stat records into one row per player and week.

    Counting stats are read from each record's ``stats`` mapping and converted
    to ``int``; a missing or ``null`` ``stats`` mapping, or a missing or
    ``null`` individual stat, counts as 0.

    Args:
        raw_week_stats: Records as returned by ``get_player_week_stats``. Each
            needs string ``season`` and ``player_id`` values and a ``date`` in
            ``YYYY-MM-DD`` form.

    Returns:
        One dict per record. ``stat_id`` is ``"<season>_<week>_<player_id>"``,
        ``week_id`` is ``"<season>_<week>"`` and ``played_at`` is the record's
        date as epoch milliseconds.
    """

    def _count(stats: Dict, key: str) -> int:
        # `or 0` covers both an absent key and an explicit null value.
        return int(stats.get(key) or 0)

    rows = []
    for raw in raw_week_stats:
        stats = raw.get("stats") or {}
        week_id = raw.get("season") + "_" + str(raw.get("week"))
        # NOTE(review): strptime yields a naive datetime, so .timestamp()
        # interprets the date in the machine's local timezone; played_at can
        # therefore differ between machines. Left unchanged to keep existing values.
        played_at = int(datetime.datetime.strptime(raw.get("date"), "%Y-%m-%d").timestamp() * 1000)
        rows.append(
            {
                "stat_id": week_id + "_" + raw.get("player_id"),
                "week_id": week_id,
                "player_id": raw.get("player_id"),
                "game_id": raw.get("game_id"),
                "season": raw.get("season"),
                "week": raw.get("week"),
                "team": raw.get("team"),
                "opponent": raw.get("opponent"),
                "played_at": played_at,
                "was_active": _count(stats, "gms_active") > 0,
                "was_played": _count(stats, "gp") > 0,
                "offensive_snaps": _count(stats, "off_snp"),
                "rushing_attempts": _count(stats, "rush_att"),
                "rushing_yards": _count(stats, "rush_yd"),
                "receiving_targets": _count(stats, "rec_tgt"),
                "receiving_catches": _count(stats, "rec"),
                "receiving_yards": _count(stats, "rec_yd"),
            }
        )
    return rows

In [24]:
"""
Temporarily Removed Fields:
- "reason"
  - A brief description (less than 20 words) of why the report thinks that workload change will happen
"""

# Prompt template for the workload-extraction call. It is filled in with
# str.format, and `{report_content}` is its only placeholder, so any literal
# brace added to this text must be doubled (`{{` / `}}`).
# The value lists below are repeated in ExpectedWorkloadChangeEnum and
# ReasonCategoryEnum; keep them in sync when editing either side.
EXPECTED_WORKLOAD_CHANGE_PROMPT = """
You are an expert in semantics and NFL football.

Read the following news report about an NFL player and extract structured data that captures what
the report thinks will happen to that player's workload in the next game.

Your output should be JSON with the following fields:
- "target_player"
  - The name of the player the report focuses on
- "expected_workload_change"
  - The expected change in workload (carries, targets) for the target player
  - Value can be: "much_higher", "somewhat_higher", "similar", "somewhat_lower", "much_lower", or "unknown"
- "reason_category"
  - Categorization of the primary reason for the expected workload change
  - Value can be one of the following:
    - "promotion" if the target player is getting higher workload due to good play
    - "demotion" if the target player is getting lower workload due to poor play
    - "teammate_injury" if the target player is getting higher workload due to a teammate's injury
    - "own_injury" if the target player is getting lower workload due to their own injury
    - "strong_opponent" if the opponent is expected to be stronger against the team
    - "weak_opponent" if the opponent is expected to be weaker against the team
    - "rumor" if based on a rumor about players, coaches, or strategy
    - "no_change" if workload is expected to be the same

News Report:
{report_content}

Output:
"""


# Allowed values for the "expected_workload_change" field of ExpectedWorkloadChange.
# Each member's value equals its name and matches a value listed in
# EXPECTED_WORKLOAD_CHANGE_PROMPT.
# NOTE(review): documented with comments rather than a class docstring because
# pydantic appears to copy enum docstrings into the generated JSON schema, which
# is passed to the model as the response format - confirm before adding one.
class ExpectedWorkloadChangeEnum(enum.Enum):
    much_higher = "much_higher"
    somewhat_higher = "somewhat_higher"
    similar = "similar"
    somewhat_lower = "somewhat_lower"
    much_lower = "much_lower"
    unknown = "unknown"


# Allowed values for the "reason_category" field of ExpectedWorkloadChange.
# Each member's value equals its name and matches a category listed in
# EXPECTED_WORKLOAD_CHANGE_PROMPT, where the meaning of each one is spelled out.
# NOTE(review): documented with comments rather than a class docstring because
# pydantic appears to copy enum docstrings into the generated JSON schema, which
# is passed to the model as the response format - confirm before adding one.
class ReasonCategoryEnum(enum.Enum):
    promotion = "promotion"
    demotion = "demotion"
    teammate_injury = "teammate_injury"
    own_injury = "own_injury"
    strong_opponent = "strong_opponent"
    weak_opponent = "weak_opponent"
    rumor = "rumor"
    no_change = "no_change"


# Shape of the structured answer requested from the model; its JSON schema is
# what extract_expected_workload_change passes as the `format` argument.
# NOTE(review): documented with comments rather than a class docstring because
# pydantic appears to copy model docstrings into the generated JSON schema as a
# description - confirm before adding one.
class ExpectedWorkloadChange(pydantic.BaseModel):
    # Name of the player the report focuses on.
    target_player: str
    # Direction and size of the expected workload change.
    expected_workload_change: ExpectedWorkloadChangeEnum
    # Free-text "reason" field is temporarily removed.
    # reason: str
    # Primary reason behind the expected change.
    reason_category: ReasonCategoryEnum


def strip_code_block(raw: str) -> str:
    """Return ``raw`` without a surrounding Markdown code fence.

    Handles an opening fence with or without a ``json`` language tag, and a
    closing fence followed by trailing whitespace. Text that is not fenced is
    returned unchanged apart from surrounding whitespace being trimmed.

    ``str.lstrip``/``str.rstrip`` are deliberately not used with the fence as
    argument: they strip any of the given *characters*, so a payload such as
    ``null`` would lose its leading ``n``, and a newline after the closing
    fence would stop the fence from being removed.

    Args:
        raw: Model output that may be wrapped in a code fence.

    Returns:
        The unwrapped, whitespace-trimmed text.
    """
    fence = "```"
    language_tag = "json"
    text = raw.strip()
    if text.startswith(fence):
        text = text[len(fence):]
        # Drop the optional language tag that directly follows the opening fence.
        if text[:len(language_tag)].lower() == language_tag:
            text = text[len(language_tag):]
    if text.endswith(fence):
        text = text[:-len(fence)]
    return text.strip()


def extract_expected_workload_change(report_content: str) -> Dict:
    """Ask the local ``gemma3:1b`` model what a report expects for a player's workload.

    The report text is inserted into ``EXPECTED_WORKLOAD_CHANGE_PROMPT`` and
    sent as a single user message, with the JSON schema of
    ``ExpectedWorkloadChange`` as the requested response format.

    Args:
        report_content: Text of the news report to analyse.

    Returns:
        The model's reply parsed from JSON, after ``strip_code_block`` has been
        applied to it.
    """
    user_message = {
        "role": "user",
        "content": EXPECTED_WORKLOAD_CHANGE_PROMPT.format(report_content=report_content),
    }
    response = ollama.chat(
        model="gemma3:1b",
        messages=[user_message],
        format=ExpectedWorkloadChange.model_json_schema(),
    )
    reply_text = strip_code_block(response.message.content)
    return json.loads(reply_text)


def get_report_predictions(reports: List[Dict]) -> List[Dict]:
    """Run the workload extraction on every report's description.

    Reports are processed in order behind a tqdm progress bar.

    Args:
        reports: Dicts carrying at least ``report_id`` and ``description``.

    Returns:
        One dict per report with ``report_id``, ``expected_workload_change``
        (``"unknown"`` when the model's answer lacks that key) and
        ``reason_category``.
    """

    def _predict(report: Dict) -> Dict:
        # Only the description is sent to the model.
        expected = extract_expected_workload_change(report_content=report.get("description"))
        return {
            "report_id": report.get("report_id"),
            "expected_workload_change": expected.get("expected_workload_change", "unknown"),
            "reason_category": expected.get("reason_category"),
        }

    return [_predict(report) for report in tqdm.tqdm(reports)]

In [25]:
# SQL templates used by create_table. All of them are filled in with str.format:
# `{table_name}` everywhere, plus `{filename}` in the CREATE templates.
# Values are interpolated as-is (no quoting or escaping is applied here).
DROP_IF_EXISTS_QUERY = """
DROP TABLE IF EXISTS {table_name}
"""

# Create a table from the full contents of a CSV file.
CREATE_FROM_CSV_QUERY = """
CREATE TABLE {table_name} AS
SELECT *
FROM read_csv_auto('{filename}')
""".strip()

# Create a table from the full contents of a JSON file.
CREATE_FROM_JSON_QUERY = """
CREATE TABLE {table_name} AS
SELECT *
FROM read_json_auto('{filename}')
""".strip()

# Create a table from the full contents of a Parquet file.
CREATE_FROM_PARQUET_QUERY = """
CREATE TABLE {table_name} AS
SELECT *
FROM read_parquet('{filename}')
""".strip()

# Maps the "format" value of a table config to its CREATE template; the keys are
# also the list of formats create_table accepts.
CREATE_TEMPLATE_BY_FORMAT = {
    "csv": CREATE_FROM_CSV_QUERY,
    "json": CREATE_FROM_JSON_QUERY,
    "parquet": CREATE_FROM_PARQUET_QUERY
}


def create_table(con, config: Dict):
    """Drop and recreate one table from a file described by ``config``.

    Args:
        con: Connection object exposing ``execute``.
        config: Mapping with the required keys ``name`` (table name), ``path``
            (source file) and ``format`` (a key of ``CREATE_TEMPLATE_BY_FORMAT``).

    Raises:
        ValueError: If a required key is missing or empty, or the format is
            not supported. Validation happens before any SQL is executed.
    """
    # Checked in this order so the first missing field determines the message.
    required_fields = (
        ("name", "Must specify table name."),
        ("path", "Must specify table path."),
        ("format", "Must specify table format."),
    )
    settings = {}
    for field, message in required_fields:
        value = config.get(field)
        if not value:
            raise ValueError(message)
        settings[field] = value

    table_format = settings["format"]
    if table_format not in CREATE_TEMPLATE_BY_FORMAT:
        valid_formats = ", ".join(CREATE_TEMPLATE_BY_FORMAT)
        raise ValueError(f"Invalid format `{table_format}`. Must be one of: {valid_formats}.")

    table_name = settings["name"]
    # Drop first so that re-running the cell replaces the table.
    con.execute(DROP_IF_EXISTS_QUERY.format(table_name=table_name))

    create_query = CREATE_TEMPLATE_BY_FORMAT[table_format].format(
        table_name=table_name,
        filename=settings["path"],
    )
    con.execute(create_query)


def create_database(database_name: str):
    """Delete the file at ``database_name`` if it exists; otherwise do nothing.

    Despite its name, this helper creates nothing itself: it only removes an
    existing file at that path.
    """
    if not os.path.exists(database_name):
        return
    os.remove(database_name)


def create_tables(database_name: str, table_configs: List[Dict]):
    """Open ``database_name`` with DuckDB and (re)create each configured table.

    Args:
        database_name: Path handed to ``duckdb.connect``.
        table_configs: Table descriptions in the form ``create_table`` expects;
            processed in list order over a single connection.
    """
    with duckdb.connect(database_name) as connection:
        for config in table_configs:
            create_table(connection, config)

In [26]:
# Run configuration.
# SEASON: season passed to the game and stats fetchers.
SEASON = 2025
# WEEKS: number of weeks fetched per season (weeks 1..WEEKS).
WEEKS = 18
# MAX_REPORTS_PER_WEEK: budget of news items per week used to size the request.
MAX_REPORTS_PER_WEEK = 15
# MAX_REPORTS: value passed as `limit` when requesting a player's reports.
MAX_REPORTS = WEEKS * MAX_REPORTS_PER_WEEK

In [27]:
# Fetch every week of SEASON, flatten to one row per game, and write the result
# to the CSV that is later loaded into the `game` table.
raw_games = get_games_for_season(season=SEASON, max_weeks=WEEKS)
games = transform_games(raw_games)
df_games = pd.DataFrame(games)
df_games.to_csv("../data/processed/game.csv", index=False)

In [28]:
# Download the full player list and write it to the CSV that is later loaded
# into the `player` table.
df_players = pd.DataFrame(get_all_players())
# jersey_number contains None for some players; the nullable "Int64" dtype keeps
# the column integer-typed with missing values.
df_players.jersey_number = df_players.jersey_number.astype("Int64")
df_players.to_csv("../data/processed/player.csv", index=False)

In [52]:
# Sleeper player ids whose weekly stats and news reports are fetched below.
PLAYER_IDS = ["4147", "7021"]

In [53]:
# Collect the weekly stat records of every player in PLAYER_IDS for SEASON.
raw_stats = []
for player_id in PLAYER_IDS:
    player_stats = get_player_week_stats(player_id=player_id, season=SEASON)
    raw_stats.extend(player_stats)

# Flatten and write to the CSV that is later loaded into `player_game_outcome`.
stats = transform_week_stats(raw_stats)
df_stats = pd.DataFrame(stats)
df_stats.to_csv("../data/processed/player_game_outcome.csv", index=False)

In [54]:
# Collect up to MAX_REPORTS news items for every player in PLAYER_IDS.
raw_reports = []
for player_id in PLAYER_IDS:
    player_reports = get_player_reports(player_id=player_id, limit=MAX_REPORTS)
    raw_reports.extend(player_reports)

# Keep only prospective reports (see transform_reports) and write them to the
# CSV that is later loaded into the `report` table.
reports = transform_reports(raw_reports)
df_reports = pd.DataFrame(reports)
df_reports.to_csv("../data/processed/report.csv", index=False)

In [55]:
# Path of the DuckDB database file used by all following cells.
DATABASE = "../database.db"

# Start from scratch: create_database only deletes an existing file at this path.
create_database(database_name=DATABASE)

# Load the four CSVs written above into tables of the same logical names.
# NOTE(review): column types are inferred by read_csv_auto; in the recorded
# outputs player.player_id is varchar while player_id in the derived tables is
# int64 - keep that in mind before joining `player` to the other tables.
create_tables(
    database_name=DATABASE,
    table_configs=[
        {
            "name": "player",
            "path": "../data/processed/player.csv",
            "format": "csv",
        },
        {
            "name": "game",
            "path": "../data/processed/game.csv",
            "format": "csv",
        },
        {
            "name": "player_game_outcome",
            "path": "../data/processed/player_game_outcome.csv",
            "format": "csv",
        },
        {
            "name": "report",
            "path": "../data/processed/report.csv",
            "format": "csv",
        },
    ]
)

In [56]:
with duckdb.connect(DATABASE) as con:
    # Build player_season_main_team: count player_game_outcome rows per
    # (player, season, team), then keep only the (player, season) pairs that
    # have exactly one team (HAVING unique_teams == 1) and store that team as
    # main_team. Players with more than one team in a season are excluded.
    con.execute("""
    CREATE OR REPLACE TABLE player_season_main_team AS
    WITH
    player_season_team_count AS (
        SELECT
            player_id,
            season,
            team,
            count(1) AS weeks,
        FROM player_game_outcome
        GROUP BY
            player_id,
            season,
            team
    ),
    player_season_main_team AS (
        SELECT
            player_id,
            season,
            arg_max(team, weeks) AS main_team,
            count(1) AS unique_teams,
        FROM player_season_team_count
        GROUP BY
            player_id,
            season
        HAVING unique_teams == 1
    )
    SELECT
        player_id,
        season,
        main_team,
    FROM player_season_main_team
    ;
    """)

    # Preview the first rows of the new table.
    con.sql("""
    SELECT *
    FROM player_season_main_team
    LIMIT 5
    ;
    """).show()

┌───────────┬────────┬───────────┐
│ player_id │ season │ main_team │
│   int64   │ int64  │  varchar  │
├───────────┼────────┼───────────┤
│      4147 │   2025 │ CIN       │
│      7021 │   2025 │ CAR       │
└───────────┴────────┴───────────┘



In [58]:
with duckdb.connect(DATABASE) as con:
    # Build player_game_assignment: for every (player, season, main_team) row,
    # attach each game of that season in which main_team is the home or away
    # team; `opponent` is the other side of that game. Because of the LEFT JOIN,
    # a player whose team matches no game still gets one row with NULL game columns.
    con.execute("""
    CREATE OR REPLACE TABLE player_game_assignment AS
    SELECT
        t.player_id,
        t.season,
        g.week,
        g.game_id,
        g.started_at,
        t.main_team AS team,
        CASE
            WHEN t.main_team = g.home_team THEN g.away_team
            WHEN t.main_team = g.away_team THEN g.home_team
            ELSE NULL
        END AS opponent,
    FROM player_season_main_team t
    LEFT JOIN game g
        ON (t.main_team = g.home_team OR t.main_team = g.away_team)
        AND t.season = g.season
    ;
    """)

    # Preview. started_at is divided by 1000 before to_timestamp, i.e. it is
    # treated as epoch milliseconds. The formatted column reuses the alias
    # `started_at`, so the output shows two columns with that name (raw and formatted).
    con.sql("""
    SELECT
        *,
        strftime(to_timestamp(started_at / 1000), '%Y-%m-%d %I:%M %p') AS started_at,
    FROM player_game_assignment
    ORDER BY
        season ASC,
        player_id ASC,
        week ASC
    LIMIT 17
    ;
    """).show()

┌───────────┬────────┬───────┬───────────┬───────────────┬─────────┬──────────┬─────────────────────┐
│ player_id │ season │ week  │  game_id  │  started_at   │  team   │ opponent │     started_at      │
│   int64   │ int64  │ int64 │   int64   │     int64     │ varchar │ varchar  │       varchar       │
├───────────┼────────┼───────┼───────────┼───────────────┼─────────┼──────────┼─────────────────────┤
│      4147 │   2025 │     1 │ 202510108 │ 1757264400000 │ CIN     │ CLE      │ 2025-09-07 12:00 PM │
│      4147 │   2025 │     2 │ 202510207 │ 1757869200000 │ CIN     │ JAX      │ 2025-09-14 12:00 PM │
│      4147 │   2025 │     3 │ 202510320 │ 1758474000000 │ CIN     │ MIN      │ 2025-09-21 12:00 PM │
│      4147 │   2025 │     4 │ 202510410 │ 1759191300000 │ CIN     │ DEN      │ 2025-09-29 07:15 PM │
│      4147 │   2025 │     5 │ 202510507 │ 1759695900000 │ CIN     │ DET      │ 2025-10-05 03:25 PM │
│      4147 │   2025 │     6 │ 202510612 │ 1760300700000 │ CIN     │ GB       │ 20

In [59]:
with duckdb.connect(DATABASE) as con:
    # Build report_game_assignment: for every report, the player's closest game
    # on each side of the publication time.
    #   - upcoming_game_id: game with the smallest started_at strictly after published_at
    #   - previous_game_id: game with the largest started_at strictly before published_at
    # The WHERE clauses inside the two CTEs filter on the joined game columns, so
    # reports without a matching game are absent from that CTE; the final LEFT
    # JOINs bring them back with NULL in the corresponding column.
    # Games are matched on player_id only (not on season).
    con.execute("""
    CREATE OR REPLACE TABLE report_game_assignment AS
    WITH
    report_upcoming_game_closest AS (
        SELECT
            r.report_id,
            arg_min(g.game_id, g.started_at) AS closest_game_id
        FROM report r
        LEFT JOIN player_game_assignment g
            ON r.player_id = g.player_id
        WHERE
            g.started_at > r.published_at
        GROUP BY r.report_id
    ),
    report_previous_game_closest AS (
        SELECT
            r.report_id,
            arg_max(g.game_id, g.started_at) AS closest_game_id
        FROM report r
        LEFT JOIN player_game_assignment g
            ON r.player_id = g.player_id
        WHERE
            r.published_at > g.started_at
        GROUP BY r.report_id
    ),
    report_game_assignment AS (
        SELECT
            r.report_id,
            u.closest_game_id AS upcoming_game_id,
            v.closest_game_id AS previous_game_id,
        FROM report r
        LEFT JOIN report_upcoming_game_closest u
            ON r.report_id = u.report_id
        LEFT JOIN report_previous_game_closest v
            ON r.report_id = v.report_id
    )
    SELECT *
    FROM report_game_assignment
    ;
    """)

    # Preview the first rows of the new table.
    con.sql("""
    SELECT *
    FROM report_game_assignment
    LIMIT 10
    ;
    """).show()

┌───────────────────┬──────────────────┬──────────────────┐
│     report_id     │ upcoming_game_id │ previous_game_id │
│      varchar      │      int64       │      int64       │
├───────────────────┼──────────────────┼──────────────────┤
│ rotoballer_212390 │        202511807 │        202511707 │
│ rotoballer_212107 │        202511707 │        202511619 │
│ rotoballer_211920 │        202511707 │        202511619 │
│ rotoballer_211695 │        202511707 │        202511619 │
│ rotoballer_211057 │        202511619 │        202511507 │
│ rotoballer_210792 │        202511507 │        202511404 │
│ rotoballer_210304 │        202511507 │        202511404 │
│ rotoballer_209827 │        202511404 │        202511303 │
│ rotoballer_209669 │        202511404 │        202511303 │
│ rotoballer_208863 │        202511303 │        202511207 │
├───────────────────┴──────────────────┴──────────────────┤
│ 10 rows                                       3 columns │
└───────────────────────────────────────

In [60]:
with duckdb.connect(DATABASE) as con:
    # Build comparable_report: ids of reports for which the player has a
    # player_game_outcome row for BOTH the upcoming and the previous game, and
    # whose upcoming game is after week 1. The WHERE conditions on the joined
    # tables drop every report for which one of the LEFT JOINs found no match.
    con.execute("""
    CREATE OR REPLACE TABLE comparable_report AS
    SELECT
        r.report_id
    FROM report r
    LEFT JOIN report_game_assignment a
        ON r.report_id = a.report_id
    LEFT JOIN player_game_assignment uga
        ON a.upcoming_game_id = uga.game_id
        AND r.player_id = uga.player_id
    LEFT JOIN player_game_assignment vga
        ON a.previous_game_id = vga.game_id
        AND r.player_id = vga.player_id
    LEFT JOIN player_game_outcome ugo
        ON uga.game_id = ugo.game_id
        AND uga.player_id = ugo.player_id
    LEFT JOIN player_game_outcome vgo
        ON vga.game_id = vgo.game_id
        AND vga.player_id = vgo.player_id
    WHERE
        uga.week > 1
        AND ugo.player_id IS NOT NULL
        AND vgo.player_id IS NOT NULL
    ;
    """)

    # Preview the first rows of the new table.
    con.sql("""
    SELECT
        *
    FROM comparable_report
    LIMIT 5
    ;
    """).show()

┌───────────────────┐
│     report_id     │
│      varchar      │
├───────────────────┤
│ rotoballer_201562 │
│ rotoballer_209827 │
│ rotoballer_210792 │
│ rotoballer_211057 │
│ rotoballer_212107 │
└───────────────────┘



In [61]:
# Pull (report_id, description) for every comparable report out of DuckDB and
# turn the result into a list of dicts, the input shape get_report_predictions expects.
comparable_reports = []
with duckdb.connect(DATABASE) as con:
    cur = con.sql("""
    SELECT
        c.report_id,
        r.description,
    FROM comparable_report c
    LEFT JOIN report r
        ON c.report_id = r.report_id
    ;
    """)
    # Materialize while the connection is still open.
    df_rows = cur.df()
    records = df_rows.to_dict(orient="records")
    comparable_reports.extend(records)

In [62]:
# Run the model-based extraction on every comparable report (one model call per
# report; the recorded run took about a minute and a half for 62 reports) and
# write the predictions to the CSV that is later loaded into `report_prediction`.
report_predictions = get_report_predictions(comparable_reports)
df_report_predictions = pd.DataFrame(report_predictions)
df_report_predictions.to_csv("../data/processed/report_prediction.csv", index=False)

100%|███████████████████████████████████████████| 62/62 [01:27<00:00,  1.41s/it]


In [63]:
# Load the predictions CSV into the `report_prediction` table (dropped and
# recreated if it already exists).
create_tables(
    database_name=DATABASE,
    table_configs=[
        {
            "name": "report_prediction",
            "path": "../data/processed/report_prediction.csv",
            "format": "csv",
        },
    ]
)

In [64]:
# Sanity-check preview: show up to five rows from each of the loaded tables.
with duckdb.connect(DATABASE) as con:
    # Players that currently have a team.
    con.sql("""
    SELECT *
    FROM player
    WHERE current_team IS NOT NULL
    LIMIT 5
    ;
    """).show()

    
    # Games.
    con.sql("""
    SELECT *
    FROM game
    LIMIT 5
    ;
    """).show()

    # Weekly outcomes (rushing columns only).
    con.sql("""
    SELECT
        player_id,
        season,
        week,
        team,
        opponent,
        played_at,
        rushing_attempts,
        rushing_yards,
    FROM player_game_outcome
    LIMIT 5
    ;
    """).show()

    # Reports (without the long text columns).
    con.sql("""
    SELECT
        report_id,
        player_id,
        published_at,
        title,
    FROM report
    LIMIT 5
    ;
    """).show()

    # Model predictions per report.
    con.sql("""
    SELECT
        report_id,
        expected_workload_change,
        reason_category,
    FROM report_prediction
    LIMIT 5
    ;
    """).show()

┌───────────┬───────────────────┬──────────┬──────────────┬───────────────┐
│ player_id │       name        │ position │ current_team │ jersey_number │
│  varchar  │      varchar      │ varchar  │   varchar    │     int64     │
├───────────┼───────────────────┼──────────┼──────────────┼───────────────┤
│ 8478      │ Samuel Womack     │ CB       │ NYJ          │            39 │
│ 1408      │ Le'Veon Bell      │ RB       │ TB           │             6 │
│ 2091      │ Bashaud Breeland  │ CB       │ ARI          │            24 │
│ 11533     │ Brandon Aubrey    │ K        │ DAL          │            17 │
│ 2064      │ DeMarcus Lawrence │ DE       │ SEA          │             0 │
└───────────┴───────────────────┴──────────┴──────────────┴───────────────┘

┌───────────┬────────┬───────┬────────────┬───────────────┬───────────┬───────────┬────────────┬────────────┐
│  game_id  │ season │ week  │    date    │  started_at   │ home_team │ away_team │ home_score │ away_score │
│   int64   │ int64