In [303]:
import json
import pandas as pd
import os

# --- Define the path to our data ---
COMPETITION_NAME = 'fds-pokemon-battles-prediction-2025'
DATA_PATH = os.path.join('../input', COMPETITION_NAME)

train_file_path = os.path.join(DATA_PATH, 'train.jsonl')
test_file_path = os.path.join(DATA_PATH, 'test.jsonl')

# Number of timeline turns shown when a battle is pretty-printed.
PREVIEW_TURNS = 2


def load_jsonl(path):
    """Read a JSON Lines file and return its records as a list.

    Args:
        path: Location of a file holding one JSON document per line.

    Returns:
        A list with one parsed object per non-blank line, in file order.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    # JSON text is UTF-8; being explicit avoids depending on the platform default.
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Tolerate blank lines (e.g. a trailing newline at the end of the file).
            if not line:
                continue
            records.append(json.loads(line))
    return records


def preview_battle(battle, max_turns=PREVIEW_TURNS):
    """Build a display copy of ``battle`` with a shortened timeline.

    Args:
        battle: A battle dictionary; it is not modified.
        max_turns: Maximum number of 'battle_timeline' entries to keep.

    Returns:
        A ``(preview, truncated)`` tuple: ``preview`` is a shallow copy of
        ``battle`` whose 'battle_timeline' holds at most ``max_turns`` entries,
        and ``truncated`` is True when entries were actually dropped.
    """
    timeline = battle.get('battle_timeline', [])
    preview = battle.copy()
    # Re-binding the key on the copy leaves the original battle's list untouched.
    preview['battle_timeline'] = timeline[:max_turns]
    return preview, len(timeline) > max_turns


train_data = []

# Read the file line by line
print(f"Loading data from '{train_file_path}'...")
try:
    train_data = load_jsonl(train_file_path)
    print(f"Successfully loaded {len(train_data)} battles.")

    # Let's inspect the first battle to see its structure
    print("\n--- Structure of the first train battle: ---")
    if train_data:
        first_battle = train_data[0]

        # To keep the output clean, print a copy with a truncated timeline
        battle_for_display, was_truncated = preview_battle(first_battle)

        # Use json.dumps for pretty-printing the dictionary
        print(json.dumps(battle_for_display, indent=4))
        # Announce the truncation whenever turns were dropped from the preview.
        if was_truncated:
            print("    ...")
            print("    (battle_timeline has been truncated for display)")

except FileNotFoundError:
    # Deliberately best-effort: report the missing dataset instead of raising.
    print(f"ERROR: Could not find the training file at '{train_file_path}'.")
    print("Please make sure you have added the competition data to this notebook.")

Loading data from '../input/fds-pokemon-battles-prediction-2025/train.jsonl'...
Successfully loaded 10000 battles.

--- Structure of the first train battle: ---
{
    "player_won": true,
    "p1_team_details": [
        {
            "name": "starmie",
            "level": 100,
            "types": [
                "psychic",
                "water"
            ],
            "base_hp": 60,
            "base_atk": 75,
            "base_def": 85,
            "base_spa": 100,
            "base_spd": 100,
            "base_spe": 115
        },
        {
            "name": "exeggutor",
            "level": 100,
            "types": [
                "grass",
                "psychic"
            ],
            "base_hp": 95,
            "base_atk": 95,
            "base_def": 85,
            "base_spa": 125,
            "base_spd": 125,
            "base_spe": 55
        },
        {
            "name": "chansey",
            "level": 100,
            "types": [
                "normal",

In [None]:
from tqdm.notebook import tqdm
import numpy as np
import importlib
#reload del modulo caricato la prima volta, per prendere la versione aggiornata
import utils.type_effects as type_effects
importlib.reload(type_effects)

from utils.type_effects import TYPE_EFFECTIVENESS, type_advantage



def _hp_pct(state):
    """Return the 'hp_pct' value of a per-turn Pokémon state, or None if it is unavailable."""
    if not state:
        return None
    return state.get('hp_pct')


def _count_changes(values):
    """Count the positions at which consecutive entries of ``values`` differ."""
    return sum(prev != curr for prev, curr in zip(values, values[1:]))


def create_simple_features(data: list[dict]) -> pd.DataFrame:
    """
    Build one row of model features per battle.

    Feature groups, in column order:
      * mean base stats of player 1's team ('p1_mean_*');
      * base stats of player 2's lead ('p2_lead_*', 'p2_spa', 'p2_spd');
      * team-mean minus lead stat deltas ('diff_*');
      * type match-up scores returned by the external ``type_advantage`` helper;
      * HP dynamics over 'battle_timeline' (means, early/late windows, trend, spread);
      * status dynamics over 'battle_timeline' (share of turns with a status, changes);
      * team balance (distinct types, share of fast Pokémon);
      * one interaction feature combining HP trend and status share.

    Args:
        data: Battle dictionaries. Keys read here: 'p1_team_details',
            'p2_lead_details', 'battle_timeline', 'battle_id' and, when
            present, 'player_won'.

    Returns:
        A DataFrame with one row per battle. Features that could not be computed
        for a battle are filled with 0. 'player_won' is stored as an integer when
        the battle carries a label.
    """
    # Base stats, in the column order used throughout the notebook.
    stat_names = ('hp', 'spe', 'atk', 'def', 'spa', 'spd')
    # Existing column names for the opposing lead; the last two have no 'lead_'
    # infix and are kept as they are so previously produced tables stay compatible.
    p2_lead_columns = {
        'hp': 'p2_lead_hp', 'spe': 'p2_lead_spe', 'atk': 'p2_lead_atk',
        'def': 'p2_lead_def', 'spa': 'p2_spa', 'spd': 'p2_spd',
    }
    # Statuses treated as "nothing wrong"; every other status counts as negative.
    no_effect_status = {'nostatus', 'noeffect'}
    # Early/late windows: a nominal 30-turn battle split in 3 phases -> 10 turns each.
    # Timelines shorter than two windows make the early and late windows overlap.
    phases = 3
    nominal_turns = 30
    phase_window = nominal_turns // phases
    medium_speed_threshold = 90   # base speed above which a Pokémon counts as medium-fast
    high_speed_threshold = 100    # base speed above which a Pokémon counts as fast

    feature_list = []
    for battle in tqdm(data, desc="Extracting features"):
        features = {}

        # --- Player 1 team: mean of every base stat across the team ---
        p1_team = battle.get('p1_team_details') or []
        p1_means = dict.fromkeys(stat_names, 0.0)
        if p1_team:
            for stat in stat_names:
                p1_means[stat] = np.mean([p.get(f'base_{stat}', 0) for p in p1_team])
                features[f'p1_mean_{stat}'] = p1_means[stat]

        # --- Player 2 lead: base stats of the opposing lead Pokémon ---
        p2_lead = battle.get('p2_lead_details')
        p2_stats = dict.fromkeys(stat_names, 0.0)
        if p2_lead:
            for stat in stat_names:
                p2_stats[stat] = p2_lead.get(f'base_{stat}', 0)
                features[p2_lead_columns[stat]] = p2_stats[stat]

        # --- Stat deltas: team mean minus lead value (0.0 stands in for a missing side) ---
        for stat in stat_names:
            features[f'diff_{stat}'] = p1_means[stat] - p2_stats[stat]

        # --- Type advantage (e.g. fire vs grass); 'notype' placeholders are ignored ---
        p1_types = [t for p in p1_team for t in p.get('types', []) if t != 'notype']
        p2_types = [t for t in p2_lead.get('types', []) if t != 'notype'] if p2_lead else []
        features['type_advantage'] = type_advantage(p1_types, p2_types)
        # Reverse direction: opponent's lead types against the player's team types.
        features['opp_type_advantage'] = type_advantage(p2_types, p1_types)
        features['net_type_advantage'] = features['type_advantage'] - features['opp_type_advantage']

        # --- Dynamic information from the turn-by-turn timeline ---
        # Rationale (author's note): the side that keeps more HP on average and leads
        # for more turns usually wins, even when the recorded battle is not over yet.
        timeline = battle.get('battle_timeline') or []
        if timeline:
            # -- Health --
            # Per-side readings use every turn on which that side reports HP.
            # Difference features use only turns on which BOTH sides report HP,
            # so the two arrays always line up turn by turn.
            p1_hp, p2_hp = [], []
            paired_turns, paired_p1, paired_p2 = [], [], []
            for turn_idx, turn in enumerate(timeline):
                hp1 = _hp_pct(turn.get('p1_pokemon_state'))
                hp2 = _hp_pct(turn.get('p2_pokemon_state'))
                if hp1 is not None:
                    p1_hp.append(hp1)
                if hp2 is not None:
                    p2_hp.append(hp2)
                if hp1 is not None and hp2 is not None:
                    paired_turns.append(turn_idx)
                    paired_p1.append(hp1)
                    paired_p2.append(hp2)

            if paired_turns:
                paired_p1 = np.array(paired_p1)
                paired_p2 = np.array(paired_p2)
                hp_delta = paired_p1 - paired_p2

                # Mean HP of each side's active Pokémon over the timeline.
                features['p1_mean_hp_pct'] = np.mean(p1_hp)
                features['p2_mean_hp_pct'] = np.mean(p2_hp)
                # Mean HP lead of player 1 over player 2.
                features['hp_diff_mean'] = np.mean(hp_delta)
                # Share of turns on which player 1 is ahead on HP (mean of booleans).
                features['p1_hp_advantage_mean'] = np.mean(paired_p1 > paired_p2)

                # How the HP lead evolves: first window vs last window.
                features['early_hp_mean_diff'] = np.mean(hp_delta[:phase_window])
                features['late_hp_mean_diff'] = np.mean(hp_delta[-phase_window:])
                features['phases_hp_mean_diff'] = features['late_hp_mean_diff'] - features['early_hp_mean_diff']

                # Slope of a straight-line fit of the HP lead against the turn index.
                # A line needs at least two points; with fewer the trend is flat.
                if len(hp_delta) >= 2:
                    features['hp_delta_trend'] = np.polyfit(paired_turns, hp_delta, 1)[0]
                else:
                    features['hp_delta_trend'] = 0.0

                # HP fluctuation: steady battle vs chaotic battle.
                # Author's note: CV accuracy 77.94% (+/- 0.41%) => 79.09% (+/- 1.02%).
                features['p1_hp_std'] = np.std(p1_hp)
                features['p2_hp_std'] = np.std(p2_hp)
                features['hp_delta_std'] = np.std(hp_delta)

            # -- Status (default 'nostatus'; anything outside no_effect_status is negative) --
            p1_status = [s.get('status', 'nostatus')
                         for s in (t.get('p1_pokemon_state') for t in timeline) if s]
            p2_status = [s.get('status', 'nostatus')
                         for s in (t.get('p2_pokemon_state') for t in timeline) if s]
            # Share of turns spent under a negative status (0.0 when no state was recorded).
            features['p1_negative_status_mean'] = (
                np.mean([s not in no_effect_status for s in p1_status]) if p1_status else 0.0
            )
            features['p2_negative_status_mean'] = (
                np.mean([s not in no_effect_status for s in p2_status]) if p2_status else 0.0
            )
            # Positive when player 2 spent more turns under a negative status than player 1.
            features['p1_bad_status_advantage'] = features['p2_negative_status_mean'] - features['p1_negative_status_mean']

            # Number of turn-to-turn status changes for each side.
            features['p1_status_change'] = _count_changes(p1_status)
            features['p2_status_change'] = _count_changes(p2_status)

        # --- Team balance (types and speed) ---
        # These depend only on the team sheet, so they are computed whether or not
        # a timeline is available.
        # Author's note: CV accuracy 79.09% (+/- 1.02%) => 79.29% (+/- 0.92%).
        if p1_team:
            features['p1_type_diversity'] = len(set(p1_types))
            speeds = np.array([p.get('base_spe', 0) for p in p1_team])
            features['p1_speed_mean'] = np.mean(speeds)
            features['p1_medium_speed_stat'] = np.mean(speeds > medium_speed_threshold)
            features['p1_high_speed_stat'] = np.mean(speeds > high_speed_threshold)

        # --- Feature combinations ---
        # HP trend weighted by the share of turns player 1 spent WITHOUT a negative status.
        # Author's note: CV accuracy 79.09% (+/- 1.02%) => 79.13% (+/- 1.06%).
        if 'hp_delta_trend' in features:
            features['hp_advantage_no_negative_status'] = features['hp_delta_trend'] * (1 - features['p1_negative_status_mean'])

        # We also need the ID and the target variable (if it exists)
        features['battle_id'] = battle.get('battle_id')
        if 'player_won' in battle:
            features['player_won'] = int(battle['player_won'])

        feature_list.append(features)

    # Features missing for a battle (e.g. no timeline) become 0.
    return pd.DataFrame(feature_list).fillna(0)

# Build the feature tables: the training set first, then the test set.
print("Processing training data...")
train_df = create_simple_features(train_data)

print("\nProcessing test data...")
# Every line of the test file is one JSON-encoded battle.
with open(test_file_path, 'r') as f:
    test_data = [json.loads(line) for line in f]
test_df = create_simple_features(test_data)

print("\nTraining features preview:")
display(train_df.head())

Processing training data...


Extracting features:   0%|          | 0/10000 [00:00<?, ?it/s]


Processing test data...


Extracting features:   0%|          | 0/5000 [00:00<?, ?it/s]


Training features preview:


Unnamed: 0,p1_mean_hp,p1_mean_spe,p1_mean_atk,p1_mean_def,p1_mean_spa,p1_mean_spd,p2_lead_hp,p2_lead_spe,p2_lead_atk,p2_lead_def,...,p1_bad_status_advantage,p1_status_change,p2_status_change,p1_type_diversity,p1_speed_mean,p1_medium_speed_stat,p1_high_speed_stat,hp_advantage_no_negative_status,battle_id,player_won
0,115.833333,80.0,72.5,63.333333,100.0,100.0,60,115,75,85,...,0.333333,7,8,4,80.0,0.5,0.5,0.00363,0,1
1,123.333333,61.666667,72.5,65.833333,90.0,90.0,55,120,50,45,...,-0.2,11,6,5,61.666667,0.333333,0.166667,0.000616,1,1
2,124.166667,65.833333,84.166667,71.666667,90.0,90.0,250,50,5,5,...,-0.033333,10,4,7,65.833333,0.333333,0.333333,-0.008158,2,1
3,121.666667,75.833333,77.5,65.833333,103.333333,103.333333,75,110,100,95,...,-0.5,8,4,7,75.833333,0.5,0.333333,0.006443,3,1
4,114.166667,72.5,75.833333,79.166667,97.5,97.5,60,115,75,85,...,0.433333,5,10,5,72.5,0.333333,0.333333,0.014461,4,1


In [305]:
# NOTE(review): this cell is disabled. Its whole body is a single string literal, so
# none of the statements inside run; the notebook only echoes the string back as the
# cell output. The text is an earlier baseline: plain LogisticRegression fitted on all
# feature columns. It looks superseded by the pipeline cell further down the notebook -
# confirm before deleting.
"""
from sklearn.linear_model import LogisticRegression

# Define our features (X) and target (y)
features = [col for col in train_df.columns if col not in ['battle_id', 'player_won']]
X_train = train_df[features]
y_train = train_df['player_won']

X_test = test_df[features]

# Initialize and train the model
print("Training a simple Logistic Regression model...")
model = LogisticRegression(random_state=42, max_iter=1000)
model.fit(X_train, y_train)
print("Model training complete.")
"""

'\nfrom sklearn.linear_model import LogisticRegression\n\n# Define our features (X) and target (y)\nfeatures = [col for col in train_df.columns if col not in [\'battle_id\', \'player_won\']]\nX_train = train_df[features]\ny_train = train_df[\'player_won\']\n\nX_test = test_df[features]\n\n# Initialize and train the model\nprint("Training a simple Logistic Regression model...")\nmodel = LogisticRegression(random_state=42, max_iter=1000)\nmodel.fit(X_train, y_train)\nprint("Model training complete.")\n'

In [306]:
# NOTE(review): this cell is disabled. Its whole body is a single string literal, so
# nothing inside runs and no file is written; the notebook only echoes the string back.
# The text is an earlier submission step built around a `model` variable that is only
# assigned inside other disabled cells. It looks superseded by the final submission
# cell at the end of the notebook - confirm before deleting.
"""
# Make predictions on the test data
print("Generating predictions on the test set...")
test_predictions = model.predict(X_test)

# Create the submission DataFrame
submission_df = pd.DataFrame({
    'battle_id': test_df['battle_id'],
    'player_won': test_predictions
})

# Save the DataFrame to a .csv file
submission_df.to_csv('submission.csv', index=False)

print("\n'submission.csv' file created successfully!")
display(submission_df.head())
"""

'\n# Make predictions on the test data\nprint("Generating predictions on the test set...")\ntest_predictions = model.predict(X_test)\n\n# Create the submission DataFrame\nsubmission_df = pd.DataFrame({\n    \'battle_id\': test_df[\'battle_id\'],\n    \'player_won\': test_predictions\n})\n\n# Save the DataFrame to a .csv file\nsubmission_df.to_csv(\'submission.csv\', index=False)\n\nprint("\n\'submission.csv\' file created successfully!")\ndisplay(submission_df.head())\n'

In [307]:
# NOTE(review): scratch cell, fully disabled. Both blocks below are bare string
# literals, so nothing inside them runs; only the second string is echoed as the cell
# output because it is the last expression of the cell.
# First string: column listing for the test and train feature tables.
"""
print(test_df.columns)
print(train_df.columns)
"""
# Second string: hold-out accuracy check. It refers to y_val / val_predictions, which
# are only assigned inside other disabled cells of this notebook.
"""
from sklearn.metrics import accuracy_score
val_accuracy = accuracy_score(y_val, val_predictions)
print(f"Validation accuracy: {val_accuracy*100:.2f}%")
"""

'\nfrom sklearn.metrics import accuracy_score\nval_accuracy = accuracy_score(y_val, val_predictions)\nprint(f"Validation accuracy: {val_accuracy*100:.2f}%")\n'

In [308]:
#train_df.describe()

In [309]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler

# Model inputs are every column except the row identifier and the label.
features = [name for name in train_df.columns if name != 'battle_id' and name != 'player_won']
X = train_df.loc[:, features]
y = train_df.loc[:, 'player_won']

# The string below is the former 80/20 hold-out split, kept as inert text; the notebook
# evaluates with k-fold cross-validation instead.
"""
no, ora faccio k-fold cross validation (sotto)

# Split training data into train and validation sets (80/20 split)
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42
)
"""



'\nno, ora faccio k-fold cross validation (sotto)\n\n# Split training data into train and validation sets (80/20 split)\nX_train, X_val, y_train, y_val = train_test_split(\n    X, y, test_size=0.2, random_state=42\n)\n'

### SCALING

In [310]:
# NOTE(review): this cell is disabled. Its whole body is a single string literal, so
# no scaler is created or fitted here; the notebook only echoes the string back.
# The text standardises X_train / X_val by hand; both names are only assigned inside
# another disabled string in this notebook. Scaling is instead done by the
# StandardScaler step of the pipeline cell further down.
"""
# Create our scaler
scaler = StandardScaler()

# First, we want to fit our scaler to our training data and subsequently transform
# that training data through our scaler. This can all be done in a single command.
X_train = scaler.fit_transform(X_train)

# Next, we want to transform the test features by using the parameters learned
# from the training set
X_val = scaler.transform(X_val)

# Notice the values are now standardized
columns = X.columns
X_train_scaled_df = pd.DataFrame(X_train, columns = columns)
X_train_scaled_df.head()
"""

'\n# Create our scaler\nscaler = StandardScaler()\n\n# First, we want to fit our scaler to our training data and subsequently transform\n# that training data through our scaler. This can all be done in a single command.\nX_train = scaler.fit_transform(X_train)\n\n# Next, we want to transform the test features by using the parameters learned\n# from the training set\nX_val = scaler.transform(X_val)\n\n# Notice the values are now standardized\ncolumns = X.columns\nX_train_scaled_df = pd.DataFrame(X_train, columns = columns)\nX_train_scaled_df.head()\n'

### STUDIAMO LA CORRELAZIONE TRA FEATURE E OUTPUT E TRA FEATURE

### PolynomialFeatures 
Crea nuove feature come potenze e prodotti incrociati (interazioni) delle feature numeriche originali, così da modellare relazioni non lineari: il modello cattura curvature e interazioni restando lineare nei parametri. Il costo è l'esplosione dimensionale: con $n$ feature originali e grado $d$ (qui $d=2$) si ottengono in totale $\binom{n+d}{d}$ colonne (termine di bias incluso, quindi una in meno con `include_bias=False`), e il tempo di addestramento cresce di conseguenza => valuta se usare una PCA per gestire l'esplosione dimensionale.

Usalo solo se le feature originali sono informative

### TRAIN AND SUBMIT

In [311]:
"""
# Initialize and train the model
print("Training a simple Logistic Regression model...")
model = LogisticRegression(random_state=42, max_iter=1000)
model.fit(X_train, y_train)
print("Model training complete.")

# Evaluate on validation set
val_predictions = model.predict(X_val)
val_accuracy = accuracy_score(y_val, val_predictions)
print(f"Validation accuracy: {val_accuracy*100:.2f}%")
"""
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import KFold, cross_val_score

# Optional pipeline stages.
# USE_PCA: reduce dimensionality after scaling.
# POLY_ENABLED: degree-2 polynomial expansion. Author's recorded CV accuracy:
#   77.64% (+/- 0.69%) when enabled, 77.94% (+/- 0.35%) when disabled.
USE_PCA = False
POLY_ENABLED = False

# Stage order: [poly] -> scaler -> [pca] -> logistic regression.
# Start from the two mandatory stages and slot the optional ones around them.
steps = [
    ("scaler", StandardScaler()),
    ("logreg", LogisticRegression(max_iter=2000, random_state=42)),
]
if POLY_ENABLED:
    # Polynomial expansion goes first, ahead of standardisation.
    steps.insert(0, ("poly", PolynomialFeatures(degree=2, include_bias=False)))
if USE_PCA:
    # PCA sits right before the classifier; 0.95 keeps ~95% of the variance.
    steps.insert(-1, ("pca", PCA(n_components=0.95, svd_solver="full")))

pipe = Pipeline(steps)

# 5-fold cross-validation on shuffled, reproducible splits.
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
print("Training Logistic Regression con 5-Fold Cross-Validation...\n")
scores = cross_val_score(pipe, X, y, cv=kfold, scoring='accuracy', n_jobs=-1)
print(f"Cross-validation accuracies: {scores.round(4)}")
print(f"Mean CV accuracy: {scores.mean()*100:.2f}% (+/- {scores.std()*100:.2f}%)")

# Final fit on the complete training set; this fitted pipeline is what predicts the test set.
pipe.fit(X, y)
print("\nFinal model trained on all training data.")
"""
vecchio codice, senza k-fold cross-v

# Train
print("Training Logistic Regression (con scaler{}pca)...".format(" + " if USE_PCA else " senza "))
pipe.fit(X_train, y_train)
print("Model training complete.")

# Valutazione su validation
val_pred = pipe.predict(X_val)
val_acc = accuracy_score(y_val, val_pred)
print(f"Validation accuracy: {val_acc*100:.2f}%")

# Nr componenti PCA usate
if USE_PCA:
    print("Componenti PCA usate:", pipe.named_steps["pca"].n_components_)
"""

Training Logistic Regression con 5-Fold Cross-Validation...

Cross-validation accuracies: [0.8105 0.785  0.7865 0.7925 0.79  ]
Mean CV accuracy: 79.29% (+/- 0.92%)

Final model trained on all training data.


'\nvecchio codice, senza k-fold cross-v\n\n# Train\nprint("Training Logistic Regression (con scaler{}pca)...".format(" + " if USE_PCA else " senza "))\npipe.fit(X_train, y_train)\nprint("Model training complete.")\n\n# Valutazione su validation\nval_pred = pipe.predict(X_val)\nval_acc = accuracy_score(y_val, val_pred)\nprint(f"Validation accuracy: {val_acc*100:.2f}%")\n\n# Nr componenti PCA usate\nif USE_PCA:\n    print("Componenti PCA usate:", pipe.named_steps["pca"].n_components_)\n'

### SUBMIT

In [312]:
# Score the real test set with the fitted pipeline, using the training feature columns.
X_test = test_df.loc[:, features]
print("Generating predictions on the test set...")
test_predictions = pipe.predict(X_test)

# Submission table: the battle ids next to the predicted labels.
submission_df = test_df[['battle_id']].assign(player_won=test_predictions)

# Write the CSV without the index column, then show the first rows.
submission_df.to_csv('submission.csv', index=False)
print("\n'submission.csv' file created successfully!")
display(submission_df.head())

Generating predictions on the test set...

'submission.csv' file created successfully!


Unnamed: 0,battle_id,player_won
0,0,1
1,1,1
2,2,1
3,3,1
4,4,0
