# Rule-Based Baseline

Simple rule: The squad with the **lowest `dist_from_bluezone`** (closest to safezone) is predicted as the winner.


In [0]:
import os
import sys
from collections import defaultdict

import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy.special import softmax

# Add project root to path
# NOTE(review): hard-coded, machine-specific absolute path — adjust when
# running this notebook on another machine.
sys.path.insert(0, '/Users/seunghan96/pgc_wwcd_prediction')

# Constants
# Number of phases reported; compute_metrics_by_phase clamps any later
# time points into the last phase.
NUM_PHASES = 10
# Number of consecutive (sorted) time points of a match assigned to one phase.
SAMPLES_PER_PHASE = 5
# Divisor applied to the target column before the true winner is selected.
TARGET_SCALE = 2000.0


## 1. Load Test Data


In [0]:
# Locations of the per-file feature CSVs and of the train/test split listing
FOLDER_PATH = "/Volumes/main_dev/dld_ml_anticheat_test/anticheat_test_volume/pgc_wwcd/pgc_features/inference_v2"
SPLIT_CSV_PATH = "/Volumes/main_dev/dld_ml_anticheat_test/anticheat_test_volume/pgc_wwcd/pgc_features/inference_v2/split_files.csv"

# Keep only the filenames whose split is 'test'
split_df = pd.read_csv(SPLIT_CSV_PATH)
is_test = split_df['split'] == 'test'
test_files = split_df.loc[is_test, 'filename'].tolist()
print(f"Test files: {len(test_files)}")

# Read every test file and stack them into a single frame
test_dfs = [
    pd.read_csv(os.path.join(FOLDER_PATH, name))
    for name in tqdm(test_files, desc="Loading test data")
]

test_df = pd.concat(test_dfs, ignore_index=True)
print(f"Test data: {len(test_df):,} rows")
test_df.head()


In [0]:
import ast

# ============================================
# Parsing Functions
# ============================================

def parse_positions(positions_str):
    """Parse a positions value into a list of [x, y, z] float coordinates.

    Args:
        positions_str: Either a string holding a Python literal such as
            "[[x, y, z], ...]" (numbers may be wrapped as ``np.float64(...)``)
            or an already-parsed list/tuple of positions.

    Returns:
        A list of ``[x, y, z]`` float lists. Entries that are not sequences
        of at least three numeric values are skipped. Returns ``[]`` when
        the input cannot be parsed or is not a list/tuple (for example a
        NaN coming from a missing cell).
    """
    positions = positions_str
    if isinstance(positions, str):
        # The repr of NumPy scalars ("np.float64(1.5)") is not a valid Python
        # literal; unwrap it so literal_eval can read the number.
        text = re.sub(r'np\.float64\(([^()]*)\)', r'\1', positions)
        try:
            positions = ast.literal_eval(text)
        except (ValueError, SyntaxError, TypeError):
            # Malformed cell: report "no positions" instead of aborting the
            # whole DataFrame.apply.
            return []

    if not isinstance(positions, (list, tuple)):
        return []

    parsed = []
    for pos in positions:
        if not isinstance(pos, (list, tuple)) or len(pos) < 3:
            continue
        try:
            parsed.append([float(pos[0]), float(pos[1]), float(pos[2])])
        except (TypeError, ValueError):
            # Non-numeric coordinate: skip this entry only.
            continue
    return parsed


import re

def parse_bluezone(bluezone_str):
    """Parse a zone-info value into its center coordinates and radius.

    Supported inputs:
      * a value whose string form contains at least three
        ``np.float64(...)`` entries, read in order as x, y, radius;
      * a string literal or an already-parsed sequence shaped like
        ``[[x, y, radius]]`` or ``[x, y, radius]``.

    Args:
        bluezone_str: Raw zone value (string, list or tuple).

    Returns:
        ``{'x': float, 'y': float, 'radius': float}``, or ``None`` when the
        value cannot be parsed.
    """
    def _to_zone(values):
        # Convert the first three values to floats; None if any is non-numeric.
        try:
            x, y, radius = (float(v) for v in values[:3])
        except (TypeError, ValueError):
            return None
        return {'x': x, 'y': y, 'radius': radius}

    text = str(bluezone_str)
    if 'np.float64' in text:
        # Capture everything inside the wrapper so that scientific notation
        # ("1e-05") and "nan"/"inf" are not silently dropped.
        matches = re.findall(r'np\.float64\(([^()]+)\)', text)
        if len(matches) >= 3:
            return _to_zone(matches)

    # Literal parsing for the remaining formats
    if isinstance(bluezone_str, str):
        # Unwrap any leftover np.float64(...) so literal_eval can read it.
        literal = re.sub(r'np\.float64\(([^()]*)\)', r'\1', bluezone_str)
        try:
            bluezone = ast.literal_eval(literal)
        except (ValueError, SyntaxError, TypeError):
            return None
    else:
        bluezone = bluezone_str

    # Handle various formats: [[x, y, radius]] or [x, y, radius]
    coords = None
    if isinstance(bluezone, (list, tuple)) and len(bluezone) > 0:
        first_elem = bluezone[0]
        if isinstance(first_elem, (list, tuple)) and len(first_elem) >= 3:
            coords = first_elem
        elif hasattr(first_elem, '__float__') and len(bluezone) >= 3:
            coords = bluezone

    if coords is None:
        return None
    return _to_zone(coords)


def calculate_distance(p1, p2):
    """Return the straight-line distance between p1 and p2 in the x/y plane.

    Only the first two components of each point are used.
    """
    delta_x = p1[0] - p2[0]
    delta_y = p1[1] - p2[1]
    squared_length = delta_x**2 + delta_y**2
    return np.sqrt(squared_length)


def get_squad_center(positions):
    """Return the mean [x, y] of the given positions.

    Positions with fewer than two components are ignored. Returns None when
    no usable position remains.
    """
    usable = [p for p in positions if len(p) >= 2]
    if not usable:
        return None

    mean_x = np.mean([p[0] for p in usable])
    mean_y = np.mean([p[1] for p in usable])
    return [mean_x, mean_y]


def compute_dist_from_zone_v2(row, zone_col='bluezone_info'):
    """
    Compute distance from squad center to zone center, normalized by zone radius.

    Args:
        row: DataFrame row; must provide 'positions' and the column named
            by zone_col
        zone_col: Column name for zone info ('bluezone_info' or 'whitezone_info')

    Returns:
        Distance from squad center to zone center / zone radius.
        Returns NaN if the positions or the zone cannot be parsed, if no
        usable position is available, or if the zone radius is not positive.
    """
    # Parse positions and zone info; a malformed cell yields NaN for this
    # row rather than aborting the whole DataFrame.apply.
    try:
        positions = parse_positions(row['positions'])
        zone_info = parse_bluezone(row[zone_col])
    except (ValueError, SyntaxError, TypeError):
        return np.nan

    # parse_bluezone returns None for unrecognized formats
    if zone_info is None:
        return np.nan

    # get_squad_center returns None when there is no usable position
    squad_center = get_squad_center(positions)
    if squad_center is None:
        return np.nan

    # Written as "not > 0" so that a NaN radius is rejected as well
    radius = zone_info['radius']
    if not radius > 0:
        return np.nan

    # Calculate distance to zone center, in units of the zone radius
    zone_center = [zone_info['x'], zone_info['y']]
    return calculate_distance(squad_center, zone_center) / radius


print("Parsing functions defined successfully!")

## 2. Rule-Based Prediction

**Rule**: Lower `dist_from_bluezone` = Higher chance of winning

We use negative `dist_from_bluezone` as the "prediction score" so that argmax gives us the squad closest to the safezone.


In [0]:
# ============================================
# Derive the *_v2 distance columns for both zones, then summarize them
# ============================================
# (new column, zone-info column it is computed from, original column to compare with)
zone_specs = [
    ('dist_from_bluezone_v2', 'bluezone_info', 'dist_from_bluezone'),
    ('dist_from_whitezone_v2', 'whitezone_info', 'dist_from_whitezone'),
]

for v2_col, info_col, _ in zone_specs:
    print(f"Computing {v2_col}...")
    # Extra keyword arguments of DataFrame.apply are forwarded to the function
    test_df[v2_col] = test_df.apply(
        compute_dist_from_zone_v2,
        axis=1,
        zone_col=info_col,
    )

# Summary statistics of each new column
for v2_col, _, _ in zone_specs:
    v2_values = test_df[v2_col]
    print(f"\n--- {v2_col} ---")
    print(f"  Valid values: {v2_values.notna().sum():,} / {len(test_df):,}")
    print(f"  Min: {v2_values.min():.4f}")
    print(f"  Max: {v2_values.max():.4f}")
    print(f"  Mean: {v2_values.mean():.4f}")

# How closely each new column tracks the original one
print("\n--- Correlation with original ---")
for v2_col, _, orig_col in zone_specs:
    correlation = test_df[orig_col].corr(test_df[v2_col])
    print(f"  Corr({orig_col}, {v2_col}): {correlation:.4f}")

In [0]:
test_df['dist_from_bluezone_v2']

In [0]:
# Rule-based prediction: lower dist_from_bluezone = higher score
# Use negative so argmax gives the squad closest to safezone
# NOTE(review): this runs before the Results cell replaces -1 with 999 in
# dist_from_bluezone, so any -1 entries (presumably a missing-value sentinel —
# confirm) become a score of +1 here. 'pred' is overwritten again before the
# metrics are computed, so only the min/max printed below are affected.
test_df['pred'] = -test_df['dist_from_bluezone']

print("Prediction based on: -dist_from_bluezone")
print(f"  Min pred: {test_df['pred'].min():.4f}")
print(f"  Max pred: {test_df['pred'].max():.4f}")


## 3. Compute Phase-wise Metrics


In [0]:
def compute_ece(confidences, accuracies, n_bins=10):
    """Compute the Expected Calibration Error (ECE).

    Confidences are split into ``n_bins`` equal-width bins over [0, 1]. The
    ECE is the sample-weighted average of |mean accuracy - mean confidence|
    over the non-empty bins.

    Args:
        confidences: Array-like of predicted confidences.
        accuracies: Array-like of per-sample correctness (1/0 or bool),
            aligned with ``confidences``.
        n_bins: Number of equal-width bins.

    Returns:
        The ECE as a float; 0.0 when no sample falls into any bin
        (including empty input).
    """
    confidences = np.asarray(confidences, dtype=float)
    accuracies = np.asarray(accuracies, dtype=float)

    bin_boundaries = np.linspace(0, 1, n_bins + 1)
    weighted_gap = 0.0
    total_samples = 0

    for i in range(n_bins):
        bin_lower = bin_boundaries[i]
        bin_upper = bin_boundaries[i + 1]

        # Bins are (lower, upper], except the first one, which is closed on
        # the left so that a confidence of exactly 0.0 is not dropped.
        if i == 0:
            above_lower = confidences >= bin_lower
        else:
            above_lower = confidences > bin_lower
        in_bin = above_lower & (confidences <= bin_upper)

        n_in_bin = int(np.sum(in_bin))
        if n_in_bin == 0:
            continue

        avg_confidence = np.mean(confidences[in_bin])
        avg_accuracy = np.mean(accuracies[in_bin])
        weighted_gap += n_in_bin * abs(avg_accuracy - avg_confidence)
        total_samples += n_in_bin

    if total_samples == 0:
        return 0.0
    return float(weighted_gap / total_samples)


def _sanitize_scores(preds):
    """Return scores that are safe for argmax/softmax when some are NaN."""
    nan_mask = np.isnan(preds)
    if not nan_mask.any():
        return preds
    if nan_mask.all():
        # No usable score at all: fall back to a uniform prediction.
        return np.zeros_like(preds)
    # A squad without a score can never be the predicted winner and gets
    # probability 0 from the softmax.
    return np.where(nan_mask, -np.inf, preds)


def _mean_or_zero(values):
    """Return the mean of a list of numbers, or 0.0 when the list is empty."""
    return float(np.mean(values)) if values else 0.0


def _ece_or_zero(confidences, accuracies):
    """Return the ECE of the collected samples, or 0.0 when there are none."""
    if not confidences:
        return 0.0
    return compute_ece(np.array(confidences), np.array(accuracies))


def compute_metrics_by_phase(df, pred_col='pred', target_col='squad_death_time'):
    """
    Compute phase-wise metrics (accuracy, log_loss, ece).

    Rows are grouped by (match_id, time_point); each group is one sample in
    which the row with the highest ``pred_col`` is the predicted winner and
    the row with the highest ``target_col`` is the true winner. Within a
    match, the k-th time point (0-based, ascending) belongs to phase
    ``min(k // SAMPLES_PER_PHASE + 1, NUM_PHASES)``.

    Probabilities are the softmax of the scores within a sample. NaN scores
    are treated as "cannot win" (probability 0); if every score of a sample
    is NaN, a uniform prediction is used. Samples with fewer than two rows
    are skipped. Rows whose time_point is NaN are dropped by the grouping.

    Args:
        df: DataFrame with 'match_id', 'time_point', ``pred_col`` and
            ``target_col`` columns.
        pred_col: Column holding the prediction score (higher = more likely
            to win).
        target_col: Column holding the target value (highest = winner).

    Returns:
        ``{'accuracy': {...}, 'log_loss': {...}, 'ece': {...}}`` where each
        inner dict maps 'Phase_1' .. 'Phase_<NUM_PHASES>' and 'Average' to a
        float. A phase without any usable sample reports 0.0 for every
        metric. 'Average' is computed over all samples, not over phases.
    """
    # Collect (scores, targets) per phase. groupby yields the time points in
    # ascending order and keeps the original row order inside each group.
    phase_data = defaultdict(list)
    for _, match_df in df.groupby('match_id'):
        for tp_idx, (_, tp_df) in enumerate(match_df.groupby('time_point')):
            phase = min(tp_idx // SAMPLES_PER_PHASE + 1, NUM_PHASES)
            preds = tp_df[pred_col].to_numpy(dtype=float)
            targets = tp_df[target_col].to_numpy(dtype=float) / TARGET_SCALE
            phase_data[phase].append((preds, targets))

    results = {
        'accuracy': {},
        'log_loss': {},
        'ece': {},
    }

    all_log_losses = []
    all_confidences = []
    all_accuracies = []

    for phase in range(1, NUM_PHASES + 1):
        phase_log_losses = []
        phase_confidences = []
        phase_accuracies = []

        # .get avoids inserting empty entries into the defaultdict
        for preds, targets in phase_data.get(phase, []):
            if len(preds) < 2:
                continue

            preds = _sanitize_scores(preds)

            # Winner prediction
            pred_winner = np.argmax(preds)
            target_winner = np.argmax(targets)
            is_correct = pred_winner == target_winner

            # Softmax over the squads of this sample
            probs = softmax(preds)

            # Log loss of the true winner, clipped to avoid log(0)
            winner_prob = np.clip(probs[target_winner], 1e-7, 1 - 1e-7)
            phase_log_losses.append(-np.log(winner_prob))

            # For ECE: confidence is the probability of the predicted winner
            phase_confidences.append(np.max(probs))
            phase_accuracies.append(float(is_correct))

        key = f'Phase_{phase}'
        # The mean of the 0/1 correctness flags is correct / total
        results['accuracy'][key] = _mean_or_zero(phase_accuracies)
        results['log_loss'][key] = _mean_or_zero(phase_log_losses)
        results['ece'][key] = _ece_or_zero(phase_confidences, phase_accuracies)

        # Accumulate for the overall average
        all_log_losses.extend(phase_log_losses)
        all_confidences.extend(phase_confidences)
        all_accuracies.extend(phase_accuracies)

    # Average metrics over every sample
    results['accuracy']['Average'] = _mean_or_zero(all_accuracies)
    results['log_loss']['Average'] = _mean_or_zero(all_log_losses)
    results['ece']['Average'] = _ece_or_zero(all_confidences, all_accuracies)

    return results


## 4. Results


In [0]:
def print_results(results, title="Results"):
    """Print the per-phase and average metrics as a fixed-width table.

    Args:
        results: Mapping metric name -> {row label -> value}, with the
            metrics 'accuracy', 'log_loss' and 'ece'. A missing row label is
            printed as 0.0.
        title: Heading printed above the table.
    """
    metric_names = ['accuracy', 'log_loss', 'ece']
    heavy_rule = "=" * 90
    light_rule = "-" * 90

    def format_row(label):
        # Row label followed by one fixed-width cell per metric
        cells = "".join(
            f"{results[name].get(label, 0.0):<15.4f}" for name in metric_names
        )
        return f"{label:<12}" + cells

    print()
    print(heavy_rule)
    print(title)
    print(heavy_rule)

    # Header
    print(f"{'Phase':<12}" + "".join(f"{name.upper():<15}" for name in metric_names))
    print(light_rule)

    # One row per phase
    for phase_no in range(1, NUM_PHASES + 1):
        print(format_row(f'Phase_{phase_no}'))

    # Average row
    print(light_rule)
    print(format_row('Average'))
    print(heavy_rule)

# Compute metrics
print("Computing phase-wise metrics...")

# In the original distance columns, swap every -1 for 999
for raw_col in ('dist_from_bluezone', 'dist_from_whitezone'):
    test_df[raw_col] = test_df[raw_col].replace(-1, 999)

# Evaluate the rule with the original and the recomputed bluezone distance;
# the score is the negated distance so that the closest squad ranks highest
for dist_col in ('dist_from_bluezone', 'dist_from_bluezone_v2'):
    test_df['pred'] = np.negative(test_df[dist_col])
    results = compute_metrics_by_phase(test_df)
    print_results(results, title=f"Rule-Based Baseline: {dist_col}")


In [0]:
# Same evaluation for the whitezone distance columns (original and recomputed)
for dist_col in ('dist_from_whitezone', 'dist_from_whitezone_v2'):
    test_df['pred'] = np.negative(test_df[dist_col])
    results = compute_metrics_by_phase(test_df)
    print_results(results, title=f"Rule-Based Baseline: {dist_col}")


In [0]:
# Create DataFrame for easy viewing
# pd.DataFrame(results) has one column per metric and one row per phase.
# Transpose first so the metrics become the rows, then label that index;
# naming the index before transposing would attach 'Metric' to the phase axis.
results_df = pd.DataFrame(results).T
results_df.index.name = 'Metric'
results_df


## 5. Save Results


In [0]:
# Save results
OUTPUT_DIR = "/Volumes/main_dev/dld_ml_anticheat_test/anticheat_test_volume/pgc_wwcd/pgc_results/rule_based"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# `results` is re-bound by every evaluation loop above, so at this point it
# holds whichever configuration happened to run last. Recompute it explicitly
# for the rule this notebook reports (lowest dist_from_bluezone wins) so the
# saved file does not depend on cell execution order.
BASELINE_COL = 'dist_from_bluezone'
test_df['pred'] = -test_df[BASELINE_COL]
results = compute_metrics_by_phase(test_df)

# Save as CSV (same format as other experiments): one row per metric, one
# column per phase plus 'Average'
summary_data = [
    {'metric': metric, **results[metric]}
    for metric in ['accuracy', 'log_loss', 'ece']
]

save_df = pd.DataFrame(summary_data)
save_path = os.path.join(OUTPUT_DIR, "test_results.csv")
save_df.to_csv(save_path, index=False)
print(f"Results saved to: {save_path}")


## 6. Visualization


In [0]:
import matplotlib.pyplot as plt

# Plot phase-wise accuracy
# NOTE(review): this plots whatever `results` currently holds, while the
# title below is hard-coded to dist_from_bluezone — confirm that `results`
# was computed from that column before trusting the figure.
phases = [f'Phase_{i}' for i in range(1, NUM_PHASES + 1)]
accuracies = [results['accuracy'][p] for p in phases]

# One bar per phase, with the overall average drawn as a dashed reference line
fig, ax = plt.subplots(figsize=(10, 5))
ax.bar(range(1, NUM_PHASES + 1), accuracies, color='steelblue', alpha=0.8)
ax.axhline(y=results['accuracy']['Average'], color='red', linestyle='--', label=f"Average: {results['accuracy']['Average']:.4f}")
ax.set_xlabel('Phase', fontsize=12)
ax.set_ylabel('Accuracy', fontsize=12)
ax.set_title('Rule-Based Baseline: Phase-wise Accuracy\n(Lower dist_from_bluezone = Winner)', fontsize=14)
ax.set_xticks(range(1, NUM_PHASES + 1))
ax.legend()
# Accuracy is a fraction, so fix the y-axis to [0, 1]
ax.set_ylim(0, 1)
ax.grid(axis='y', alpha=0.3)

plt.tight_layout()
plt.show()
