** Load data/partitions

In [9]:
import os
import pickle
import numpy as np

# Training Partitons
data_dir = "../data/raw/Cleaned SWANSF Dataset/Cleaned SWANSF Dataset/train/"
X_train = []
y_train = []
num_partitions = 5

for i in range(num_partitions):
    with open(f"{data_dir}Partition{i+1}_LSBZM-Norm_FPCKNN-impute.pkl", 'rb') as f:
        X_train.append(pickle.load(f))
    with open(f"{data_dir}Partition{i+1}_Labels_LSBZM-Norm_FPCKNN-impute.pkl ", 'rb') as f:
        y_train.append(pickle.load(f))

In [10]:
import pickle
import numpy as np

# Test Partitons
data_dir = "../data/raw/Cleaned SWANSF Dataset/Cleaned SWANSF Dataset/test/"
X_test = []
y_test = []
num_partitions = 5

for i in range(num_partitions):
    with open(f"{data_dir}Partition{i+1}_RUS-Tomek-TimeGAN_LSBZM-Norm_WithoutC_FPCKNN-impute.pkl", 'rb') as f:
        X_test.append(pickle.load(f))
    with open(f"{data_dir}Partition{i+1}_Labels_RUS-Tomek-TimeGAN_LSBZM-Norm_WithoutC_FPCKNN-impute.pkl", 'rb') as f:
        y_test.append(pickle.load(f))

In [11]:
# shape check
for i, part in enumerate(X_train):
    print(f"Train Partition {i+1}: {part.shape}, Labels: {y_train[i].shape}")
for i, part in enumerate(X_test):
    print(f"Test Partition {i+1}: {part.shape}, Labels: {y_test[i].shape}")
    
for i in range(5):
    print(f"Partition {i+1} training labels sample:", y_train[i][:10])


print(type(y_train[0][0]))



Train Partition 1: (73492, 60, 24), Labels: (73492,)
Train Partition 2: (88557, 60, 24), Labels: (88557,)
Train Partition 3: (42510, 60, 24), Labels: (42510,)
Train Partition 4: (51261, 60, 24), Labels: (51261,)
Train Partition 5: (75365, 60, 24), Labels: (75365,)
Test Partition 1: (18773, 60, 24), Labels: (18773,)
Test Partition 2: (19807, 60, 24), Labels: (19807,)
Test Partition 3: (19965, 60, 24), Labels: (19965,)
Test Partition 4: (19320, 60, 24), Labels: (19320,)
Test Partition 5: (19899, 60, 24), Labels: (19899,)
Partition 1 training labels sample: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Partition 2 training labels sample: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Partition 3 training labels sample: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Partition 4 training labels sample: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Partition 5 training labels sample: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
<class 'numpy.float64'>


In [12]:
# merge all partitions into single arrays

X_train_full = np.concatenate(X_train, axis=0)
y_train_full = np.concatenate(y_train, axis=0)
X_test_full  = np.concatenate(X_test, axis=0)
y_test_full  = np.concatenate(y_test, axis=0)


**Merge or Select Partition Pairs Dynamically

In [13]:
from itertools import combinations

# Sequential and proposed pairings
pairs = [(0,1), (1,2), (2,3), (3,4), (0,2), (0,3), (0,4)]

def get_partition_pair(train_idx, test_idx):
    """
    Selects a partition pair and filters for M vs X classification.
    """
    X_train_pair = np.array(X_train[train_idx])
    y_train_pair = np.array(y_train[train_idx])
    X_test_pair  = np.array(X_test[test_idx])
    y_test_pair  = np.array(y_test[test_idx])

    # Apply filtering
    X_train_pair, y_train_pair = filter_mx(X_train_pair, y_train_pair)
    X_test_pair,  y_test_pair  = filter_mx(X_test_pair,  y_test_pair)

    print(f"Train P{train_idx+1}: {X_train_pair.shape}, Labels: {np.unique(y_train_pair, return_counts=True)}")
    print(f"Test  P{test_idx+1}: {X_test_pair.shape},  Labels: {np.unique(y_test_pair, return_counts=True)}\n")

    return X_train_pair, y_train_pair, X_test_pair, y_test_pair



**Preprocessing for M vs X Classification

In [14]:
import numpy as np

def get_partition_pair(train_idx, test_idx, shuffle=False, seed=42):
    """
    Selects a train/test partition pair and prepares it for modeling.

    Args:
        train_idx (int): index (0-4) of training partition
        test_idx  (int): index (0-4) of testing partition
        shuffle   (bool): whether to shuffle training samples
        seed      (int): random seed for reproducible shuffling

    Returns:
        X_train_pair (np.ndarray): (n_train, seq_len, n_features)
        y_train_pair (np.ndarray): (n_train,)
        X_test_pair  (np.ndarray): (n_test,  seq_len, n_features)
        y_test_pair  (np.ndarray): (n_test,)
    """

    # --- Load selected partitions ---
    X_train_pair = np.array(X_train[train_idx], dtype=np.float32)
    y_train_pair = np.array(y_train[train_idx]).astype(int)
    X_test_pair  = np.array(X_test[test_idx],  dtype=np.float32)
    y_test_pair  = np.array(y_test[test_idx]).astype(int)

    # --- Optional shuffle for “out-of-order” experiment ---
    if shuffle:
        rng = np.random.default_rng(seed)
        idx = rng.permutation(len(X_train_pair))
        X_train_pair, y_train_pair = X_train_pair[idx], y_train_pair[idx]

    # --- Diagnostics ---
    print(f"Train P{train_idx+1}: {X_train_pair.shape}, "
          f"label counts: {np.bincount(y_train_pair)}")
    print(f"Test  P{test_idx+1}: {X_test_pair.shape},  "
          f"label counts: {np.bincount(y_test_pair)}\n")

    return X_train_pair, y_train_pair, X_test_pair, y_test_pair


In [15]:
# Sequential pair example: P1 → P2
Xtr, ytr, Xte, yte = get_partition_pair(0, 1)

# Optional shuffled training
Xtr_shuf, ytr_shuf, _, _ = get_partition_pair(0, 1, shuffle=True)


Train P1: (73492, 60, 24), label counts: [72238  1254]
Test  P2: (19807, 60, 24),  label counts: [10000  9807]

Train P1: (73492, 60, 24), label counts: [72238  1254]
Test  P2: (19807, 60, 24),  label counts: [10000  9807]



**Convert NumPy arrays to a Hugging Face DatasetDict

In [16]:
from datasets import Dataset, DatasetDict
import numpy as np


def build_hf_dataset(Xtr, ytr, Xte, yte):
    """
    Converts NumPy arrays into Hugging Face DatasetDict.
    Each element:
        past_values -> (seq_len, n_features) array
        labels -> scalar 0 or 1
    """
    train_dict = {
        "past_values": [x for x in Xtr],   # list of 2-D arrays
        "labels":      [int(y) for y in ytr]
    }

    test_dict = {
        "past_values": [x for x in Xte],
        "labels":      [int(y) for y in yte]
    }

    train_dataset = Dataset.from_dict(train_dict)
    test_dataset  = Dataset.from_dict(test_dict)

    dataset_dict = DatasetDict({
        "train": train_dataset,
        "test":  test_dataset
    })

    print(dataset_dict)
    return dataset_dict


In [17]:
dataset_dict = build_hf_dataset(Xtr, ytr, Xte, yte)


DatasetDict({
    train: Dataset({
        features: ['past_values', 'labels'],
        num_rows: 73492
    })
    test: Dataset({
        features: ['past_values', 'labels'],
        num_rows: 19807
    })
})


** Model Configuration and Training

In [18]:
from transformers import TimeSeriesTransformerConfig

config = TimeSeriesTransformerConfig(
    prediction_length=1,
    lags_sequence=[0],
    context_length=60,     # your timesteps
    input_size=24,         # your features
    num_time_features=0,
    num_static_categorical_features=0,
    d_model=128,
    encoder_layers=4,
    decoder_layers=0,
    dropout=0.1,
    is_encoder_decoder=False
)


In [19]:
import sys, transformers
print(sys.executable)
print(transformers.__file__)
print(transformers.__version__)

import torch, torchvision, transformers
print("Torch:", torch.__version__)
print("TorchVision:", torchvision.__version__)
print("Transformers:", transformers.__version__)

from transformers import TimeSeriesTransformerModel
print("TimeSeriesTransformerModel imported successfully!")




c:\UCCS_Projects\solar-flare-project\.SoFlareEnv\Scripts\python.exe
c:\UCCS_Projects\solar-flare-project\.SoFlareEnv\Lib\site-packages\transformers\__init__.py
4.57.1
Torch: 2.9.0+cpu
TorchVision: 0.24.0+cpu
Transformers: 4.57.1
TimeSeriesTransformerModel imported successfully!


In [20]:
from transformers import TimeSeriesTransformerModel
import torch.nn as nn
import torch

class TimeSeriesTransformerForClassification(nn.Module):
    def __init__(self, config, num_classes=2):
        super().__init__()
        self.backbone = TimeSeriesTransformerModel(config)
        self.fc = nn.Linear(config.d_model, num_classes)
        self.config = config

    def forward(self, past_values, labels=None):
        batch_size, seq_len, n_features = past_values.shape
        

        # Dummy time features (unused)
        past_time_features = torch.zeros(
            (batch_size, seq_len, 0),
            dtype=past_values.dtype,
            device=past_values.device,
        )

        # Mask: every time step and feature observed
        past_observed_mask = torch.ones(
            (batch_size, seq_len, n_features),
            dtype=torch.bool,
            device=past_values.device,
        )

        outputs = self.backbone(
            past_values=past_values,
            past_time_features=past_time_features,
            past_observed_mask=past_observed_mask,
        )

        # Take final hidden state
        last_hidden = outputs.last_hidden_state[:, -1, :]
        logits = self.fc(last_hidden)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)

        return {"loss": loss, "logits": logits}


In [21]:
model = TimeSeriesTransformerForClassification(config)

In [22]:
sample_batch = [dataset_dict["train"][i] for i in range(4)]

def collate_fn(batch):
    X = torch.tensor([item["past_values"] for item in batch], dtype=torch.float32)
    y = torch.tensor([item["labels"] for item in batch], dtype=torch.long)
    return {"past_values": X, "labels": y}

batch = collate_fn(sample_batch)

with torch.no_grad():
    outputs = model(batch["past_values"], batch["labels"])
print("Forward OK — loss:", outputs["loss"].item(),
      " logits:", outputs["logits"].shape)



Forward OK — loss: 0.6303568482398987  logits: torch.Size([4, 2])


** Train the Model and Evaluate Metrics

In [23]:
from transformers import Trainer, TrainingArguments
import evaluate
import numpy as np
import torch

# Load accuracy & recall from Hugging Face evaluate
accuracy_metric = evaluate.load("accuracy")
recall_metric = evaluate.load("recall")

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=1)

    # Basic metrics
    acc = accuracy_metric.compute(predictions=preds, references=labels)["accuracy"]
    rec = recall_metric.compute(predictions=preds, references=labels)["recall"]

    # Compute True Skill Statistic (TSS)
    TP = np.sum((preds == 1) & (labels == 1))
    TN = np.sum((preds == 0) & (labels == 0))
    FP = np.sum((preds == 1) & (labels == 0))
    FN = np.sum((preds == 0) & (labels == 1))
    TSS = (TP / (TP + FN + 1e-8)) - (FP / (FP + TN + 1e-8))

    return {"accuracy": acc, "recall": rec, "tss": TSS}


In [None]:
from transformers import TrainingArguments, Trainer

training_args = TrainingArguments(
    output_dir="./checkpoints",
    num_train_epochs=10,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=5e-4,
    eval_strategy="epoch",
    logging_dir="./logs",
    logging_steps=50,
    save_strategy="no",
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset_dict["train"],
    eval_dataset=dataset_dict["test"],
    data_collator=collate_fn,
    compute_metrics=compute_metrics,
)

trainer.train()




Epoch,Training Loss,Validation Loss,Accuracy,Recall,Tss
1,0.147,2.357429,0.504872,0.0,0.0
2,0.1492,2.328117,0.504872,0.0,0.0




TrainOutput(global_step=18374, training_loss=0.09048609515197703, metrics={'train_runtime': 680.362, 'train_samples_per_second': 216.038, 'train_steps_per_second': 27.006, 'total_flos': 0.0, 'train_loss': 0.09048609515197703, 'epoch': 2.0})