# The inference part is at the bottom of this notebook.
# Click "Run All" to run inference.
# This code will NOT train a model.

In [11]:
import numpy as np 
import pandas as pd 
import os
from PIL import Image
import csv
import cv2
import numpy as np
import random
import os
from tqdm import tqdm
from torchvision import transforms
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import  ConcatDataset
from torch.optim.lr_scheduler import StepLR

from torchvision.models import wide_resnet50_2, Wide_ResNet50_2_Weights
from torchvision.models import resnet152, ResNet152_Weights
from torchvision.models import resnet50, ResNet50_Weights
from torchvision.models import resnet18, ResNet18_Weights

In [12]:

# Preview the working tree: print at most three files per directory and an
# ellipsis line for any directory that holds more than that.
for folder, _, names in os.walk('./'):
    shown, hidden = names[:3], names[3:]
    for name in shown:
        print(os.path.join(folder, name))
    if hidden:
        print("...")

./0816036_HW5.docx
./0816036_HW5_inference.ipynb
./0816036_HW5_train.ipynb
...


In [13]:
# Ask PyTorch whether a CUDA device is usable; the notebook shows the result as the cell output.
torch.cuda.is_available()

True

In [14]:
# Print the CUDA version string exposed by the installed PyTorch build.
print(torch.version.cuda)

11.6


In [15]:
# Dataset locations, relative to the notebook's working directory.
TRAIN_PATH = "./train"
TEST_PATH = "./test"
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU so the
# notebook still runs (more slowly) instead of failing on the first .to(device).
# Switch the notebook accelerator to GPU if you want it to run faster.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [16]:
# Per-channel mean / std handed to Normalize by every normalized pipeline below.
_NORM_MEAN = (0.485, 0.456, 0.406)
_NORM_STD = (0.229, 0.224, 0.225)


def _make_pipeline(*augmentations, normalize=True):
    """Build a Compose: augmentations, resize to 64x64, ToTensor, optional Normalize."""
    steps = list(augmentations)
    steps.append(transforms.Resize((64, 64)))
    steps.append(transforms.ToTensor())
    if normalize:
        steps.append(transforms.Normalize(_NORM_MEAN, _NORM_STD))
    return transforms.Compose(steps)


# Plain pipelines (no augmentation) for the three data splits.
train_transforms = _make_pipeline()

val_transforms = _make_pipeline()

test_transforms = _make_pipeline()

# Same as test_transforms but without the Normalize step.
test_transforms_unnormalized = _make_pipeline(normalize=False)

# Augmented pipelines: one augmentation is applied before the common steps.
AutoAugment_transforms = _make_pipeline(
    transforms.AutoAugment(transforms.AutoAugmentPolicy.SVHN),
)

ColorJitter_transforms = _make_pipeline(
    transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5),
)

RandomPerspective_transforms = _make_pipeline(
    transforms.RandomPerspective(distortion_scale=0.4, p=1.0),
)

In [17]:

# data: all train/test file path
# self.data: select specific task
# self.root: ./train or ./test

# Task 1
class Task1_Dataset(Dataset):
    """Image dataset restricted to the rows whose filename starts with "task1".

    Args:
        data: iterable of ``(filename, label)`` rows covering every task.
            Empty rows (e.g. blank CSV lines) are ignored.
        root: directory the filenames are relative to (``./train`` or ``./test``).
        return_filename: when True, items are ``(image, filename)`` (used for
            inference); otherwise ``(image, int(label))``.
        transform: callable applied to the RGB PIL image. When None, the image
            is converted to a float32 ``(3, H, W)`` tensor scaled to [0, 1].
    """

    def __init__(self, data, root, return_filename=False, transform=None):
        # `sample and ...` skips empty rows instead of raising IndexError.
        self.data = [sample for sample in data if sample and sample[0].startswith("task1")]
        self.return_filename = return_filename
        self.root = root
        self.transform = transform

    def _load_image(self, filename):
        """Open ``root/filename`` and return it as a float32 tensor."""
        # convert("RGB") loads the pixels, so the file can be closed right away,
        # and guarantees three channels for grayscale / palette / RGBA files.
        with Image.open(f"{self.root}/{filename}") as img:
            img = img.convert("RGB")
        if self.transform is None:
            pixels = torch.from_numpy(np.array(img))
            return pixels.permute(2, 0, 1).to(torch.float32) / 255.0
        return torch.as_tensor(self.transform(img), dtype=torch.float32)

    def __getitem__(self, index):
        filename, label = self.data[index]
        image = self._load_image(filename)

        if self.return_filename:
            # Inference mode: the caller needs the filename, not the label.
            return image, filename
        return image, int(label)

    def __len__(self):
        return len(self.data)

In [18]:
# Task 2
class Task2_Dataset(Dataset):
    """Image dataset restricted to the rows whose filename starts with "task2".

    Args:
        data: iterable of ``(filename, label)`` rows covering every task.
            Empty rows (e.g. blank CSV lines) are ignored.
        root: directory the filenames are relative to (``./train`` or ``./test``).
        return_filename: when True, items are ``(image, filename)`` (used for
            inference); otherwise ``(image, encoded_label)``.
        transform: callable applied to the RGB PIL image. When None, the image
            is converted to a float32 ``(3, H, W)`` tensor scaled to [0, 1].
    """

    def __init__(self, data, root, return_filename=False, transform=None):
        # `sample and ...` skips empty rows instead of raising IndexError.
        self.data = [sample for sample in data if sample and sample[0].startswith("task2")]
        self.return_filename = return_filename
        self.root = root
        self.transform = transform

    @staticmethod
    def _encode_label(label):
        """Encode each character as a class id: '0'-'9' -> 0-9, 'a'-'z' -> 10-35.

        Base-36 parsing yields exactly that mapping. It is case-insensitive, so
        an uppercase letter maps to the same id as its lowercase form instead
        of a negative id, and any other character raises ValueError.
        """
        return np.array([int(char, 36) for char in label], dtype=np.int64)

    def _load_image(self, filename):
        """Open ``root/filename`` and return it as a float32 tensor."""
        # convert("RGB") loads the pixels, so the file can be closed right away,
        # and guarantees three channels for grayscale / palette / RGBA files.
        with Image.open(f"{self.root}/{filename}") as img:
            img = img.convert("RGB")
        if self.transform is None:
            pixels = torch.from_numpy(np.array(img))
            return pixels.permute(2, 0, 1).to(torch.float32) / 255.0
        return torch.as_tensor(self.transform(img), dtype=torch.float32)

    def __getitem__(self, index):
        filename, label = self.data[index]
        image = self._load_image(filename)

        if self.return_filename:
            # Inference mode: the caller needs the filename, not the label.
            return image, filename
        return image, self._encode_label(label)

    def __len__(self):
        return len(self.data)

In [19]:
# Task 3
class Task3_Dataset(Dataset):
    """Image dataset restricted to the rows whose filename starts with "task3".

    Args:
        data: iterable of ``(filename, label)`` rows covering every task.
            Empty rows (e.g. blank CSV lines) are ignored.
        root: directory the filenames are relative to (``./train`` or ``./test``).
        return_filename: when True, items are ``(image, filename)`` (used for
            inference); otherwise ``(image, encoded_label)``.
        transform: callable applied to the RGB PIL image. When None, the image
            is converted to a float32 ``(3, H, W)`` tensor scaled to [0, 1].
    """

    def __init__(self, data, root, return_filename=False, transform=None):
        # `sample and ...` skips empty rows instead of raising IndexError.
        self.data = [sample for sample in data if sample and sample[0].startswith("task3")]
        self.return_filename = return_filename
        self.root = root
        self.transform = transform

    @staticmethod
    def _encode_label(label):
        """Encode each character as a class id: '0'-'9' -> 0-9, 'a'-'z' -> 10-35.

        Base-36 parsing yields exactly that mapping. It is case-insensitive, so
        an uppercase letter maps to the same id as its lowercase form instead
        of a negative id, and any other character raises ValueError.
        """
        return np.array([int(char, 36) for char in label], dtype=np.int64)

    def _load_image(self, filename):
        """Open ``root/filename`` and return it as a float32 tensor."""
        # convert("RGB") loads the pixels, so the file can be closed right away,
        # and guarantees three channels for grayscale / palette / RGBA files.
        with Image.open(f"{self.root}/{filename}") as img:
            img = img.convert("RGB")
        if self.transform is None:
            pixels = torch.from_numpy(np.array(img))
            return pixels.permute(2, 0, 1).to(torch.float32) / 255.0
        return torch.as_tensor(self.transform(img), dtype=torch.float32)

    def __getitem__(self, index):
        filename, label = self.data[index]
        image = self._load_image(filename)

        if self.return_filename:
            # Inference mode: the caller needs the filename, not the label.
            return image, filename
        return image, self._encode_label(label)

    def __len__(self):
        return len(self.data)

In [20]:
# Fixed seed so the train/validation split is the same on every run. With an
# unseeded split, a resumed training run would validate on rows it had already
# trained on, and reported accuracies would not be comparable between runs.
SPLIT_SEED = 0
# Probability that an annotation row is assigned to the training split.
TRAIN_FRACTION = 0.7

train_data = []
val_data = []

split_rng = random.Random(SPLIT_SEED)
with open(f'{TRAIN_PATH}/annotations.csv', newline='') as csvfile:
    for row in csv.reader(csvfile, delimiter=','):
        if split_rng.random() < TRAIN_FRACTION:
            train_data.append(row)
        else:
            val_data.append(row)


def _training_views(dataset_cls):
    """Return the plain, AutoAugment, colour-jitter and perspective views of the training rows."""
    return tuple(
        dataset_cls(train_data, root=TRAIN_PATH, transform=view_transform)
        for view_transform in (
            train_transforms,
            AutoAugment_transforms,
            ColorJitter_transforms,
            RandomPerspective_transforms,
        )
    )


# task1
original_ds_task1, AutoAugment_ds_task1, jitter_ds_task1, angle_ds_task1 = _training_views(Task1_Dataset)
train_ds_task1 = ConcatDataset([original_ds_task1, AutoAugment_ds_task1, jitter_ds_task1, angle_ds_task1])
train_dl_task1 = DataLoader(train_ds_task1, batch_size=500, num_workers=0, drop_last=True, shuffle=True)

val_ds_task1 = Task1_Dataset(val_data, root=TRAIN_PATH, transform=val_transforms)
val_dl_task1 = DataLoader(val_ds_task1, batch_size=500, num_workers=0, drop_last=False, shuffle=False)

# task2
original_ds_task2, AutoAugment_ds_task2, jitter_ds_task2, angle_ds_task2 = _training_views(Task2_Dataset)
train_ds_task2 = ConcatDataset([original_ds_task2, AutoAugment_ds_task2, jitter_ds_task2, angle_ds_task2])
train_dl_task2 = DataLoader(train_ds_task2, batch_size=500, num_workers=0, drop_last=True, shuffle=True)

val_ds_task2 = Task2_Dataset(val_data, root=TRAIN_PATH, transform=val_transforms)
val_dl_task2 = DataLoader(val_ds_task2, batch_size=500, num_workers=0, drop_last=False, shuffle=False)

# task3 (note the smaller training batch size)
original_ds_task3, AutoAugment_ds_task3, jitter_ds_task3, angle_ds_task3 = _training_views(Task3_Dataset)
train_ds_task3 = ConcatDataset([original_ds_task3, AutoAugment_ds_task3, jitter_ds_task3, angle_ds_task3])
train_dl_task3 = DataLoader(train_ds_task3, batch_size=32, num_workers=0, drop_last=True, shuffle=True)

val_ds_task3 = Task3_Dataset(val_data, root=TRAIN_PATH, transform=val_transforms)
val_dl_task3 = DataLoader(val_ds_task3, batch_size=500, num_workers=0, drop_last=False, shuffle=False)

FileNotFoundError: [Errno 2] No such file or directory: './train/annotations.csv'

In [None]:


def _loss_and_logits(criterion, y_pred, label, num_task, num_output, num_class):
    """Return the batch loss and the logits in the layout used for argmax.

    Task 1 uses the raw logits; the other tasks reshape them to
    (batch, num_output, num_class) and sum one cross-entropy term per position.
    """
    if num_task == 1:
        return criterion(y_pred, label), y_pred
    y_pred = torch.unflatten(y_pred, dim=1, sizes=(num_output, num_class))
    loss = sum(criterion(y_pred[:, i], label[:, i]) for i in range(num_output))
    return loss, y_pred


def _evaluate(model, val_dl, criterion, num_task, num_output, num_class):
    """Return (summed validation loss, accuracy) as Python floats.

    For multi-character tasks a sample counts as correct only when every
    position matches.
    """
    sample_count = 0
    correct_count = 0
    val_loss = 0.0
    model.eval()
    # No gradients are needed for validation; skipping them avoids building
    # (and keeping alive) an autograd graph for every validation batch.
    with torch.no_grad():
        for image, label in val_dl:
            image = image.to(device)
            label = label.to(device).to(torch.int64)

            loss, y_pred = _loss_and_logits(criterion, model(image), label, num_task, num_output, num_class)
            val_loss += loss.item()
            sample_count += len(image)

            if num_task == 1:
                correct_count += (label == torch.argmax(y_pred, dim=1)).sum().item()
            else:
                correct_count += torch.all(label == torch.argmax(y_pred, dim=2), dim=1).sum().item()

    # An empty validation loader reports 0.0 instead of dividing by zero.
    val_acc = correct_count / sample_count if sample_count else 0.0
    return val_loss, val_acc


def train(num_task=2, num_epoch=1000, learning_rate=0.01, continue_train=False, model_path=None):
    """Fine-tune a pretrained ResNet-50 on one task and checkpoint every second epoch.

    Args:
        num_task: 1, 2 or 3; selects the data loaders and the head size
            (1x10, 2x36 or 4x36 outputs).
        num_epoch: number of epochs to run.
        learning_rate: Adam learning rate (also part of the checkpoint path).
        continue_train: when True, start from the checkpoint at ``model_path``
            and replace its head with a new two-layer head.
        model_path: state-dict file to load when ``continue_train`` is True.

    Returns:
        The trained model, or None when ``num_task`` is not 1, 2 or 3.

    Raises:
        ValueError: if ``continue_train`` is True but ``model_path`` is None.
    """
    print(f"task {num_task}")

    # Loaders are referenced inside each branch so that only the selected
    # task's loaders have to exist.
    if num_task == 1:
        num_output = 1
        num_class = 10
        train_dl = train_dl_task1
        val_dl = val_dl_task1

    elif num_task == 2:
        num_output = 2
        num_class = 36
        train_dl = train_dl_task2
        val_dl = val_dl_task2

    elif num_task == 3:
        num_output = 4
        num_class = 36
        train_dl = train_dl_task3
        val_dl = val_dl_task3

    else:
        print("task error")
        return

    if continue_train and model_path is None:
        raise ValueError("continue_train=True requires model_path")

    print("Start loading pretrain ResNet model")

    # `weights` is passed by keyword; the positional form is deprecated.
    model = resnet50(weights=ResNet50_Weights.DEFAULT)
    if continue_train:
        # The checkpoint is loaded into a 2*num_class head (hard-coded here).
        # NOTE(review): presumably the checkpoint is a task-2 model - confirm.
        model.fc = nn.Linear(2048, 2*num_class)
        model.load_state_dict(torch.load(model_path, map_location=device))
        # The loaded head is then replaced by a freshly initialised one.
        model.fc = nn.Sequential(
            nn.Linear(2048, 1024),
            nn.Linear(1024, num_output*num_class)
        )
    else:
        model.fc = nn.Linear(2048, num_output*num_class)

    model = model.to(device)
    print("Successfully load pretrain ResNet model")

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    # Create the checkpoint directory up front so a missing folder fails fast
    # rather than after the first full epoch of training.
    save_dir = f"saved_model/task{num_task}/normalized_ResNet50_lr_{learning_rate}"
    os.makedirs(save_dir, exist_ok=True)

    for epoch in range(num_epoch):
        if epoch % 2 == 0:
            print(f"Epoch [{epoch}]")

        # Losses are accumulated as Python floats (.item()) so the running sum
        # does not hold on to every batch's autograd graph.
        train_loss = 0.0
        model.train()
        for image, label in train_dl:
            image = image.to(device)
            label = label.to(device).to(torch.int64)

            loss, _ = _loss_and_logits(criterion, model(image), label, num_task, num_output, num_class)
            train_loss += loss.item()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        val_loss, val_acc = _evaluate(model, val_dl, criterion, num_task, num_output, num_class)

        if epoch % 2 == 0:
            torch.save(model.state_dict(), f"{save_dir}/epoch{epoch}_val_acc_{val_acc:.3f}_train_loss_{train_loss:.3f}_val_loss_{val_loss:.3f}.pth")
            print("val acc = ", val_acc, "train loss = ", train_loss, " val loss = ", val_loss)

    return model

In [None]:
# # scene text recognition dataset's pretrained model
# # lr_lst = [ 5e-3, 1e-3, 5e-4]
# lr_lst = [ 8e-3]
# for lr in lr_lst:
#     model = train(num_task=2, num_epoch=2000, learning_rate=lr, continue_train=False, model_path="saved_model/task2/best_model.pth")

In [None]:
# STOP

# Start inference 

In [None]:
# Load every row of the sample submission (it sits one level above TEST_PATH);
# the per-task datasets later pick their rows by filename prefix.
test_data = []
with open(f'{TEST_PATH}/../sample_submission.csv', newline='') as submission_file:
    test_data.extend(csv.reader(submission_file, delimiter=','))

In [None]:
# Test datasets and loaders, one pair per task. Items are (image, filename).
#   task 1: images are NOT normalized
#   task 2: images are normalized
#   task 3: images are normalized
# Every loader keeps the file order and the final partial batch.
_TEST_LOADER_OPTIONS = dict(batch_size=500, num_workers=0, drop_last=False, shuffle=False)

test_ds_1 = Task1_Dataset(
    test_data,
    root=TEST_PATH,
    return_filename=True,
    transform=test_transforms_unnormalized,
)
test_dl_1 = DataLoader(test_ds_1, **_TEST_LOADER_OPTIONS)

test_ds_2 = Task2_Dataset(
    test_data,
    root=TEST_PATH,
    return_filename=True,
    transform=test_transforms,
)
test_dl_2 = DataLoader(test_ds_2, **_TEST_LOADER_OPTIONS)

test_ds_3 = Task3_Dataset(
    test_data,
    root=TEST_PATH,
    return_filename=True,
    transform=test_transforms,
)
test_dl_3 = DataLoader(test_ds_3, **_TEST_LOADER_OPTIONS)


In [None]:
# Checkpoints are deserialized onto the CPU (map_location="cpu") so that files
# saved from a GPU session also load on a machine without CUDA; the inference
# loop moves each model to `device` before using it.

# load task1 model
# NOTE(review): this model keeps the stock resnet152 classifier head (fc is not
# replaced); confirm the task-1 checkpoint was trained with that head.
model1 = resnet152()
model1.load_state_dict(torch.load("best_model_task1.pth", map_location="cpu"))
# load task2 model: 2 characters x 36 classes
model2 = resnet50()
model2.fc = nn.Linear(2048, 2*36)
model2.load_state_dict(torch.load("best_model_normalized_task2.pth", map_location="cpu"))
# load task3 model: 4 characters x 36 classes
model3 = resnet50()
model3.fc = nn.Linear(2048, 4*36)
model3.load_state_dict(torch.load("best_model_normalized_task3.pth", map_location="cpu"))

<All keys matched successfully>

In [None]:
def _decode_prediction(class_ids):
    """Turn class ids into text: 0-9 -> '0'-'9', 10 and above -> 'a', 'b', ..."""
    return "".join(str(c) if c < 10 else chr(c + 87) for c in class_ids)


# Run the three models over their test loaders, collect (filename, prediction)
# pairs, then write them all to submission.csv.
num_tasks = [1, 2, 3]
total_len = 0

write_lst = []
for num_task in num_tasks:
    print("num_task = ", num_task)

    # Select the model and loader for this task
    if num_task == 1:
        model = model1.to(device)
        test_dl = test_dl_1
    elif num_task == 2:
        model = model2.to(device)
        test_dl = test_dl_2
    else:
        model = model3.to(device)
        test_dl = test_dl_3

    model.eval()
    # Inference only: disable autograd so no graph is built per batch.
    with torch.no_grad():
        for image, filenames in test_dl:
            image = image.to(device)

            y_pred = model(image)
            if num_task == 1:
                # One class id per image.
                output = torch.argmax(y_pred, dim=1)
            else:
                # One class id per character: 2 characters for task 2, 4 for task 3.
                num_output = 2 if num_task == 2 else 4
                y_pred = torch.unflatten(y_pred, dim=1, sizes=(num_output, 36))
                output = torch.argmax(y_pred, dim=2)

            if len(filenames) != 500:
                print("len(filenames) = ", len(filenames))
            total_len += len(filenames)

            # Branch on the task explicitly instead of relying on a bare
            # `except` to detect multi-character predictions.
            predictions = output.cpu().tolist()
            for filename, prediction in zip(filenames, predictions):
                if num_task == 1:
                    write_lst.append((filename, str(prediction)))
                else:
                    write_lst.append((filename, _decode_prediction(prediction)))

with open('submission.csv', 'w', newline='') as csvfile:
    csv_writer = csv.writer(csvfile)
    csv_writer.writerow(["filename", "label"])
    csv_writer.writerows(write_lst)

print(total_len)
    # for filename, _ in test_data:
    #     if filename.startswith("task2") or filename.startswith("task3"):
    #         csv_writer.writerow([filename, 0])
    



num_task =  1
num_task =  2
num_task =  3
10000
