In [2]:
import pandas as pd
import numpy as np
from typing import Optional, List, Callable, Any, Union, Dict
from itertools import product
from statistics import mean
from pathlib import Path
import gzip
import os
import matplotlib.pyplot as plt

### Read datasets
Use the gzip variant (`read_ds_gzip`) if the files are gzipped; otherwise use `read_ds`.

In [3]:
def read_ds_gzip(path: Optional[Path]=None, ds: str = "TRAIN") -> pd.DataFrame:
    """Read a gzip-compressed, ragged CSV dataset into a DataFrame of strings.

    Rows may hold different numbers of fields, so the file is scanned once to
    find the widest row; pandas then parses it with explicit column names.

    Args:
        path (Optional[Path], optional): the path to read the dataset file. Defaults to /kaggle/input/the-insa-starcraft-2-player-prediction-challenge/{ds}.CSV.gz.
        ds (str, optional): the part to read (TRAIN or TEST). It selects the default path when path is None and, in every case, the leading column names. Defaults to "TRAIN".

    Returns:
        pd.DataFrame: one row per line, every value read as str. Columns are
        "battleneturl" and "played_race" when "TRAIN" is in ds (otherwise only
        "played_race"), followed by integer-named columns 0, 1, 2, ...

    Raises:
        ValueError: if the file contains no lines (max() of an empty sequence).
    """
    source = f'/kaggle/input/the-insa-starcraft-2-player-prediction-challenge/{ds}.CSV.gz' if path is None else path
    with gzip.open(source) as f:
        # gzip.open defaults to binary mode, so lines are bytes. Stream them
        # instead of materialising the whole file with readlines(): only the
        # field count of the widest row is needed.
        max_actions = max(line.count(b",") + 1 for line in f)
        f.seek(0)  # rewind so pandas parses from the first row
        _names = ["battleneturl", "played_race"] if "TRAIN" in ds else ["played_race"]
        # The remaining fields get integer column names.
        _names.extend(range(max_actions - len(_names)))
        return pd.read_csv(f, names=_names, dtype=str)

def read_ds(path: Optional[Path]=None, ds: str = "TRAIN") -> pd.DataFrame:
    """Read an uncompressed, ragged CSV dataset into a DataFrame of strings.

    Rows may hold different numbers of fields, so the file is scanned once to
    find the widest row; pandas then parses it with explicit column names.

    Args:
        path (Optional[Path], optional): the path to read the dataset file. Defaults to /kaggle/input/train-sc2-keystrokes/{ds}.CSV.
        ds (str, optional): the part to read (TRAIN or TEST). It selects the default path when path is None and, in every case, the leading column names. Defaults to "TRAIN".

    Returns:
        pd.DataFrame: one row per line, every value read as str. Columns are
        "battleneturl" and "played_race" when "TRAIN" is in ds (otherwise only
        "played_race"), followed by integer-named columns 0, 1, 2, ...

    Raises:
        ValueError: if the file contains no lines (max() of an empty sequence).
    """
    source = f'/kaggle/input/train-sc2-keystrokes/{ds}.CSV' if path is None else path
    # Decode explicitly as UTF-8 so the result does not depend on the locale.
    with open(source, encoding="utf-8") as f:
        # Stream the lines instead of loading them all with readlines(): only
        # the field count of the widest row is needed.
        max_actions = max(line.count(",") + 1 for line in f)
        f.seek(0)  # rewind so pandas parses from the first row
        _names = ["battleneturl", "played_race"] if "TRAIN" in ds else ["played_race"]
        # The remaining fields get integer column names.
        _names.extend(range(max_actions - len(_names)))
        return pd.read_csv(f, names=_names, dtype=str)

In [4]:
# Location of the gzipped training set -- replace with the correct path.
TRAIN_PATH = Path("data/TRAIN.CSV.GZ")
features_train = read_ds_gzip(TRAIN_PATH)

# To load the test split as well: features_test = read_ds(ds="TEST")
features_train.shape  # (rows, columns) of the raw training frame

In [5]:
# Preview the first rows of the raw training frame.
features_train.head()

### Dependent Variable
Our dependent variable is a categorical string; we can convert it to category codes (numbers) with pd.Categorical.

pd.Categorical doesn't directly turn battleneturl into a number; instead, the resulting categorical column exposes the codes through its cat.codes attribute. We can create a little function to convert the dependent variable from a string to its category ID:

In [6]:
def to_categories(df: pd.DataFrame, col: str="battleneturl") -> None:
    """Replace column `col` of `df`, in place, with its integer category codes.

    Args:
        df (pd.DataFrame): frame to modify; it is mutated, nothing is returned.
        col (str, optional): name of the column to encode. Defaults to "battleneturl".

    Raises:
        KeyError: if `col` is not a column of `df`.
    """
    # Use `col` throughout: the column name used to be hard-coded to
    # "battleneturl" for the categorical conversion, so any other `col` failed.
    df[col] = pd.Categorical(df[col]).codes

#### Removing outliers
YOUR IDEAS / APPROACHES HERE.



In [7]:
# Encode the player id column ("battleneturl", the default) as integer codes, in place.
to_categories(features_train)

In [8]:
import matplotlib.pyplot as plt

# Count the missing cells of every column and draw the counts in column order.
nan_counts = features_train.isna().sum()

fig, ax = plt.subplots()
nan_counts.plot(ax=ax)
ax.set_xlabel('Columns')
ax.set_ylabel('Number of NaN values')
ax.set_title('Number of NaN values per column')
plt.show()

In [9]:
# Bar chart of how many games were played with each race.
race_counts = features_train['played_race'].value_counts()

fig, ax = plt.subplots()
race_counts.plot(kind='bar', ax=ax)
ax.set_xlabel('Played Race')
ax.set_ylabel('Frequency')
ax.set_title('Histogram of Played Race')
plt.show()

In [10]:

def _last_time_marker(values):
    """Return the number carried by the last "t<N>" string in values, or None.

    Only str entries starting with "t" are considered (e.g. "t135" -> 135);
    NaN and other non-string entries are ignored.
    """
    markers = [v for v in values if isinstance(v, str) and v.startswith("t")]
    return int(markers[-1][1:]) if markers else None


# One duration per game: the value of the last time marker found in its row.
durations = [_last_time_marker(row.tolist()) for _, row in features_train.iterrows()]

# Keep the durations next to the raw data, then discard the games that had no
# time marker at all (their duration is missing).
features_train["duration"] = durations
features_train.dropna(subset=["duration"], inplace=True)

# Histogram of the game durations.
fig, ax = plt.subplots(figsize=(8, 6))
features_train["duration"].plot(kind="hist", bins=20, edgecolor="black", ax=ax)
ax.set_title("Distribution of Game Durations")
ax.set_xlabel("Game Duration (seconds)")
ax.set_ylabel("Count")
ax.grid(True)
plt.show()


In [11]:
# Keep the central 96% (remove bottom and top 2%)
q02 = features_train['duration'].quantile(0.02)
q98 = features_train['duration'].quantile(0.98)

# between() is inclusive on both ends, i.e. q02 <= duration <= q98.
# .copy() makes the filtered frame independent of features_train: later cells
# drop and add columns on it, which on a plain boolean-mask slice raises
# SettingWithCopyWarning.
features_train_filtered = features_train[
    features_train['duration'].between(q02, q98)
].copy()

# Plot the distribution as a histogram
plt.figure(figsize=(8, 6))
features_train_filtered['duration'].plot(kind='hist', bins=20, edgecolor='black')
plt.title('Distribution of Game Durations')
plt.xlabel('Game Duration (seconds)')
plt.ylabel('Count')
plt.grid(True)
plt.show()

In [12]:
import pandas as pd

# Global statistics about the filtered dataset. Each section is printed only
# when the column it needs is present.
DURATION_STATS = ["min", "max", "mean", "std"]
has_player = "battleneturl" in features_train_filtered.columns
has_race = "played_race" in features_train_filtered.columns
has_duration = "duration" in features_train_filtered.columns

print("Total number of rows (games):", len(features_train_filtered))

# Unique player identifiers
if has_player:
    print("Number of unique players:", features_train_filtered["battleneturl"].nunique())

# Races and how often each one was played
if has_race:
    race_column = features_train_filtered["played_race"]
    print("Number of unique races:", race_column.nunique(), "\n")
    print("=== Race Distribution ===\n", race_column.value_counts(), "\n")

# Duration statistics, overall and then per race
if has_duration:
    print("=== Duration Summary ===")
    print(features_train_filtered["duration"].agg(DURATION_STATS))

    if has_race:
        print("\n=== Duration by Race ===")
        print(features_train_filtered.groupby("played_race")["duration"].agg(DURATION_STATS))


In [13]:
# Drop the helper "duration" column now that the outlier filtering is done.
# Rebinding to the frame returned by drop() (instead of inplace=True) avoids
# mutating what may be a slice of features_train, and errors="ignore" keeps
# the cell re-runnable once the column is already gone.
features_train_filtered = features_train_filtered.drop(columns=['duration'], errors="ignore")

In [14]:
def _present_actions(row):
    """Return the non-missing entries of row, in order, as a list."""
    return [value for value in row if pd.notna(value)]


# Work on an independent copy so the assignments below never hit a slice
# (avoids the SettingWithCopyWarning).
features_train_filtered = features_train_filtered.copy()

# Every column after the first two is treated as an action slot. Collapse the
# slots into one list-valued "actions" column, then drop the slot columns.
action_cols = features_train_filtered.columns[2:]
features_train_filtered["actions"] = features_train_filtered[action_cols].apply(
    _present_actions, axis=1
)
features_train_filtered.drop(columns=action_cols, inplace=True)

### Custom features

In [15]:
def chunkify_actions(action_list):
    """
    Split a flat action list into time-delimited chunks.

    A token starting with "t" (e.g. "t5") is a time marker whose numeric suffix
    closes the current chunk; every other token is an action belonging to the
    chunk being built.

    Returns a list of (start_time, end_time, actions_in_chunk) tuples. Chunks
    that contain no action are not emitted. Actions found after the last time
    marker are returned as a zero-length chunk (start_time == end_time).
    """
    segments = []
    window_start = 0
    pending = []

    for token in action_list:
        if not token.startswith("t"):
            # Ordinary action: it belongs to the chunk currently being built.
            pending.append(token)
            continue

        # Time marker, e.g. "t5" -> 5: close the current chunk at that time.
        window_end = int(token[1:])
        if pending:
            segments.append((window_start, window_end, pending))
        window_start, pending = window_end, []

    # Trailing actions have no closing marker; keep them with a zero-length span.
    if pending:
        segments.append((window_start, window_start, pending))

    return segments


In [16]:
def compute_features_per_chunk(chunks):
    """
    Turn (start_t, end_t, actions_in_chunk) tuples into per-phase action rates.

    Each chunk is assigned to a phase by its start time: "early" below 360,
    "mid" below 720, "late" otherwise. Within a phase every counted action type
    is summed, and the sum is divided by the total duration of the chunks of
    that phase (each chunk contributes at least 1).

    Returns a dict with, for every phase, the keys "<phase>_hotkey_0" ...
    "<phase>_hotkey_9", "<phase>_hotkey_created", "<phase>_hotkey_updated",
    "<phase>_hotkey_used", "<phase>_base", "<phase>_mineral" and
    "<phase>_other". A phase that received no chunk gets the integer 0 for all
    of its keys.
    """
    early_limit = 360  # 6 minutes
    mid_limit = 720  # 12 minutes
    phase_names = ("early", "mid", "late")

    # Internal counter name -> suffix of the emitted feature, in output order.
    feature_suffixes = {f"hk_{digit}": f"hotkey_{digit}" for digit in range(10)}
    feature_suffixes.update({
        "hk_created": "hotkey_created",
        "hk_updated": "hotkey_updated",
        "hk_used": "hotkey_used",
        "base": "base",
        "mineral": "mineral",
        "other": "other",
    })

    # Last character of a hotkey token -> counter it increments.
    hotkey_kinds = {"0": "hk_created", "1": "hk_updated", "2": "hk_used"}
    # Non-hotkey tokens that are counted; anything else is ignored.
    plain_actions = {
        "sBase": "base",
        "Base": "base",
        "sMineral": "mineral",
        "SingleMineral": "mineral",
        "s": "other",
    }

    totals = {name: dict.fromkeys(feature_suffixes, 0) for name in phase_names}
    seconds = dict.fromkeys(phase_names, 0)

    for start_t, end_t, acts in chunks:
        # A chunk belongs to the phase in which it starts.
        if start_t < early_limit:
            phase = "early"
        elif start_t < mid_limit:
            phase = "mid"
        else:
            phase = "late"
        tally = totals[phase]

        for a in acts:
            if a.startswith("hotkey"):
                # Second-to-last character: which hotkey; last character: the
                # kind of hotkey action.
                tally[f"hk_{int(a[-2])}"] += 1
                kind = hotkey_kinds.get(a[-1])
                if kind is not None:
                    tally[kind] += 1
            elif a in plain_actions:
                tally[plain_actions[a]] += 1

        # Zero-length chunks still count for 1 so the rates stay finite.
        seconds[phase] += max(1, end_t - start_t)

    # Rate = count / total duration of the phase (0 when the phase is empty).
    feats = {}
    for name in phase_names:
        elapsed = seconds[name]
        for counter, suffix in feature_suffixes.items():
            feats[f"{name}_{suffix}"] = 0 if elapsed == 0 else totals[name][counter] / elapsed

    return feats

In [17]:
def compute_all_features(row):
    """
    Compute the per-phase feature dict of one game.

    Despite the parameter name, `row` is handed straight to chunkify_actions,
    so it must be the list of action strings itself (the notebook applies this
    function to the "actions" column), not a whole DataFrame row.

    Returns a dict of feature_name -> value as produced by
    compute_features_per_chunk. The player id and the race are not included.
    """
    return compute_features_per_chunk(chunkify_actions(row))

# Apply to entire DataFrame: one feature dict per game.
feature_records = features_train_filtered["actions"].apply(compute_all_features)

# Build the frame with a single constructor call. Expanding the dicts with
# .apply(pd.Series) creates one Series per row, which is much slower on large
# frames. The original index is kept so the concat below stays aligned.
features_df = pd.DataFrame(feature_records.tolist(), index=feature_records.index)

print(features_df.head())

# Combine with the original df if you like
result_df = pd.concat([features_train_filtered, features_df], axis=1)
print(result_df.head())


In [18]:
# Attach the target (player id) and the played race to the feature frame;
# both Series are aligned with features_df on the index.
features_df = features_df.assign(
    battleneturl=features_train_filtered["battleneturl"],
    race=features_train_filtered["played_race"],
)

In [20]:
# Show the first rows of the feature frame (now including battleneturl and race).
print(features_df.head())
# Keep an independent snapshot of the frame before it is transformed further.
# NOTE(review): copy_df is not referenced again in the visible part of the notebook.
copy_df = features_df.copy()

### Getting features...

Building a mini framework to read our Dataframe and convert it to features.

Now we will create features out of the dataset.

FeaturesGetter iterates over an ActionsDataLoader (which yields all the actions between two 't[xx]' markers) and applies the set of Feature objects contained in a FeaturePool. At the end, it computes metrics over the values registered by each feature in the feature pool.

Defining lambdas to convert dataset to features
We create basic features, corresponding to the mean of each action played per timestamp plus the mean of all actions together

Now it's ready to be put into a function that gets all the features from the initial dataframe and returns a new dataframe containing only those features. FeaturesGetter adds one extra feature on top of those we created: max_time, corresponding to the "xx" of the last "txx" seen.

### Handling string
The played_race column (copied into features_df as race) can only take three values; instead of converting it to categorical as we did with our dependent variable, we will convert it to dummy variables: we one-hot encode each race. It will not add many columns to our dataframe (only three) but will allow the decision trees to split much faster on the race (with a single binary split).

In [24]:
def get_dummies(df: pd.DataFrame, columns: Optional[List[str]] = None) -> pd.DataFrame:
    """Converts textual columns to one-hot encoded vectors (one column per possible value)

    Args:
        df (pd.DataFrame): input frame; it is not modified.
        columns (Optional[List[str]], optional): names of the columns to
            one-hot encode. Defaults to ["race"], the only column this
            function used to handle.

    Returns:
        pd.DataFrame: a new frame in which each listed column is replaced by
        one "<column>_<value>" indicator column per distinct value.
    """
    if columns is None:
        columns = ["race"]
    return pd.get_dummies(df, columns=list(columns))

The preprocess function creates a pipeline of all the functions we just implemented: it creates the features, converts the race to dummy variables, and converts the dependent variable to category codes.

In [25]:
# One-hot encode the "race" column; features_df is rebound to the encoded frame.
# NOTE(review): presumably not re-runnable as-is, since the encoded frame no
# longer has a "race" column -- confirm before re-executing this cell alone.
features_df = get_dummies(features_df)

In [27]:
# Preview the model-ready feature frame.
features_df.head()

## ML !!!

In [28]:
import pandas as pd
import numpy as np

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, RandomizedSearchCV, GridSearchCV
from sklearn.metrics import accuracy_score

In [29]:

# Every column except the player id is a model input; the player id is the label.
feature_cols = [name for name in features_df.columns if name != 'battleneturl']
X = features_df[feature_cols]
y = features_df['battleneturl'].values

# Hold out 20% of the games, stratified on the label, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

In [30]:
# Candidate values sampled by the randomized hyper-parameter search.
param_distributions = dict(
    n_estimators=[100, 200, 300, 500, 700],
    max_depth=[10, 15, 20, 25, None],
    min_samples_split=[2, 4, 8, 10],
    min_samples_leaf=[1, 2, 4],
    max_features=['sqrt', 'log2', 0.33, 0.5],
)


In [31]:
rf = RandomForestClassifier(random_state=42)

# Randomized search: 30 sampled parameter sets, scored by accuracy with
# 5-fold cross-validation, on all CPU cores, with a fixed seed.
random_search = RandomizedSearchCV(
    rf,
    param_distributions,
    n_iter=30,
    scoring='accuracy',
    cv=5,
    verbose=1,
    n_jobs=-1,
    random_state=42,
)
random_search.fit(X_train, y_train)

print("Best Params:", random_search.best_params_)
print("Best CV Accuracy:", random_search.best_score_)


In [34]:
# Take the estimator the randomized search exposes as its best one and measure
# its accuracy on the held-out test split.
best_rf = random_search.best_estimator_
y_pred = best_rf.predict(X_test)
test_acc = accuracy_score(y_test, y_pred)
# Report the accuracy as a percentage with two decimals.
print(f"Random Forest Test Accuracy: {test_acc*100:.2f}%")

Random Forest Test Accuracy: 92.64%


In [39]:
# Save the model to disk
import joblib
import pickle
# Serialise the fitted forest into a pickle file in the working directory.
with open('random_forest_model.pkl', 'wb') as model_file:
    pickle.dump(best_rf, model_file)

# Improved ML

In [None]:
import pandas as pd
import numpy as np

# For model building
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.model_selection import (
    train_test_split, 
    RandomizedSearchCV, 
    GridSearchCV
)
from sklearn.metrics import accuracy_score

# We'll show XGBoost; you can similarly do LightGBM
from xgboost import XGBClassifier

# Optional: confusion_matrix, classification_report, etc.
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt


In [None]:
# Every column except the player id is a model input; the player id is the label.
feature_cols = [name for name in features_df.columns if name != 'battleneturl']
X = features_df[feature_cols]
y = features_df['battleneturl'].values

# Hold out 20% of the games, stratified on the label, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

In [None]:
# Candidate values sampled by the randomized hyper-parameter search.
param_distributions = dict(
    n_estimators=[100, 200, 300, 500, 700, 1000],
    max_depth=[10, 15, 20, 25, None],
    min_samples_split=[2, 4, 8, 10],
    min_samples_leaf=[1, 2, 4],
    max_features=['sqrt', 'log2', 0.33, 0.5],
)

rf = RandomForestClassifier(random_state=42)

# Randomized search: 20 sampled parameter sets, scored by accuracy with
# 5-fold cross-validation, on all CPU cores, with a fixed seed.
random_search = RandomizedSearchCV(
    rf,
    param_distributions,
    n_iter=20,
    scoring='accuracy',
    cv=5,
    verbose=1,
    n_jobs=-1,
    random_state=42,
)
random_search.fit(X_train, y_train)

print("Best parameters (RandomizedSearch):", random_search.best_params_)
print("Best CV accuracy (RandomizedSearch):", random_search.best_score_)


In [None]:
# Keep a handle on the estimator the randomized search exposes as its best one.
best_rf_random = random_search.best_estimator_

In [None]:
# Refine the search around the best parameters found by the randomized search.
best_params = random_search.best_params_


def _valid_candidates(values):
    """Return values without duplicates (first-seen order), dropping numbers <= 0.

    None and strings (e.g. max_depth=None, max_features='sqrt') are kept as
    they are. Comparing them with 0, or sorting them together with ints,
    raises TypeError, which is why no `x > 0` test or sorted() is applied to
    the raw list.
    """
    kept = []
    for value in values:
        is_number = (
            isinstance(value, (int, float, np.integer, np.floating))
            and not isinstance(value, bool)
        )
        if is_number and value <= 0:
            continue  # e.g. n_estimators - 100 when the best value is 100
        if value not in kept:
            kept.append(value)
    return kept


best_depth = best_params['max_depth']
param_grid = {
    'n_estimators':      [best_params['n_estimators'] - 100, best_params['n_estimators'], best_params['n_estimators'] + 100],
    # A best depth of None (unbounded) cannot be decremented; fall back to a fixed list.
    'max_depth':         [best_depth - 5, best_depth, None] if best_depth else [None, 20, 25],
    'min_samples_split': [best_params['min_samples_split'], best_params['min_samples_split'] * 2],
    'min_samples_leaf':  [best_params['min_samples_leaf'], best_params['min_samples_leaf'] + 1],
    'max_features':      [best_params['max_features']]
}

# Make sure no negative values or nonsensical combos slip in
cleaned_param_grid = {
    name: _valid_candidates(candidates) for name, candidates in param_grid.items()
}

grid_rf = RandomForestClassifier(random_state=42)

# Exhaustive search over the cleaned grid, scored by accuracy with 5-fold CV.
grid_search = GridSearchCV(
    estimator=grid_rf,
    param_grid=cleaned_param_grid,
    scoring='accuracy',
    cv=5,
    verbose=1,
    n_jobs=-1
)

grid_search.fit(X_train, y_train)

print("Best parameters (GridSearch):", grid_search.best_params_)
print("Best CV accuracy (GridSearch):", grid_search.best_score_)


In [None]:
# From here on, best_rf is the estimator the grid search exposes as its best one.
best_rf = grid_search.best_estimator_

In [None]:
# The target is a player id with many classes, so the evaluation metric must be
# the multi-class log loss ("mlogloss"); "logloss" is the binary variant.
xgb = XGBClassifier(random_state=42, use_label_encoder=False, eval_metric="mlogloss")

# 3 * 3 * 3 * 2 * 2 = 108 parameter combinations, each fitted on 5 folds.
xgb_param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth':    [6, 10, 12],
    'learning_rate':[0.01, 0.1, 0.2],
    'subsample':    [0.8, 1.0],
    'colsample_bytree': [0.8, 1.0]
}

xgb_search = GridSearchCV(
    xgb,
    param_grid=xgb_param_grid,
    scoring='accuracy',
    cv=5,
    verbose=1,
    n_jobs=-1
)

xgb_search.fit(X_train, y_train)
print("Best XGB params:", xgb_search.best_params_)
print("Best XGB CV accuracy:", xgb_search.best_score_)

best_xgb = xgb_search.best_estimator_

In [None]:
# Random Forest: accuracy of the grid-searched forest on the held-out test split
y_pred_rf = best_rf.predict(X_test)
acc_rf = accuracy_score(y_test, y_pred_rf)
print(f"Random Forest Test Accuracy: {acc_rf*100:.2f}%")

# XGBoost: accuracy of the grid-searched booster on the same test split
y_pred_xgb = best_xgb.predict(X_test)
acc_xgb = accuracy_score(y_test, y_pred_xgb)
print(f"XGBoost Test Accuracy: {acc_xgb*100:.2f}%")

In [None]:
from sklearn.ensemble import VotingClassifier

# Soft voting averages the class probabilities predicted by both models.
# With only two estimators, hard (majority) voting ties whenever they disagree,
# and scikit-learn breaks such ties by picking the class that sorts first,
# which is an arbitrary choice for player ids.
voting_ensemble = VotingClassifier(
    estimators=[
        ('rf', best_rf),
        ('xgb', best_xgb),
    ],
    voting='soft'
)

voting_ensemble.fit(X_train, y_train)
y_pred_voting = voting_ensemble.predict(X_test)
acc_voting = accuracy_score(y_test, y_pred_voting)
print(f"Voting Ensemble Test Accuracy: {acc_voting*100:.2f}%")

In [None]:
from sklearn.ensemble import StackingClassifier
from sklearn.linear_model import LogisticRegression

# Stack the tuned forest and booster under a logistic-regression meta-model.
# passthrough=False: the meta-model does not receive the original features.
stack_ensemble = StackingClassifier(
    [('rf', best_rf), ('xgb', best_xgb)],
    final_estimator=LogisticRegression(max_iter=1000),
    passthrough=False,
    n_jobs=-1,
)
stack_ensemble.fit(X_train, y_train)

# Accuracy of the stacked model on the held-out test split.
y_pred_stack = stack_ensemble.predict(X_test)
acc_stack = accuracy_score(y_test, y_pred_stack)
print(f"Stacking Ensemble Test Accuracy: {acc_stack*100:.2f}%")

In [None]:
# Side-by-side recap of the test accuracies computed in the cells above,
# each shown as a percentage with two decimals.
print("Model Accuracies:")
print(f"  Random Forest: {acc_rf*100:.2f}%")
print(f"  XGBoost:       {acc_xgb*100:.2f}%")
print(f"  Voting Ens:    {acc_voting*100:.2f}%")
print(f"  Stacking Ens:  {acc_stack*100:.2f}%")