In [1]:
import os
import pickle
import random
import sys
import warnings

# sys.path.append(os.path.join("../models"))
# import conv_lstm
# from conv_lstm import ConvLSTM

import torch
# from src.model_utils import custom_multiclass_report, CroplandDataModule_LSTM, Crop_LSTM, Crop_PL
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

import pytorch_lightning as pl
import torchmetrics
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks import LearningRateMonitor

In [2]:
# Load the pickled feature (X) and label (y) dictionaries produced by preprocessing.
# Both files live in the same directory, so build that path once.
pkl_dir = os.path.join('..', 'data', 'processed_files', 'pkls')

with open(os.path.join(pkl_dir, 'X_lstm_D.pkl'), "rb") as x_file:
    X = pickle.load(x_file)

with open(os.path.join(pkl_dir, 'y_lstm_D.pkl'), "rb") as y_file:
    y = pickle.load(y_file)

In [11]:
# batch_size = 1000
# X_train = torch.Tensor(X['Train'])
# y_train = torch.Tensor(y['Train'])
# X_test = torch.Tensor(X['Test'])
# y_test = torch.Tensor(y['Test'])

# train_dataset = TensorDataset(X_train,y_train)
# test_dataset = TensorDataset(X_test,y_test)

# trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size,
#                                           shuffle=True, num_workers=2)

# testloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size,
#                                          shuffle=False, num_workers=2)

# X_train.size()

torch.Size([16870800, 12, 54])

In [14]:
class CroplandDataModule_LSTM(pl.LightningDataModule):
    """
    LightningDataModule that converts pre-split cropland arrays to tensors and
    serves them through DataLoaders.

    Args:
    X (dict): Input data keyed by "Train", "Val" and "Test".
    y (dict): Corresponding targets keyed by "Train", "Val" and "Test".
    batch_size (int): The batch size used by every DataLoader. Default is 128.
    """

    def __init__(self, X: dict, y: dict, batch_size: int = 128):
        super().__init__()
        self.batch_size = batch_size

        # Inputs are stored as float tensors, targets as long tensors.
        self.X_train = torch.FloatTensor(X["Train"])
        self.X_val = torch.FloatTensor(X["Val"])
        self.X_test = torch.FloatTensor(X["Test"])

        self.y_train = torch.LongTensor(y["Train"])
        self.y_val = torch.LongTensor(y["Val"])
        self.y_test = torch.LongTensor(y["Test"])

        # Keyword arguments shared by all three DataLoaders.
        self.dl_dict = {"batch_size": self.batch_size}

    def setup(self, stage=None):
        """
        Build the TensorDatasets required for the given stage.

        Args:
        stage (str | None): "fit", "validate", "test" or None. None builds
            every dataset. "validate" builds only the validation dataset so
            that a stand-alone validation run finds `dataset_val`.
        """
        if stage in ("fit", None):
            self.dataset_train = TensorDataset(self.X_train, self.y_train)

        # The validation set is needed both while fitting and when validating on its own.
        if stage in ("fit", "validate", None):
            self.dataset_val = TensorDataset(self.X_val, self.y_val)

        if stage in ("test", None):
            self.dataset_test = TensorDataset(self.X_test, self.y_test)

    def train_dataloader(self):
        """Return the shuffled training DataLoader."""
        return DataLoader(self.dataset_train, shuffle=True, **self.dl_dict)

    def val_dataloader(self):
        """Return the validation DataLoader (no shuffling)."""
        return DataLoader(self.dataset_val, **self.dl_dict)

    def test_dataloader(self):
        """Return the test DataLoader (no shuffling)."""
        return DataLoader(self.dataset_test, **self.dl_dict)

In [15]:
class ConvLSTMCell(nn.Module):
    """
    One-dimensional convolutional LSTM cell.

    A single `nn.Conv1d` over the channel-wise concatenation of the input and
    the previous hidden state produces the four gate pre-activations.

    Parameters
    ----------
    input_dim (int): Number of channels of input tensor.
    hidden_dim (int): Number of channels of hidden state.
    kernel_size (int or tuple): Size of the convolutional kernel, either a bare
        int or a 1-tuple such as ``(3,)``. Padding is ``kernel_size // 2``, which
        preserves the sequence length for odd kernel sizes.
    bias (bool): Whether to add the bias.
    """

    def __init__(self,
        input_dim,
        hidden_dim,
        kernel_size,
        bias):

        super(ConvLSTMCell, self).__init__()

        # Normalise a bare int to the 1-tuple form so the indexing below works for both.
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size,)

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.padding = kernel_size[0] // 2
        self.bias = bias

        # One convolution yields all four gates (i, f, o, g) stacked along the channel axis.
        self.conv = nn.Conv1d(in_channels=self.input_dim + self.hidden_dim,
                            out_channels=4 * self.hidden_dim,
                            kernel_size=self.kernel_size,
                            padding=self.padding,
                            bias=self.bias)

    def forward(self, input_tensor, cur_state):
        """
        Advance the cell by one time step.

        Parameters
        ----------
        input_tensor: Tensor of shape (batch, input_dim, length).
        cur_state: Pair (h, c), each of shape (batch, hidden_dim, length).

        Returns
        -------
        (h_next, c_next): the new hidden and cell state, same shapes as `cur_state`.
        """
        h_cur, c_cur = cur_state
        combined = torch.cat([input_tensor, h_cur], dim=1)  # concatenate along channel axis

        combined_conv = self.conv(combined)
        cc_i, cc_f, cc_o, cc_g = torch.split(combined_conv, self.hidden_dim, dim=1)
        i = torch.sigmoid(cc_i)  # input gate
        f = torch.sigmoid(cc_f)  # forget gate
        o = torch.sigmoid(cc_o)  # output gate
        g = torch.tanh(cc_g)     # candidate cell update

        c_next = f * c_cur + i * g
        h_next = o * torch.tanh(c_next)

        return h_next, c_next

    def init_hidden(self, batch_size, length):
        """Return zero-initialised (h, c) of shape (batch_size, hidden_dim, length) on the conv weight's device."""
        device = self.conv.weight.device
        return (torch.zeros(batch_size, self.hidden_dim, length, device=device),
                torch.zeros(batch_size, self.hidden_dim, length, device=device))

In [16]:
class Crop_ConvLSTM(nn.Module):

    """
    Stacked 1-D ConvLSTM followed by a two-layer fully connected classification head.

    Parameters:
        input_dim: Number of channels in input. `forward` inserts a singleton
            channel axis, so the first layer sees exactly one channel.
        hidden_dim: Number of hidden channels (int, or list with one entry per layer)
        kernel_size: Size of kernel in convolutions (tuple, or list of tuples)
        n_layers: Number of LSTM layers stacked on each other
        n_classes: Number of output classes
        seq_len: Length of the last input axis (the axis the convolution runs over)
        time_len: Number of time steps the recurrence iterates over
        bias: Bias or no bias in Convolution
        return_all_layers: Stored for interface compatibility; the classification
            head always consumes the output of the last layer.

    Input:
        A tensor of size (B, time_len, seq_len)
    Output:
        A tensor of size (B, n_classes) holding log-probabilities (log_softmax over dim 1).
    """

    def __init__(self,
        input_dim: int,
        hidden_dim: int,
        kernel_size: tuple,
        n_layers: int,
        n_classes: int,
        seq_len: int,
        time_len: int,
        bias: bool=True,
        return_all_layers: bool=False
        ) -> None:
        super(Crop_ConvLSTM, self).__init__()

        self._check_kernel_size_consistency(kernel_size)
        # Make sure that both `kernel_size` and `hidden_dim` are lists having len == n_layers
        kernel_size = self._extend_for_multilayer(kernel_size, n_layers)
        hidden_dim = self._extend_for_multilayer(hidden_dim, n_layers)
        if not len(kernel_size) == len(hidden_dim) == n_layers:
            raise ValueError('Inconsistent list length.')

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.n_layers = n_layers
        self.bias = bias
        self.return_all_layers = return_all_layers
        self.n_classes = n_classes
        self.seq_len = seq_len
        self.time_len = time_len

        # Layer i consumes the hidden channels of layer i - 1 (or the raw input for i == 0).
        cell_list = []
        for i in range(0, self.n_layers):
            cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]

            cell_list.append(ConvLSTMCell(input_dim=cur_input_dim,
                                        hidden_dim=self.hidden_dim[i],
                                        kernel_size=self.kernel_size[i],
                                        bias=self.bias))
        self.cell_list = nn.ModuleList(cell_list)

        # Size of the flattened last-layer output: every time step, hidden channel and position.
        # `hidden_dim` is a list at this point, hence the explicit [-1].
        # (time_len=12, hidden_dim=16, seq_len=54 gives the previously hard-coded 10368.)
        flat_dim = self.time_len * self.hidden_dim[-1] * self.seq_len
        self.fc1 = nn.Linear(flat_dim, 512)
        self.fc2 = nn.Linear(512, self.n_classes)
        self.act = nn.ReLU()
        self.sigmoid = nn.Sigmoid()  # currently unused in forward

    def forward(self, input_tensor):
        """
        Parameters
        ----------
        input_tensor: Tensor of shape (b, time_len, seq_len)

        Returns
        -------
        Tensor of shape (b, n_classes) with log-probabilities.

        The hidden state is re-initialised to zeros on every call (stateless).
        """
        # Add a singleton channel axis: (b, t, s) -> (b, 1, t, s)
        input_tensor = input_tensor[:, None, :, :]
        b = input_tensor.size(0)

        hidden_state = self._init_hidden(batch_size=b, length=self.seq_len)

        cur_layer_input = input_tensor  # channel-first: (b, channels, t, s)
        layer_output = None

        for layer_idx in range(self.n_layers):

            h, c = hidden_state[layer_idx]
            output_inner = []
            for t in range(self.time_len):
                h, c = self.cell_list[layer_idx](input_tensor=cur_layer_input[:, :, t, :],
                                                cur_state=[h, c])
                output_inner.append(h)

            # (b, t, hidden, s): this layout defines the feature order seen by fc1.
            layer_output = torch.stack(output_inner, dim=1)
            # The next layer indexes time on axis 2, so hand it a channel-first view
            # (b, hidden, t, s). Without this, stacked layers slice the wrong axis.
            cur_layer_input = layer_output.transpose(1, 2)

        # Flatten the last layer's output per sample: (b, t * hidden * s)
        output = torch.reshape(layer_output, (b, -1))
        output = self.fc1(output)
        output = self.act(output)
        output = self.fc2(output)
        # NOTE(review): ReLU on the final class scores clamps negative scores to 0 before
        # log_softmax. Kept to preserve existing behaviour; consider removing it.
        output = self.act(output)
        return nn.functional.log_softmax(output, dim=1)

    def _init_hidden(self, batch_size, length):
        """Return one zero-initialised (h, c) pair per layer."""
        return [cell.init_hidden(batch_size, length) for cell in self.cell_list]

    @staticmethod
    def _check_kernel_size_consistency(kernel_size):
        """Raise ValueError unless `kernel_size` is a tuple or a list of tuples."""
        if not (isinstance(kernel_size, tuple) or
                (isinstance(kernel_size, list) and all([isinstance(elem, tuple) for elem in kernel_size]))):
            raise ValueError('`kernel_size` must be tuple or list of tuples')

    @staticmethod
    def _extend_for_multilayer(param, n_layers):
        """Repeat a non-list `param` once per layer; lists are returned unchanged."""
        if not isinstance(param, list):
            param = [param] * n_layers
        return param

In [10]:
# model = Crop_ConvLSTM(
#     input_dim=1, # fictional dimension; will be used as channels
#     hidden_dim=16,
#     kernel_size=(3,),
#     n_layers=1,
#     n_classes = 4,
#     seq_len = X['Train'].shape[2],
#     time_len = X['Train'].shape[1],
#     bias=False,
#     return_all_layers=False
#     )

# import torch.optim as optim

# criterion = nn.CrossEntropyLoss()
# optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

In [17]:
# NOTE: `Crop_PL` is defined in a later cell of this notebook; that cell must be
# executed before this one (move it above for a clean Restart & Run All).

# Run configuration
SEED = 123
BATCH_SIZE = 128
N_CLASSES = 4  # shared by the network head and the metrics in Crop_PL

warnings.filterwarnings("ignore")
# Seed all RNGs in one call, before any weights are created.
pl.seed_everything(SEED)

# initialize data module
dm = CroplandDataModule_LSTM(X=X, y=y, batch_size=BATCH_SIZE)

# initialize model
network = Crop_ConvLSTM(
    input_dim=1,  # fictional dimension; will be used as channels
    hidden_dim=16,
    kernel_size=(3,),
    n_layers=1,
    n_classes=N_CLASSES,
    seq_len=X['Train'].shape[2],
    time_len=X['Train'].shape[1],
    bias=False,
    return_all_layers=False,
)

model = Crop_PL(net=network, num_classes=N_CLASSES)

# initialize trainer
# NOTE: with max_epochs=2 below, a patience of 30 epochs can never trigger;
# raise max_epochs for early stopping to have any effect.
early_stop_callback = EarlyStopping(
    monitor="val/loss",
    min_delta=1e-4,
    patience=30,
    verbose=True,
    mode="min",
)
lr_monitor = LearningRateMonitor(logging_interval="epoch")

trainer = pl.Trainer(
    max_epochs=2,
    accelerator="gpu",
    precision=16,
    devices=[3],  # hard-coded GPU index; adjust for the machine at hand
    benchmark=True,
    check_val_every_n_epoch=1,
    callbacks=[early_stop_callback, lr_monitor],
)
trainer.fit(model, dm)

Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1,2,3]

   | Name                | Type                       | Params
--------------------------------------------------------------------
0  | net                 | Crop_ConvLSTM              | 5.3 M 
1  | train_loss          | MeanMetric                 | 0     
2  | val_loss            | MeanMetric                 | 0     
3  | test_loss           | MeanMetric                 | 0     
4  | val_F1Score_best    | MaxMetric                  | 0     
5  | train_accuracy      | MulticlassAccuracy         | 0     
6  | val_accuracy        | MulticlassAccuracy         | 0     
7  | test_accuracy       | MulticlassAccuracy         | 0     
8  | train_avg_precision | MulticlassAveragePrecision | 0     
9  | val_avg_precision   | MulticlassAveragePrecision | 

Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

In [23]:
# for epoch in range(2):  # loop over the dataset multiple times

#     running_loss = 0.0
#     for i, data in enumerate(trainloader, 0):
#         # get the inputs; data is a list of [inputs, labels]
#         inputs, labels = data

#         # zero the parameter gradients
#         optimizer.zero_grad()
        
#         # forward + backward + optimize
#         output = model(inputs)
#         loss = criterion(output, labels)
#         loss.backward()
#         optimizer.step()

#         # print statistics
#         running_loss += loss.item()
#         if i % 2000 == 1999:    # print every 2000 mini-batches
#             print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
#             running_loss = 0.0

# print('Finished Training')

<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'tor

KeyboardInterrupt: 

In [7]:
class Crop_PL(pl.LightningModule):
    """
    PyTorch Lightning module for training a crop classification neural network.

    Args:
    net (torch.nn.Module): the neural network module to be trained.
    num_classes (int): number of classes used to configure the metrics.
    lr (float): learning rate passed to the Adam optimizer.
    weight_decay (float): weight decay passed to the Adam optimizer.

    Notes:
    Batches are ``(x, y)`` pairs. `model_step` calls ``y.float()`` for the loss and
    ``torch.argmax(y, dim=1)`` for the metric targets, so ``y`` is expected to carry
    one score per class along dim 1 (e.g. one-hot labels).
    Per-stage (train / val / test) metrics are kept for loss, accuracy, recall,
    precision, F1 score and average precision, all macro-averaged where applicable.
    """

    # (attribute suffix, logged key suffix, show in progress bar), in update/log order.
    _METRIC_SPECS = (
        ("loss", "loss", True),
        ("accuracy", "accuracy", False),
        ("recall", "recall", False),
        ("precision", "precision", False),
        ("F1Score", "F1Score", True),
        ("avg_precision", "AP", False),
    )

    def __init__(
        self,
        net: torch.nn.Module,
        num_classes=4,
        lr=1e-3,
        weight_decay=0.03,
    ):
        super().__init__()
        self.save_hyperparameters(logger=False, ignore=["net"])
        self.net = net
        self.lr = lr
        self.weight_decay = weight_decay

        self.train_loss = torchmetrics.MeanMetric()
        self.val_loss = torchmetrics.MeanMetric()
        self.test_loss = torchmetrics.MeanMetric()
        # Tracks the best validation F1 score seen so far.
        self.val_F1Score_best = torchmetrics.MaxMetric()

        top1 = dict(task="multiclass", num_classes=num_classes, top_k=1)
        macro = dict(task="multiclass", num_classes=num_classes, average="macro")

        self.train_accuracy = torchmetrics.Accuracy(**top1)
        self.val_accuracy = torchmetrics.Accuracy(**top1)
        self.test_accuracy = torchmetrics.Accuracy(**top1)

        self.train_avg_precision = torchmetrics.AveragePrecision(**macro)
        self.val_avg_precision = torchmetrics.AveragePrecision(**macro)
        self.test_avg_precision = torchmetrics.AveragePrecision(**macro)

        self.train_precision = torchmetrics.Precision(**macro)
        self.val_precision = torchmetrics.Precision(**macro)
        self.test_precision = torchmetrics.Precision(**macro)

        self.train_recall = torchmetrics.Recall(**macro)
        self.val_recall = torchmetrics.Recall(**macro)
        self.test_recall = torchmetrics.Recall(**macro)

        self.train_F1Score = torchmetrics.F1Score(**macro)
        self.val_F1Score = torchmetrics.F1Score(**macro)
        self.test_F1Score = torchmetrics.F1Score(**macro)

    def forward(self, x):
        """Run the wrapped network on `x`."""
        return self.net(x)

    def loss(self, y_hat, y):
        """Cross-entropy between the network output `y_hat` and the target `y`."""
        return nn.functional.cross_entropy(y_hat, y)

    def on_train_start(self):
        # The trainer may run without a logger, in which case `self.logger` is None.
        if self.logger is not None:
            self.logger.log_hyperparams(self.hparams)
        # Discard any best-F1 value recorded before training started.
        self.val_F1Score_best.reset()

    def model_step(self, batch):
        """
        Shared forward pass for all stages.

        Returns:
        (loss, preds, target): the scalar loss, the softmax of the network output
        over dim 1, and the class indices obtained with argmax over dim 1 of `y`.
        """
        x, y = batch
        logits = self.forward(x)
        loss = self.loss(logits, y.float())
        preds = nn.functional.softmax(logits, dim=1)
        return loss, preds, torch.argmax(y, dim=1)

    def _shared_step(self, batch, stage):
        """Run `model_step`, then update and log every `<stage>_*` metric as an epoch-level value."""
        loss, preds, target = self.model_step(batch)

        # Update all metrics first ...
        metrics = []
        for attr_suffix, key_suffix, on_bar in self._METRIC_SPECS:
            metric = getattr(self, f"{stage}_{attr_suffix}")
            if attr_suffix == "loss":
                metric(loss)
            else:
                metric(preds, target)
            metrics.append((f"{stage}/{key_suffix}", metric, on_bar))

        # ... then log them, aggregated over the epoch.
        for key, metric, on_bar in metrics:
            self.log(key, metric, on_step=False, on_epoch=True, prog_bar=on_bar)

        return {"loss": loss, "preds": preds, "target": target}

    def training_step(self, batch, batch_idx):
        return self._shared_step(batch, "train")

    def validation_step(self, batch, batch_idx):
        return self._shared_step(batch, "val")

    def on_validation_epoch_end(self):
        # Feed this epoch's F1 into the running maximum and log the best value so far.
        f1sc = self.val_F1Score.compute()
        self.val_F1Score_best(f1sc)
        self.log("val/F1Score_best", self.val_F1Score_best.compute(), prog_bar=False)

    def test_step(self, batch, batch_idx):
        return self._shared_step(batch, "test")

    def configure_optimizers(self):
        """Adam over the network parameters, with ReduceLROnPlateau driven by "val/loss"."""
        optimizer = torch.optim.Adam(
            self.net.parameters(), lr=self.lr, weight_decay=self.weight_decay
        )
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer=optimizer,
            patience=10,
            mode="min",
            factor=0.5,
            verbose=True,
            min_lr=1e-8,
            threshold=1e-3,
        )
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "monitor": "val/loss",
                "frequency": 1,
            },
        }