In [1]:
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.nn import init
import torchvision
from torchvision import transforms
import albumentations as A
from efficientnet_pytorch import EfficientNet
import gc
import cv2
from tqdm import tqdm
import sklearn.metrics
import json
import functools
import itertools
import random

In [2]:
MEAN = [0.5, 0.5, 0.5]
STD = [0.5, 0.5, 0.5]
IMG_HEIGHT = 224
IMG_WIDTH = 224
BATCH_SIZE = 8
EPOCH = 1
TQDM_DISABLE = True

device = torch.device('cuda')

In [3]:
def load_images(paths):
    all_images = []
    for path in paths:
        image_df = pd.read_parquet(path)
        images = image_df.iloc[:, 1:].values.reshape(-1, 137, 236).astype(np.uint8)
        del image_df
        gc.collect()
        all_images.append(images)
    all_images = np.concatenate(all_images)
    return all_images

In [5]:
font_data = pd.read_csv('font.csv')
font_images = load_images(['font_image_data_0.parquet',
                          'font_image_data_1.parquet',
                          'font_image_data_2.parquet',
                          'font_image_data_3.parquet'])
np.save('font_images.npy', font_images)
del font_images
gc.collect()

0

In [6]:
train_data = pd.read_csv('train.csv')
multi_diacritics_train_data = pd.read_csv('train_multi_diacritics.csv')
train_data = train_data = train_data.set_index('image_id')
multi_diacritics_train_data = multi_diacritics_train_data.set_index('image_id')
train_data.update(multi_diacritics_train_data)

In [15]:
train_images = load_images(['train_image_data_0.parquet',
                           'train_image_data_1.parquet',
                           'train_image_data_2.parquet',
                           'train_image_data_3.parquet'])

In [16]:
font_images = np.load('font_images.npy')

In [17]:
class GraphemeDataset(torch.utils.data.Dataset):
    
    def __init__(self, data, images, transform=None, num_grapheme_root=168, num_vowel_diacritic=11, num_consonant_diacritic=8):
        self.data = data
        self.grapheme_root_list = np.array(data['grapheme_root'].tolist(), dtype=np.int64)
        self.vowel_diacritic_list = np.array(data['vowel_diacritic'].tolist(), dtype=np.int64)
        self.consonant_diacritic_list = np.array(data['consonant_diacritic'].tolist(), dtype=np.int64)
        self.num_grapheme_root = num_grapheme_root
        self.num_vowel_diacritic = num_vowel_diacritic
        self.num_consonant_diacritic = num_consonant_diacritic
        self.images = images
        self.transform = transform
            
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        grapheme_root = self.grapheme_root_list[idx]
        vowel_diacritic = self.vowel_diacritic_list[idx]
        consonant_diacritic = self.consonant_diacritic_list[idx]
        label = (grapheme_root*self.num_vowel_diacritic+vowel_diacritic)*self.num_consonant_diacritic+consonant_diacritic
        np_image = self.images[idx].copy()
        out_image = self.transform(np_image)
        return out_image, label
    
    
class Albumentations:
    def __init__(self, augmentations):
        self.augmentations = A.Compose(augmentations)
    
    def __call__(self, image):
        image = self.augmentations(image=image)['image']
        return image

In [18]:
preprocess = [
    A.CenterCrop(height=137, width=IMG_WIDTH),
    A.Resize(height=IMG_HEIGHT, width=IMG_WIDTH, always_apply=True),
]

augmentations = [
    A.PadIfNeeded(min_height=256, min_width=256, border_mode=cv2.BORDER_CONSTANT, value=[255, 255, 255], always_apply=True),
    A.imgaug.transforms.IAAAffine(shear=5, mode='constant', cval=255, always_apply=True),
    A.ShiftScaleRotate(rotate_limit=5, border_mode=cv2.BORDER_CONSTANT, value=[255, 255, 255], mask_value=[255, 255, 255], always_apply=True),
    A.RandomCrop(height=IMG_HEIGHT, width=IMG_WIDTH, always_apply=True),
]


train_transform = transforms.Compose([
    np.uint8,
    transforms.Lambda(lambda x: np.array([x, x, x]).transpose((1, 2, 0)) ),
    np.uint8,
    Albumentations(preprocess + augmentations),
    transforms.ToTensor(),
    transforms.Normalize(mean=MEAN, std=STD),
#     transforms.ToPILImage(),
])
valid_transform = transforms.Compose([
    np.uint8,
    transforms.Lambda(lambda x: np.array([x, x, x]).transpose((1, 2, 0)) ),
    np.uint8,
    Albumentations(preprocess),
    transforms.ToTensor(),
    transforms.Normalize(mean=MEAN, std=STD),
#     transforms.ToPILImage(),
])

In [19]:
hand_dataset = GraphemeDataset(train_data, train_images, valid_transform)
font_dataset = GraphemeDataset(font_data, font_images, train_transform)

In [20]:
class ResnetGenerator(nn.Module):
    def __init__(self, input_nc, output_nc, ngf=64, norm_layer=nn.BatchNorm2d, use_dropout=False, n_block=6, padding_type='reflect'):
        assert(n_blocks >=0)
        super(ResnetGenerator, self).__init__()
        if type(norm_layer) == functools.partial:
            use_bias = norm_layer.func == nn.InstanceNorm2d
        else:
            use_bias = norm_layer == nn.InstanceNorm2d
            
        model = [nn.ReflectionPad2d(3),
                 nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0, bias=use_bias),
                 norm_layer(ngf),
                 nn.ReLU(True)]
        
        n_downsampling = 2
        for i in range(n_downsampling):
            mult = 2 ** i
            model += [nn.Conv2d(ngf * mult, ngf * mult * 2 , kernel_size= 3, stride=2, padding=1, bias=use_bias),
                     norm_layer(ngf * mult * 2),
                     nn.ReLU(True)]
            
        mult = 2 ** n_downsampling
        for i in range(n_blocks):
            model +=[ResnetBlock(ngf * mult, padding_type=padding_type, norm_layer = norm_layer, use_dropout=use_dropout, use_bias=use_bias)]
            
        for i in range(n_downsampling):
            mult = 2**(n_downsampling - i)
            model += [nn.ConvTranspose2d(ngf * mult, int(ngf * mult / 2), kernel_size=3, stride=2, padding=1, output_padding=1, bias=use_bias),
                     norm_layer(int(ngf * mult / 2)),
                     nn.ReLU(True)]
            model += [nn.ReflectionPad2d(3)]
            model += [nn.Conv2d(ngf, output_nc, kernel_size =7, padding=0)]
            model += [nn.Tanh()]
            
            self.model = nn.Sequential(*model)
            
    def forward(self, input):
        return self.model(input)
        
class ResnetBlock(nn.Module):
    def __init__(self, dim, padding_type, norm_layer, use_dropout, use_bias):
        super(ResnetBlock, self).__init__()
        self.conv_block = self.build_conv_block(dim, padding_type, norm_layer, use_dropout, use_bias)
    def build_conv_block(self, dim, padding_type, norm_layer, use_dropout, use_bias):
        conv_block = []
        
        p = 0
        if padding_type == 'reflect':
            conv_block += [nn.ReflectionPad2d(1)]
        elif padding_type =='replicate':
            conv_block += [nn.ReplicationPad2d(1)]
        elif padding_type =='zero':
            p = 1
        else:
            raise NotImplementedError('padding [%s] is not implemented' % padding_type)
            
        conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias = use_bias), norm_layer(dim), nn.ReLU(True)]
        if use_dropout:
            conv_block +=[nn.Dropout(0.5)]
            
        p = 0
        if padding_type == 'reflect':
            conv_block += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            conv_block += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError('padding [%s] is not implemented' % padding_type)
        conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias), norm_layer(dim)]

        return nn.Sequential(*conv_block)
    
    def forward(self, x):
        out = x + self.conv_block(x)
        return out

In [21]:
class NLayerDiscriminator(nn.Module):
    def __init__(self, input_nc, ndf=64, n_layers=3, norm_layer = nn.BatchNorm2d):
        super(NLayerDiscriminator, self).__init__()
        if type(norm_layer) == functools.partial:
            use_bias = norm_layer.func == nn.InstanceNorm2d
        else:
            use_bias = norm_layer == nn.InstanceNorm2d
        
        kw = 4
        padw = 1
        sequence = [nn.Conv2d(input_nc, ndf, kernel_size = kw, stride=2, padding=padw), 
                    nn.LeakyReLU(0.2, True)]
        nf_mult = 1
        nf_mult_prev = 1
        for n in rnage(1, n_layers):
            nf_mult_prev = nf_mult
            nf_mult = min(2**n, 8)
            sequence += [nn.Conv2d(ndf * nf_mult_prev, ndf*nf_mult, kernel_size=kw, stride=2, padding=padw, bias=use_bias),
                         norm_layer(ndf * nf_mult), 
                         nn.LeakyReLU(0.2, True)]
            
        nf_mult_prev = nf_mult
        nf_mult = min(2 ** n_layers, 8)
        sequence += [
            nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size = kw, stride=1, padding=padw, bias=use_bias),
            norm_layer(ndf * nf_mult),
            nn.LeakyReLU(0.2, True)
        ]
        
        sequence += [nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)]
        self.model = nn.Sequential(*sequence)
        
    def forward(self, input):
        return self.model(input)

In [22]:
def init_weight(net, init_gain):
    def init_func(m):
        classname = m.__class__.__name__
        if hasattr(m, 'weight') and (classname.find('Conv') != -1 or classname.find('Linear')!= -1):
            init.normal_(m.weight.data, 0.0, init_gain)
            if hasattr(m, 'bias'):
                init.constant_(m.bias.data, 0.0)
        elif classname.find('BatchNorm2d') != -1:
            init.normal_(m.weight.data, 1.0, init_gain)
            init.constant_(m.bias.data, 0.0)
    net.apply(init_func)

In [23]:
class ImagePool():
    def __init__(self, pool_size):
        self.pool_size = pool_size
        if self.pool_size > 0 :
            self.num_imgs = 0
            self.images = []
            
    def query(self, images):
        if self.pool_size ==0:
            return images
        return_images = []
        for image in images:
            image = torch.unsqueeze(image.data, 0)
            if self.num_imgs < self.pool_size :
                self.num_imgs = self.num_imgs +1
                self.images.append(image)
                return_images.append(image)
            else:
                p = random.uniform(0,1)
                if p > 0.5:
                    random_id = random.randint(0, self.pool_size - 1)
                    tmp = self.images[random_id].clone()
                    self.images[random_id] = image
                    return_images.append(tmp)
                else:
                    return_images.append(image)
                    
        return_images = torch.cat(return_images, 0) 
        return return_images
                

In [24]:
class GANLoss(nn.Module):
    def __init__(self, gan_mode, target_real_label = 1.0, target_fake_label = 0.0):
        super(GANLoss, self).__init__()
        self.register_buffer('real_label', torch.tensor(target_real_label))
        self.register_buffer('fake_label', torch.tensor(target_fake_label))
        self.gan_mode = gan_mode
        if gan_mode == 'lsgan':
            self.loss = nn.MSELoss()
        elif gan_mode == 'vanilla':
            self.loss = nn.BCEWithLogitsLoss()
        elif gan_mode in ['wganp']:
            self.loss = None
        else :
            raise NotImplementedError('gan mode %s not implemented' % gan_mode)
            
    def get_target_tensor(self, prediction, target_is_real):
        
        if target_is_real:
            target_tensor = self.real_label
        else:
            target_tensor = self.fake_label
        return target_tensor.expand_as(prediction)
    
    def __call__(self, prediction, target_is_real):
        
        if self.gan_mode in ['lsgan', 'vanilla']:
            target_tensor = self.get_target_tensor(prediction, target_is_real)
            loss = self.loss(prediction, target_tensor)
        elif self.gan_mode == 'wgangp':
            if target_is_real:
                loss = -prediction.mean()
            else:
                loss = prediction.mean()
        return loss
            

In [25]:
class BengalModel(nn.Module):
    def __init__(self, backbone, hidden_size = 2560, class_num = 168*11*7):
        super(BengalModel, self).__init__()
        self.backbone = backbone
        self._avg_pooling = nn.AdaptiveAvgPool2d(1) #square shape
        self.fc = nn.Linear(hidden_size, class_num)
        self.ln = nn.LayerNorm(hidden_size)
        
    def forward(self, inputs):
        bs = inputs.shape[0]
        feature = self.backbone.extract_features(inputs)
        feature_vector = self._avg_pooling(feature)
        feature_vector = feature_vector.view(bs, -1)
        feature_vector = self.ln(feature_vector)
        
        out = self.fc(feature_vector)
        return out