# Pokémon battles — XGBoost with 10-fold outer CV
Notebook breve che esegue: feature engineering, split train/val/test, 10-fold outer CV con GridSearchCV interno, valutazione per fold, valutazione su holdout e generazione submission.csv.

# Load data

In [1]:
import json
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier
import warnings
warnings.filterwarnings('ignore')

# --- Percorsi (modificare se necessario) ---
COMPETITION_NAME = 'fds-pokemon-battles-prediction-2025'
train_file_path = 'train.jsonl'
test_file_path = 'test.jsonl'

def load_jsonl(path):
    data = []
    with open(path, 'r') as f:
        for line in f:
            data.append(json.loads(line))
    return data

print('Caricamento dati...')
train_raw = load_jsonl(train_file_path)
test_raw = load_jsonl(test_file_path)
print(f'Train records: {len(train_raw)}, Test records: {len(test_raw)}')

Caricamento dati...
Train records: 10000, Test records: 5000


# Features engeneering

In [11]:
import math
import numpy as np
import pandas as pd
from collections import Counter
from tqdm.notebook import tqdm

# ==============================================================================
# 1. COSTANTI & FUNZIONI BASE
# ==============================================================================

TYPE_CHART = {
    'normal': {'rock': 0.5, 'ghost': 0},
    'fire': {'fire': 0.5, 'water': 0.5, 'grass': 2, 'ice': 2, 'bug': 2, 'rock': 0.5, 'dragon': 0.5},
    'water': {'fire': 2, 'water': 0.5, 'grass': 0.5, 'ground': 2, 'rock': 2, 'dragon': 0.5},
    'grass': {'fire': 0.5, 'water': 2, 'grass': 0.5, 'poison': 0.5, 'ground': 2, 'flying': 0.5, 'bug': 0.5, 'rock': 2, 'dragon': 0.5},
    'electric': {'water': 2, 'grass': 0.5, 'electric': 0.5, 'ground': 0, 'flying': 2, 'dragon': 0.5},
    'ice': {'fire': 0.5, 'water': 0.5, 'grass': 2, 'ground': 2, 'flying': 2, 'dragon': 2},
    'fighting': {'normal': 2, 'ice': 2, 'poison': 0.5, 'flying': 0.5, 'psychic': 0.5, 'bug': 0.5, 'rock': 2, 'ghost': 0},
    'poison': {'grass': 2, 'poison': 0.5, 'ground': 0.5, 'bug': 2, 'rock': 0.5, 'ghost': 0.5},
    'ground': {'fire': 2, 'grass': 0.5, 'electric': 2, 'poison': 2, 'flying': 0, 'bug': 0.5, 'rock': 2},
    'flying': {'grass': 2, 'electric': 0.5, 'fighting': 2, 'bug': 2, 'rock': 0.5},
    'psychic': {'fighting': 2, 'poison': 2, 'psychic': 0.5, 'ghost': 0},
    'bug': {'fire': 0.5, 'grass': 2, 'fighting': 0.5, 'poison': 2, 'flying': 0.5, 'psychic': 2, 'ghost': 0.5},
    'rock': {'fire': 2, 'ice': 2, 'fighting': 0.5, 'ground': 0.5, 'flying': 2, 'bug': 2},
    'ghost': {'normal': 0, 'psychic': 0, 'ghost': 2},
    'dragon': {'dragon': 2}
}

ALL_ATTACK_TYPES = list(TYPE_CHART.keys())

def get_effectiveness(attack_type: str, defense_types: list) -> float:
    if not attack_type or not defense_types: return 1.0
    eff = 1.0
    for d in defense_types: eff *= TYPE_CHART.get(attack_type, {}).get(d, 1.0)
    return eff

def _entropy(counter: Counter) -> float:
    total = sum(counter.values())
    if total == 0: return 0.0
    ent = 0.0
    for v in counter.values():
        p = v / total
        if p > 0: ent -= p * math.log(p, 2)
    return ent

# ==============================================================================
# 2. ESTRATTORI FEATURE ORIGINALI (CONSERVATI)
# ==============================================================================
# (Queste sono le funzioni V1 che avevi già)

def calculate_type_advantage(team1: list, team2_lead: dict) -> dict:
    out = {'p1_vs_lead_avg_effectiveness': 0.0, 'p1_vs_lead_max_effectiveness': 0.0, 'p1_super_effective_options': 0}
    if not team1 or not team2_lead: return out
    lead_types = [t.lower() for t in team2_lead.get('types', [])]
    if not lead_types: return out
    effs = []
    for p in team1:
        p_types = [t.lower() for t in p.get('types', [])]
        max_eff = 0.0
        for pt in p_types: max_eff = max(max_eff, get_effectiveness(pt, lead_types))
        effs.append(max_eff)
    if not effs: return out
    out['p1_vs_lead_avg_effectiveness'] = float(np.mean(effs))
    out['p1_vs_lead_max_effectiveness'] = float(np.max(effs))
    out['p1_super_effective_options'] = int(sum(1 for e in effs if e >= 2))
    return out

def team_aggregate_features(team: list, prefix: str = 'p1_') -> dict:
    stats = ['base_hp','base_atk','base_def','base_spa','base_spd','base_spe']
    out = {}
    vals = {s: [] for s in stats}
    levels = []; types_counter = Counter(); names = []
    for p in team:
        names.append(p.get('name',''))
        for s in stats: vals[s].append(p.get(s, 0))
        levels.append(p.get('level', 0))
        for t in p.get('types', []): types_counter[t.lower()] += 1
    for s in stats:
        arr = np.array(vals[s], dtype=float)
        out[f'{prefix}{s}_sum'] = float(arr.sum())
        out[f'{prefix}{s}_mean'] = float(arr.mean())
        out[f'{prefix}{s}_max'] = float(arr.max())
        out[f'{prefix}{s}_min'] = float(arr.min())
        out[f'{prefix}{s}_std'] = float(arr.std())
    level_arr = np.array(levels, dtype=float)
    out[f'{prefix}level_mean'] = float(level_arr.mean()) if level_arr.size else 0.0
    out[f'{prefix}level_sum'] = float(level_arr.sum()) if level_arr.size else 0.0
    out[f'{prefix}n_unique_types'] = int(len(types_counter))
    for t in ['normal','fire','water','electric','grass','psychic','ice','dragon','rock','ground','flying']:
        out[f'{prefix}type_{t}_count'] = int(types_counter.get(t, 0))
    out[f'{prefix}lead_name'] = names[0] if names else ''
    out[f'{prefix}n_unique_names'] = int(len(set(names)))
    out[f'{prefix}type_entropy'] = float(_entropy(types_counter))
    spe_arr = np.array(vals['base_spe'], dtype=float)
    out[f'{prefix}spe_p25'] = float(np.percentile(spe_arr, 25)) if spe_arr.size else 0.0
    out[f'{prefix}spe_p50'] = float(np.percentile(spe_arr, 50)) if spe_arr.size else 0.0
    out[f'{prefix}spe_p75'] = float(np.percentile(spe_arr, 75)) if spe_arr.size else 0.0
    return out

def lead_vs_lead_features(p1_lead: dict, p2_lead: dict) -> dict:
    out = {}
    for s in ['base_hp','base_atk','base_def','base_spa','base_spd','base_spe']:
        out[f'lead_diff_{s}'] = float(p1_lead.get(s,0) - p2_lead.get(s,0))
    out['lead_speed_advantage'] = float(p1_lead.get('base_spe',0) - p2_lead.get('base_spe',0))
    p1_types = [t.lower() for t in p1_lead.get('types', [])]
    p2_types = [t.lower() for t in p2_lead.get('types', [])]
    max_eff = 0.0
    for pt in p1_types: max_eff = max(max_eff, get_effectiveness(pt, p2_types))
    out['lead_p1_vs_p2_effectiveness'] = float(max_eff)
    return out

def lead_aggregate_features(pokemon: dict, prefix: str = 'p2_lead_') -> dict:
    out = {}
    for s in ['base_hp','base_atk','base_def','base_spa','base_spd','base_spe']:
        out[f'{prefix}{s}'] = float(pokemon.get(s,0))
    out[f'{prefix}level'] = int(pokemon.get('level',0))
    types = [x.lower() for x in pokemon.get('types', [])]
    for t in ['normal','fire','water','electric','grass','psychic','ice','dragon','rock','ground','flying']:
        out[f'{prefix}type_{t}'] = int(t in types)
    out[f'{prefix}name'] = pokemon.get('name','')
    out[f'{prefix}n_unique_types'] = int(len(set(types)))
    return out

def quick_boost_features_v2(record: dict) -> dict:
    out = {}
    p1_team = record.get('p1_team_details', [])
    p2_lead = record.get('p2_lead_details', {})
    timeline = record.get('battle_timeline', [])
    if not p1_team: return out
    
    p2_lead_spe = p2_lead.get('base_spe', 0)
    faster_count = sum(1 for p in p1_team if p.get('base_spe', 0) > p2_lead_spe)
    slower_count = sum(1 for p in p1_team if p.get('base_spe', 0) <= p2_lead_spe)
    out['p1_faster_than_lead_count'] = faster_count
    out['p1_slower_than_lead_count'] = slower_count
    out['p1_speed_control_ratio'] = faster_count / max(1, len(p1_team))
    
    p1_avg_bulk = np.mean([p.get('base_hp', 0)*(p.get('base_def', 0)+p.get('base_spd', 0)) for p in p1_team])
    p2_lead_bulk = p2_lead.get('base_hp', 1)*(p2_lead.get('base_def', 1)+p2_lead.get('base_spd', 1))
    out['p1_avg_bulk_vs_lead'] = p1_avg_bulk / max(p2_lead_bulk, 1)
    
    p1_total_atk = sum(p.get('base_atk', 0) + p.get('base_spa', 0) for p in p1_team)
    p2_lead_offense = p2_lead.get('base_atk', 0) + p2_lead.get('base_spa', 0)
    out['p1_total_offense'] = p1_total_atk
    out['p1_offense_advantage'] = p1_total_atk / max(p2_lead_offense, 1)
    
    p2_lead_types = [t.lower() for t in p2_lead.get('types', [])]
    if p2_lead_types:
        coverage_scores = []
        for p in p1_team:
            p_types = [t.lower() for t in p.get('types', [])]
            max_eff = max([get_effectiveness(pt, p2_lead_types) for pt in p_types] or [1.0])
            coverage_scores.append(max_eff)
        out['p1_avg_effectiveness_vs_lead'] = float(np.mean(coverage_scores))
        out['p1_max_effectiveness_vs_lead'] = float(np.max(coverage_scores))
        out['p1_se_count_vs_lead'] = sum(1 for s in coverage_scores if s >= 2.0)
        out['p1_weak_count_vs_lead'] = sum(1 for s in coverage_scores if s <= 0.5)
        
    if timeline:
        first_p1_ko = False; first_p2_ko = False
        for turn in timeline[:30]:
            if not first_p2_ko and turn.get('p2_pokemon_state', {}).get('fainted'):
                first_p1_ko = True; out['p1_first_blood'] = 1; out['p1_first_blood_turn'] = turn.get('turn', 0); break
            if not first_p1_ko and turn.get('p1_pokemon_state', {}).get('fainted'):
                first_p2_ko = True; out['p1_first_blood'] = 0; out['p1_first_blood_turn'] = turn.get('turn', 0); break
        if not first_p1_ko and not first_p2_ko:
            out['p1_first_blood'] = -1; out['p1_first_blood_turn'] = 0
            
    p1_avg_level = np.mean([p.get('level', 50) for p in p1_team])
    out['p1_avg_level_advantage'] = p1_avg_level - p2_lead.get('level', 50)
    
    p1_stat_products = [(p.get('base_hp',1)*p.get('base_atk',1)*p.get('base_def',1)*p.get('base_spa',1)*p.get('base_spd',1)*p.get('base_spe',1)) for p in p1_team]
    out['p1_avg_stat_product'] = float(np.mean(p1_stat_products))
    out['p1_max_stat_product'] = float(np.max(p1_stat_products))
    p2_prod = p2_lead.get('base_hp',1)*p2_lead.get('base_atk',1)*p2_lead.get('base_def',1)*p2_lead.get('base_spa',1)*p2_lead.get('base_spd',1)*p2_lead.get('base_spe',1)
    out['p1_stat_product_advantage'] = out['p1_avg_stat_product'] / max(p2_prod, 1)
    return out

def summary_from_timeline(timeline: list, p1_team: list) -> dict:
    out = {}
    if not timeline: return {'tl_p1_moves':0,'tl_p2_moves':0,'tl_p1_est_damage':0.0,'tl_p2_est_damage':0.0,'damage_diff':0.0}
    p1_moves = p2_moves = 0; p1_damage = p2_damage = 0.0
    p1_last_active = p2_last_active = ''; p1_last_hp = p2_last_hp = np.nan
    p1_fainted = p2_fainted = 0
    p1_fainted_names = set(); p2_fainted_names = set()
    last_p1_hp = {}; last_p2_hp = {}
    p1_comeback_kos = p2_comeback_kos = 0
    p1_inflicted_statuses = Counter(); p2_inflicted_statuses = Counter()
    p1_pokemon_statuses = {}; p2_pokemon_statuses = {}
    p1_move_type_counts = Counter(); p2_move_type_counts = Counter()
    p1_damage_first2 = 0.0; p2_damage_first2 = 0.0
    p1_dmg_by_turn = {}; p2_dmg_by_turn = {}; seen_turns = set()
    first_ko_turn_p1_taken = None; first_ko_turn_p1_inflicted = None
    early_threshold = 10; p1_kos_early = p1_kos_late = p2_kos_early = p2_kos_late = 0
    
    # --- NUOVA AGGIUNTA V5: Tracciamento Crit/Turni Persi ---
    p1_crit_count = 0; p2_crit_count = 0
    p1_lost_turns_status = 0; p2_lost_turns_status = 0

    for i, turn in enumerate(timeline[:30]):
        prev_p1_fainted, prev_p2_fainted = p1_fainted, p2_fainted
        p1_state = turn.get('p1_pokemon_state',{}) or {}; p2_state = turn.get('p2_pokemon_state',{}) or {}
        tnum = turn.get('turn', len(seen_turns) + 1); seen_turns.add(tnum)

        if p1_state.get('name'): p1_last_active = p1_state.get('name')
        if p2_state.get('name'): p2_last_active = p2_state.get('name')

        if p1_state.get('fainted') and p1_state.get('name') not in p1_fainted_names:
            p1_fainted += 1; p1_fainted_names.add(p1_state.get('name'))
            if first_ko_turn_p1_taken is None: first_ko_turn_p1_taken = tnum
            if tnum <= early_threshold: p2_kos_early += 1
            else: p2_kos_late += 1
        if p2_state.get('fainted') and p2_state.get('name') not in p2_fainted_names:
            p2_fainted += 1; p2_fainted_names.add(p2_state.get('name'))
            if first_ko_turn_p1_inflicted is None: first_ko_turn_p1_inflicted = tnum
            if tnum <= early_threshold: p1_kos_early += 1
            else: p1_kos_late += 1

        p2_name, p2_hp = p2_state.get('name'), p2_state.get('hp_pct')
        if p2_name and p2_hp is not None:
            prev_hp = last_p2_hp.get(p2_name)
            if prev_hp is not None:
                delta = max(0.0, prev_hp - p2_hp)
                p1_damage += delta
                p1_dmg_by_turn[tnum] = p1_dmg_by_turn.get(tnum, 0.0) + delta
                if turn.get('turn',999) <= 2: p1_damage_first2 += delta
            last_p2_hp[p2_name] = p2_hp

        p1_name, p1_hp = p1_state.get('name'), p1_state.get('hp_pct')
        if p1_name and p1_hp is not None:
            prev_hp = last_p1_hp.get(p1_name)
            if prev_hp is not None:
                delta = max(0.0, prev_hp - p1_hp)
                p2_damage += delta
                p2_dmg_by_turn[tnum] = p2_dmg_by_turn.get(tnum, 0.0) + delta
                if turn.get('turn',999) <= 2: p2_damage_first2 += delta
            last_p1_hp[p1_name] = p1_hp

        damage_diff_so_far = p1_damage - p2_damage
        if p2_fainted > prev_p2_fainted and damage_diff_so_far < -1.0: p1_comeback_kos += 1
        if p1_fainted > prev_p1_fainted and damage_diff_so_far > 1.0: p2_comeback_kos += 1

        p2_status = p2_state.get('status')
        if p2_name and p2_status and p2_pokemon_statuses.get(p2_name) != p2_status:
            p1_inflicted_statuses[p2_status] += 1; p2_pokemon_statuses[p2_name] = p2_status
        p1_status = p1_state.get('status')
        if p1_name and p1_status and p1_pokemon_statuses.get(p1_name) != p1_status:
            p2_inflicted_statuses[p1_status] += 1; p1_pokemon_statuses[p1_name] = p1_status

        p1_move = turn.get('p1_move_details') or {}; p2_move = turn.get('p2_move_details') or {}
        if p1_move and p1_move.get('type'): p1_move_type_counts[(p1_move.get('type') or '').lower()] += 1
        if p2_move and p2_move.get('type'): p2_move_type_counts[(p2_move.get('type') or '').lower()] += 1
        if p1_move: p1_moves += 1
        if p2_move: p2_moves += 1
        
        # --- NUOVA AGGIUNTA V5: Logica Crit/Turni Persi ---
        if p1_move.get('critical_hit', False): p1_crit_count += 1
        if p2_move.get('critical_hit', False): p2_crit_count += 1
        
        prev_p1_name = timeline[i-1].get('p1_pokemon_state',{}).get('name') if i > 0 else None
        if p1_state.get('status') in ['par', 'slp'] and not p1_move and p1_state.get('name') == prev_p1_name:
            p1_lost_turns_status += 1
            
        prev_p2_name = timeline[i-1].get('p2_pokemon_state',{}).get('name') if i > 0 else None
        if p2_state.get('status') in ['par', 'slp'] and not p2_move and p2_state.get('name') == prev_p2_name:
            p2_lost_turns_status += 1
            
        p1_last_hp = p1_state.get('hp_pct', np.nan); p2_last_hp = p2_state.get('hp_pct', np.nan)
    
    # --- Fine Loop Timeline ---

    out['tl_p1_moves'] = int(p1_moves); out['tl_p2_moves'] = int(p2_moves)
    out['tl_p1_est_damage'] = float(p1_damage); out['tl_p2_est_damage'] = float(p2_damage)
    out['tl_p1_fainted'] = int(p1_fainted); out['tl_p2_fainted'] = int(p2_fainted)
    turns_count = max(1, len(seen_turns))
    out['tl_p1_fainted_rate'] = float(out['tl_p1_fainted'] / turns_count)
    out['tl_p2_fainted_rate'] = float(out['tl_p2_fainted'] / turns_count)
    out['damage_diff'] = float(p1_damage - p2_damage)
    out['fainted_diff'] = int(p1_fainted - p2_fainted)
    out['tl_p1_last_hp'] = float(p1_last_hp) if not np.isnan(p1_last_hp) else 0.0
    out['tl_p2_last_hp'] = float(p2_last_hp) if not np.isnan(p2_last_hp) else 0.0
    out['tl_p1_last_active'] = p1_last_active; out['tl_p2_last_active'] = p2_last_active
    
    if p1_team:
        p1_total_hp_sum = sum(p.get('base_hp',0) for p in p1_team)
        p1_avg_def = np.mean([p.get('base_def',0) for p in p1_team] or [0])
        p1_avg_spd = np.mean([p.get('base_spd',0) for p in p1_team] or [0])
        out['tl_p2_damage_vs_p1_hp_pool'] = float(p2_damage / (p1_total_hp_sum + 1e-6))
        out['tl_p1_defensive_endurance'] = float((p1_avg_def + p1_avg_spd) / (p2_damage + 1e-6))
        
    out['tl_p1_comeback_kos'] = int(p1_comeback_kos); out['tl_p2_comeback_kos'] = int(p2_comeback_kos)
    out['tl_comeback_kos_diff'] = int(p1_comeback_kos - p2_comeback_kos)

    common_statuses = ['brn','par','slp','frz','psn','tox']
    for status in common_statuses:
        out[f'tl_p1_inflicted_{status}_count'] = int(p1_inflicted_statuses.get(status,0))
        out[f'tl_p2_inflicted_{status}_count'] = int(p2_inflicted_statuses.get(status,0))
        out[f'tl_inflicted_{status}_diff'] = int(p1_inflicted_statuses.get(status,0) - p2_inflicted_statuses.get(status,0))
        c1 = p1_inflicted_statuses.get(status,0); c2 = p2_inflicted_statuses.get(status,0)
        out[f'tl_p1_inflicted_{status}_rate'] = float(c1 / turns_count)
        out[f'tl_p2_inflicted_{status}_rate'] = float(c2 / turns_count)
        out[f'tl_inflicted_{status}_rate_diff'] = float((c1 - c2) / turns_count)

    common_move_types = ['normal','fire','water','electric','grass','psychic','ice','dragon','rock','ground','flying','ghost','bug','poison','fighting']
    for mt in common_move_types:
        out[f'tl_p1_move_type_{mt}_count'] = int(p1_move_type_counts.get(mt,0))
        out[f'tl_p2_move_type_{mt}_count'] = int(p2_move_type_counts.get(mt,0))
        out[f'tl_move_type_{mt}_count_diff'] = int(p1_move_type_counts.get(mt,0) - p2_move_type_counts.get(mt,0))

    out['tl_p1_damage_first2'] = float(p1_damage_first2)
    out['tl_p2_damage_first2'] = float(p2_damage_first2)
    out['tl_first2_damage_diff'] = float(p1_damage_first2 - p2_damage_first2)
    out['tl_turns_count'] = int(turns_count)
    out['tl_p1_moves_rate'] = float(p1_moves / turns_count); out['tl_p2_moves_rate'] = float(p2_moves / turns_count)
    out['tl_p1_damage_per_turn'] = float(p1_damage / turns_count); out['tl_p2_damage_per_turn'] = float(p2_damage / turns_count)
    out['tl_damage_rate_diff'] = float(out['tl_p1_damage_per_turn'] - out['tl_p2_damage_per_turn'])

    recent_turns = sorted(seen_turns)[-5:] if seen_turns else []
    p1_last5 = sum(p1_dmg_by_turn.get(t,0.0) for t in recent_turns)
    p2_last5 = sum(p2_dmg_by_turn.get(t,0.0) for t in recent_turns)
    out['tl_p1_damage_last5'] = float(p1_last5); out['tl_p2_damage_last5'] = float(p2_last5)
    out['tl_last5_damage_diff'] = float(p1_last5 - p2_last5)
    out['tl_p1_last5_damage_ratio'] = float(p1_last5 / (p1_damage + 1e-6))
    out['tl_p2_last5_damage_ratio'] = float(p2_last5 / (p2_damage + 1e-6))
    out['tl_last5_damage_ratio_diff'] = float(out['tl_p1_last5_damage_ratio'] - out['tl_p2_last5_damage_ratio'])

    if seen_turns:
        ts = sorted(seen_turns); w = np.linspace(1.0, 2.0, num=len(ts)); w = w / (w.sum() + 1e-9)
        adv = [(p1_dmg_by_turn.get(t,0.0) - p2_dmg_by_turn.get(t,0.0)) for t in ts]
        out['tl_weighted_damage_diff'] = float(np.dot(w, adv))
        cum = 0.0; signs = []
        for t in ts:
            cum += (p1_dmg_by_turn.get(t,0.0) - p2_dmg_by_turn.get(t,0.0))
            s = 1 if cum > 1e-9 else (-1 if cum < -1e-9 else 0)
            if s != 0 and (not signs or signs[-1] != s): signs.append(s)
        out['tl_damage_adv_sign_flips'] = int(max(0, len(signs) - 1))
        out['tl_comeback_flag'] = int(1 if (len(signs) >= 2 and signs[0] != signs[-1]) else 0)
    else:
        out['tl_weighted_damage_diff'] = 0.0; out['tl_damage_adv_sign_flips'] = 0; out['tl_comeback_flag'] = 0

    out['tl_first_ko_turn_p1_inflicted'] = int(first_ko_turn_p1_inflicted or 0)
    out['tl_first_ko_turn_p1_taken'] = int(first_ko_turn_p1_taken or 0)
    out['tl_first_ko_turn_diff'] = int((first_ko_turn_p1_inflicted or 0) - (first_ko_turn_p1_taken or 0))
    out['tl_kos_early_p1'] = int(p1_kos_early); out['tl_kos_late_p1'] = int(p1_kos_late)
    out['tl_kos_early_p2'] = int(p2_kos_early); out['tl_kos_late_p2'] = int(p2_kos_late)

    # --- NUOVA AGGIUNTA V5: Salva risultati RNG/Hax ---
    out['tl_p1_crit_count'] = int(p1_crit_count)
    out['tl_p2_crit_count'] = int(p2_crit_count)
    out['tl_crit_diff'] = int(p1_crit_count - p2_crit_count)
    out['tl_p1_lost_turns_status'] = int(p1_lost_turns_status)
    out['tl_p2_lost_turns_status'] = int(p2_lost_turns_status)
    out['tl_status_luck_diff'] = int(p2_lost_turns_status - p1_lost_turns_status)
    
    return out

def extract_move_coverage_from_timeline(timeline: list, prefix: str = 'p1_') -> dict:
    out = {}; move_types_used = set(); move_categories_used = Counter()
    unique_moves = set(); stab_count = 0
    for turn in timeline[:30]:
        move_details = turn.get(f'{prefix[:-1]}_move_details')
        pokemon_state = turn.get(f'{prefix[:-1]}_pokemon_state', {})
        if not move_details: continue
        move_name = move_details.get('name', ''); move_type = (move_details.get('type') or '').lower()
        move_category = move_details.get('category', '')
        if move_name: unique_moves.add(move_name)
        if move_type: move_types_used.add(move_type)
        if move_category: move_categories_used[move_category] += 1
        if move_type in [t.lower() for t in pokemon_state.get('types', [])]: stab_count += 1
    
    out[f'{prefix}tl_unique_move_types'] = len(move_types_used)
    out[f'{prefix}tl_unique_moves_used'] = len(unique_moves)
    out[f'{prefix}tl_stab_moves'] = stab_count
    out[f'{prefix}tl_physical_moves'] = move_categories_used.get('physical', 0)
    out[f'{prefix}tl_special_moves'] = move_categories_used.get('special', 0)
    out[f'{prefix}tl_status_moves'] = move_categories_used.get('status', 0)
    out[f'{prefix}tl_coverage_score'] = len(move_types_used) / max(1, len(unique_moves))
    total_moves = sum(move_categories_used.values())
    if total_moves > 0:
        out[f'{prefix}tl_offensive_ratio'] = (move_categories_used.get('physical',0)+move_categories_used.get('special',0)) / total_moves
        out[f'{prefix}tl_status_ratio'] = move_categories_used.get('status', 0) / total_moves
    else:
        out[f'{prefix}tl_offensive_ratio'] = 0.0; out[f'{prefix}tl_status_ratio'] = 0.0
    return out

def ability_features(team: list, prefix: str) -> dict:
    immunity_abilities = {'levitate':0,'volt_absorb':0,'water_absorb':0,'flash_fire':0}
    stat_drop_abilities = {'intimidate':0}; weather_abilities = {'drought':0,'drizzle':0,'sand_stream':0}
    out = {}
    for pokemon in team:
        ability = (pokemon.get('ability','') or '').lower().replace(' ','_')
        if ability in immunity_abilities: immunity_abilities[ability] += 1
        if ability in stat_drop_abilities: stat_drop_abilities[ability] += 1
        if ability in weather_abilities: weather_abilities[ability] += 1
    for ability,count in immunity_abilities.items(): out[f'{prefix}ability_{ability}_count'] = int(count)
    for ability,count in stat_drop_abilities.items(): out[f'{prefix}ability_{ability}_count'] = int(count)
    for ability,count in weather_abilities.items(): out[f'{prefix}ability_{ability}_count'] = int(count)
    out[f'{prefix}total_immunity_abilities'] = int(sum(immunity_abilities.values()))
    out[f'{prefix}total_stat_drop_abilities'] = int(sum(stat_drop_abilities.values()))
    return out

def momentum_features(timeline: list) -> dict:
    out = {}; p1_advantages = []; cumulative_advantage = 0.0
    if not timeline: return out
    for i, turn in enumerate(timeline[:30]):
        p1_hp = turn.get('p1_pokemon_state', {}).get('hp_pct', 100)
        p2_hp = turn.get('p2_pokemon_state', {}).get('hp_pct', 100)
        turn_advantage = p1_hp - p2_hp
        cumulative_advantage += turn_advantage; p1_advantages.append(cumulative_advantage)
    if p1_advantages:
        x = np.arange(len(p1_advantages)); slope, intercept = np.polyfit(x, p1_advantages, 1)
        out['p1_momentum_slope'] = float(slope); out['p1_momentum_intercept'] = float(intercept)
        out['p1_final_advantage'] = float(p1_advantages[-1])
        out['p1_advantage_volatility'] = float(np.std(p1_advantages))
        out['p1_max_advantage'] = float(np.max(p1_advantages)); out['p1_min_advantage'] = float(np.min(p1_advantages))
        out['p1_advantage_range'] = float(out['p1_max_advantage'] - out['p1_min_advantage'])
    return out

def extract_opponent_team_from_timeline(timeline: list, p1_team: list) -> dict:
    out = {}; p2_pokemon_seen = set(); p2_pokemon_types = []
    for turn in timeline[:30]:
        p2_state = turn.get('p2_pokemon_state', {})
        if not p2_state: continue
        p2_name = p2_state.get('name')
        if p2_name and p2_name not in p2_pokemon_seen:
            p2_pokemon_seen.add(p2_name)
            p2_pokemon_types.extend([t.lower() for t in p2_state.get('types', [])])
    
    out['p2_tl_unique_pokemon_seen'] = len(p2_pokemon_seen)
    out['p2_tl_switches_count'] = len(p2_pokemon_seen) - 1
    p2_type_counter = Counter(p2_pokemon_types)
    out['p2_tl_unique_types_seen'] = len(p2_type_counter)
    out['p2_tl_type_entropy'] = _entropy(p2_type_counter)
    
    if p2_pokemon_types and p1_team:
        matchup_advantages = 0
        for p1_poke in p1_team:
            p1_types = [t.lower() for t in p1_poke.get('types', [])]
            for p1_type in p1_types:
                for p2_type in set(p2_pokemon_types):
                    eff = get_effectiveness(p1_type, [p2_type])
                    if eff >= 2.0: matchup_advantages += 1
        out['p1_vs_p2_tl_type_advantages'] = matchup_advantages
        out['p1_vs_p2_tl_type_advantages_per_poke'] = matchup_advantages / max(1, len(p1_team))
    
    total_turns = len(timeline[:30])
    out['p2_tl_switch_rate'] = len(p2_pokemon_seen) / max(1, total_turns)
    return out

# ==============================================================================
# 3. NUOVE FEATURE AVANZATE (DATA-DRIVEN, V4/V5)
# ==============================================================================

def extract_information_advantage(timeline: list) -> dict:
    p1_rev = set(); p2_rev = set(); reveal_turns_p2 = []
    for turn in timeline[:30]:
        t = turn.get('turn', 0)
        if n1 := turn.get('p1_pokemon_state', {}).get('name'): p1_rev.add(n1)
        if n2 := turn.get('p2_pokemon_state', {}).get('name'):
            if n2 not in p2_rev: p2_rev.add(n2); reveal_turns_p2.append(t)
    return {
        'tl_p1_revealed_count': len(p1_rev),
        'tl_p2_revealed_count': len(p2_rev),
        'tl_info_advantage': len(p2_rev) - len(p1_rev),
        'tl_p2_avg_reveal_turn': float(np.mean(reveal_turns_p2)) if reveal_turns_p2 else 30.0
    }

def extract_advanced_momentum(timeline: list) -> dict:
    p1_immune = 0; p2_forced = 0
    for i, turn in enumerate(timeline[:30]):
        if i == 0: continue
        prev = timeline[i-1]
        c1 = turn.get('p1_pokemon_state', {}).get('name')
        p1 = prev.get('p1_pokemon_state', {}).get('name')
        if c1 != p1 and not prev.get('p1_pokemon_state', {}).get('fainted'):
            m2_type = (turn.get('p2_move_details') or {}).get('type', '').lower()
            p1_types = [t.lower() for t in turn.get('p1_pokemon_state', {}).get('types', [])]
            if m2_type and p1_types and get_effectiveness(m2_type, p1_types) == 0.0:
                p1_immune += 1
        c2 = turn.get('p2_pokemon_state', {}).get('name')
        p2 = prev.get('p2_pokemon_state', {}).get('name')
        if c2 != p2 and not prev.get('p2_pokemon_state', {}).get('fainted'):
            if prev.get('p2_pokemon_state', {}).get('hp_pct', 1.0) < 0.50:
                p2_forced += 1
    return {'tl_p1_immune_switches': p1_immune, 'tl_p2_forced_switches': p2_forced}

def extract_gamestate_snapshots(timeline: list) -> dict:
    turns_lead = 0; hp_diff_10 = 0.0; hp_diff_20 = 0.0; hp_diff_end = 0.0
    for i, turn in enumerate(timeline[:30]):
        t = i + 1
        h1 = turn.get('p1_pokemon_state', {}).get('hp_pct', 0)
        h2 = turn.get('p2_pokemon_state', {}).get('hp_pct', 0)
        if h1 > h2: turns_lead += 1
        if t == 10: hp_diff_10 = h1 - h2
        if t == 20: hp_diff_20 = h1 - h2
        hp_diff_end = h1 - h2 # Ultimo stato disponibile
    return {
        'tl_turns_with_hp_lead': turns_lead,
        'tl_hp_diff_turn_10': float(hp_diff_10),
        'tl_hp_diff_turn_20': float(hp_diff_20),
        'tl_hp_diff_end': float(hp_diff_end)
    }

def extract_observed_mechanics(timeline: list) -> dict:
    p1_heals = 0; p2_heals = 0; p1_frz = 0; p2_frz = 0
    for i, turn in enumerate(timeline[:30]):
        if i == 0: continue
        prev = timeline[i-1]
        p1s = turn.get('p1_pokemon_state', {}); p1s_prev = prev.get('p1_pokemon_state', {})
        p2s = turn.get('p2_pokemon_state', {}); p2s_prev = prev.get('p2_pokemon_state', {})
        if p1s.get('name') == p1s_prev.get('name'):
             if p1s.get('hp_pct', 0) > p1s_prev.get('hp_pct', 0): p1_heals += 1
        if p2s.get('name') == p2s_prev.get('name'):
             if p2s.get('hp_pct', 0) > p2s_prev.get('hp_pct', 0): p2_heals += 1
        if p1s.get('status') == 'frz': p1_frz = 1
        if p2s.get('status') == 'frz': p2_frz = 1
    return {'tl_heal_diff': p1_heals - p2_heals, 'tl_freeze_adv': p2_frz - p1_frz}

# --- NUOVE FEATURE V5 (RUOLI E COESIONE) ---

def team_role_features(team: list, prefix: str = 'p1_') -> dict:
    """Estrae gli specialisti del team (Muro, Sweeper, etc.)"""
    if not team: return {}
    
    spe_list = []; bulk_list = []; offense_list = []
    for p in team:
        spe_list.append(p.get('base_spe', 0))
        bulk_list.append(p.get('base_hp', 1) * (p.get('base_def', 1) + p.get('base_spd', 1)))
        offense_list.append(p.get('base_atk', 1) + p.get('base_spa', 1))
        
    return {
        f'{prefix}fastest_spe': float(np.max(spe_list)),
        f'{prefix}slowest_spe': float(np.min(spe_list)),
        f'{prefix}max_bulk': float(np.max(bulk_list)),
        f'{prefix}max_offense': float(np.max(offense_list))
    }

def calculate_defensive_cohesion(team: list, prefix: str = 'p1_') -> dict:
    """Calcola quanto è debole il team a un singolo tipo (max debolezza comune)"""
    if not team: return {}
    
    weakness_counts = Counter()
    for atk_type in ALL_ATTACK_TYPES:
        count = 0
        for p in team:
            def_types = [t.lower() for t in p.get('types', [])]
            if not def_types: continue
            if get_effectiveness(atk_type, def_types) >= 2.0:
                count += 1
        weakness_counts[atk_type] = count
        
    return {
        f'{prefix}max_common_weakness': float(max(weakness_counts.values()))
    }

def role_vs_lead_comparison(p1_roles: dict, p2_lead: dict) -> dict:
    """Confronta gli specialisti P1 con il lead P2"""
    out = {}
    p2_lead_spe = p2_lead.get('base_spe', 0)
    p2_lead_offense = p2_lead.get('base_atk', 1) + p2_lead.get('base_spa', 1)
    
    out['p1_fastest_vs_lead_spe'] = p1_roles.get('p1_fastest_spe', 0) - p2_lead_spe
    out['p1_max_bulk_vs_lead_offense'] = p1_roles.get('p1_max_bulk', 1) / max(1, p2_lead_offense)
    return out


# ==============================================================================
# 5. MASTER FUNCTION (TUTTO INCLUSO)
# ==============================================================================

def prepare_record_features_COMPLETE_V5(record: dict, max_turns: int = 30) -> dict:
    out = {'battle_id': record.get('battle_id')}
    if 'player_won' in record: out['player_won'] = int(bool(record['player_won']))
    
    p1_team = record.get('p1_team_details', [])
    p2_lead = record.get('p2_lead_details', {})
    p1_lead = p1_team[0] if p1_team else {}
    tl = record.get('battle_timeline', [])
    tl_limited = tl[:max_turns]
    
    # --- V1: Feature Originali Statiche ---
    out.update(team_aggregate_features(p1_team, 'p1_'))
    out.update(lead_aggregate_features(p2_lead, 'p2_lead_'))
    out.update(ability_features(p1_team, 'p1_'))
    out.update(lead_vs_lead_features(p1_lead, p2_lead))
    out.update(ability_features([p2_lead], 'p2_lead_'))
    out['p1_intimidate_vs_lead'] = int(out.get('p1_ability_intimidate_count',0) > 0)
    
    # --- V1: Feature Originali Dinamiche ---
    out.update(summary_from_timeline(tl_limited, p1_team))
    out.update(extract_move_coverage_from_timeline(tl_limited, 'p1_'))
    out.update(extract_move_coverage_from_timeline(tl_limited, 'p2_'))
    out.update(extract_opponent_team_from_timeline(tl_limited, p1_team))
    out.update(quick_boost_features_v2(record)) # quick_boost usa 'record'
    out.update(momentum_features(tl_limited))
    
    # --- V1: Feature Originali Calcolate a Mano ---
    out['team_hp_sum_minus_p2lead_hp'] = out.get('p1_base_hp_sum', 0) - out.get('p2_lead_base_hp', 0)
    out['team_spa_mean_minus_p2spa'] = out.get('p1_base_spa_mean', 0) - out.get('p2_lead_base_spa', 0)
    out['speed_advantage'] = out.get('p1_base_spe_sum', 0) - out.get('p2_lead_base_spe', 0)
    out['n_unique_types_diff'] = out.get('p1_n_unique_types', 0) - out.get('p2_lead_n_unique_types', 1)
    p1_moves = max(out.get('tl_p1_moves',1),1); p2_moves = max(out.get('tl_p2_moves',1),1)
    out['damage_per_turn_diff'] = (out.get('tl_p1_est_damage',0.0)/p1_moves) - (out.get('tl_p2_est_damage',0.0)/p2_moves)
    out['last_pair'] = f"{out.get('tl_p1_last_active','')}_VS_{out.get('tl_p2_last_active','')}"
    out.update(calculate_type_advantage(p1_team, p2_lead))
    p2_lead_bulk = out.get('p2_lead_base_def',1) + out.get('p2_lead_base_spd',1)
    out['p1_se_options_vs_lead_bulk'] = out.get('p1_super_effective_options',0) / (p2_lead_bulk + 1e-6)
    
    if p2_team := record.get('p2_team_details', []):
        out.update(team_aggregate_features(p2_team, 'p2_'))
        out['team_hp_sum_diff'] = out.get('p1_base_hp_sum',0) - out.get('p2_base_hp_sum',0)
        out['team_spa_mean_diff'] = out.get('p1_base_spa_mean',0) - out.get('p2_base_spa_mean',0)
        out['team_spe_mean_diff'] = out.get('p1_base_spe_mean',0) - out.get('p2_base_spe_mean',0)
        out['n_unique_types_team_diff'] = out.get('p1_n_unique_types',0) - out.get('p2_n_unique_types',0)
        
    # --- NUOVE FEATURE (V2/V4) ---
    if tl_limited:
        out.update(extract_information_advantage(tl_limited))
        out.update(extract_advanced_momentum(tl_limited))
        out.update(extract_gamestate_snapshots(tl_limited))
        out.update(extract_observed_mechanics(tl_limited))
    else:
        out.update({
            'tl_p1_revealed_count': 1, 'tl_p2_revealed_count': 1, 'tl_info_advantage': 0,
            'tl_p2_avg_reveal_turn': 30.0, 'tl_p1_immune_switches': 0, 'tl_p2_forced_switches': 0,
            'tl_turns_with_hp_lead': 0, 'tl_hp_diff_turn_10': 0.0, 'tl_hp_diff_turn_20': 0.0,
            'tl_hp_diff_end': 0.0, 'tl_heal_diff': 0, 'tl_freeze_adv': 0
        })
        
    # --- NUOVE FEATURE (V5) ---
    p1_role_feats = team_role_features(p1_team, 'p1_')
    out.update(p1_role_feats)
    out.update(calculate_defensive_cohesion(p1_team, 'p1_'))
    out.update(role_vs_lead_comparison(p1_role_feats, p2_lead))
        
    return out

def create_features_from_raw(data: list, feature_func=prepare_record_features_COMPLETE_V5) -> pd.DataFrame:
    rows = []
    for b in tqdm(data, desc='FE (V5 Complete)'):
        try:
            feat = feature_func(b, max_turns=30)
            if 'battle_id' not in feat: feat['battle_id'] = b.get('battle_id')
            rows.append(feat)
        except Exception as e:
            rows.append({'battle_id': b.get('battle_id'), 'error': 1})
    df = pd.DataFrame(rows)
    if 'player_won' in df.columns:
        df['player_won'] = df['player_won'].astype(int)
    return df.fillna(0)

# ==============================================================================
# 5. ESECUZIONE
# ==============================================================================
train_df = create_features_from_raw(train_raw)
test_df = create_features_from_raw(test_raw)
print('Feature shape train/test:', train_df.shape, test_df.shape)
display(train_df.head())

FE (V5 Complete):   0%|          | 0/10000 [00:00<?, ?it/s]

FE (V5 Complete):   0%|          | 0/5000 [00:00<?, ?it/s]

Feature shape train/test: (10000, 307) (5000, 306)


Unnamed: 0,battle_id,player_won,p1_base_hp_sum,p1_base_hp_mean,p1_base_hp_max,p1_base_hp_min,p1_base_hp_std,p1_base_atk_sum,p1_base_atk_mean,p1_base_atk_max,...,tl_hp_diff_end,tl_heal_diff,tl_freeze_adv,p1_fastest_spe,p1_slowest_spe,p1_max_bulk,p1_max_offense,p1_max_common_weakness,p1_fastest_vs_lead_spe,p1_max_bulk_vs_lead_offense
0,0,1,695.0,115.833333,250.0,55.0,69.367179,435.0,72.5,110.0,...,0.279549,0,1,120.0,30.0,27500.0,220.0,3.0,5.0,157.142857
1,1,1,740.0,123.333333,250.0,65.0,64.204534,435.0,72.5,110.0,...,-0.32,0,0,110.0,30.0,27500.0,220.0,3.0,-10.0,148.648649
2,2,1,745.0,124.166667,250.0,60.0,64.382753,505.0,84.166667,130.0,...,-0.24,-1,0,110.0,30.0,27500.0,220.0,4.0,60.0,250.0
3,3,1,730.0,121.666667,250.0,60.0,65.362239,465.0,77.5,110.0,...,-0.06,2,0,110.0,30.0,27500.0,220.0,3.0,0.0,161.764706
4,4,1,685.0,114.166667,250.0,50.0,70.794107,455.0,75.833333,110.0,...,0.32,-1,0,120.0,30.0,27500.0,220.0,4.0,5.0,157.142857


# Preprocessing

In [12]:
# ====== Preprocessing (senza transformer sklearn) =========
import numpy as np
import pandas as pd

# base exclusions
exclude_cols = ['battle_id', 'player_won']
string_cols = train_df.select_dtypes(include=['object']).columns.tolist()
exclude_cols.extend(string_cols)

# tutte le colonne numeriche candidate
ALL_NUMERIC_FEATURES = [c for c in train_df.columns if c not in exclude_cols]

# flag per usare top features se necessario
use_top_features = False

# carica TOP100 se presente (comportamento invariato)
top100_path = r'top100_shap_features.csv'
try:
    top100_df = pd.read_csv(top100_path)
    TOP100 = [str(x).strip() for x in top100_df['feature'].tolist()]
except Exception:
    TOP100 = []

# --- INIZIO: filtro dalle keep_features_list se richiesto ---
features_filter = False  # imposta True per applicare il filtro, False per comportamento attuale
keep_list_path = 'keep_features_list.txt'

if features_filter:
    try:
        import os
        if os.path.exists(keep_list_path):
            keep_df = pd.read_csv(keep_list_path, header=None)
            keep_list = [str(x).strip() for x in keep_df.iloc[:, 0].tolist()]
            # mantieni solo feature numeriche valide presenti in ALL_NUMERIC_FEATURES
            filtered = [f for f in ALL_NUMERIC_FEATURES if f in keep_list]
            if filtered:
                # sovrascrive FEATURES più avanti: qui memorizziamo in temp
                FEATURES_FROM_KEEP = filtered
                print(f"features_filter=ON: trovato {len(filtered)} feature valide in {keep_list_path}")
            else:
                FEATURES_FROM_KEEP = None
                print(f"features_filter=ON: nessuna feature di {keep_list_path} presente in ALL_NUMERIC_FEATURES")
        else:
            FEATURES_FROM_KEEP = None
            print(f"features_filter=ON ma file {keep_list_path} non trovato. Nessun filtro applicato.")
    except Exception as e:
        FEATURES_FROM_KEEP = None
        print("Errore caricando keep_features_list.txt, nessun filtro applicato:", e)
else:
    FEATURES_FROM_KEEP = None
# --- FINE: filtro dalle keep_features_list ---

if use_top_features and TOP100:
    FEATURES = [f for f in TOP100 if f in ALL_NUMERIC_FEATURES]
elif features_filter:
    FEATURES = FEATURES_FROM_KEEP
else:
    FEATURES = ALL_NUMERIC_FEATURES

print(f'Num FEATURES numeriche rilevate (ALL): {len(ALL_NUMERIC_FEATURES)}')
print(f'Num FEATURES effettive usate (FEATURES): {len(FEATURES)}')
print(f'Num TOP100 caricate: {len(TOP100)}')

# costruisco DataFrame numerico raw
num_df = train_df[FEATURES].astype(float).replace([np.inf, -np.inf], np.nan)

# Imputazione semplice: usiamo la mediana per ogni feature calcolata sul train
medians = num_df.median()
train_imputed = num_df.fillna(medians)

# NON eseguo alcuno scaling: lascio i valori nella loro scala naturale
train_preproc_df = train_imputed.copy()

# target
y = train_df['player_won'].astype(int).values

# MODIFICATO: usa tutto il dataset per CV, nessun holdout
X = train_preproc_df.values
print('Preprocessing (no transformers) completato.')
print('Dataset completo size:', X.shape[0])
print('Preprocessed feature count:', len(FEATURES))

# Allinea e imputa test_df usando le mediane del train (coerente con l'imputazione sopra)
test_aligned = test_df.reindex(columns=FEATURES, fill_value=np.nan).astype(float).replace([np.inf, -np.inf], np.nan)
test_imputed = test_aligned.fillna(medians)
test_preproc_df = pd.DataFrame(test_imputed.values, columns=FEATURES, index=test_df.index)

# Variabili pronte per le celle successive:
# FEATURES, X, y, test_preproc_df

Num FEATURES numeriche rilevate (ALL): 300
Num FEATURES effettive usate (FEATURES): 300
Num TOP100 caricate: 0
Preprocessing (no transformers) completato.
Dataset completo size: 10000
Preprocessed feature count: 300


# Hyperparameter serch

In [19]:
# === OPTUNA HYPERPARAMETER TUNING (FIXED) ===
import optuna
import numpy as np
import xgboost as xgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score

# 1. Configurazione base
N_TRIALS = 150
TIMEOUT_SEC = 7200
EARLY_STOPPING_ROUNDS = 50
RANDOM_STATE = 42

print(f"Avvio ottimizzazione Optuna per {N_TRIALS} trial (o {TIMEOUT_SEC} secondi)...")

def objective(trial):
    # 2. Definizione dello spazio di ricerca
    params = {
        'objective': 'binary:logistic',
        'tree_method': 'hist',
        'eval_metric': 'logloss',
        'use_label_encoder': False,
        'n_jobs': -1,
        'random_state': RANDOM_STATE,
        # FIX: Early stopping ora va passato qui nel costruttore per le nuove versioni di XGBoost
        'early_stopping_rounds': EARLY_STOPPING_ROUNDS,
        # Iperparametri da ottimizzare
        'n_estimators': 2000,
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.15, log=True),
        'max_depth': trial.suggest_int('max_depth', 3, 8),
        'min_child_weight': trial.suggest_int('min_child_weight', 1, 10),
        'gamma': trial.suggest_float('gamma', 0.0, 5.0),
        'subsample': trial.suggest_float('subsample', 0.6, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-3, 10.0, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-3, 10.0, log=True),
    }

    # 3. Cross-Validation interna
    cv_inner = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    fold_scores = []

    for train_idx, val_idx in cv_inner.split(X, y):
        X_tr, X_val = X[train_idx], X[val_idx]
        y_tr, y_val = y[train_idx], y[val_idx]

        model = xgb.XGBClassifier(**params)

        # Fit corretto: early_stopping_rounds è già in params (costruttore)
        model.fit(
            X_tr, y_tr,
            eval_set=[(X_val, y_val)],
            verbose=False
        )

        # Valutazione (usa il miglior numero di iterazioni trovato)
        preds = model.predict(X_val)
        acc = accuracy_score(y_val, preds)
        fold_scores.append(acc)

        trial.report(acc, step=len(fold_scores)-1)
        if trial.should_prune():
             raise optuna.TrialPruned()

    return np.mean(fold_scores)

# 4. Avvio ottimizzazione
study = optuna.create_study(direction='maximize', pruner=optuna.pruners.MedianPruner())
study.optimize(objective, n_trials=N_TRIALS, timeout=TIMEOUT_SEC, show_progress_bar=True)

# 5. Risultati
print("\n=== RISULTATI OPTUNA ===")
print(f"Best Trial: {study.best_trial.number}")
print(f"Best Accuracy (CV media): {study.best_value:.4f}")
print("Best Params:")
for key, value in study.best_params.items():
    print(f"  {key}: {value}")

best_params_optuna = study.best_params
best_params_optuna.update({
    'objective': 'binary:logistic',
    'tree_method': 'hist',
    'eval_metric': 'logloss',
    'use_label_encoder': False,
    # Manteniamo n_estimators alto per il training finale,
    # ma potresti voler usare il valore medio trovato con early stopping se lo salvassi.
    # Per ora 1000 è un buon compromesso safe.
    'n_estimators': 1000
})

[I 2025-11-09 12:51:51,956] A new study created in memory with name: no-name-4c7854ab-d063-4f84-bb78-6d74304f305b


Avvio ottimizzazione Optuna per 150 trial (o 7200 secondi)...


  0%|          | 0/150 [00:00<?, ?it/s]

[I 2025-11-09 12:51:54,780] Trial 0 finished with value: 0.8295999999999999 and parameters: {'learning_rate': 0.057443186136344686, 'max_depth': 5, 'min_child_weight': 1, 'gamma': 4.430714555554042, 'subsample': 0.8668876182416037, 'colsample_bytree': 0.7135505134979183, 'reg_alpha': 0.020893581492795237, 'reg_lambda': 0.5280998338916184}. Best is trial 0 with value: 0.8295999999999999.
[I 2025-11-09 12:52:00,446] Trial 1 finished with value: 0.8295 and parameters: {'learning_rate': 0.026617626701253033, 'max_depth': 6, 'min_child_weight': 5, 'gamma': 3.6290418937760034, 'subsample': 0.7364479724550128, 'colsample_bytree': 0.8176109584584819, 'reg_alpha': 0.10884042327241734, 'reg_lambda': 0.07878737301093909}. Best is trial 0 with value: 0.8295999999999999.
[I 2025-11-09 12:52:10,629] Trial 2 finished with value: 0.8300000000000001 and parameters: {'learning_rate': 0.013215255640929511, 'max_depth': 5, 'min_child_weight': 9, 'gamma': 1.6289453860866572, 'subsample': 0.6280654777094311

# Cross validation

In [13]:
# === 10-Fold Cross-Validation con iperparametri FISSI ===
# IMPORTANTE: Assegna qui i migliori iperparametri trovati dalla cella precedente
# Oppure lascia questi di default (conservativi per ridurre overfitting)

best_params = {
    'objective': 'binary:logistic',
    'tree_method': 'hist',
    'eval_metric': 'logloss',
    'use_label_encoder': False,
    'n_estimators': 1000,
    'learning_rate': 0.019161090151695974,
    'max_depth': 3,
    'min_child_weight': 6,
    'gamma': 2.4267729113636345,
    'subsample': 0.6391418336680764,
    'colsample_bytree': 0.8034979811909722,
    'reg_alpha': 0.04656745646903133,
    'reg_lambda': 0.25114366021463247
}

print("=== 10-Fold Cross-Validation (tutto il dataset) ===")
print(f"Parametri utilizzati: {best_params}\n")

from sklearn.model_selection import StratifiedKFold
import xgboost as xgb
skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
outer_accuracies = []
folds_info = []
train_accuracies = []
train_val_gaps = []
outer_accuracies_opt = []

EARLY_STOPPING_ROUNDS = 50

def best_threshold_for_accuracy(y_true, proba, n_grid=201):
    y_true = np.asarray(y_true).astype(int)
    proba = np.asarray(proba).astype(float)
    grid = np.unique(np.quantile(proba, np.linspace(0, 1, n_grid)))
    best_thr, best_acc = 0.5, 0.0
    for t in grid:
        acc = ( ((proba >= t).astype(int) == y_true).mean() )
        if (acc > best_acc) or (abs(acc - best_acc) < 1e-12 and abs(t - 0.5) < abs(best_thr - 0.5)):
            best_acc, best_thr = float(acc), float(t)
    return best_thr, best_acc

def _fit_with_es(clf, X_tr, y_tr, X_val, y_val):
    """Fit con EarlyStopping via callback se supportato; fallback senza ES."""
    try:
        cb = getattr(xgb.callback, 'EarlyStopping', None)
        if cb is not None:
            clf.fit(X_tr, y_tr, eval_set=[(X_val, y_val)], callbacks=[cb(rounds=EARLY_STOPPING_ROUNDS, save_best=True, maximize=False)], verbose=False)
            return True
    except TypeError:
        pass
    clf.fit(X_tr, y_tr, eval_set=[(X_val, y_val)], verbose=False)
    return False

def _predict_proba_best(clf, X, best_iter=None, best_ntree_limit=None):
    """Version-safe predict_proba using either iteration_range (new) or ntree_limit (old)."""
    try:
        if best_iter is not None:
            return clf.predict_proba(X, iteration_range=(0, int(best_iter)+1))[:, 1]
    except TypeError:
        pass
    try:
        if best_ntree_limit is not None:
            return clf.predict_proba(X, ntree_limit=int(best_ntree_limit))[:, 1]
    except TypeError:
        pass
    return clf.predict_proba(X)[:, 1]

fold_idx = 0
for train_idx, val_idx in skf.split(X, y):  # MODIFICATO: usa X, y invece di X_train_val, y_train_val
    fold_idx += 1
    X_tr, X_val = X[train_idx], X[val_idx]  # MODIFICATO
    y_tr, y_val = y[train_idx], y[val_idx]  # MODIFICATO

    clf = XGBClassifier(**best_params)
    used_es = _fit_with_es(clf, X_tr, y_tr, X_val, y_val)

    best_iter = getattr(clf, 'best_iteration', None)
    try:
        booster = clf.get_booster()
    except Exception:
        booster = None
    best_ntree_limit = getattr(booster, 'best_ntree_limit', None) if booster is not None else None

    y_val_proba = _predict_proba_best(clf, X_val, best_iter, best_ntree_limit)
    y_pred = (y_val_proba >= 0.5).astype(int)
    acc = accuracy_score(y_val, y_pred)
    outer_accuracies.append(acc)

    y_tr_proba = _predict_proba_best(clf, X_tr, best_iter, best_ntree_limit)
    y_tr_pred = (y_tr_proba >= 0.5).astype(int)
    tr_acc = accuracy_score(y_tr, y_tr_pred)
    gap = float(tr_acc - acc)
    train_accuracies.append(tr_acc)
    train_val_gaps.append(gap)

    thr_acc, acc_opt = best_threshold_for_accuracy(y_val, y_val_proba, n_grid=301)
    outer_accuracies_opt.append(acc_opt)

    val_index_global = val_idx
    train_index_global = train_idx

    folds_info.append({
        'fold': fold_idx,
        'acc': float(acc),
        'train_acc': float(tr_acc),
        'gap_train_minus_val': float(gap),
        'acc_opt': float(acc_opt),
        'thr_acc': float(thr_acc),
        'best_iteration': int(best_iter) if best_iter is not None else None,
        'train_idx': train_idx,
        'val_idx': val_idx,
        'train_index_global': train_index_global,
        'val_index_global': val_index_global,
        'y_true': y_val.astype(int),
        'y_pred': y_pred.astype(int),
        'y_proba': y_val_proba.astype(float)
    })

    es_tag = 'with ES' if used_es else 'no ES'
    print(f'Fold {fold_idx}: {es_tag}, train={len(y_tr)}, val={len(y_val)}, acc_val={acc*100:.2f}%, acc_val_opt={acc_opt*100:.2f}% @thr={thr_acc:.3f}, acc_train={tr_acc*100:.2f}%, gap={(gap)*100:.2f}%')

print('\n' + '='*60)
print('Risultati Cross-Validation')
print('='*60)
for i, a in enumerate(outer_accuracies, 1):
    print(f'  Fold {i}: val_acc={a*100:.2f}%, val_acc_opt={outer_accuracies_opt[i-1]*100:.2f}% @thr={folds_info[i-1]["thr_acc"]:.3f}, train_acc={train_accuracies[i-1]*100:.2f}%, gap={train_val_gaps[i-1]*100:.2f}%')
print(f'\nMean CV accuracy (0.5): {np.mean(outer_accuracies)*100:.2f}%')
print(f'Mean CV accuracy (opt thr): {np.mean(outer_accuracies_opt)*100:.2f}%')
print(f'Mean train accuracy: {np.mean(train_accuracies)*100:.2f}%')
print(f'Mean gap (train - val): {np.mean(train_val_gaps)*100:.2f}%')
print(f'Std CV accuracy:  {np.std(outer_accuracies)*100:.2f}%')
print(f'Min/Max val acc:  {np.min(outer_accuracies)*100:.2f}% / {np.max(outer_accuracies)*100:.2f}%')

WORST_FOLD_IDX = int(np.argmin(outer_accuracies))
WORST_FOLD_NUM = int(folds_info[WORST_FOLD_IDX]['fold'])
print(f"\nPeggiore fold: #{WORST_FOLD_NUM} con acc_val={outer_accuracies[WORST_FOLD_IDX]*100:.2f}% | acc_val_opt={outer_accuracies_opt[WORST_FOLD_IDX]*100:.2f}% | acc_train={train_accuracies[WORST_FOLD_IDX]*100:.2f}% | gap={train_val_gaps[WORST_FOLD_IDX]*100:.2f}%")

=== 10-Fold Cross-Validation (tutto il dataset) ===
Parametri utilizzati: {'objective': 'binary:logistic', 'tree_method': 'hist', 'eval_metric': 'logloss', 'use_label_encoder': False, 'n_estimators': 1000, 'learning_rate': 0.019161090151695974, 'max_depth': 3, 'min_child_weight': 6, 'gamma': 2.4267729113636345, 'subsample': 0.6391418336680764, 'colsample_bytree': 0.8034979811909722, 'reg_alpha': 0.04656745646903133, 'reg_lambda': 0.25114366021463247}

Fold 1: no ES, train=9000, val=1000, acc_val=83.40%, acc_val_opt=83.80% @thr=0.426, acc_train=87.77%, gap=4.37%
Fold 2: no ES, train=9000, val=1000, acc_val=84.10%, acc_val_opt=84.50% @thr=0.536, acc_train=87.73%, gap=3.63%
Fold 3: no ES, train=9000, val=1000, acc_val=84.00%, acc_val_opt=84.30% @thr=0.522, acc_train=87.66%, gap=3.66%
Fold 4: no ES, train=9000, val=1000, acc_val=84.60%, acc_val_opt=85.20% @thr=0.569, acc_train=87.70%, gap=3.10%
Fold 5: no ES, train=9000, val=1000, acc_val=85.70%, acc_val_opt=86.00% @thr=0.457, acc_train=87

# Make submission

In [None]:
print("=== Submission con modello trainato su tutto il dataset ===")

# MODIFICATO: train su tutto il dataset
cv_submission_model = XGBClassifier(**best_params, use_label_encoder=False, eval_metric='logloss', random_state=42)
cv_submission_model.fit(X, y)  # MODIFICATO: usa X, y invece di X_train_val, y_train_val

test_aligned = test_df.reindex(columns=FEATURES, fill_value=0)
X_test_matrix = test_aligned.astype(float).to_numpy()
test_predictions = cv_submission_model.predict(X_test_matrix).astype(int)

submission_df = pd.DataFrame({
    'battle_id': test_df['battle_id'].astype(np.int64),
    'player_won': test_predictions.astype(np.int64)
})

submission_path = 'submission.csv'
submission_df.to_csv(submission_path, index=False)
print(f"✅ File di submission salvato in {submission_path}")
print(f"Modello trainato su {len(X)} samples (dataset completo)")
print(f"Stima CV accuracy: {np.mean(outer_accuracies)*100:.2f}% ± {np.std(outer_accuracies)*100:.2f}%")
print(submission_df.head())

=== Submission rapida post-CV ===
✅ File di submission salvato in submission.csv
   battle_id  player_won
0          0           0
1          1           1
2          2           1
3          3           1
4          4           1
