# Classification with FNO
Train an FNO timeseries classifier on the FordA dataset from the UCR/UEA archive.

Much of this comes from the following Keras tutorial: https://keras.io/examples/timeseries/timeseries_classification_from_scratch/

The dataset we are using here is called FordA. The data comes from the UCR archive. The dataset contains 3601 training instances and another 1320 testing instances. Each timeseries corresponds to a measurement of engine noise captured by a motor sensor. For this task, the goal is to automatically detect the presence of a specific issue with the engine. The problem is a balanced binary classification task. The full description of this dataset can be found here: http://www.j-wichard.de/publications/FordPaper.pdf

Later, can include the features mentioned in the paper; namely the autocorrelation values and spectral density features as separate channels, akin to the work we will do later

In [87]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "6"

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau, CosineAnnealingLR, ExponentialLR
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torchaudio
from pytorch_lightning import LightningModule, Trainer
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor, RichProgressBar
import datetime
from sklearn.metrics import roc_curve, auc

In [2]:
import pytorch_lightning
pytorch_lightning.__version__

'2.1.4'

In [3]:
# Check if CUDA is available
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("GPU is available")
else:
    device = torch.device("cpu")
    print("GPU is not available")

GPU is available


## Load the data

In [59]:
# Read the data
def readucr(filename):
    data = np.loadtxt(filename, delimiter="\t")
    y = data[:, 0]
    x = data[:, 1:]
    return x, y.astype(int)

root_url = "https://raw.githubusercontent.com/hfawaz/cd-diagram/master/FordA/"

x_train, y_train = readucr(root_url + "FordA_TRAIN.tsv")
x_test, y_test = readucr(root_url + "FordA_TEST.tsv")

# Reshape the data to be ready for multivariate time-series data (multiple channels)
# Shape is (samples, channels, sequence length)
x_train = x_train.reshape((x_train.shape[0], 1, x_train.shape[1]))
x_test = x_test.reshape((x_test.shape[0], 1, x_test.shape[1]))
print("x_train shape: ", x_train.shape)
print("x_test shape: ", x_test.shape)

# Scale the data to be between 0 and 1
min_val = min(np.min(x_train), np.min(x_test))
max_val = max(np.max(x_train), np.max(x_test))
x_train = (x_train - min_val) / (max_val - min_val)
x_test = (x_test - min_val) / (max_val - min_val)

# Count the number of classes
num_classes = len(np.unique(y_train))
print("Number of classes: " + str(num_classes))

# Standardize the labels to positive integers. The expected labels will then be 0 and 1.
y_train[y_train == -1] = 0
y_test[y_test == -1] = 0

# Use 20% of training data for validation
train_set_size = int(len(x_train) * 0.8)
valid_set_size = len(x_train) - train_set_size
print("Training set size: " + str(train_set_size))
print("Validation set size: " + str(valid_set_size))

# split the x_train and y_train set into two
seed = torch.Generator().manual_seed(42)
x_train, x_valid = data.random_split(x_train, [train_set_size, valid_set_size], generator=seed)
y_train, y_valid = data.random_split(y_train, [train_set_size, valid_set_size], generator=seed)

x_train shape:  (3601, 1, 500)
x_test shape:  (1320, 1, 500)
Number of classes: 2
Training set size: 2880
Validation set size: 721


In [104]:
# Define a custom Dataset class for your data
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, x_data, y_data, transform=None):
        self.x_data = x_data
        self.y_data = y_data
        self.transform = transform

    def __len__(self):
        return len(self.x_data)

    def __getitem__(self, index):
        x = self.x_data[index]
        y = self.y_data[index]

        if self.transform:
            x = self.transform(x)

        return x, y

In [105]:
# Create train, valid, and test data loaders with float64 datatype
batch_size = 128
workers = 0
train_loader = DataLoader(
    CustomDataset(x_train, y_train),
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
    num_workers=workers,
)

valid_loader = DataLoader(
    CustomDataset(x_valid, y_valid),
    batch_size=batch_size,
    shuffle=False,
    drop_last=True,
    num_workers=workers,
)

test_loader = DataLoader(
    CustomDataset(x_test, y_test),
    batch_size=batch_size,
    shuffle=False,
    drop_last=True,
    num_workers=workers,
)

# Print the size of a batch and type of data
for x, y in train_loader:
    print("Sample batch of data (batch size, # channels, sequence length): " + str(x.shape))
    break

Sample batch of data (batch size, # channels, sequence length): torch.Size([128, 1, 500])


## FNO Model

In [100]:
# Option 1: use neuralop package
# from neuralop.models.spectral_convolution import FactorizedSpectralConv
# fourier_layer = FactorizedSpectralConv(in_channels=in_channels, out_channels=out_channels, n_modes=(modes1))

# Option 2: create my own spectral convolution layer
class SpectralConv1d(nn.Module):
    def __init__(self,
                 in_channels,   # Number of input channels
                 out_channels,  # Number of output channels
                 modes,         # Number of Fourier modes to multiply, at most floor(N/2) + 1
                 debug=False):  # If true, print the shape of every parameter      
        super(SpectralConv1d, self).__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.modes = modes
        self.debug = debug

        self.scale = (1 / (in_channels * out_channels))
        self.weights = torch.empty(in_channels, out_channels, self.modes, dtype=torch.cfloat)
        nn.init.xavier_normal_(self.weights, gain=1.0)
        self.weights = nn.Parameter(self.weights)

    def forward(self, x):
        batchsize = x.shape[0]

        #Compute Fourier coefficients
        x_ft = torch.fft.rfft(x)

        # Multiply relevant Fourier modes
        out_ft = torch.zeros(batchsize, self.out_channels,  x.size(-1)//2 + 1, dtype=torch.cfloat, device=x.device)
        
        if self.debug:
            print("x shape: " + str(x.shape))
            print("x_ft.shape: " + str(x_ft.shape))
            print("out_ft.shape: " + str(out_ft.shape))
            print("self.weights.shape: " + str(self.weights.shape))

        out_ft[:, :, :self.modes] = \
            self.compl_mul1d(x_ft[:, :, :self.modes], self.weights)

        #Return to physical space
        x = torch.fft.irfft(out_ft)

        if self.debug:
            print("irfft out_ft shape: " + str(x.shape) + "\n")
        
        return x

    def compl_mul1d(self, input, weights):
        # (batch, in_channel, x), (in_channel, out_channel, x) -> (batch, out_channel, x)
        return torch.einsum("bix,iox->box", input, weights)

In [101]:
class FNOClassifier(LightningModule):
    def __init__(self, 
                 modes, 
                 lr=1e-3, 
                 channels=[64, 64, 64], 
                 pooling=500, 
                 optimizer="adam", 
                 scheduler="reducelronplateau", 
                 momentum=0.9,
                 pool_type="max"
        ):
        super().__init__()

        self.lr = lr
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.momentum = momentum
        self.num_channels = len(channels)
        self.loss = nn.CrossEntropyLoss()

        self.example_input_array = torch.rand(1, 1, 500)

        for i in range(self.num_channels):
            if i == 0:
                in_channels = 1
            else:
                in_channels = channels[i-1]

            out_channels = channels[i]

            setattr(self, f"fno_layer_{i}", nn.Sequential(
                SpectralConv1d(in_channels, out_channels, modes),
                nn.BatchNorm1d(out_channels)
            ))
        
        if pool_type == "max":
            self.pool = nn.MaxPool1d(pooling)
        else:
            self.pool = nn.AvgPool1d(pooling)

        self.dropout = nn.Dropout(p=0.5)

        self.fc = nn.Linear(channels[-1] * int((500/pooling)), 2) # output number of channels of final fno_block * (3rd input dimension 500 / maxpool size)

    def forward(self, x):
        for i in range(self.num_channels):
            x = getattr(self, f"fno_layer_{i}")(x)
            x = F.tanh(x)

        x = self.pool(x)

        x = x.view(x.size(0), -1)
        # noise = torch.randn_like(x)  # adding noise doesn't seem to help
        # x = x + noise
        x = self.dropout(x) # dropping out seems to help a little, but not to the extent we need it to 
        x = self.fc(x)

        return x    

    def training_step(self, batch, batch_idx):
        x, y = batch
        x = self.forward(x)

        # Log the loss
        loss = self.loss(x, y) # No need for softmax, as it is included in nn.CrossEntropyLoss
        self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True)

        # Log the accuracy
        logits = F.softmax(x, dim=1)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        self.log("train_acc", acc, on_step=True, on_epoch=True, prog_bar=True)

        # collapse flag 
        collapse_flg = torch.unique(logits).size(dim=0)
        self.log("collapse_flg_train", collapse_flg, sync_dist=True, on_step=False, on_epoch=True, prog_bar=True)

        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        x = self.forward(x)

        # Log the loss
        loss = self.loss(x, y)
        self.log("val_loss", loss, on_step=False, on_epoch=True, prog_bar=True)
        
        # Log the accuracy
        logits = F.softmax(x, dim=1)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        self.log("val_acc", acc, on_step=False, on_epoch=True, prog_bar=True)

        # collapse flag 
        collapse_flg = torch.unique(logits).size(dim=0)
        self.log("collapse_flg", collapse_flg, sync_dist=True, on_epoch=True, prog_bar=True)

        return loss
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        x = self.forward(x)

        # Log the loss
        loss = self.loss(x, y)
        self.log("test_loss", loss)

        # Log the accuracy
        logits = F.softmax(x, dim=1)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        self.log("test_acc", acc)

        return loss
    
    def configure_optimizers(self):
        if self.optimizer == "sgd":
            optimizer = torch.optim.SGD(self.parameters(), lr=self.lr, momentum=self.momentum)
        else:
            optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)

        if self.scheduler == "reducelronplateau":
            scheduler = ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=5, min_lr=1e-6)
        elif self.scheduler == "cosineannealinglr":
            scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=1e-6)
        else:
            scheduler = ExponentialLR(optimizer, gamma=0.95)
        
        return [optimizer], [{"scheduler": scheduler, "monitor": "val_loss"}]

## Train and test a model

In [102]:
# Hyperparameters
modes = 250
channels = [8092] 
pool_type = "avg" 
pooling = 500

# Optimizers and learning rate schedulers
optimizer = "adam"
momentum = 0
scheduler = "cosineannealinglr"
lr = 1e-3

# Initialize classifier
classifier = FNOClassifier(modes=modes, lr=lr, channels=channels, pooling=pooling, optimizer=optimizer, scheduler=scheduler, momentum=momentum, pool_type=pool_type)

# Print the model
# print(classifier.eval())

In [103]:
# Create a learning rate scheduler and early stopping callback
callbacks = [
    # EarlyStopping(monitor="val_acc", patience=30, mode="max"),
    LearningRateMonitor(logging_interval="epoch")
]

# Train the model
trainer = Trainer(max_epochs=500,
                  callbacks=callbacks,
                  accelerator="auto"
)

trainer.fit(model=classifier, 
            train_dataloaders=train_loader,
            val_dataloaders=valid_loader
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [6]

  | Name        | Type             | Params | In sizes       | Out sizes     
-----------------------------------------------------------------------------------
0 | loss        | CrossEntropyLoss | 0      | ?              | ?             
1 | fno_layer_0 | Sequential       | 2.0 M  | [1, 1, 500]    | [1, 8092, 500]
2 | pool        | AvgPool1d        | 0      | [1, 8092, 500] | [1, 8092, 1]  
3 | dropout     | Dropout          | 0      | [1, 8092]      | [1, 8092]     
4 | fc          | Linear           | 16.2 K | [1, 8092]      | [1, 2]        
-----------------------------------------------------------------------------------
2.1 M     Trainable params
0         Non-trainable params
2.1 M     Total params
8.221     Total estimated model params size (MB)


Sanity Checking DataLoader 0:  50%|█████     | 1/2 [00:00<00:00, 23.26it/s]

                                                                           

/mnt/srl-oahu-1/srl-hawaii-1/ariannab/anaconda3/envs/torch_fno/lib/python3.10/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:441: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=255` in the `DataLoader` to improve performance.
/mnt/srl-oahu-1/srl-hawaii-1/ariannab/anaconda3/envs/torch_fno/lib/python3.10/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=255` in the `DataLoader` to improve performance.
/mnt/srl-oahu-1/srl-hawaii-1/ariannab/anaconda3/envs/torch_fno/lib/python3.10/site-packages/pytorch_lightning/loops/fit_loop.py:293: The number of training batches (22) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to 

Epoch 22:  73%|███████▎  | 16/22 [00:01<00:00,  9.70it/s, v_num=41, train_loss_step=0.322, train_acc_step=0.898, val_loss=2.680, val_acc=0.539, collapse_flg=1.600, train_loss_epoch=0.369, train_acc_epoch=0.862, collapse_flg_train=2.000]

/mnt/srl-oahu-1/srl-hawaii-1/ariannab/anaconda3/envs/torch_fno/lib/python3.10/site-packages/pytorch_lightning/trainer/call.py:54: Detected KeyboardInterrupt, attempting graceful shutdown...


In [None]:
# Test the model

trainer.test(dataloaders=test_loader)

In [None]:
# # Get the predictions
# y_pred = []
# y_true = []

# for x, y in test_loader:
#     x = x.to(classifier.device)
#     y = y.to(classifier.device)
#     logits = F.softmax(classifier(x), dim=1)
#     preds = torch.argmax(logits, dim=1)
#     y_pred.extend(logits.cpu().detach().numpy())
#     y_true.extend(y.cpu().detach().numpy())

# # Compute the ROC curve and AUC
# fpr, tpr, _ = roc_curve(y_true, y_pred)
# roc_auc = auc(fpr, tpr)

# # Plot the ROC curve
# plt.figure()
# plt.plot(fpr, tpr, color="darkorange", lw=2, label="ROC curve (area = %0.2f)" % roc_auc)
# plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
# plt.xlim([0.0, 1.0])
# plt.ylim([0.0, 1.05])
# plt.xlabel("False Positive Rate")
# plt.ylabel("True Positive Rate")
# plt.title("Receiver Operating Characteristic")
# plt.legend(loc="lower right")


# # Save the figure to the logs directory
# plt.savefig(logger.log_dir + "/roc_curve.png")

## Optuna

In [None]:
# # Create an optuna study to optimize the number of modes, channels, and pooling size
# def objective(trial):
#     # Optimize the number of modes
#     modes = trial.suggest_int("modes", 5, 15)

#     # Optimize the number of fno_layers
#     fno_layers = trial.suggest_int("fno_layers", 2, 5)

#     # Optimize the number of channels
#     channels = [trial.suggest_int(f"n_channels_{i}", 16, 256) for i in range(fno_layers)]

#     # Optimize the pooling size
#     pooling = trial.suggest_int("pooling", 2, 500)

#     # Optimize the learning rate
#     lr = trial.suggest_float("lr", 1e-4, 1e-1, log=True)

#     # Create the model
#     model = FNOClassifier(modes=modes, channels=channels, pooling=pooling)

#     # Create a learning rate scheduler and early stopping callback
#     callbacks = [
#         EarlyStopping(monitor="val_acc", patience=20, mode="max"),
#         LearningRateMonitor(logging_interval="step")
#     ]

#     # Create a tensorboard logger
#     experiment_name = "optuna"
#     logger = TensorBoardLogger(save_dir="logs/", name=experiment_name, version=datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S"))

#     logger.log_hyperparams({"modes": modes, "lr": lr, "channels": channels, "pooling": pooling, "lr_scheduler": "ReduceLROnPlateau", "patience": 10, "min_lr": 1e-6, "factor": 0.5, "batchsize": batch_size})

#     # Create a trainer
#     trainer = Trainer(
#         max_epochs=100,
#         logger=logger,
#         callbacks=callbacks,
#         accelerator='auto'
#     )

#     # Train the model
#     trainer.fit(model, train_loader, valid_loader)

#     # Test the model
#     result = trainer.test(dataloaders=test_loader)

#     # Return the validation accuracy
#     return result[0]["test_acc"]

In [None]:
# study = optuna.create_study(direction="maximize")
# study.optimize(objective, n_trials=500)

# # Print the best hyperparameters
# print(study.best_params)