In [13]:
import csv
import numpy as np
import random
import os

from PIL import Image

import torch as t
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms as  T
from torch.utils.data import Dataset, DataLoader

In [14]:
# Directories containing the training and test images.
TRAIN_PATH = "/kaggle/input/captcha-hacker/train"
TEST_PATH = "/kaggle/input/captcha-hacker/test"
# Directory holding the saved per-task weight files (task1_model.pth, ...).
MODEL_PATH = "/kaggle/input/mlhw5model"
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = "cuda" if t.cuda.is_available() else "cpu"

In [15]:
# Lookup table from class index to character:
# indices 0-9 are the digits, indices 10-35 are the lowercase letters a-z.
keys = list(range(36))
values = list('0123456789' 'abcdefghijklmnopqrstuvwxyz')
elements = dict(zip(keys, values))
print(elements)

{0: '0', 1: '1', 2: '2', 3: '3', 4: '4', 5: '5', 6: '6', 7: '7', 8: '8', 9: '9', 10: 'a', 11: 'b', 12: 'c', 13: 'd', 14: 'e', 15: 'f', 16: 'g', 17: 'h', 18: 'i', 19: 'j', 20: 'k', 21: 'l', 22: 'm', 23: 'n', 24: 'o', 25: 'p', 26: 'q', 27: 'r', 28: 's', 29: 't', 30: 'u', 31: 'v', 32: 'w', 33: 'x', 34: 'y', 35: 'z'}


In [16]:
def numType(task):
    """Return the number of output classes for ``task``.

    'task1' has 10 classes; every other task name has 36.
    """
    return 10 if task == 'task1' else 36

In [17]:
class LoadData(Dataset):
    """Image dataset for a single task, yielding ``(image_tensor, filename)``.

    Args:
        data: iterable of rows whose first field is an image filename
            relative to ``root``. Rows are expected to have two fields
            (filename, label); the label is ignored. Only rows whose
            filename starts with ``task`` are kept.
        root: directory the filenames are resolved against.
        task: filename prefix used to select this task's rows.
        H, W: height and width every image is resized to.
    """

    def __init__(self, data, root, task="task1", H=72, W=72):
        # `sample and ...` skips empty rows (csv.reader yields [] for blank
        # lines), which would otherwise raise IndexError on sample[0].
        self.data = [
            sample for sample in data
            if sample and sample[0].startswith(task)
        ]
        self.root = root
        self.transform = T.Compose([
            T.Resize((H, W)),
            T.ToTensor(),
            # Three-channel normalisation: maps [0, 1] pixel values to [-1, 1].
            T.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])

    def __getitem__(self, index):
        """Load, convert and transform the image at ``index``."""
        filename, _ = self.data[index]
        # The context manager closes the underlying file promptly instead of
        # leaving it to garbage collection in each DataLoader worker.
        with Image.open(f"{self.root}/{filename}") as raw:
            # Force exactly three channels: Normalize above is defined for
            # 3 channels and fails on RGBA, palette or grayscale inputs.
            rgb = raw.convert("RGB")
        img = self.transform(rgb)
        return img, filename

    def __len__(self):
        """Number of rows selected for this task."""
        return len(self.data)

In [18]:
class Residual(nn.Module):
    """Residual block: two 3x3 conv + batch-norm stages plus a shortcut.

    The main path is conv3x3 -> BN -> ReLU -> conv3x3 -> BN. Its result is
    summed with the shortcut branch and passed through a final ReLU. The
    shortcut is an empty ``nn.Sequential`` (identity) unless the stride is
    not 1 or the channel count changes, in which case it is a 1x1
    convolution followed by batch-norm.

    Args:
        inChannel: number of input channels.
        outChannel: number of output channels.
        stride: stride of the first convolution and of the 1x1 shortcut conv.
    """

    def __init__(self, inChannel, outChannel, stride=1):
        super().__init__()
        changes_shape = not (stride == 1 and inChannel == outChannel)

        # Main path. Attribute names are part of the state_dict keys.
        self.conv2_1 = nn.Conv2d(
            inChannel, outChannel,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn2_1 = nn.BatchNorm2d(outChannel, track_running_stats=True)
        self.relu = nn.ReLU(inplace=True)
        self.conv2_2 = nn.Conv2d(
            outChannel, outChannel,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2_2 = nn.BatchNorm2d(outChannel, track_running_stats=True)

        # Shortcut path: identity, or a 1x1 projection when shapes differ.
        if changes_shape:
            self.shortcut = nn.Sequential(
                nn.Conv2d(inChannel, outChannel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(outChannel, track_running_stats=True),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Apply the block to ``x`` and return the activated sum."""
        main = self.relu(self.bn2_1(self.conv2_1(x)))
        main = self.bn2_2(self.conv2_2(main))
        main += self.shortcut(x)
        return self.relu(main)

In [19]:
class Model(nn.Module):
    """ResNet-style classifier with one linear head per predicted character.

    A 3x3 stem convolution is followed by four stages of two residual
    blocks each (36, 72, 144 and 288 channels; the last three stages
    halve the spatial size). The pooled 288-dimensional feature feeds up
    to four linear heads.

    Args:
        Residual: block class used to build the stages; it is called as
            ``Residual(in_channels, out_channels, stride)``.
        task: selects the head width (via ``numType``) and how many heads
            ``forward`` returns: 'task1' -> 1, 'task2' -> 2, anything
            else -> 4.
    """

    def __init__(self, Residual, task='task1'):
        super().__init__()
        self.task = task
        # Running channel count consumed and updated by make_layer().
        self.inChannel = 36
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 36, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(36, track_running_stats=True),
            nn.ReLU()
        )
        self.layer1 = self.make_layer(Residual, outChannel=36, num=2, stride=1)
        self.layer2 = self.make_layer(Residual, outChannel=72, num=2, stride=2)
        self.layer3 = self.make_layer(Residual, outChannel=144, num=2, stride=2)
        self.layer4 = self.make_layer(Residual, outChannel=288, num=2, stride=2)
        # All four heads are always created, even when forward() uses fewer,
        # so the set of parameters (and state_dict keys) does not depend on
        # the task.
        self.fc1 = nn.Linear(288, numType(task))
        self.fc2 = nn.Linear(288, numType(task))
        self.fc3 = nn.Linear(288, numType(task))
        self.fc4 = nn.Linear(288, numType(task))

    def make_layer(self, Residual, outChannel, num, stride):
        """Build one stage of ``num`` residual blocks.

        Only the first block uses ``stride``; the remaining blocks use
        stride 1. ``self.inChannel`` is advanced to ``outChannel``.
        """
        layers = []
        for block_stride in [stride] + [1] * (num - 1):
            layers.append(Residual(self.inChannel, outChannel, block_stride))
            self.inChannel = outChannel
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return a list of logit tensors, one per character position."""
        x = self.conv1(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # Global average pool over whatever spatial size remains. For the
        # default 72x72 input the map is 9x9 here, so this averages the same
        # 81 values as the former fixed avg_pool2d(x, 9). With a fixed
        # window, larger inputs left several spatial cells that
        # view(-1, 288) folded into the batch dimension, and smaller inputs
        # failed outright.
        x = F.adaptive_avg_pool2d(x, 1)
        x = x.flatten(1)

        y = [self.fc1(x)]
        if self.task != 'task1':
            y.append(self.fc2(x))
            if self.task != 'task2':
                y.append(self.fc3(x))
                y.append(self.fc4(x))
        return y

In [20]:
# One network per task, each moved to the selected device as it is built.
task1_model, task2_model, task3_model = (
    Model(Residual, task=task_name).to(device)
    for task_name in ('task1', 'task2', 'task3')
)

In [21]:
# Load the saved weights for each task's network.
# `map_location=device` remaps the stored tensors onto the device chosen
# above, so one code path covers both GPU and CPU-only sessions.
# NOTE: torch.load unpickles the file - only load checkpoints you trust.
task1_model.load_state_dict(t.load(f'{MODEL_PATH}/task1_model.pth', map_location=device))
task2_model.load_state_dict(t.load(f'{MODEL_PATH}/task2_model.pth', map_location=device))
task3_model.load_state_dict(t.load(f'{MODEL_PATH}/task3_model.pth', map_location=device))

In [22]:
# Read every row of the sample submission (header included); the per-task
# datasets later keep only rows whose filename starts with their task name.
test_data = []
with open(f'{TEST_PATH}/../sample_submission.csv', newline='') as csvfile:
    test_data.extend(csv.reader(csvfile, delimiter=','))

In [23]:
# Per-task test datasets and loaders. Batches keep file order
# (shuffle=False) and the final partial batch is kept (drop_last=False).
task1_test_ds = LoadData(test_data, root=TEST_PATH, task='task1')
task1_test_dl = DataLoader(
    task1_test_ds,
    batch_size=128,
    shuffle=False,
    drop_last=False,
    num_workers=2,
)

task2_test_ds = LoadData(test_data, root=TEST_PATH, task='task2')
task2_test_dl = DataLoader(
    task2_test_ds,
    batch_size=256,
    shuffle=False,
    drop_last=False,
    num_workers=2,
)

task3_test_ds = LoadData(test_data, root=TEST_PATH, task='task3')
task3_test_dl = DataLoader(
    task3_test_ds,
    batch_size=256,
    shuffle=False,
    drop_last=False,
    num_workers=2,
)

In [24]:
def _decode_predictions(pred):
    """Convert a list of per-position logit tensors into one string per sample.

    ``pred`` holds one ``(batch, num_classes)`` tensor per character
    position, as returned by the model's forward(). The arg-max class of
    each position is mapped through ``elements`` and the characters are
    concatenated in position order.
    """
    # Shape (batch, positions); a single tolist() transfers everything at
    # once instead of calling .item() for every character.
    class_ids = t.stack([t.argmax(p, dim=1) for p in pred], dim=1).tolist()
    return [''.join(elements[k] for k in row) for row in class_ids]


def _predict_to_csv(model, loader, csv_writer):
    """Run ``model`` over ``loader`` and write one (filename, label) row per image."""
    model.eval()
    # Inference only: skip autograd bookkeeping to save memory and time.
    with t.no_grad():
        for image, filenames in loader:
            pred = model(image.to(device))
            # Labels are sized by the actual batch, so this does not depend
            # on the DataLoader's batch_size or on the last batch being full.
            for filename, label in zip(filenames, _decode_predictions(pred)):
                csv_writer.writerow([filename, label])


with open('submission.csv', 'w', newline='') as csvfile:
    csv_writer = csv.writer(csvfile)
    csv_writer.writerow(["filename", "label"])

    for task_name, task_model, task_loader in (
        ('task1', task1_model, task1_test_dl),
        ('task2', task2_model, task2_test_dl),
        ('task3', task3_model, task3_test_dl),
    ):
        _predict_to_csv(task_model, task_loader, csv_writer)
        print(f'{task_name} finish!')

task1 finish!
task2 finish!
task3 finish!
