## Loading and Inspecting Data

In [2]:
import json
import pandas as pd
import numpy as np
import os, sys
import yaml
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), ".."))
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

import src.data_management as my_dm
seed=456

In [3]:


with open("../config.yaml", "r") as f:
    config = yaml.safe_load(f)

train_file_path = config["data"]["input_train_path"]
test_file_path = config["data"]["input_test_path"]

train_data = []

print(f"Loading data from '{train_file_path}'...")
try:
    with open(train_file_path, 'r') as f:
        for line in f:
            # json.loads() parses one line (one JSON object) into a Python dictionary
            train_data.append(json.loads(line))

    print(f"Successfully loaded {len(train_data)} battles.")

except FileNotFoundError:
    print(f"ERROR: Could not find the training file at '{train_file_path}'.")
    print("Please make sure you have added the competition data to this notebook.")

Loading data from '../data/train/raw/train.jsonl'...
Successfully loaded 9999 battles.


This code loads the training data from a JSONL (JSON Lines) file. First, it reads the configuration file to get the file paths for both training and test datasets. Then it reads the training file line by line, where each line contains a JSON object representing a single battle. The json.loads() function parses each line into a Python dictionary, which is then appended to the train_data list. Error handling is included to catch cases where the file might not exist at the specified path.

In [None]:
def create_simple_features(data: list[dict]) -> pd.DataFrame:
    """
    A very basic feature extraction function.
    It only uses the aggregated base stats of the player's team and opponent's lead.
    """
    feature_list = []
    for battle in data:
        features = {}
        features["battle_id"] = battle.get("battle_id", -1)
        if battle.get('player_won') is not None:
            features['player_won'] = int(battle['player_won'])
        # --- Player 1 Team Features ---

        p1_team = battle.get('p1_team_details', [])
        if p1_team:
            #"""
            stats = ['base_hp', 'base_atk', 'base_def', 'base_spa', 'base_spd', 'base_spe']
            for stat in stats:               ### This helps the model to have a better idea abt the time instead of having only mean
                values = [p.get(stat, 0) for p in p1_team]
                features[f'p1_mean_{stat}'] = np.mean(values)
                #features[f'p1_min_{stat}'] = np.min(values)
                #features[f'p1_max_{stat}'] = np.max(values)
                features[f'p1_std_{stat}'] = np.std(values)
            '''
            features['p1_mean_hp'] = np.mean([p.get('base_hp', 0) for p in p1_team])
            features['p1_mean_spe'] = np.mean([p.get('base_spe', 0) for p in p1_team])
            features['p1_mean_atk'] = np.mean([p.get('base_atk', 0) for p in p1_team])
            features['p1_mean_def'] = np.mean([p.get('base_def', 0) for p in p1_team])
            features['p1_mean_spa'] = np.mean([p.get('base_spa', 0) for p in p1_team])
            features['p1_mean_spd'] = np.mean([p.get('base_spd', 0) for p in p1_team])
            '''
            #features['p1_mean_stats'] = np.mean([features['p1_mean_base_hp'], features['p1_mean_base_spe'], features['p1_mean_base_atk'], features['p1_mean_base_def'], features['p1_mean_base_spa'], features['p1_mean_base_spd']])
            
            ### We can also build derivated feature like how much is off/def our team
            # team stats
            base_atk = features['p1_mean_base_atk']
            base_spa = features['p1_mean_base_spa']
            base_def = features['p1_mean_base_def']
            base_spd = features['p1_mean_base_spd']
            base_spe = features['p1_mean_base_spe']
            base_hp  = features['p1_mean_base_hp']

            ## constructing new features
            offense = base_atk + base_spa
            defense = base_def + base_spd
            #features['p1_offense_mean']    = offense
            #features['p1_defense_mean']    = defense
            features['p1_atk_def_ratio']   = offense / (defense + 1e-6)
            # average per-Pokémon total base stats
            p1_totals = [sum(p.get(s, 0) for s in stats) for p in p1_team]
            features['p1_total_base_power'] = float(np.mean(p1_totals))
            features['p1_stat_variety']     = float(np.std(p1_totals))
            features['p1_style_index']      = offense / (offense + defense + 1e-6)
            features['p1_hp_ratio']         = base_hp / (offense + defense + base_spe + 1e-6)
            # fastest member speed (robust comparator vs P2 lead)
            features['p1_max_speed']        = float(np.max([p.get('base_spe', 0) for p in p1_team]))
        
    
            """
            """
            '''
###########################################AGGIUNTA####################################################
          # Estrazione tipi
            type_counts = {t: 0 for t in all_types}
            for p in p1_team:
                for t in p.get('types', []):
                    type_counts[t] += 1
            team_size = len(p1_team)
            for t in all_types:
                features[f'p1_type_{t}'] = type_counts[t] / team_size if team_size > 0 else 0
##########################################AGGIUNTA#####################################################
            '''
            """
        # --- Player 2 Lead Features ---
        p2_lead = battle.get('p2_lead_details')
        
        if p2_lead:
            # Player 2's lead Pokémon's stats
            features['p2_lead_hp'] = p2_lead.get('base_hp', 0)
            features['p2_lead_spe'] = p2_lead.get('base_spe', 0)
            features['p2_lead_atk'] = p2_lead.get('base_atk', 0)
            features['p2_lead_def'] = p2_lead.get('base_def', 0)
            features['p2_lead_spd'] = p2_lead.get('base_spd', 0)
            features['p2_lead_spa'] = p2_lead.get('base_spa', 0)
            ## types of p2 lead
            for t in all_types:
                features[f'p2_lead_type_{t}'] = 0.0
            for t in p2_lead.get('types', []):
                if t in all_types:
                    features[f'p2_lead_type_{t}'] = 1.0
               
            """
        ## Extracting battle features
        timeline = battle.get("battle_timeline", [])
        if len(timeline) > 0:
            #"""
            # Average HP percentage for both players
            p1_hp = [turn["p1_pokemon_state"].get("hp_pct", np.nan) for turn in timeline]
            p2_hp = [turn["p2_pokemon_state"].get("hp_pct", np.nan) for turn in timeline]

            features["p1_mean_hp_pct"] = np.nanmean(p1_hp)
            features["p2_mean_hp_pct"] = np.nanmean(p2_hp)
            features["p1_final_hp"] = p1_hp[-1]
            features["p2_final_hp"] = p2_hp[-1]
            features["p1_total_damage"] = np.nansum(np.maximum(0, np.diff(p1_hp)))
            features["p2_total_damage"] = np.nansum(np.maximum(0, np.diff(p2_hp)))

            # Count total moves used
            p1_moves = [turn.get("p1_move_details", {}).get("name") for turn in timeline if turn.get("p1_move_details")]
            p2_moves = [turn.get("p2_move_details", {}).get("name") for turn in timeline if turn.get("p2_move_details")]
            p1_totmoves= features["p1_total_moves"] = len(p1_moves)
            p2_totmoves= features["p2_total_moves"] = len(p2_moves)
            features["total_moves_difference"] = p2_totmoves-p1_totmoves

            # Count unique move types used
            p1_move_types = [turn["p1_move_details"].get("type") for turn in timeline if turn.get("p1_move_details")]
            p2_move_types = [turn["p2_move_details"].get("type") for turn in timeline if turn.get("p2_move_details")]
            p1_unique= features["p1_unique_move_types"] = len(set(p1_move_types))
            p2_unique= features["p2_unique_move_types"] = len(set(p2_move_types))
            features["unique_moves_difference"]=p1_unique-p2_unique
            #"""

            p1_pkmns =  set([turn["p1_pokemon_state"]["name"] for turn in timeline])
            p2_pkmns = set([turn["p2_pokemon_state"]["name"] for turn in timeline])

            ### status and effects features
            total_statuses = ['slp', 'fnt', 'tox', 'psn', 'brn', 'frz', 'par', 'nostatus']
            p1_status = [turn["p1_pokemon_state"].get("status") for turn in timeline]
            p2_status = [turn["p2_pokemon_state"].get("status") for turn in timeline]

            total_effects = ['disable', 'firespin', 'confusion', 'substitute', 'wrap', 'clamp', 'typechange', 'reflect', 'noeffect']
            p1_effects = [turn["p1_pokemon_state"].get("effects") for turn in timeline]
            p1_effects = [effect for effects in p1_effects for effect in effects]
            p2_effects = [turn["p2_pokemon_state"].get("effects") for turn in timeline]
            p2_effects = [effect for effects in p2_effects for effect in effects]

            ## general status features

            # Count probably critical moves
            
            p1_moves_used = [t["p1_move_details"]["name"] for t in timeline if t.get("p1_move_details")]
            p2_moves_used = [t["p2_move_details"]["name"] for t in timeline if t.get("p2_move_details")]
            high_crit = {"Crabhammer", "Karate Chop", "Razor Leaf", "Slash", "crabhammer", "karate chop", "razor leaf", "slash"}

            high1 = features["p1_highcrit_moves_used"] = sum(m in high_crit for m in p1_moves_used)
            high2 = features["p2_highcrit_moves_used"] = sum(m in high_crit for m in p2_moves_used)
            features["highcrit_difference"]=high1-high2

            # Count total statuses inflicted
            
            
            changeofstatus_1= features["p1_status_changes"] = sum(1 for s in p1_status if s not in ["nostatus", "noeffect", None])
            changeofstatus_2= features["p2_status_changes"] = sum(1 for s in p2_status if s not in ["nostatus", "noeffect", None])
            features["status_changes_difference"]= changeofstatus_1-changeofstatus_2
            
            # Count single status count 

            dict_status_p1 = {status : 0 for status in total_statuses}
            dict_status_p2 = {status : 0 for status in total_statuses}
            for turn in timeline:
                if turn["p1_pokemon_state"].get("status"):
                    turn_status = turn["p1_pokemon_state"].get("status")
                    dict_status_p1[turn_status] += 1
                if turn["p2_pokemon_state"].get("status"):
                    turn_status = turn["p2_pokemon_state"].get("status")
                    dict_status_p2[turn_status] += 1
            
            for status in total_statuses:
                features[f"p1_{status}_count"] = dict_status_p1.get(status, 0)
                features[f"p2_{status}_count"] = dict_status_p2.get(status, 0)
                

            ## fnt features
            
            # Difference in fnt pkmn

            features["p2-p1_fnt_pokemon_number"] = features["p1_fnt_count"] - features["p2_fnt_count"]

            # fnt pkmn over total pokemon

            #features["p1_fnt_over_total_pkmn"] = features["p1_fnt_count"]/6
            #features["p2_fnt_over_total_pkmn"] = features["p2_fnt_count"]/6
            '''
            ## Pokémon blocked features

            blocking_statuses = ['slp', 'frz']

            # blocked pokémon (sleep or freeze)
            features["p1_blocked_count"] = sum(dict_status_p1[s] for s in blocking_statuses)
            features["p2_blocked_count"] = sum(dict_status_p2[s] for s in blocking_statuses)

            # difference
            features["p2-p1_blocked_status_diff"] = (
                features["p2_blocked_count"] - features["p1_blocked_count"]
            )

            # normalized over total team size (6)
            features["p1_blocked_over_total_pkmn"] = features["p1_blocked_count"] / 6
            features["p2_blocked_over_total_pkmn"] = features["p2_blocked_count"] / 6
            '''


            ## effects features

            # Count turns with effects like "reflect", "light screen", etc.

            dict_effects_p1 = {effect : 0 for effect in total_effects}
            dict_effects_p2 = {effect : 0 for effect in total_effects}
            for turn in timeline:
                if turn["p1_pokemon_state"].get("effects"):
                    turn_effects_p1 = turn["p1_pokemon_state"].get("effects")
                    for effect in turn_effects_p1:
                        dict_effects_p1[effect] += 1
                if turn["p2_pokemon_state"].get("effects"):
                    turn_effects_p2 = turn["p2_pokemon_state"].get("effects")
                    for effect in turn_effects_p2:
                        dict_effects_p2[effect] += 1            
            for effect in total_effects:
                features[f"p1_{effect}_count"] = dict_effects_p1.get(effect, 0)
                features[f"p2_{effect}_count"] = dict_effects_p2.get(effect, 0)

            # Boosts (attack, defense, etc.)
            #boost_keys = ["atk", "def", "spa", "spd", "spe"]
            #for key in boost_keys:
            #    p1_boosts = [turn["p1_pokemon_state"]["boosts"].get(key, 0) for turn in timeline]
            #    p2_boosts = [turn["p2_pokemon_state"]["boosts"].get(key, 0) for turn in timeline]
            #    features[f"p1_mean_boost_{key}"] = np.mean(p1_boosts)
            #    features[f"p2_mean_boost_{key}"] = np.mean(p2_boosts)

            # Number of time the player switched pokemon
            features["p1_switch_number"] = sum([1 for turn in timeline if turn["p1_move_details"] == None])
            features["p2_switch_number"] = sum([1 for turn in timeline if turn["p2_move_details"] == None])
            features["switchnumber_difference"] = features["p1_switch_number"]-features["p2_switch_number"]
            # Number of SPECIAL or PHYSICAL moves and Number of STATUS moves of p1 and p2
            features["p1_attack_moves"] = sum([1 for turn in timeline if turn.get("p1_move_details") and turn["p1_move_details"]["category"] in ["SPECIAL", "PHYSICAL"]])
            features["p2_attack_moves"] = sum([1 for turn in timeline if turn.get("p2_move_details") and turn["p2_move_details"]["category"] in ["SPECIAL", "PHYSICAL"]])
            features["p1_status_moves"] = sum([1 for turn in timeline if turn.get("p1_move_details") and turn["p1_move_details"]["category"] not in ["SPECIAL", "PHYSICAL"]])
            features["p2_status_moves"] = sum([1 for turn in timeline if turn.get("p2_move_details") and turn["p2_move_details"]["category"] not in ["SPECIAL", "PHYSICAL"]])

            # Number of same pokemon type moves (stab)
            features["p1_same_type_moves_number"] = sum([1 for turn in timeline if turn.get("p1_move_details") and turn["p1_move_details"]["type"] in my_dm.pokemon_type(turn["p1_pokemon_state"]["name"]) and turn["p1_move_details"]["category"] in ["SPECIAL", "PHYSICAL"]])
            features["p2_same_type_moves_number"] = sum([1 for turn in timeline if turn.get("p2_move_details") and turn["p2_move_details"]["type"] in my_dm.pokemon_type(turn["p2_pokemon_state"]["name"]) and turn["p2_move_details"]["category"] in ["SPECIAL", "PHYSICAL"]])

            # Average of multiplier effectivness
            def safe_nanmean(lst):
                return 0 if len(lst) == 0 else np.nanmean(lst)
            features["p1_effectivness_avg"] = safe_nanmean([my_dm.move_effectiveness(turn["p1_move_details"]["type"], turn["p2_pokemon_state"]["name"]) for turn in timeline if turn.get("p1_move_details") and turn["p1_move_details"]["category"] in ["SPECIAL", "PHYSICAL"]])
            features["p2_effectivness_avg"] = safe_nanmean([my_dm.move_effectiveness(turn["p2_move_details"]["type"], turn["p1_pokemon_state"]["name"]) for turn in timeline if turn.get("p2_move_details") and turn["p2_move_details"]["category"] in ["SPECIAL", "PHYSICAL"]])

            # Number of supereffective moves
            features["p1_supereffective_moves_count"] = sum([my_dm.is_supereffective(my_dm.move_effectiveness(turn["p1_move_details"]["type"], turn["p2_pokemon_state"]["name"])) for turn in timeline if turn.get("p1_move_details") and turn["p1_move_details"]["category"] in ["SPECIAL", "PHYSICAL"]])
            features["p2_supereffective_moves_count"] = sum([my_dm.is_supereffective(my_dm.move_effectiveness(turn["p2_move_details"]["type"], turn["p1_pokemon_state"]["name"])) for turn in timeline if turn.get("p2_move_details") and turn["p2_move_details"]["category"] in ["SPECIAL", "PHYSICAL"]])
            
            # Sum of priority moves
            features["p1_priority_moves"] = sum([1 for turn in timeline if turn.get("p1_move_details") and turn["p1_move_details"].get("priority")])
            features["p2_priority_moves"] = sum([1 for turn in timeline if turn.get("p2_move_details") and turn["p2_move_details"].get("priority")])

            # Number of supereffective pokemon of p1 in respect to p2 

            features["p1_supereffective_density"] = sum(1 for pkmn1 in p1_pkmns for pkmn2 in p2_pkmns if my_dm.is_supereffective(my_dm.pkmn_effectiveness(pkmn1, pkmn2)))/(len(p1_pkmns)*len(p2_pkmns))
            features["p2_supereffective_density"] = sum(1 for pkmn1 in p1_pkmns for pkmn2 in p2_pkmns if my_dm.is_supereffective(my_dm.pkmn_effectiveness(pkmn2, pkmn1)))/(len(p1_pkmns)*len(p2_pkmns))
            features["p1-p2_se_densities"] = features["p2_supereffective_density"] - features["p1_supereffective_density"]

            # P1 pkmn having at least a supereffective target in p2 team
            features["p1_attackers_share"] = sum(any(my_dm.is_supereffective(my_dm.pkmn_effectiveness(pkmn1, pkmn2)) for pkmn2 in p2_pkmns) for pkmn1 in p1_pkmns) / len(p1_pkmns)

            # P1 pkmn that are target of at least one pkmn in team 2
            features["p1_defensive_share"] = sum(any(my_dm.is_supereffective(my_dm.pkmn_effectiveness(pkmn2, pkmn1)) for pkmn2 in p2_pkmns) for pkmn1 in p1_pkmns) / len(p2_pkmns)

            # mean of hp percentage for p1 team and p2 team on last informations
            p1_hp_pctg = {p1_pkmn : None for p1_pkmn in p1_pkmns}
            p2_hp_pctg = {p2_pkmn : None for p2_pkmn in p2_pkmns}
            counter1 = 0
            counter2 = 0
            for turn in timeline:
                p1_hp_pctg.update({turn["p1_pokemon_state"]["name"] : turn["p1_pokemon_state"]["hp_pct"]})
                p2_hp_pctg.update({turn["p2_pokemon_state"]["name"] : turn["p2_pokemon_state"]["hp_pct"]})

                counter1 += turn["p1_pokemon_state"]["hp_pct"] > turn["p2_pokemon_state"]["hp_pct"]
                counter2 += turn["p1_pokemon_state"]["hp_pct"] < turn["p2_pokemon_state"]["hp_pct"]
            features["p1_remain_health_avg"] = sum(p1_hp_pctg.values())/len(p1_pkmns)
            features["p2_remain_health_avg"] = sum(p2_hp_pctg.values())/len(p2_pkmns)
            features["health_difference"] = features["p2_remain_health_avg"] - features["p1_remain_health_avg"]
            
            features["health_advantage_p1"]= counter1
            features["health_advantage_p2"]= counter2
            features["health_advantage_difference"]= counter1 - counter2
            hp_advantage_streak = sum(p1 > p2 for p1, p2 in zip(p1_hp_pctg, p2_hp_pctg))
            features["p1_hp_advantage_final_ratio"] = hp_advantage_streak 

            features["remaining_advantage"] = (
                features["p1_remain_health_avg"] * (1-features["p1_fnt_count"])
                - features["p2_remain_health_avg"] * (1-features["p2_fnt_count"])
            )

        feature_list.append(features)
        
    return pd.DataFrame(feature_list).fillna(0)



This function create_simple_features() performs the extraction of numerical features from the raw Pokémon battle data.
For each battle in the dataset, it builds a dictionary of features that describe statistics of the player’s team (Player 1), the opponent’s lead Pokémon (Player 2 lead), and the battle progression (timeline).
The features include:

Aggregated base stats of the player’s team (p1): mean, minimum, maximum, and standard deviation for each stat (HP, Attack, Defense, etc.).

Derived features: indices such as average total power, attack/defense ratio, offensive/defensive style, and the team’s maximum speed.

Opponent lead stats (p2): base values and one-hot encoding of Pokémon types.

Temporal characteristics of the battle: variations in average HP, total damage taken, and counts/types of moves used.

Status and field effects: counts of conditions (e.g., “burned”, “poisoned”, “asleep”) and effects (e.g., “reflect”, “wrap”) for both players.

Tactical metrics: number of switches, use of priority moves, super-effective moves, and same-type moves (STAB).

Advantage indicators: density of favorable matchups, average remaining HP, health differences, and count of turns with HP advantage.

Finally, the function returns a pandas DataFrame containing all extracted features, replacing any missing values with zero (fillna(0)).

In [22]:
all_types = set()
for battle in train_data:
    for p in battle['p1_team_details']:
        all_types.update(p['types'])

all_types = sorted(list(all_types))
print(all_types)



def extract_type_features(team):
    type_counts = {t: 0 for t in all_types}
    for p in team:
        for t in p['types']:
            type_counts[t] += 1
    
    team_size = len(team)
    for t in type_counts:
        type_counts[t] /= team_size
    return type_counts

team = train_data[0]['p1_team_details']
type_features = extract_type_features(team)
print(type_features)

['dragon', 'electric', 'fire', 'flying', 'ghost', 'grass', 'ground', 'ice', 'normal', 'notype', 'poison', 'psychic', 'rock', 'water']
{'dragon': 0.0, 'electric': 0.0, 'fire': 0.0, 'flying': 0.0, 'ghost': 0.0, 'grass': 0.16666666666666666, 'ground': 0.0, 'ice': 0.0, 'normal': 0.5, 'notype': 0.6666666666666666, 'poison': 0.0, 'psychic': 0.5, 'rock': 0.0, 'water': 0.16666666666666666}


This code extracts and processes Pokemon type features from the training data. First, it iterates through all battles to collect every unique Pokemon type that appears in the dataset, storing them in a sorted list. The extract_type_features() function then calculates the normalized distribution of types within a team by counting how many Pokemon of each type are present and dividing by the team size. This normalization ensures that the features are comparable across teams of different sizes. The function returns a dictionary where keys are Pokemon types and values are their relative frequencies within the team (ranging from 0 to 1).

In [23]:
i = 0
for battle in train_data:
    a = [my_dm.move_effectiveness(turn["p1_move_details"]["type"], turn["p2_pokemon_state"]["name"]) for turn in battle["battle_timeline"] if turn.get("p1_move_details") and (turn["p1_move_details"]["category"] in ["SPECIAL", "PHYSICAL"])]
    a = [my_dm.move_effectiveness(turn["p2_move_details"]["type"], turn["p1_pokemon_state"]["name"]) for turn in battle["battle_timeline"] if turn.get("p2_move_details") and turn["p2_move_details"]["category"] in ["SPECIAL", "PHYSICAL"]]
    if len(a) == 0:
        print(i, ",")
    i += 1

198 ,
2770 ,
3447 ,
3453 ,
3769 ,
6375 ,
7642 ,
7775 ,


This code analyzes move effectiveness throughout the battle timeline for both players. For each battle, it extracts all offensive moves (SPECIAL or PHYSICAL category) used by both players and calculates their type effectiveness against the opponent's active Pokemon using the my_dm.move_effectiveness() function. The code checks for battles where player 2 has no offensive moves recorded in the timeline (len(a) == 0) and prints their indices. This helps identify potential data quality issues or battles that ended without player 2 making any attacking moves. Note that the variable 'a' is overwritten in the second list comprehension, so only player 2's move effectiveness is ultimately checked.

In [24]:
def low_variance_features(df, threshold=0.999):
    to_drop = []
    for col in df.columns:
        top_freq = df[col].value_counts(normalize=True, dropna=False).iloc[0]
        if top_freq >= threshold:
            to_drop.append(col)
    return to_drop

In [25]:
import dcor
def high_nonlinear_corr(df, threshold=0.99):

    df_num = df.select_dtypes(include=[np.number])
    cols = df_num.columns
    n = len(cols)
    to_drop = set()

    for i in range(n):
        for j in range(i + 1, n):
            x = df_num.iloc[:, i].dropna()
            y = df_num.iloc[:, j].dropna()
            common_idx = x.index.intersection(y.index)
            x = x.loc[common_idx]
            y = y.loc[common_idx]

            if len(x) > 2:
                dcor_value = dcor.distance_correlation(x, y)
                if dcor_value > threshold:
                    to_drop.add(cols[j])

    return to_drop

This function identifies low-variance features in a dataframe that have minimal predictive value. It iterates through each column and calculates the frequency of the most common value (including missing values). If the most frequent value appears in 99% or more of the rows (by default), the column is flagged for removal. Such features provide little information for model training since they are nearly constant across all samples. The function returns a list of column names that meet this low-variance criterion and should be considered for removal during feature selection.

In [26]:

# Create feature DataFrames for both training and test sets
print("Processing training data...")
train_df = create_simple_features(train_data)
train_df = train_df.sample(frac=1, random_state=seed).reset_index(drop=True)

### removing correlations
corr_matrix = train_df.corr().abs()
upper = corr_matrix.where(
    np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
)
threshold = 0.9
to_drop = [column for column in upper.columns if any(upper[column] > threshold) and column!='player_won']

train_df = train_df.drop(columns=to_drop)

# removing low variance


to_drop2 = low_variance_features(train_df, threshold=0.99)
train_df = train_df.drop(columns=to_drop2)

print("feature to remove:", to_drop, to_drop2)
## exporting in csv
train_df.to_csv(config["data"]["processed_train_path"], index=False)

print("\nProcessing test data...")
test_data = []
with open(test_file_path, 'r') as f:
    for line in f:
        test_data.append(json.loads(line))
test_df = create_simple_features(test_data)
test_df =test_df.sample(frac=1, random_state=seed).reset_index(drop=True)
## removing correlations in test
test_df = test_df.drop(columns=to_drop)
test_df = test_df.drop(columns=to_drop2)

test_df.to_csv(config["data"]["processed_test_path"], index=False)
print("Done")

Processing training data...
feature to remove: ['p1_mean_base_spd', 'p1_std_base_spd', 'p1_style_index', 'p1_hp_ratio', 'p1_nostatus_count', 'p2_nostatus_count', 'p2-p1_fnt_pokemon_number', 'p2_typechange_count', 'p1_noeffect_count', 'p2_noeffect_count', 'p1_switch_number', 'p2_switch_number', 'switchnumber_differebnce', 'health_advantage_difference'] ['p1_psn_count', 'p2_psn_count', 'p1_brn_count', 'p2_brn_count', 'p1_disable_count', 'p2_disable_count', 'p1_firespin_count', 'p2_firespin_count', 'p1_wrap_count', 'p2_wrap_count', 'p1_clamp_count', 'p2_clamp_count', 'p1_typechange_count']

Processing test data...
Done


This code performs feature engineering and preprocessing on both training and test datasets. It starts by creating features from the raw training data and shuffling the rows with a fixed random seed for reproducibility. Two feature selection steps are then applied: first, highly correlated features are identified by computing the correlation matrix and removing features with correlation above 0.9 (excluding the target variable 'player_won'); second, low-variance features are removed using a 99% threshold. The processed training data is saved to CSV. The same preprocessing pipeline is then applied to the test data, ensuring that the exact same features identified for removal in training are also dropped from the test set to maintain consistency. Both datasets are shuffled with the same random seed before being saved to their respective output paths specified in the configuration file.

In [28]:
corrs = train_df.corrwith(train_df["player_won"])

# print features with |correlation| > 0.5
for feature, corr_value in corrs.items():
    if abs(corr_value) >0.2:
        print(f"{feature}: {corr_value:.3f}")

battle_id: -0.866
player_won: 1.000
p1_mean_hp_pct: 0.378
p2_mean_hp_pct: -0.227
p1_final_hp: 0.201
p1_total_damage: -0.416
p2_total_damage: 0.321
p1_total_moves: 0.366
p2_total_moves: -0.338
total_moves_difference: -0.457
p1_status_changes: -0.365
p2_status_changes: 0.351
status_changes_difference: -0.468
p1_slp_count: -0.319
p2_slp_count: 0.346
p1_fnt_count: -0.463
p2_frz_count: 0.208
p1_attack_moves: 0.261
p2_attack_moves: -0.304
p2_same_type_moves_number: -0.223
p1_remain_health_avg: 0.446
p2_remain_health_avg: -0.283
health_difference: -0.559
health_advantage_p1: 0.333
health_advantage_p2: -0.333
remaining_advantage: 0.354


This code calculates and displays the correlation between each feature and the target variable 'player_won'. The corrwith() method computes the Pearson correlation coefficient between every column in the dataframe and the target column. Features with an absolute correlation value greater than 0.2 are printed along with their correlation coefficient rounded to three decimal places. This analysis helps identify which features have the strongest linear relationships with the outcome, providing insights into which variables might be most important for predicting battle victories.