In [None]:
import json
import pandas as pd
import os

# --- Define the path to our data ---
train_file_path = os.path.join("data", 'train.jsonl')
test_file_path = os.path.join("data", 'test.jsonl')

# Number of battle_timeline entries of the first battle shown in the preview.
# The same value drives both the slice and the "truncated" notice below.
PREVIEW_TIMELINE_ENTRIES = 10


def _read_jsonl(path):
    """Parse a JSON Lines file into a list with one Python object per line.

    Blank lines (e.g. a trailing empty line at the end of the file) are skipped
    instead of being passed to json.loads(), which would reject them. The file
    is decoded as UTF-8 explicitly so the result does not depend on the
    platform's default encoding.

    Raises:
        FileNotFoundError: if `path` does not exist.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]


# Stays an empty list when the training file is missing, so later cells still
# find the name defined.
train_data = []

print(f"Loading data from '{train_file_path}'...")
try:
    train_data = _read_jsonl(train_file_path)
except FileNotFoundError:
    print(f"ERROR: Could not find the training file at '{train_file_path}'.")
    print("Please make sure you have added the competition data to this notebook.")
else:
    print(f"Successfully loaded {len(train_data)} battles.")

    # Let's inspect the first battle to see its structure
    print("\n--- Structure of the first train battle: ---")
    if train_data:
        first_battle = train_data[0]
        full_timeline = first_battle.get('battle_timeline', [])

        # Shallow copy: only the top-level 'battle_timeline' key is replaced,
        # so the loaded battle itself is left untouched.
        battle_for_display = first_battle.copy()
        battle_for_display['battle_timeline'] = full_timeline[:PREVIEW_TIMELINE_ENTRIES]

        # Use json.dumps for pretty-printing the dictionary
        print(json.dumps(battle_for_display, indent=4))
        if len(full_timeline) > PREVIEW_TIMELINE_ENTRIES:
            print("    ...")
            print("    (battle_timeline has been truncated for display)")

print("\nProcessing test data...")
test_data = _read_jsonl(test_file_path)

### 2. Basic Feature Engineering

A successful model will likely require creating many complex features. For this starter notebook, however, we will create a very simple feature set based **only on the initial team stats**. This will be enough to train a model and generate a submission file.

It's up to you to engineer more powerful features!

In [None]:
def extract_pokemon_and_types(battles):
    """Collect every Pokémon and every type mentioned in the given battles.

    For each battle, all entries of ``p1_team_details`` and the single
    ``p2_lead_details`` entry are recorded with a lowercased name, their type
    list and their six base stats.

    Args:
        battles: iterable of battle dicts containing ``p1_team_details`` (list
            of Pokémon dicts) and ``p2_lead_details`` (one Pokémon dict).

    Returns:
        A tuple ``(pokemon_df, types_list)``: a DataFrame with one row per
        distinct lowercase name (first occurrence wins), and the alphabetically
        sorted list of all types seen, excluding the placeholder ``"notype"``.
    """
    stat_fields = ("base_hp", "base_atk", "base_def", "base_spa", "base_spd", "base_spe")
    records = []
    seen_types = set()

    def register(details):
        # Build the full record first so a malformed entry is never half-registered.
        record = {"name": details["name"].lower(), "types": details["types"]}
        for field in stat_fields:
            record[field] = details[field]
        records.append(record)
        seen_types.update(t for t in details["types"] if t != "notype")

    for battle in battles:
        # Player 1's whole team, then player 2's lead.
        for member in battle["p1_team_details"]:
            register(member)
        register(battle["p2_lead_details"])

    # Build the DataFrame and keep only the first row seen for each name.
    raw_df = pd.DataFrame(records)
    pokemon_df = raw_df.drop_duplicates(subset="name").reset_index(drop=True)
    types_list = sorted(seen_types)

    return pokemon_df, types_list


# Example: build the Pokémon lookup table and the type vocabulary from the
# training battles. Both names are reused by later feature-engineering cells.
pokemon_df, all_types = extract_pokemon_and_types(train_data)

# The labels below were mis-encoded (UTF-8 bytes decoded as cp1252); they are
# restored to the intended characters.
print("🧩 Alle Typen:")
print(all_types)
print("\n📊 Pokémon DataFrame:")
# Show a preview with the notebook's rich display instead of printing every row.
display(pokemon_df.head())


In [None]:
# now compute the type advantage and stat advantage out of the pokemons the enemy has and player 1 has
import numpy as np

# Gen 1 type chart: type_chart[attacking_type][defending_type] -> damage multiplier.
# Only non-neutral matchups are listed; compute_type_advantage() treats any
# pair that is missing here as neutral (1.0).
# Matchups previously missing from the table (and therefore silently treated as
# neutral) are now included: grass->bug/dragon, fighting->poison/flying/bug,
# poison->bug, ground->bug, bug->ghost and ghost->ghost.
# Gen 1 quirks are kept on purpose: ghost does not affect psychic, bug and
# poison are super effective against each other, ice is neutral against fire.
type_chart = {
    "normal":    {"rock": 0.5, "ghost": 0.0},
    "fire":      {"fire": 0.5, "water": 0.5, "grass": 2, "ice": 2, "bug": 2, "rock": 0.5, "dragon": 0.5},
    "water":     {"fire": 2, "water": 0.5, "grass": 0.5, "ground": 2, "rock": 2, "dragon": 0.5},
    "electric":  {"water": 2, "electric": 0.5, "grass": 0.5, "ground": 0, "flying": 2, "dragon": 0.5},
    "grass":     {"fire": 0.5, "water": 2, "grass": 0.5, "poison": 0.5, "ground": 2, "flying": 0.5,
                  "bug": 0.5, "rock": 2, "dragon": 0.5},
    "ice":       {"water": 0.5, "grass": 2, "ice": 0.5, "ground": 2, "flying": 2, "dragon": 2},
    "fighting":  {"normal": 2, "ice": 2, "poison": 0.5, "flying": 0.5, "psychic": 0.5, "bug": 0.5,
                  "rock": 2, "ghost": 0},
    "poison":    {"grass": 2, "poison": 0.5, "ground": 0.5, "bug": 2, "rock": 0.5, "ghost": 0.5},
    "ground":    {"fire": 2, "electric": 2, "grass": 0.5, "poison": 2, "flying": 0, "bug": 0.5, "rock": 2},
    "flying":    {"electric": 0.5, "grass": 2, "fighting": 2, "bug": 2, "rock": 0.5},
    "psychic":   {"fighting": 2, "poison": 2, "psychic": 0.5},
    "bug":       {"fire": 0.5, "grass": 2, "fighting": 0.5, "poison": 2, "flying": 0.5, "psychic": 2,
                  "ghost": 0.5},
    "rock":      {"fire": 2, "ice": 2, "fighting": 0.5, "ground": 0.5, "flying": 2, "bug": 2},
    "ghost":     {"normal": 0, "ghost": 2, "psychic": 0},
    "dragon":    {"dragon": 2},
    # Placeholder second type of single-typed Pokémon: no special matchups.
    "notype":    {}
}

def compute_type_advantage(p1_types, p2_types, chart):
    """Average effectiveness of every attacking type against every defending type.

    Args:
        p1_types: attacking types (duplicates count once per occurrence).
        p2_types: defending types.
        chart: nested mapping ``chart[attacking][defending] -> multiplier``.

    Returns:
        The mean multiplier over all (attacking, defending) pairs. Pairs not
        listed in ``chart`` count as 1.0, and 1.0 is also returned when there
        are no pairs at all (either list empty).
    """
    effectiveness = [
        chart[attacking].get(defending, 1.0) if attacking in chart else 1.0
        for attacking in p1_types
        for defending in p2_types
    ]
    # No matchup to evaluate: treat as neutral.
    if len(effectiveness) == 0:
        return 1.0
    return np.mean(effectiveness)

def compute_type_and_stat_advantages(battles, pokemon_df, all_types):
    """Compute, per battle, player 1's type and base-stat advantage over player 2.

    Player 1's side is the whole ``p1_team_details`` roster. Player 2's side is
    the lead from ``p2_lead_details`` plus every distinct Pokémon named in
    ``p2_pokemon_state`` across ``battle_timeline``.

    Args:
        battles: iterable of battle dicts with the keys named above.
        pokemon_df: lookup table with a ``name`` column, a ``types`` column and
            the six ``base_*`` stat columns. Names are matched in lowercase,
            which is how extract_pokemon_and_types() stores them.
        all_types: unused; kept so existing callers keep working.

    Returns:
        ``(type_advantages, stat_advantages)``: two lists with one value per
        battle. Each value is a ratio (player 1 / player 2): the first of the
        mean type multipliers from the module-level ``type_chart``, the second
        of the mean over all base stats of all Pokémon on each side.

    Raises:
        IndexError: if a Pokémon name is not present in ``pokemon_df``.
    """
    stat_columns = ["base_hp", "base_atk", "base_def", "base_spa", "base_spd", "base_spe"]

    def lookup_types_and_stats(names):
        """Return (list of type lists, list of stat rows) for the given names."""
        types, stats = [], []
        for name in names:
            row = pokemon_df[pokemon_df['name'] == name].iloc[0]
            types.append(row['types'])
            stats.append([row[col] for col in stat_columns])
        return types, stats

    type_advantages = []
    stat_advantages = []

    for battle in battles:
        p1_pokemon = [p["name"].lower() for p in battle["p1_team_details"]]

        # Collect every Pokémon the enemy has shown. The lead is lowercased like
        # all other names: previously it was stored as-is, so a capitalised lead
        # was added a second time from the timeline and failed the lookup below.
        enemy_pokemon = [battle["p2_lead_details"]['name'].lower()]
        for turn in battle["battle_timeline"]:
            seen_name = turn['p2_pokemon_state']['name'].lower()
            if seen_name not in enemy_pokemon:
                enemy_pokemon.append(seen_name)

        p1_types, p1_stats = lookup_types_and_stats(p1_pokemon)
        enemy_types, enemy_stats = lookup_types_and_stats(enemy_pokemon)

        # Flatten the per-Pokémon type lists and drop the "notype" placeholder.
        p1_all_types = [t for ts in p1_types for t in ts if t != "notype"]
        p2_all_types = [t for ts in enemy_types for t in ts if t != "notype"]

        # Team-vs-team type advantage in both directions, expressed as a ratio.
        p1_type_adv = compute_type_advantage(p1_all_types, p2_all_types, type_chart)
        p2_type_adv = compute_type_advantage(p2_all_types, p1_all_types, type_chart)
        type_advantage = p1_type_adv / p2_type_adv

        # Mean over every base stat of every Pokémon on each side.
        p1_avg = np.array(p1_stats, dtype=float).mean()
        p2_avg = np.array(enemy_stats, dtype=float).mean()
        stat_advantage = p1_avg / p2_avg

        type_advantages.append(type_advantage)
        stat_advantages.append(stat_advantage)

    return type_advantages, stat_advantages



In [None]:
from tqdm.notebook import tqdm
import numpy as np

def create_simple_features(data: list[dict]) -> pd.DataFrame:
    """Build one row of static, team-composition features per battle.

    For player 1 the mean of each base stat over ``p1_team_details`` is used;
    for player 2 the base stats of ``p2_lead_details`` are copied as they are.
    Missing individual stats count as 0. Each row also carries ``battle_id``
    and, when the battle has it, ``player_won`` as an int.

    Returns:
        A DataFrame with one row per battle; values absent from a row are
        filled with 0.
    """
    # Order matters: it fixes the column order of the resulting frame.
    stat_names = ("hp", "spe", "atk", "def", "spa", "spd")
    rows = []

    for battle in tqdm(data, desc="Extracting features"):
        row = {}

        # --- Player 1: team-wide averages ---
        team = battle.get('p1_team_details', [])
        if team:
            for stat in stat_names:
                row[f'p1_mean_{stat}'] = np.mean([member.get(f'base_{stat}', 0) for member in team])

        # --- Player 2: stats of the lead Pokémon only ---
        lead = battle.get('p2_lead_details')
        if lead:
            for stat in stat_names:
                row[f'p2_lead_{stat}'] = lead.get(f'base_{stat}', 0)

        # Identifier and, for labelled data, the target.
        row['battle_id'] = battle.get('battle_id')
        if 'player_won' in battle:
            row['player_won'] = int(battle['player_won'])

        rows.append(row)

    feature_frame = pd.DataFrame(rows)
    return feature_frame.fillna(0)

# Build the static (team-composition) feature table for the training split
# and for the test split.
print("Processing training data...")
train_df = create_simple_features(data=train_data)

test_df = create_simple_features(data=test_data)

print("\nTraining features preview:")
display(train_df.head())

#### Create Dynamic Features out of the battle timeline

In [None]:
from collections import Counter

# Moves counted as "stall" play by get_sum_stall_moves(). Names are lowercase
# without spaces.
STALL_MOVES = {
    # healing
    "recover", "softboiled", "rest",

    # defensive buffs / shields
    "lightscreen", "reflect", "substitute",

    # passive damage / tempo control
    "toxic", "leechseed",

    # status spreading
    "thunderwave", "stunspore", "poisonpowder",
    "sleeppowder", "sing", "hypnosis",

    # accuracy / confusion stall
    "confuseray", "supersonic",
    "flash", "kinesis", "smokescreen", "sandattack",

    # utility disruption
    "disable"
}

def get_sum_stall_moves(timeline, player_prefix="p1"):
    """Count the turns on which a player used a move listed in STALL_MOVES.

    Args:
        timeline: list of turn dicts; the move is read from
            ``turn[f"{player_prefix}_move_details"]["name"]``.
        player_prefix: ``"p1"`` or ``"p2"``.

    Returns:
        Number of turns whose move name, lowercased and with spaces removed,
        is in STALL_MOVES. The normalisation is the same one
        get_setup_pokemon() applies, so "Light Screen" and "lightscreen" are
        both counted. Turns without move details or without a move name are
        ignored.
    """
    move_key = f"{player_prefix}_move_details"
    stall_move_count = 0

    for turn in timeline:
        move_details = turn.get(move_key)
        if not move_details:
            continue
        move_name = move_details.get("name") or ""
        if move_name.lower().replace(" ", "") in STALL_MOVES:
            stall_move_count += 1

    return stall_move_count

# Moves counted as set-up / boosting by get_sum_boost_moves() and
# get_setup_pokemon(). Names are lowercase without spaces.
BOOST_MOVES = {
    "swordsdance", "meditate", "sharpen",
    "amnesia", "growth",
    "agility",
    "harden", "defensecurl", "barrier", "acidarmor",
    "doubleteam", "minimize"
}

def get_sum_boost_moves(timeline, player_prefix="p1"):
    """Count the turns on which a player used a move listed in BOOST_MOVES.

    Args:
        timeline: list of turn dicts; the move is read from
            ``turn[f"{player_prefix}_move_details"]["name"]``.
        player_prefix: ``"p1"`` or ``"p2"``.

    Returns:
        Number of turns whose move name, lowercased and with spaces removed,
        is in BOOST_MOVES. This is the same normalisation get_setup_pokemon()
        uses, so both functions agree on which moves are boosting moves.
        Turns without move details or without a move name are ignored.
    """
    move_key = f"{player_prefix}_move_details"
    boost_move_count = 0

    for turn in timeline:
        move_details = turn.get(move_key)
        if not move_details:
            continue
        move_name = move_details.get("name") or ""
        if move_name.lower().replace(" ", "") in BOOST_MOVES:
            boost_move_count += 1

    return boost_move_count

def get_number_of_switches(timeline, player_prefix="p1"):
    """Count how often a player's active Pokémon changes between turns.

    Turns whose ``{player_prefix}_pokemon_state`` is missing, empty or has no
    ``name`` are ignored, so a change is detected across such gaps as well.

    Returns:
        The number of consecutive (previous, current) name pairs that differ.
    """
    state_key = f"{player_prefix}_pokemon_state"

    # Names of the active Pokémon, in turn order, for turns that report one.
    active_names = []
    for turn in timeline:
        state = turn.get(state_key, {})
        if state and "name" in state:
            active_names.append(state["name"])

    return sum(
        1
        for before, after in zip(active_names, active_names[1:])
        if before is not None and before != after
    )

# Pokémon grouped by the set-up move they are listed under. A species may
# appear in more than one group; the union below stores it once.
SETUP_POKEMON = set().union(
    # Amnesia users
    ("slowbro", "snorlax", "mewtwo", "mew"),
    # Swords Dance users
    ("pinsir", "kingler", "scyther", "sandslash", "mew"),
    # Growth users
    ("victreebel", "venusaur", "tangela"),
    # Agility sweepers
    ("jolteon", "zapdos", "dragonite", "fearow"),
    # Barrier / Acid Armor
    ("mrmime", "dewgong", "muk", "vaporeon", "mewtwo"),
)

def get_setup_pokemon(pokemon_dict):
    """Return the Pokémon that were seen using at least one boosting move.

    Args:
        pokemon_dict: mapping of Pokémon name to a dict whose optional
            ``"moves"`` entry lists the move names that Pokémon used.

    Returns:
        Names (in the mapping's iteration order) of the Pokémon with a move
        that, lowercased and with spaces removed, is in BOOST_MOVES.
    """
    return [
        pokemon
        for pokemon, values in pokemon_dict.items()
        # any() stops at the first boosting move, so each Pokémon is listed once.
        if any(
            move.lower().replace(" ", "") in BOOST_MOVES
            for move in values.get("moves", [])
        )
    ]

def num_setup(pokemon_dict):
    """Return how many Pokémon in ``pokemon_dict`` used a boosting move."""
    setup_users = get_setup_pokemon(pokemon_dict)
    return len(setup_users)

def build_player_dict(timeline, prefix):
    """Summarise, per Pokémon, the latest state one player showed in a timeline.

    Args:
        timeline: list of turn dicts.
        prefix: ``"p1"`` or ``"p2"``; selects ``{prefix}_pokemon_state`` and
            ``{prefix}_move_details`` in every turn.

    Returns:
        A dict keyed by Pokémon name (insertion order = first appearance). Each
        value holds ``"hp"`` (last ``hp_pct``, 0 if absent), ``"status"`` (last
        status), ``"moves"`` (distinct move names, in order of first use) and
        ``"boosts"`` (last value per stat, 0 if absent). Turns with no state or
        no name in the state are skipped.

    Raises:
        KeyError: if a state with a name has no ``"status"`` entry.
    """
    boost_keys = ["atk", "def", "spa", "spd", "spe"]
    state_key = f"{prefix}_pokemon_state"
    move_key = f"{prefix}_move_details"
    roster = {}

    for turn in timeline:
        state = turn.get(state_key, {})
        if not state or "name" not in state:
            continue

        # First sighting creates the entry; its defaults are overwritten below.
        entry = roster.setdefault(state["name"], {
            "hp": 1,
            "status": "",
            "moves": [],
            "boosts": {k: [] for k in boost_keys},
        })

        # Latest HP and status win.
        entry["hp"] = state.get("hp_pct", 0)
        entry["status"] = state["status"]

        # Latest boosts win; stats missing from the state count as 0.
        reported_boosts = state.get("boosts", {})
        for stat in entry["boosts"]:
            entry["boosts"][stat] = reported_boosts.get(stat, 0)

        # Record each move name once, credited to the Pokémon in this turn's state.
        move_details = turn.get(move_key)
        if move_details is not None and move_details['name'] not in entry["moves"]:
            entry["moves"].append(move_details["name"])

    return roster


def aggregate_player_stats(player_dict):
    """Aggregate the per-Pokémon summaries of one player into team-level stats.

    Args:
        player_dict: mapping of Pokémon name to a summary dict with ``"hp"``,
            ``"status"`` and ``"boosts"`` entries, as built by
            build_player_dict(). May be empty or None.

    Returns:
        A tuple ``(stats, pokemon_left)``.

        ``stats`` always has the same keys: ``total_hp_left``, ``num_seen``,
        ``num_fainted``, ``num_paralyzed``, ``num_frozen``, ``num_psn``,
        ``num_brn``, ``num_slp`` and ``avg_boosts`` (a dict keyed by stat).
        HP, status counts and boosts only consider Pokémon whose status is not
        ``"fnt"``.

        ``pokemon_left`` lists the names of those non-fainted Pokémon.

        The same ``(stats, pokemon_left)`` shape is returned for an empty
        input (all counts 0, empty list). Previously that case returned a bare
        dict with different keys, which broke callers that unpack the tuple.
        When no Pokémon is left, every average boost is 0.0 instead of the NaN
        (plus warning) that averaging an empty list produces.
    """
    boost_keys = ["atk", "def", "spa", "spd", "spe"]
    player_dict = player_dict or {}

    # Pokémon that have not fainted, in the mapping's order.
    pokemon_left = [name for name, p in player_dict.items() if p['status'] != "fnt"]
    pokemon_left_stats = [player_dict[name] for name in pokemon_left]

    status_counts = Counter(p['status'] for p in pokemon_left_stats if p.get('status'))

    if pokemon_left_stats:
        boosts = {k: np.mean([p["boosts"][k] for p in pokemon_left_stats]) for k in boost_keys}
    else:
        boosts = {k: 0.0 for k in boost_keys}

    return {
        "total_hp_left": sum(p["hp"] for p in pokemon_left_stats),
        "num_seen": len(player_dict),
        # Everything that is not "left" has status "fnt".
        "num_fainted": len(player_dict) - len(pokemon_left),
        "num_paralyzed": status_counts['par'],
        "num_frozen": status_counts['frz'],
        "num_psn": status_counts['psn'],
        "num_brn": status_counts['brn'],
        "num_slp": status_counts['slp'],
        "avg_boosts": boosts,
    }, pokemon_left


def _type_and_stat_advantages(p1_pokemon_left, p2_pokemon_left, pokemon_df):
    """Type and base-stat advantage of player 1's Pokémon over player 2's.

    Args:
        p1_pokemon_left: names of player 1's Pokémon to consider.
        p2_pokemon_left: names of player 2's Pokémon to consider.
        pokemon_df: lookup table with ``name``, ``types`` and ``base_*`` columns.
            Names are matched in lowercase, which is how
            extract_pokemon_and_types() stores them.

    Returns:
        ``(type_advantage, stat_advantage)``, both ratios player 1 / player 2.
        The type ratio uses the module-level ``type_chart``. If either side is
        empty the type ratio is 1.0 and the stat ratio is NaN.

    Raises:
        IndexError: if a name is not present in ``pokemon_df``.
    """
    stat_columns = ["base_hp", "base_atk", "base_def", "base_spa", "base_spd", "base_spe"]

    def collect(names):
        """Return (flat list of real types, 2-D float array of base stats)."""
        types, stats = [], []
        for name in names:
            row = pokemon_df[pokemon_df['name'] == name.lower()].iloc[0]
            types.extend(t for t in row['types'] if t != "notype")
            stats.append([row[col] for col in stat_columns])
        return types, np.array(stats, dtype=float)

    p1_all_types, p1_stat_matrix = collect(p1_pokemon_left)
    p2_all_types, p2_stat_matrix = collect(p2_pokemon_left)

    # Team-vs-team type advantage in both directions, expressed as a ratio.
    p1_type_adv = compute_type_advantage(p1_all_types, p2_all_types, type_chart)
    p2_type_adv = compute_type_advantage(p2_all_types, p1_all_types, type_chart)
    type_advantage = p1_type_adv / p2_type_adv

    # Mean over every base stat of every Pokémon; NaN marks an empty side
    # explicitly instead of averaging an empty array.
    p1_avg = p1_stat_matrix.mean() if p1_stat_matrix.size else np.nan
    p2_avg = p2_stat_matrix.mean() if p2_stat_matrix.size else np.nan
    stat_advantage = p1_avg / p2_avg

    return type_advantage, stat_advantage


def create_dynamic_features(data: list[dict]) -> pd.DataFrame:
    """Build one row of timeline-derived features per battle.

    Each player's Pokémon states are summarised with build_player_dict() and
    aggregate_player_stats(), and most features are differences or ratios
    between player 1 and player 2. Type and stat advantages are computed on the
    Pokémon that have not fainted, using the module-level ``pokemon_df`` and
    ``type_chart``.

    Battles without a ``battle_timeline`` are skipped, so the result can have
    fewer rows than ``data``.

    Returns:
        A DataFrame with a ``battle_id`` column plus the feature columns; NaN
        values are replaced by 0.
    """
    feature_list = []

    for battle in tqdm(data, desc="Extracting two-dict features"):
        timeline = battle.get("battle_timeline", [])
        if not timeline:
            continue

        p1_dict = build_player_dict(timeline, "p1")
        p2_dict = build_player_dict(timeline, "p2")
        p1_stats, p1_pokemon_left = aggregate_player_stats(p1_dict)
        p2_stats, p2_pokemon_left = aggregate_player_stats(p2_dict)

        n_stall_moves_p1 = get_sum_stall_moves(timeline)
        n_stall_moves_p2 = get_sum_stall_moves(timeline, player_prefix="p2")
        stall_move_usage_diff = n_stall_moves_p1 - n_stall_moves_p2

        n_boost_moves_p1 = get_sum_boost_moves(timeline)
        n_boost_moves_p2 = get_sum_boost_moves(timeline, player_prefix="p2")
        boost_move_usage_diff = n_boost_moves_p1 - n_boost_moves_p2

        n_switches_p1 = get_number_of_switches(timeline)
        n_switches_p2 = get_number_of_switches(timeline, player_prefix="p2")
        switch_frequency_diff = n_switches_p1 - n_switches_p2

        num_setup_p1 = num_setup(p1_dict)
        num_setup_p2 = num_setup(p2_dict)
        num_setup_diff = num_setup_p1 - num_setup_p2

        # Only the move "recover" is checked here. Subtracting the two booleans
        # yields -1, 0 or 1.
        has_recovery_p1 = any(move.lower() == 'recover' for p in p1_dict.values() for move in p['moves'])
        has_recovery_p2 = any(move.lower() == 'recover' for p in p2_dict.values() for move in p['moves'])
        has_recovery_diff = has_recovery_p1 - has_recovery_p2

        # The helper lives at module level and computes its result after both
        # sides are collected, so an empty enemy list no longer leaves the
        # return values unassigned.
        type_advantage, stat_advantage = _type_and_stat_advantages(
            p1_pokemon_left, p2_pokemon_left, pokemon_df
        )

        boost_advantage = np.mean(list(p1_stats["avg_boosts"].values())) - np.mean(list(p2_stats["avg_boosts"].values()))

        features = {
            "battle_id": battle.get("battle_id"),
            # The small epsilon avoids a division by zero when player 2 has no HP left.
            "hp_ratio": p1_stats["total_hp_left"] / (p2_stats["total_hp_left"] + 1e-9),
            "p1_num_seen": p1_stats["num_seen"],
            "p2_num_seen": p2_stats["num_seen"],
            "p1_num_fainted": p1_stats["num_fainted"],
            "p2_num_fainted": p2_stats["num_fainted"],
            "num_paralyzed_diff": p1_stats["num_paralyzed"] - p2_stats["num_paralyzed"],
            "num_frozen_diff": p1_stats["num_frozen"] - p2_stats["num_frozen"],
            "num_psn_diff": p1_stats["num_psn"] - p2_stats["num_psn"],
            "num_brn_diff": p1_stats["num_brn"] - p2_stats["num_brn"],
            "num_slp_diff": p1_stats["num_slp"] - p2_stats["num_slp"],
            "num_seen_diff": p1_stats["num_seen"] - p2_stats["num_seen"],
            "num_fainted_diff": p1_stats["num_fainted"] - p2_stats["num_fainted"],
            "type_advantage": type_advantage,
            "stat_advantage": stat_advantage,
            "boost_advantage": boost_advantage,
            "stall_move_usage_diff": stall_move_usage_diff,
            "boost_move_usage_diff": boost_move_usage_diff,
            "switch_frequency_diff": switch_frequency_diff,
            "num_setup_diff": num_setup_diff,
            "has_recovery_diff": has_recovery_diff
        }

        feature_list.append(features)

    return pd.DataFrame(feature_list).fillna(0)

print("Processing training data...")
train_df_dynamic = create_dynamic_features(train_data)

# test_data is already in memory from the data-loading cell at the top of the
# notebook, so the test file is not read a second time here.
print("\nProcessing test data...")
test_df_dynamic = create_dynamic_features(test_data)

print("\nTraining features preview:")
display(train_df_dynamic.head())

In [None]:
# Join the static and the timeline-derived feature tables on the battle id.
# An inner join keeps only battles that are present in both tables.
train_df_combined = train_df.merge(train_df_dynamic, how="inner", on="battle_id")
test_df_combined = test_df.merge(test_df_dynamic, how="inner", on="battle_id")

print(train_df_combined.head(), train_df_combined.columns)

In [None]:
# Feature selection: every column except the identifier and the target is a
# model input.
# Ideas noted for later: RFE(CV), correlation filtering, SFS, and Optuna for
# the best parameters.
# Open idea: maybe compute the type and stat advantages only on the Pokémon
# that are still alive.
excluded_columns = ['battle_id', 'player_won']
features = [col for col in train_df_combined.columns if col not in excluded_columns]
print("Final feature set:", features)
print(test_df_combined.head(), test_df_combined.columns)

# Design matrices and target, all using the same column order.
y_train = train_df_combined['player_won']
X_train = train_df_combined[features]
X_test = test_df_combined[features]

## Training Pipeline

In [None]:
!pip install xgboost
!pip install scikit-learn
!pip install lightgbm
!pip install catboost
!pip install optuna
!pip install optuna-integration[pytorch_lightning]

In [None]:
import optuna
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb
import lightgbm as lgb
from catboost import CatBoostClassifier
from sklearn.metrics import make_scorer, accuracy_score


In [None]:
from sklearn.model_selection import train_test_split

# Hold out 5% of the training battles for validation.
# The split is taken from train_df_combined rather than from X_train/y_train
# themselves, so re-running this cell does not split already-split data.
X_train_raw, X_val_raw, y_train, y_val = train_test_split(
    train_df_combined[features],
    train_df_combined['player_won'],
    test_size=0.05,
    random_state=42,
)

# Fit the scaler on the training rows only, so no statistics of the validation
# rows leak into the features, then apply the same transform to both parts.
scaler = StandardScaler().fit(X_train_raw)
X_train = scaler.transform(X_train_raw)
X_val = scaler.transform(X_val_raw)

def objective(trial):
    """Optuna objective: train one candidate model and score it on the hold-out set.

    The trial first picks a model family ("LogReg", "XGBoost" or "CatBoost")
    and then that family's hyperparameters. The model is fitted on the
    module-level ``X_train``/``y_train`` and evaluated on ``X_val``/``y_val``.

    Args:
        trial: the optuna trial used to sample the search space.

    Returns:
        Accuracy on the validation split (higher is better).
    """
    model_name = trial.suggest_categorical("model", ["LogReg", "XGBoost", "CatBoost"])

    # suggest_float(..., log=True) samples on a log scale; it replaces the
    # deprecated suggest_loguniform().
    if model_name == "LogReg":
        C = trial.suggest_float("logreg_C", 0.01, 10, log=True)
        model = Pipeline([
            ("scaler", StandardScaler()),
            ("model", LogisticRegression(
                C=C,
                penalty="l2",
                solver="lbfgs",
                max_iter=1000
            ))
        ])

    elif model_name == "XGBoost":
        n_estimators = trial.suggest_int("xgb_n_estimators", 20, 300)
        max_depth = trial.suggest_int("xgb_max_depth", 2, 15)
        learning_rate = trial.suggest_float("xgb_lr", 0.01, 0.3, log=True)
        subsample = trial.suggest_float("xgb_subsample", 0.6, 1.0)

        model = xgb.XGBClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            learning_rate=learning_rate,
            subsample=subsample,
            use_label_encoder=False,
            eval_metric="logloss",
            random_state=42
        )

    elif model_name == "CatBoost":
        iterations = trial.suggest_int("cat_iterations", 100, 500)
        depth = trial.suggest_int("cat_depth", 2, 15)
        learning_rate = trial.suggest_float("cat_learning_rate", 0.01, 0.3, log=True)

        model = CatBoostClassifier(
            iterations=iterations,
            depth=depth,
            learning_rate=learning_rate,
            verbose=False,
            random_state=42
        )

    # --- Fit on the training split ---
    model.fit(X_train, y_train)

    # --- Score on the held-out validation split ---
    y_pred = model.predict(X_val)

    val_score = accuracy_score(y_val, y_pred)

    return val_score


In [None]:

from sklearn.metrics import accuracy_score, confusion_matrix

# Seed the sampler so the hyperparameter search is repeatable.
study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=150)

best_params = study.best_trial.params
print("Best model and params:")
print(best_params)
# objective() scores a single hold-out split, not cross-validation folds.
print("Best validation accuracy:", study.best_trial.value)

# Rebuild the winning model. best_params uses the names given to the trial
# ("model", "logreg_C", "xgb_lr", ...), which are not estimator arguments, so
# each value is mapped onto the matching constructor argument explicitly.
best_model_name = best_params["model"]
print("Training best model:", best_model_name)
if best_model_name == "LogReg":
    model = Pipeline([
        ("scaler", StandardScaler()),
        ("model", LogisticRegression(
            C=best_params["logreg_C"],
            penalty="l2",
            solver="lbfgs",
            max_iter=1000,
        )),
    ])
elif best_model_name == "XGBoost":
    model = xgb.XGBClassifier(
        n_estimators=best_params["xgb_n_estimators"],
        max_depth=best_params["xgb_max_depth"],
        learning_rate=best_params["xgb_lr"],
        subsample=best_params["xgb_subsample"],
        use_label_encoder=False,
        eval_metric="logloss",
        random_state=42,
    )
elif best_model_name == "CatBoost":
    model = CatBoostClassifier(
        iterations=best_params['cat_iterations'],
        depth=best_params['cat_depth'],
        learning_rate=best_params['cat_learning_rate'],
        verbose=False,
        random_state=42,
    )
else:
    raise ValueError(f"Unknown model type returned by the study: {best_model_name!r}")

model.fit(X_train, y_train)

y_pred = model.predict(X_val)
acc = accuracy_score(y_val, y_pred)
print(f"Validation Accuracy: {acc:.4f}")
print("\nConfusion Matrix:")
print(confusion_matrix(y_val, y_pred))

print("Generating predictions on the test set...")
# X_train and X_val were standardised with `scaler`; the test features must go
# through the same transform before they reach the model.
X_test_scaled = scaler.transform(X_test)
test_predictions = model.predict(X_test_scaled)

# X_test was taken from test_df_combined, so its battle ids are the ones that
# line up row by row with the predictions.
submission_df = pd.DataFrame({
    'battle_id': test_df_combined['battle_id'],
    'player_won': test_predictions
})

submission_df.to_csv('submission.csv', index=False)

print("\n'submission.csv' file created successfully!")
display(submission_df.head())

##### Neural Network with Hyperparameter Optimization

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

import optuna
from optuna.integration import PyTorchLightningPruningCallback
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score
import numpy as np

In [None]:
# NOTE(review): this disabled cell fits the scaler on all of X_train *before*
# the train/validation split, so the validation rows contribute to the scaling
# statistics. It also rebinds X_train / y_train, so running the cell twice
# would split the already-split data again.
# scaler = StandardScaler()
# X_scaled = scaler.fit_transform(X_train)

# X_train, X_val, y_train, y_val = train_test_split(
#     X_scaled, y_train, test_size=0.1, random_state=42
# )

# X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
# y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32)
# X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
# y_val_tensor = torch.tensor(y_val.values, dtype=torch.float32)

# print(X_train.shape, y_train.shape)
# class MLP(nn.Module):
#     def __init__(self, input_dim, hidden_size, num_layers, dropout, activation):
#         super().__init__()

#         layers = []
#         act_funcs = {
#             "relu": nn.ReLU(),
#             "gelu": nn.GELU(),
#             "leaky_relu": nn.LeakyReLU()
#         }

#         layers.append(nn.Linear(input_dim, hidden_size))
#         layers.append(act_funcs[activation])
#         layers.append(nn.Dropout(dropout))

#         for _ in range(num_layers - 1):
#             layers.append(nn.Linear(hidden_size, hidden_size))
#             layers.append(act_funcs[activation])
#             layers.append(nn.Dropout(dropout))

#         layers.append(nn.Linear(hidden_size, 1))
#         self.net = nn.Sequential(*layers)

#     def forward(self, x):
#         return torch.sigmoid(self.net(x))


In [None]:
# def train_model(model, train_loader, val_loader, optimizer, criterion, device, trial):
#     model.to(device)
#     best_acc = 0
#     patience = 10
#     wait = 0
#     for epoch in range(100):  # max epochs
#         model.train()
#         for xb, yb in train_loader:
#             xb, yb = xb.to(device), yb.to(device).unsqueeze(1)

#             optimizer.zero_grad()
#             preds = model(xb)
#             loss = criterion(preds, yb)
#             loss.backward()
#             optimizer.step()

#         # --- Validation ---
#         model.eval()
#         acc_list = []
#         with torch.no_grad():
#             for xb, yb in val_loader:
#                 xb = xb.to(device)
#                 preds = model(xb).cpu().numpy().flatten()
#                 pred_labels = (preds >= 0.5).astype(int)
#                 acc_list.append(accuracy_score(yb, pred_labels))

#         acc = np.mean(acc_list)

#         trial.report(acc, epoch)

#         if trial.should_prune():
#             raise optuna.exceptions.TrialPruned()

#         # Early stopping
#         if acc > best_acc:
#             best_acc = acc
#             wait = 0
#         else:
#             wait += 1

#         if wait > patience:
#             break

#     return best_acc


In [None]:
# def objective(trial):
#     # Hyperparameter search space
#     params = {
#         "num_layers": trial.suggest_int("num_layers", 1, 4),
#         "hidden_size": trial.suggest_int("hidden_size", 32, 256, log=True),
#         "activation": trial.suggest_categorical("activation", ["relu", "gelu", "leaky_relu"]),
#         "dropout": trial.suggest_float("dropout", 0.0, 0.5),
#         "lr": trial.suggest_float("lr", 1e-4, 3e-2, log=True),
#         "optimizer_name": trial.suggest_categorical("optimizer_name", ["adam", "adamw", "rmsprop"]),
#         "weight_decay": trial.suggest_float("weight_decay", 1e-6, 1e-2, log=True),
#         "batch_size": trial.suggest_categorical("batch_size", [32, 64, 128, 256]),
#     }

#     # Dataloader
#     train_loader = DataLoader(
#         TensorDataset(X_train_tensor, y_train_tensor),
#         batch_size=params["batch_size"],
#         shuffle=True
#     )
#     val_loader = DataLoader(
#         TensorDataset(X_val_tensor, y_val_tensor),
#         batch_size=params["batch_size"],
#         shuffle=False
#     )

#     # Model
#     # NOTE(review): input_dim is hard-coded to 32 and feeds the first
#     # nn.Linear layer, so it must equal the number of feature columns in
#     # X_train_tensor — TODO confirm, or derive it from X_train_tensor.shape[1].
#     model = MLP(
#         input_dim=32,
#         hidden_size=params["hidden_size"],
#         num_layers=params["num_layers"],
#         dropout=params["dropout"],
#         activation=params["activation"]
#     )

#     # Optimizer
#     if params["optimizer_name"] == "adam":
#         optimizer = optim.Adam(model.parameters(), lr=params["lr"], weight_decay=params["weight_decay"])
#     elif params["optimizer_name"] == "adamw":
#         optimizer = optim.AdamW(model.parameters(), lr=params["lr"], weight_decay=params["weight_decay"])
#     else:
#         optimizer = optim.RMSprop(model.parameters(), lr=params["lr"], weight_decay=params["weight_decay"])

#     device = "cuda" if torch.cuda.is_available() else "cpu"
#     criterion = nn.BCELoss()

#     acc = train_model(model, train_loader, val_loader, optimizer, criterion, device, trial)

#     return acc


In [None]:
# study = optuna.create_study(direction="maximize")
# study.optimize(objective, n_trials=100)

# print("Best accuracy:", study.best_value)
# print("Best hyperparameters:", study.best_params)

# best_params = study.best_params

In [None]:
# best_params = study.best_params
# print("Best parameters:", best_params)

# train_loader = DataLoader(
#     TensorDataset(X_train_tensor, y_train_tensor),
#     batch_size=best_params["batch_size"],
#     shuffle=True
# )

# val_loader = DataLoader(
#     TensorDataset(X_val_tensor, y_val_tensor),
#     batch_size=best_params["batch_size"],
#     shuffle=False
# )

# NOTE(review): input_dim is hard-coded to 32 and must equal the number of
# feature columns in X_train_tensor — TODO confirm, or derive it from
# X_train_tensor.shape[1].
# model = MLP(
#     input_dim=32,
#     hidden_size=best_params["hidden_size"],
#     num_layers=best_params["num_layers"],
#     dropout=best_params["dropout"],
#     activation=best_params["activation"]
# )

# device = "cuda" if torch.cuda.is_available() else "cpu"
# model.to(device)

# criterion = nn.BCELoss()

# if best_params["optimizer_name"] == "adam":
#     optimizer = optim.Adam(model.parameters(), lr=best_params["lr"], weight_decay=best_params["weight_decay"])
# elif best_params["optimizer_name"] == "adamw":
#     optimizer = optim.AdamW(model.parameters(), lr=best_params["lr"], weight_decay=best_params["weight_decay"])
# else:
#     optimizer = optim.RMSprop(model.parameters(), lr=best_params["lr"], weight_decay=best_params["weight_decay"])

# best_acc = 0
# patience = 30
# wait = 0

# for epoch in range(200):
#     model.train()
#     for xb, yb in train_loader:
#         xb, yb = xb.to(device), yb.to(device).unsqueeze(1)

#         optimizer.zero_grad()
#         preds = model(xb)
#         loss = criterion(preds, yb)
#         loss.backward()
#         optimizer.step()

#     model.eval()
#     acc_list = []
#     with torch.no_grad():
#         for xb, yb in val_loader:
#             xb = xb.to(device)
#             preds = model(xb).cpu().numpy().flatten()
#             pred_labels = (preds >= 0.5).astype(int)
#             acc_list.append(accuracy_score(yb, pred_labels))

#     acc = np.mean(acc_list)
#     print(f"Epoch {epoch+1} Accuracy: {acc:.4f}")

#     # Early stopping
#     # NOTE(review): model.state_dict() returns references to the live
#     # parameter tensors, not a snapshot. Later optimizer steps update them in
#     # place, so `best_state` ends up holding the *last* weights and the
#     # load_state_dict call below restores nothing. Snapshot with
#     # copy.deepcopy(model.state_dict()) when re-enabling this cell.
#     if acc > best_acc:
#         best_acc = acc
#         best_state = model.state_dict()
#         wait = 0
#     else:
#         wait += 1

#     if wait >= patience:
#         print("Early stopping triggered.")
#         break

# # Load best weights
# model.load_state_dict(best_state)
# print("Final Accuracy:", best_acc)

# # ---------------------------------------------
# # 5. Save final model
# # ---------------------------------------------
# torch.save(model.state_dict(), "best_mlp.pth")
# print("Model saved: best_mlp.pth")



In [None]:

# # Make predictions on the test data
# # NOTE(review): before re-enabling this cell —
# #   * `scaler.fit_transform(X_test)` re-fits the scaler on the test set, so
# #     the test features are standardised with different statistics than the
# #     training features; `scaler.transform(X_test)` reuses the training fit.
# #   * `X_test_tensor` is created on the CPU while `model` was moved to
# #     `device`; on a CUDA machine the forward pass needs
# #     `X_test_tensor.to(device)`.
# print("Generating predictions on the test set...")
# X_scaled = scaler.fit_transform(X_test)

# X_test_tensor = torch.tensor(X_scaled, dtype=torch.float32)

# preds = model(X_test_tensor).detach().cpu().numpy().flatten()
# pred_labels = (preds >= 0.5).astype(int)
# # Create the submission DataFrame
# submission_df = pd.DataFrame({
#     'battle_id': test_df['battle_id'],
#     'player_won': pred_labels
# })

# # Save the DataFrame to a .csv file
# submission_df.to_csv('submission.csv', index=False)

# print("\n'submission.csv' file created successfully!")
# display(submission_df.head())

#### XGBoost with Hyperparameter Optimization

In [None]:
# def objective(trial):

#     params = {
#         "objective": "binary:logistic",
#         "eval_metric": "logloss",
#         "booster": trial.suggest_categorical("booster", ["gbtree", "dart"]),
#         "max_depth": trial.suggest_int("max_depth", 2, 10),
#         "min_child_weight": trial.suggest_int("min_child_weight", 1, 20),
#         "gamma": trial.suggest_float("gamma", 1e-8, 10, log=True),
#         "lambda": trial.suggest_float("lambda", 1e-8, 10, log=True),
#         "alpha": trial.suggest_float("alpha", 1e-8, 10, log=True),
#         "subsample": trial.suggest_float("subsample", 0.5, 1.0),
#         "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
#         "eta": trial.suggest_float("eta", 0.005, 0.3, log=True),
#         "n_estimators": trial.suggest_int("n_estimators", 300, 2000),
#     }

#     # DART extras
#     if params["booster"] == "dart":
#         params["rate_drop"] = trial.suggest_float("rate_drop", 0.0, 0.5)
#         params["skip_drop"] = trial.suggest_float("skip_drop", 0.0, 0.9)

#     pruning_callback = optuna.integration.XGBoostPruningCallback(
#         trial,
#         "validation_0-logloss"
#     )

#     model = xgb.XGBClassifier(**params,
#                               early_stopping_rounds=100,
#                               callbacks=[pruning_callback])

#     model.fit(
#         X_train,
#         y_train,
#         eval_set=[(X_val, y_val)],
#         verbose=False)

#     probs = model.predict_proba(X_val)[:, 1]
#     pred_label = (probs >= 0.5).astype(int)

#     return accuracy_score(y_val, pred_label)

# study = optuna.create_study(
#     direction="maximize",
#     pruner=optuna.pruners.MedianPruner(n_warmup_steps=10)
# )
# study.optimize(objective, n_trials=300)

In [None]:
# best_params = study.best_trial.params
# print(best_params)
# final_model = xgb.XGBClassifier(**best_params)

# final_model.fit(X_train, y_train)

In [None]:
# NOTE(review): XGBClassifier.predict already returns class labels, so the
# `>= 0.5` comparisons below do not act as a probability threshold. Use
# predict_proba(...)[:, 1] if thresholding is intended.
# preds = final_model.predict(X_val)
# pred_label = (preds >= 0.5).astype(int)
# acc = accuracy_score(y_val, pred_label)
# print("Validation accuracy:", acc)

# # Make predictions on the test data
# print("Generating predictions on the test set...")
# test_predictions = final_model.predict(X_test)
# pred_label = (test_predictions >= 0.5).astype(int)

# # Create the submission DataFrame
# submission_df = pd.DataFrame({
#     'battle_id': test_df['battle_id'],
#     'player_won': pred_label
# })

# # Save the DataFrame to a .csv file
# submission_df.to_csv('submission.csv', index=False)

# print("\n'submission.csv' file created successfully!")
# display(submission_df.head())

#### CatBoost with Hyperparameter Optimization

In [None]:
# def objective(trial):
#     params = {
#         "loss_function": "Logloss",
#         "eval_metric": "Accuracy",
#         "verbose": False,
#         "random_state": 42,

#         # Hyperparameters to tune
#         "iterations": trial.suggest_int("iterations", 100, 1000),
#         "depth": trial.suggest_int("depth", 3, 10),
#         "learning_rate": trial.suggest_float("learning_rate", 1e-4, 0.3, log=True),
#         "l2_leaf_reg": trial.suggest_float("l2_leaf_reg", 1e-3, 10.0, log=True),
#         "bagging_temperature": trial.suggest_float("bagging_temperature", 0.0, 1.0),
#         "border_count": trial.suggest_int("border_count", 32, 255),
#     }

#     model = CatBoostClassifier(**params)

#     model.fit(
#         X_train, y_train,
#         eval_set=(X_val, y_val),
#         early_stopping_rounds=50,
#         verbose=False
#     )

#     probs = model.predict_proba(X_val)[:, 1]         # probabilities
#     preds = (probs >= 0.5).astype(int)               # convert to 0/1

#     acc = accuracy_score(y_val, preds)
#     return acc


In [None]:
# study = optuna.create_study(direction="maximize")
# study.optimize(objective, n_trials=100)
# print("Best accuracy:", study.best_value)
# print("Best params:", study.best_params)


In [None]:
# best_params = study.best_params

# best_params["loss_function"] = "Logloss"
# best_params["eval_metric"] = "Accuracy"
# best_params["verbose"] = False

# final_model = CatBoostClassifier(**best_params)

# final_model.fit(
#     X_train, y_train,
#     eval_set=(X_val, y_val),
#     early_stopping_rounds=50,
#     verbose=False
# )


In [None]:
# probs = final_model.predict_proba(X_val)[:, 1]
# preds = (probs >= 0.5).astype(int)

# accuracy = accuracy_score(y_val, preds)
# print("Final Model Accuracy:", accuracy)



In [None]:
# Make predictions on the test data
# print("Generating predictions on the test set...")
# test_probs = final_model.predict_proba(X_test)[:, 1]
# pred_label = (test_probs >= 0.5).astype(int)

# # Create the submission DataFrame
# submission_df = pd.DataFrame({
#     'battle_id': test_df['battle_id'],
#     'player_won': pred_label
# })

# # Save the DataFrame to a .csv file
# submission_df.to_csv('submission.csv', index=False)

# print("\n'submission.csv' file created successfully!")
# display(submission_df.head())