In [None]:
!pip install opencv-python
!pip install numpy
!pip install torch

In [93]:
import csv
import cv2
import numpy as np
import random
import os

from tqdm import tqdm

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

In [95]:
TRAIN_PATH = "/kaggle/input/captcha-hacker/train"
TEST_PATH = "/kaggle/input/captcha-hacker/test"
# device = "cpu"
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# try device = "cuda" 
# and change your settings/accelerator to GPU if you want it to run faster

In [96]:
train_data = []
val_data = []
test_data = []

# with open(f'{TRAIN_PATH}/annotations.csv', newline='') as csvfile:
#     for row in csv.reader(csvfile, delimiter=','):
#         if random.random() < 0.7:
#             train_data.append(row)
#         else:
#             val_data.append(row)

# with open(f'{TEST_PATH}/../sample_submission.csv', newline='') as csvfile:
#     for row in csv.reader(csvfile, delimiter=','):
#         test_data.append(row)

with open(f'{TRAIN_PATH}/annotations.csv', newline='') as csvfile:
    for row in csv.reader(csvfile, delimiter=','):
        train_data.append(row)

with open(f'{TEST_PATH}/../sample_submission.csv', newline='') as csvfile:
    for row in csv.reader(csvfile, delimiter=','):
        test_data.append(row)

In [97]:
def load_checkpoint(filepath):
    checkpoint = torch.load(filepath)
    model = checkpoint['model']
    model.load_state_dict(checkpoint['state_dict'])
    for parameter in model.parameters():
        parameter.requires_grad = False
    
    model.eval()
    
    return model

In [98]:
model1 = load_checkpoint('task1.pth')
model1 = model1.to(device)
print(model1)
model2 = load_checkpoint('task2.pth')
model2 = model2.to(device)
print(model2)
model3 = load_checkpoint('task3.pth')
model3 = model3.to(device)
print(model3)

Model1(
  (conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (conv2): Sequential(
    (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (maxpool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout1): Dropout(p=0.2, inplace=False)
  (conv3): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (conv4): Sequential(
    (0): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (maxpool2): MaxPool2d(ker

In [99]:
LETTERSTR = "0123456789abcdefghijklmnopqrstuvwxyz"

def encode(text):
    labellist = []
    for letter in text:
        num = LETTERSTR.find(letter)
        labellist.append(num)
    return labellist

LETTERSTR_char = "0,1,2,3,4,5,6,7,8,9,a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z"
LETTERSTR_char = LETTERSTR_char.split(",")
LETTERSTR_index = [i for i in range(36)]
decode = dict(zip(LETTERSTR_index, LETTERSTR_char))

In [100]:
def ExtractData(data, root, task_round):
    filenames = np.array([sample for sample in data if sample[0].startswith(task_round)])
    imgs_names = filenames[:, 0]
    label = filenames[:, 1]
    imgs = np.zeros(shape=(len(filenames), 32, 32, 1))
    labels = np.zeros(shape=(len(filenames), len(label[0])))
    for i in range(len(filenames)):
        tmp = cv2.imread(f"{root}/{imgs_names[i]}")
        tmp = cv2.resize(tmp, (32, 32))
        tmp = cv2.cvtColor(tmp, cv2.COLOR_BGR2GRAY)
        imgs[i] = np.expand_dims(tmp, axis=2)
        tmp_label = encode(label[i])
        labels[i] = np.array(tmp_label)
    return imgs_names, imgs, labels

class Task1Dataset(Dataset):
    def __init__(self, data, root, return_filename=False):
        self.data = [sample for sample in data if sample[0].startswith("task1")]
        self.return_filename = return_filename
        self.root = root
    
    def __getitem__(self, index):
        filename, label = self.data[index]
        img = cv2.imread(f"{self.root}/{filename}")
        img = cv2.resize(img, (32, 32))
        
        img = torch.tensor(img).permute(2,0,1)
        if self.return_filename:
            return torch.FloatTensor((img - 128) / 128), filename
        else:
            return torch.FloatTensor((img - 128) / 128), int(label)

    def __len__(self):
        return len(self.data)

class TaskDataset(Dataset):
    def __init__(self, x_data, y_data, filenames, return_filename=False):
        x_data = x_data.astype('float32')
        x_data = x_data / 255.0
        self.x_data = torch.from_numpy(x_data).permute(0, 3, 1, 2) # (N, 1, 32, 32)
        self.y_data = torch.from_numpy(y_data) # (N, 1)
        self.filenames = filenames
        self.return_filename = return_filename
        
    def __getitem__(self, index):
        if self.return_filename:
            return self.x_data[index], self.filenames[index] # aug_num = 5
        else:
            return self.x_data[index], self.y_data[index]
        
    def __len__(self):
        return len(self.x_data)

In [101]:
# task1 data processing
test_ds = Task1Dataset(test_data, root=TEST_PATH, return_filename=True)
test_dl = DataLoader(test_ds, batch_size=195, num_workers=2, drop_last=False, shuffle=False)

In [102]:
# predict test1 write to csv
csv_writer = csv.writer(open('submission.csv', 'w', newline=''))
csv_writer.writerow(["filename", "label"])

# model.eval()
for image, filenames in test_dl:
    image = image.to(device)
    
    pred = model1(image)
    pred = torch.argmax(pred, dim=1)
    
    for i in range(len(filenames)):
        csv_writer.writerow([filenames[i], str(pred[i].item())])

In [103]:
# task2 data processing
filename_test, x_test, y_test = ExtractData(test_data, root=TEST_PATH, task_round="task2")
test_ds = TaskDataset(x_test, y_test, filename_test, return_filename=True)
test_dl = DataLoader(test_ds, batch_size=75, num_workers=2, drop_last=False, shuffle=False)

In [104]:
# predict test2 write to csv
if os.path.exists('submission.csv'):
    csv_writer = csv.writer(open('submission.csv', 'a', newline=''))
else:
    csv_writer = csv.writer(open('submission.csv', 'w', newline=''))
    csv_writer.writerow(["filename", "label"])


# model.eval()
for image, filenames in test_dl:
    image = image.to(device)
    
    pred = model2(image)
    pred1 = torch.argmax(pred[0], dim=1)
    pred2 = torch.argmax(pred[1], dim=1)
    for i in range(len(filenames)):
        cur_string = decode[pred1[i].item()]+decode[pred2[i].item()]
        csv_writer.writerow([filenames[i], cur_string])

In [105]:
# task3 data processing
filename_test, x_test, y_test = ExtractData(test_data, root=TEST_PATH, task_round="task3")
test_ds = TaskDataset(x_test, y_test, filename_test, return_filename=True)
test_dl = DataLoader(test_ds, batch_size=30, num_workers=2, drop_last=False, shuffle=False)

In [106]:
# predict task3 write to csv
csv_writer = csv.writer(open('submission.csv', 'a', newline=''))

# model.eval()
for image, filenames in test_dl:
    image = image.to(device)
    
    pred = model3(image)
    pred1 = torch.argmax(pred[0], dim=1)
    pred2 = torch.argmax(pred[1], dim=1)
    pred3 = torch.argmax(pred[2], dim=1)
    pred4 = torch.argmax(pred[3], dim=1)
    
    for i in range(len(filenames)):
        cur_string = decode[pred1[i].item()]+decode[pred2[i].item()]+decode[pred3[i].item()]+decode[pred4[i].item()]
        csv_writer.writerow([filenames[i], cur_string])
        
    """
    for filename, value in filenames:
        cur_string = decode[pred1[i].item()]+decode[pred2[i].item()]+decode[pred3[i].item()]+decode[pred4[i].item()]
        csv_writer.writerow(filename, value)
    """