In [16]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

import torchvision.models as models
import torchvision.datasets as dset
import torchvision.transforms as transforms

import os
import cv2
import copy
import time
from random import *
from collections import defaultdict

from sklearn.model_selection import train_test_split

In [17]:
train = pd.read_csv('train.csv')
test  = pd.read_csv('test.csv')
submission = pd.read_csv('submission.csv')

In [18]:
x_train = np.concatenate(
    [
        pd.get_dummies(train['letter']).values.reshape(-1, 1, 26),
        (train[[str(i) for i in range(784)]] / 255.).values.reshape(-1, 1, 784)
    ],
    axis=2
)
y_train = train['digit'].values

In [19]:
x_train, x_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.2, random_state=123)

In [5]:
# tensor로 형변환
x_train = torch.Tensor(x_train)
x_valid = torch.Tensor(x_valid)
y_train = torch.LongTensor(y_train)
y_valid = torch.LongTensor(y_valid)

In [6]:
train_data = TensorDataset(
    x_train[:, :, :26], # Letter
    x_train[:, :, 26:].reshape(-1, 1, 28, 28), # Image
    y_train # Digit
)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=32)

valid_data = TensorDataset(
    x_valid[:, :, :26],
    x_valid[:, :, 26:].reshape(-1, 1, 28, 28),
    y_valid
)
valid_sampler = SequentialSampler(valid_data)
valid_dataloader = DataLoader(valid_data, sampler=valid_sampler, batch_size=32)

In [15]:
train_data.tensors[0].shape

torch.Size([1638, 1, 26])

In [13]:
train_data.tensors[1].shape

torch.Size([1638, 1, 28, 28])

In [14]:
train_data.tensors[2].shape

torch.Size([1638])

In [7]:
class customCNN(nn.Module):
    def __init__(self):
        super().__init__()
        
        # Letter의 Convolution Block
        self.conv1 = nn.Sequential(
            nn.Conv1d(1, 8, 3, padding=1), nn.ReLU(), # 8@26
            nn.Conv1d(8, 16, 3, padding=1), nn.ReLU(), # 16@26
            nn.Conv1d(16, 32, 3, padding=1), nn.ReLU(), # 32@26
            nn.Conv1d(32, 64, 3, padding=1), nn.ReLU(), # 64@26
            nn.Conv1d(64, 128, 3, padding=1), nn.ReLU(), # 128@26
            nn.Conv1d(128, 256, 3, padding=1), nn.ReLU(), # 256@26
            nn.Conv1d(256, 128, 3, padding=1), nn.ReLU(), # 128@26
            nn.Conv1d(128, 64, 3, padding=1), nn.ReLU(), # 64@26
            nn.Conv1d(64, 32, 3, padding=1), nn.ReLU(), # 32@26
            nn.Conv1d(32, 16, 3, padding=1), nn.ReLU(), # 16@26
            nn.Conv1d(16, 8, 3, padding=1), nn.ReLU(), # 8@26
        )
        
        # Image의 Convolution Block
        self.conv2 = nn.Sequential(
            nn.Conv2d(1, 32, 3, padding=1), nn.ReLU(), # 16@28x28
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(), # 64@28x28
            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(), # 128@28x28
            nn.Conv2d(128, 256, 3, padding=1), nn.ReLU(), # 256@28x28
            nn.Conv2d(256, 512, 3, padding=1), nn.ReLU(), # 512@28x28
            nn.Conv2d(512, 1024, 3, padding=1), nn.ReLU(), # 1024@28x28
            nn.Conv2d(1024, 2048, 3, padding=1), nn.ReLU(), # 2048@28x28
            nn.Conv2d(2048, 1024, 3, padding=1), nn.ReLU(), # 1024@28x28
            nn.Conv2d(1024, 512, 3, padding=1), nn.ReLU(), # 512@28x28
            nn.Conv2d(512, 256, 3, padding=1), nn.ReLU(), # 256@28x28
            nn.Conv2d(256, 128, 3, padding=1), nn.ReLU(), # 128@28x28
            nn.Conv2d(128, 64, 3, padding=1), nn.ReLU(), # 64@28x28
            nn.Conv2d(64, 32, 3, padding=1), nn.ReLU(), # 32@28x28
            nn.Conv2d(32, 16, 3, padding=1), nn.ReLU(), # 16@28x28
        )
        
        self.out = nn.Sequential(
            nn.Linear(12752, 128), nn.ReLU(),
            nn.Linear(128, 32), nn.ReLU(),
            nn.Linear(32, 10)
        )
        
        self.loss = nn.CrossEntropyLoss()
    
    def forward(self, x1, x2, label=False):
        out = self._inference(x1, x2)
        if label is not False:
            loss = self.loss(out, label)
            return (out, loss)
        
        return out
    
    def _inference(self, x1, x2):
        bsz = x1.size(0)
#         print('bsz: ', bsz)
        x1 = self.conv1(x1)
#         print('x1 shape: ', x1.shape)
        x2 = self.conv2(x2)
#         print('x2 shape: ', x2.shape)
        
        x1 = x1.view(bsz, -1)
#         print('x1 shape: ', x1.shape)
        x2 = x2.view(bsz, -1)
#         print('x2 shape: ', x2.shape)
        
        x = torch.cat([x1, x2], dim=1)
        out = torch.nn.functional.softmax(self.out(x), dim=1)
        
        return out

In [None]:
model = customCNN()
model.cuda()

In [None]:
test_letter = x_train[:32, :, :26].cuda()
test_image = x_train[:32, :, 26:].reshape(-1, 1, 28, 28).cuda()

In [None]:
print(test_letter.shape)
print(test_image.shape)

In [None]:
model(test_letter, test_image)

In [None]:
from torch.optim import Adam

optimizer = Adam(
    model.parameters(),
    lr=2e-5,
    eps=1e-8,
)

epochs = 150
seed_val = 42
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

In [None]:
device = torch.device('cuda')
print(device)

In [None]:
# 정확도 계산 함수
def flat_accuracy(preds, labels):
    pred_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()
    
    return np.sum(pred_flat == labels_flat) / len(labels_flat)

In [None]:
# gradient 초기화
model.zero_grad()

history = defaultdict(list)
for epoch_i in range(0, epochs):
    
    total_loss = 0
    
    # train 모드로 변경
    model.train()
    
    # dataloader에서 batch size만큼 반복해서 가져옴
    for step, batch in enumerate(train_dataloader):
        
        # batch를 GPU에 적용
        batch = tuple(t.to(device) for t in batch)
        
        # batch에서 데이터 추출
        letter, image, label = batch
        
        # Forward Propagation 수행
        outputs = model(letter, image, label)
        
        loss = outputs[1]
        
        total_loss += loss.item()
        
        # Backward Propagation 수행
        loss.backward()
        history["train_loss"].append(loss.item())
        
        # 정확도 계산
        logits = outputs[0].detach().cpu().numpy()
        label = label.to("cpu").numpy()
        tmp_train_accuracy = flat_accuracy(logits, label)
        history["train_acc"].append(tmp_train_accuracy)
        
        # Gradient Cleeping
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        
        # gradient를 통해 weight update
        optimizer.step()
        
        # gradient 초기화
        model.zero_grad()
        
    # average loss
    avg_train_loss = total_loss / len(train_dataloader)
    
    # ========================================
    #               Validation
    # ========================================
    
    t0 = time.time()
    
    # eval 모드로 변경
    model.eval()
    
    # 변수 초기화
    eval_loss, eval_accuracy, nb_eval_steps, nb_eval_examples = 0, 0, 0, 0
    
    # dataloader에서 batch만큼 반복해서 가져옴
    for batch in valid_dataloader:
        
        # batch를 GPU에 적용
        batch = tuple(t.to(device) for t in batch)
        
        # batch에서 데이터 추출
        letter, image, label = batch
        
        # gradient 계산 안함
        with torch.no_grad():
            # Forward Propagation 수행
            outputs = model(letter, image, label)
        
        logits = outputs[0]
        history["eval_loss"].append(outputs[1].item())
        
        # CPU로 데이터 이동
        logits = logits.detach().cpu().numpy()
        label = label.to("cpu").numpy()
        
        # 출력 logit과 label을 비교하여 정확도 계산
        tmp_eval_accuracy = flat_accuracy(logits, label)
        history["eval_acc"].append(tmp_eval_accuracy)
        eval_accuracy += tmp_eval_accuracy
        nb_eval_steps += 1
        
    s = f"\r[Epoch {epoch_i+1}/{epochs}]"
    s += f" Avg Training Loss: {avg_train_loss: .2f}"
    s += " Valid Acc: {0:.2f}".format(eval_accuracy / nb_eval_steps)
    print(s, end="")
    
print("")
print("Training complete")

In [None]:
torch.save(model.state_dict(), "./model/emnist_model4.pt")

In [None]:
model = customCNN()
model.load_state_dict(torch.load("./model/emnist_model4.pt"))
model.eval()
model.cuda()

In [None]:
x_test = np.concatenate(
    [
        pd.get_dummies(test["letter"]).values.reshape(-1, 1, 26),
        (test[[str(i) for i in range(784)]] / 255.).values.reshape(-1, 1, 784)
    ],
    axis=2
)
x_test = torch.Tensor(x_test)

x1 = x_test[:, :, :26].cuda()
x2 = x_test[:, :, 26:].reshape(-1, 1, 28, 28).cuda()

In [None]:
test_data = TensorDataset(x1, x2)
test_sampler = SequentialSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=32)

In [None]:
y_pred = []
for batch in test_dataloader:
    input1, input2 = batch
    with torch.no_grad():
        outputs = model(input1, input2)
    y_pred.append(torch.argmax(outputs, dim=1))

In [None]:
submission["digit"] = torch.cat(y_pred).detach().cpu().numpy()
submission.to_csv("./result/submission4.csv", index=False)