In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.transforms.functional import crop
import gc
import numpy as np
import random
random.seed(42) 
import os
gc.collect()

#from torchvision.transforms import v2
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(torch.__version__)
print(torch.version.cuda)
print(device)

transform_1 = transforms.Compose(
    [
#     transforms.Lambda(crop_image),
    transforms.Resize((64, 64)),
    transforms.AugMix(),
    transforms.RandomHorizontalFlip(1),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) 
    ])

transform_2 = transforms.Compose(
    [
    transforms.Resize((64, 64)),
        transforms.AugMix(),
    transforms.RandomHorizontalFlip(1),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) 
    ])

transform_3 = transforms.Compose(
    [
    transforms.Resize((64, 64)),
        transforms.AugMix(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) 
    ])




transform = transforms.Compose(
    [
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) 
    ])

2.2.2+cu121
12.1
cuda:0


In [2]:
import os
from torchvision.io import read_image
from torch.utils.data import Dataset
from PIL import Image
from tqdm import tqdm
from torchvision.io.image import read_file
from torchvision.transforms.functional import to_pil_image

class CustomImageDataset(Dataset):
    def __init__(self, annotations_file, img_dir, idex_list, transform=None, mask = None, balance = False):
        with open(annotations_file, 'r') as file:
            content = file.read()
            lines = content.strip("\n").split('\n')        
        self.img_labels_init = []
        for index in idex_list:
            self.img_labels_init.append(lines[index])
        self.img_dir = img_dir
        self.transform = transform
        self.idex_list = idex_list
        self.image = []
        self.mask = mask
        self.img_labels = []
        for idx in tqdm(range(len(self.idex_list))):
            label = self.img_labels_init[idx]
            img_path = os.path.join(self.img_dir, f"{self.idex_list[idx]}.png")
            image = Image.open(img_path).convert("RGB")
            if self.transform:
                image = self.transform(image)
            if self.mask:
                image = image[:, :, self.mask]
            if balance: 
                if label == '0':
                    for i in range(3):
                        self.image.append(image)
                        self.img_labels.append(label)
            self.img_labels.append(label)
            self.image.append(image)
    
    def __len__(self):
        return len(self.image)
    
    def __getitem__(self, idx):
        label = int(self.img_labels[idx])
        image = self.image[idx]
        return image, label

In [3]:
from torch.utils.data import DataLoader, ConcatDataset
from sklearn.model_selection import train_test_split

# with open("/scratch/hh3043/ML_contest/dataset/train_label.txt", 'r') as file:
#     content = file.read()
#     num = len(content.strip("\n").split('\n'))
    
    
# train_data_idx, test_data_idx = train_test_split(list(range(num)), test_size=0.2, random_state=42)

total_data = CustomImageDataset("/scratch/hh3043/ML_contest/dataset/train_label.txt", "/scratch/hh3043/ML_contest/dataset/train_img", train_data_idx, transform=transform, mask = None, balance = True)
test_data = CustomImageDataset("/scratch/hh3043/ML_contest/dataset/train_label.txt", "/scratch/hh3043/ML_contest/dataset/train_img", test_data_idx, transform=transform, mask = None, balance = False)


In [4]:
def collate_fn(batch):
    image = torch.stack([x[0] for x in batch])
    label = torch.tensor([x[1] for x in batch])
    return image, label

total_data = torch.load('/scratch/hh3043/ML_contest/dataset/total_train_32.pt') 
t_data = torch.load('/scratch/hh3043/ML_contest/dataset/total_train_32_1.pt') 
total_data = ConcatDataset([total_data, t_data])
test_data = torch.load('/scratch/hh3043/ML_contest/dataset/total_test_32.pt')
print("Finished loading")

Finished loading


In [5]:
import torch
import torchvision
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split
from multiprocess import Pool
from functools import partial
import os
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from PIL import Image
from tqdm import tqdm

# trainloader = DataLoader(total_data, batch_size=16, shuffle=True, num_workers=3)
# testloader = DataLoader(test_data, batch_size=16, shuffle=False, num_workers=3)

trainloader = DataLoader(total_data, batch_size=16, collate_fn=collate_fn, shuffle=True, num_workers=3)
testloader = DataLoader(test_data, batch_size=16, collate_fn=collate_fn, shuffle=False, num_workers=3)

Define the residual block

In [6]:
import torch.nn as nn
import torch.nn.functional as F
def res(input_channel):
    block = nn.Sequential(
          nn.Conv2d(input_channel,input_channel,3, padding = 1),
          nn.BatchNorm2d(input_channel),
          nn.ReLU(),
          nn.Conv2d(input_channel,input_channel,3, padding = 1),
          nn.BatchNorm2d(input_channel),
        )
    
    return nn.Sequential(*block)

def conv_block(input_channel, output_channel, filter_size = 3,padding = 1):
    block = nn.Sequential(
          nn.Conv2d(input_channel,output_channel,filter_size, padding = padding),
          nn.BatchNorm2d(output_channel),
          nn.ReLU(),
        )
    
    return nn.Sequential(*block)

Define the whole network

In [7]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        

        self.conv1 = conv_block(3, 64, 7, padding = 3)
        self.res1_1 = res(64)
        self.res1_2 = res(64)
        self.res1_3 = res(64)
        
        self.conv2 = conv_block(64, 128, 3, padding = 1)
        self.res2_1 = res(128)
        self.res2_2 = res(128)
        self.res2_3 = res(128)
        
        self.conv3 = nn.Conv2d(128, 256, 3, padding = 1)
        self.res3_1 = res(256)
        self.res3_2 = res(256)
        self.res3_3 = res(256)
       
        self.conv4 = nn.Conv2d(256, 512, 3, padding = 1)
        self.res4_1 = res(512)
        self.res4_2 = res(512)
        self.res4_3 = res(512)
        
        self.droup = nn.Dropout(0.2)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(512 * 4 * 4, 4)
        self.fc2 = nn.Linear(400, 4)
        
        
    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(self.res1_1(x) + x)
        x = F.relu(self.res1_2(x) + x)
        #x = F.relu(self.res1_3(x) + x)
        
#         x = self.droup(x)
        
        x = self.conv2(x)
        x = self.pool(x)
        x = F.relu(self.res2_1(x) + x)
        x = F.relu(self.res2_2(x) + x)
        #x = F.relu(self.res2_3(x) + x)
        
        # x = self.droup(x)
        
        x = self.conv3(x)
        x = self.pool(x)
        x = F.relu(self.res3_1(x) + x)
        x = F.relu(self.res3_2(x) + x)
        #x = F.relu(self.res3_3(x) + x)
        
#         x = self.droup(x)

        x = self.conv4(x)
        x = self.pool(x)
        x = F.relu(self.res4_1(x) + x)
        x = F.relu(self.res4_2(x) + x)
        #x = F.relu(self.res4_3(x) + x)
        
#         x = self.droup(x)

        #print(x.size())
        x = x.view(-1, 512 * 4 * 4)
        x = self.fc1(x)
#         x = self.fc2(x)
        x = nn.Softmax(dim=1)(x)
        return x



net = Net()
# PATH = "/kaggle/input/audio_resnet/pytorch/httpswww.kaggle.commodelshongjiahuangaudio_resnetpytorch/7/checkpoint_64_27.pth"
# net.load_state_dict(torch.load(PATH)['state_dict'])
net.to(device)  # gpu/ cpu

import torch.optim as optim

criterion = nn.CrossEntropyLoss().to(device)
max_lr = 0.01
epochs = 22
optimizer = optim.SGD(net.parameters(), lr = max_lr, weight_decay = 1.0e-4, momentum = 0.9) 
grad_clip = 0.1
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max = epochs*len(trainloader))
criterion = criterion.cuda()
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs, 
                                                steps_per_epoch=len(trainloader))

In [8]:
import numpy as np 
def mixup_data(x, y, alpha=1.0, use_cuda=True):
    '''Returns mixed inputs, pairs of targets, and lambda'''
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    if use_cuda:
        index = torch.randperm(batch_size).cuda()
    else:
        index = torch.randperm(batch_size)

    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam


def mixup_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

In [9]:
from pathlib import Path
import pandas as pd
class test_CustomImageDataset(Dataset):
    def __init__(self, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return sum(1 for file in Path(self.img_dir).iterdir() if file.suffix == '.png')

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, f"{idx}.png")
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image

In [10]:
def testing(net, testloader, criterion):
    net.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():

        for data in testloader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)

            outputs = net(images)
            loss = criterion(outputs, labels)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            running_loss += loss.item()

    accuracy = (correct / total) * 100
    val_loss =running_loss/len(testloader)
    print(f'validation Loss:{val_loss:.2f}, accuracy: {accuracy:.2f}%')
    return val_loss, accuracy

In [11]:

best_val = 100
cnt = 0
patience = 6
for epoch in range(epochs):  # loop over the dataset multiple times
    net.train()
    running_loss = 0.0
    correct=0
    total=0

    for i, data in enumerate(trainloader, 0):
        # get the inputs
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device) # inputs, labels = data
        #inputs, labels_a, labels_b, lam = mixup_data(inputs, labels, 0.2)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        # loss = mixup_criterion(criterion, outputs, labels_a, labels_b, lam)
        loss.backward()
        nn.utils.clip_grad_value_(net.parameters(), grad_clip)
        
        optimizer.step()
        #sched.step()
        scheduler.step()
        # print statistics
        running_loss += loss.item()

        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()


    train_loss =running_loss/len(trainloader)
    accu=100.*correct/total

    #train_accu.append(accu)
    #train_losses.append(train_loss)
    my_lr = scheduler.get_last_lr()[0]
    print('Train Loss: %.3f | Accuracy: %.3f | lr: %f'%(train_loss,accu, my_lr), end = " | ")
    val_loss, _ =testing(net, testloader, criterion)
    if val_loss < best_val:
            best_val = val_loss
    else:
        cnt += 1
        if cnt >= patience:
            break
    
my_lr = scheduler.get_last_lr()[0]
print('Finished Training', "last_learning_rate", my_lr)

Train Loss: 1.003 | Accuracy: 73.686 | lr: 0.000398 | validation Loss:1.02, accuracy: 71.87%
Train Loss: 0.905 | Accuracy: 83.635 | lr: 0.000392 | validation Loss:0.96, accuracy: 78.01%
Train Loss: 0.849 | Accuracy: 89.353 | lr: 0.000382 | validation Loss:0.96, accuracy: 78.09%
Train Loss: 0.817 | Accuracy: 92.605 | lr: 0.000368 | validation Loss:0.93, accuracy: 81.12%
Train Loss: 0.796 | Accuracy: 94.697 | lr: 0.000351 | validation Loss:0.92, accuracy: 82.34%
Train Loss: 0.784 | Accuracy: 95.980 | lr: 0.000331 | validation Loss:0.93, accuracy: 80.95%
Train Loss: 0.777 | Accuracy: 96.667 | lr: 0.000308 | validation Loss:0.91, accuracy: 83.56%
Train Loss: 0.772 | Accuracy: 97.179 | lr: 0.000283 | validation Loss:0.91, accuracy: 82.59%
Train Loss: 0.770 | Accuracy: 97.380 | lr: 0.000256 | validation Loss:0.91, accuracy: 83.26%
Train Loss: 0.769 | Accuracy: 97.500 | lr: 0.000228 | validation Loss:0.91, accuracy: 83.60%
Train Loss: 0.768 | Accuracy: 97.565 | lr: 0.000200 | validation Loss:

In [12]:
testing(net, testloader, criterion)

validation Loss:0.91, accuracy: 83.39%


(0.9077153581900884, 83.38940285954584)

In [13]:
# total_loader = DataLoader(ConcatDataset([total_data , test_data]), batch_size=16, shuffle=True, num_workers=3)  #ConcatDataset([train_data_1,train_data_4]) train_data_2, train_data_3, 
# epochs = 3
# for epoch in range(epochs):  # loop over the dataset multiple times
#     net.train()
#     running_loss = 0.0
#     correct=0
#     total=0

#     for i, data in enumerate(total_loader, 0):
#         # get the inputs
#         inputs, labels = data
#         inputs, labels = inputs.to(device), labels.to(device) # inputs, labels = data
#         #inputs, labels_a, labels_b, lam = mixup_data(inputs, labels, 0.2)

#         # zero the parameter gradients
#         optimizer.zero_grad()

#         # forward + backward + optimize
#         outputs = net(inputs)
#         loss = criterion(outputs, labels)
#         # loss = mixup_criterion(criterion, outputs, labels_a, labels_b, lam)
#         loss.backward()
#         nn.utils.clip_grad_value_(net.parameters(), grad_clip)
        
#         optimizer.step()
#         #sched.step()
#         scheduler.step()
#         # print statistics
#         running_loss += loss.item()

#         _, predicted = outputs.max(1)
#         total += labels.size(0)
#         correct += predicted.eq(labels).sum().item()


#     train_loss =running_loss/len(total_loader)
#     accu=100.*correct/total

#     #train_accu.append(accu)
#     #train_losses.append(train_loss)
#     my_lr = scheduler.get_last_lr()[0]
#     print('Train Loss: %.3f | Accuracy: %.3f | lr: %f'%(train_loss,accu, my_lr))
# my_lr = scheduler.get_last_lr()[0]
# print('Finished Training', "last_learning_rate", my_lr)

In [16]:
checkpoint = {'model': Net(),
              'state_dict': net.state_dict(),
              'optimizer' : optimizer.state_dict()}

torch.save(checkpoint, '/scratch/hh3043/ML_contest/checkpoint.pth')

In [17]:
net.eval()
test_data = test_CustomImageDataset("/scratch/hh3043/ML_contest/dataset/test_img", transform=transform)
test_loader = DataLoader(test_data, batch_size=16, shuffle=False, num_workers=3)

predicted_labels = []
with torch.no_grad():
    for data in test_loader:
        images = data
        images = images.to(device)
        
        outputs = net(images)
        _, predicted = torch.max(outputs.data, 1)
        
        predicted_labels.extend(predicted.cpu().numpy())
        
output = pd.DataFrame({
"id": [i for i in range(len(test_data))],
"category": predicted_labels
})

output.to_csv('/scratch/hh3043/ML_contest/my_submission.csv', index=False)