## Init

In [None]:
# %pip installs into the environment of the running kernel; `!pip` uses whichever
# pip happens to be first on PATH, which can be a different interpreter.
%pip install albumentations==0.4.6
%pip install mrz
%pip install imantics

# MRZ gen.

In [24]:
# Fetch the synthetic MRZ image generator. The clone is skipped when the checkout
# already exists, so the cell can be re-run without git failing.
!test -d MRZ_Generator || git clone https://github.com/luciantin/MRZ_Generator

Cloning into 'MRZ_Generator'...
remote: Enumerating objects: 465, done.[K
remote: Counting objects: 100% (465/465), done.[K
remote: Compressing objects: 100% (441/441), done.[K
remote: Total 465 (delta 27), reused 447 (delta 16), pack-reused 0[K
Receiving objects: 100% (465/465), 25.92 MiB | 38.86 MiB/s, done.
Resolving deltas: 100% (27/27), done.


U `settings.json` postaviti veličinu uzorka (sample size) za train set, generirati slike te spremiti direktorije pod nazivima `train_images` i `train_masks`.

Ponoviti to za val set, val_images i val_masks.

Pokrenuti iz njegovog DIR-a zbog path-a

In [None]:
# Create the output layout used by the generator. `mkdir -p` does not fail when a
# directory already exists, so re-running the cell still creates whatever is missing.
!cd MRZ_Generator/ && mkdir -p result/images result/masks && touch result/MRZ_values.json
# Started from inside the checkout, as the generator is meant to be run from its own directory.
!cd MRZ_Generator/ && python ./main.py

In [None]:
## Test image

import cv2
import matplotlib.pyplot as plt

sample_mask_path = 'MRZ_Generator/result/masks/100.bmp'
img = cv2.imread(sample_mask_path, cv2.IMREAD_GRAYSCALE)
# cv2.imread reports a missing or unreadable file by returning None instead of raising,
# so fail here with a clear message rather than later inside imshow.
if img is None:
    raise FileNotFoundError(f"Could not read mask image: {sample_mask_path}")
print(img)

plt.imshow(img, cmap='gray')
plt.show()

In [None]:
# Fetch the detector repository; skipped when the checkout already exists so the
# cell can be re-run without git failing.
!test -d MRZ-Detector || git clone https://github.com/luciantin/MRZ-Detector

In [None]:
# Folders for the datasets and for the predicted masks; -p keeps the cell re-runnable.
!cd MRZ-Detector/ && mkdir -p data saved_images

In [None]:
# Move the generated training images and their masks into the detector's data folder.
!mv MRZ_Generator/result/images MRZ-Detector/data/train_images
!mv MRZ_Generator/result/masks MRZ-Detector/data/train_masks

In [None]:
# Move the generated validation images and their masks into the detector's data folder.
!mv MRZ_Generator/result/images MRZ-Detector/data/val_images
!mv MRZ_Generator/result/masks MRZ-Detector/data/val_masks

# MRZ detector

In [2]:
import torch
import torchvision
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from PIL import Image
import numpy as np
import os

## Dataset

In [None]:



class MRZ_Dataset(Dataset):
    """Paired image / binary-mask dataset read from two directories.

    Every regular file in ``image_dir`` is one sample. Its mask is looked up in
    ``mask_dir`` under the same file name, with a ``.png`` extension swapped for ``.bmp``.

    Args:
        image_dir: Directory containing the input images.
        mask_dir: Directory containing the segmentation masks.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``"image"`` and ``"mask"`` entries.
    """

    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order and also lists sub-directories
        # (for example ".ipynb_checkpoints"). Keep regular files only, sorted, so that
        # sample indices are stable between runs and every entry can be opened.
        self.images = sorted(
            name
            for name in os.listdir(image_dir)
            if os.path.isfile(os.path.join(image_dir, name))
        )

    def __len__(self):
        """Return the number of samples."""
        return len(self.images)

    def _mask_path(self, index):
        """Return the path of the mask that belongs to the image at ``index``."""
        image_name = self.images[index]
        stem, ext = os.path.splitext(image_name)
        # Swap only the trailing extension; a ".png" elsewhere in the name is kept.
        mask_name = stem + ".bmp" if ext == ".png" else image_name
        return os.path.join(self.mask_dir, mask_name)

    def __getitem__(self, index):
        """Return ``(image, mask)`` for sample ``index``.

        The image is loaded as RGB and the mask as single-channel float32, with the
        value 255 mapped to 1. When a transform is set, both are passed through it.
        """
        img_path = os.path.join(self.image_dir, self.images[index])
        mask_path = self._mask_path(index)
        image = np.array(Image.open(img_path).convert("RGB"))
        mask = np.array(Image.open(mask_path).convert("L"), dtype=np.float32)
        mask[mask == 255.0] = 1

        if self.transform is not None:
            augmentations = self.transform(image=image, mask=mask)
            image = augmentations["image"]
            mask = augmentations["mask"]

        return image, mask

## DataLoader

In [None]:

def get_loaders(
    train_dir,
    train_maskdir,
    val_dir,
    val_maskdir,
    batch_size,
    train_transform,
    val_transform,
    num_workers=4,
    pin_memory=True,
):
    """Build the training and validation ``DataLoader`` objects.

    Args:
        train_dir: Directory with the training images.
        train_maskdir: Directory with the training masks.
        val_dir: Directory with the validation images.
        val_maskdir: Directory with the validation masks.
        batch_size: Batch size used by both loaders.
        train_transform: Transform handed to the training dataset.
        val_transform: Transform handed to the validation dataset.
        num_workers: ``num_workers`` passed to both loaders.
        pin_memory: ``pin_memory`` passed to both loaders.

    Returns:
        ``(train_loader, val_loader)``; only the training loader shuffles.
    """
    # Settings shared by both loaders.
    common = dict(
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )

    train_loader = DataLoader(
        MRZ_Dataset(
            image_dir=train_dir,
            mask_dir=train_maskdir,
            transform=train_transform,
        ),
        shuffle=True,
        **common,
    )
    val_loader = DataLoader(
        MRZ_Dataset(
            image_dir=val_dir,
            mask_dir=val_maskdir,
            transform=val_transform,
        ),
        shuffle=False,
        **common,
    )

    return train_loader, val_loader


## Dice Score

[Sørensen-Dice similarity coefficient for image segmentation](https://www.mathworks.com/help/images/ref/dice.html)

- računa se tek nakon što se izvrti epoha; ne sudjeluje u f.c.

In [None]:
def check_accuracy(loader, model, device="cuda"):
    """Print and return the pixel accuracy and mean Dice score of ``model`` on ``loader``.

    Model outputs are passed through a sigmoid and thresholded at 0.5 before they
    are compared with the targets. The Dice score is computed per batch and averaged.

    Args:
        loader: Iterable of ``(inputs, targets)`` batches. A channel axis is inserted
            into the targets at dim 1 before the comparison.
        model: Module mapping an input batch to logits.
        device: Device the batches are moved to.

    Returns:
        ``(accuracy_percent, mean_dice)`` as Python floats; both are ``0.0`` when the
        loader yields no batches.
    """
    num_correct = 0
    num_pixels = 0
    dice_total = 0.0
    num_batches = 0
    # Remember the current mode so a model that was already in eval mode stays there.
    was_training = model.training
    model.eval()

    try:
        with torch.no_grad():
            for x, y in loader:
                x = x.to(device)
                y = y.to(device).unsqueeze(1)
                preds = torch.sigmoid(model(x))
                preds = (preds > 0.5).float()
                num_correct += (preds == y).sum().item()
                num_pixels += torch.numel(preds)
                # 1e-8 guards against 0/0 when both prediction and target are empty.
                dice_total += (
                    (2 * (preds * y).sum()) / ((preds + y).sum() + 1e-8)
                ).item()
                num_batches += 1
    finally:
        model.train(was_training)

    # An empty loader would otherwise divide by zero.
    accuracy = num_correct / num_pixels * 100 if num_pixels else 0.0
    mean_dice = dice_total / num_batches if num_batches else 0.0

    print(f"Got {num_correct}/{num_pixels} with acc {accuracy:.2f}")
    print(f"Dice score: {mean_dice}")
    return accuracy, mean_dice

## Checkpoint

In [None]:
def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
    """Announce the save on stdout, then serialize ``state`` to ``filename`` with ``torch.save``."""
    print("=> Saving checkpoint")
    torch.save(obj=state, f=filename)

def load_checkpoint(checkpoint, model):
    """Announce the load on stdout, then copy ``checkpoint["state_dict"]`` into ``model``."""
    print("=> Loading checkpoint")
    weights = checkpoint["state_dict"]
    model.load_state_dict(weights)

## Model

In [None]:
import torch
import torch.nn as nn
import torchvision.transforms.functional as TF

import matplotlib.pyplot as plt

# Debug switch read by UNET.forward: when True, every decoder step prints the shape
# of one feature map and plots another.
TEST = False

# Double convolution used at every step of the U-Net.
class DoubleConv(nn.Module):
    """Two consecutive ``Conv2d -> BatchNorm2d -> ReLU`` stages.

    The first stage maps ``in_channels`` to ``out_channels``; the second keeps
    ``out_channels``. Both convolutions use a 3x3 kernel with stride 1 and
    padding 1 and have no bias term.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        # Stage 1 consumes the block input, stage 2 consumes stage 1's output.
        for stage_in in (in_channels, out_channels):
            stages += [
                nn.Conv2d(
                    stage_in, out_channels,
                    kernel_size=3, stride=1, padding=1, bias=False,
                ),
                nn.BatchNorm2d(out_channels),  # normalization
                nn.ReLU(inplace=True),
            ]
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x``."""
        return self.conv(x)


# `features` lists the channel counts of the U-Net's contracting side; the expanding
# side walks the same list in reverse.
# Activations are laid out as (batch, channels, height, width): for example
# torch.Size([3, 256, 40, 40]) is a batch of 3 maps with 256 channels of 40x40 each.
# There are 3 input channels because the images are RGB and 1 output channel because
# there is a single category (binary: a pixel either belongs to it or not).
class UNET(nn.Module):
    """U-Net built from ``DoubleConv`` blocks.

    Args:
        in_channels: Number of channels of the input images.
        out_channels: Number of channels produced by the final 1x1 convolution.
        features: Channel count of each encoder level, top to bottom. The decoder
            uses the same values in reverse order.
    """

    def __init__(self, in_channels=3, out_channels=1, features=(64, 128, 256, 512)):
        super(UNET, self).__init__()
        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Down part: one DoubleConv per level.
        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        # Up part: `ups` alternates an upsampling ConvTranspose2d (even indices) with
        # the DoubleConv applied after the skip concatenation (odd indices).
        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(
                    feature*2, feature, kernel_size=2, stride=2
                )
            )
            self.ups.append(DoubleConv(feature*2, feature))

        self.bottleneck = DoubleConv(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        """Return the output of the final 1x1 convolution (no activation applied)."""
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)

        # The deepest skip connection is consumed first on the way up.
        skip_connections = skip_connections[::-1]

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx//2]

            if TEST:
                # Debug output. Tensor.numpy() only works on CPU tensors, so copy the
                # maps to the CPU first; this keeps the switch usable on a GPU model.
                debug_maps = x[0].detach().cpu().numpy()
                print(debug_maps[0].shape)
                plt.imshow(debug_maps[1], cmap='gray')
                plt.show()

            # Pooling rounds odd sizes down, so the upsampled map can be smaller than
            # the stored skip connection; resize it to the skip's height and width.
            if x.shape != skip_connection.shape:
                x = TF.resize(x, size=skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx+1](concat_skip)

        return self.final_conv(x)

def test():
    """Smoke test: a single-channel U-Net must return an output shaped like its input."""
    batch = torch.randn((3, 1, 160, 160))
    net = UNET(in_channels=1, out_channels=1)
    output = net(batch)
    print(output.shape)
    print(batch.shape)
    assert batch.shape == output.shape

# TEST = True
# test()


## Parametri

In [None]:
LEARNING_RATE = 1e-4  # learning rate handed to the Adam optimizer in main()
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 10
NUM_EPOCHS = 3
NUM_WORKERS = 4  # DataLoader worker processes
IMAGE_HEIGHT = 160*3  # 480 px, size every image is resized to
IMAGE_WIDTH = 240*3  # 720 px
# Pinned host memory only helps host-to-GPU copies, so enable it only when CUDA is used.
PIN_MEMORY = DEVICE == "cuda"
LOAD_MODEL = False  # when True, main() starts from my_checkpoint.pth.tar
TRAIN_IMG_DIR = "MRZ-Detector/data/train_images/"
TRAIN_MASK_DIR = "MRZ-Detector/data/train_masks/"
VAL_IMG_DIR = "MRZ-Detector/data/val_images/"
VAL_MASK_DIR = "MRZ-Detector/data/val_masks/"

## Libs za train

In [None]:
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
import cv2

## Transformacije

In [None]:
# Training pipeline: resize to the configured input size, rescale the pixel values
# and convert to a tensor.
_train_steps = [
    A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
    # Augmentations that are currently switched off:
    # A.Rotate(limit=35, p=1.0),
    # A.HorizontalFlip(p=0.5),
    # A.VerticalFlip(p=0.1),
    A.Normalize(
        mean=[0.0, 0.0, 0.0],
        std=[1.0, 1.0, 1.0],
        max_pixel_value=255.0,
    ),
    ToTensorV2(),
]
train_transform = A.Compose(_train_steps)

# Validation pipeline: the same resize, normalization and tensor conversion as
# training, without any augmentation.
val_transforms = A.Compose([
    A.Resize(width=IMAGE_WIDTH, height=IMAGE_HEIGHT),
    A.Normalize(
        max_pixel_value=255.0,
        mean=[0.0] * 3,
        std=[1.0] * 3,
    ),
    ToTensorV2(),
])

## Train fn

In [None]:
def train_fn(loader, model, optimizer, loss_fn, scaler):
    """Train ``model`` for one pass over ``loader``.

    Args:
        loader: Iterable of ``(data, targets)`` batches.
        model: Module being trained.
        optimizer: Optimizer stepped once per batch through ``scaler``.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning the loss.
        scaler: Gradient scaler used to scale the loss and step the optimizer.
    """
    progress = tqdm(loader)

    for data, targets in progress:
        data = data.to(device=DEVICE)
        # Cast the targets to float and insert a channel axis at dim 1.
        targets = targets.float().unsqueeze(1).to(device=DEVICE)

        # Forward pass inside the autocast context.
        with torch.cuda.amp.autocast():
            predictions = model(data)
            loss = loss_fn(predictions, targets)

        # Backward pass and optimizer step, both routed through the scaler.
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Show the current loss in the progress bar.
        progress.set_postfix(loss=loss.item())

## Predict on Val dataset

In [None]:
def save_predictions_as_imgs(
    loader, model, folder="saved_images/", device="cuda"
):
    """Write the thresholded predictions and the targets of every batch as PNG files.

    For batch ``idx`` the prediction goes to ``<folder>/pred_<idx>.png`` and the
    target to ``<folder>/<idx>.png``. Predictions are the sigmoid of the model
    output thresholded at 0.5.

    Args:
        loader: Iterable of ``(inputs, targets)`` batches.
        model: Module mapping an input batch to logits.
        folder: Output directory, with or without a trailing slash; created if missing.
        device: Device the inputs are moved to.
    """
    os.makedirs(folder, exist_ok=True)
    # Remember the current mode so a model that was already in eval mode stays there.
    was_training = model.training
    model.eval()
    try:
        for idx, (x, y) in enumerate(loader):
            x = x.to(device=device)
            with torch.no_grad():
                preds = torch.sigmoid(model(x))
                preds = (preds > 0.5).float()
            # Both paths go through os.path.join so they land in `folder` whether or
            # not the caller passed a trailing separator.
            torchvision.utils.save_image(
                preds, os.path.join(folder, f"pred_{idx}.png")
            )
            torchvision.utils.save_image(
                y.unsqueeze(1), os.path.join(folder, f"{idx}.png")
            )
    finally:
        model.train(was_training)

In [None]:
def predict():
    """Load ``my_checkpoint.pth.tar`` and save predictions for the validation set."""
    print('predictING')
    model = UNET(in_channels=3, out_channels=1).to(DEVICE)
    # map_location lets a checkpoint that was saved on a GPU be restored on a
    # CPU-only machine.
    load_checkpoint(
        torch.load("my_checkpoint.pth.tar", map_location=DEVICE), model
    )

    # get_loaders always builds both loaders; only the validation one is used here.
    _, val_loader = get_loaders(
        TRAIN_IMG_DIR,
        TRAIN_MASK_DIR,
        VAL_IMG_DIR,
        VAL_MASK_DIR,
        BATCH_SIZE,
        train_transform,
        val_transforms,
        NUM_WORKERS,
        PIN_MEMORY,
    )

    # Same output folder as main(): the data paths are relative to the notebook's
    # directory, where a bare "saved_images/" folder is never created.
    save_predictions_as_imgs(
        val_loader, model, folder="MRZ-Detector/saved_images/", device=DEVICE
    )


## Main Train Loop

In [None]:
def main():
    """Train the U-Net for ``NUM_EPOCHS`` epochs.

    After every epoch the model and optimizer state are checkpointed, the accuracy
    on the validation set is printed and the validation predictions are written to
    ``MRZ-Detector/saved_images/``. With ``LOAD_MODEL`` set, training starts from
    ``my_checkpoint.pth.tar``.
    """
    torch.cuda.empty_cache()

    model = UNET(in_channels=3, out_channels=1).to(DEVICE)
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    train_loader, val_loader = get_loaders(
        TRAIN_IMG_DIR,
        TRAIN_MASK_DIR,
        VAL_IMG_DIR,
        VAL_MASK_DIR,
        BATCH_SIZE,
        train_transform,
        val_transforms,
        NUM_WORKERS,
        PIN_MEMORY,
    )

    if LOAD_MODEL:
        # map_location lets a checkpoint that was saved on a GPU be restored on a
        # CPU-only machine.
        checkpoint = torch.load("my_checkpoint.pth.tar", map_location=DEVICE)
        load_checkpoint(checkpoint, model)
        # The checkpoints written below also carry the optimizer state; restore it
        # when present so resumed training continues from the saved optimizer state.
        if "optimizer" in checkpoint:
            optimizer.load_state_dict(checkpoint["optimizer"])

    # Baseline metrics before any training step.
    check_accuracy(val_loader, model, device=DEVICE)
    scaler = torch.cuda.amp.GradScaler()

    for epoch in range(NUM_EPOCHS):
        train_fn(train_loader, model, optimizer, loss_fn, scaler)

        # save model
        checkpoint = {
            "state_dict": model.state_dict(),
            "optimizer": optimizer.state_dict(),
        }
        save_checkpoint(checkpoint)

        # check accuracy
        check_accuracy(val_loader, model, device=DEVICE)

        # print some examples to a folder
        save_predictions_as_imgs(
            val_loader, model, folder="MRZ-Detector/saved_images/", device=DEVICE
        )

## Ispis DataLoadera

In [None]:
def data_test():
    """Print the number of validation batches, then show the first mask of every batch."""
    loaders = get_loaders(
        TRAIN_IMG_DIR,
        TRAIN_MASK_DIR,
        VAL_IMG_DIR,
        VAL_MASK_DIR,
        BATCH_SIZE,
        train_transform,
        val_transforms,
        NUM_WORKERS,
        PIN_MEMORY,
    )
    val_loader = loaders[1]
    print(len(val_loader))

    for batch_no, (_, masks) in enumerate(val_loader):
        print(batch_no)
        first_mask = masks[0].numpy()
        plt.imshow(first_mask)
        plt.show()

## Single image prediction

In [None]:
def predict_single(
    image_path='MRZ-Detector/data/val_images/0.png',
    mask_path='MRZ-Detector/data/val_masks/0.bmp',
    output_path="./pred_100.png",
):
    """Predict the mask of one image with the saved checkpoint.

    The image and its reference mask are run through ``val_transforms`` and shown
    with matplotlib. The prediction (sigmoid of the model output thresholded at 0.5)
    is written to ``output_path``. Requires ``my_checkpoint.pth.tar``.

    Args:
        image_path: Image to run the model on.
        mask_path: Reference mask of that image; it is only displayed.
        output_path: File the predicted mask is saved to.
    """
    image = np.array(Image.open(image_path).convert("RGB"))
    mask = np.array(Image.open(mask_path).convert("L"), dtype=np.float32)
    mask[mask == 255.0] = 1
    augmentations = val_transforms(image=image, mask=mask)
    image = augmentations["image"]
    mask = augmentations["mask"]

    plt.imshow(image.squeeze().permute(1,2,0))
    plt.show()
    plt.imshow(mask, cmap='gray')
    plt.show()

    # The transform already returns a tensor, so add the batch axis and move it to
    # the device directly. Re-wrapping it in torch.tensor(...) makes a needless copy,
    # and gradients are not needed for inference.
    batch = image.unsqueeze(0).to(DEVICE)

    model = UNET(in_channels=3, out_channels=1).to(DEVICE)
    # map_location lets a checkpoint that was saved on a GPU be restored on a
    # CPU-only machine.
    load_checkpoint(
        torch.load("my_checkpoint.pth.tar", map_location=DEVICE), model
    )
    model.eval()
    with torch.no_grad():
        preds = torch.sigmoid(model(batch))
        preds = (preds > 0.5).float()
    torchvision.utils.save_image(preds, output_path)

## TJT

Za `predict` i `predict_single` mora postojati checkpoint (`my_checkpoint.pth.tar`).

In [None]:
if __name__ == "__main__":
    # Uncomment the entry point to run. predict() and predict_single() load
    # my_checkpoint.pth.tar, so a checkpoint has to exist first.
    # main()
    # predict_single()
    # predict()
    # data_test()
    pass  # an `if` body made only of comments is a syntax error