The core strategy in this Jupyter notebook is Stacked Generalization (Stacking), a highly effective form of Meta-Ensemble Learning.

Base Layer (Level 0) Modeling
A diverse set of Base Models (including Ridge Regression, QDA, SVC, Logistic Regression, and various tree-based ensembles like Hist Gradient Boosting, Random Forest, and Extra Trees) are trained. Each base model is trained on either the Timeline or Combat feature view.

The key output of this layer is the set of Out-of-Fold (OOF) probability predictions from all base models on the training data.

Meta Layer (Level 1) Modeling
The OOF probabilities generated by the Base Layer are concatenated to form a new, small feature matrix. This matrix is used to train a Meta-Model (Logistic Regression).

The Meta-Model learns how to optimally combine (weigh) the strengths and weaknesses of the base models' predictions to produce a final, improved probability score, which is then converted to the final prediction using an optimized probability threshold.

In [None]:
# Import necessary libraries for data manipulation, machine learning, and utilities.
import os, json, math, warnings
from typing import Dict, Any, List, Tuple
from collections import Counter, defaultdict

warnings.filterwarnings("ignore") # Suppress warnings

import numpy as np
import pandas as pd

from joblib import Parallel, delayed
import multiprocessing

from tqdm.auto import tqdm
from packaging.version import Version
from sklearn import __version__ as skl_version

# Import various scikit-learn modules for model selection, metrics, preprocessing, pipelines, and feature selection
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.feature_selection import SelectKBest, mutual_info_classif, VarianceThreshold
# Import various scikit-learn classifiers
from sklearn.linear_model import LogisticRegression, LogisticRegressionCV, RidgeClassifierCV, SGDClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.naive_bayes import GaussianNB, BernoulliNB
from sklearn.svm import LinearSVC, SVC
from sklearn.calibration import CalibratedClassifierCV
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.cluster import KMeans
from sklearn.kernel_ridge import KernelRidge


In [None]:
import json
import pandas as pd
import os

# --- Define the path to our data ---
COMPETITION_NAME = 'fds-pokemon-battles-prediction-2025'
DATA_PATH = os.path.join('../input', COMPETITION_NAME)

train_file_path = os.path.join(DATA_PATH, 'train.jsonl')
test_file_path = os.path.join(DATA_PATH, 'test.jsonl')
train_data = []
test_data = []

# --- Function to load JSONL file safely ---
def load_jsonl(file_path):
    data = []
    if os.path.exists(file_path):
        with open(file_path, 'r') as f:
            for i, line in enumerate(f):
                data.append(json.loads(line))
        print(f"✅ Successfully loaded {len(data)} records from '{file_path}'")
    else:
        print(f"❌ File not found: '{file_path}'")
    return data

# Load train and test data
train_data = load_jsonl(train_file_path)
test_data = load_jsonl(test_file_path)

# Inspect the first battle in train data
if train_data:
    print("\n--- Structure of the first train battle ---")
    first_battle = train_data[0]

    # Make a copy and truncate the timeline for display
    battle_for_display = first_battle.copy()
    battle_for_display['battle_timeline'] = battle_for_display.get('battle_timeline', [])[:2]  # first 2 turns

    print(json.dumps(battle_for_display, indent=4))

    if len(first_battle.get('battle_timeline', [])) > 2:
        print("    ...")
        print("    (battle_timeline truncated for display)")

In [None]:

# Set a random state for reproducibility.
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)


In [None]:
# Define turn cutoffs for analysis (None means full timeline).
TURN_CUTOFFS = [None]


In [None]:
# Filter out battles where player 1's Pokemon are not level 100.
filtered = []
for battle in train_data:
    keep = True
    for pok in battle['p1_team_details']:
        if pok.get('level', 100) < 100: # Check if Pokemon level is less than 100
            keep = False
            break
    if keep:
        filtered.append(battle)

train_data = filtered # Update train_data with filtered battles
print(len(train_data))


9986


In [None]:
# Define the Gen 1 Pokemon type effectiveness chart.
GEN1_TYPES = {
    "normal": {"rock": 0.5, "ghost": 0.0, "steel": 0.5},
    "fire": {"fire": 0.5, "water": 0.5, "grass": 2.0, "ice": 2.0, "bug": 2.0, "rock": 0.5, "dragon": 0.5, "steel": 2.0},
    "water": {"fire": 2.0, "water": 0.5, "grass": 0.5, "ground": 2.0, "rock": 2.0, "dragon": 0.5},
    "electric": {"water": 2.0, "electric": 0.5, "grass": 0.5, "ground": 0.0, "flying": 2.0, "dragon": 0.5},
    "grass": {"fire": 0.5, "water": 2.0, "grass": 0.5, "poison": 0.5, "ground": 2.0, "flying": 0.5, "bug": 0.5, "rock": 2.0, "dragon": 0.5, "steel": 0.5},
    "ice": {"fire": 0.5, "water": 0.5, "grass": 2.0, "ground": 2.0, "flying": 2.0, "dragon": 2.0, "steel": 0.5},
    "fighting": {"normal": 2.0, "ice": 2.0, "poison": 0.5, "flying": 0.5, "psychic": 0.5, "bug": 0.5, "rock": 2.0, "ghost": 0.0, "dark": 2.0, "steel": 2.0, "fairy": 0.5},
    "poison": {"grass": 2.0, "poison": 0.5, "ground": 0.5, "rock": 0.5, "ghost": 0.5, "steel": 0.0, "fairy": 2.0},
    "ground": {"fire": 2.0, "electric": 2.0, "grass": 0.5, "poison": 2.0, "flying": 0.0, "bug": 0.5, "rock": 2.0, "steel": 2.0},
    "flying": {"electric": 0.5, "grass": 2.0, "fighting": 2.0, "bug": 2.0, "rock": 0.5, "steel": 0.5},
    "psychic": {"fighting": 2.0, "poison": 2.0, "psychic": 0.5, "dark": 0.0, "steel": 0.5},
    "bug": {"fire": 0.5, "grass": 2.0, "fighting": 0.5, "poison": 0.5, "flying": 0.5, "psychic": 2.0, "ghost": 0.5, "dark": 2.0, "steel": 0.5, "fairy": 0.5},
    "rock": {"fire": 2.0, "ice": 2.0, "fighting": 0.5, "ground": 0.5, "flying": 2.0, "bug": 2.0, "steel": 0.5},
    "ghost": {"normal": 0.0, "psychic": 2.0, "ghost": 2.0, "dark": 0.5},
    "dragon": {"dragon": 2.0, "steel": 0.5, "fairy": 0.0},
    "dark": {"fighting": 0.5, "psychic": 2.0, "ghost": 2.0, "dark": 0.5, "fairy": 0.5},
    "steel": {"fire": 0.5, "water": 0.5, "electric": 0.5, "ice": 2.0, "rock": 2.0, "steel": 0.5, "fairy": 2.0},
    "fairy": {"fire": 0.5, "fighting": 2.0, "poison": 0.5, "dragon": 2.0, "dark": 2.0, "steel": 0.5}
}


In [None]:
# Remove a specific battle from the training data.
train_data.remove(train_data[4877])


In [None]:
# Iterate and remove battles if any P1 Pokemon is below level 100.
for i, battle in enumerate(train_data):
  squadra1 = battle['p1_team_details']
  for pok in squadra1:
    livello = pok['level']
    if livello < 100:
      print(livello)
      train_data.remove(train_data[i])

print(len(train_data))


55
55
55
55
55
55
55
55
55
55
55
55
85
9986


In [None]:
# Calculate type effectiveness multiplier for a given attack and defense types.
def type_multiplier(attack_type: str, defend_types: List[str]) -> float:
    at = str(attack_type or "").lower()
    mult = 1.0
    chart = GEN1_TYPES.get(at, {}) # Get type chart for attack type
    for dt in defend_types or []: # Iterate through defender types
        mult *= chart.get(str(dt or "").lower(), 1.0)
    return mult


In [None]:
# Safely convert a value to float, returning a default if conversion fails.
def safe_float(x, default=1.0):
    try:
        return float(x)
    except:
        return default


In [None]:
# Extract unique species names seen in the opponent's timeline.
def species_seen_in_timeline(timeline: List[Dict[str, Any]]) -> set:
    names = set()
    for turn in timeline or []:
        name = (turn.get("p2_pokemon_state") or {}).get("name") # Get opponent's Pokemon name
        if name:
            names.add(str(name).lower())
    return names


In [None]:
# Discover all unique Pokemon types present in the dataset.
def discover_all_types(data: List[Dict[str, Any]]) -> List[str]:
    s = set()
    for b in data:
        for p in b.get("p1_team_details") or []:
            for t in p.get("types") or []:
                tl = str(t or "").lower()
                if tl and tl != "notype" and tl in GEN1_TYPES:
                    s.add(tl)
    return sorted(s)


In [None]:
# Get a sorted list of all unique Pokemon types from the training data.
ALL_TYPES = discover_all_types(train_data)


In [None]:
# Print the list of all discovered types and their count.
print(ALL_TYPES)
print(len(ALL_TYPES))


['dragon', 'electric', 'fire', 'flying', 'ghost', 'grass', 'ground', 'ice', 'normal', 'poison', 'psychic', 'rock', 'water']
13


Identify top species and moves to track as features

In [None]:
# Identify the top K most common Pokemon species from battle data.
def top_species(data: List[Dict[str, Any]], top_k=120) -> List[str]:
    c = Counter()
    for b in data:
        for p in b.get("p1_team_details") or []:
            nm = str(p.get("name","")).lower()
            if nm: c[nm]+=1
        for sp in species_seen_in_timeline(b.get("battle_timeline")): # Account for seen opponent Pokemon
            c[sp]+=0.5
    return [s for s,_ in c.most_common(top_k)]


In [None]:
# Get the top 120 most frequent Pokemon species.
TOP_SPECIES = top_species(train_data, 120)


In [None]:
# Print the first 5 top species.
print(TOP_SPECIES[:5])


['chansey', 'tauros', 'snorlax', 'exeggutor', 'alakazam']


In [None]:
# Map Pokemon names to their types from the training data.
NAME_TO_TYPES: Dict[str, List[str]] = {}
for b in train_data:
    for p in b.get("p1_team_details") or []:
        nm = str(p.get("name","")).lower()
        if nm and nm not in NAME_TO_TYPES:
            NAME_TO_TYPES[nm] = [t.lower() for t in (p.get("types") or []) if t and t!="notype"]


In [None]:
# Calculate STAB (Same-Type Attack Bonus) adjusted base power for a move.
def stab_adjusted_bp(move, attacker_name: str) -> float:
    if not move:
      return 0.0
    bp = float(move.get("base_power") or 0)
    if bp <= 0:
      return 0.0
    mtype = str(move.get("type") or "").lower()
    if not mtype or not attacker_name:
      return bp
    attacker_types = NAME_TO_TYPES.get(attacker_name, []) # Get types of the attacking Pokemon
    return bp * 1.5 if mtype in attacker_types else bp # Apply STAB if move type matches attacker's type


In [None]:
# Build prior probabilities of seeing certain Pokemon when facing a specific lead.
def build_lead_back_priors(train_data, top_species):
    co = defaultdict(lambda: Counter()) # Counter for lead-seen species pairs
    lead_cnt = Counter() # Counter for lead Pokemon occurrences
    for b in train_data:
        lead = (b.get("p2_lead_details") or {}).get("name","")
        lead = str(lead).lower()
        if not lead: continue
        seen = species_seen_in_timeline(b.get("battle_timeline"))
        lead_cnt[lead]+=1
        for s in seen:
            co[lead][s]+=1
    priors = {}
    for lead, cnt in lead_cnt.items():
        priors[lead] = {s: co[lead][s]/cnt for s in top_species}
    return priors


In [None]:
# Calculate prior probabilities for opponent's lead and seen Pokemon.
P_LEAD_BACK = build_lead_back_priors(train_data, TOP_SPECIES)


In [None]:
# Generate features based on prior probabilities of opponent's Pokemon given their lead.
def prior_features_for_lead(lead_name: str) -> Dict[str,float]:
    lead = str(lead_name or "").lower()
    row = P_LEAD_BACK.get(lead, {}) # Get prior probabilities for the lead Pokemon
    feats = {f"p2_prob_{s}_given_lead": float(row.get(s, 0.0)) for s in TOP_SPECIES}
    return feats


In [None]:
# Normalize move names for consistent comparison.
def normalize_move_name(name: str) -> str:
    if not name:
        return ""
    n = str(name).strip().lower() # Convert to lowercase and strip whitespace
    n = n.replace(" ", "").replace("-", "") # Remove spaces and hyphens
    return n


In [None]:
# Count the occurrences of each move in battles.
def move_counts(battles):
    c = Counter()
    for b in battles:
        for tr in b.get("battle_timeline") or []:
            for side in ("p1","p2"):
                md = tr.get(f"{side}_move_details") # Get move details for player side
                if md and md.get("name"):
                    c[normalize_move_name(md["name"])] += 1
    return c


In [None]:
# Build a vocabulary of frequently used moves based on coverage and minimum frequency.
def build_move_vocab_by_coverage(battles, target_coverage=0.985, min_freq=3):
    c = move_counts(battles)
    total = sum(c.values()) if c else 1
    running = 0
    vocab = []
    for m,f in c.most_common():
        if f < min_freq:
            break
        vocab.append(m)
        running += f
        if running / total >= target_coverage:
            break
    return vocab, c

TRACK_MOVES, MOVE_FREQ = build_move_vocab_by_coverage(train_data, target_coverage=0.985, min_freq=3)

print("TRACK_MOVES size:", len(TRACK_MOVES))


TRACK_MOVES size: 33


In [None]:
# Print the list of moves being tracked.
print(TRACK_MOVES)


['bodyslam', 'psychic', 'thunderwave', 'blizzard', 'thunderbolt', 'icebeam', 'seismictoss', 'earthquake', 'sleeppowder', 'softboiled', 'hyperbeam', 'rest', 'recover', 'reflect', 'lovelykiss', 'clamp', 'amnesia', 'drillpeck', 'surf', 'hypnosis', 'sing', 'stunspore', 'explosion', 'wrap', 'counter', 'doubleedge', 'nightshade', 'rockslide', 'megadrain', 'selfdestruct', 'confuseray', 'substitute', 'doublekick']


In [None]:
# Define sets of specific move categories.
STATUS_MOVES = {
    "thunderwave","stunspore","sleeppowder","sing","lovelykiss","toxic",
    "hypnosis","substitute","reflect","recover","softboiled","rest",
    "agility","swordsdance","amnesia","barrier","lightscreen"
}
PARTIAL_TRAP = {"wrap","clamp","bind","firespin"}
EXPLODE     = {"explosion","selfdestruct"}


In [None]:
# Define sets for healing and priority moves.
HEALING_MOVES = {
    "recover","softboiled","rest","synthesis","moonlight","morningsun",
    "leechseed","megadrain","gigaDrain","absorb","drainpunch"
}
PRIORITY_MOVES = {"quickattack"}


In [None]:

# Collect all unique moves observed in the training data.
def collect_all_moves(battles):
    moves = set()
    for b in battles:
        for tr in b.get("battle_timeline") or []:
            for side in ("p1", "p2"):
                md = tr.get(f"{side}_move_details")
                if md and md.get("name"):
                    mv = normalize_move_name(md["name"])
                    if mv:
                        moves.add(mv)
    return moves

ALL_MOVES = collect_all_moves(train_data)


In [None]:
# Dynamically identify status and explosive moves from battle timelines.
def dynamic_status_moves(battles):
    s = set()
    for b in battles:
        for tr in b.get("battle_timeline") or []:
            for side in ("p1","p2"):
                md = tr.get(f"{side}_move_details")
                if not md: continue
                mv = normalize_move_name(md["name"])
                cat = md.get("category"," схо").lower()
                if cat == "status": s.add(mv)
                if mv in {"toxic","thunderwave"}: s.add(mv)
    return s

def dynamic_explode(battles):
    boom = set()
    for b in battles:
        for tr in b.get("battle_timeline") or []:
            for side in ("p1","p2"):
                md = tr.get(f"{side}_move_details")
                if md:
                    mv = normalize_move_name(md["name"])
                    if "explode" in mv or "selfdestruct" in mv: # Check for explosion/self-destruct moves
                        boom.add(mv)
    return boom

DYN_STATUS  = dynamic_status_moves(train_data)
DYN_EXPLODE = dynamic_explode(train_data)


In [None]:
# Update and print the sets of move categories based on dynamically identified moves.
STATUS_MOVES   = (STATUS_MOVES | DYN_STATUS) & ALL_MOVES # Union with dynamic status moves and intersect with all moves
PARTIAL_TRAP   = PARTIAL_TRAP & ALL_MOVES
EXPLODE        = (EXPLODE | DYN_EXPLODE) & ALL_MOVES # Union with dynamic explode moves
HEALING_MOVES  = HEALING_MOVES & ALL_MOVES


print("STATUS_MOVES:", STATUS_MOVES)
print("HEALING_MOVES:", HEALING_MOVES)
print("PARTIAL_TRAP:", PARTIAL_TRAP)
print("EXPLODE:", EXPLODE)
print("PRIORITY_MOVES:", PRIORITY_MOVES)


STATUS_MOVES: {'agility', 'amnesia', 'substitute', 'hypnosis', 'stunspore', 'rest', 'recover', 'confuseray', 'thunderwave', 'softboiled', 'swordsdance', 'lovelykiss', 'sing', 'reflect', 'sleeppowder', 'toxic'}
HEALING_MOVES: {'megadrain', 'softboiled', 'rest', 'recover'}
PARTIAL_TRAP: {'firespin', 'wrap', 'clamp'}
EXPLODE: {'explosion', 'selfdestruct'}
PRIORITY_MOVES: {'quickattack'}


In [None]:
# Print the counts of moves in each category.
print(len(STATUS_MOVES))
print(len(PARTIAL_TRAP))
print(len(EXPLODE))
print(len(HEALING_MOVES))
print(len(PRIORITY_MOVES))


16
3
2
4
1


In [None]:
# Helper function to sum boosts from a dictionary
def sum_boosts(d):
    if not d: return 0.0
    s=0.0
    for v in d.values():
        try: s+=float(v)
        except: pass
    return s

# Calculate move counts for tracked moves and early status effects from the timeline.
def timeline_move_counts(timeline) -> Dict[str,float]:
    feats = {f"p1_mv_{m}":0.0 for m in TRACK_MOVES}
    feats.update({f"p2_mv_{m}":0.0 for m in TRACK_MOVES})
    feats["p1_mv_other"] = 0.0
    feats["p2_mv_other"] = 0.0

    early_status_score = 0.0
    for idx, turn in enumerate(timeline or [], start=1):
        for side in ("p1","p2"):
            md = turn.get(f"{side}_move_details")
            if md and md.get("name"):
                mv = normalize_move_name(md["name"])
                key = f"{side}_mv_{mv}" if mv in TRACK_MOVES else f"{side}_mv_other" # Assign to tracked move or 'other'
                feats[key] += 1.0

        if idx <= 3: # Check for early status effects within the first 3 turns
            s1 = (turn.get("p1_pokemon_state") or {}).get("status")
            s2 = (turn.get("p2_pokemon_state") or {}).get("status")
            if s2 in ("slp","frz","par"): early_status_score += 1.0
            if s1 in ("slp","frz","par"): early_status_score -= 1.0

    feats["early_status_score"] = early_status_score
    return feats


In [None]:
# Extract additional timeline-based features like move accuracy, healing, priority, and HP/status changes.
def timeline_additions(timeline: List[Dict[str, Any]]) -> Dict[str, float]:
    feats = {
        "p1_mult_mean": 1.0, "p2_mult_mean": 1.0,
        "p1_move_accuracy": 0.0, "p2_move_accuracy": 0.0,
        "p1_healing_moves": 0.0, "p2_healing_moves": 0.0,
        "p1_priority_moves": 0.0, "p2_priority_moves": 0.0,
        "p1_status_score": 0.0, "p2_status_score": 0.0,
        "p1_life_turn30": 1.0, "p2_life_turn30": 1.0,

        "mult_diff": 0.0,
        "diff_move_accuracy": 0.0,
        "diff_healing_moves": 0.0,
        "priority_diff": 0.0,
        "diff_status": 0.0,
        "life_diff_turn30": 0.0,
    }
    if not timeline:
        return feats

    prev_p1_hp = safe_float((timeline[0].get("p1_pokemon_state") or {}).get("hp_pct", 1.0))
    prev_p2_hp = safe_float((timeline[0].get("p2_pokemon_state") or {}).get("hp_pct", 1.0))

    p1_acc_hits = p1_acc_attempts = 0
    p2_acc_hits = p2_acc_attempts = 0
    p1_heal = p2_heal = 0
    p1_pri  = p2_pri  = 0

    mult_p1, mult_p2 = [], []

    def status_weight(s):
        return 1.0 if s=="par" else 2.0 if s=="slp" else 3.0 if s=="frz" else 0.0

    last_turn_seen = 0

    for tr in timeline:
        last_turn_seen = int(tr.get("turn", last_turn_seen))
        p1s = tr.get("p1_pokemon_state") or {}
        p2s = tr.get("p2_pokemon_state") or {}
        m1  = tr.get("p1_move_details") or {}
        m2  = tr.get("p2_move_details") or {}

        p1_hp = safe_float(p1s.get("hp_pct", prev_p1_hp))
        p2_hp = safe_float(p2s.get("hp_pct", prev_p2_hp))

        n1 = normalize_move_name(m1.get("name",""))
        n2 = normalize_move_name(m2.get("name",""))
        if n1 in HEALING_MOVES:  p1_heal += 1 # Count healing moves
        if n2 in HEALING_MOVES:  p2_heal += 1
        if n1 in PRIORITY_MOVES: p1_pri  += 1 # Count priority moves
        if n2 in PRIORITY_MOVES: p2_pri  += 1

        def damaging(m): return float(m.get("base_power") or 0) > 0
        if m1 and damaging(m1):
            p1_acc_attempts += 1
            if p2_hp < prev_p2_hp - 1e-9: # Check if opponent took damage
                p1_acc_hits += 1
        if m2 and damaging(m2):
            p2_acc_attempts += 1
            if p1_hp < prev_p1_hp - 1e-9: # Check if player took damage
                p2_acc_hits += 1

        def push_mult(m, defender_state, bag):
            if not damaging(m): return
            mt = str(m.get("type","")).lower()
            def_types = [t for t in defender_state.get("types", []) if t and t!="notype"]
            bag.append(type_multiplier(mt, def_types))
        push_mult(m1, p2s, mult_p1)
        push_mult(m2, p1s, mult_p2)

        # status score accumulation
        feats["p1_status_score"] += status_weight(p1s.get("status"))
        feats["p2_status_score"] += status_weight(p2s.get("status"))

        prev_p1_hp, prev_p2_hp = p1_hp, p2_hp

    target_turn = min(30, last_turn_seen if last_turn_seen>0 else 30)
    p1_t = p2_t = 1.0
    for tr in timeline:
        if int(tr.get("turn", 0)) <= target_turn:
            p1_t = safe_float((tr.get("p1_pokemon_state") or {}).get("hp_pct", p1_t))
            p2_t = safe_float((tr.get("p2_pokemon_state") or {}).get("hp_pct", p2_t))
    feats["p1_life_turn30"] = float(p1_t)
    feats["p2_life_turn30"] = float(p2_t)
    feats["life_diff_turn30"] = float(p1_t - p2_t)

    feats["p1_mult_mean"] = float(np.mean(mult_p1)) if mult_p1 else 1.0 # Average type multiplier for P1's attacks
    feats["p2_mult_mean"] = float(np.mean(mult_p2)) if mult_p2 else 1.0 # Average type multiplier for P2's attacks
    p1_acc = (p1_acc_hits / p1_acc_attempts) if p1_acc_attempts > 0 else 0.0
    p2_acc = (p2_acc_hits / p2_acc_attempts) if p2_acc_attempts > 0 else 0.0
    feats["p1_move_accuracy"] = float(p1_acc)
    feats["p2_move_accuracy"] = float(p2_acc)
    feats["p1_healing_moves"] = float(p1_heal)
    feats["p2_healing_moves"] = float(p2_heal)
    feats["p1_priority_moves"] = float(p1_pri)
    feats["p2_priority_moves"] = float(p2_pri)


    feats["mult_diff"] = feats["p1_mult_mean"] - feats["p2_mult_mean"]
    feats["diff_move_accuracy"] = feats["p1_move_accuracy"] - feats["p2_move_accuracy"]
    feats["diff_healing_moves"] = feats["p1_healing_moves"] - feats["p2_healing_moves"]
    feats["priority_diff"] = feats["p1_priority_moves"] - feats["p2_priority_moves"]
    feats["diff_status"] = feats["p2_status_score"] - feats["p1_status_score"]
    return feats


In [None]:
# Count occurrences of status, partial trap, explode, and STAB moves.
def move_buckets_features(timeline):
    b = {
        "p1_status_moves":0.0,"p2_status_moves":0.0,
        "p1_partial_trap":0.0,"p2_partial_trap":0.0,
        "p1_explodes":0.0,"p2_explodes":0.0,
        "p1_stab_hits":0.0,"p2_stab_hits":0.0
    }
    for tr in (timeline or []):
        p1s = (tr.get("p1_pokemon_state") or {})
        p2s = (tr.get("p2_pokemon_state") or {})
        for side, ms, opps in [("p1","p1_move_details","p2_pokemon_state"), ("p2","p2_move_details","p1_pokemon_state")]:
            md = tr.get(ms)
            if not md: continue
            mv = normalize_move_name(md.get("name",""))
            if not mv: continue
            if mv in STATUS_MOVES: b[f"{side}_status_moves"] += 1.0 # Count status moves
            if mv in PARTIAL_TRAP: b[f"{side}_partial_trap"] += 1.0 # Count partial trapping moves
            if mv in EXPLODE:      b[f"{side}_explodes"]     += 1.0 # Count exploding moves
            attacker = (tr.get(f"{side}_pokemon_state") or {}).get("name","")
            if (md.get("base_power",0) or 0) > 0:
                atypes = NAME_TO_TYPES.get(str(attacker).lower(), [])
                if str(md.get("type","")).lower() in atypes: # Check for STAB bonus
                    b[f"{side}_stab_hits"] += 1.0
    return b


In [None]:
# Count the number of turns each player's Pokemon spent under paralysis, sleep, or freeze status.
def status_turn_counts(timeline) -> Dict[str,float]:
    t = timeline or []
    c = {"p1_par":0.0,"p1_slp":0.0,"p1_frz":0.0,"p2_par":0.0,"p2_slp":0.0,"p2_frz":0.0}
    for turn in t:
        s1 = (turn.get("p1_pokemon_state") or {}).get("status")
        s2 = (turn.get("p2_pokemon_state") or {}).get("status")
        if s1 == "par": c["p1_par"] += 1 # Player 1 paralyzed
        if s1 == "slp": c["p1_slp"] += 1 # Player 1 asleep
        if s1 == "frz": c["p1_frz"] += 1 # Player 1 frozen
        if s2 == "par": c["p2_par"] += 1 # Player 2 paralyzed
        if s2 == "slp": c["p2_slp"] += 1 # Player 2 asleep
        if s2 == "frz": c["p2_frz"] += 1 # Player 2 frozen
    c["par_diff"] = c["p2_par"] - c["p1_par"] # Difference in paralysis turns
    c["slp_diff"] = c["p2_slp"] - c["p1_slp"] # Difference in sleep turns
    c["frz_diff"] = c["p2_frz"] - c["p1_frz"] # Difference in freeze turns
    return c


Status Effects & Critical Hits

In [None]:
# Extract features related to early crowd control (sleep, freeze) in the battle timeline.
def early_cc_features(timeline, window_size=5):
    feats = {"first_cc_winner":0.0, "early_sleep_hits":0.0, "early_freeze_hits":0.0}
    first_cc = None
    for _, turn in enumerate(timeline[:window_size] or [], start=1):
        s1 = (turn.get("p1_pokemon_state") or {}).get("status")
        s2 = (turn.get("p2_pokemon_state") or {}).get("status")
        if first_cc is None:
            if s1 in ("slp","frz") and s2 not in ("slp","frz"): first_cc = -1 # P1 inflicted CC first
            if s2 in ("slp","frz") and s1 not in ("slp","frz"): first_cc = +1 # P2 inflicted CC first
        if s2 == "slp": feats["early_sleep_hits"] += 1 # Count early sleep on P2
        if s1 == "slp": feats["early_sleep_hits"] -= 1 # Count early sleep on P1 (negative score)
        if s2 == "frz": feats["early_freeze_hits"] += 1 # Count early freeze on P2
        if s1 == "frz": feats["early_freeze_hits"] -= 1 # Count early freeze on P1 (negative score)
    feats["first_cc_winner"] = float(first_cc or 0)
    return feats


In [None]:
# Identify if Hyper Beam resulted in a KO for either player.
def hyper_beam_ko_flags(timeline):
    o = {"p1_hb_ko":0.0,"p2_hb_ko":0.0}
    for turn in timeline or []:
        m1 = (turn.get("p1_move_details") or {})
        m2 = (turn.get("p2_move_details") or {})
        # Check if P1 used Hyper Beam and P2 fainted
        if str(m1.get("name","")).lower().replace(" ","").replace("-","")=="hyperbeam" and (turn.get("p2_pokemon_state") or {}).get("status")=="fnt":
            o["p1_hb_ko"]=1.0
        # Check if P2 used Hyper Beam and P1 fainted
        if str(m2.get("name","")).lower().replace(" ","").replace("-","")=="hyperbeam" and (turn.get("p1_pokemon_state") or {}).get("status")=="fnt":
            o["p2_hb_ko"]=1.0
    return o


In [None]:
# Encode player team types and opponent lead types as features.
def encode_types_block(player_team: List[Dict[str, Any]], opponent_lead: Dict[str, Any]) -> Dict[str,float]:
    feats = {}
    player_types=[]
    for p in (player_team or []):
        for t in p.get("types") or []:
            tl = str(t or "").lower()
            if tl and tl in ALL_TYPES:
                player_types.append(tl)
    for t in ALL_TYPES:
        feats[f"p1_has_{t}"] = float(player_types.count(t)) # Feature: player 1 has this type
    opp_types = [str(t or "").lower() for t in (opponent_lead or {}).get("types", []) if str(t or "").lower() in ALL_TYPES]
    for t in ALL_TYPES:
        feats[f"p2_lead_is_{t}"] = 1.0 if t in opp_types else 0.0 # Feature: opponent lead is this type
    return feats


In [None]:
# Summarize base stats for player's team and compare with opponent's lead Pokemon.
def stat_summary_block(player_team: List[Dict[str, Any]], opponent_lead: Dict[str, Any]) -> Dict[str,float]:
    feats={}
    if not player_team: return feats
    cols = ["base_hp","base_atk","base_def","base_spa","base_spd","base_spe"]
    df = pd.DataFrame([{c:(p.get(c,0) or 0) for c in cols} for p in player_team])
    lead = {c:(opponent_lead.get(c,0) or 0) for c in cols}
    for c in cols:
        feats[f"p1_avg_{c[5:]}"] = float(df[c].mean()) # Average stat for player's team
        feats[f"lead_diff_{c[5:]}"] = float((df.iloc[0][c] if len(df)>0 else 0) - lead[c]) # Difference with opponent lead
    feats["p1_total_avg"] = float(df.sum(axis=1).mean())
    feats["p2_lead_total"] = float(sum(lead.values()))
    feats["total_diff"] = feats["p1_total_avg"] - feats["p2_lead_total"]
    feats["speed_adv"] = float(df["base_spe"].mean() - lead["base_spe"]) # Player speed advantage
    feats["p1_fastest_spe"] = float(df["base_spe"].max()) # Fastest Pokemon in player's team
    feats["p1_bulk_sum95"] = float(np.percentile(df["base_hp"]+df["base_def"]+df["base_spd"], 95)) # 95th percentile bulk
    return feats


In [None]:
# Calculate type and speed matchup features between player's team and opponent's lead.
def lead_matchup_block(player_team, opponent_lead) -> Dict[str,float]:
    feats={}
    if not player_team or not opponent_lead: return feats
    opp_types = [t for t in opponent_lead.get("types",[]) if t!="notype"]
    mults=[]
    for p in player_team:
        for t in p.get("types") or []:
            if t!="notype":
                mults.append(type_multiplier(t, opp_types)) # Calculate type multipliers
    if mults:
        feats["lead_type_avg"] = float(np.mean(mults))
        feats["lead_type_max"] = float(np.max(mults))
        feats["lead_type_se"]  = float(sum(1 for m in mults if m>=2.0)) # Count super effective hits
        feats["lead_type_res"] = float(sum(1 for m in mults if m<=0.5)) # Count resisted hits
    fastest = max((p.get("base_spe",0) for p in player_team), default=0)
    feats["lead_speed_gap_max"] = float(fastest - (opponent_lead.get("base_spe",0) or 0)) # Speed gap with fastest P1
    return feats


In [None]:
# Create a dictionary mapping Pokemon names to their details from the training data.
POKEMON_BY_NAME = {}
for b in train_data:
    for p in b.get("p1_team_details") or []:
        nm = str(p.get("name","")).lower()
        if nm and nm not in POKEMON_BY_NAME:
            POKEMON_BY_NAME[nm] = p


In [None]:

# Approximate opponent's team based on species seen in the timeline and known Pokemon details.
def approximate_p2_team(timeline: List[Dict[str, Any]]) -> List[Dict[str,Any]]:
    team, seen = [], set()
    for nm in species_seen_in_timeline(timeline):
        if nm in POKEMON_BY_NAME and nm not in seen:
            team.append(POKEMON_BY_NAME[nm]); seen.add(nm)
    return team


In [None]:

# Calculate overall type matchup advantages between player's and approximated opponent's teams.
def type_matchup_block(player_team, opponent_team_approx) -> Dict[str,float]:
    feats={}
    p_off, o_off = [], []
    # Calculate offensive type multipliers for player 1
    for atk in (player_team or []):
        atk_types = [t for t in atk.get("types",[]) if t!="notype"]
        for d in (opponent_team_approx or []):
            def_types = [t for t in d.get("types",[]) if t!="notype"]
            for t in atk_types:
                p_off.append(type_multiplier(t, def_types))
    # Calculate offensive type multipliers for opponent
    for atk in (opponent_team_approx or []):
        atk_types = [t for t in atk.get("types",[]) if t!="notype"]
        for d in (player_team or []):
            def_types = [t for t in d.get("types",[]) if t!="notype"]
            for t in atk_types:
                o_off.append(type_multiplier(t, def_types))
    if p_off:
        feats["p1_type_avg"] = float(np.mean(p_off))
        feats["p1_type_max"] = float(np.max(p_off))
        feats["p1_type_se"]  = float(sum(1 for m in p_off if m>=2.0))
        feats["p1_type_res"] = float(sum(1 for m in p_off if m<=0.5))
    if o_off:
        feats["p2_type_avg"] = float(np.mean(o_off))
        feats["p2_type_se"]  = float(sum(1 for m in o_off if m>=2.0))
    feats["type_adv"] = feats.get("p1_type_avg",1.0) - feats.get("p2_type_avg",1.0) # Player 1 type advantage
    feats["type_se_diff"] = feats.get("p1_type_se",0.0) - feats.get("p2_type_se",0.0) # Difference in super effective hits
    return feats


In [None]:
# Calculate base stat differences between player's and approximated opponent's teams.
def base_stat_diffs_vs_team(player_team: List[Dict[str, Any]], opponent_team_approx: List[Dict[str, Any]]) -> Dict[str, float]:
    feats = {}
    keys = ["hp","atk","def","spa","spd","spe","special"]
    for k in keys:
        feats[f"base_{k}_diff_vs_team"] = 0.0
    feats["p2_seen_count"] = float(len(opponent_team_approx or []))
    if not player_team or not opponent_team_approx:
        return feats

    cols = ["base_hp","base_atk","base_def","base_spa","base_spd","base_spe"]

    p1_df = pd.DataFrame([{c: float(p.get(c, 0) or 0) for c in cols} for p in player_team])
    p2_df = pd.DataFrame([{c: float(p.get(c, 0) or 0) for c in cols} for p in opponent_team_approx])

    p1_avg = p1_df.mean(axis=0)
    p2_avg = p2_df.mean(axis=0)

    feats["base_hp_diff_vs_team"]  = float(p1_avg["base_hp"]  - p2_avg["base_hp"])
    feats["base_atk_diff_vs_team"] = float(p1_avg["base_atk"] - p2_avg["base_atk"])
    feats["base_def_diff_vs_team"] = float(p1_avg["base_def"] - p2_avg["base_def"])
    feats["base_spa_diff_vs_team"] = float(p1_avg["base_spa"] - p2_avg["base_spa"])
    feats["base_spd_diff_vs_team"] = float(p1_avg["base_spd"] - p2_avg["base_spd"])
    feats["base_spe_diff_vs_team"] = float(p1_avg["base_spe"] - p2_avg["base_spe"])

    p1_special = 0.5 * (p1_avg["base_spa"] + p1_avg["base_spd"])
    p2_special = 0.5 * (p2_avg["base_spa"] + p2_avg["base_spd"])
    feats["special_diff_vs_team"] = float(p1_special - p2_special)

    feats["base_spe_max_gap_vs_team"] = float(p1_df["base_spe"].max() - p2_df["base_spe"].max())
    feats["base_bulk95_gap_vs_team"] = float(
        np.percentile(p1_df["base_hp"]+p1_df["base_def"]+p1_df["base_spd"], 95)
        - np.percentile(p2_df["base_hp"]+p2_df["base_def"]+p2_df["base_spd"], 95)
    )
    return feats


In [None]:
# Generate bag-of-species features indicating presence of top species in player's team and opponent's seen Pokemon.
def bag_of_species_features(player_team, opponent_seen) -> Dict[str,float]:
    feats={}
    player_names = [str(p.get("name","")).lower() for p in (player_team or [])]
    player_set = set(player_names)
    for s in TOP_SPECIES:
        feats[f"p1_has_{s}"] = 1.0 if s in player_set else 0.0 # Feature if player 1 has this species
        feats[f"p2_seen_{s}"] = 1.0 if s in opponent_seen else 0.0 # Feature if opponent has shown this species
    return feats


In [None]:
# Calculate mean, standard deviation, min, and max for a given array.
def window_stats(arr):
    if len(arr)==0:
        return {"mean":0.0,"std":0.0,"min":0.0,"max":0.0}
    return {"mean": float(np.mean(arr)), "std": float(np.std(arr)), "min": float(np.min(arr)), "max": float(np.max(arr))}


In [None]:
# Calculate HP differential and its derivative statistics over various windows in the timeline.
def hp_windows_features(timeline):
    t = timeline or []
    p_hp = np.array([safe_float((tr.get("p1_pokemon_state") or {}).get("hp_pct",1.0)) for tr in t], dtype=float)
    o_hp = np.array([safe_float((tr.get("p2_pokemon_state") or {}).get("hp_pct",1.0)) for tr in t], dtype=float)
    diff = p_hp - o_hp # HP differential
    deriv = np.diff(diff) if len(diff) >= 2 else np.array([], dtype=float) # Derivative of HP differential
    def band(a,b):
        sw = window_stats(diff[a:b] if len(diff) else np.array([]))
        sd = window_stats(deriv[a:b] if len(deriv) else np.array([]))
        return {
            f"hp_diff_{a}_{b}_mean": sw["mean"],
            f"hp_diff_{a}_{b}_std":  sw["std"],
            f"hp_diff_{a}_{b}_min":  sw["min"],
            f"hp_diff_{a}_{b}_max":  sw["max"],
            f"hp_mom_{a}_{b}_mean":  sd["mean"],
            f"hp_mom_{a}_{b}_std":   sd["std"],
        }
    feats = {}
    feats.update(band(0, min(3, len(diff)))) # Features for first 3 turns
    feats.update(band(0, min(5, len(diff)))) # Features for first 5 turns
    feats.update(band(5, min(10, len(diff)))) # Features for turns 5 to 10
    return feats


In [None]:
# Extract features related to the timing of the first paralysis, sleep, or freeze status infliction/reception.
def first_status_timing_features(timeline):
    t = timeline or []
    INF = 999.0
    def find_first(status, side):
        for i, tr in enumerate(t, start=1):
            st = (tr.get(f"{side}_pokemon_state") or {}).get("status")
            if st == status: return float(i)
        return INF
    feats={}
    for st in ("par","slp","frz"):
        o_turn = find_first(st, "p2") # Turn opponent received status
        p_turn = find_first(st, "p1") # Turn player received status
        feats[f"first_{st}_we_inflict"] = o_turn
        feats[f"first_{st}_we_receive"] = p_turn
        feats[f"first_{st}_diff"] = p_turn - o_turn
    feats["first_par_diff"] = feats.get("first_par_we_receive",999.0) - feats.get("first_par_we_inflict",999.0)
    feats["first_slp_diff"] = feats.get("first_slp_we_receive",999.0) - feats.get("first_slp_we_inflict",999.0)
    feats["first_frz_diff"] = feats.get("first_frz_we_receive",999.0) - feats.get("first_frz_we_inflict",999.0)
    return feats


Transform raw battle data into meaningful features

In [None]:
# Extract core timeline features including battle length, HP differences, switches, faints, status, damage, and boosts.
def timeline_core_features(timeline) -> Dict[str,float]:
    feats = {}
    t = timeline or []
    if not t:
        feats.update({
            "battle_len":0.0,"faint_diff":0.0,"hp_diff_final":0.0,"hp_auc":0.0,
            "time_in_lead":0.0,"first_faint_adv":0.0,"effective_damage_diff":0.0,
            "base_power_diff":0.0,"boost_adv":0.0,"status_adv":0.0,
            "eff_damage_ratio":1.0,"bp_ratio":1.0
        })
        return feats
    feats["battle_len"] = float(len(t))
    p_sw = o_sw = 0
    prev_p = prev_o = None
    auc = 0.0; lead = 0.0
    p_f = o_f = 0
    p_st = o_st = 0
    p_bp = o_bp = 0.0
    p_eff = o_eff = 0.0
    p_boost = o_boost = 0.0
    p_f_first = math.inf; o_f_first = math.inf
    for idx, turn in enumerate(t, start=1):
        ps = turn.get("p1_pokemon_state") or {}
        os = turn.get("p2_pokemon_state") or {}
        pm = turn.get("p1_move_details")
        om = turn.get("p2_move_details")
        pn = ps.get("name"); on = os.get("name")

        if prev_p is not None and pn and pn != prev_p: p_sw += 1 # Count player 1 switches
        if prev_o is not None and on and on != prev_o: o_sw += 1 # Count opponent switches
        prev_p, prev_o = pn, on

        php = safe_float(ps.get("hp_pct",1.0), 1.0)
        ohp = safe_float(os.get("hp_pct",1.0), 1.0)
        d = php - ohp
        auc += d # Accumulate HP difference for AUC
        if d > 0: lead += 1 # Count turns player 1 is in the lead by HP

        pstatus = ps.get("status"); ostatus = os.get("status")
        if pstatus and pstatus not in ("nostatus","fnt"): p_st += 1 # Count player 1 status turns
        if ostatus and ostatus not in ("nostatus","fnt"): o_st += 1 # Count opponent status turns
        if pstatus == "fnt":
            p_f += 1 # Count player 1 faints
            if p_f_first == math.inf: p_f_first = idx # Record first faint turn
        if ostatus == "fnt":
            o_f += 1 # Count opponent faints
            if o_f_first == math.inf: o_f_first = idx # Record first faint turn

        p_boost += sum_boosts(ps.get("boosts")) # Sum player 1 boosts
        o_boost += sum_boosts(os.get("boosts")) # Sum opponent boosts

        if pm and float(pm.get("base_power") or 0) > 0:
            adj = stab_adjusted_bp(pm, str(pn or "").lower())
            mt = str(pm.get("type") or "").lower()
            otypes = [x for x in os.get("types",[]) if x!="notype"]
            p_bp += adj # Accumulate player 1 base power
            p_eff += adj * type_multiplier(mt, otypes) # Accumulate player 1 effective damage
        if om and float(om.get("base_power") or 0) > 0:
            adj = stab_adjusted_bp(om, str(on or "").lower())
            mt = str(om.get("type") or "").lower()
            ptypes = [x for x in ps.get("types",[]) if x!="notype"]
            o_bp += adj # Accumulate opponent base power
            o_eff += adj * type_multiplier(mt, ptypes) # Accumulate opponent effective damage

    feats["p1_switches"] = float(p_sw)
    feats["p2_switches"] = float(o_sw)
    feats["switch_diff"] = float(o_sw - p_sw)
    feats["hp_auc"] = float(auc)
    feats["time_in_lead"] = float(lead)
    feats["hp_diff_final"] = float(
        safe_float((t[-1].get("p1_pokemon_state") or {}).get("hp_pct",1.0)) -
        safe_float((t[-1].get("p2_pokemon_state") or {}).get("hp_pct",1.0))
    )
    feats["p1_status_turns"] = float(p_st)
    feats["p2_status_turns"] = float(o_st)
    feats["status_adv"] = float(o_st - p_st) # Opponent status turns advantage
    feats["p1_faints"] = float(p_f)
    feats["p2_faints"] = float(o_f)
    feats["faint_diff"] = float(o_f - p_f) # Opponent faint advantage
    feats["base_power_diff"] = float(p_bp - o_bp)
    feats["effective_damage_diff"] = float(p_eff - o_eff)
    feats["boost_adv"] = float(p_boost - o_boost) # Player boost advantage

    pft = p_f_first if p_f_first < math.inf else 999.0
    oft = o_f_first if o_f_first < math.inf else 999.0
    feats["first_faint_adv"] = float(oft - pft) # Advantage in first faint

    p_series = np.array([safe_float((tr.get("p1_pokemon_state") or {}).get("hp_pct",1.0)) for tr in t])
    o_series = np.array([safe_float((tr.get("p2_pokemon_state") or {}).get("hp_pct",1.0)) for tr in t])
    diff_series = p_series - o_series
    feats["hp_diff_std"] = float(np.std(diff_series)) if len(diff_series)>1 else 0.0 # Std dev of HP difference
    feats["hp_diff_deriv_abs_mean"] = float(np.mean(np.abs(np.diff(diff_series)))) if len(diff_series)>2 else 0.0 # Mean absolute derivative of HP difference

    def time_below(arr, thr):
      if len(arr)==0: return 0.0
      return float(np.mean(arr <= thr))

    feats["p1_time_below_50"] = time_below(p_series, 0.5) # Proportion of time P1 is below 50% HP
    feats["p2_time_below_50"] = time_below(o_series, 0.5) # Proportion of time P2 is below 50% HP
    feats["below50_diff"] = feats["p2_time_below_50"] - feats["p1_time_below_50"]
    feats["eff_damage_ratio"] = (p_eff + 1e-3) / (o_eff + 1e-3)
    feats["bp_ratio"] = (p_bp + 1e-3) / (o_bp + 1e-3)
    return feats


In [None]:
# Generate interaction features by combining existing feature values.
def interaction_features(fd: Dict[str,float]) -> Dict[str,float]:
    out={}
    if "faint_diff" in fd and "hp_diff_final" in fd:
        out["faint_hp_compound"] = fd["faint_diff"] * fd["hp_diff_final"] # Compound feature of faints and final HP
    if "status_adv" in fd and "effective_damage_diff" in fd:
        out["status_damage_compound"] = fd["status_adv"] * fd["effective_damage_diff"] # Compound feature of status and damage
    if "type_adv" in fd and "effective_damage_diff" in fd:
        out["type_damage_synergy"] = fd["type_adv"] * fd["effective_damage_diff"] # Synergy between type advantage and damage
    if "hp_auc" in fd and "time_in_lead" in fd:
        out["lead_control_score"] = fd["hp_auc"] * (1.0 + 0.01 * fd["time_in_lead"]) # Score for lead control
    return out


In [None]:
# Calculate features related to critical pressure based on Pokemon speed stats.
def crit_pressure_features(player_team, opponent_lead):
    sp = [p.get("base_spe",0) for p in (player_team or [])]
    p80 = np.percentile(sp, 80) if sp else 0.0 # 80th percentile of player's team speed
    gap = (max(sp) if sp else 0.0) - float((opponent_lead or {}).get("base_spe",0)) # Speed gap with opponent lead
    return {"crit_pressure_p80": float(p80), "lead_crit_gap": float(gap)}


In [None]:

# Add polynomial features and interaction terms to the DataFrame.
def add_compact_polys(df: pd.DataFrame) -> pd.DataFrame:
    new_df = df.copy()
    if {"p1_faints","p2_faints"}.issubset(new_df.columns):
        new_df["faint_ratio_safe"] = (new_df["p2_faints"] + 0.5) / (new_df["p1_faints"] + 0.5) # Safe ratio of faints
    if {"time_in_lead","battle_len"}.issubset(new_df.columns):
        new_df["lead_share"] = np.where(new_df["battle_len"] > 0, new_df["time_in_lead"] / new_df["battle_len"], 0.0) # Share of battle time in lead
    if {"hp_auc","battle_len"}.issubset(new_df.columns):
        new_df["hp_auc_norm"] = np.where(new_df["battle_len"] > 0, new_df["hp_auc"]/new_df["battle_len"], 0.0) # Normalized HP AUC
    for col in ("hp_diff_std","hp_diff_deriv_abs_mean","effective_damage_diff","hp_auc","lead_control_score"):
        if col in new_df.columns:
            new_df[col] = np.clip(new_df[col], np.percentile(new_df[col],1), np.percentile(new_df[col],99)) # Clip outliers
    poly_cols = [c for c in [
        "lead_share","hp_auc_norm","effective_damage_diff","base_power_diff",
        "type_adv","lead_control_score","faint_diff","status_adv","lead_speed_gap_max",
        "eff_damage_ratio","bp_ratio","below50_diff"
    ] if c in new_df.columns]
    if poly_cols:
        poly = PolynomialFeatures(degree=2, include_bias=False) # Create polynomial features
        feats = poly.fit_transform(new_df[poly_cols])
        names = [f"poly_{n}" for n in poly.get_feature_names_out(poly_cols)]
        new_df = pd.concat([new_df, pd.DataFrame(feats, columns=names, index=new_df.index)], axis=1)
    return new_df


Captures who's winning right now

In [None]:
# Calculate momentum features based on HP differentials over different turn windows.
def momentum_features(timeline):
    feats={}
    timeline = timeline or []
    if len(timeline) < 1:
        for cp in [5,10,15,20,25]: # Initialize momentum features to 0 if timeline is empty
            feats[f"momentum_{cp}"]=0.0
            feats[f"momentum_trend_{cp}"]=0.0
        return feats
    for cp in [5,10,15,20,25]:
        if cp <= len(timeline):
            turns = timeline[:cp]
            diffs=[]
            for tr in turns:
                p1 = safe_float((tr.get("p1_pokemon_state") or {}).get("hp_pct",1.0))
                p2 = safe_float((tr.get("p2_pokemon_state") or {}).get("hp_pct",1.0))
                diffs.append(p1 - p2) # Store HP differences
            feats[f"momentum_{cp}"] = float(np.mean(diffs[-3:])) if len(diffs)>=1 else 0.0 # Mean of last 3 HP differences
            feats[f"momentum_trend_{cp}"] = float(np.mean(np.diff(diffs[-4:]))) if len(diffs) > 3 else 0.0 # Trend of last 4 HP differences
        else:
            feats[f"momentum_{cp}"]=0.0
            feats[f"momentum_trend_{cp}"]=0.0
    return feats


In [None]:
# Analyze timeline for comebacks and largest HP swing features.
def critical_turn_features(timeline):
    feats={"p1_comebacks":0.0,"p2_comebacks":0.0,"largest_swing_p1":0.0,"largest_swing_p2":0.0}
    timeline = timeline or []
    if len(timeline) < 2: return feats
    diffs=[]
    for tr in timeline:
        p1 = safe_float((tr.get("p1_pokemon_state") or {}).get("hp_pct",1.0))
        p2 = safe_float((tr.get("p2_pokemon_state") or {}).get("hp_pct",1.0))
        diffs.append(p1 - p2) # Calculate HP differential for each turn
    changes = np.diff(diffs) # Calculate change in HP differential
    feats["largest_swing_p1"] = float(np.max(changes)) if len(changes)>0 else 0.0 # Largest positive swing (P1 comeback)
    feats["largest_swing_p2"] = float(np.min(changes)) if len(changes)>0 else 0.0 # Largest negative swing (P2 comeback)
    for i in range(1, len(diffs)):
        if diffs[i-1] < 0 and diffs[i] > 0: feats["p1_comebacks"] += 1 # Count P1 comebacks
        elif diffs[i-1] > 0 and diffs[i] < 0: feats["p2_comebacks"] += 1 # Count P2 comebacks
    return feats


In [None]:
# Calculate features related to move effectiveness, counting super-effective and resisted hits.
def move_effectiveness_features(timeline):
    feats={}
    p1_se = p2_se = 0 # Player 1 and Player 2 super effective hits
    p1_res = p2_res = 0 # Player 1 and Player 2 resisted hits
    for tr in timeline or []:
        p1m = tr.get("p1_move_details"); p2m = tr.get("p2_move_details")
        p1s = tr.get("p1_pokemon_state") or {}
        p2s = tr.get("p2_pokemon_state") or {}
        if p1m and float(p1m.get("base_power",0)) > 0: # If P1 made a damaging move
            mtype = str(p1m.get("type","度に")).lower()
            def_types = [t for t in p2s.get("types", []) if t != "notype"]
            mult = type_multiplier(mtype, def_types)
            if mult >= 2.0: p1_se += 1 # Super effective hit
            elif mult <= 0.5: p1_res += 1 # Resisted hit
        if p2m and float(p2m.get("base_power",0)) > 0: # If P2 made a damaging move
            mtype = str(p2m.get("type","度に")).lower()
            def_types = [t for t in p1s.get("types", []) if t != "notype"]
            mult = type_multiplier(mtype, def_types)
            if mult >= 2.0: p2_se += 1
            elif mult <= 0.5: p2_res += 1
    feats["effective_hits_diff"] = float(p1_se - p2_se) # Difference in super effective hits
    feats["resisted_hits_diff"] = float(p2_res - p1_res) # Difference in resisted hits
    feats["hit_quality_score"] = feats["effective_hits_diff"] - feats["resisted_hits_diff"] # Overall hit quality
    return feats


In [None]:
# Aggregate all feature extraction functions to create a comprehensive feature set for a given battle.
def extract_example_features(battle: Dict[str,Any], max_turn=None) -> Dict[str,float]:
    feats = {"battle_id": battle.get("battle_id")}
    pteam = battle.get("p1_team_details") or []
    opp_lead = battle.get("p2_lead_details") or {}
    timeline = battle.get("battle_timeline") or []
    if max_turn is not None:
        timeline = [t for t in timeline if int(t.get("turn",0)) <= max_turn] # Limit timeline to max_turn
    opp_approx = approximate_p2_team(timeline) # Approximate opponent's team


    feats.update(prior_features_for_lead((opp_lead or {}).get("name","")))
    feats.update(encode_types_block(pteam, opp_lead))
    feats.update(stat_summary_block(pteam, opp_lead))
    feats.update(lead_matchup_block(pteam, opp_lead))

    feats.update(base_stat_diffs_vs_team(pteam, opp_approx))

    feats.update(type_matchup_block(pteam, opp_approx))
    feats.update(timeline_core_features(timeline))
    feats.update(interaction_features(feats))
    feats.update(bag_of_species_features(pteam, species_seen_in_timeline(timeline)))
    feats.update(timeline_move_counts(timeline))
    feats.update(move_buckets_features(timeline))
    feats.update(status_turn_counts(timeline))
    feats.update(early_cc_features(timeline, window_size=5))
    feats.update(hyper_beam_ko_flags(timeline))
    feats.update(crit_pressure_features(pteam, opp_lead))
    feats.update(hp_windows_features(timeline))
    feats.update(first_status_timing_features(timeline))
    feats.update(momentum_features(timeline))
    feats.update(critical_turn_features(timeline))
    feats.update(move_effectiveness_features(timeline))

    feats.update(timeline_additions(timeline))
    return feats


In [None]:
# Perform parallel feature extraction for a list of battles, handling train/test data and adding polynomial features.
def parallel_feature_extraction(battles, is_train=True, n_jobs=-1, max_turn=None):
    if n_jobs == -1:
        n_jobs = multiprocessing.cpu_count() # Use all available CPU cores
    rows = Parallel(n_jobs=n_jobs, backend='loky')(
        delayed(extract_example_features)(b, max_turn=max_turn) for b in tqdm(battles, desc=f"Feature extraction (max_turn={max_turn})")
    )
    y = []
    if is_train:
        y = [1 if b.get("player_won") else 0 for b in battles] # Extract target variable for training
    df = pd.DataFrame(rows)
    ids = df["battle_id"].values
    X = df.drop(columns=["battle_id"]).fillna(0.0) # Drop battle_id and fill NaNs
    X = add_compact_polys(X) # Add polynomial features
    return (X, np.array(y), ids) if is_train else (X, ids)


In [None]:

# Perform Out-of-Fold (OOF) target encoding for presence features.
def target_encode_presence_oof(X: pd.DataFrame, y: np.ndarray, prefix="p1_has_", folds=5):
    skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=RANDOM_STATE)
    cols = [c for c in X.columns if c.startswith(prefix)]
    if not cols: return X
    te = pd.DataFrame(0.5, index=X.index, columns=[c+"_te" for c in cols]) # Initialize target encoded features with 0.5
    for tr_idx, va_idx in skf.split(X, y):
        Xtr = X.iloc[tr_idx][cols]; ytr = y[tr_idx]
        means = {}
        for c in cols:
            pos = (((Xtr[c] > 0).astype(int)) & (ytr == 1)).sum() # Count positive instances
            cnt = (Xtr[c] > 0).sum()
            means[c] = (pos + 1.0) / (cnt + 2.0) if cnt > 0 else 0.5 # Calculate smoothed mean
        for c in cols:
            te.loc[te.index[va_idx], c+"_te"] = np.where(X.iloc[va_idx][c] > 0, means[c], 0.5) # Apply encoded means to validation set
    return pd.concat([X, te], axis=1)


In [None]:
# Apply target encoding for presence features from training data to test data.
def target_encode_presence_test(Xtr: pd.DataFrame, ytr: np.ndarray, Xte: pd.DataFrame, prefix="p1_has_"):
    cols = [c for c in Xtr.columns if c.startswith(prefix)]
    for c in cols:
        if c not in Xte.columns:
            Xte[c] = 0.0 # Add missing columns to test set
    means = {}
    for c in cols:
        pos = (((Xtr[c] > 0).astype(int)) & (ytr == 1)).sum()
        cnt = (Xtr[c] > 0).sum()
        means[c] = (pos + 1.0) / (cnt + 2.0) if cnt > 0 else 0.5 # Calculate smoothed mean from training data
    for c in cols:
        Xte[c+"_te"] = np.where(Xte[c] > 0, means[c], 0.5) # Apply encoded means to test set
    return Xte


In [None]:
# Prune features with low variance or high correlation to reduce dimensionality.
def prune_low_var_high_corr(features: pd.DataFrame, correlation_cutoff=0.995, variance_cutoff=1e-10):
    filtered = features.loc[:, features.var(numeric_only=True) > variance_cutoff] # Remove low variance features
    corr = filtered.corr(numeric_only=True).abs()
    up = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool)) # Get upper triangle of correlation matrix
    drop = [c for c in up.columns if any(up[c] > correlation_cutoff)] # Identify highly correlated features to drop
    return filtered.drop(columns=drop, errors="ignore")


In [None]:
# Get prediction probabilities from a model, handling different types of model outputs.
def model_proba(model, X):
    if hasattr(model, "predict_proba"):
        return model.predict_proba(X)[:,1]
    if hasattr(model, "decision_function"):
        d = model.decision_function(X)
        return 1.0 / (1.0 + np.exp(-d)) # Convert decision function to probabilities
    pred = model.predict(X).astype(float)
    return pred * 0.999 + 0.0005 # Small adjustment for models returning binary predictions


In [None]:
# Fit a list of models to the provided data.
def fit_models(model_list, X, y):
    fitted=[]
    for m in tqdm(model_list, desc="Training base models"):
        m.fit(X, y) # Fit each model
        fitted.append(m)
    return fitted


In [None]:

# Stack prediction probabilities from multiple models.
def stack_probas(model_list, X) -> np.ndarray:
    return np.vstack([model_proba(m, X) for m in model_list])


In [None]:
# Define a list of key features for meta-modeling.
META_KEYS = [
    "lead_share","hp_auc_norm","effective_damage_diff","base_power_diff",
    "type_adv","lead_control_score","faint_diff","status_adv",
    "lead_speed_gap_max","below50_diff","early_status_score",
    "par_diff","slp_diff","frz_diff","eff_damage_ratio","bp_ratio",
    "first_cc_winner","early_sleep_hits","early_freeze_hits",
    "crit_pressure_p80","lead_crit_gap","p1_hb_ko","p2_hb_ko",
    "hp_diff_0_3_mean","hp_mom_0_3_mean","hp_diff_0_5_mean","hp_mom_0_5_mean","hp_diff_5_10_mean","hp_mom_5_10_mean",
    "first_par_diff","first_slp_diff","first_frz_diff",
    "momentum_5","momentum_trend_5","momentum_10","momentum_trend_10","momentum_15","momentum_trend_15",
    "momentum_20","momentum_trend_20","momentum_25","momentum_trend_25",
    "p1_comebacks","p2_comebacks","largest_swing_p1","largest_swing_p2",
    "effective_hits_diff","resisted_hits_diff","hit_quality_score",
    "p1_mv_other","p2_mv_other","p1_status_moves","p2_status_moves",
    "p1_partial_trap","p2_partial_trap","p1_explodes","p2_explodes",
    "p1_stab_hits","p2_stab_hits"
]


In [None]:
# Extend the meta-keys with additional timeline-derived features.
META_KEYS += [
    "p1_mult_mean","p2_mult_mean","mult_diff",
    "p1_move_accuracy","p2_move_accuracy","diff_move_accuracy",
    "p1_healing_moves","p2_healing_moves","diff_healing_moves",
    "p1_priority_moves","p2_priority_moves","priority_diff",
    "p1_life_turn30","p2_life_turn30","life_diff_turn30",
    "p1_status_score","p2_status_score","diff_status",
]



For each model select most predictive features to prevent overfitting and generate unbiased predictions on training data using cross-validation.

In [None]:
# Perform Out-of-Fold (OOF) stacking with inner feature selection using cross-validation.
def oof_stack_with_inner_fs(model_builder, X, y, k_features=450, folds=5):
    skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=RANDOM_STATE)
    num_models = len(model_builder()) # Number of base models
    N = len(y)
    oof = np.zeros((num_models, N), dtype=float) # Array to store OOF predictions
    selected_cols_union = set()

    for _, (tr, va) in enumerate(tqdm(skf.split(X, y), total=folds, desc="OOF Stacking + inner FS")):
        vt = VarianceThreshold(1e-6)
        Xtr_vt = vt.fit_transform(X.iloc[tr]) # Apply variance thresholding
        cols_vt = X.columns[vt.get_support()]
        skb = SelectKBest(mutual_info_classif, k=min(k_features, len(cols_vt))) # Select top K features
        skb.fit(Xtr_vt, y[tr])
        cols_fold = cols_vt[skb.get_support()]
        selected_cols_union.update(cols_fold)

        Xtr = X.iloc[tr][cols_fold] # Training data for the fold
        Xva = X.iloc[va][cols_fold] # Validation data for the fold

        fold_models = model_builder()
        for m in fold_models:
            m.fit(Xtr, y[tr]) # Fit base models on selected features
        fold_probs = [model_proba(m, Xva) for m in fold_models]
        for mi, pr in enumerate(fold_probs):
            oof[mi, va] = pr

    selected_cols_union = list(sorted(selected_cols_union))
    return oof, selected_cols_union


Find optimal probability threshold for converting predictions to binary decisions

In [None]:
# Tune the optimal probability threshold for binary classification based on accuracy.
def tune_threshold(probs: np.ndarray, y: np.ndarray, low=0.2, high=0.8, steps=601) -> float:
    grid = np.linspace(low, high, steps)
    best_t, best_acc = 0.5, -1.0
    for t in grid:
        acc = accuracy_score(y, (probs >= t).astype(int)) # Calculate accuracy for current threshold
        if acc > best_acc:
            best_acc, best_t = acc, float(t)
    return best_t


In [None]:
from sklearn.ensemble import ExtraTreesClassifier

# Import ExtraTreesClassifier for ensemble modeling.


Split features into 2 groups to create model diversity.

In [None]:
# Create two distinct feature views: 'timeline' (time-dependent) and 'combat' (static/aggregated combat statistics).
def build_feature_views(df: pd.DataFrame) -> Dict[str, List[str]]:
    cols = list(df.columns)

    views = {
        "timeline": [],
        "combat": [],
    }

    for c in cols:
        if c.startswith((
            "battle_len", "hp_", "hp_diff_", "hp_mom_",
            "time_in_lead", "lead_share", "hp_auc", "hp_auc_norm",
            "lead_control_score", "switch_", "p1_time_below_50",
            "p2_time_below_50", "below50_diff",
            "faint_diff", "p1_faints", "p2_faints",
            "status_adv", "p1_status_turns", "p2_status_turns",
            "first_faint_adv", "first_par_", "first_slp_", "first_frz_",
            "early_status_score", "early_sleep_hits", "early_freeze_hits",
            "p1_hb_ko", "p2_hb_ko",
            "p1_life_turn30", "p2_life_turn30", "life_diff_turn30",
            "par_diff", "slp_diff", "frz_diff",
            "first_cc_winner"
        )): # Features related to battle timeline
            views["timeline"].append(c)
        else:

            views["combat"].append(c) # Features related to combat statistics

    views = {name: v for name, v in views.items() if len(v) > 0}

    print("\n" + "="*60)
    print("FEATURE VIEWS:")
    print("="*60)
    for name, v in views.items():
        print(f"  {name:15}: {len(v):4} features")
    print("="*60 + "\n")

    return views


In [None]:

# Execute the full feature engineering and model training pipeline for a specific turn cutoff.
def run_one_cutoff(max_turn, builder):
    # Feature extraction for training and test data
    Xtr, ytr, _ = parallel_feature_extraction(
        train_data, is_train=True, n_jobs=-1, max_turn=max_turn
    )
    Xte, ids = parallel_feature_extraction(
        test_data, is_train=False, n_jobs=-1, max_turn=max_turn
    )

    # Target encoding for presence features
    Xtr = target_encode_presence_oof(Xtr, ytr, prefix="p1_has_")
    Xtr = target_encode_presence_oof(Xtr, ytr, prefix="p2_seen_")

    # Align test features with training features and apply target encoding
    Xte = Xte.reindex(columns=Xtr.columns, fill_value=0.0)
    Xte = target_encode_presence_test(Xtr, ytr, Xte, prefix="p1_has_")
    Xte = target_encode_presence_test(Xtr, ytr, Xte, prefix="p2_seen_")

    # Prune features with low variance or high correlation
    Xtr = prune_low_var_high_corr(Xtr)
    Xte = Xte.reindex(columns=Xtr.columns, fill_value=0.0)

    views = build_feature_views(Xtr)

    all_oof_blocks = []
    all_train_blocks = []
    all_test_blocks = []

    for view_name, view_cols in views.items():
        print(f"\n=== View '{view_name}' with {len(view_cols)} features ===")

        Xtr_view = Xtr[view_cols]
        Xte_view = Xte[view_cols]

        # OOF stacking with inner feature selection
        base_oof_view, selected_cols_union = oof_stack_with_inner_fs(
            builder,
            Xtr_view,
            ytr,
            k_features=min(800, len(view_cols)),
            folds=5
        )

        selected_cols_union = list(selected_cols_union)
        Xtr_sel_view = Xtr_view[selected_cols_union]
        Xte_sel_view = Xte_view.reindex(columns=selected_cols_union, fill_value=0.0)

        print(f"View '{view_name}': {len(selected_cols_union)} columns after inner FS")

        # Fit final base models on the full training data
        final_base_models_view = fit_models(builder(), Xtr_sel_view, ytr)

        # Generate predictions for training and test sets
        train_base_probs_view = stack_probas(final_base_models_view, Xtr_sel_view)
        test_base_probs_view  = stack_probas(final_base_models_view, Xte_sel_view)

        all_oof_blocks.append(base_oof_view)
        all_train_blocks.append(train_base_probs_view)
        all_test_blocks.append(test_base_probs_view)

    base_oof_probs = np.vstack(all_oof_blocks)
    train_base_probs = np.vstack(all_train_blocks)
    test_base_probs  = np.vstack(all_test_blocks)

    print(f"\n[cutoff={max_turn}] Total base models after views: {base_oof_probs.shape[0]}")

    return {
        "Xtr_sel": Xtr,
        "Xte_sel": Xte,
        "ytr": ytr,
        "test_ids": ids,
        "base_oof_probs": base_oof_probs,
        "train_base_probs": train_base_probs,
        "test_base_probs": test_base_probs,
    }


In [None]:
# Re-build and print the size of the tracked moves vocabulary.
TRACK_MOVES, MOVE_FREQ = build_move_vocab_by_coverage(train_data, target_coverage=0.985, min_freq=3)
print(f"TRACK_MOVES size: {len(TRACK_MOVES)}")


TRACK_MOVES size: 33


In [None]:
# Import additional classifiers and metrics for model evaluation.
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.ensemble import (
        HistGradientBoostingClassifier,
        ExtraTreesClassifier,
        GradientBoostingClassifier,
        RandomForestClassifier,
        AdaBoostClassifier,

    )




-----

In [None]:
# Define a function to build a diverse set of base models for stacking.
def build_base_models():
    from sklearn.ensemble import HistGradientBoostingClassifier, ExtraTreesClassifier, GradientBoostingClassifier
    from sklearn.linear_model import LogisticRegression

    models = [
        # Ridge Classifier with StandardScaler
        Pipeline([("scaler", StandardScaler()), ("ridge", RidgeClassifierCV(alphas=np.logspace(-3, 3, 20)))]),

        # Quadratic Discriminant Analysis with StandardScaler
        Pipeline([("scaler", StandardScaler()), ("qda", QuadraticDiscriminantAnalysis(reg_param=0.01)))]),

        # Logistic Regression with StandardScaler
        Pipeline([("scaler", StandardScaler()), ("logreg", LogisticRegression(C=1.0, max_iter=2000, random_state=RANDOM_STATE)))]),

        # Gradient Boosting Classifier
        GradientBoostingClassifier(n_estimators=300, learning_rate=0.05, max_depth=5, random_state=RANDOM_STATE),

        # SVC with StandardScaler and probability estimation
        Pipeline([("scaler", StandardScaler()), ("svc", SVC(kernel="rbf", C=2.5, gamma="scale", probability=True, random_state=RANDOM_STATE)))]),

        # Random Forest Classifier
        RandomForestClassifier(n_estimators=500, max_depth=15, min_samples_split=10, n_jobs=-1, random_state=RANDOM_STATE),
        # AdaBoost Classifier
        AdaBoostClassifier(n_estimators=200, learning_rate=0.8, random_state=RANDOM_STATE),

        # Calibrated Extra Trees Classifier
        CalibratedClassifierCV(
            estimator=ExtraTreesClassifier(
                n_estimators=1200, max_depth=None, max_features=0.6,
                min_samples_split=4, min_samples_leaf=2, bootstrap=True,
                n_jobs=-1, random_state=RANDOM_STATE
            ),
            method="isotonic", cv=3
        ),

        # HistGradientBoosting Classifier
        HistGradientBoostingClassifier(max_iter=500, learning_rate=0.03, max_depth=10, l2_regularization=1.0, random_state=RANDOM_STATE),

    ]

    return tuple(models)


In [None]:
# Assign the base model builder function to a variable.
builder = build_base_models


In [None]:
# Run the feature extraction and model training pipeline for each defined turn cutoff.
per_cut = []
for N in TURN_CUTOFFS:
  per_cut.append(run_one_cutoff(N, builder))


Feature extraction (max_turn=None):   0%|          | 0/9986 [00:00<?, ?it/s]

Feature extraction (max_turn=None):   0%|          | 0/5000 [00:00<?, ?it/s]


FEATURE VIEWS:
  timeline       :   54 features
  combat         :  321 features


=== View 'timeline' with 54 features ===


OOF Stacking + inner FS:   0%|          | 0/5 [00:00<?, ?it/s]

View 'timeline': 54 columns after inner FS


Training base models:   0%|          | 0/9 [00:00<?, ?it/s]


=== View 'combat' with 321 features ===


OOF Stacking + inner FS:   0%|          | 0/5 [00:00<?, ?it/s]

View 'combat': 318 columns after inner FS


Training base models:   0%|          | 0/9 [00:00<?, ?it/s]


[cutoff=None] Total base models after views: 18


In [None]:
# Import accuracy and ROC AUC score metrics.
from sklearn.metrics import accuracy_score, roc_auc_score


-----

In [None]:
# Vertically stack OOF, training, and test probabilities from all cutoffs.
base_oof_all = np.vstack([d["base_oof_probs"] for d in per_cut])
base_train_all = np.vstack([d["train_base_probs"] for d in per_cut])
base_test_all = np.vstack([d["test_base_probs"] for d in per_cut])


In [None]:
# Extract the true training labels and test battle IDs.
y_train = per_cut[0]["ytr"]
test_ids = per_cut[0]["test_ids"]


In [None]:
# Print statistics about the number of cutoffs, base models, views, and total models.
n_cutoffs = len(TURN_CUTOFFS)
n_base_models = 10
n_views = 2
n_total_models = base_oof_all.shape[0]

print(f"Turn cutoffs: {TURN_CUTOFFS}")
print(f"Base model types: {n_base_models}")
print(f"Feature views: {n_views}")
print(f"Total models: {n_total_models} ({n_cutoffs} × {n_base_models} × {n_views})")


Turn cutoffs: [None]
Base model types: 10
Feature views: 2
Total models: 18 (1 × 10 × 2)


In [None]:
# Print summary statistics for turn cutoffs and model counts.
print(f"Turn cutoffs: {TURN_CUTOFFS}")
print(f"Base models per cutoff: {n_base_models}")
print(f"Total models: {n_total_models}")


Turn cutoffs: [None]
Base models per cutoff: 10
Total models: 18


In [None]:
# Calculate and print the accuracy of each base model in the ensemble.
cutoff_names = [f"full" if c is None else f"t{c}" for c in TURN_CUTOFFS]
model_names_base = ['Ridge', 'QDA', 'LogReg', 'GB', 'SVC', 'RandomF', 'AdaB', 'ExtraT', 'HistGB']
view_names = ['timeline', 'combat']

for cutoff_idx, cutoff_name in enumerate(cutoff_names):
    for view_idx, view_name in enumerate(view_names):
        print(f"\nCutoff: {cutoff_name}, View: {view_name}")
        for model_idx, model_name in enumerate(model_names_base):
            global_idx = cutoff_idx * (n_base_models * n_views) + view_idx * n_base_models + model_idx

            if global_idx < n_total_models:
                preds = (base_oof_all[global_idx] >= 0.5).astype(int) # Convert probabilities to binary predictions
                acc = accuracy_score(y_train, preds)
                print(f"  {model_name:10} @ {view_name:12}: {acc:.4f}")



Cutoff: full, View: timeline
  Ridge      @ timeline    : 0.8201
  QDA        @ timeline    : 0.7816
  LogReg     @ timeline    : 0.8193
  GB         @ timeline    : 0.8159
  SVC        @ timeline    : 0.8106
  RandomF    @ timeline    : 0.8109
  AdaB       @ timeline    : 0.8134
  ExtraT     @ timeline    : 0.8115
  HistGB     @ timeline    : 0.8184

Cutoff: full, View: combat
  Ridge      @ combat      : 0.7883
  QDA        @ combat      : 0.8324
  LogReg     @ combat      : 0.8282
  GB         @ combat      : 0.8282
  SVC        @ combat      : 0.8142
  RandomF    @ combat      : 0.8258
  AdaB       @ combat      : 0.8224
  ExtraT     @ combat      : 0.8309


In [None]:

# Calculate and print the average, minimum, and maximum pairwise Spearman correlations between base model OOF predictions.
from scipy.stats import spearmanr

correlations = []
for i in range(n_total_models):
    for j in range(i+1, n_total_models):
        corr, _ = spearmanr(base_oof_all[i], base_oof_all[j])
        correlations.append(corr)

avg_corr = np.mean(correlations)
min_corr = np.min(correlations)
max_corr = np.max(correlations)

print(f"Average pairwise correlation: {avg_corr:.3f}")
print(f"Min correlation: {min_corr:.3f}")
print(f"Max correlation: {max_corr:.3f}")


Average pairwise correlation: 0.884
Min correlation: 0.719
Max correlation: 0.999


In [None]:
# Prepare the meta-feature matrix for the meta-learner by transposing base probabilities and adding a bias term.
def meta_matrix(base_probs: np.ndarray) -> np.ndarray:
    M = base_probs.T # Transpose base probabilities
    M = np.concatenate([M, np.ones((M.shape[0], 1))], axis=1) # Add bias term (column of ones)
    return M


In [None]:
# Create meta-feature matrices for OOF and test sets.
X_meta_oof = meta_matrix(base_oof_all)
X_meta_test = meta_matrix(base_test_all)


In [None]:
# Import necessary modules for meta-modeling and evaluation.
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict
from sklearn.metrics import accuracy_score, roc_auc_score
from scipy.stats import spearmanr
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict, StratifiedKFold


9 Different model types
1. Ridge
2. QDA
3. LogReg
4. GradientBoosting
5. SVC
6. RandomForest
7. AdaBoost
8. ExtraTrees
9. HistGradientBoosting

Each trained on 2 views:
- Timeline view (54 features)
- Combat view (321 features)

Total: 9 models × 2 views = 18 base models


1 Logistic Regression Meta-Learner

Input: 20 base model predictions + 1 bias term = 19 features
Output: Final prediction

In [None]:

# Train a logistic regression meta-model and generate OOF and test predictions.
meta_model = LogisticRegression(
    penalty='l2', # L2 regularization
    C=0.5,
    fit_intercept=True,
    solver='lbfgs',
    max_iter=2000,
    random_state=RANDOM_STATE
)

# Generate Out-of-Fold predictions for the meta-model
meta_oof_probs = cross_val_predict(
    meta_model, X_meta_oof, y_train,
    cv=10,
    method='predict_proba'
)[:, 1]

# Fit the meta-model on the full OOF predictions
meta_model.fit(X_meta_oof, y_train)
# Generate predictions for the test set
meta_test_probs = meta_model.predict_proba(X_meta_test)[:, 1]


In [None]:
# Calculate the average accuracy and AUC of the base models' OOF predictions.
base_oof_avg = np.mean(base_oof_all, axis=0)
base_acc = accuracy_score(y_train, (base_oof_avg >= 0.5).astype(int))
base_auc = roc_auc_score(y_train, base_oof_avg)


In [None]:
# Tune the classification threshold for the meta-model and calculate its accuracy and AUC.
global_thr = tune_threshold(meta_oof_probs, y_train, low=0.3, high=0.7, steps=401)
meta_acc = accuracy_score(y_train, (meta_oof_probs >= global_thr).astype(int))
meta_auc = roc_auc_score(y_train, meta_oof_probs)


In [None]:
# Print the performance comparison between the base models and the meta-model.
print(f"   Base OOF: {base_acc:.4f} (AUC: {base_auc:.4f})")
print(f"   Meta OOF @ {global_thr:.3f}: {meta_acc:.4f} (AUC: {meta_auc:.4f})")
print(f"   Improvement: {meta_acc - base_acc:+.4f} ({(meta_acc/base_acc - 1)*100:+.1f}%)")


   Base OOF: 0.8321 (AUC: 0.8998)
   Meta OOF @ 0.498: 0.8402 (AUC: 0.9066)
   Improvement: +0.0081 (+1.0%)


In [None]:
# Print the meta-model's accuracy.
print(f"\n{meta_acc:.4f}")



0.8402


In [None]:

# Generate final predictions, create a submission file, and print submission statistics.
final_preds = (meta_test_probs >= global_thr).astype(int) # Apply tuned threshold to test probabilities

submission = pd.DataFrame({"battle_id": test_ids, "player_won": final_preds})
submission.to_csv(SUBMISSION_PATH, index=False) # Save submission file

print(f"\nSubmission saved: {SUBMISSION_PATH}")
print(f"Test predicted wins: {np.sum(final_preds)}/{len(final_preds)} ({np.mean(final_preds)*100:.1f}%)")



Submission saved: /content/drive/MyDrive/challenge ds/data/submission_final_last_v1.csv
Test predicted wins: 2483/5000 (49.7%)


In [None]:
# Recalculate and print the average pairwise Spearman correlation of base model OOF predictions.
correlations = []
for i in range(n_total_models):
    for j in range(i+1, n_total_models):
        corr, _ = spearmanr(base_oof_all[i], base_oof_all[j])
        correlations.append(corr)


avg_corr = np.mean(correlations)
print(f"Average pairwise correlation: {avg_corr:.3f}")


Average pairwise correlation: 0.884


In [None]:
# Perform cross-validation to estimate the final test accuracy with threshold tuning.
from sklearn.model_selection import StratifiedKFold

def oof_threshold_cv(probs, y, outer_folds=5, low=0.3, high=0.7, steps=201):
    skf = StratifiedKFold(n_splits=outer_folds, shuffle=True, random_state=RANDOM_STATE)
    accs = []

    for tr, va in skf.split(probs.reshape(-1,1), y):
        thr = tune_threshold(probs[tr], y[tr], low=low, high=high, steps=steps) # Tune threshold on training fold
        preds = (probs[va] >= thr).astype(int)
        accs.append(accuracy_score(y[va], preds)) # Evaluate on validation fold

    return np.mean(accs), np.std(accs)

mean_cv_acc, std_cv_acc = oof_threshold_cv(meta_oof_probs, y_train)
print(f"CV estimate of final test accuracy: {mean_cv_acc:.4f} ± {std_cv_acc:.4f}")


CV estimate of final test accuracy: 0.8396 ± 0.0068
