In [18]:
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from scipy.special import factorial
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
import sqlite3

conn = sqlite3.connect(r'C:\Users\Owner\dev\algobetting\infra\data\db\algobetting.db')

df = pd.read_sql_query("""
                SELECT 
                    team as home_team,
                    opp_team as away_team,
                    summary_xg as home_xg,
                    opp_summary_xg as away_xg,
                    summary_goals as home_goals,
                    opp_summary_goals as away_goals,
                    match_date as date,
                    season
                FROM fbref_match_all_columns
                WHERE division = 'Premier League'
                    AND season IN ('2024-2025', '2023-2024')
                    AND summary_xg IS NOT NULL
                    AND opp_summary_xg IS NOT NULL
                    AND is_home = 1
                       """, conn)

conn.close()

df

Unnamed: 0,home_team,away_team,home_xg,away_xg,home_goals,away_goals,date,season
0,Tottenham,Brighton,2.0,2.2,1.0,4.0,2025-05-25,2024-2025
1,Bournemouth,Leicester City,1.6,0.3,2.0,0.0,2025-05-25,2024-2025
2,Newcastle Utd,Everton,1.2,1.2,0.0,1.0,2025-05-25,2024-2025
3,Fulham,Manchester City,1.3,3.1,0.0,2.0,2025-05-25,2024-2025
4,Nott'ham Forest,Chelsea,1.2,1.1,0.0,1.0,2025-05-25,2024-2025
...,...,...,...,...,...,...,...,...
755,Sheffield Utd,Crystal Palace,0.5,1.9,0.0,1.0,2023-08-12,2023-2024
756,Brighton,Luton Town,4.0,1.5,4.0,1.0,2023-08-12,2023-2024
757,Everton,Fulham,2.7,1.5,0.0,1.0,2023-08-12,2023-2024
758,Arsenal,Nott'ham Forest,0.8,1.2,2.0,1.0,2023-08-12,2023-2024


In [19]:
import numpy as np
import pandas as pd
import pymc as pm
import arviz as az
from scipy.special import factorial
import warnings

class HierarchicalDixonColes:
    """
    Hierarchical Dixon-Coles model using PyMC with season-varying home advantage.
    
    This model addresses the issue of noisy home advantage estimates by modeling
    home advantage as varying by season but drawn from a common distribution,
    allowing for partial pooling between seasons.
    """
    
    def __init__(self, xi=0.0018):
        """
        Initialize the hierarchical Dixon-Coles model.
        
        Parameters:
        xi (float): Time decay parameter for weighting matches by recency.
        """
        self.xi = xi
        self.teams = None
        self.seasons = None
        self.team_to_idx = None
        self.season_to_idx = None
        self.model = None
        self.trace = None
        self.is_fitted = False
        
    def time_weight(self, match_dates, current_date=None):
        """Calculate time decay weights for matches."""
        if current_date is None:
            current_date = max(match_dates)
        
        days_ago = [(current_date - date).days for date in match_dates]
        weights = np.exp(-self.xi * np.array(days_ago))
        return weights
    
    def tau_correction(self, home_goals, away_goals, lambda_home, lambda_away, rho):
        """
        Dixon-Coles correction factor for low-scoring games.
        Note: This is a simplified version for demonstration.
        Full implementation would require custom PyMC distribution.
        """
        if home_goals == 0 and away_goals == 0:
            return 1 - lambda_home * lambda_away * rho
        elif home_goals == 0 and away_goals == 1:
            return 1 + lambda_home * rho
        elif home_goals == 1 and away_goals == 0:
            return 1 + lambda_away * rho
        elif home_goals == 1 and away_goals == 1:
            return 1 - rho
        else:
            return 1.0
    
    def prepare_data(self, matches_df):
        """Prepare data for PyMC model."""
        # Ensure required columns exist
        required_cols = ['date', 'season', 'home_team', 'away_team', 'home_goals', 'away_goals']
        for col in required_cols:
            if col not in matches_df.columns:
                raise ValueError(f"DataFrame must contain column: {col}")
        
        # Convert date column
        matches_df['date'] = pd.to_datetime(matches_df['date'])
        
        # Get unique teams and seasons
        self.teams = sorted(list(set(matches_df['home_team'].tolist() + matches_df['away_team'].tolist())))
        self.seasons = sorted(matches_df['season'].unique())
        self.team_to_idx = {team: i for i, team in enumerate(self.teams)}
        self.season_to_idx = {season: i for i, season in enumerate(self.seasons)}
        
        # Create index arrays for model
        home_idx = matches_df['home_team'].map(self.team_to_idx).values
        away_idx = matches_df['away_team'].map(self.team_to_idx).values
        season_idx = matches_df['season'].map(self.season_to_idx).values
        
        # Calculate time weights
        time_weights = self.time_weight(matches_df['date'].tolist())
        
        return {
            'home_idx': home_idx,
            'away_idx': away_idx,
            'season_idx': season_idx,
            'home_goals': matches_df['home_goals'].values,
            'away_goals': matches_df['away_goals'].values,
            'time_weights': time_weights,
            'n_teams': len(self.teams),
            'n_seasons': len(self.seasons),
            'n_matches': len(matches_df)
        }
    
    def build_model(self, data):
        """Build the PyMC hierarchical model."""
        with pm.Model() as model:
            # Team attack parameters
            attack = pm.Normal('attack', 0, 1, shape=data['n_teams'])
            
            # Team defense parameters with sum-to-zero constraint
            defense_raw = pm.Normal('defense_raw', 0, 1, shape=data['n_teams'] - 1)
            defense = pm.Deterministic('defense', 
                                     pm.math.concatenate([[-pm.math.sum(defense_raw)], defense_raw]))
            
            # Hierarchical home advantage - KEY FEATURE
            # Each season has its own home advantage, but they're drawn from a common distribution
            home_mean = pm.Normal('home_mean', 0, 0.5)  # Population mean of home advantage
            home_sd = pm.HalfNormal('home_sd', 0.2)     # Between-season standard deviation
            home_advantage = pm.Normal('home_advantage', home_mean, home_sd, shape=data['n_seasons'])
            
            # Dixon-Coles rho parameter for low-scoring correction
            rho = pm.Normal('rho', 0, 0.05)
            
            # Expected goals calculations
            lambda_home = pm.math.exp(
                attack[data['home_idx']] + 
                defense[data['away_idx']] + 
                home_advantage[data['season_idx']]
            )
            
            lambda_away = pm.math.exp(
                attack[data['away_idx']] + 
                defense[data['home_idx']]
            )
            
            # Likelihood - using weighted Poisson for simplicity
            # Note: Full Dixon-Coles correction would require custom distribution
            pm.Poisson('goals_home', lambda_home, observed=data['home_goals'])
            pm.Poisson('goals_away', lambda_away, observed=data['away_goals'])
            
            # Add time weighting through potential (log-likelihood scaling)
            # This is a simplified approach - more sophisticated weighting could be implemented
            pm.Potential('time_weighting', 
                        pm.math.sum(pm.math.log(data['time_weights'])))
        
        return model
    
    def fit(self, matches_df, draws=2000, tune=1000, cores=4, target_accept=0.9):
        """
        Fit the hierarchical model using MCMC sampling.
        
        Parameters:
        matches_df (DataFrame): Match data with required columns
        draws (int): Number of posterior samples
        tune (int): Number of tuning samples
        cores (int): Number of CPU cores to use
        target_accept (float): Target acceptance rate for NUTS sampler
        """
        print("Preparing data for hierarchical model...")
        data = self.prepare_data(matches_df)
        
        print(f"Building model with {data['n_teams']} teams across {data['n_seasons']} seasons...")
        self.model = self.build_model(data)
        
        with self.model:
            print(f"Sampling {draws} draws with {tune} tuning steps...")
            self.trace = pm.sample(
                draws=draws,
                tune=tune,
                cores=cores,
                target_accept=target_accept,
                return_inferencedata=True
            )
            
            # Check for convergence warnings
            rhat = az.rhat(self.trace)
            # Convert to numpy array and find max across all parameters
            rhat_values = []
            for var in rhat.data_vars:
                rhat_values.extend(rhat[var].values.flatten())
            max_rhat = float(np.max(rhat_values))
            if max_rhat > 1.01:
                warnings.warn(f"Some parameters may not have converged (max R-hat: {max_rhat:.3f})")
            
            print("Model fitted successfully!")
            self.is_fitted = True
            
            # Print summary of key parameters
            self._print_fit_summary()
    
    def _print_fit_summary(self):
        """Print summary of fitted parameters."""
        if not self.is_fitted:
            return
            
        # Extract posterior means
        posterior = self.trace.posterior
        
        home_mean = float(posterior['home_mean'].mean())
        home_sd = float(posterior['home_sd'].mean())
        rho_mean = float(posterior['rho'].mean())
        
        print(f"\n=== MODEL SUMMARY ===")
        print(f"Overall home advantage mean: {np.exp(home_mean):.4f}")
        print(f"Between-season home advantage SD: {home_sd:.4f}")
        print(f"Rho parameter: {rho_mean:.4f}")
        
        # Show season-specific home advantages
        print(f"\nSeason-specific home advantages:")
        home_advantages = posterior['home_advantage'].mean(dim=['chain', 'draw'])
        for i, season in enumerate(self.seasons):
            ha_exp = float(np.exp(home_advantages[i]))
            print(f"  {season}: {ha_exp:.4f}")
    
    def predict_match(self, home_team, away_team, season):
        """
        Predict match outcome with full uncertainty quantification.
        
        Returns:
        dict: Contains posterior distributions for expected goals and probabilities
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before making predictions")
            
        if home_team not in self.teams or away_team not in self.teams:
            raise ValueError("One or both teams not in training data")
            
        if season not in self.seasons:
            # Use most recent season if requested season not in training data
            season = max(self.seasons)
            warnings.warn(f"Season not found, using {season}")
        
        home_idx = self.team_to_idx[home_team]
        away_idx = self.team_to_idx[away_team]
        season_idx = self.season_to_idx[season]
        
        # Extract posterior samples
        posterior = self.trace.posterior
        attack_samples = posterior['attack'][:, :, home_idx].values.flatten()
        defense_away_samples = posterior['defense'][:, :, away_idx].values.flatten()
        defense_home_samples = posterior['defense'][:, :, home_idx].values.flatten()
        attack_away_samples = posterior['attack'][:, :, away_idx].values.flatten()
        home_adv_samples = posterior['home_advantage'][:, :, season_idx].values.flatten()
        
        # Calculate expected goals for each posterior sample
        lambda_home_samples = np.exp(attack_samples + defense_away_samples + home_adv_samples)
        lambda_away_samples = np.exp(attack_away_samples + defense_home_samples)
        
        # Calculate outcome probabilities (simplified approach)
        home_wins = np.sum(np.random.poisson(lambda_home_samples) > np.random.poisson(lambda_away_samples))
        draws = np.sum(np.random.poisson(lambda_home_samples) == np.random.poisson(lambda_away_samples))
        away_wins = len(lambda_home_samples) - home_wins - draws
        
        total_sims = len(lambda_home_samples)
        
        return {
            'expected_goals_home': {
                'mean': float(np.mean(lambda_home_samples)),
                'std': float(np.std(lambda_home_samples)),
                'ci_95': [float(np.percentile(lambda_home_samples, 2.5)), 
                         float(np.percentile(lambda_home_samples, 97.5))]
            },
            'expected_goals_away': {
                'mean': float(np.mean(lambda_away_samples)),
                'std': float(np.std(lambda_away_samples)),
                'ci_95': [float(np.percentile(lambda_away_samples, 2.5)), 
                         float(np.percentile(lambda_away_samples, 97.5))]
            },
            'outcome_probabilities': {
                'home_win': home_wins / total_sims,
                'draw': draws / total_sims,
                'away_win': away_wins / total_sims
            },
            'season_used': season
        }
    
    def get_team_ratings(self, season=None):
        """
        Get team ratings with uncertainty quantification.
        
        Parameters:
        season (str): Season to use for home advantage. If None, uses most recent.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before getting ratings")
            
        if season is None:
            season = max(self.seasons)
        elif season not in self.seasons:
            season = max(self.seasons)
            warnings.warn(f"Season not found, using {season}")
        
        posterior = self.trace.posterior
        attack_means = posterior['attack'].mean(dim=['chain', 'draw']).values
        defense_means = posterior['defense'].mean(dim=['chain', 'draw']).values
        
        # Calculate ratings vs average opposition
        avg_attack = np.mean(attack_means)
        
        ratings_data = []
        for i, team in enumerate(self.teams):
            attack_vs_avg = float(np.exp(attack_means[i]))
            defense_vs_avg = float(np.exp(avg_attack + defense_means[i]))
            
            ratings_data.append({
                'team': team,
                'attack_rating': attack_vs_avg,
                'defense_rating': defense_vs_avg,
                'goal_difference': attack_vs_avg - defense_vs_avg
            })
        
        ratings_df = pd.DataFrame(ratings_data)
        return ratings_df.sort_values('goal_difference', ascending=False)
    
    def get_convergence_diagnostics(self):
        """Get MCMC convergence diagnostics."""
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before checking diagnostics")
            
        return {
            'rhat': az.rhat(self.trace),
            'ess_bulk': az.ess(self.trace, kind='bulk'),
            'ess_tail': az.ess(self.trace, kind='tail'),
            'mcse': az.mcse(self.trace)
        }
    
    def plot_home_advantage_evolution(self):
        """Plot how home advantage varies across seasons."""
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before plotting")
            
        try:
            import matplotlib.pyplot as plt
            
            posterior = self.trace.posterior
            home_advantages = posterior['home_advantage']
            
            fig, ax = plt.subplots(figsize=(10, 6))
            
            # Plot posterior distributions for each season
            for i, season in enumerate(self.seasons):
                samples = home_advantages[:, :, i].values.flatten()
                ax.hist(np.exp(samples), alpha=0.6, label=f'{season}', bins=30)
            
            ax.set_xlabel('Home Advantage Multiplier')
            ax.set_ylabel('Density')
            ax.set_title('Home Advantage by Season (Posterior Distributions)')
            ax.legend()
            
            plt.tight_layout()
            return fig
            
        except ImportError:
            print("matplotlib not available for plotting")
            return None

In [20]:
model = HierarchicalDixonColes(xi=0.0056)
model.fit(df, draws=50, tune=50)

ratings = model.get_team_ratings()

ratings

Only 50 samples per chain. Reliable r-hat and ESS diagnostics require longer chains for accurate estimate.


Preparing data for hierarchical model...
Building model with 23 teams across 2 seasons...
Sampling 50 draws with 50 tuning steps...


Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [attack, defense_raw, home_mean, home_sd, home_advantage, rho]


Output()

Sampling 4 chains for 50 tune and 50 draw iterations (200 + 200 draws total) took 172 seconds.
The number of samples is too small to check convergence reliably.


Model fitted successfully!

=== MODEL SUMMARY ===
Overall home advantage mean: 1.1247
Between-season home advantage SD: 0.1554
Rho parameter: -0.0030

Season-specific home advantages:
  2023-2024: 1.2123
  2024-2025: 1.0653


Unnamed: 0,team,attack_rating,defense_rating,goal_difference
0,Arsenal,1.934813,0.697162,1.237651
14,Manchester City,2.048069,0.889912,1.158157
12,Liverpool,2.097556,0.947139,1.150417
16,Newcastle Utd,1.897325,1.23323,0.664095
6,Chelsea,1.732527,1.184189,0.548339
1,Aston Villa,1.668152,1.265136,0.403016
20,Tottenham,1.709883,1.415297,0.294587
3,Brentford,1.519021,1.369467,0.149554
4,Brighton,1.505771,1.359556,0.146215
2,Bournemouth,1.380595,1.252908,0.127687
