In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn as nn
import torch.optim as optim
from torchvision import models

import torch.nn.functional as F
import matplotlib.pyplot as plt

import cv2 as cv

import copy

import os
from PIL import Image,ImageFilter


# Reached only if every import above succeeded.
print('packages loaded!')

In [None]:
class VowelConsonantDataset(Dataset):
    """Image dataset whose file names carry a two-part class label.

    Every file found under ``file_path`` (searched recursively) is one sample.

    * ``train=True``  -> ``__getitem__`` returns ``(image, label)`` where
      ``label`` is a float tensor ``[first_index, second_index]`` derived from
      the first two ``_``-separated tokens of the file name.
    * ``train=False`` -> ``__getitem__`` returns ``(image, file_name)``.

    Args:
        file_path: Root directory that contains the image files.
        train: Whether labels should be parsed from the file names.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, file_path, train=True, transform=None):
        self.transform = transform
        self.file_path = file_path
        self.train = train
        # os.walk descends into sub-directories, so remember the directory each
        # file was found in. Joining the bare name onto the root (as was done
        # before) cannot open files that live in a nested folder.
        self.file_names = []
        self.file_paths = []
        for root, _, files in os.walk(self.file_path):
            for file in files:
                self.file_names.append(file)
                self.file_paths.append(os.path.join(root, file))
        self.len = len(self.file_names)
        if self.train:
            self.classes_mapping = self.get_classes()

    def __len__(self):
        """Return the number of image files discovered under ``file_path``."""
        return len(self.file_names)

    def __getitem__(self, index):
        """Load sample ``index``.

        Returns:
            ``(image, label)`` in training mode, ``(image, file_name)`` otherwise.

        Raises:
            IndexError: If a parsed class index falls outside ``0..9``.
        """
        file_name = self.file_names[index]
        image_data = self.pil_loader(self.file_paths[index])

        if self.transform:
            image_data = self.transform(image_data)

        if not self.train:
            return image_data, file_name

        tokens = file_name.split("_")
        # classes_mapping numbers all tokens in sorted order. The -10 offset
        # assumes exactly ten tokens sort before the first-position tokens.
        # NOTE(review): presumably names look like "V3_C7_..."; confirm.
        first_idx = self.classes_mapping[tokens[0]] - 10
        second_idx = self.classes_mapping[tokens[1]]
        # The previous implementation detected out-of-range indices only as a
        # side effect of writing into two unused 10-element tensors; keep the
        # check (and the exception type) but make it explicit.
        if not (0 <= first_idx < 10 and 0 <= second_idx < 10):
            raise IndexError(
                "class indices (%d, %d) parsed from %r are outside 0..9"
                % (first_idx, second_idx, file_name)
            )
        # float32 is kept for compatibility; the training code casts to int64.
        label = torch.tensor([first_idx, second_idx], dtype=torch.float32)
        return image_data, label

    def pil_loader(self, path):
        """Open ``path`` and return its contents as an RGB PIL image."""
        with open(path, 'rb') as f:
            img = Image.open(f)
            # convert() forces the pixel data to be read before the file closes.
            return img.convert('RGB')

    def get_classes(self):
        """Map every distinct class token to its rank in sorted order.

        The first two ``_``-separated tokens of each file name are collected.

        Raises:
            IndexError: If a file name contains no ``_`` separator.
        """
        tokens = set()
        for name in self.file_names:
            parts = name.split("_")
            tokens.update((parts[0], parts[1]))
        return {token: rank for rank, token in enumerate(sorted(tokens))}

In [None]:
# Use the first CUDA GPU when one is visible to PyTorch, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

In [None]:
# Training-time preprocessing: a small random rotation/translation followed by
# conversion to a tensor scaled to [-1, 1] per channel.
transform = transforms.Compose([
    # Rotate by up to +/-10 degrees and shift by up to 10% of width/height.
    # The arguments that used to be spelled out here (scale=None, shear=None,
    # resample=False, fillcolor=0) were all defaults; `resample`/`fillcolor`
    # have since been renamed in torchvision, so omitting them keeps the same
    # behaviour while working on both old and new releases.
    transforms.RandomAffine(10, translate=(0.1, 0.1)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5],
                         std=[0.5, 0.5, 0.5]),
])

In [None]:
# Every labelled image; 90% is used for training and the rest for validation.
full_data = VowelConsonantDataset(
    "../input/train/train",
    train=True,
    transform=transform,
)
train_size = int(0.9 * len(full_data))
test_size = len(full_data) - train_size

# Both subsets index into `full_data`, so they share its transform.
train_data, validation_data = random_split(full_data, lengths=[train_size, test_size])


In [None]:
# Output width of the vowel head built in ResNet50Bottom.
num_classes = 10

In [None]:
class ResNet50Bottom(nn.Module):
    """Backbone with its last child replaced by two parallel linear heads.

    All children of ``original_model`` except the last one are reused as the
    feature extractor; the flattened features feed a "vowel" head and a
    "consonant" head, each a ``Linear`` followed by ``LeakyReLU(0.2)``.

    Args:
        original_model: Module exposing ``children()`` and an ``fc`` layer with
            an ``in_features`` attribute (e.g. a torchvision ResNet).
        num_vowels: Width of the vowel head. ``None`` (default) keeps the old
            behaviour of reading the notebook-level ``num_classes`` constant.
        num_consonants: Width of the consonant head (previously a literal 10).

    NOTE(review): the heads apply LeakyReLU to the values that are later fed
    to ``nn.CrossEntropyLoss``; confirm this is intentional.
    """

    def __init__(self, original_model, num_vowels=None, num_consonants=10):
        super().__init__()
        if num_vowels is None:
            # Backward compatible fallback to the module-level constant.
            num_vowels = num_classes
        in_features = original_model.fc.in_features

        # Sub-modules are registered in the original order so that parameter
        # ordering and state_dict keys stay unchanged.
        self.features = nn.Sequential(*list(original_model.children())[:-1])
        self.vowel = nn.Sequential(
            nn.Linear(in_features, num_vowels),
            nn.LeakyReLU(0.2, True),
        )
        self.cons = nn.Sequential(
            nn.Linear(in_features, num_consonants),
            nn.LeakyReLU(0.2, True),
        )

    def forward(self, x):
        """Return ``(vowel_scores, consonant_scores)`` for a batch ``x``."""
        x = self.features(x)
        # Collapse everything after the batch dimension; unlike view(), this
        # also works when the feature map is not contiguous.
        x = torch.flatten(x, 1)
        v = self.vowel(x)
        c = self.cons(x)
        return v, c
      

# NOTE(review): despite the `res50` naming, the backbone loaded here is
# torchvision's ResNet-152 with pretrained weights.
res50_model = models.resnet152(pretrained=True)
final_model = ResNet50Bottom(original_model=res50_model)

In [None]:
# Mark every backbone parameter as trainable so the whole network is fine-tuned.
for backbone_param in final_model.features.parameters():
    backbone_param.requires_grad_(True)

In [None]:
#for param in final_model.parameters():
    #if param.requires_grad:
        #print(param.shape)

In [None]:
# Re-declares the same value already assigned to num_classes in an earlier cell.
num_classes = 10

In [None]:
batch_size = 64
trainloader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
validationloader = torch.utils.data.DataLoader(validation_data, batch_size=batch_size, shuffle=True)

# Test images must be preprocessed deterministically: reusing the training
# transform would apply a random rotation/shift to every test image, making
# predictions vary from run to run. Only the tensor conversion and the same
# normalisation as in training are applied here.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5],
                         std=[0.5, 0.5, 0.5]),
])
test_data = VowelConsonantDataset("../input/test/test", train=False, transform=test_transform)
# shuffle=False keeps predictions aligned with the order of test_data.file_names.
testloader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, shuffle=False)


In [None]:
# Place the network on the selected device before the optimiser captures its
# parameters.
final_model = final_model.to(device)

# One cross-entropy criterion is shared by the vowel and the consonant head.
loss_fn = nn.CrossEntropyLoss()

# Plain SGD over every parameter of the model; alternatives tried earlier:
#opt = optim.RMSprop(final_model.parameters(), lr = 0.01, alpha = 0.9)
#opt = optim.Adam(final_model.parameters(), lr = 0.01, betas= (0.9, 0.99))
opt = optim.SGD(params=final_model.parameters(), lr=0.1)

In [None]:
# Training on the 90% split. One entry is appended to loss_epoch_arr per epoch
# (the loss of that epoch's last mini-batch).
loss_epoch_arr = []
max_epochs = 0

min_loss = 1000

n_iters = np.ceil(train_size/batch_size)

# Start from a snapshot of the current weights so that `best_model` is always
# defined. With max_epochs == 0 the loop body never runs, and the later
# `final_model.load_state_dict(best_model)` would otherwise raise NameError
# on a fresh kernel.
best_model = copy.deepcopy(final_model.state_dict())

for epoch in range(max_epochs):
    # Make sure BatchNorm/Dropout layers are in training mode for this epoch.
    final_model.train()

    for i, data in enumerate(trainloader, 0):
        inputs, labels = data
        # Labels arrive as float pairs; CrossEntropyLoss needs int64 targets.
        inputs, labels = inputs.to(device), labels.to(device, dtype=torch.int64)

        opt.zero_grad()

        out_v, out_c = final_model(inputs)
        # Column 0 holds the vowel index, column 1 the consonant index.
        loss_v = loss_fn(out_v, labels[:, 0])
        loss_c = loss_fn(out_c, labels[:, 1])
        loss = loss_v + loss_c
        loss.backward()
        opt.step()

        # Keep a copy of the weights at the lowest mini-batch loss seen so far.
        if min_loss > loss.item():
            min_loss = loss.item()
            best_model = copy.deepcopy(final_model.state_dict())
            print('Epoch:',epoch,'; Min loss %0.2f' % min_loss)

        if i % 100 == 0:
            print('Iteration: %d/%d, Loss: %0.2f' % (i, n_iters, loss.item()))

        del inputs, labels, out_v, out_c
        torch.cuda.empty_cache()

    loss_epoch_arr.append(loss.item())

plt.plot(loss_epoch_arr)
plt.show()

In [None]:
# Lowest mini-batch loss recorded by the training loop above.
print('Min loss %0.2f' % min_loss)

In [None]:
def evaluation(dataloader, model, device=None):
    """Return the percentage of samples whose vowel AND consonant are both right.

    Args:
        dataloader: Iterable of ``(inputs, labels)`` batches; ``labels[:, 0]``
            is the vowel index and ``labels[:, 1]`` the consonant index.
        model: Module returning ``(vowel_scores, consonant_scores)``.
        device: Device to run on. Defaults to the device of the model's first
            parameter (CPU if the model has no parameters).

    Returns:
        Accuracy in percent (0-100). ``0.0`` when the dataloader is empty.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    # Score in eval mode and without autograd: otherwise BatchNorm would use
    # (and update its running statistics from) the evaluation batches.
    was_training = model.training
    model.eval()
    total, correct = 0, 0
    try:
        with torch.no_grad():
            for inputs, labels in dataloader:
                inputs = inputs.to(device)
                labels = labels.to(device, dtype=torch.int64)
                out_v, out_c = model(inputs)
                _, pred_v = torch.max(out_v, 1)
                _, pred_c = torch.max(out_c, 1)
                # A sample counts only if both heads are correct.
                both_right = (pred_v == labels[:, 0]) & (pred_c == labels[:, 1])
                correct += int(both_right.sum().item())
                total += labels.size(0)
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    if total == 0:
        return 0.0
    return 100 * correct / total

In [None]:
# Load the weights stored in `best_model` back into the network.
final_model.load_state_dict(best_model)
#print(evaluation(trainloader, final_model), evaluation(validationloader, final_model))
#print(evaluation(validationloader, final_model))


In [None]:
# Loader over the complete labelled set (training and validation samples).
fulldataloader = DataLoader(
    full_data,
    batch_size=batch_size,
    shuffle=True,
)

In [None]:
# Training on the complete labelled set. One entry is appended to
# loss_epoch_arr per epoch (the loss of that epoch's last mini-batch).
loss_epoch_arr = []
max_epochs = 40

min_loss = 1000

n_iters = np.ceil(len(full_data)/batch_size)

# Snapshot the starting weights so `best_model` always refers to this run,
# even if no iteration improves on `min_loss` or the loop does not execute.
best_model = copy.deepcopy(final_model.state_dict())

for epoch in range(max_epochs):
    # Training mode is set explicitly so the loop does not depend on the mode
    # an earlier cell may have left the model in.
    final_model.train()

    for i, data in enumerate(fulldataloader, 0):
        inputs, labels = data
        # Labels arrive as float pairs; CrossEntropyLoss needs int64 targets.
        inputs, labels = inputs.to(device), labels.to(device, dtype=torch.int64)

        opt.zero_grad()

        out_v, out_c = final_model(inputs)
        # Column 0 holds the vowel index, column 1 the consonant index.
        loss_v = loss_fn(out_v, labels[:, 0])
        loss_c = loss_fn(out_c, labels[:, 1])
        loss = loss_v + loss_c
        loss.backward()
        opt.step()

        # Keep a copy of the weights at the lowest mini-batch loss seen so far.
        if min_loss > loss.item():
            min_loss = loss.item()
            best_model = copy.deepcopy(final_model.state_dict())
            print('Min loss %0.2f' % min_loss)

        if i % 100 == 0:
            print('Iteration: %d/%d, Loss: %0.2f' % (i, n_iters, loss.item()))

        del inputs, labels, out_v, out_c
        torch.cuda.empty_cache()

    loss_epoch_arr.append(loss.item())

plt.plot(loss_epoch_arr)
plt.show()

In [None]:
# Load the weights stored in `best_model` back into the network before predicting.
final_model.load_state_dict(best_model)

In [None]:
final_ImageId = []
final_Class = []

# Predict in eval mode and without autograd. In training mode BatchNorm would
# normalise with per-batch statistics (so a prediction would depend on which
# other images share its batch) and would keep updating its running stats.
# Note that the model is left in eval mode afterwards.
final_model.eval()
with torch.no_grad():
    # In test mode the dataset yields (image, file_name), so the second element
    # of each batch is a sequence of file names rather than labels.
    for inputs, file_names in testloader:
        inputs = inputs.to(device)
        out_v, out_c = final_model(inputs)
        _, pred_v = torch.max(out_v, 1)
        _, pred_c = torch.max(out_c, 1)

        for file_name, v, c in zip(file_names, pred_v.tolist(), pred_c.tolist()):
            final_ImageId.append(file_name)
            final_Class.append('V' + str(v) + '_C' + str(c))

In [None]:
# Echo the raw predictions, then write them to CSV sorted by image id.
print(final_ImageId)
print(final_Class)

submission = pd.DataFrame({'ImageId': final_ImageId, 'Class': final_Class})
submission = submission[['ImageId', 'Class']].sort_values(['ImageId'])
# NOTE(review): the output file name is spelled "submisision.csv"; confirm
# whether downstream tooling expects this exact name before renaming it.
submission.to_csv("submisision.csv", index=False)