In [1]:
import os
import sys

from neo4j import GraphDatabase

# Put the directory one level above the notebook's working directory on the
# import path so that the project package (`src.*`) can be imported below.
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), os.pardir)))

# Neo4j connection settings are read from the environment; an unset variable yields None.
uri, username, password = (
    os.environ.get(var_name)
    for var_name in ("NEO4J_URI", "NEO4J_USERNAME", "NEO4J_PASSWORD")
)

In [2]:
import pandas as pd
from src.config import RAW_DIR, PROCESSED_DIR

# Tables produced by the project's own processing step.
profiles, weekly_stats = (
    pd.read_parquet(PROCESSED_DIR / file_name)
    for file_name in ("player_profiles.parquet", "player_stats_2018_2024.parquet")
)

# Tables taken as-is from the raw data directory.
injuries, schedules, pbp = (
    pd.read_parquet(RAW_DIR / file_name)
    for file_name in ("injuries_2018_2024.parquet", "schedules.parquet", "pbp_2018_2024.parquet")
)

In [3]:
from neo4j import GraphDatabase

AUTH = (username, password)

# One driver for the whole notebook; every later cell opens sessions from it.
driver = GraphDatabase.driver(
    uri,
    auth=AUTH,
    keep_alive=True,
    max_connection_lifetime=7200,
)

# Smoke test: round-trip a trivial query and print its single value (expected: 1).
with driver.session() as session:
    result = session.run("RETURN 1 AS test")
    print(result.single()["test"])

1


In [4]:
from datetime import datetime

def _to_iso_datetime(gameday, gametime):
    """Return ISO8601 string if both parts present; else None."""
    if pd.isna(gameday) or pd.isna(gametime):
        return None
    try:
        # Handle common formats, e.g., '2023-10-22' + '16:25:00'
        dt = datetime.fromisoformat(str(gameday) + " " + str(gametime))
        return dt.isoformat()
    except Exception:
        return None

In [5]:
def create_player(tx, player):
    """Upsert a :Player node keyed by ``player["gsis_id"]``.

    Every key/value of ``player`` (gsis_id included) is merged onto the node
    with ``SET +=``.

    Args:
        tx: Neo4j transaction (anything exposing ``run(query, **params)``).
        player: One player-profile record as a dict; must contain ``gsis_id``.
    """
    player_key = player["gsis_id"]
    query = """
        MERGE (p:Player {gsis_id: $gsis_id})
        SET p += $player_props
    """
    tx.run(query, player_props=player, gsis_id=player_key)
    
def create_team(tx, team_name):
    """Upsert a :Team node for one team abbreviation.

    The value is stored under the ``code`` property. create_or_update_games and
    create_or_update_play MERGE/MATCH ``(:Team {code: ...})``. Keying teams on
    any other property would create a second, disconnected set of Team nodes
    that games and plays never link to.

    Args:
        tx: Neo4j transaction (anything exposing ``run(query, **params)``).
        team_name: Team abbreviation as it appears in the schedule's
            ``home_team``/``away_team`` columns (e.g. ``"KC"``).
    """
    tx.run("MERGE (t:Team {code: $team_name})", team_name=team_name)

def create_injury(tx, injury):
    """Upsert an :Injury report keyed by (gsis_id, week, season) and link it to its :Player.

    Every key of ``injury`` is forwarded as a query parameter. The query
    references gsis_id, week, season, game_type, the report_/practice_ injury
    and status fields, and date_modified. Any of those missing from the dict
    makes Neo4j reject the query for a missing parameter. The Player link is
    only made when a :Player with that gsis_id already exists (MATCH, not MERGE).

    Args:
        tx: Neo4j transaction (anything exposing ``run(query, **params)``).
        injury: One injury-report record as a dict.
    """
    query = """
        MERGE (i:Injury {gsis_id: $gsis_id, week: $week, season: $season})
        SET i.game_type = $game_type,
            i.report_primary_injury = $report_primary_injury,
            i.report_secondary_injury = $report_secondary_injury,
            i.report_status = $report_status,
            i.practice_primary_injury = $practice_primary_injury,
            i.practice_secondary_injury = $practice_secondary_injury,
            i.practice_status = $practice_status,
            i.date_modified = $date_modified
        WITH i
        MATCH (p:Player {gsis_id: $gsis_id})
        MERGE (p)-[:INJURED_DURING]->(i)
    """
    tx.run(query, **injury)

def create_weekly_stats(tx, stats_batch):
    """Upsert a batch of per-player-week stat rows as :WeeklyStat nodes.

    Each row becomes ``(:WeeklyStat {gsis_id, season, week})``. It receives all
    non-identity fields as properties and is linked to an existing
    ``(:Player {gsis_id})`` via ``HAS_STATS``.

    Args:
        tx: Neo4j transaction (anything exposing ``run(query, **params)``).
        stats_batch: List of dicts, each carrying ``gsis_id``, ``season`` and
            ``week`` (the MERGE key). Neither the list nor its dicts are modified.
    """
    # Identity/display columns that are not copied onto the WeeklyStat node.
    excluded = {
        "gsis_id", "player_name", "player_display_name", "position",
        "position_group", "headshot_url", "recent_team", "opponent_team", "pfr_id",
    }
    # Build fresh parameter rows rather than mutating the caller's dicts.
    # session.execute_write re-invokes this function on transient failures.
    # A "stat_props" key injected on the first attempt would then be copied
    # into stat_props itself as a nested map, which Neo4j rejects as a
    # property value.
    rows = [
        {
            "gsis_id": stat.get("gsis_id"),
            "season": stat.get("season"),
            "week": stat.get("week"),
            "stat_props": {k: v for k, v in stat.items() if k not in excluded},
        }
        for stat in stats_batch
    ]

    tx.run("""
        UNWIND $stats AS stat
        MERGE (ws:WeeklyStat {gsis_id: stat.gsis_id, season: stat.season, week: stat.week})
        SET ws += stat.stat_props
        WITH ws, stat
        MATCH (p:Player {gsis_id: stat.gsis_id})
        MERGE (p)-[:HAS_STATS]->(ws)
    """, stats=rows)

def create_or_update_games(tx, games_batch):
    """Upsert a batch of schedule rows as :Game nodes wired to their teams and QBs.

    For each row:
    - MERGE home and away ``(:Team {code})`` nodes.
    - MERGE ``(:Game {game_id})`` and copy the schedule columns onto it, plus a
      derived ``kickoff_ts`` (ISO string from gameday + gametime, or null).
    - Link the teams both ways (HOME_IN/AWAY_IN and HOME_TEAM/AWAY_TEAM).
    - Link the listed starting QBs via ``STARTED_AT_QB {team: 'home'|'away'}``,
      creating bare ``(:Player {gsis_id})`` nodes if needed.

    Args:
        tx: Neo4j transaction (anything exposing ``run(query, **params)``).
        games_batch: List of schedule-row dicts; each needs at least
            ``game_id``, ``home_team`` and ``away_team``. The caller's dicts are
            not modified.
    """
    # Shallow copies carry the derived kickoff timestamp, so the caller's
    # records are left untouched and a retried transaction sees the same input.
    rows = [
        {**game, "kickoff_ts": _to_iso_datetime(game.get("gameday"), game.get("gametime"))}
        for game in games_batch
    ]

    tx.run("""
        UNWIND $games AS row

        // Ensure Teams exist
        MERGE (ht:Team {code: row.home_team})
        MERGE (at:Team {code: row.away_team})

        // Upsert Game node
        MERGE (g:Game {game_id: row.game_id})
        SET g += {
            season: row.season,
            game_type: row.game_type,
            week: row.week,
            gameday: row.gameday,
            weekday: row.weekday,
            gametime: row.gametime,
            location: row.location,
            stadium_id: row.stadium_id,
            stadium: row.stadium,
            roof: row.roof,
            surface: row.surface,
            temp: row.temp,
            wind: row.wind,
            overtime: row.overtime,
            home_team: row.home_team,
            home_score: row.home_score,
            away_team: row.away_team,
            away_score: row.away_score,
            result: row.result,
            total: row.total,
            old_game_id: row.old_game_id,
            gsis: row.gsis,
            nfl_detail_id: row.nfl_detail_id,
            pfr: row.pfr,
            pff: row.pff,
            espn: row.espn,
            ftn: row.ftn,
            away_moneyline: row.away_moneyline,
            home_moneyline: row.home_moneyline,
            spread_line: row.spread_line,
            away_spread_odds: row.away_spread_odds,
            home_spread_odds: row.home_spread_odds,
            total_line: row.total_line,
            under_odds: row.under_odds,
            over_odds: row.over_odds,
            div_game: row.div_game,
            home_coach: row.home_coach,
            away_coach: row.away_coach,
            home_qb_id: row.home_qb_id,
            away_qb_id: row.away_qb_id,
            home_qb_name: row.home_qb_name,
            away_qb_name: row.away_qb_name,
            referee: row.referee,
            kickoff_ts: row.kickoff_ts
        }

        // Link Teams to Game
        MERGE (ht)-[:HOME_IN]->(g)
        MERGE (at)-[:AWAY_IN]->(g)
        MERGE (g)-[:HOME_TEAM]->(ht)
        MERGE (g)-[:AWAY_TEAM]->(at)

        // Link starting QBs if present
        FOREACH (side IN ['home','away'] |
            FOREACH (_ IN CASE WHEN row[side + '_qb_id'] IS NOT NULL THEN [1] ELSE [] END |
                MERGE (p:Player {gsis_id: row[side + '_qb_id']})
                MERGE (p)-[:STARTED_AT_QB {team: side}]->(g)
            )
        )
    """, games=rows)

def create_or_update_play(tx, row):
    """Upsert one play-by-play row as a :Play node and link it into the graph.

    Writes ``(:Play {game_id, play_id})`` with a fixed set of play columns and
    attaches it to its ``(:Game)`` via ``HAS_PLAY``. It links the play to
    existing ``(:Team {code})`` nodes via ``OFFENSE``/``DEFENSE``. It links the
    passer/rusher/receiver via ``PARTICIPATED_IN {role}``, creating bare
    ``(:Player {gsis_id})`` nodes if needed.

    Args:
        tx: Neo4j transaction (anything exposing ``run(query, **params)``).
        row: One play-by-play record as a dict; must contain ``play_id`` and
            ``game_id``. Missing optional columns are stored as null.
    """
    play_id = row["play_id"]
    game_id = row["game_id"]

    # nflverse play_id values are only unique within a game (game_id + play_id
    # is the merge key), so the Play node must be keyed on both. Keying on
    # play_id alone folds plays from different games into one node.
    play_props = {
        k: row.get(k) for k in [
            "game_id", "season", "season_type", "week", "game_date",
            "qtr", "down", "ydstogo", "yardline_100", "desc", "play_type",
            "yards_gained", "epa", "wpa", "success", "touchdown",
            "posteam", "defteam", "drive"
        ]
    }
    tx.run("""
        MERGE (p:Play {game_id: $game_id, play_id: $play_id})
        SET p += $play_props
        WITH p
        MATCH (g:Game {game_id: $game_id})
        MERGE (g)-[:HAS_PLAY]->(p)
    """, play_id=play_id, play_props=play_props, game_id=game_id)

    # Link offense and defense teams. NaN is truthy, so a bare truthiness
    # check is not enough to detect a missing team code.
    posteam = row.get("posteam")
    if posteam and pd.notna(posteam):
        tx.run("""
            MATCH (p:Play {game_id: $game_id, play_id: $play_id}), (t:Team {code: $team})
            MERGE (p)-[:OFFENSE]->(t)
        """, play_id=play_id, game_id=game_id, team=posteam)
    defteam = row.get("defteam")
    if defteam and pd.notna(defteam):
        tx.run("""
            MATCH (p:Play {game_id: $game_id, play_id: $play_id}), (t:Team {code: $team})
            MERGE (p)-[:DEFENSE]->(t)
        """, play_id=play_id, game_id=game_id, team=defteam)

    # Link key players (passer, rusher, receiver). Take the first usable id
    # from "<role>_player_id" then "<role>_id", so a NaN in the first column
    # does not hide a valid id in the second.
    for role in ("passer", "rusher", "receiver"):
        candidates = (row.get(f"{role}_player_id"), row.get(f"{role}_id"))
        pid = next((c for c in candidates if c and pd.notna(c)), None)
        if pid is not None:
            tx.run("""
                MATCH (p:Play {game_id: $game_id, play_id: $play_id})
                MERGE (pl:Player {gsis_id: $pid})
                MERGE (pl)-[:PARTICIPATED_IN {role:$role}]->(p)
            """, play_id=play_id, game_id=game_id, pid=str(pid), role=role)

In [6]:
def load_data(df, create_fn):
    """Write every row of ``df`` to Neo4j, one managed write transaction per row.

    Args:
        df: DataFrame whose rows are handed to ``create_fn`` as plain dicts
            mapping column name -> value.
        create_fn: Transaction function called as ``create_fn(tx, row_dict)``.
    """
    columns = list(df.columns)
    with driver.session() as session:
        # itertuples keeps each column's own type and is lazy. iterrows()
        # builds one Series per row and so upcasts int columns to float when
        # the frame also has float columns.
        for values in df.itertuples(index=False, name=None):
            session.execute_write(create_fn, dict(zip(columns, values)))

def load_data_batch(df, create_fn, batch_size=1000):
    """Write ``df`` to Neo4j in chunks, one managed write transaction per chunk.

    Args:
        df: DataFrame to load; each chunk is converted with ``to_dict("records")``.
        create_fn: Transaction function called as ``create_fn(tx, records)``
            with a list of at most ``batch_size`` row dicts.
        batch_size: Maximum rows per transaction (default 1000).

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    with driver.session() as session:
        for start in range(0, len(df), batch_size):
            batch = df.iloc[start:start + batch_size].to_dict("records")
            session.execute_write(create_fn, batch)

In [7]:
# Every team value appearing on either side of any scheduled game, one write per team.
teams = {*schedules["home_team"], *schedules["away_team"]}
with driver.session() as session:
    for t in teams:
        session.execute_write(create_team, t)

In [None]:
# One :Player upsert per profile row.
load_data(df=profiles, create_fn=create_player)

ClientError: {code: Neo.ClientError.Statement.ParameterMissing} {message: Expected parameter(s): player_id}

In [7]:
# One :Injury upsert (linked to an existing :Player) per injury-report row.
load_data(df=injuries, create_fn=create_injury)

In [9]:
# Weekly stat rows are written in batches, one transaction per batch.
load_data_batch(df=weekly_stats, create_fn=create_weekly_stats)