In [None]:
# Mount Google Drive at /content/drive; the dataset archives and checkpoint
# directories configured further down all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install evaluate jiwer

In [None]:
import zipfile
import cv2
import numpy as np
import json
from google.colab.patches import cv2_imshow
from PIL import Image
import matplotlib.pyplot as plt

from tqdm import tqdm
from torch.utils.data import DataLoader, Dataset
from transformers import TrOCRProcessor, VisionEncoderDecoderModel
import torch
import editdistance
import evaluate


In [None]:
# Single Drive folder that holds the archives, logs and checkpoints.
_ghnk_root = '/content/drive/MyDrive/ghnk'

# Zip archives containing the images and their JSON annotations.
train_file = f'{_ghnk_root}/train_data.zip'
test_file = f'{_ghnk_root}/test_data.zip'

# Log locations (not referenced by the code visible in this notebook).
audit_log = f'{_ghnk_root}/audit.log'
epoch_counter = f'{_ghnk_root}/epoch_counter.log'

# Parent directories of the numbered model / processor checkpoints.
model_dir = f'{_ghnk_root}/model'
processor_dir = f'{_ghnk_root}/processor'

In [None]:
def parse_images_json(file):
  """Read a zip of images plus annotations and return one crop per annotated text.

  For every ``.jpg`` member (extension matched case-insensitively) the member
  with the same stem and a ``.json`` extension is loaded. Each annotation entry
  must provide ``"text"`` and a ``"polygon"`` dict with the keys ``x0..x3`` and
  ``y0..y3``. The axis-aligned bounding box of the polygon, clamped to the image
  bounds, is cropped from the RGB image. Entries whose clamped box is empty are
  skipped because they cannot produce a usable image.

  Args:
    file: Zip archive to read (anything ``zipfile.ZipFile`` accepts).

  Returns:
    List of ``{"image": <PIL image of the crop>, "text": <annotation text>}``
    dicts, in archive order.

  Raises:
    KeyError: If an image has no matching ``.json`` member in the archive.
    ValueError: If an image member cannot be decoded.
  """
  import os  # local import: the notebook's import cell does not provide os

  images_and_json = []

  with zipfile.ZipFile(file, 'r') as zip_ref:
    for file_info in tqdm(zip_ref.infolist(), desc='Reading files'):
      if not file_info.filename.lower().endswith('.jpg'):
        continue

      # Swap only the extension. A plain str.replace('.jpg', '.json') misses
      # upper-case extensions (so the image itself would be parsed as JSON)
      # and also rewrites '.jpg' occurring elsewhere in the path.
      json_name = os.path.splitext(file_info.filename)[0] + '.json'
      with zip_ref.open(zip_ref.getinfo(json_name)) as json_file:
        json_data = json.load(json_file)

      with zip_ref.open(file_info) as image_file:
        img_array = np.frombuffer(image_file.read(), np.uint8)

      cv2_img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
      if cv2_img is None:
        # imdecode signals failure by returning None; fail with a clear message
        # instead of an opaque error from cvtColor.
        raise ValueError(f"Could not decode image {file_info.filename!r}")
      cv2_img = cv2.cvtColor(cv2_img, cv2.COLOR_BGR2RGB)
      height, width = cv2_img.shape[:2]

      for data_item in json_data:
        text = data_item["text"]
        polygon = data_item["polygon"]

        coords = np.array([[polygon["x0"], polygon["y0"]],
                           [polygon["x1"], polygon["y1"]],
                           [polygon["x2"], polygon["y2"]],
                           [polygon["x3"], polygon["y3"]]], dtype=np.int32)

        # Bounding box of the polygon, clamped to the image. Without the clamp
        # a negative coordinate would be interpreted as "count from the end"
        # by the slice below and silently select the wrong region.
        min_x = max(int(coords[:, 0].min()), 0)
        min_y = max(int(coords[:, 1].min()), 0)
        max_x = min(int(coords[:, 0].max()), width)
        max_y = min(int(coords[:, 1].max()), height)

        if max_x <= min_x or max_y <= min_y:
          # Degenerate / out-of-image annotation: nothing to crop.
          continue

        cropped_img = cv2_img[min_y:max_y, min_x:max_x]
        images_and_json.append({"image": Image.fromarray(cropped_img), "text": text})

  return images_and_json

In [None]:
def get_shuffled_indexes(length):
  """Return the integers ``0 .. length - 1`` in a fixed pseudo-random order.

  The global NumPy random generator is re-seeded with 42 on every call, so a
  given ``length`` always yields the same permutation. Re-seeding the global
  generator is a side effect visible to any later ``np.random`` use.
  """
  np.random.seed(42)
  indexes = list(range(length))
  np.random.shuffle(indexes)  # in-place shuffle of the Python list
  return indexes

In [None]:
def get_images_json(file, split=1):
  """Parse ``file`` and divide its samples into two lists in a fixed shuffled order.

  Args:
    file: Zip archive handed to ``parse_images_json``.
    split: Fraction of the samples placed in the first list; with the default
      of 1 every sample lands in the first list and the second is empty.

  Returns:
    ``(train_images, test_images)``: the first ``int(total * split)`` samples
    of the shuffled order, and the remaining ones.
  """
  samples = parse_images_json(file)
  order = get_shuffled_indexes(len(samples))
  cut = int(len(samples) * split)

  head, tail = order[:cut], order[cut:]
  train_images = [samples[k] for k in head]
  test_images = [samples[k] for k in tail]

  return train_images, test_images

In [None]:
def get_model_and_processor(model_location=None, processor_location=None):
  """Load a TrOCR model and processor and align the model config with the tokenizer.

  Args:
    model_location: Path or hub id passed to
      ``VisionEncoderDecoderModel.from_pretrained``. ``None`` selects
      ``'microsoft/trocr-base-handwritten'``.
    processor_location: Path or hub id passed to
      ``TrOCRProcessor.from_pretrained``. ``None`` selects the same default.

  Returns:
    ``(model, processor)``.
  """
  default_checkpoint = 'microsoft/trocr-base-handwritten'

  # Identity comparison is the correct test for the None sentinel.
  if model_location is None:
    model_location = default_checkpoint
  if processor_location is None:
    processor_location = default_checkpoint

  print(f"Reading model from {model_location} and processor from {processor_location}")

  processor = TrOCRProcessor.from_pretrained(processor_location)
  model = VisionEncoderDecoderModel.from_pretrained(model_location)

  # Copy the special-token ids from the tokenizer and the vocabulary size from
  # the decoder config onto the top-level model config.
  model.config.pad_token_id = processor.tokenizer.pad_token_id
  model.config.decoder_start_token_id = processor.tokenizer.cls_token_id
  model.config.vocab_size = model.config.decoder.vocab_size

  # Beam-search / generation settings are not stored on the config here;
  # evaluate_model passes them to generate() explicitly.

  return model, processor


In [None]:
class GNHKDataset(Dataset):
    """Torch dataset turning ``{"image", "text"}`` samples into model inputs.

    Args:
        images: Indexable collection of dicts with an ``"image"`` entry (fed to
            the processor) and a ``"text"`` entry (fed to its tokenizer).
        processor: Callable image processor exposing a ``tokenizer`` attribute.
        max_length: Fixed length of every label sequence; shorter labels are
            padded and longer ones truncated to this many token ids.
    """

    def __init__(self, images, processor, max_length=128):
        self.images = images
        self.processor = processor
        self.max_length = max_length

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(pixel_values, labels)`` for the sample at ``idx``.

        ``pixel_values`` is the processor output without its leading batch axis
        (e.g. ``torch.Size([3, 384, 384])`` for the base TrOCR processor).
        ``labels`` is a tensor of ``max_length`` token ids in which every
        padding id has been replaced by -100, e.g. for the label 'week':
        ``[0, 3583, 2, -100, -100, ...]``.
        """
        sample = self.images[idx]
        image = sample["image"]
        text = sample["text"]

        # squeeze(0) removes only the batch axis added by return_tensors="pt";
        # a bare squeeze() would also drop any other axis of size 1.
        pixel_values = self.processor(images=image, return_tensors="pt").pixel_values.squeeze(0)

        # truncation=True guarantees that every label has exactly max_length
        # ids; without it an over-long text yields a longer sequence and the
        # default batch collation fails on the mismatched tensor sizes.
        encoding = self.processor.tokenizer(
            text,
            padding="max_length",
            max_length=self.max_length,
            truncation=True,
        )

        # Replace padding ids with -100 (the default ignore_index of PyTorch's
        # cross-entropy loss) so that padded positions do not contribute.
        pad_id = self.processor.tokenizer.pad_token_id
        labels = [token_id if token_id != pad_id else -100 for token_id in encoding.input_ids]

        return pixel_values, torch.tensor(labels)


In [None]:
def train_model(model, processor, train_images, batch_size=12, learning_rate=5e-7):
    """Run a single training pass over ``train_images`` and report the mean loss.

    A fresh AdamW optimizer is created on every call, so optimizer state is not
    carried over between calls.

    Args:
        model: Model called as ``model(pixel_values=..., labels=...)`` whose
            output exposes ``.loss``.
        processor: Processor used by ``GNHKDataset`` to build the inputs.
        train_images: Samples accepted by ``GNHKDataset``.
        batch_size: Mini-batch size of the shuffled loader.
        learning_rate: AdamW learning rate.

    Returns:
        ``(model, processor, total_train_loss)`` where ``total_train_loss`` is
        the loss averaged over the batches.

    Raises:
        ValueError: If ``train_images`` is empty.
    """
    if len(train_images) == 0:
        # Fail before touching the device instead of ending in an unclear
        # sampler error or a division by zero.
        raise ValueError("train_images is empty; nothing to train on")

    train_loader = DataLoader(GNHKDataset(train_images, processor), batch_size=batch_size, shuffle=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # use GPU if available
    print(f'Training on device {device}')

    model.to(device)
    model.train()

    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    # Drop gradients that may still sit on the parameters (e.g. from an
    # interrupted earlier run of this cell) so they cannot leak into step one.
    optimizer.zero_grad()

    train_loss = 0.0
    for pixel_values, labels in tqdm(train_loader, desc="(Train)"):
        pixel_values = pixel_values.to(device)
        labels = labels.to(device)

        outputs = model(pixel_values=pixel_values, labels=labels)
        loss = outputs.loss

        loss.backward()

        optimizer.step()
        # Clearing after the step also releases gradients once the loop ends.
        optimizer.zero_grad()
        train_loss += loss.item()

    total_train_loss = train_loss / len(train_loader)
    print(f"Train Loss: {total_train_loss}")

    return model, processor, total_train_loss

In [None]:
def evaluate_model(model, processor, images, batch_size=12, verbose=False):
    """Generate text for every sample and return the character error rate (CER).

    The CER is computed once over all predictions and references. Averaging
    per-batch CER values instead would weight batches equally regardless of how
    many characters they contain, and so would not equal the dataset CER.

    Args:
        model: Model exposing ``generate``.
        processor: Processor used for input preparation and ``batch_decode``.
        images: Samples accepted by ``GNHKDataset``.
        batch_size: Mini-batch size of the evaluation loader.
        verbose: When true, print the predictions and references of each batch.

    Returns:
        The CER reported by the ``cer`` metric for the whole sample set.

    Raises:
        ValueError: If ``images`` is empty.
    """
    if len(images) == 0:
        raise ValueError("images is empty; nothing to evaluate")

    # Sample order is irrelevant for a metric computed over the whole set, so
    # the loader is left unshuffled to keep runs comparable.
    loader = DataLoader(GNHKDataset(images, processor), batch_size=batch_size, shuffle=False)

    cer_metric = evaluate.load("cer")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    true_texts = []
    predicted_texts = []

    with torch.no_grad():
        for pixel_values, labels in tqdm(loader, desc='Evaluation'):
            pixel_values = pixel_values.to(device)
            generated_ids = model.generate(pixel_values,
                                            eos_token_id=processor.tokenizer.sep_token_id,
                                            max_length=64,
                                            early_stopping=True,
                                            no_repeat_ngram_size=3,
                                            length_penalty=2.0,
                                            num_beams=4)

            generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)

            # Undo the -100 masking applied by the dataset so that the label
            # ids can be decoded; the padding is then stripped as special tokens.
            labels[labels == -100] = processor.tokenizer.pad_token_id
            true_labels = processor.batch_decode(labels, skip_special_tokens=True)

            if verbose:
                print(generated_text, true_labels)

            predicted_texts.extend(generated_text)
            true_texts.extend(true_labels)

    calculated_cer = cer_metric.compute(predictions=predicted_texts, references=true_texts)
    print('Calculated cer is ', calculated_cer)
    return calculated_cer


### Training Loop

In [None]:
# Resume from the model and processor previously saved under sub-directory "8".
model, processor = get_model_and_processor(f"{model_dir}/8",f"{processor_dir}/8")

In [None]:
# Keep only the first element of the split: the leading 80% of the shuffled samples.
train_images = get_images_json(train_file, 0.8)[0]

In [None]:
# One training pass, then persist the updated model and processor under
# sub-directory "9" (the checkpoint loaded in the Evaluation section).
model, processor, total_train_loss = train_model(model, processor, train_images)
model.save_pretrained(f"{model_dir}/9")
processor.save_pretrained(f"{processor_dir}/9")

### Testing

In [None]:
# Second element of the same 80/20 split: the remaining 20% of the samples.
# The fixed seed in get_shuffled_indexes makes this split match the one used
# for training. NOTE(review): this parses the whole training archive again.
test_images = get_images_json(train_file, 0.8)[1]

In [None]:
# NOTE(review): this loads checkpoint "7", whereas the training section resumes
# from "8" and saves "9" — confirm this is the checkpoint meant to be tested.
model, processor = get_model_and_processor(f"{model_dir}/7", f"{processor_dir}/7")

In [None]:
# Character error rate of the loaded checkpoint on the held-out part of the training archive.
calculated_cer = evaluate_model(model, processor, test_images)

### Evaluation

In [None]:
# split=1 puts every sample of the test archive into the first list, so [0] is the full set.
eval_images = get_images_json(test_file, 1)[0]

In [None]:
# Load the model and processor saved under sub-directory "9" by the training section.
model, processor = get_model_and_processor(f"{model_dir}/9", f"{processor_dir}/9")

In [None]:
# Character error rate on the separate test archive; overwrites the value from the Testing section.
calculated_cer = evaluate_model(model, processor, eval_images)