In [129]:
import torch
import torch.nn as nn
import numpy as np


def get_conv_output_dim(layer: nn.Module, input_dim: tuple) -> tuple:
    """Calculate output dimension of a CNN layer

    Parameters
    ----------
    layer : torch.nn.Module
        The CNN layer to calculate the output dimension of
    input_dim : tuple
        The input dimension of the CNN layer in the form of (n_channels, height, width)

    Returns
    -------
    tuple
        The output dimension of the CNN layer in the form of (n_channels, height, width)
    """
    kernel_size = layer.kernel_size
    stride = layer.stride
    padding = layer.padding
    dilation = layer.dilation

    input_channels, input_height, input_width = input_dim

    output_channels = layer.out_channels
    output_height = (
        input_height + 2 * padding[0] - dilation[0] * (kernel_size[0] - 1) - 1
    ) / stride[0] + 1
    output_width = (
        input_width + 2 * padding[1] - dilation[1] * (kernel_size[1] - 1) - 1
    ) / stride[1] + 1

    return (output_channels, int(output_height), int(output_width))


class BaseCNN(nn.Module):
    def __init__(
        self,
        input_shape,
        conv_layers,
        conv_filters,
        dropout_rate,
        conv_kernel,
        max_pooling_size,
        fc_units,
        fc_layers,
    ):
        """Base CNN model for the classification of the images

        Parameters
        ----------
        input_shape : tuple
            The input shape of the images in the form of (n_channels, height, width)
        conv_layers : int
            The number of convolutional layers
        conv_filters : int
            The number of filters in the convolutional layers
        dropout_rate : float
            The dropout rate of the dropout layers
        conv_kernel : int
            The kernel size of the convolutional layers
        max_pooling_size : int
            The kernel size of the max pooling layers
        fc_units : int
            The number of units in the fully connected layers
        fc_layers : int
            The number of fully connected layers
        """
        super(BaseCNN, self).__init__()
        self.input_shape = input_shape
        n_channels = input_shape[0]
        self.n_conv_layers = conv_layers
        self.conv_filters = conv_filters
        self.dropout_rate = dropout_rate
        self.conv_kernel = conv_kernel
        self.max_pooling_size = max_pooling_size
        self.fc_units = fc_units
        self.n_fc_layers = fc_layers

        # Convolutional layers
        self.conv_layers = nn.Sequential()
        self.conv_layers.add_module(
            "conv0",
            nn.Conv2d(n_channels, self.conv_filters, kernel_size=self.conv_kernel),
        )
        self.conv_layers.add_module("relu0", nn.ReLU())
        self.conv_layers.add_module("dropout0", nn.Dropout(self.dropout_rate))
        self.conv_layers.add_module("maxpool0", nn.MaxPool2d(self.max_pooling_size))

        for i in range(1, self.n_conv_layers):
            self.conv_layers.add_module(
                f"conv{i}",
                nn.Conv2d(
                    self.conv_filters, self.conv_filters, kernel_size=self.conv_kernel
                ),
            )
            self.conv_layers.add_module(f"relu{i}", nn.ReLU())
            self.conv_layers.add_module(f"dropout{i}", nn.Dropout(self.dropout_rate))
            self.conv_layers.add_module(
                f"maxpool{i}", nn.MaxPool2d(self.max_pooling_size)
            )

        # Fully connected layers
        self.fc_layers = nn.Sequential()
        # input_units = self.conv_filters * (128 // (self.max_pooling_size ** self.n_conv_layers)) * (76 // (self.max_pooling_size ** self.n_conv_layers))
        input_units = np.prod(self._calc_cnn_output_dim())
        for i in range(self.n_fc_layers):
            self.fc_layers.add_module(f"fc{i}", nn.Linear(input_units, self.fc_units))
            self.fc_layers.add_module(f"relu{i}", nn.ReLU())
            self.fc_layers.add_module(f"dropout{i}", nn.Dropout(self.dropout_rate))
            input_units = self.fc_units

        # Output layer
        self.output_layer = nn.Linear(self.fc_units, 2)
        self.softmax = nn.Softmax(dim=1)

    def _calc_cnn_output_dim(self) -> tuple:
        """Calculate output dimension of the CNN part of the network

        Parameters
        ----------
        None

        Returns
        -------
        tuple
        The output dimension of the CNN part of the network in the form of (n_channels, height, width)
        """
        output_dim = get_conv_output_dim(self.conv_layers[0], self.input_shape)
        for layer in self.conv_layers[1:]:
            # Check if layer is a convolutional layer
            if isinstance(layer, nn.Conv2d):
                output_dim = get_conv_output_dim(layer, output_dim)
            elif isinstance(layer, nn.MaxPool2d):
                output_dim = (
                    output_dim[0],
                    output_dim[1] // layer.kernel_size,
                    output_dim[2] // layer.kernel_size,
                )

        return output_dim

    def forward(self, x):
        """Forward pass of the network

        Parameters
        ----------
        x : torch.Tensor
            The input tensor. Shape should be (batch_size, n_channels, height, width)

        Returns
        -------
        torch.Tensor
            The output tensor. Shape should be (batch_size, n_classes). Outputs a probability for each class.
        """
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)  # Flatten the tensor
        # print("x size: ", x.size())
        x = self.fc_layers(x)
        x = self.output_layer(x)
        x = self.softmax(x)
        return x

In [130]:
# from cnn import BaseCNN

conv_layers = 1
fc_layers = 2
max_pooling_size = 4
dropout_rate = 0.5
conv_filters = 8
conv_kernel = 8
fc_units = 32
epochs = 10
batch_size = 32


class Model:
    """Model class."""

    def __init__(
        self,
        input_shape,
        batch_size=32,
        optimizer="adam",
        learning_rate=0.001,
        loss="cross_entropy",
        shuffle=True,
    ):
        self.cnn = BaseCNN(
            input_shape=input_shape,
            conv_layers=conv_layers,
            conv_filters=conv_filters,
            dropout_rate=dropout_rate,
            conv_kernel=conv_kernel,
            max_pooling_size=max_pooling_size,
            fc_units=fc_units,
            fc_layers=fc_layers,
        )

        # self.logger.info("Initializing Model...")
        # Get Device
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.optimizer_name = optimizer
        self.learning_rate = learning_rate
        self.loss_name = loss
        self.batch_size = batch_size
        self.shuffle = shuffle

        self._set_optimizer_and_loss()

    def _set_optimizer_and_loss(self):
        """Set the optimizer and loss function"""
        if self.optimizer_name == "adam":
            self.optimizer = torch.optim.Adam(
                self.cnn.parameters(), lr=self.learning_rate
            )
        else:
            raise NotImplementedError("Only Adam optimizer is supported at the moment")

        if self.loss_name == "cross_entropy":
            self.criterion = torch.nn.CrossEntropyLoss()
        else:
            raise NotImplementedError(
                "Only cross entropy loss is supported at the moment"
            )

    def get_number_of_parameters(self):
        return sum(p.numel() for p in self.cnn.parameters() if p.requires_grad)

    def _create_dataloader(
        self, X: np.array, Y: np.array
    ) -> torch.utils.data.DataLoader:
        """Create a dataloader from the given data

        Parameters
        ----------
        X : np.array
            Input data of shape (n_samples,height,width) or (n_samples,channels,height,width). Will add channel dimension if needed.
        Y : np.array
            Target data of shape (n_samples,).

        Returns
        -------
        loader : torch.utils.data.DataLoader
            Dataloader with the given data and batch size specified in the constructor.

        """
        X_tensor = torch.from_numpy(X).float()
        Y_tensor = torch.from_numpy(Y).float()

        # Reshape X_tensor
        if len(X_tensor.shape) == 3:
            X_tensor = X_tensor.unsqueeze(1)

        dataset = torch.utils.data.TensorDataset(X_tensor, Y_tensor)
        loader = torch.utils.data.DataLoader(
            dataset, batch_size=self.batch_size, shuffle=self.shuffle
        )
        return loader

    def train(self, n_epochs, X_train, Y_train, save_path=None, verbose=False):
        print("Training")

        # Create Dataloaders
        train_loader = self._create_dataloader(X_train, Y_train)
        self.cnn.to(self.device)
        train_losses = []
        for epoch in range(n_epochs):
            print("Epoch: ", epoch)
            for batch_inputs, batch_targets in train_loader:
                batch_inputs, batch_targets = (
                    batch_inputs.to(self.device),
                    batch_targets.to(self.device),
                )
                # Reset gradients
                self.optimizer.zero_grad()

                # Forward pass
                batch_preds = self.cnn.forward(batch_inputs)
                # Compute loss
                loss = self.criterion(batch_preds, batch_targets)
                # Backward and optimize
                loss.backward()
                self.optimizer.step()
                train_losses.append(loss.item())
                # Validation
                if epoch % 10 == 0:
                    pass
                    """
                    val_loss, val_acc = self.evaluate(val_loader, criterion)
                    val_losses.append(val_loss)
                    val_accs.append(val_acc)
                    if val_acc > best_val_acc:
                        best_val_acc = val_acc
                    """
                    if verbose:
                        print(f"Epoch {epoch} | Train Loss {loss.item()}")
        return train_losses  # , val_losses, val_accs

    def evaluate(self, X_test, Y_test, criterion):
        loader = self._create_dataloader(X=X_test, Y=Y_test)
        self.cnn.eval()
        with torch.no_grad():
            total_loss = 0
            correct = 0
            for batch_inputs, batch_targets in loader:
                batch_inputs, batch_targets = (
                    batch_inputs.to(self.device),
                    batch_targets.to(self.device),
                )
                batch_preds = self.cnn.forward(batch_inputs)

                total_loss += self.criterion(batch_preds, batch_targets).item()
                class_predictions = batch_preds.argmax(dim=1)
                # true classes are one hot encoded
                true_classes = batch_targets.argmax(dim=1)

                correct += (class_predictions == true_classes).sum().item()
            average_loss = total_loss / len(loader)
            accuracy = correct / len(loader.dataset)
        self.cnn.train()
        return average_loss, accuracy

    def __call__(self, x):
        return self.cnn(x)

    def __str__(self):
        return str(self.cnn)

    def __repr__(self):
        return str(self.cnn)

In [131]:
import pickle

datapath = (
    "/Users/ufuk/1. Research/AIMS/Project Repo/eso/data/SavedData/preprocessed/train"
)

with open(datapath + "/X.pkl", "rb") as f:
    X = pickle.load(f)
with open(datapath + "/Y.pkl", "rb") as f:
    Y = pickle.load(f)

In [132]:
X.shape

(1980, 128, 76)

In [133]:
test = torch.from_numpy(X[0]).float()
test = test.unsqueeze(2)
test = test.unsqueeze(0)
print(test.shape)

torch.Size([1, 128, 76, 1])


In [134]:
8 * 121 * 69

66792

In [135]:
(1, X.shape[1], X.shape[2])

(1, 128, 76)

In [136]:
8 * 510

4080

In [137]:
8 * 30 * 17

4080

In [138]:
m = Model(input_shape=(1, X.shape[1], X.shape[2]))
test = torch.from_numpy(X[0]).float()
test = test.unsqueeze(0)
test = test.unsqueeze(0)
print(test.shape)
m(test)

torch.Size([1, 1, 128, 76])


tensor([[0.4863, 0.5137]], grad_fn=<SoftmaxBackward0>)

In [139]:
m = Model(input_shape=(1, X.shape[1], X.shape[2]))

In [141]:
m = Model(input_shape=(1, X.shape[1], X.shape[2]))
test = torch.from_numpy(X[0]).float()
test = test.unsqueeze(0)
test = test.unsqueeze(0)
print(test.shape)
m(test)

from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader


X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2)


# Convert NumPy arrays to PyTorch tensors
X_train_tensor = torch.Tensor(X_train)
X_test_tensor = torch.Tensor(X_test)
y_train_tensor = torch.Tensor(y_train)
y_test_tensor = torch.Tensor(y_test)


# Reshape input data if needed
if len(X_train_tensor.shape) == 3:
    X_train_tensor = X_train_tensor.unsqueeze(1)
    X_test_tensor = X_test_tensor.unsqueeze(1)


# Create datasets and data loaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

torch.Size([1, 1, 128, 76])


In [142]:
X_train_tensor.shape

torch.Size([1584, 1, 128, 76])

In [145]:
m.train(100, X_train=X_train, Y_train=y_train, verbose=True)

Training
Epoch:  0
Epoch 0 | Train Loss 0.41014260053634644
Epoch 0 | Train Loss 0.581453263759613
Epoch 0 | Train Loss 0.5128495693206787
Epoch 0 | Train Loss 0.4777698218822479
Epoch 0 | Train Loss 0.5446839928627014
Epoch 0 | Train Loss 0.47392070293426514
Epoch 0 | Train Loss 0.5632570385932922
Epoch 0 | Train Loss 0.567483127117157
Epoch 0 | Train Loss 0.5461961030960083
Epoch 0 | Train Loss 0.5639830231666565
Epoch 0 | Train Loss 0.4779053330421448
Epoch 0 | Train Loss 0.6133934855461121
Epoch 0 | Train Loss 0.6607391834259033
Epoch 0 | Train Loss 0.4760320782661438
Epoch 0 | Train Loss 0.4772844612598419
Epoch 0 | Train Loss 0.5231408476829529
Epoch 0 | Train Loss 0.591839611530304
Epoch 0 | Train Loss 0.6541820168495178
Epoch 0 | Train Loss 0.5608392357826233
Epoch 0 | Train Loss 0.4975656270980835
Epoch 0 | Train Loss 0.6153240203857422
Epoch 0 | Train Loss 0.6527336239814758
Epoch 0 | Train Loss 0.4122413396835327
Epoch 0 | Train Loss 0.5761924982070923
Epoch 0 | Train Loss 0

KeyboardInterrupt: 

In [146]:
m.evaluate(X_test, y_test, m.criterion)

(0.3133186652110173, 1.0)

In [154]:
m.cnn.eval()
for x, y in zip(X_test, y_test):
    X = torch.from_numpy(x).float()
    X = X.unsqueeze(0)
    X = X.unsqueeze(0)
    print("true_label: ", y)
    predicted = m(X)
    print(predicted)

true_label:  [0. 1.]
tensor([[1.5201e-17, 1.0000e+00]], grad_fn=<SoftmaxBackward0>)
true_label:  [0. 1.]
tensor([[1.2157e-16, 1.0000e+00]], grad_fn=<SoftmaxBackward0>)
true_label:  [0. 1.]
tensor([[7.2161e-16, 1.0000e+00]], grad_fn=<SoftmaxBackward0>)
true_label:  [0. 1.]
tensor([[1.8747e-15, 1.0000e+00]], grad_fn=<SoftmaxBackward0>)
true_label:  [0. 1.]
tensor([[4.3711e-14, 1.0000e+00]], grad_fn=<SoftmaxBackward0>)
true_label:  [1. 0.]
tensor([[9.9999e-01, 8.6611e-06]], grad_fn=<SoftmaxBackward0>)
true_label:  [0. 1.]
tensor([[1.1480e-10, 1.0000e+00]], grad_fn=<SoftmaxBackward0>)
true_label:  [0. 1.]
tensor([[7.5265e-17, 1.0000e+00]], grad_fn=<SoftmaxBackward0>)
true_label:  [1. 0.]
tensor([[1.0000e+00, 2.9732e-10]], grad_fn=<SoftmaxBackward0>)
true_label:  [1. 0.]
tensor([[1.0000e+00, 4.1014e-06]], grad_fn=<SoftmaxBackward0>)
true_label:  [0. 1.]
tensor([[2.7585e-17, 1.0000e+00]], grad_fn=<SoftmaxBackward0>)
true_label:  [1. 0.]
tensor([[9.9997e-01, 3.3947e-05]], grad_fn=<SoftmaxBack