In [None]:
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time

sys.path.append('../code')
from minirocket import fit, transform
from aeon.datasets import load_classification
from sklearn.linear_model import RidgeClassifierCV

In [None]:
# Development datasets iterated over by every experiment loop in this notebook.
# Order is kept exactly as originally listed.
dev_datasets = [
    "Adiac",
    "ArrowHead",
    "Beef",
    "BeetleFly",
    "BirdChicken",
    "Car",
    "CBF",
    "ChlorineConcentration",
    "CinCECGTorso",
    "Coffee",
    "Computers",
    "CricketX",
    "CricketY",
    "CricketZ",
    "DiatomSizeReduction",
    "DistalPhalanxOutlineCorrect",
    "DistalPhalanxOutlineAgeGroup",
    "DistalPhalanxTW",
    "Earthquakes",
    "ECG200",
    "ECG5000",
    "ECGFiveDays",
    "ElectricDevices",
    "FaceAll",
    "FaceFour",
    "FacesUCR",
    "FiftyWords",
    "Fish",
    "GunPoint",
    "Ham",
    "Herring",
    "InlineSkate",
    "ItalyPowerDemand",
    "Lightning2",
    "Lightning7",
    "Mallat",
    "Meat",
    "MedicalImages",
    "MiddlePhalanxOutlineCorrect",
    "MiddlePhalanxOutlineAgeGroup",
]
print(f"{len(dev_datasets)} development datasets")

In [None]:
# Helper function to run experiments
def run_experiment(dataset_names, transform_fn, n_runs=10):
    """Evaluate a transform + ridge classifier pipeline on several datasets.

    Parameters
    ----------
    dataset_names : iterable of str
        Names passed to ``load_classification`` (once for the "train" split,
        once for the "test" split).
    transform_fn : callable
        Called as ``transform_fn(X_train, X_test)`` with the squeezed float32
        arrays; must return ``(train_features, test_features)``.
    n_runs : int, default 10
        How many times the transform/classify cycle is repeated per dataset.

    Returns
    -------
    dict
        Maps each successfully processed dataset name to the mean test
        accuracy over ``n_runs``. A dataset that raises at any step is
        reported via ``print`` and left out of the result.
    """
    mean_accuracy = {}

    def _score_once(train_arr, train_labels, test_arr, test_labels):
        # One full cycle: transform both splits, fit ridge, score on test.
        train_feats, test_feats = transform_fn(train_arr, test_arr)
        ridge = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
        ridge.fit(train_feats, train_labels)
        return ridge.score(test_feats, test_labels)

    for dataset in dataset_names:
        try:
            train_raw, train_labels = load_classification(dataset, split="train")
            test_raw, test_labels = load_classification(dataset, split="test")

            # Drop singleton axes and cast to float32 before transforming.
            train_arr = train_raw.squeeze().astype(np.float32)
            test_arr = test_raw.squeeze().astype(np.float32)

            run_scores = [
                _score_once(train_arr, train_labels, test_arr, test_labels)
                for _ in range(n_runs)
            ]
            mean_accuracy[dataset] = np.mean(run_scores)
        except Exception as err:
            # Best-effort sweep: report the failure and continue with the next dataset.
            print(f"Wrong {dataset}: {err}")

    return mean_accuracy

In [None]:
# Experiment 1: PPV only vs PPV + Max (Figure 9)
# Default MiniRocket (PPV only)
def minirocket_default(X_train, X_test):
    """Fit the local MiniRocket on the training split and transform both splits.

    Parameters are fitted on ``X_train`` only and then reused for ``X_test``.

    Returns
    -------
    tuple
        ``(train_features, test_features)``.
    """
    fitted = fit(X_train)
    train_features = transform(X_train, fitted)
    test_features = transform(X_test, fitted)
    return train_features, test_features

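# For PPV + Max, a Rocket-style transform that emits both pooling types is needed.
# NOTE: that comparison arm is not implemented in this cell; the loop below only
# evaluates the PPV-only baseline.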
from aeon.transformations.collection.convolution_based import MiniRocket as AeonMiniRocket

# PPV-only baseline: one fit/transform/classify pass per development dataset.
ppv_results = {}
for name in dev_datasets:
    try:
        X_train, y_train = load_classification(name, split="train")
        X_test, y_test = load_classification(name, split="test")
        X_train_np = X_train.squeeze().astype(np.float32)
        X_test_np = X_test.squeeze().astype(np.float32)

        # Reuse the helper defined above instead of repeating fit/transform inline.
        X_train_tf, X_test_tf = minirocket_default(X_train_np, X_test_np)

        clf = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
        clf.fit(X_train_tf, y_train)
        accuracy = clf.score(X_test_tf, y_test)
        ppv_results[name] = accuracy
        print(f"Ok {name}: {ppv_results[name]:.4f}")
    except Exception as e:
        # Keep going: a failing dataset is reported and skipped.
        print(f"Wrong {name}: {e}")

In [None]:
# Experiment 2: Number of features (Figure 10)
# Test different feature counts: 84, 504, 1008, 5040, 9996
# The feature budget is varied through aeon MiniRocket's kernel-count argument.

from aeon.transformations.collection.convolution_based import MiniRocket as AeonMR

feature_counts = [84, 504, 1008, 5040, 9996]
feature_results = {n: {} for n in feature_counts}


def _make_aeon_minirocket(n_feat, seed=42):
    """Build an aeon MiniRocket transformer sized by ``n_feat``.

    NOTE(review): aeon's MiniRocket does not appear to accept a
    ``num_features`` keyword; current releases size the transform with
    ``n_kernels`` and older ones with ``num_kernels``. Both spellings are
    tried so the cell works across versions -- confirm against the
    installed aeon release.
    """
    try:
        return AeonMR(n_kernels=n_feat, random_state=seed)
    except TypeError:
        # Older aeon releases used the `num_kernels` spelling.
        return AeonMR(num_kernels=n_feat, random_state=seed)


for name in dev_datasets:
    try:
        X_train, y_train = load_classification(name, split="train")
        X_test, y_test = load_classification(name, split="test")

        # Score every feature count first and only commit once all succeeded,
        # so each count is later averaged over the same set of datasets.
        scores_for_dataset = {}
        for n_feat in feature_counts:
            mr = _make_aeon_minirocket(n_feat)
            mr.fit(X_train)
            X_train_tf = mr.transform(X_train)
            X_test_tf = mr.transform(X_test)

            clf = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
            clf.fit(X_train_tf, y_train)
            scores_for_dataset[n_feat] = clf.score(X_test_tf, y_test)

        for n_feat, acc in scores_for_dataset.items():
            feature_results[n_feat][name] = acc

        print(f"Ok {name}")
    except Exception as e:
        # A failure at any feature count drops the whole dataset from this experiment.
        print(f"Wrong {name}: {e}")

In [None]:
# Visualize feature count results
# Average each feature count over the datasets that have a score for *every*
# count; averaging over differing dataset sets would make the points on the
# curve incomparable.
datasets_per_count = [set(accs) for accs in feature_results.values()]
shared_datasets = (
    sorted(set.intersection(*datasets_per_count)) if datasets_per_count else []
)

mean_accs = {}
if not shared_datasets:
    # Avoid np.mean([]) -> NaN and saving an empty figure.
    print("No dataset has results for every feature count; nothing to plot.")
else:
    mean_accs = {
        n: np.mean([accs[name] for name in shared_datasets])
        for n, accs in feature_results.items()
    }

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.plot(list(mean_accs.keys()), list(mean_accs.values()), 'bo-', markersize=8)
    ax.set_xlabel('Number of Features', fontsize=12)
    ax.set_ylabel('Mean Accuracy', fontsize=12)
    ax.set_title('Effect of Number of Features (cf. Figure 10)', fontsize=14)
    ax.set_xscale('log')
    plt.tight_layout()
    plt.savefig('../results/sensitivity_num_features.png', dpi=150)
    plt.show()

In [None]:
# Experiment 3: MiniRocket vs Deterministic Variant (Figure 5)
sys.path.append('../code')
from minirocket_dv import fit_transform as fit_transform_dv

dv_results = {}
default_results = {}


def _ridge_test_accuracy(train_features, train_labels, test_features, test_labels):
    """Fit a RidgeClassifierCV on the training features and return test accuracy."""
    model = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
    model.fit(train_features, train_labels)
    return model.score(test_features, test_labels)


for name in dev_datasets:
    try:
        X_train, y_train = load_classification(name, split="train")
        X_test, y_test = load_classification(name, split="test")
        X_train_np = X_train.squeeze().astype(np.float32)
        X_test_np = X_test.squeeze().astype(np.float32)

        # Default variant: fit on train, transform both splits with the same parameters.
        params = fit(X_train_np)
        default_results[name] = _ridge_test_accuracy(
            transform(X_train_np, params),
            y_train,
            transform(X_test_np, params),
            y_test,
        )

        # Deterministic variant: fit_transform_dv yields the parameters and the
        # training features together; the test split goes through `transform`.
        params_dv, X_tr_tf_dv = fit_transform_dv(X_train_np)
        dv_results[name] = _ridge_test_accuracy(
            X_tr_tf_dv,
            y_train,
            transform(X_test_np, params_dv),
            y_test,
        )

        print(f"Ok {name}: default={default_results[name]:.4f}, DV={dv_results[name]:.4f}")
    except Exception as e:
        # Report and move on; a default-variant score recorded before the failure is kept.
        print(f"Wrong {name}: {e}")

In [None]:
# Scatter plot: Default vs Deterministic (Figure 5)
# Only datasets scored by both variants are compared.
common = set(default_results.keys()) & set(dv_results.keys())
paired_accs = [(default_results[n], dv_results[n]) for n in common]
def_accs = [default_acc for default_acc, _ in paired_accs]
dv_accs = [dv_acc for _, dv_acc in paired_accs]

axis_limits = (0.3, 1.02)

fig, ax = plt.subplots(figsize=(8, 8))
ax.scatter(dv_accs, def_accs, alpha=0.6, edgecolors='black', linewidths=0.5, s=50)
# Diagonal y = x: points above it are datasets where the default variant wins.
ax.plot([0, 1], [0, 1], 'r--')
ax.set_title('Default vs Deterministic MiniRocket (cf. Figure 5)', fontsize=14)
ax.set_xlabel('Deterministic Variant Accuracy', fontsize=12)
ax.set_ylabel('Default MiniRocket Accuracy', fontsize=12)
ax.set_xlim(*axis_limits)
ax.set_ylim(*axis_limits)
ax.set_aspect('equal')
plt.tight_layout()
plt.savefig('../results/default_vs_deterministic.png', dpi=150)
plt.show()

# Win / draw / loss tally from the default variant's point of view.
better = worse = equal = 0
for n in common:
    default_acc, dv_acc = default_results[n], dv_results[n]
    if default_acc > dv_acc:
        better += 1
    elif default_acc < dv_acc:
        worse += 1
    elif default_acc == dv_acc:
        equal += 1
print(f"Default wins: {better}, Draws: {equal}, DV wins: {worse}")