In [2]:
import pandas as pd
import numpy as np
import warnings
import joblib
from typing import Tuple

from typing import Tuple, Dict, Any
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt

from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression

In [14]:
def aggregate_player_data(df, agg_funcs=None, prefix=""):
    """
    Collapse player rows to one row per ``ID``.

    Parameters
    ----------
    df : DataFrame containing an ``ID`` column plus the columns to aggregate.
    agg_funcs : optional mapping of column name -> aggregation passed to
        ``DataFrame.groupby(...).agg``. When omitted, every column other than
        ``ID`` is aggregated with ``"mean"``.
    prefix : when non-empty, every aggregated column except ``ID`` is renamed
        to ``f"{prefix}_{column}"`` (used to tell home and away features apart).

    Returns
    -------
    DataFrame with ``ID`` as a regular column and one row per distinct ID.
    """
    spec = agg_funcs
    if spec is None:
        # Default: average every non-ID column.
        spec = dict.fromkeys((name for name in df.columns if name != "ID"), "mean")

    per_id = df.groupby("ID").agg(spec).reset_index()
    if not prefix:
        return per_id

    renames = {}
    for name in per_id.columns:
        if name != "ID":
            renames[name] = f"{prefix}_{name}"
    return per_id.rename(columns=renames)

# Process CSV in chunks
def process_csv_in_chunks(file_path: str, chunksize: int, drop_columns: list[str]) -> pd.DataFrame:
    """
    Read a CSV file chunk by chunk, drop unwanted columns and return one DataFrame.

    Parameters
    ----------
    file_path : path (or file-like object) handed to ``pd.read_csv``.
    chunksize : number of rows read per chunk.
    drop_columns : columns to remove from every chunk. Columns that are not
        present are skipped; a single warning listing them is printed.

    Returns
    -------
    The concatenation of all processed chunks with a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        From ``pd.concat`` when the reader yields no chunks at all.
    """
    chunks = []
    # The reader is used as a context manager so the file handle is released
    # even if processing a chunk raises.
    with pd.read_csv(file_path, chunksize=chunksize) as reader:
        for position, chunk in enumerate(reader):
            if position == 0:
                # All chunks share the same header, so checking the first one
                # is enough and avoids repeating the warning for every chunk.
                missing_columns = [col for col in drop_columns if col not in chunk.columns]
                if missing_columns:
                    print(f"Warning: Missing columns {missing_columns} in {file_path}")
            chunks.append(chunk.drop(columns=drop_columns, errors="ignore"))
    return pd.concat(chunks, ignore_index=True)

# Pre-aggregate player data in chunks, then group again by ID to combine the per-chunk results
def pre_aggregate_player_data(player_path: str, drop_columns: list[str], chunksize: int, prefix: str = "") -> pd.DataFrame:
    """
    Compute the per-``ID`` mean of every player column, reading the CSV in chunks.

    Rows belonging to one ID may be split across chunk boundaries, so averaging
    the per-chunk means would weight the chunks equally regardless of how many
    rows each contributed. Instead, per-ID sums and non-null counts are
    accumulated for every chunk and divided once at the end, which yields the
    same result as a mean over the whole file.

    Parameters
    ----------
    player_path : path (or file-like object) handed to ``pd.read_csv``.
    drop_columns : columns removed before aggregating. Columns that are not
        present are skipped; a single warning listing them is printed.
    chunksize : number of rows read per chunk.
    prefix : when non-empty, every aggregated column except ``ID`` is renamed
        to ``f"{prefix}_{column}"``.

    Returns
    -------
    DataFrame with one row per ID, ``ID`` as a regular column, sorted by ID.
    A column that has no non-null value for an ID is NaN for that ID.

    Raises
    ------
    ValueError
        When the reader yields no chunks.
    """
    partial_sums = []
    partial_counts = []
    with pd.read_csv(player_path, chunksize=chunksize) as reader:
        for position, chunk in enumerate(reader):
            if position == 0:
                # Header is identical for every chunk: warn only once.
                missing_columns = [col for col in drop_columns if col not in chunk.columns]
                if missing_columns:
                    print(f"Warning: Missing columns {missing_columns} in {player_path}")
            grouped = chunk.drop(columns=drop_columns, errors="ignore").groupby("ID")
            partial_sums.append(grouped.sum())
            # count() ignores NaN, matching the NaN-skipping behaviour of mean().
            partial_counts.append(grouped.count())

    if not partial_sums:
        raise ValueError(f"No rows could be read from {player_path}")

    # Combine the partial results of IDs that appeared in more than one chunk.
    total_sums = pd.concat(partial_sums).groupby(level=0).sum()
    total_counts = pd.concat(partial_counts).groupby(level=0).sum()
    # 0 / 0 (no non-null value for that ID) evaluates to NaN.
    agg_all = (total_sums / total_counts).reset_index()

    if prefix:
        agg_all = agg_all.rename(
            columns={col: f"{prefix}_{col}" for col in agg_all.columns if col != "ID"}
        )
    return agg_all

# Calculate Diff and Ratio features for home and away statistics
def calculate_diff_and_ratio(df: pd.DataFrame) -> pd.DataFrame:
    """
    Append home-vs-away comparison features to ``df`` and return it.

    For each tracked statistic ``S`` the function looks for the columns
    ``home_S_season_sum`` and ``away_S_season_sum``. When both exist it adds

    * ``S_DIFF``  = home - away
    * ``S_RATIO`` = home / (away + 1); the +1 keeps the denominator non-zero
      when the away value is 0.

    Statistics for which either column is absent are skipped silently.
    The frame is modified in place; the same object is returned.

    NOTE(review): confirm that the merged frame really uses these exact column
    names -- if the source CSVs carry an extra name component, no feature is
    created and no error is raised.
    """
    tracked_stats = (
        "SHOTS_TOTAL",
        "SHOTS_ON_TARGET",
        "GOALS",
        "DANGEROUS_ATTACKS",
        "REDCARDS",
        "FOULS",
    )
    for name in tracked_stats:
        home_col = f"home_{name}_season_sum"
        away_col = f"away_{name}_season_sum"
        if home_col not in df.columns or away_col not in df.columns:
            continue
        home_values = df[home_col]
        away_values = df[away_col]
        df[f"{name}_DIFF"] = home_values - away_values
        df[f"{name}_RATIO"] = home_values / (away_values + 1)
    return df

# Encode multiclass targets
def encode_multiclass_targets(df: pd.DataFrame, cols: list) -> np.ndarray:
    """
    Turn one-hot style target columns into integer class labels.

    Parameters
    ----------
    df : frame holding the target columns.
    cols : target column names; the position of a column in this list is the
        class label it maps to.

    Returns
    -------
    1-D NumPy integer array (not a Series) with one label per row of ``df``:
    the position of the largest value among ``cols``. When several columns tie
    for the maximum -- including a row where all of them are 0 -- the first
    column in ``cols`` wins.
    """
    return df[cols].to_numpy().argmax(axis=1)

# Load and merge train data
def load_and_merge_train_data(
    home_team_path: str,
    away_team_path: str,
    home_players_path: str,
    away_players_path: str,
    target_path: str,
    chunksize: int = 500,
) -> Tuple[pd.DataFrame, np.ndarray]:
    """
    Build the training feature matrix and label vector from the raw CSV files.

    Steps:
      1. Load home/away team statistics (dropping LEAGUE and TEAM_NAME) and
         prefix every non-ID column with ``home_`` / ``away_``.
      2. Aggregate home/away player statistics to one row per ID with the same
         prefixes.
      3. Left-join players onto teams per side, left-join the away side onto
         the home side, inner-join the targets, and replace missing values
         with 0.
      4. Add the DIFF/RATIO features and encode HOME_WINS / DRAW / AWAY_WINS as
         labels 0 / 1 / 2.

    Parameters
    ----------
    home_team_path, away_team_path : team statistics CSVs.
    home_players_path, away_players_path : player statistics CSVs.
    target_path : CSV with ``ID``, ``HOME_WINS``, ``DRAW`` and ``AWAY_WINS``.
    chunksize : rows per chunk used while reading the statistics CSVs.

    Returns
    -------
    (features, labels): ``features`` has neither the ID nor the target columns
    and carries a 0..n-1 index, so row ``i`` of ``features`` corresponds to
    ``labels[i]``. ``labels`` is a NumPy integer array.
    """
    team_drop = ["LEAGUE", "TEAM_NAME"]
    player_drop = ["LEAGUE", "TEAM_NAME", "PLAYER_NAME", "POSITION"]
    target_cols = ["HOME_WINS", "DRAW", "AWAY_WINS"]

    # Process team statistics with prefixes
    df_home_team = process_csv_in_chunks(home_team_path, chunksize, drop_columns=team_drop)
    df_home_team = df_home_team.rename(columns={col: f"home_{col}" for col in df_home_team.columns if col != "ID"})
    df_away_team = process_csv_in_chunks(away_team_path, chunksize, drop_columns=team_drop)
    df_away_team = df_away_team.rename(columns={col: f"away_{col}" for col in df_away_team.columns if col != "ID"})
    targets = pd.read_csv(target_path)

    # Pre-aggregate player data with prefixes
    df_home_players_agg = pre_aggregate_player_data(home_players_path, player_drop, chunksize, prefix="home")
    df_away_players_agg = pre_aggregate_player_data(away_players_path, player_drop, chunksize, prefix="away")

    # Merge team and aggregated player data
    merged_home = df_home_team.merge(df_home_players_agg, on="ID", how="left")
    merged_away = df_away_team.merge(df_away_players_agg, on="ID", how="left")

    # Home side is the base (the complete set of matches); the inner join with
    # the targets keeps only matches that have a label.
    merged_data = (
        merged_home
        .merge(merged_away, on="ID", how="left")
        .merge(targets, on="ID", how="inner")
        .fillna(0)
        # Guarantee a positional index so labels can be looked up by row index.
        .reset_index(drop=True)
    )

    # Calculate Diff and Ratio features
    merged_data = calculate_diff_and_ratio(merged_data)

    # Encode targets, then drop target and ID columns (without mutating in place)
    y = encode_multiclass_targets(merged_data, target_cols)
    features = merged_data.drop(columns=target_cols + ["ID"])
    return features, y

# Load and merge test data using pre-aggregation
def load_and_merge_test_data_chunked(
    home_team_test_path: str,
    away_team_test_path: str,
    home_players_test_path: str,
    away_players_test_path: str,
    chunksize: int = 1000,
) -> pd.DataFrame:
    """
    Build the test feature matrix from the raw CSV files.

    Team statistics are loaded (dropping LEAGUE and TEAM_NAME) and prefixed
    with ``home_`` / ``away_``; player statistics are aggregated to one row per
    ID with the same prefixes; players are left-joined onto teams per side and
    the away side is left-joined onto the home side. Missing values become 0
    and the DIFF/RATIO features are added so the columns are produced by the
    same feature step as the training loader uses.

    Progress information (intermediate shapes) is printed along the way.

    Parameters
    ----------
    home_team_test_path, away_team_test_path : team statistics CSVs.
    home_players_test_path, away_players_test_path : player statistics CSVs.
    chunksize : rows per chunk used while reading the CSVs.

    Returns
    -------
    DataFrame with one row per home-team row; the ``ID`` column is kept.
    """
    team_drop = ["LEAGUE", "TEAM_NAME"]
    player_drop = ["LEAGUE", "TEAM_NAME", "PLAYER_NAME", "POSITION"]

    # Process team data with prefixes
    df_home_team = process_csv_in_chunks(home_team_test_path, chunksize, drop_columns=team_drop)
    df_home_team = df_home_team.rename(columns={col: f"home_{col}" for col in df_home_team.columns if col != "ID"})
    df_away_team = process_csv_in_chunks(away_team_test_path, chunksize, drop_columns=team_drop)
    df_away_team = df_away_team.rename(columns={col: f"away_{col}" for col in df_away_team.columns if col != "ID"})

    print(f"Processed team data: Home team -> {df_home_team.shape}, Away team -> {df_away_team.shape}")

    # Pre-aggregate player data with prefixes
    df_home_players_agg = pre_aggregate_player_data(
        home_players_test_path,
        drop_columns=player_drop,
        chunksize=chunksize,
        prefix="home"
    )
    df_away_players_agg = pre_aggregate_player_data(
        away_players_test_path,
        drop_columns=player_drop,
        chunksize=chunksize,
        prefix="away"
    )

    print(f"Pre-aggregated player data: Home players -> {df_home_players_agg.shape}, Away players -> {df_away_players_agg.shape}")

    # Merge team data with the aggregated player data (using home team as base)
    merged_home = df_home_team.merge(df_home_players_agg, on="ID", how="left")
    print(f"Merged home shape -> {merged_home.shape}")

    merged_away = df_away_team.merge(df_away_players_agg, on="ID", how="left")
    print(f"Merged away shape -> {merged_away.shape}")

    merged_test = merged_home.merge(merged_away, on="ID", how="left").fillna(0)

    # Apply the same feature engineering as the training loader; without it the
    # test matrix can lack the DIFF/RATIO columns the model was fitted on.
    merged_test = calculate_diff_and_ratio(merged_test)
    print(f"Final merged test data shape -> {merged_test.shape}")

    return merged_test

In [None]:
# Locations of the raw training inputs (relative to the notebook's working directory).
train_home_team_csv = "data/Train_Data/train_home_team_statistics_df.csv"
train_away_team_csv = "data/Train_Data/train_away_team_statistics_df.csv"
train_home_players_csv = "data/Train_Data/train_home_player_statistics_df.csv"
train_away_players_csv = "data/Train_Data/train_away_player_statistics_df.csv"
y_csv = "data/Y_train_1rknArQ.csv"

# Build the feature matrix and the integer label vector (loader's default chunk size).
X_raw, y_raw = load_and_merge_train_data(
  home_team_path=train_home_team_csv,
  away_team_path=train_away_team_csv,
  home_players_path=train_home_players_csv,
  away_players_path=train_away_players_csv,
  target_path=y_csv,
)

# Quick sanity check of what was loaded.
print("Shape of training data:", X_raw.shape)
print("Unique target classes:", np.unique(y_raw))

In [None]:
# Descriptive statistics, transposed so each feature is a row; first 10 features only.
print("Summary stats:")
summary_head = X_raw.describe().T.head(10)
display(summary_head)

# How many samples fall into each target class.
print("Class distribution:")
class_counts = pd.Series(y_raw).value_counts()
print(class_counts)

# Bar chart of the same class counts.
fig, ax = plt.subplots()
sns.countplot(x=y_raw, ax=ax)
ax.set_title("Distribution of Classes")
plt.show()

In [None]:
# Use a random sample to reduce computation during grid search.
sample_frac = 0.05
X_sample = X_raw.sample(frac=sample_frac, random_state=42)

# y_raw is a NumPy array, so it must be indexed by row POSITION. Translate the
# sampled index labels into positions explicitly instead of assuming that
# X_raw carries a default 0..n-1 index.
sample_positions = X_raw.index.get_indexer(X_sample.index)
y_sample = np.asarray(y_raw)[sample_positions]

print("Shape of sampled data for tuning:", X_sample.shape)

# Train/validation split on the sample, stratified so both parts keep the
# class proportions of the sample.
X_train_s, X_val_s, y_train_s, y_val_s = train_test_split(
  X_sample,
  y_sample,
  test_size=0.2,
  random_state=42,
  stratify=y_sample
)

In [None]:
# Candidate estimators and the hyper-parameter grid searched for each of them.
# Every estimator is created with n_jobs=-1.
models = {
  "LogisticRegression": dict(
    estimator=LogisticRegression(max_iter=2000, solver="lbfgs", n_jobs=-1),
    param_grid=dict(C=[0.01, 0.1, 1.0], penalty=["l2"]),
  ),
  "RandomForest": dict(
    estimator=RandomForestClassifier(random_state=42, n_jobs=-1),
    param_grid=dict(n_estimators=[50, 100], max_depth=[3, 6]),
  ),
  "XGBClassifier": dict(
    estimator=XGBClassifier(random_state=42, use_label_encoder=False, eval_metric="mlogloss", n_jobs=-1),
    param_grid=dict(n_estimators=[50, 100], max_depth=[3, 6], learning_rate=[0.1, 0.01]),
  ),
}

# For each candidate: 3-fold grid search on the tuning-train split (scored by
# accuracy), then score the refitted best estimator on the tuning-validation split.
results = {}
for model_name in models:
  print(f"Tuning {model_name} ...")
  search = GridSearchCV(
    models[model_name]["estimator"],
    models[model_name]["param_grid"],
    scoring="accuracy",
    cv=3,
    n_jobs=-1,
    verbose=1,
  )
  search.fit(X_train_s, y_train_s)

  best_model = search.best_estimator_
  val_preds = best_model.predict(X_val_s)
  acc = accuracy_score(y_val_s, val_preds)
  f1 = f1_score(y_val_s, val_preds, average="macro")

  results[model_name] = dict(
    best_params=search.best_params_,
    accuracy=acc,
    f1_score=f1,
    best_estimator=best_model,
  )

  print(f"{model_name} tuned. Accuracy: {acc:.4f}, F1 Score: {f1:.4f}\n")


In [None]:
# Summarise the tuning outcome: one row per candidate model.
comparison_records = [
  {
    "Model": label,
    "Accuracy": entry["accuracy"],
    "F1 Score": entry["f1_score"],
    "Best Params": entry["best_params"],
  }
  for label, entry in results.items()
]
comparison = pd.DataFrame(
  comparison_records,
  columns=["Model", "Accuracy", "F1 Score", "Best Params"],
)
print("Tuning Results on Sample Data:")
display(comparison)


In [None]:
# Pick the candidate with the highest validation accuracy on the sample; its
# tuned hyper-parameters are reused to train on the full data.
best_model_name, best_entry = max(
  results.items(),
  key=lambda item: item[1]["accuracy"],
)
best_params = best_entry["best_params"]
print("Best model on sample:", best_model_name, best_params)

In [None]:
# Re-create the winning estimator with its tuned hyper-parameters.
if best_model_name == "LogisticRegression":
  final_model = LogisticRegression(max_iter=2000, solver="lbfgs", n_jobs=-1, **best_params)
elif best_model_name == "RandomForest":
  final_model = RandomForestClassifier(random_state=42, n_jobs=-1, **best_params)
elif best_model_name == "XGBClassifier":
  final_model = XGBClassifier(random_state=42, use_label_encoder=False, eval_metric="mlogloss", n_jobs=-1, **best_params)
else:
  # Fail loudly instead of hitting a NameError below or, worse, silently
  # reusing a stale `final_model` left in the kernel by an earlier run.
  raise ValueError(f"Unsupported model name: {best_model_name!r}")

# Hold out 20% of the full data (stratified by class) for validation.
X_train_full, X_val_full, y_train_full, y_val_full = train_test_split(
  X_raw, y_raw, test_size=0.2, random_state=42, stratify=y_raw
)

print("Training final model on full data...")
final_model.fit(X_train_full, y_train_full)
final_preds = final_model.predict(X_val_full)
print("Full Data Accuracy:", accuracy_score(y_val_full, final_preds))
print("Full Data F1 Score:", f1_score(y_val_full, final_preds, average="macro"))

In [None]:
# Persist the fitted final model to "model.pkl" in the working directory so it
# can be reloaded later without retraining.
import joblib
joblib.dump(final_model, "model.pkl")

In [None]:
# Evaluate the final model on the held-out validation split.
cm = confusion_matrix(y_val_full, final_preds)

# Heatmap of the confusion matrix with integer cell annotations.
fig, ax = plt.subplots()
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
ax.set_title(f"Confusion Matrix - {best_model_name}")
plt.show()

# Per-class precision / recall / F1.
print("Classification Report for Final Model:")
report_text = classification_report(y_val_full, final_preds)
print(report_text)

In [4]:
# Reload the persisted model from "model.pkl", replacing any `final_model`
# currently in the kernel.
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code -- only load model files that come from a trusted source.
final_model = joblib.load("model.pkl")

In [None]:
import warnings

# Silence all Python warnings from this point on (this stays in effect for the
# rest of the kernel session, not only for this cell).
warnings.filterwarnings("ignore")

# Locations of the raw test inputs (relative to the notebook's working directory).
test_home_team_csv = "data/Test_Data/test_home_team_statistics_df.csv"
test_away_team_csv = "data/Test_Data/test_away_team_statistics_df.csv"
test_home_players_csv = "data/Test_Data/test_home_player_statistics_df.csv"
test_away_players_csv = "data/Test_Data/test_away_player_statistics_df.csv"

# Load test data using the chunked processing function (loader's default chunk size).
X_test = load_and_merge_test_data_chunked(
  home_team_test_path=test_home_team_csv,
  away_team_test_path=test_away_team_csv,
  home_players_test_path=test_home_players_csv,
  away_players_test_path=test_away_players_csv,
)

print("Shape of test data:", X_test.shape)


In [None]:
# Split the test frame into match IDs and model features.
# X_test itself is left untouched so this cell can be re-run safely: the
# previous version overwrote X_test without its "ID" column, so a second run
# silently fell back to using the row index as IDs.
if "ID" in X_test.columns:
    # Keep the first row of every ID so each match is predicted exactly once.
    X_test_unique = X_test.drop_duplicates(subset="ID")
    print(f"Reduced rows from {X_test.shape[0]} to {X_test_unique.shape[0]} by dropping duplicate IDs.")
    test_ids = X_test_unique["ID"].values
    X_test_features = X_test_unique.drop(columns="ID")
else:
    test_ids = X_test.index.values
    X_test_features = X_test

print("Extracted test IDs with shape:", test_ids.shape)

# When the model recorded the feature names it was fitted on, verify that the
# test matrix provides exactly those columns and put them in the fitted order.
expected_features = getattr(final_model, "feature_names_in_", None)
if expected_features is not None:
    expected_features = list(expected_features)
    missing_features = [col for col in expected_features if col not in X_test_features.columns]
    unexpected_features = [col for col in X_test_features.columns if col not in set(expected_features)]
    if missing_features or unexpected_features:
        raise ValueError(
            "Test features do not match the features the model was fitted on. "
            f"Missing: {missing_features}; unexpected: {unexpected_features}"
        )
    X_test_features = X_test_features[expected_features]

# Predict labels on the test data using final_model.
predicted_labels = np.asarray(final_model.predict(X_test_features)).astype(int)

# Convert predicted class labels (0 = HOME_WINS, 1 = DRAW, 2 = AWAY_WINS) to
# one-hot rows by selecting rows of a 3x3 identity matrix.
one_hot = np.eye(3, dtype=int)[predicted_labels]

# Create submission DataFrame with the required column names.
submission = pd.DataFrame(one_hot, columns=["HOME_WINS", "DRAW", "AWAY_WINS"])
submission.insert(0, "ID", test_ids)

# Write to CSV file.
submission.to_csv("final_submission.csv", index=False)
print("Submission file created: final_submission.csv")