# Temporal sequence classification: CROHME dataset

Dataset:
https://www.cs.rit.edu/~rlaz/files/CROHME+TFD%E2%80%932019.pdf

Objective: 

- Recognize handwriting input sequences (mathematical expression)
- Output sequence of symbols + relations based on Symbol Relation Tree and Writing Order

![image.png](attachment:63358e0a-363d-43a8-9cac-a690f218add1.png)![image.png](attachment:fd744812-6788-4939-9690-0967d44ee52c.png)

Method:
- Use recurrent neural networks (Bi-directional LSTM).
- Training: supervised learning using pairs of input sequence and target sequence.
- Loss function: CTC loss function to learn temporal classification.
https://pytorch.org/docs/stable/generated/torch.nn.CTCLoss.html


## Task 1: Dataset & Dataloader (3 pts)

### Overview

We need to build the Dataset class which read the input samples, annotation files and return the pair of <input, target> for each sample.

The target should follow the description of CTC loss function. "Each element in the target sequence is a class index".

![image.png](attachment:5aed9256-89aa-4b21-9416-8dd26b556a38.png)


Example:

We have an inkml file: "crohme2019/test/UN19_1041_em_597.inkml"	
Its label is "- Right \sqrt Inside 2"

Target would be: [[5, 50, 35, 51, 12]], where the index here is the index of tokens (from a vocabulary)


### Task 1.1 Build vocab

The first task is build the vocabulary from the dataset. From the target of the annotation files, extract and unique the token list to get the vocabulary. (Find the set of symbols and relations from labeled dataset).

Input: 

```
/kaggle/input/crohme2019/crohme2019_train.txt
/kaggle/input/crohme2019/crohme2019_valid.txt
/kaggle/input/crohme2019/crohme2019_test.txt
```
Output:
```
crohme_vocab.txt
```

In [None]:
input_files = ['crohme2019_train.txt',
'crohme2019_valid.txt',
'crohme2019_test.txt']

vocab = set()

for input_file in input_files:
    for line in open(input_file).readlines():
        if len(line.strip().split('\t')) == 2:
            vocab.update(line.strip().split('\t')[1].split())


In [None]:
vocab_syms = [v for v in vocab if v not in ['Above', 'Below', 'Inside', 'NoRel', 'Right', 'Sub', 'Sup']]

In [None]:
with open('crohme_vocab.txt', 'w') as f:
    f.writelines([c + '\n' for c in sorted(vocab_syms)])
    f.writelines([c + '\n' for c in ['Above', 'Below', 'Inside', 'NoRel', 'Right', 'Sub', 'Sup']])

#### Vocab class

In [None]:
class Vocab(object):
    def __init__(self, vocab_file=None):
        self.word2index = {}
        self.index2word = {}

        if vocab_file:
            self.load_vocab(vocab_file)
    
    def load_vocab(self, vocab_file):
        # load vocab from file
        with open(vocab_file, 'r') as f:
            for i, line in enumerate(f):
                word = line.strip()
                self.word2index[word] = i
                self.index2word[i] = word
        # add blank word
        self.word2index['<blank>'] = len(self.word2index)
        self.index2word[self.word2index['<blank>']] = '<blank>'


In [None]:
vocab = Vocab(vocab_file = 'crohme_vocab.txt')

In [None]:
vocab.index2word

In [None]:
## test vocab
vocab = Vocab('crohme_vocab.txt')
input = '- Right \\sqrt Inside 2'.split()
output = [vocab.word2index[word] for word in input]
output
assert output == [4, 105, 66, 103, 9]

### Task 1.2: Write dataset class

In this task, we will build a dataset class that will be used to load the dataset files _(train, valid, and test)_.

The dataset class will also be used to preprocess the data by converting the characters in the data to integers using the vocabulary.

> _Before going to this task, it is **highly recommended** to read this [Tutorial on creating Custom Dataset](https://pytorch.org/tutorials/beginner/basics/data_tutorial.html#creating-a-custom-dataset-for-your-files) with PyTorch._
>
> _You should have basic understanding of required methods in the Dataset class such as `__init__`, `__len__`, and `__getitem__`._


***Input***: Dataset class should take the following arguments:.

- `annotation`: the path to the dataset annotation *(`.txt`)*.
- `root_dir`: the root directory of the dataset file.
- `vocab`: the vocabulary.

***Output***: the dataset class.

> *Note: You can adjust the Dataset class base on your references.*


#### How to handle an InkML files 

Visualize InkML file
https://www.cs.rit.edu/~rlaz/inkml_viewer/index.xhtml


Below is a class structure for handling `.inkml` files.

In [None]:
import numpy as np

import xml.etree.ElementTree as ET

class Segment(object):
    """Class to reprsent a Segment compound of strokes (id) with an id and label."""
    __slots__ = ('id', 'label' ,'strId')
    def __init__(self, *args):
        if len(args) == 3:
            self.id = args[0]
            self.label = args[1]
            self.strId = args[2]
        else:
            self.id = "none"
            self.label = ""
            self.strId = set([])

class Inkml(object):
    """Class to represent an INKML file with strokes, segmentation and labels"""
    __slots__ = ('fileName', 'strokes', 'strkOrder','segments','truth','UI')

    NS = {'ns': 'http://www.w3.org/2003/InkML', 'xml': 'http://www.w3.org/XML/1998/namespace'}

    def __init__(self,*args):
        self.fileName = None
        self.strokes = {}
        self.strkOrder = []
        self.segments = {}
        self.truth = ""
        self.UI = ""
        if len(args) == 1:
            self.fileName = args[0]
            self.loadFromFile()
    
    def fixNS(self,ns,att):
        """Build the right tag or element name with namespace"""
        return '{'+Inkml.NS[ns]+'}'+att

    def loadFromFile(self):
        """load the ink from an inkml file (strokes, segments, labels)"""
        tree = ET.parse(self.fileName)
        # # ET.register_namespace();
        root = tree.getroot()
        for info in root.findall('ns:annotation',namespaces=Inkml.NS):
            if 'type' in info.attrib:
                if info.attrib['type'] == 'truth':
                    self.truth = info.text.strip()
                if info.attrib['type'] == 'UI':
                    self.UI = info.text.strip()
        for strk in root.findall('ns:trace',namespaces=Inkml.NS):
            self.strokes[strk.attrib['id']] = strk.text.strip()
            self.strkOrder.append(strk.attrib['id'])
        segments = root.find('ns:traceGroup',namespaces=Inkml.NS)
        if segments is None or len(segments) == 0:
            return
        for seg in (segments.iterfind('ns:traceGroup',namespaces=Inkml.NS)):
            id = seg.attrib[self.fixNS('xml','id')]
            label = seg.find('ns:annotation',namespaces=Inkml.NS).text
            strkList = set([])
            for t in seg.findall('ns:traceView',namespaces=Inkml.NS):
                strkList.add(t.attrib['traceDataRef'])
            self.segments[id] = Segment(id,label, strkList)
        
    def getTraces(self, height = 256):
        traces_array = [np.array([p.strip().split()[:2] 
            for p in self.strokes[id].split(',')], dtype='float') 
              for id in self.strkOrder ]
        
        ratio = height / ((np.concatenate(traces_array, 0).max(0) - np.concatenate(traces_array, 0).min(0))[1] + 1e-6)
        return [(trace * ratio).astype(int).tolist() for trace in traces_array]

`Inkml` class is used to read an inkml file and extract the strokes

***Input***: the path to the inkml file.

***Output***: Inkml object that can be used to extract the strokes.

#### Visualize InkML traces

> The coordinations of pen traces (x, y) are obtain from collection device, origin at top left position. To visualize, the y coordination need to be inverted.

In [None]:
import matplotlib.pyplot as plt

def visualize_inkml(ink_obj: Inkml):
    ink = ink_obj

    plt.figure(figsize=(16, 4))
    plt.axis("off")
    for trace in ink.getTraces():
        trace_arr = np.array(trace)
        plt.plot(trace_arr[:, 0], -trace_arr[:, 1])  # invert y coordinate


path = 'crohme2019/crohme2019/valid/18_em_0.inkml'
ink = Inkml(path)
visualize_inkml(ink)

> **TASK: UNDERSTAND AND HANDLE OUTPUT OF `getTraces()` METHOD IN `Inkml` CLASS.**



In [None]:
# ink object from an InkML file
ink = Inkml('crohme2019/crohme2019/valid/18_em_0.inkml')

# get traces from the ink object
[np.array(trace).shape for trace in ink.getTraces()]

**WARNING**: _Some Inkml may return (x, y, t) coordinates, while others may return (x, y) coordinates. Make sure to handle this in your implementation._

Now you have the idea of how to read an inkml file, let's build the Dataset class.

#### Build Dataset

Base on the observation, how can we build a dataset class from the dataset files?

***Hints:***
   - *Understand the big picture. (Data --> Dataset --> DataLoader --> Model --> Output --> Calculate Loss --> Backpropagation --> Update weight)*
   - *What do our model expect to get? In other words, what information is being fed into our model to calculate the loss?*
   - *What informations needed to calculate our Loss?*
   - *What should be returned in the `__getitem__` method?*
   - *The outputs should be of Tensor type*
   
***References from pytorch tutorials***

https://pytorch.org/tutorials/beginner/data_loading_tutorial.html

torch.utils.data.Dataset is an abstract class representing a dataset. Your custom dataset should inherit Dataset and override the following methods:

- \_\_len\_\_ so that len(dataset) returns the size of the dataset.

- \_\_getitem\_\_ to support the indexing such that dataset[i] can be used to get ith sample.


> **TASK: IMPLEMENT `InkmlDataset` CLASS.**

You need to complete the implementation of InkMmlDataset class. Check out the sub-tasks for more information.

In [None]:
from torch.utils.data import Dataset
import os
import numpy as np
import torch
from rdp import rdp


class InkmlDataset(Dataset):
    def __init__(self, annotation, root_dir, vocab, rdp=False):
        """
        Arguments:
            annotation (string): annotation file
            root_dir (string): Directory with all the images.
        """
        self.annotation = annotation
        self.root_dir = root_dir
        self.vocab = vocab
        self.rdp = rdp
        
        # load annotations
        self.ink_paths = []
        self.labels = []

        # your code for create self.inks and self.labels from the annotation file
        with open(self.annotation, 'r') as f:
            for line in f:
                parts = line.strip().split('\t')
                if len(parts) == 2:
                    ink_file, label = parts
                    full_path = os.path.join(self.root_dir, ink_file)
                    self.ink_paths.append(full_path)
                    self.labels.append(label)

    def __len__(self):
        # This code should return the number of samples in the dataset
        return len(self.labels)

    def __getitem__(self, idx):
        # This code should return the idx-th sample in the dataset
        
        inkfile = self.ink_paths[idx]
        label = self.labels[idx]

        ### feature
        # read inkml sample --> traces
        traces = Inkml(inkfile).getTraces()
        # remove consecutive duplicated points
        traces = [remove_duplicate(trace) for trace in traces]
        if self.rdp:
            traces = [rdp(trace, epsilon=1) for trace in traces]
        # extract features (delta_x / distance, delta_y / distance, distance, pen_up)
        feature = feature_extraction(traces)
        ### label
        # convert tokens of label into vocab indexes
        feature = torch.tensor(feature, dtype=torch.float32)
        label = [self.vocab.word2index[token] for token in label.split()]
        label = torch.tensor(label, dtype=torch.long)
        input_len = torch.tensor([feature.shape[0]], dtype=torch.long)
        label_len = torch.tensor(len(label), dtype=torch.long)

        return feature, label, input_len, label_len

#### Feature representation

In this subtask, we will convert the strokes into feature representations that can be used by the model.

Initially, the `.getTraces()` method returns list of strokes, where each stroke is a list of (x, y) coordinates. Intuitively, we can use those coordinates as features. But if we directly use the coordinates, the model will have a hard time learning the patterns.

Instead, we can calculate the difference *(∆d)* between consecutive coordinates as features. This way, the model can learn the patterns more easily.
The feature could also be normalize as ($\frac{\Delta_x}{d}$ , $\frac{\Delta_y} {d}$, $d$), where $d = \sqrt{\Delta_x^2 + \Delta_y^2}$

**Pen-up and Pen-down**

In the dataset, each stroke is separated by a pen-up event. We can use this information to separate the strokes.

1. The pen is lifted from the paper (connecting the end of a stroke to the start of a stroke): $pen\_up = 1$
2. The pen is on the paper: $pen\_up = 0$ 

Then, our feature representation would be:
($\frac{\Delta_x}{d}$ , $\frac{\Delta_y} {d}$, $d$, $pen\_up$)


```
point 1: (x1, y1)
point 2: (x2, y2)
point 3: (x3, y3) <--- end of stroke #1
point 4: (x4, y4) <--- start of stroke #2
...
point n: (xn, yn)
```

The feature representation will be:

```
f1 = ((x2 - x1) / d, (y2 - y1)/d, d, 0)
f2 = ((x3 - x2) / d, (y3 - y2)/d, d, 0)
f3 = ((x4 - x3) / d, (y4 - y3)/d, d, 1) <-- pen up
...
fn-1 = ((xn - xn-1) / d, (yn - yn-1)/d, d, 0)
```

> **Subtask: REMOVE DUPLICATED POINTS.**

In [None]:
def remove_duplicate(trace):
    # your implementation to remove consecutive duplicated points
    new_trace = [trace[0]] if trace else []
    for point in trace[1:]:
        if point != new_trace[-1]:
            new_trace.append(point)
    return new_trace

> **Test your implementation**

In [None]:
ink = Inkml('crohme2019/crohme2019/valid/18_em_0.inkml')
traces = ink.getTraces()
traces = [remove_duplicate(trace) for trace in traces]

assert list(map(len,traces)) == [82, 16, 21, 78, 82, 15, 18, 18, 28, 58, 15, 19, 56, 70, 18, 21]

> **Subtask: CALCULATE `(∆X / d, ∆Y /d, d)` FOR INPUT FEATURE.**

In [None]:
import math
def feature_extraction (traces):
    # your implementation
    features = []
    last_point = None
    for stroke in traces:
        if len(stroke) < 2:  
            continue
        
        if last_point is not None:
            x1, y1 = last_point
            x2, y2 = stroke[0]
            dx = x2 - x1
            dy = y2 - y1
            d = math.sqrt(dx**2 + dy**2)
            
            if d > 0:
                normalized_feature = (dx / d, dy / d, d)
            else:
                normalized_feature = (0, 0, 0)
            features.append(normalized_feature)
        
        for i in range(1, len(stroke)):
            x_prev, y_prev = stroke[i-1]
            x_curr, y_curr = stroke[i]
            delta_x = x_curr - x_prev
            delta_y = y_curr - y_prev
            d = math.sqrt(delta_x**2 + delta_y**2)
            if d > 0:
                normalized_feature = (delta_x / d, delta_y / d, d)  
            else:
                normalized_feature = (0, 0, 0)
            features.append(normalized_feature)
        last_point = stroke[-1]
        
    feature = np.array(features)
    return feature

> **Test your implementation**

In [None]:
ink = Inkml('crohme2019/crohme2019/valid/18_em_0.inkml')
traces = ink.getTraces()
traces = [remove_duplicate(trace) for trace in traces]
feature = feature_extraction(traces)

import numpy.testing as npt
assert feature.shape == (614, 3)
npt.assert_allclose(feature.mean(axis=0), np.array([ 0.17677799,  0.29519369, 11.35540311]))
npt.assert_allclose(feature.var(axis=0), np.array([3.75390418e-01, 5.06219812e-01, 4.19017361e+02])
)

> **Subtask: CALCULATE `(∆X / d, ∆Y /d, d, pen_up)` FOR INPUT FEATURE.**

In [None]:
def feature_extraction(traces):
    # your implementation
    features = []
    last_point = None 
    
    for stroke in traces:
        if len(stroke) == 0:
            continue  
        
        if last_point is not None:
            if len(last_point) != 2 or len(stroke[0]) != 2:
                continue  
            
            x1, y1 = last_point
            x2, y2 = stroke[0]
            dx = x2 - x1
            dy = y2 - y1
            distance = math.sqrt(dx**2 + dy**2)
            pen_up = 1
            
            if distance != 0:
                features.append([dx / distance, dy / distance, distance, pen_up])
            else:
                features.append([0, 0, 0, pen_up])
        
        for i in range(1, len(stroke)):
            if len(stroke[i - 1]) != 2 or len(stroke[i]) != 2:
                continue
            
            x1, y1 = stroke[i - 1]
            x2, y2 = stroke[i]
            dx = x2 - x1
            dy = y2 - y1
            distance = math.sqrt(dx**2 + dy**2)
            pen_up = 0
            
            if distance != 0:
                features.append([dx / distance, dy / distance, distance, pen_up])
            else:
                features.append([0, 0, 0, pen_up])
        
        
        last_point = stroke[-1]
    
    return np.array(features)

> **Test your implementation**

In [None]:
ink = Inkml('crohme2019/crohme2019/valid/18_em_0.inkml')
traces = ink.getTraces()
traces = [remove_duplicate(trace) for trace in traces]
feature = feature_extraction(traces)

import numpy.testing as npt
assert feature.shape == (614, 4)
npt.assert_array_equal(np.where(feature[:,3] == 1)[0], np.array([ 81,  97, 118, 196, 278, 293, 311, 329, 357, 415, 430, 449, 505,
       575, 593]))


> **Subtask: COMPLETE InkmlDataset class**

From the sub-tasks, complete the implementation of InkmlDataset class.

> **Test your implementation**

In [None]:
dataset = InkmlDataset(annotation='crohme2019_valid.txt', root_dir='crohme2019', vocab=Vocab('crohme_vocab.txt'))
feature, label, input_len, label_len = dataset.__getitem__(0)
import numpy.testing as npt

assert type(feature) == torch.Tensor
assert type(label) == torch.Tensor
assert feature.shape == (231, 4)
npt.assert_array_equal(label, np.array([59, 105, 1, 105, 59, 105, 1, 105, 87, 105, 2, 105, 2]))
assert input_len == 231
assert label_len == 13

### Dataloader

Write the dataloader with custom collate function to pad the input and target sequence

Reference:
https://colab.research.google.com/drive/1TS7kIuNwYIN2dYGIzm-60hPnmYxKi-VN?usp=sharing
https://plainenglish.io/blog/understanding-collate-fn-in-pytorch-f9d1742647d3

In [None]:
import torch
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    """Create batch"""
    # your code here
    # features, labels, input_lens, label_lens should be torch.tensor
    features = [item[0] for item in batch]
    labels = [item[1] for item in batch]
    input_lens = [item[2] for item in batch]
    label_lens = [item[3] for item in batch]

    features = pad_sequence(features, batch_first=True)
    labels = pad_sequence(labels, batch_first=True)

    input_lens = torch.tensor(input_lens)
    label_lens = torch.tensor(label_lens)
    return features, labels, input_lens, label_lens

In [None]:
features, labels, input_lens, label_lens = collate_fn([dataset[0], dataset[1]])

import numpy.testing as npt

assert type(input_lens) == torch.Tensor
assert type(label_lens) == torch.Tensor

assert features.shape == (2, 231, 4)
assert labels.shape == (2, 13)
npt.assert_array_equal(input_lens.numpy(), np.array([231, 102])) 
npt.assert_array_equal(label_lens, np.array([13, 5]))

In [None]:
# test collate_fn with data loader
from torch.utils.data import DataLoader
data_loader = DataLoader(dataset, batch_size=2, shuffle=False, collate_fn=collate_fn)

features, labels, input_lens, label_lens = next(iter(data_loader))
assert features.shape == (2, 231, 4)
assert labels.shape == (2, 13)
npt.assert_array_equal(input_lens.numpy(), np.array([231, 102])) 
npt.assert_array_equal(label_lens, np.array([13, 5]))

### Pytorch lightning datamodule

https://lightning.ai/docs/pytorch/stable/data/datamodule.html

#### Data module

In [None]:
import pytorch_lightning as pl
from torch.utils.data import DataLoader

class CROHMEDatamodule(pl.LightningDataModule):
    def __init__(self, root_dir, train_annotation, validation_annotation, test_annotation, vocab_file, batch_size, num_workers):
        super().__init__()
        self.root_dir = root_dir
        self.batch_size = batch_size
        self.train_annotation = train_annotation
        self.validation_annotation = validation_annotation
        self.test_annotation = test_annotation
        self.vocab = Vocab(vocab_file)
        self.num_workers=num_workers

    def setup(self, stage: str):
        self.train_dataset = InkmlDataset(self.train_annotation, self.root_dir, self.vocab, rdp=False)
        self.val_dataset = InkmlDataset(self.validation_annotation, self.root_dir, self.vocab)
        self.test_dataset = InkmlDataset(self.test_annotation, self.root_dir, self.vocab)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=True, collate_fn=collate_fn)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=False, collate_fn=collate_fn)

    def test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=False, collate_fn=collate_fn)

## Task 2: Models, loss functions (2 pts)

### Model

Check out this tutorial
https://pytorch.org/tutorials/beginner/introyt/modelsyt_tutorial.html

In [None]:
# build a model of Bidirectional LSTM 3 x 128
import torch.nn as nn
class LSTM_TemporalClassification(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        # declare layers
        super(LSTM_TemporalClassification, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_size * 2, num_classes)
        
    def forward(self, x):
        # write calculation process here
        output, _ = self.lstm(x)
        x = self.fc(output)
        x = x.log_softmax(2)
        
        return x
        

In [None]:
# test your implementation
model = LSTM_TemporalClassification(4, 128, 3, 109)
assert model.forward(torch.rand((10, 100, 4))).shape == (10, 100, 109)

### Loss: CTC

Read the documentation of CTC loss
https://pytorch.org/docs/stable/generated/torch.nn.CTCLoss.html

What is the role of `input_lengths`, `target_lengths`

```
loss = nn.CTCLoss()(input, target, input_lengths, target_lengths)
```

### Pytorch lightning Module

https://lightning.ai/docs/pytorch/stable/common/lightning_module.html

A LightningModule organizes your PyTorch code into 6 sections:

Initialization (\_\_init\_\_ and setup()).

Train Loop (training_step())

Validation Loop (validation_step())

Test Loop (test_step())

Optimizers and LR Schedulers (configure_optimizers())


> **Task: Complete the LightningModule**

- Calculate loss
- Log the loss

In [50]:
import pytorch_lightning as pl
import torch

class MathOnlineModel(pl.LightningModule):
    def __init__(self, lr=0.001, input_size=4, output_size=109, hidden_size=128, num_layers = 3, decoder=None):
        super().__init__()
        self.model = LSTM_TemporalClassification(input_size, hidden_size, num_layers, output_size)
        self.criterion = nn.CTCLoss(blank=output_size - 1)
        self.lr = lr
        self.decoder = decoder

    def forward(self, x):
        return self.model(x)
    def training_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch

        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)

        self.log('train_loss', loss, prog_bar=True, on_step=True, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch

        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)

        self.log('val_loss', loss, prog_bar=True, on_step=False, on_epoch=True)
        return loss

    def test_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch
        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)

        self.log('test_loss', loss, prog_bar=True, on_step=False, on_epoch=True)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.model.parameters(), lr=self.lr)

## Task 3: Training with Trainer (pytorch lightning) (2 pts)

Pytorch lightning trainer:
https://lightning.ai/docs/pytorch/stable/common/trainer.html

From the documentation:

> The Lightning Trainer does much more than just “training”. Under the hood, it handles all loop details for you, some examples include:
> - Automatically enabling/disabling grads
> - Running the training, validation and test dataloaders
> - Calling the Callbacks at the appropriate times
> - Putting batches and computations on the correct devices

In [None]:
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning import Trainer

trainer = Trainer(
    callbacks = [
        EarlyStopping(monitor='val_loss', patience=3, mode='min'),
        LearningRateMonitor(logging_interval='step'),
        ModelCheckpoint(filename='{epoch}-{val_loss:.4f}', save_top_k=3, monitor='val_loss', mode='min'),
    ],
    logger = TensorBoardLogger('lightning_logs'),
    check_val_every_n_epoch=1,
    fast_dev_run=False,
    default_root_dir='checkpoint',
    deterministic=False,
    max_epochs=30,
    log_every_n_steps=50,
    devices = 1,
)
model = MathOnlineModel()
# model = MathOnlineModel.load_from_checkpoint('lightning_logs/lightning_logs/version_1/checkpoints/epoch=7-val_loss=0.6932.ckpt')

dm = CROHMEDatamodule(root_dir='crohme2019',
                       train_annotation='crohme2019_train.txt',
                       validation_annotation='crohme2019_valid.txt',
                       test_annotation='crohme2019_test.txt',
                       vocab_file='crohme_vocab.txt',
                       batch_size=32,
                       num_workers=0
                     )

trainer.fit(model, dm)
#Best val loss model is save at lightning_logs/lightning_logs/version_2/checkpoints/epoch=2-val_loss=0.5243.ckpt

**Requirements:**
- Validation loss (val_loss < 0.7)

**Tips on training:**
- Debug with **fast_dev_run**:
Before run the training process, it is better to make a quick check of training and validation loop. Set fast_dev_run=True, then run the trainer and check if there is any bug exist.
- Training initially with **small batch size**:
In practice, training with CTC loss converges slowly for large batch size. To accelerate convergence, training with small batch size first, save model, then, train with large batch size. 
- Train more epochs by setting:

```
trainer = Trainer(
    ...
    max_epochs=20, 
    ...
)```

In [51]:
# run test on validation dataset
# load your best val_loss model

from pytorch_lightning import Trainer

trainer = Trainer(
    devices = 1,
)

# Load the model from a checkpoint
model = MathOnlineModel.load_from_checkpoint(
    'lightning_logs/lightning_logs/version_2/checkpoints/epoch=2-val_loss=0.5243.ckpt',
)

# Initialize the data module
dm = CROHMEDatamodule(
    root_dir='crohme2019', 
    train_annotation='crohme2019_train.txt', 
    validation_annotation='crohme2019_valid.txt', 
    test_annotation='crohme2019_valid.txt', 
    vocab_file='crohme_vocab.txt', 
    batch_size=32,
    num_workers=0
)

# Test the model
trainer.test(model, datamodule=dm)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing DataLoader 0: 100%|██████████| 31/31 [00:05<00:00,  5.42it/s]
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
        test_loss           0.5244007706642151
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 0.5244007706642151}]

## Task 4: Decode output (1 pts)

#### Inference

The output of networks need to process by a decoding step.

- Greedy decode: Your task is to implement greedy decoding method. Which converts the output into a string of symbols and relations (same form with labels). Greedy decoder produce the best path by removing consecutive repeated symbols/relations and then remove \<blank\>

**Your task**
Based on GreedyCTCDecoder class from the following link, write decoding for an output.
https://pytorch.org/audio/main/tutorials/asr_inference_with_ctc_decoder_tutorial.html#greedy-decoder

In [34]:
from typing import List
import torch
class GreedyCTCDecoder(torch.nn.Module):
    def __init__(self, vocab):
        super().__init__()
        self.vocab = vocab
        self.blank = vocab.word2index['<blank>']

    def forward(self, emission: torch.Tensor) -> List[str]:
        """Given a sequence emission over labels, get the best path
        Args:
          emission (Tensor): Logit tensors. Shape `[seq_len, num_label]`.

        Returns:
          List[str]: The resulting transcript
        """
        # your implementation
        indices = torch.argmax(emission, dim=-1)

        decoded_sequence = []
        prev_index = None
        for index in indices:
            if index != prev_index and index != self.blank:  
                decoded_sequence.append(index.item())
            prev_index = index

        output_seq_list = [self.vocab.index2word[index] for index in decoded_sequence]
        return output_seq_list


In [35]:
# test code
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MathOnlineModel.load_from_checkpoint('lightning_logs/lightning_logs/version_2/checkpoints/epoch=2-val_loss=0.5243.ckpt')
model = model.to(device)
model.eval()

dataset = InkmlDataset(annotation='crohme2019_valid.txt', root_dir='crohme2019', vocab=Vocab('crohme_vocab.txt'))
feature, label, input_len, label_len = dataset.__getitem__(0)
greedy_decoder = GreedyCTCDecoder(vocab)

# run model forward with feature --> output
feature = feature.unsqueeze(0).to(device)
output = model(feature)

# run greedy decoder
decoded = greedy_decoder(output[0])
print(decoded)

# possible output if your training work well
# decoded -> ['\\phi', 'Right', '(', 'Right', '0', 'Right', '(', 'Right', 'n', 'Right', ')', 'Right', ')']

['\\phi', 'Right', '(', 'Right', '\\phi', 'Right', '(', 'Right', 'n', 'Right', ')', 'Right', ')']


## Task 5: Add calulation of metrics (word error rate) for training (2 pts)


> **Sub-task: Calculate edit distance between two list of tokens**


In [36]:
from torchmetrics.functional.text.helper import _LevenshteinEditDistance 

def edit_distance(pred_seq, label_seq):
    # Your code here
    leven = _LevenshteinEditDistance(label_seq)
    distance, _ = leven(pred_seq)
    
    return distance

# Test your implementation
assert edit_distance(['\\phi',
  'Right',
  '(',
  'Right',
  '0',
  'Right',
  '(',
  'Right',
  'n',
  'Right',
  ')',
  'Right',
  ')'],
 ['\\phi',
  'Right',
  '(',
  'Right',
  '\\phi',
  'Right',
  '(',
  'Right',
  'n',
  'Right',
  ')',
  'Right',
  ')']) == 1

**Add word error rate (wer) metric to the model**

$wer = \frac{total\ edit\ distance (predict\ sequence,\ target\ sequence)}{total\ target\ sequence\ length}$

In [37]:
import pytorch_lightning as pl
import torch
from torchmetrics.functional.text.helper import _LevenshteinEditDistance as _LE_distance

class MathOnlineModel(pl.LightningModule):
    def __init__(self, lr=0.001, input_size=4, output_size=109, hidden_size=128, num_layers = 3, decoder=None):
        super().__init__()
        self.model = LSTM_TemporalClassification(input_size, hidden_size, num_layers, output_size)
        self.criterion = nn.CTCLoss(blank=output_size - 1)
        self.lr = lr
        self.decoder = decoder
        
    def forward(self, x):
        return self.model(x)
    def training_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch

        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)

        # your code to calculate total edit distance
        total_edits = 0
        total_lens = 0
        y_hat = y_hat.permute(1, 0, 2)
        for i in range(len(y)):
            with torch.no_grad():
                decoded = self.decoder(y_hat[i])
            pred_seq = decoded
            label_seq = [vocab.index2word[y[i][j].item()] for j in range(y_lens[i])]  
            edit_distances = edit_distance(pred_seq, label_seq)

        # loop: calculate edit distance
            total_edits += edit_distances
            total_lens += y_lens[i]
            
        self.log('train_loss', loss, prog_bar=True, on_step=True, on_epoch=True)
        self.log('train_wer', total_edits / total_lens, prog_bar=True, on_step=True, on_epoch=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch
        
        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)

        # your code to calculate total edit distance
        total_edits = 0
        total_lens = 0
        y_hat = y_hat.permute(1, 0, 2)
        for i in range(len(y)):
            with torch.no_grad():
                decoded = self.decoder(y_hat[i])
            pred_seq = decoded
            label_seq = [vocab.index2word[y[i][j].item()] for j in range(y_lens[i])]  
            edit_distances = edit_distance(pred_seq, label_seq)

        # loop: calculate edit distance
            total_edits += edit_distances
            total_lens += y_lens[i]
        
        self.log('val_loss', loss, prog_bar=True, on_step=False, on_epoch=True)
        self.log('val_wer', total_edits / total_lens, prog_bar=True, on_step=False, on_epoch=True)
        return loss
    
    def test_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch
        
        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)

        # your code to calculate total edit distance
        total_edits = 0
        total_lens = 0
        y_hat = y_hat.permute(1, 0, 2)
        for i in range(len(y)):
            with torch.no_grad():
                decoded = self.decoder(y_hat[i])
            pred_seq = decoded
            label_seq = [vocab.index2word[y[i][j].item()] for j in range(y_lens[i])] 
            edit_distances = edit_distance(pred_seq, label_seq)

        # loop: calculate edit distance
            total_edits += edit_distances
            total_lens += y_lens[i]
        
        self.log('test_loss', loss, prog_bar=True, on_step=False, on_epoch=True)
        self.log('test_wer', total_edits / total_lens, prog_bar=True, on_step=False, on_epoch=True)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.model.parameters(), lr=self.lr)

**Continue to train the model**

- Load the latest trained model:

```
model = MathOnlineModel.load_from_checkpoint('path/to/your/checkpoint.ckpt')
```
- Change config of ModelCheckPoint to monitor the new metric (val_wer) instead.

```
        ModelCheckpoint(filename='{epoch}-{val_loss:.4f}', save_top_k=5, monitor='val_loss', mode='min')
        
      --> ModelCheckpoint(filename='{epoch}-{val_wer:.4f}', save_top_k=5, monitor='val_wer', mode='min'),

```


In [None]:
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning import Trainer

trainer = Trainer(
    callbacks = [
        EarlyStopping(monitor='val_wer', patience=5, mode='min'),
        LearningRateMonitor(logging_interval='step'),
        ModelCheckpoint(filename='{epoch}-{val_wer:.4f}', save_top_k=3, monitor='val_wer', mode='min'),
    ], 
    logger = TensorBoardLogger('lightning_logs'),
    check_val_every_n_epoch=1,
    fast_dev_run=False,
    default_root_dir='checkpoint',
    deterministic=False, 
    max_epochs=10, 
    log_every_n_steps=50,
    devices = 1,
)

vocab = Vocab('crohme_vocab.txt')
greedy_decoder = GreedyCTCDecoder(vocab)

# model = MathOnlineModel(decoder=greedy_decoder)
model = MathOnlineModel.load_from_checkpoint('lightning_logs/lightning_logs/version_3/checkpoints/epoch=6-val_wer=0.1952.ckpt', decoder=greedy_decoder)

dm = CROHMEDatamodule(root_dir='crohme2019', 
                       train_annotation='crohme2019_train.txt', 
                       validation_annotation='crohme2019_valid.txt', 
                       test_annotation='crohme2019_test.txt', 
                       vocab_file='crohme_vocab.txt', 
                       batch_size=1,
                      num_workers=0
                     )

trainer.fit(model, dm)
#Best val wer modei is saved at lightning_logs/lightning_logs/version_4/checkpoints/epoch=3-val_wer=0.1327.ckpt

**Requirements:**
- Validation word error rate (val_wer < 0.15)

In [38]:
# run test on validation dataset
# load your best val_wer model

from pytorch_lightning import Trainer

trainer = Trainer(
    devices = 1,
)

# Load the model from a checkpoint
model = MathOnlineModel.load_from_checkpoint(
    'lightning_logs/lightning_logs/version_4/checkpoints/epoch=3-val_wer=0.1327.ckpt', 
    decoder=GreedyCTCDecoder(dataset.vocab)
)

# Initialize the data module
dm = CROHMEDatamodule(
    root_dir='crohme2019', 
    train_annotation='crohme2019_train.txt', 
    validation_annotation='crohme2019_valid.txt', 
    test_annotation='crohme2019_valid.txt', 
    vocab_file='crohme_vocab.txt', 
    batch_size=32,
    num_workers=0
)

# Test the model
trainer.test(model, datamodule=dm)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing DataLoader 0: 100%|██████████| 31/31 [01:57<00:00,  0.26it/s]
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
        test_loss           0.47184303402900696
        test_wer            0.13278627395629883
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 0.47184303402900696, 'test_wer': 0.13278627395629883}]

# Bonus 1 (+1 pts for final exam)

Add metrics to evaluate accuracy of symbols and relations separately. 

More detail are in this FAQs:
https://horn-sky-ebe.notion.site/What-does-it-mean-by-edit-distance-by-symbol-and-relation-a36d721683a3456a847d396515698a01


Your task is complete the class here, then retrain one epoch to see the metrics are shown.

In [39]:
#'Above': 101, 'Below': 102, 'Inside': 103, 'NoRel': 104, 'Right': 105, 'Sub': 106, 'Sup': 107
vocab_dict = vocab.word2index
relation = ['Above', 'Below', 'Inside', 'NoRel', 'Right', 'Sub', 'Sup']
relation_idx = [vocab_dict[rel] for rel in relation]

In [48]:
import pytorch_lightning as pl
import torch
from torchmetrics.functional.text.helper import _LevenshteinEditDistance as _LE_distance

class MathOnlineModel2(pl.LightningModule):
    def __init__(self, lr=0.001, input_size=4, output_size=109, hidden_size=128, num_layers = 3, decoder=None):
        super().__init__()
        self.model = LSTM_TemporalClassification(input_size, hidden_size, num_layers, output_size)
        self.criterion = nn.CTCLoss(blank=output_size - 1)
        self.lr = lr
        self.decoder = decoder
        
    def forward(self, x):
        return self.model(x)
    def training_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch

        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)

        # your code to calculate total edit distance
        total_edits = 0
        total_lens = 0
        y_hat = y_hat.permute(1, 0, 2)
        for i in range(len(y)):
            with torch.no_grad():
                decoded = self.decoder(y_hat[i])
            pred_seq = decoded
            label_seq = [vocab.index2word[y[i][j].item()] for j in range(y_lens[i])]  
            edit_distances = edit_distance(pred_seq, label_seq)

        # loop: calculate edit distance
            total_edits += edit_distances
            total_lens += y_lens[i]
            
        self.log('train_loss', loss, prog_bar=True, on_step=True, on_epoch=True)
        self.log('train_wer', total_edits / total_lens, prog_bar=True, on_step=True, on_epoch=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch
        
        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)

        # your code to calculate total edit distance
        total_edits = 0
        total_lens = 0
        relation_edits = 0
        relation_lens = 0
        symbol_edits = 0
        symbol_lens = 0
        y_hat = y_hat.permute(1, 0, 2)
        
        for i in range(len(y)):
            with torch.no_grad():
                decoded = self.decoder(y_hat[i])
            pred_seq = decoded
            label_seq = [vocab.index2word[y[i][j].item()] for j in range(y_lens[i])]  
            edit_distances = edit_distance(pred_seq, label_seq)
            
            relation_pred = [rel for rel in pred_seq if rel in relation]
            symbol_pred = [rel for rel in pred_seq if rel not in relation]
            
            relation_label = [rel for rel in label_seq if rel in relation]
            symbol_label = [rel for rel in label_seq if rel not in relation]
            
        # loop: calculate edit distance
            total_edits += edit_distances
            total_lens += y_lens[i]
            
            relation_edits += edit_distance(relation_pred, relation_label)
            relation_lens += len(relation_label)
            
            symbol_edits += edit_distance(symbol_pred, symbol_label)
            symbol_lens += len(symbol_label)
        
        self.log('val_loss', loss, prog_bar=True, on_step=False, on_epoch=True)
        self.log('val_wer', total_edits / total_lens, prog_bar=True, on_step=False, on_epoch=True)
        self.log('val_wer_relation', relation_edits / relation_lens, prog_bar=True, on_step=False, on_epoch=True)
        self.log('val_wer_symbol', symbol_edits / symbol_lens, prog_bar=True, on_step=False, on_epoch=True)
        
        return loss
    
    def test_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch
        
        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)

        # your code to calculate total edit distance
        total_edits = 0
        total_lens = 0
        relation_edits = 0
        relation_lens = 0
        symbol_edits = 0
        symbol_lens = 0
        y_hat = y_hat.permute(1, 0, 2)
        
        for i in range(len(y)):
            with torch.no_grad():
                decoded = self.decoder(y_hat[i])
            pred_seq = decoded
            label_seq = [vocab.index2word[y[i][j].item()] for j in range(y_lens[i])]  
            edit_distances = edit_distance(pred_seq, label_seq)
            
            relation_pred = [rel for rel in pred_seq if rel in relation]
            symbol_pred = [rel for rel in pred_seq if rel not in relation]
            
            relation_label = [rel for rel in label_seq if rel in relation]
            symbol_label = [rel for rel in label_seq if rel not in relation]
            
        # loop: calculate edit distance
            total_edits += edit_distances
            total_lens += y_lens[i]
            
            relation_edits += edit_distance(relation_pred, relation_label)
            relation_lens += len(relation_label)
            
            symbol_edits += edit_distance(symbol_pred, symbol_label)
            symbol_lens += len(symbol_label)
        
        self.log('test_loss', loss, prog_bar=True, on_step=False, on_epoch=True)
        self.log('test_wer', total_edits / total_lens, prog_bar=True, on_step=False, on_epoch=True)
        self.log('test_wer_relation', relation_edits / relation_lens, prog_bar=True, on_step=False, on_epoch=True)
        self.log('test_wer_symbol', symbol_edits / symbol_lens, prog_bar=True, on_step=False, on_epoch=True)
        
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.model.parameters(), lr=self.lr)

In [49]:
from pytorch_lightning import Trainer

trainer = Trainer(
    devices = 1,
)

# Load the model from a checkpoint
model = MathOnlineModel2.load_from_checkpoint(
    'lightning_logs/lightning_logs/version_4/checkpoints/epoch=3-val_wer=0.1327.ckpt', 
    decoder=GreedyCTCDecoder(dataset.vocab)
)

# Initialize the data module
dm = CROHMEDatamodule(
    root_dir='crohme2019', 
    train_annotation='crohme2019_train.txt', 
    validation_annotation='crohme2019_valid.txt', 
    test_annotation='crohme2019_valid.txt', 
    vocab_file='crohme_vocab.txt', 
    batch_size=32,
    num_workers=0
)

# Test the model
trainer.test(model, datamodule=dm)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
c:\Users\tinvi\AppData\Local\Programs\Python\Python311\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:424: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


Testing DataLoader 0: 100%|██████████| 31/31 [01:48<00:00,  0.29it/s]
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
        test_loss           0.47184303402900696
        test_wer            0.13278627395629883
    test_wer_relation       0.10667701810598373
     test_wer_symbol         0.152822345495224
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 0.47184303402900696,
  'test_wer': 0.13278627395629883,
  'test_wer_relation': 0.10667701810598373,
  'test_wer_symbol': 0.152822345495224}]

# Bonus 2 (+1 pts for final exam)

Modify loss function to constrain the output of relations at the timestep of pen-up. 

The idea: Provide a masked sequence such that the position of pen-up is masked, the additional loss would penaltize all the relations output to the timestep that has been masked. 


## Loss function to constraint relation output

```
pen_down: a masked sequence, where len(pen_down) = len(input_sequence) 
        and pen_down[t] == 1 (pen down), pen_down[t] == 0 (pen up)
p_rel: total probability of relation outputs for every time step t, len(p_rel) = len(input_sequence), p_rel[t] = sum(p[t][rel] for rel in ['Sub', 'Sup', 'Above', ...])

The loss is defined as:

    L_{constraint} = -log(1 - sum(p_rel * pen_down))
    
Explanation:

    Minimize L_{constraint} (value range of (-inf, 0)) would make maximize of (1 - sum(p_rel * pen_down)), or making sum(p_rel * mask) -> 0, then it will penalize the relation output on pen_down timesteps.
```

## Apply constraint loss function with ctc loss

```
    L = loss_{ctc} + \lambda * L_{constraint}
    
where \lambda is a weighted parameter to control balance between the two losses.
   
```


You can also find it in a published paper here, section 3.4:
https://arxiv.org/pdf/2105.10156



In [None]:
#'Above': 101, 'Below': 102, 'Inside': 103, 'NoRel': 104, 'Right': 105, 'Sub': 106, 'Sup': 107
vocab_dict = vocab.word2index
relation = ['Above', 'Below', 'Inside', 'NoRel', 'Right', 'Sub', 'Sup']
relation_idx = [vocab_dict[rel] for rel in relation]

In [None]:
class RelationLoss(nn.Module):
    def __init__(self, reduction='mean'):
        super(RelationLoss, self).__init__()
        self.reduction = reduction
    def forward(self, y_hat, pen_up):
        _, _, z = y_hat.shape
        
        rel_mask = torch.zeros(z, device=y_hat.device)
        rel_mask[relation_idx] = 1
        
        probs_rel = y_hat * rel_mask
        pen_up = pen_up.unsqueeze(-1)
        pen_down = 1 - pen_up
        
        probs_rel_pen_down = probs_rel * pen_down
        sum_probs_rel_pen_down = probs_rel_pen_down.sum(dim=2)
        
        loss = -torch.log(1 - sum_probs_rel_pen_down)
        
        if self.reduction == "mean":
            return loss.mean()
        elif self.reduction == "sum":
            return loss.sum()
        
        return loss

In [None]:
import pytorch_lightning as pl
import torch
from torchmetrics.functional.text.helper import _LevenshteinEditDistance as _LE_distance

class MathOnlineModel3(pl.LightningModule):
    def __init__(self, lr=0.001, input_size=4, output_size=109, hidden_size=128, num_layers = 3, decoder=greedy_decoder, alpha=0.1):
        super().__init__()
        self.model = LSTM_TemporalClassification(input_size, hidden_size, num_layers, output_size)
        self.criterion = nn.CTCLoss(blank=output_size - 1)
        self.lr = lr
        self.decoder = decoder
        self.relationCriterion = RelationLoss()
        self.alpha = alpha

    def forward(self, x):
        return self.model(x)
    
    def training_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch

        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        y_hat = y_hat.log_softmax(2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)
        relation_loss = self.relationCriterion(y_hat, x[:, :, 3])
        total_loss = loss + self.alpha * relation_loss
        
        # your code to calculate total edit distance
        total_edits = 0
        total_lens = 0
        y_hat = y_hat.permute(1, 0, 2)

        # loop: calculate edit distance
        for i in range(x.shape[0]):
            pred_seq = self.decoder(y_hat[i])
            label_seq = [vocab.index2word[index.item()] for index in y[i]]
            total_edits += edit_distance(pred_seq, label_seq)
            total_lens += len(label_seq)

        self.log('train_loss', total_loss, prog_bar=True, on_step=True, on_epoch=True)
        self.log('train_wer', total_edits / total_lens, prog_bar=True, on_step=True, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch

        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        y_hat = y_hat.log_softmax(2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)
        relation_loss = self.relationCriterion(y_hat, x[:, :, 3])
        print(loss, relation_loss)
        total_loss = loss + self.alpha * relation_loss

        # your code to calculate total edit distance
        total_edits = 0
        total_lens = 0
        relation_edits = 0
        relation_lens = 0
        symbol_edits = 0
        symbol_lens = 0
        y_hat = y_hat.permute(1, 0, 2)
        
        for i in range(len(y)):
            with torch.no_grad():
                decoded = self.decoder(y_hat[i])
            pred_seq = decoded
            label_seq = [vocab.index2word[y[i][j].item()] for j in range(y_lens[i])]  
            edit_distances = edit_distance(pred_seq, label_seq)
            
            relation_pred = [rel for rel in pred_seq if rel in relation]
            symbol_pred = [rel for rel in pred_seq if rel not in relation]
            
            relation_label = [rel for rel in label_seq if rel in relation]
            symbol_label = [rel for rel in label_seq if rel not in relation]
            
        # loop: calculate edit distance
            total_edits += edit_distances
            total_lens += y_lens[i]
            
            relation_edits += edit_distance(relation_pred, relation_label)
            relation_lens += len(relation_label)
            
            symbol_edits += edit_distance(symbol_pred, symbol_label)
            symbol_lens += len(symbol_label)
        
        self.log('val_loss', loss, prog_bar=True, on_step=False, on_epoch=True)
        self.log('val_wer', total_edits / total_lens, prog_bar=True, on_step=False, on_epoch=True)
        self.log('val_wer_relation', relation_edits / relation_lens, prog_bar=True, on_step=False, on_epoch=True)
        self.log('val_wer_symbol', symbol_edits / symbol_lens, prog_bar=True, on_step=False, on_epoch=True)
        
        return loss

    def test_step(self, batch, batch_idx):
        x, y, x_lens, y_lens = batch

        # your code to calculate loss
        y_hat = self.model(x)
        y_hat = y_hat.permute(1, 0, 2)
        y_hat = y_hat.log_softmax(2)
        loss = self.criterion(y_hat, y, x_lens, y_lens)
        relation_loss = self.relationCriterion(y_hat, x[:, :, 3])
        print(loss, relation_loss)
        total_loss = loss + self.alpha * relation_loss

        # your code to calculate total edit distance
        total_edits = 0
        total_lens = 0
        relation_edits = 0
        relation_lens = 0
        symbol_edits = 0
        symbol_lens = 0
        y_hat = y_hat.permute(1, 0, 2)
        
        for i in range(len(y)):
            with torch.no_grad():
                decoded = self.decoder(y_hat[i])
            pred_seq = decoded
            label_seq = [vocab.index2word[y[i][j].item()] for j in range(y_lens[i])]  
            edit_distances = edit_distance(pred_seq, label_seq)
            
            relation_pred = [rel for rel in pred_seq if rel in relation]
            symbol_pred = [rel for rel in pred_seq if rel not in relation]
            
            relation_label = [rel for rel in label_seq if rel in relation]
            symbol_label = [rel for rel in label_seq if rel not in relation]
            
        # loop: calculate edit distance
            total_edits += edit_distances
            total_lens += y_lens[i]
            
            relation_edits += edit_distance(relation_pred, relation_label)
            relation_lens += len(relation_label)
            
            symbol_edits += edit_distance(symbol_pred, symbol_label)
            symbol_lens += len(symbol_label)
        
        self.log('test_loss', loss, prog_bar=True, on_step=False, on_epoch=True)
        self.log('test_wer', total_edits / total_lens, prog_bar=True, on_step=False, on_epoch=True)
        self.log('test_wer_relation', relation_edits / relation_lens, prog_bar=True, on_step=False, on_epoch=True)
        self.log('test_wer_symbol', symbol_edits / symbol_lens, prog_bar=True, on_step=False, on_epoch=True)
        
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.model.parameters(), lr=self.lr)