# Phase 4a: Engine Analysis (Multi-Depth Stockfish)

Run multi-depth Stockfish analysis on prioritized games.

**Inputs:**
- Phase 1: Raw games cache
- Phase 3: high_priority_games.json

**Outputs:**
- `engine_analysis.parquet` - Per-game accuracy and complexity metrics
- `engine_positions.json` - Detailed per-position evaluations for each analyzed game

In [None]:
# Parameters (injected by Papermill)
# NOTE(review): the values below presumably act as defaults when the notebook
# is run without injected parameters - confirm against the pipeline runner.
username = "default_user"  # Chess.com username
analysis_depths = [5, 10, 20]  # Depths for multi-depth analysis; the largest one is the reference depth for evals
engine_threads = 4  # CPU threads for Stockfish
engine_hash_mb = 512  # Hash table size in MB

In [None]:
# Setup
import sys
sys.path.insert(0, '..')
from common import (
    setup_notebook, validate_parameters, print_section, print_subsection,
    get_user_data_dir, save_phase_output, load_phase_output,
    load_dataset_parquet, load_cached_games_v2,
    CachedEngineAnalyzer, calculate_centipawn_loss, calculate_game_accuracy_simple,
    parse_pgn_file,
    ENGINE_CACHE_DIR,
    # Fragility & Complexity
    calculate_fragility_simple,
    # Game phase detection
    detect_game_phase, GamePhase,
    # Engine complexity heuristics
    PositionComplexityHeuristics,
    categorize_complexity,
)
import chess
import chess.pgn
import io
import json
import pandas as pd
import numpy as np
from tqdm import tqdm

# Shared notebook initialisation from `common` (its exact effects are defined there).
setup_notebook()
# Check the injected username before any data is loaded.
# NOTE(review): presumably raises on an invalid value - confirm in `common`.
validate_parameters(username)

In [None]:
# Load data
print_section(f"ENGINE ANALYSIS: {username}")

user_data_dir = get_user_data_dir(username)
MIN_GAMES_FOR_ANALYSIS = 10  # Minimum games to analyze

# Load high priority games from phase 3. When that output is missing we start
# from an empty set and top it up from the raw cache further down.
try:
    priority_data = load_phase_output(username, "phase3", "high_priority_games.json")
    game_ids_to_analyze = set(priority_data['game_ids'])
    print(f"High priority games from phase 3: {len(game_ids_to_analyze)}")
except FileNotFoundError:
    print("Phase 3 output not found, falling back to game selection...")
    game_ids_to_analyze = set()

# Load raw games cache: prefer the dedicated "games_cache" folder and fall
# back to the user data directory itself when that folder does not exist.
cache_dir = user_data_dir / "games_cache"
if not cache_dir.exists():
    cache_dir = user_data_dir
all_games_raw, _ = load_cached_games_v2(cache_dir)

print(f"Raw games loaded: {len(all_games_raw)}")

# Fallback: if no high priority games or not enough, select from all games
if len(game_ids_to_analyze) < MIN_GAMES_FOR_ANALYSIS:
    print(f"\nOnly {len(game_ids_to_analyze)} games from phase 3, need at least {MIN_GAMES_FOR_ANALYSIS}")
    print("Selecting additional games for analysis...")

    player_name = username.lower()

    # Partition the not-yet-selected games into wins and everything else
    # (wins are prioritized for more interesting analysis).
    wins_first = []
    others = []
    for game in all_games_raw:
        url = game.get('url', '')
        # The game ID is the last path segment of the game URL.
        game_id = url.split('/')[-1] if url else None
        if not game_id or game_id in game_ids_to_analyze:
            continue

        player_won = any(
            side.get('username', '').lower() == player_name and side.get('result') == 'win'
            for side in (game.get('white', {}), game.get('black', {}))
        )
        if player_won:
            wins_first.append(game_id)
        else:
            others.append(game_id)

    # Add wins first, then others. The stop condition checks the size of the
    # set itself, so an ID that appears twice in the cache is only counted once.
    for game_id in wins_first + others:
        if len(game_ids_to_analyze) >= MIN_GAMES_FOR_ANALYSIS:
            break
        game_ids_to_analyze.add(game_id)

    print(f"Final game count for analysis: {len(game_ids_to_analyze)}")

In [None]:
# Helper to get PGN from raw game
def get_game_pgn(game_id: str, all_games: list) -> "chess.pgn.Game | None":
    """Return the parsed PGN of the game identified by ``game_id``.

    A raw game matches when its ``url`` equals ``game_id`` or when the last
    path segment of that URL equals ``game_id``. The comparison is exact, so
    ID ``"123"`` does not match a URL ending in ``/1234``.

    Args:
        game_id: Game identifier (last URL path segment) or the full game URL.
        all_games: Raw game dicts; the ``url`` and ``pgn`` keys are read.

    Returns:
        The result of ``chess.pgn.read_game`` for the first matching game that
        has a non-empty ``pgn``, or ``None`` when there is no such game or
        ``game_id`` is empty.
    """
    if not game_id:
        # An empty ID would otherwise be meaningless to compare against.
        return None

    for game in all_games:
        url = game.get('url', '')
        if url != game_id and url.rstrip('/').rsplit('/', 1)[-1] != game_id:
            continue
        pgn_str = game.get('pgn', '')
        if pgn_str:
            return chess.pgn.read_game(io.StringIO(pgn_str))
        # Matching entry without PGN text: keep scanning for another entry.
    return None

# Resolve every selected game ID to its parsed PGN and keep only the IDs for
# which a PGN could actually be found.
resolved_games = (
    (selected_id, get_game_pgn(selected_id, all_games_raw))
    for selected_id in game_ids_to_analyze
)
games_to_process = [
    (selected_id, parsed_game)
    for selected_id, parsed_game in resolved_games
    if parsed_game
]

print(f"Games with PGN found: {len(games_to_process)}")

In [None]:
# Run multi-depth engine analysis
print_subsection("RUNNING STOCKFISH ANALYSIS")
# Echo the engine configuration so it appears in the executed notebook's output.
print(f"Depths: {analysis_depths}")
print(f"Engine threads: {engine_threads}")
print(f"Hash size: {engine_hash_mb} MB")

# One summary dict per analyzed game; filled by the analysis loop below.
analysis_results = []


def classify_move_by_cpl(cpl: float) -> str:
    """
    Classify a move based on centipawn loss.

    Classification thresholds:
    - Best: 0 CPL (or less)
    - Excellent: 1-9 CPL
    - Good: 10-24 CPL
    - Inaccuracy: 25-49 CPL
    - Mistake: 50-99 CPL
    - Blunder: >=100 CPL

    Args:
        cpl: Centipawn loss of the played move.

    Returns:
        One of "Best", "Excellent", "Good", "Inaccuracy", "Mistake", "Blunder".
    """
    # A non-positive loss means the move was at least as good as the engine's
    # reference line, so it must never be ranked below "Best".
    if cpl <= 0:
        return "Best"

    # Exclusive upper bounds, checked in ascending order.
    upper_bounds = (
        (10, "Excellent"),
        (25, "Good"),
        (50, "Inaccuracy"),
        (100, "Mistake"),
    )
    for bound, label in upper_bounds:
        if cpl < bound:
            return label
    return "Blunder"


# Main analysis loop: for every selected game, evaluate each position in which
# the player is to move, derive per-move metrics, then aggregate them into one
# summary dict per game (appended to `analysis_results`).

# Deepest configured depth: used as the engine default and as the reference
# depth for the before/after evaluations.
max_depth = max(analysis_depths)
# Size of the window before the fragility peak, in plies (10 plies = 5 of the
# player's own moves).
pre_peak_window_plies = 10
player_name = username.lower()

with CachedEngineAnalyzer(
    depth=max_depth,
    threads=engine_threads,
    hash_mb=engine_hash_mb,
    cache_dir=ENGINE_CACHE_DIR,
) as engine:
    for game_id, pgn in tqdm(games_to_process, desc="Analyzing games"):
        try:
            # Determine player color
            white = pgn.headers.get('White', '').lower()
            black = pgn.headers.get('Black', '').lower()
            if player_name not in (white, black):
                # Without this guard the game would be analyzed as if the
                # player had Black, attributing a stranger's moves to them.
                print(f"Skipping {game_id}: {username} is not a player in this game")
                continue
            player_is_white = player_name == white

            # Analyze each position
            board = pgn.board()
            positions = []
            fragilities = []  # Track for peak detection

            for ply, move in enumerate(pgn.mainline_moves()):
                is_white_move = (ply % 2 == 0)
                is_player_move = is_white_move == player_is_white

                if not is_player_move:
                    # Opponent move: just advance the board.
                    board.push(move)
                    continue

                # Store FEN before the move for key position rendering
                fen_before = board.fen()
                # Count legal moves now, while the board still shows the
                # position the player had to decide in.
                legal_move_count = board.legal_moves.count()

                # Detect game phase
                phase = detect_game_phase(board)

                # Calculate fragility (cheap - no engine needed)
                fragility = calculate_fragility_simple(board)
                fragilities.append((ply, fragility))

                # Use extended multi-depth analysis with engine heuristics
                result = engine.analyze_multi_depth_extended(
                    board,
                    analysis_depths,
                    multipv=2,  # For gap metric
                    capture_search_stats=True,
                )
                best_eval = result.evaluations.get(max_depth, 0)
                best_move = result.best_moves.get(max_depth, '')

                # Extract engine complexity heuristics (may be falsy, in which
                # case the defaults below are stored instead)
                heuristics = result.complexity_heuristics

                # Push move and get post-move eval (also from white's perspective)
                board.push(move)
                post_result = engine.analyze(board, max_depth)
                post_eval = post_result.get('score', 0)

                # Calculate centipawn loss using the proper function
                # Both evals are from white's perspective
                cpl = calculate_centipawn_loss(best_eval, post_eval, is_white_move)

                # Classify move by CPL (not accuracy!)
                move_class = classify_move_by_cpl(cpl)

                positions.append({
                    'ply': ply,
                    'fen': fen_before,  # Store FEN for key position rendering
                    'move': move.uci(),
                    'best_move': best_move,
                    'eval_before': best_eval,
                    'eval_after': post_eval,
                    'cpl': cpl,
                    'move_class': move_class,
                    'is_best': move.uci() == best_move,
                    'move_consistency': result.move_consistency,
                    'eval_swing': result.eval_swing,
                    # Game phase
                    'phase': phase.value,
                    # Graph-based fragility
                    'fragility': fragility,
                    # Engine complexity heuristics
                    'eval_volatility': heuristics.eval_volatility if heuristics else 0,
                    'eval_volatility_normalized': heuristics.eval_volatility_normalized if heuristics else 0,
                    'gap_cp': heuristics.gap_at_max_depth if heuristics else 0,
                    'avg_gap_cp': heuristics.avg_gap if heuristics else 0,
                    'convergence_depth': heuristics.convergence_depth if heuristics else None,
                    'branching_factor': heuristics.branching_factor_estimate if heuristics else 3.5,
                    'engine_complexity_score': heuristics.complexity_score if heuristics else 0,
                    'engine_complexity_category': heuristics.complexity_category if heuristics else 'UNKNOWN',
                    'total_nodes': heuristics.total_nodes if heuristics else 0,
                    # Legal moves count from the position
                    'legal_moves': legal_move_count,
                })

            # Mark positions relative to fragility peak
            if fragilities:
                # Find peak fragility (first occurrence wins on ties)
                peak_ply, peak_fragility = max(fragilities, key=lambda x: x[1])

                # Flag positions that fall inside the window just before the peak
                for pos in positions:
                    pos_ply = pos['ply']
                    pos['is_pre_fragility_peak'] = (
                        pos_ply < peak_ply and pos_ply >= peak_ply - pre_peak_window_plies
                    )
                    pos['peak_fragility'] = peak_fragility

            # Calculate game-level metrics (games without any player move are dropped)
            if positions:
                cpl_values = [p['cpl'] for p in positions]
                acpl = np.mean(cpl_values)
                accuracy = calculate_game_accuracy_simple(cpl_values)
                best_move_rate = sum(1 for p in positions if p['is_best']) / len(positions)

                # Game-level fragility
                avg_fragility = np.mean([p['fragility'] for p in positions])

                # Game-level engine complexity heuristics
                avg_eval_volatility = np.mean([p['eval_volatility'] for p in positions])
                avg_gap_cp = np.mean([p['gap_cp'] for p in positions])
                avg_engine_complexity = np.mean([p['engine_complexity_score'] for p in positions])
                high_complexity_moves = sum(1 for p in positions
                                           if p['engine_complexity_category'] in ['HIGH', 'VERY_HIGH'])

                analysis_results.append({
                    'game_id': game_id,
                    'moves_analyzed': len(positions),
                    'acpl': acpl,
                    'accuracy': accuracy,
                    'best_move_rate': best_move_rate,
                    'blunders': sum(1 for p in positions if p['move_class'] == 'Blunder'),
                    'mistakes': sum(1 for p in positions if p['move_class'] == 'Mistake'),
                    'inaccuracies': sum(1 for p in positions if p['move_class'] == 'Inaccuracy'),
                    'avg_fragility': avg_fragility,
                    # Engine complexity heuristics (aggregated)
                    'avg_eval_volatility': avg_eval_volatility,
                    'avg_gap_cp': avg_gap_cp,
                    'avg_engine_complexity': avg_engine_complexity,
                    'high_complexity_moves': high_complexity_moves,
                    'positions': positions,
                })
        except Exception as e:
            # Best-effort batch run: one broken game must not abort the rest.
            print(f"Error analyzing {game_id}: {e}")
            continue

    # Print cache stats
    stats = engine.cache_stats()
    print(f"\nCache stats: {stats['hit_rate']} hit rate ({stats['cache_hits']} hits, {stats['cache_misses']} misses)")

In [None]:
# Summarize the per-game results as a DataFrame and print headline statistics.
print_subsection("ENGINE ANALYSIS RESULTS")

if not analysis_results:
    print("No games were analyzed.")
    results_df = pd.DataFrame()
else:
    # Output column -> key in the per-game result dict (always present).
    required_columns = {
        'game_id': 'game_id',
        'moves': 'moves_analyzed',
        'acpl': 'acpl',
        'accuracy': 'accuracy',
        'best_move_rate': 'best_move_rate',
        'blunders': 'blunders',
        'mistakes': 'mistakes',
        'inaccuracies': 'inaccuracies',
    }
    # Fragility and engine complexity heuristics: default to 0 when absent.
    optional_columns = (
        'avg_fragility',
        'avg_eval_volatility',
        'avg_gap_cp',
        'avg_engine_complexity',
        'high_complexity_moves',
    )

    summary_rows = []
    for game_result in analysis_results:
        row = {column: game_result[key] for column, key in required_columns.items()}
        for column in optional_columns:
            row[column] = game_result.get(column, 0)
        summary_rows.append(row)
    results_df = pd.DataFrame(summary_rows)

    print(f"Games analyzed: {len(results_df)}")
    print(f"\nSummary statistics:")
    print(f"  Average ACPL: {results_df['acpl'].mean():.1f}")
    print(f"  Average Accuracy: {results_df['accuracy'].mean():.1f}%")
    print(f"  Average Best Move Rate: {results_df['best_move_rate'].mean():.1%}")
    print(f"  Average Fragility: {results_df['avg_fragility'].mean():.3f}")

    print(f"\nEngine Complexity Heuristics:")
    print(f"  Average Eval Volatility: {results_df['avg_eval_volatility'].mean():.1f}cp")
    print(f"  Average Gap to 2nd Best: {results_df['avg_gap_cp'].mean():.0f}cp")
    print(f"  Average Engine Complexity: {results_df['avg_engine_complexity'].mean():.1%}")
    print(f"  High Complexity Moves: {results_df['high_complexity_moves'].sum()} total")

    print(f"\nTop 10 games by accuracy:")
    leaderboard_columns = ['game_id', 'acpl', 'accuracy', 'best_move_rate', 'avg_engine_complexity']
    most_accurate = results_df.nlargest(10, 'accuracy')
    print(most_accurate[leaderboard_columns].to_string())

In [None]:
# Persist the phase 4a outputs; nothing is written when no game was analyzed.
if not results_df.empty:
    # Detailed per-position data is kept out of the parquet file (large).
    detailed_output = dict(
        username=username,
        depths=analysis_depths,
        games=analysis_results,
    )
    phase_outputs = (
        ("engine_analysis.parquet", results_df),
        ("engine_positions.json", detailed_output),
    )
    for output_name, payload in phase_outputs:
        save_phase_output(username, "phase4a", output_name, payload)

print(f"\nPhase 4a complete!")

In [None]:
# Visualization: one histogram per headline metric, each with a dashed red
# line marking the mean across games.
import matplotlib.pyplot as plt

if not results_df.empty:
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))

    # (column, bar color, x-axis label, title, legend template for the mean)
    panel_specs = (
        ('acpl', 'steelblue', 'ACPL', 'ACPL Distribution', "Mean: {:.1f}"),
        # Accuracy is on a 0-100 scale
        ('accuracy', 'green', 'Accuracy (%)', 'Accuracy Distribution', "Mean: {:.1f}%"),
        ('best_move_rate', 'purple', 'Best Move Rate', 'Best Move Rate Distribution', "Mean: {:.1%}"),
    )

    for panel, (column, bar_color, x_label, title, mean_template) in zip(axes, panel_specs):
        values = results_df[column]
        mean_value = values.mean()
        panel.hist(values, bins=20, color=bar_color, edgecolor='white')
        panel.axvline(mean_value, color='red', linestyle='--', label=mean_template.format(mean_value))
        panel.set_xlabel(x_label)
        panel.set_ylabel('Games')
        panel.set_title(title)
        panel.legend()

    plt.tight_layout()
    plt.show()