In [1]:
import os, json, re
import torch
from data import RecDataModule
import pytorch_lightning as pl
import torchvision

PATH_DATA = "/home/ubuntu/datasets/segment_car_plate/data/"


# def decode_sequence(pred, abc):
#     pred = pred.permute(1, 0, 2).cpu().data.numpy()
#     outputs = []
#     for i in range(len(pred)):
#         outputs.append(pred_to_string(pred[i], abc))
#     return outputs


In [2]:
# with open(os.path.join(PATH_DATA, "train_recognition.json")) as fp:
#     obj = json.load(fp)
# fname2text = {os.path.basename(rec["file"]):rec["text"] for rec in obj}

# assert len(obj) == len(fname2text)

# config = []

# for idx, fname in enumerate(
#     os.listdir(os.path.join(PATH_DATA, "train"))
# ):
#     if re.fullmatch(
#         pattern=r"\d+\.box\.\d+\.jpg",
#         string=fname
#     ):
#         config.append(
#             (fname, fname2text[fname])
#         )

In [3]:
def make_config(path_data):
    with open(os.path.join(path_data, "train_recognition.json")) as fp:
        obj = json.load(fp)
    fname2text = {os.path.basename(rec["file"]):rec["text"] for rec in obj}

    assert len(obj) == len(fname2text)

    config = []

    for idx, fname in enumerate(
        os.listdir(os.path.join(path_data, "train"))
    ):
        if re.fullmatch(
            pattern=r"\d+\.box\.\d+\.jpg",
            string=fname
        ):
            config.append(
                (fname, fname2text[fname])
            )
    return config

In [4]:
config = make_config(path_data=PATH_DATA)

In [5]:
dmodule = RecDataModule(
    data_path=os.path.join(PATH_DATA, "train"),
    config=config
)
dmodule.setup()

dtrain = dmodule.train_dataloader()
dval = dmodule.val_dataloader()
dtest = dmodule.test_dataloader()

In [6]:
class FeatureExtractor(torch.nn.Module):

    def __init__(self, input_size=(64, 320), output_len=20):
        super(FeatureExtractor, self).__init__()
        h, w = input_size
        resnet = torchvision.models.resnet18(weights=True)
        self.cnn = torch.nn.Sequential(*list(resnet.children())[:-2])
        self.pool = torch.nn.AvgPool2d(kernel_size=(h // 32, 1))
        self.proj = torch.nn.Conv2d(w // 32, output_len, kernel_size=1)
        self.num_output_features = self.cnn[-1][-1].bn2.num_features

    def apply_projection(self, x):
        """Use convolution to increase width of a features.

        Args:
            - x: Tensor of features (shaped B x C x H x W).

        Returns:
            New tensor of features (shaped B x C x H x W').
        """
        x = x.permute(0, 3, 2, 1).contiguous()
        x = self.proj(x)
        x = x.permute(0, 2, 3, 1).contiguous()

        return x

    def forward(self, x):
        features = self.cnn(x) # conv layers
        features = self.pool(features) # to make height == 1
        features = self.apply_projection(features) # to increase width

        return features


class SequencePredictor(torch.nn.Module):

    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout=0.3, bidirectional=False):
        super(SequencePredictor, self).__init__()

        self.num_classes = num_classes
        self.rnn = torch.nn.GRU(input_size=input_size,
                          hidden_size=hidden_size,
                          num_layers=num_layers,
                          dropout=dropout,
                          bidirectional=bidirectional)

        fc_in = hidden_size if not bidirectional else 2 * hidden_size
        self.fc = torch.nn.Linear(in_features=fc_in,
                            out_features=num_classes)

    def _init_hidden(self, batch_size):
        """Initialize new tensor of zeroes for RNN hidden state.

        Args:
            - batch_size: Int size of batch

        Returns:
            Tensor of zeros shaped (num_layers * num_directions, batch, hidden_size).
        """
        num_directions = 2 if self.rnn.bidirectional else 1
        h = torch.zeros(self.rnn.num_layers * num_directions, batch_size, self.rnn.hidden_size)
        return h

    def _reshape_features(self, x):
        """Change dimensions of x to fit RNN expected input.

        Args:
            - x: Tensor x shaped (B x (C=1) x H x W).

        Returns:
            New tensor shaped (W x B x H).
        """
        x = x.squeeze(1)
        x = x.permute(2, 0, 1)
        return x

    def forward(self, x):
        x = self._reshape_features(x)
        batch_size = x.size(1)
        h_0 = self._init_hidden(batch_size)
        h_0 = h_0.to(x.device)
        x, h = self.rnn(x, h_0)
        x = self.fc(x)
        return x


In [7]:
class CRNN(pl.LightningModule):
    ABC = "0123456789ABCEHKMOPTXY"
    
    def __init__(
        self, 
        alphabet=ABC,
        cnn_input_size=(64, 320),
        cnn_output_len=20,
        rnn_hidden_size=128,
        rnn_num_layers=1,
        rnn_dropout=0.0,
        rnn_bidirectional=False,
        lr=3e-4,
        device='cuda' if torch.cuda.is_available() else 'cpu',
    ):
        super(CRNN, self).__init__()
        self.alphabet = alphabet
        self.features_extractor = FeatureExtractor(
            input_size=cnn_input_size,
            output_len=cnn_output_len
        )
        self.sequence_predictor = SequencePredictor(
            input_size=self.features_extractor.num_output_features,
            hidden_size=rnn_hidden_size, num_layers=rnn_num_layers,
            num_classes=len(alphabet) + 1, dropout=rnn_dropout,
            bidirectional=rnn_bidirectional
        )
        self.lr = lr
    
    @staticmethod
    def decode_sequence(pred, abc):
        pred = pred.permute(1, 0, 2).cpu().data.numpy()
        outputs = []
        for i in range(len(pred)):
            outputs.append(pred_to_string(pred[i], abc))
        return outputs

    def forward(self, x, decode=False):
        features = self.features_extractor(x)
        sequence = self.sequence_predictor(features)
        if decode:
            sequence = self.decode_sequence(sequence, self.alphabet)
        return sequence
    
    def training_step(self, batch, batch_idx):
        images = batch["images"].to(self.device)
        seqs = batch["seqs"]
        seq_lens = batch["seq_lens"]

        # TODO TIP: What happens here is explained in seminar 06.
        seqs_pred = model(images).cpu()
        log_probs = torch.nn.functional.log_softmax(seqs_pred, dim=2)
        seq_lens_pred = torch.Tensor([seqs_pred.size(0)] * seqs_pred.size(1)).int()

        loss = torch.nn.functional.ctc_loss(
            log_probs, seqs, seq_lens_pred, seq_lens
        )
        return loss

    def configure_optimizers(self):
        return torch.optim.AdamW(model.parameters(), lr=self.lr)


In [8]:
model = CRNN()
trainer = pl.Trainer(
    accelerator="cpu", 
    max_epochs=5,
    fast_dev_run=3,
    limit_train_batches=2,
    limit_val_batches=2,
    limit_test_batches=2,
)

trainer.fit(
    model, 
    train_dataloaders=dtrain, 
    val_dataloaders=dval,
)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
Running in `fast_dev_run` mode: will run the requested loop using 3 batch(es). Logging and checkpointing is suppressed.
  rank_zero_warn("You passed in a `val_dataloader` but have no `validation_step`. Skipping val loop.")

  | Name               | Type              | Params
---------------------------------------------------------
0 | features_extractor | FeatureExtractor  | 11.2 M
1 | sequence_predictor | SequencePredictor | 249 K 
---------------------------------------------------------
11.4 M    Trainable params
0         Non-trainable params
11.4 M    Total params
45.705    Total estimated model params size (MB)
  rank_zero_warn(
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_steps=3` reached.


In [9]:
for batch in dtrain:
    break
batch["images"].size()

torch.Size([32, 3, 64, 320])

In [None]:
x = model.features_extractor.cnn(batch["images"]).permute(0, 3, 2, 1).contiguous()
x.size()

In [None]:
model.features_extractor.cnn

In [None]:
model.features_extractor.proj(x)

In [None]:
m = torch.nn.Conv2d(16, 33, (3, 5))
inp = torch.randn(20, 16, 50, 100)
output = m(inp)
output.size()

In [None]:
images = batch["images"]
images.size()

In [None]:
feats1 = model.features_extractor.cnn(images)
feats1.size()

In [None]:
feats2 = model.features_extractor.pool(feats1)
feats2.size()

In [None]:
feats3 = feats2.permute(0, 3, 2, 1).contiguous()
feats3.size()

In [None]:
feats4 = model.features_extractor.proj(feats3)
feats4.size()