# TASK - 2 for ICDAR 2019

# In [9]:
'''
******** UTILITY BLOCK ********
'''

import torch
import torch.nn as nn
from torch.autograd import Variable
import collections


class strLabelConverter(object):
    """Convert between str and label.

    NOTE:
        Insert `blank` to the alphabet for CTC.  Index 0 is reserved for the
        blank, so the character at position ``k`` of the alphabet is encoded
        as ``k + 1``.  A ``'-'`` is appended to the stored alphabet so that
        the blank (looked up as index ``0 - 1 == -1``) is rendered as ``'-'``
        by raw decoding.

    Args:
        alphabet (str): set of the possible characters.
        ignore_case (bool, default=True): whether or not to ignore all of the case.
    """

    def __init__(self, alphabet, ignore_case=True):
        self._ignore_case = ignore_case
        if self._ignore_case:
            alphabet = alphabet.lower()
        self.alphabet = alphabet + '-'  # for `-1` index

        # NOTE: 0 is reserved for 'blank' required by wrap_ctc
        self.dict = {char: i + 1 for i, char in enumerate(alphabet)}

    def _encode_chars(self, text):
        """Return the list of integer labels for the characters of one str."""
        return [
            self.dict[char.lower() if self._ignore_case else char]
            for char in text
        ]

    def encode(self, text):
        """Support batch or single str.

        Args:
            text (str or iterable of str): texts to convert.

        Raises:
            KeyError: when a character is not part of the alphabet.
            TypeError: when `text` is neither a str nor an iterable of str.

        Returns:
            torch.IntTensor [length_0 + length_1 + ... length_{n - 1}]: encoded texts.
            torch.IntTensor [n]: length of each text.
        """
        # `collections.Iterable` was removed in Python 3.10; the ABC lives in
        # `collections.abc`.  Imported locally so the block is self-contained.
        from collections.abc import Iterable

        if isinstance(text, str):
            labels = self._encode_chars(text)
            lengths = [len(labels)]
        elif isinstance(text, Iterable):
            items = list(text)  # materialise once so generators work too
            lengths = [len(s) for s in items]
            labels = self._encode_chars(''.join(items))
        else:
            raise TypeError(
                'text must be a str or an iterable of str, got {}'.format(
                    type(text).__name__))
        return (torch.IntTensor(labels), torch.IntTensor(lengths))

    def decode(self, t, length, raw=False):
        """Decode encoded texts back into strs.

        Args:
            t (torch.IntTensor [length_0 + length_1 + ... length_{n - 1}]): encoded texts.
            length (torch.IntTensor [n]): length of each text.
            raw (bool): when True every label is rendered as-is (blank as
                ``'-'``); otherwise blanks are dropped and consecutive
                repeated labels are collapsed into one character.

        Raises:
            AssertionError: when the texts and its length does not match.

        Returns:
            text (str or list of str): a str when `length` holds one element,
            otherwise a list with one str per entry of `length`.
        """
        if length.numel() == 1:
            # Work on plain Python ints rather than tensor scalars.
            expected = int(length.view(-1)[0])
            assert t.numel() == expected, "text with length: {} does not match declared length: {}".format(t.numel(), expected)
            codes = t.contiguous().view(-1).tolist()
            if raw:
                return ''.join(self.alphabet[code - 1] for code in codes)
            char_list = []
            previous = None
            for code in codes:
                # Skip blanks and labels identical to their predecessor.
                if code != 0 and code != previous:
                    char_list.append(self.alphabet[code - 1])
                previous = code
            return ''.join(char_list)

        # batch mode
        lengths = [int(n) for n in length.view(-1).tolist()]
        assert t.numel() == sum(lengths), "texts with length: {} does not match declared length: {}".format(t.numel(), sum(lengths))
        texts = []
        index = 0
        for n in lengths:
            texts.append(
                self.decode(t[index:index + n], torch.IntTensor([n]), raw=raw))
            index += n
        return texts


class averager(object):
    """Compute average for `torch.Variable` and `torch.Tensor`.

    Keeps a running element count and a running sum; `val()` returns their
    ratio.
    """

    def __init__(self):
        self.reset()

    def add(self, v):
        """Accumulate every element of `v` into the running sum and count.

        Args:
            v (Variable or torch.Tensor): values to add.

        Raises:
            TypeError: when `v` is neither a Variable nor a Tensor.
        """
        if isinstance(v, Variable):
            count = v.data.numel()
            v = v.data.sum()
        elif isinstance(v, torch.Tensor):
            count = v.numel()
            v = v.sum()
        else:
            # Previously this fell through and failed with an unrelated
            # UnboundLocalError on `count`.
            raise TypeError(
                'averager.add expects a Variable or Tensor, got {}'.format(
                    type(v).__name__))

        self.n_count += count
        self.sum += v

    def reset(self):
        """Forget everything accumulated so far."""
        self.n_count = 0
        self.sum = 0

    def val(self):
        """Return the mean of all added elements, or 0 when nothing was added."""
        if self.n_count == 0:
            return 0
        return self.sum / float(self.n_count)


def oneHot(v, v_length, nc):
    """Build a padded one-hot tensor from concatenated label sequences.

    Args:
        v: 1-D tensor holding the labels of all sequences back to back.
        v_length: 1-D tensor with the length of each sequence.
        nc (int): number of classes (size of the one-hot dimension).

    Returns:
        torch.FloatTensor of size [batch, max(v_length), nc]; positions past
        the end of a sequence stay all-zero.  An empty `v_length` yields a
        tensor of size [0, 0, nc].
    """
    # Plain Python ints avoid using tensor scalars as sizes / slice bounds.
    lengths = [int(n) for n in v_length.tolist()]
    batchSize = len(lengths)
    maxLength = max(lengths) if lengths else 0  # max() of nothing would raise
    v_onehot = torch.FloatTensor(batchSize, maxLength, nc).fill_(0)
    acc = 0
    for i, length in enumerate(lengths):
        if length == 0:
            continue  # nothing to scatter for an empty sequence
        label = v[acc:acc + length].view(-1, 1).long()
        # Writes 1.0 at column `label[k]` of row k, in place on the slice.
        v_onehot[i, :length].scatter_(1, label, 1.0)
        acc += length
    return v_onehot


def loadData(v, data):
    """Resize `v.data` in place to the size of `data`, then copy `data` into it."""
    target = v.data
    target.resize_(data.size())
    target.copy_(data)


def prettyPrint(v):
    """Print the size, type, maximum, minimum and mean of `v`.

    Args:
        v: a tensor/Variable supporting `size()`, `max()`, `min()`, `mean()`
            and `.data.type()`.
    """
    print('Size {0}, Type: {1}'.format(str(v.size()), v.data.type()))
    # float() converts the single-element reduction results; indexing them
    # with `.data[0]` fails when the reduction returns a 0-dim tensor.
    print('| Max: %f | Min: %f | Mean: %f' % (float(v.max()), float(v.min()),
                                              float(v.mean())))


def assureRatio(img):
    """Return `img` with height no larger than width.

    `img` is unpacked as (batch, channels, height, width).  When the height
    exceeds the width the image is bilinearly upsampled to height x height;
    otherwise it is returned unchanged.
    """
    _, _, height, width = img.size()
    if height <= width:
        return img
    widen = nn.UpsamplingBilinear2d(size=(height, height), scale_factor=None)
    return widen(img)


# In [10]:
'''
********** MODEL **********
'''


import torch.nn as nn


class BidirectionalLSTM(nn.Module):
    """A bidirectional LSTM followed by a per-timestep linear projection.

    Args:
        nIn: input feature size of the LSTM.
        nHidden: hidden size of each LSTM direction.
        nOut: output feature size of the linear layer.
    """

    def __init__(self, nIn, nHidden, nOut):
        super().__init__()

        self.rnn = nn.LSTM(nIn, nHidden, bidirectional=True)
        # Both directions are concatenated, hence twice the hidden size.
        self.embedding = nn.Linear(2 * nHidden, nOut)

    def forward(self, input):
        """Map a [T, b, nIn] sequence to a [T, b, nOut] sequence."""
        recurrent = self.rnn(input)[0]
        steps, batch, features = recurrent.size()

        # Fold time and batch together so the linear layer sees 2-D input.
        flat = recurrent.view(steps * batch, features)
        projected = self.embedding(flat)  # [T * b, nOut]

        return projected.view(steps, batch, -1)


class CRNN(nn.Module):
    """Seven conv layers feeding two stacked `BidirectionalLSTM` blocks.

    Args:
        imgH: input image height; must be a multiple of 16.
        nc: number of input image channels.
        nclass: size of the final per-timestep output.
        nh: hidden size of the recurrent blocks.
        n_rnn: accepted for interface compatibility; not used here.
        leakyRelu: use LeakyReLU(0.2) instead of ReLU after each conv.
    """

    def __init__(self, imgH, nc, nclass, nh, n_rnn=2, leakyRelu=False):
        super().__init__()
        assert imgH % 16 == 0, 'imgH has to be a multiple of 16'

        # One row per conv layer: (out_channels, kernel, stride, padding).
        conv_specs = [
            (64, 3, 1, 1),
            (128, 3, 1, 1),
            (256, 3, 1, 1),
            (256, 3, 1, 1),
            (512, 3, 1, 1),
            (512, 3, 1, 1),
            (512, 2, 1, 0),
        ]
        batchnorm_layers = (2, 4, 6)
        # Pooling halves the height only: kernel (2, 2), stride (2, 1),
        # padding (0, 1).
        height_only_pool = ((2, 2), (2, 1), (0, 1))
        # conv index -> (pooling layer number, MaxPool2d arguments)
        pool_after = {
            0: (0, (2, 2)),                # 64x16x64
            1: (1, (2, 2)),                # 128x8x32
            3: (2, height_only_pool),      # 256x4x16
            5: (3, height_only_pool),      # 512x2x16
        }

        cnn = nn.Sequential()
        in_channels = nc
        for i, (out_channels, kernel, stride, padding) in enumerate(conv_specs):
            cnn.add_module('conv{0}'.format(i),
                           nn.Conv2d(in_channels, out_channels, kernel, stride, padding))
            if i in batchnorm_layers:
                cnn.add_module('batchnorm{0}'.format(i), nn.BatchNorm2d(out_channels))
            if leakyRelu:
                activation = nn.LeakyReLU(0.2, inplace=True)
            else:
                activation = nn.ReLU(True)
            cnn.add_module('relu{0}'.format(i), activation)
            if i in pool_after:
                pool_number, pool_args = pool_after[i]
                cnn.add_module('pooling{0}'.format(pool_number),
                               nn.MaxPool2d(*pool_args))
            in_channels = out_channels
        # The last conv (index 6) is expected to reduce the height to 1: 512x1x16

        self.cnn = cnn
        self.rnn = nn.Sequential(
            BidirectionalLSTM(512, nh, nh),
            BidirectionalLSTM(nh, nh, nclass))

    def forward(self, input):
        """Run the CNN, turn its width axis into a sequence, run the RNN."""
        # conv features
        features = self.cnn(input)
        _, _, height, _ = features.size()
        assert height == 1, "the height of conv must be 1"
        # Drop the height axis, then reorder to [w, b, c].
        sequence = features.squeeze(2).permute(2, 0, 1)

        # rnn features
        return self.rnn(sequence)


# In [11]:
'''
******* MAKING DATASET READY TO USE FOR MODEL *******
'''


import random
import torch
from torch.utils.data import Dataset
from torch.utils.data import sampler
import torchvision.transforms as transforms
import lmdb
import six
import sys
from PIL import Image
import numpy as np


class lmdbDataset(Dataset):
    """Dataset reading grayscale images and text labels from an LMDB store.

    The store is read through the keys ``num-samples``, ``image-%09d`` and
    ``label-%09d``; image/label keys are numbered from 1.

    Args:
        root: path of the LMDB environment to open read-only.
        transform: optional callable applied to each loaded PIL image.
        target_transform: optional callable applied to each decoded label str.
    """

    def __init__(self, root=None, transform=None, target_transform=None):
        self.env = lmdb.open(
            root,
            max_readers=1,
            readonly=True,
            lock=False,
            readahead=False,
            meminit=False)

        if not self.env:
            # NOTE(review): exits with status 0 although this is a failure
            # path; kept as-is for callers that rely on it.
            print('cannot creat lmdb from %s' % (root))
            sys.exit(0)

        with self.env.begin(write=False) as txn:
            self.nSamples = int(txn.get(b'num-samples'))

        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return self.nSamples

    def __getitem__(self, index):
        """Return ``(image, label)`` for the 0-based `index`.

        Raises:
            IndexError: when `index` is outside ``[0, len(self))``.
            KeyError: when the image or label record is missing in the store.
        """
        index = int(index)  # samplers may hand over tensor scalars
        # Valid positions are 0 .. len-1; the old `index <= len(self)` check
        # let `len(self)` through, which maps to a key that does not exist.
        if not 0 <= index < len(self):
            raise IndexError('index range error')
        key_number = index + 1  # keys in the store are 1-based

        with self.env.begin(write=False) as txn:
            img_key = b'image-%09d' % key_number
            imgbuf = txn.get(img_key)
            if imgbuf is None:
                raise KeyError(img_key)

            try:
                img = Image.open(six.BytesIO(imgbuf)).convert('L')
            except IOError:
                print('Corrupted image for %d' % key_number)
                # Fall back to the directly following sample, wrapping at the
                # end so the fallback never leaves the valid range.  (If every
                # image is unreadable this recursion does not terminate.)
                return self[(index + 1) % len(self)]

            if self.transform is not None:
                img = self.transform(img)

            label_key = b'label-%09d' % key_number
            raw_label = txn.get(label_key)

        if raw_label is None:
            raise KeyError(label_key)
        # Decode the stored bytes instead of slicing the repr "b'...'", which
        # mangled labels containing quotes or non-ASCII characters.
        label = raw_label.decode('utf-8')

        if self.target_transform is not None:
            label = self.target_transform(label)

        return (img, label)


class resizeNormalize(object):
    """Resize an image to a fixed size and return it as a normalised tensor.

    Args:
        size: target size handed to the image's `resize` method.
        interpolation: resampling filter handed to `resize`.
    """

    def __init__(self, size, interpolation=Image.BILINEAR):
        self.size = size
        self.interpolation = interpolation
        self.toTensor = transforms.ToTensor()

    def __call__(self, img):
        resized = img.resize(self.size, self.interpolation)
        tensor = self.toTensor(resized)
        # x -> (x - 0.5) / 0.5, done in place; maps [0, 1] onto [-1, 1]
        # (assuming ToTensor yields values in [0, 1]).
        tensor.sub_(0.5)
        tensor.div_(0.5)
        return tensor


class randomSequentialSampler(sampler.Sampler):
    """Sample runs of consecutive indices that start at random positions.

    Each batch-sized chunk of the produced index sequence is a run of
    `batch_size` consecutive indices; a final shorter run covers the
    remainder so that exactly `len(data_source)` indices are produced.

    Args:
        data_source: sized collection being sampled.
        batch_size (int): length of each consecutive run.
    """

    def __init__(self, data_source, batch_size):
        self.num_samples = len(data_source)
        self.batch_size = batch_size

    def __iter__(self):
        total = len(self)
        n_batch, tail = divmod(total, self.batch_size)
        # Largest start that keeps a full run inside the data.  Clamped at 0
        # so a data source smaller than one batch no longer raises.
        max_start = max(0, total - self.batch_size)

        # Plain ints replace the deprecated, float-valued `torch.range`.
        indices = []
        for _ in range(n_batch):
            random_start = random.randint(0, max_start)
            indices.extend(range(random_start, random_start + self.batch_size))
        # deal with tail
        if tail:
            random_start = random.randint(0, max_start)
            indices.extend(range(random_start, random_start + tail))

        return iter(indices)

    def __len__(self):
        return self.num_samples


class alignCollate(object):
    """Collate function resizing every image of a batch to one common size.

    Args:
        imgH: target height.
        imgW: target width used when `keep_ratio` is False.
        keep_ratio: derive the width from the widest aspect ratio in the batch.
        min_ratio: lower bound on width / height when `keep_ratio` is True.
    """

    def __init__(self, imgH=32, imgW=100, keep_ratio=False, min_ratio=1):
        self.imgH, self.imgW = imgH, imgW
        self.keep_ratio = keep_ratio
        self.min_ratio = min_ratio

    def __call__(self, batch):
        """Return ``(stacked image tensor, tuple of labels)`` for `batch`."""
        images, labels = zip(*batch)

        target_h, target_w = self.imgH, self.imgW
        if self.keep_ratio:
            # `image.size` is read as (width, height).
            widest = max(image.size[0] / float(image.size[1]) for image in images)
            target_w = int(np.floor(widest * target_h))
            # Keep the width at least `min_ratio` times the height.
            target_w = max(target_h * self.min_ratio, target_w)

        transform = resizeNormalize((target_w, target_h))
        stacked = torch.cat(
            [transform(image).unsqueeze(0) for image in images], 0)

        return stacked, labels


# In [12]:
'''
********* PREDICTION BLOCK **********
'''



import torch
from torch.autograd import Variable
import utils
from PIL import Image
import glob
import os
import csv
import cv2


def predict_this_box(image, model, alphabet):
    """Recognise the text in one cropped box image.

    Args:
        image: image accepted by `resizeNormalize` (resized to 200x32 here).
        model: network producing per-timestep class scores.
        alphabet (str): characters handed to `strLabelConverter`.

    Returns:
        The decoded text with blanks and repeated labels collapsed.  Both the
        raw and the collapsed decoding are printed as a side effect.
    """
    converter = strLabelConverter(alphabet)
    to_tensor = resizeNormalize((200, 32))

    pixels = to_tensor(image)
    if torch.cuda.is_available():
        pixels = pixels.cuda()
    # Prepend a batch dimension of size 1.
    batch = Variable(pixels.view(1, *pixels.size()))

    model.eval()
    scores = model(batch)

    # Best class per timestep, flattened to a 1-D label sequence.
    best = scores.max(2)[1]
    best = best.transpose(1, 0).contiguous().view(-1)

    best_size = Variable(torch.IntTensor([best.size(0)]))
    raw_pred = converter.decode(best.data, best_size.data, raw=True)
    sim_pred = converter.decode(best.data, best_size.data, raw=False)
    print('%-30s => %-30s' % (raw_pred, sim_pred))
    return sim_pred

'''
*****FUNCTION FOR PREDICTING TEXTS IN THE INVOICE*****
'''

def load_images_to_predict():
    """Recognise the text of every bounding box of every test image.

    Loads the pretrained CRNN weights, then for each ``*.jpg`` in the test
    folder reads the bounding-box file of the same name, crops each box, runs
    `predict_this_box` on it and writes one predicted text per line to the
    ``test_result`` folder.
    """
    # load model
    model_path = '/content/drive/My Drive/ICDAR_Dataset/expr/netCRNN_199_423.pth'   #path to weights for pre trained model
    # The trailing backslash is written as an explicit `\\`; the previous
    # `'\ '` was an invalid escape sequence with the same value.
    alphabet = '0123456789,.:(%$!^&-/);<~|`>?+=_[]{}"\'@#*ABCDEFGHIJKLMNOPQRSTUVWXYZ\\ '
    imgH = 32 # should be 32
    nclass = len(alphabet) + 1
    nhiddenstate = 256

    model = CRNN(imgH, 1, nclass, nhiddenstate)
    if torch.cuda.is_available():
        model = model.cuda()
    print('loading pretrained model from %s' % model_path)
    # Map every storage to CPU while loading so a checkpoint saved on a GPU
    # also loads on a CPU-only host; load_state_dict copies the values into
    # the model's own parameters afterwards.
    state = torch.load(model_path, map_location=lambda storage, loc: storage)
    # Strip the 'module.' prefix found in the checkpoint's keys.
    model.load_state_dict({k.replace('module.', ''): v for k, v in state.items()})

    # load image
    jpg_files = glob.glob("/content/drive/My Drive/ICDAR_Dataset/data_test/*.jpg") #path to test dataset
    print(len(jpg_files))
    for jpg in jpg_files:
        # File name without directory and extension; robust to extra dots.
        stem = os.path.splitext(os.path.basename(jpg))[0]
        # Close the image file as soon as the grayscale copy exists.
        with Image.open(jpg) as source:
            image = source.convert('L')
        words_list = []

        with open('/content/drive/My Drive/ICDAR_Dataset/boundingbox/' + stem + '.txt', 'r') as boxes: #Path for bounding box folder
            for line in csv.reader(boxes):
                box = [int(string, 10) for string in line[0:8]]
                # Crop from point (box[0], box[1]) to point (box[4], box[5]).
                boxImg = image.crop((box[0], box[1], box[4], box[5]))
                words_list.append(predict_this_box(boxImg, model, alphabet))

        with open('/content/drive/My Drive/ICDAR_Dataset/test_result/' + stem + '.txt', 'w+') as resultfile: #where you want to save your results for test data
            for words in words_list:
                resultfile.write(words + '\n')


def process_txt():
    """Post-process the raw prediction files into the task-2 answer files.

    Every ``*.txt`` in the ``test_result`` folder is parsed as CSV, empty
    rows are dropped, and an upper-cased entry per remaining row is written
    to a file of the same name in the ``task2_result`` folder.
    """
    old_files = glob.glob("/content/drive/My Drive/ICDAR_Dataset/test_result/*.txt")#path for test result which is saved earlier
    for old_file in old_files:
        new = []
        with open(old_file, "r") as old:
            for line in csv.reader(old):
                if not line:
                    continue
                if not line[0]:
                    continue
                if line[0][0] == ' ' or line[0][-1] == ' ':
                    line[0] = line[0].strip()
                if ' ' in line[0]:
                    line = line[0].split(' ')
                new.append(line)
        # Name the output after the input file.  The previous
        # `old_file.split('/')[1]` evaluated to 'content' for these absolute
        # paths, so every input overwrote the same output file.
        out_name = os.path.basename(old_file)
        with open('/content/drive/My Drive/ICDAR_Dataset/task2_result/' + out_name, "w+") as newfile: #answer for task2
            wr = csv.writer(newfile, delimiter = '\n')
            # NOTE(review): only the first element of each row is kept, so
            # rows split on spaces lose every word after the first - confirm
            # this is intended.
            new = [[s[0].upper()] for s in new]
            wr.writerows(new)


def for_task3():
    """Merge each bounding-box file with its predictions for visualisation.

    For every ``*.txt`` in the ``boundingbox`` folder the first eight fields
    of each row are parsed as integers and paired, row by row, with the rows
    of the same-named file in ``test_result``; the joined rows are written as
    CSV to the ``for_task3`` folder.
    """
    box_files = [
        os.path.splitext(found)[0] + ".txt"
        for found in glob.glob("/content/drive/My Drive/ICDAR_Dataset/boundingbox/*.txt")
    ]
    for boxfile in box_files:
        file_name = boxfile.split('/')[-1]

        box = []
        with open(boxfile,'r') as boxes:
            for row in csv.reader(boxes):
                print(row)
                box.append([int(field, 10) for field in row[0:8]])

        with open('/content/drive/My Drive/ICDAR_Dataset/test_result/'+ file_name, 'r') as prediction:
            # An empty prediction row becomes a single-space placeholder.
            words = [row if len(row) != 0 else [' '] for row in csv.reader(prediction)]

        # zip stops at the shorter of the two lists.
        merged = [coords + text for coords, text in zip(box, words)]

        with open('/content/drive/My Drive/ICDAR_Dataset/for_task3/'+ file_name, 'w+') as newfile:#saving information for making visualization
            csv.writer(newfile).writerows(merged)


def draw():
    """Draw every box and its recognised text onto the test images.

    For each ``*.txt`` in the ``for_task3`` folder the matching ``.jpg`` from
    the test folder is loaded; each row contributes a green rectangle from
    point (field 0, field 1) to point (field 4, field 5) and, when present,
    the upper-cased ninth field as red text.  The annotated image is saved to
    the ``darw_result_task2`` folder.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    filenames = [os.path.splitext(f)[0] for f in glob.glob("/content/drive/My Drive/ICDAR_Dataset/for_task3/*.txt")]
    txt_files = [s + ".txt" for s in filenames]
    for txt in txt_files:
        # File name without directory and extension; robust to extra dots.
        stem = os.path.splitext(os.path.basename(txt))[0]
        print(stem)
        image_path = '/content/drive/My Drive/ICDAR_Dataset/data_test/' + stem + '.jpg'
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        if image is None:
            # imread signals failure by returning None; drawing on it would
            # fail with an unrelated error further down.
            print('cannot read image %s' % image_path)
            continue
        with open(txt, 'r') as txt_file:
            for line in csv.reader(txt_file):
                for string in line[1:]:
                    print(string)
                box = [int(string, 10) for string in line[0:8]]
                print(len(box))
                if len(box) < 8:
                    # Not enough coordinates to draw anything for this row.
                    print(txt)
                    continue
                cv2.rectangle(image, (box[0], box[1]), (box[4], box[5]), (0,255,0), 2)
                if len(line) < 9:
                    # No text field: report the file and keep just the box
                    # (this case used to crash on `line[8]`).
                    print(txt)
                    continue
                cv2.putText(image, line[8].upper(), (box[0],box[1]), font, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
        cv2.imwrite('/content/drive/My Drive/ICDAR_Dataset/darw_result_task2/' + stem + '.jpg', image)


if __name__ == "__main__":
    # Run the pipeline stages in order; later stages read the files that
    # earlier stages write.
    for stage in (load_images_to_predict, process_txt, for_task3, draw):
        stage()


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
621
464
648
368
648
-1.10
8
968
416
968
416
990
64
990
and enjoy special discounts
8
596
192
596
192
618
48
618
2pc @ 5.50
8
199
448
199
448
220
16
220
no8 jalan 7/118b desa tam razak
8
436
400
436
400
458
32
458
meaber card no: 10010168849
8
1079
384
1079
384
1096
80
1096
www.popularonline.com.my
8
888
464
888
464
911
368
911
-1.10
8
X51009453729
619
512
619
512
639
432
639
100.00
8
271
448
271
448
292
176
292
gst id : 002139201536
8
586
368
586
368
607
192
607
total amt payable:
8
157
368
157
368
175
256
175
(162761-m)
8
754
480
754
480
775
416
775
(rm) :
8
734
368
734
368
754
112
754
gst summary amount
8
617
368
617
368
638
240
638
paid amount
8
788
368
788
368
805
304
805
75.00
8
736
480
736
480
753
432
753
tax :
8
398
528
398
528
418
96
418
jahe h389/10.:95m) 100 sr 830
8
248
384
248
384
264
240
264
darul ehsan
8
586
512
586
512
605
432
605
19.50/
8
304
384
304
384
325
240
325
tax invoice
8
788
208
788
208
806
112
80