In [1]:
# Mount Google Drive into the Colab filesystem at /content/drive so the
# notebook can read the data stored there.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
cd drive/MyDrive/Colab\ Notebooks

/content/drive/MyDrive/Colab Notebooks


## Library

In [None]:
import os
import time
import copy
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tqdm as tqdm

import PIL
from PIL import Image, ImageOps

import cv2
import glob

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold

import albumentations as A
from albumentations.pytorch import ToTensorV2

import missingno as msno # null data를 보여줌

# ignore warnings
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline

## Hyper-parameters

In [None]:
seed = 0  # passed to seed_everything() and to the random_split generator
num_epochs = 100  # upper bound on the number of training epochs
lr = 1e-4  # learning rate given to the optimizer
early_stop_count = 10  # train_model stops once its non-improvement counter reaches this value
batch_size = 32  # batch size of the train/val DataLoaders
num_workers = 4  # worker processes of the train/val DataLoaders

## Seed fix

In [None]:
def seed_everything(seed):
    """
    Seed every random number generator used by the notebook so that repeated
    runs under the same conditions give the same results.

    Args:
        seed: integer seed value.
    """
    # Python, NumPy and PyTorch (CPU, current GPU, all GPUs) generators.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,  # if use multi-GPU
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Ask cuDNN for deterministic kernels and switch off its auto-tuner.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

In [None]:
# Fix all RNG seeds before any of the cells below run.
seed_everything(seed)

## EDA

In [None]:
img_dir = './data/input/train'  # root of the training images
# Sorted names of the entries directly under img_dir; the next cell treats
# each entry as a class sub-directory.
images = sorted(os.listdir(img_dir))

In [None]:
# Count the .jpg files under each entry of `images` (one sub-directory per class).
# Building the dict with a comprehension avoids binding a notebook-level
# variable named `dir`, which would shadow the builtin for the rest of the session.
categories = {
    class_name: len(glob.glob(os.path.join(img_dir, class_name, '*.jpg')))
    for class_name in images
}

print('Number of categories:', len(categories))
print('-'*30)
for class_name, n_images in categories.items():
    print(f'Number of {class_name} class:', n_images)

In [None]:
# Class name -> integer label.
file_cat = {
    'dog': 0,
    'elephant': 1,
    'giraffe': 2,
    'guitar': 3,
    'horse': 4,
    'house': 5,
    'person': 6,
}

# Image count per class, indexed by label.
cat_hist = np.zeros(len(file_cat), dtype=int)
for cat, label in file_cat.items():
    cat_hist[label] = categories[cat]

# Take the row names from file_cat in label order so every name is paired with
# its own count, independent of how the directory listing happens to be ordered.
class_names = sorted(file_cat, key=file_cat.get)

f, ax = plt.subplots(figsize=(5, 5))

df = pd.DataFrame({'Categories': class_names, 'Number of classes': cat_hist})
# axis/ascending are passed by keyword: recent pandas releases reject them positionally.
df = df.sort_values('Number of classes', axis=0, ascending=False)

ax.set_title('category distribution of train set')
plot_1 = sns.barplot(x='Number of classes', y='Categories', data=df, label='Total', color='b', ax=ax)


In [None]:
# Re-order the count table by its index and display it.
sorted_temp_df = df.sort_index()
sorted_temp_df

## Augmentations

In [None]:
def get_transforms(need=('train', 'val'), img_size=(227, 227), mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """
    Build the albumentations pipelines requested in ``need``.

    Args:
        need: collection of split names; only 'train' and 'val' are recognised.
        img_size: two-element size; img_size[0] and img_size[1] are passed,
            in that order, to A.Resize.
        mean: per-channel mean given to A.Normalize.
        std: per-channel standard deviation given to A.Normalize.

    Returns:
        dict mapping each requested split name to its A.Compose pipeline
        (empty when neither 'train' nor 'val' is requested).
    """
    def resize_step():
        return A.Resize(img_size[0], img_size[1], p=1.0)

    def output_steps():
        # Tail shared by both pipelines: normalise, then convert to a tensor.
        return [
            A.Normalize(mean=mean, std=std, max_pixel_value=255.0, p=1.0),
            ToTensorV2(p=1.0),
        ]

    pipelines = {}

    if 'train' in need:
        train_steps = [resize_step()]
        # Augmentations are applied to the training split only.
        train_steps += [
            A.Rotate(limit=30),
            A.HorizontalFlip(),
            A.RandomBrightnessContrast(),
            A.Cutout(num_holes=20, max_h_size=10, max_w_size=10),
        ]
        train_steps += output_steps()
        pipelines['train'] = A.Compose(train_steps, p=1.0)

    if 'val' in need:
        pipelines['val'] = A.Compose([resize_step(), *output_steps()], p=1.0)

    return pipelines

## Datasets

In [None]:
class CustomDataset(Dataset):
    """Image-classification dataset read from one sub-directory per class.

    Files are collected from ``<img_dir>/<class name>/*.jpg`` for every class
    name listed in ``_file_cat``.

    Args:
        img_dir: root directory containing the class sub-directories.
        transform: optional callable invoked as ``transform(image=array)['image']``;
            it can be replaced later with ``set_transform``.
        mean: per-channel mean stored on the instance (not applied by this class).
        std: per-channel standard deviation stored on the instance (not applied
            by this class).
    """
    num_classes = 7
    # Class (sub-directory) name -> integer label.
    _file_cat = {
        'dog': 0,
        'elephant': 1,
        'giraffe': 2,
        'guitar': 3,
        'horse': 4,
        'house': 5,
        'person': 6,
    }

    def __init__(self, img_dir, transform=None, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
        self.img_dir = img_dir
        # mean/std are explicit arguments so construction does not depend on
        # notebook-level globals being defined first.
        self.mean = mean
        self.std = std
        self.transform = transform

        self.setup()

    def set_transform(self, transform=None):
        """Replace the transform applied in ``__getitem__``."""
        self.transform = transform

    def setup(self):
        """Collect the path and label of every image under ``img_dir``."""
        # The lists are created per instance (and reset on every call); as
        # class attributes they would be shared, so each new instance would
        # append its files to the previous ones.
        self.img_paths = []   # image paths
        self.cat_labels = []  # label of the image at the same position
        for cat, label in self._file_cat.items():
            pattern = os.path.join(self.img_dir, cat, '*.jpg')
            # glob order is not guaranteed; sorting keeps sample indices, and
            # therefore any seeded split over them, stable between runs.
            for path in sorted(glob.glob(pattern)):
                if os.path.exists(path):
                    self.img_paths.append(path)
                    self.cat_labels.append(label)

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        img_path = self.img_paths[index]
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals an unreadable file by returning None.
            raise FileNotFoundError(f'Could not read image: {img_path}')
        # NOTE(review): the transform receives float32 pixels in the 0-255
        # range; confirm every augmentation in the pipeline supports that.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)

        cat_label = self.cat_labels[index]

        if self.transform:
            img = self.transform(image=img)['image']
        return img, cat_label

    def __len__(self):
        return len(self.img_paths)


In [None]:
from torchvision import datasets

# Per-channel normalisation statistics handed to get_transforms
# (presumably the ImageNet values — confirm).
mean, std = (0.485, 0.456, 0.406), (0.229, 0.224, 0.225)
img_dir = './data/input/train'

# Dict holding the 'train' and 'val' pipelines.
transform = get_transforms(mean=mean, std=std)

# Created without a transform; one is attached in the DataLoader cell.
dataset = CustomDataset(img_dir=img_dir)

## DataLoader

In [None]:
# train, validation
n_val = int(len(dataset) * 0.2)
n_train = len(dataset) - n_val
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [n_train, n_val], generator=torch.Generator().manual_seed(seed))
train_dataset.dataset.set_transform(transform['train'])
val_dataset.dataset.set_transform(transform['val'])

train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    num_workers=num_workers,
    shuffle=True,
)

val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    num_workers=num_workers,
    shuffle=False,    
)

## Train

In [None]:
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs, device=None, patience=None):
    """Train ``model`` with a validation pass per epoch, early stopping and best-weight restore.

    Args:
        model: network to optimise; it is updated in place and also returned.
        dataloaders: mapping with 'train' and 'val' loaders yielding
            ``(inputs, labels)`` batches. Each loader must expose ``.dataset``;
            its length is the divisor for the epoch averages.
        criterion: callable ``criterion(outputs, labels)`` returning a scalar loss tensor.
        optimizer: optimizer updating the model parameters.
        scheduler: object whose ``step(metric)`` is called once per epoch with
            that epoch's validation accuracy (a Python float).
        num_epochs: maximum number of epochs.
        device: device the batches are moved to. Defaults to the device of the
            model's first parameter (CPU when the model has no parameters).
        patience: number of epochs without a validation-accuracy improvement
            after which training stops. Defaults to the notebook-level
            ``early_stop_count``.

    Returns:
        ``model`` with the weights of the best validation epoch loaded.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')
    if patience is None:
        patience = early_stop_count

    best_model_weights = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    early_stop = 0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs-1}')
        print('-' * 10)

        since = time.time()

        # Each epoch runs a training phase followed by a validation phase.
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Gradients are tracked, and weights updated, only while training.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    preds = torch.argmax(outputs, dim=-1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # Accumulate as plain Python numbers: the loss is weighted by
                # batch size so the epoch value is a per-sample average.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            # Per-epoch loss and accuracy for this phase.
            n_samples = len(dataloaders[phase].dataset)
            epoch_loss = running_loss / n_samples
            epoch_acc = running_corrects / n_samples

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if not is_train:
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_weights = copy.deepcopy(model.state_dict())
                    early_stop = 0
                    print(f'...best weights update!... {best_acc}')
                else:
                    # Early-stop counter: validation accuracy did not improve.
                    early_stop += 1

                # Step the scheduler on the accuracy just measured, after
                # validation, so it never reacts to a stale value.
                scheduler.step(epoch_acc)

        # Wall-clock time of this epoch.
        time_elapsed = time.time() - since
        print(f'Training complete in {time_elapsed // 60}m {time_elapsed % 60}s')
        print()

        # Early stopping.
        if early_stop >= patience:
            print('...Early Stopping...')
            print()
            break

    print(f'Best val Acc: {best_acc}')

    # Restore the best weights before returning.
    model.load_state_dict(best_model_weights)
    return model

## Loss

In [None]:
class LabelSmoothingLoss(nn.Module):
    """Cross-entropy against a label-smoothed target distribution.

    The target class gets probability ``1 - smoothing``; the remaining
    ``smoothing`` mass is split evenly over the other ``classes - 1`` classes.

    Args:
        classes: number of classes (size of ``pred`` along ``dim``); must be >= 2.
        smoothing: probability mass moved away from the target class.
        dim: class dimension of ``pred``.

    Raises:
        ValueError: if ``classes`` is smaller than 2.
    """
    def __init__(self, classes=7, smoothing=0.5, dim=-1):
        super().__init__()
        if classes < 2:
            # forward() divides by (classes - 1).
            raise ValueError(f'classes must be >= 2, got {classes}')
        self.confidence = 1.0 - smoothing
        self.smoothing = smoothing
        self.cls = classes
        self.dim = dim

    def forward(self, pred, target):
        """Return the mean smoothed cross-entropy.

        Args:
            pred: raw scores with the class axis at ``self.dim``.
            target: integer class indices shaped like ``pred`` without the class axis.
        """
        log_probs = pred.log_softmax(dim=self.dim)
        with torch.no_grad():
            # Start with the off-target probability everywhere ...
            true_dist = torch.full_like(log_probs, self.smoothing / (self.cls - 1))
            # ... then write the confidence at the target index along the class
            # axis (self.dim, the same axis used by the softmax and the sum).
            true_dist.scatter_(self.dim, target.unsqueeze(self.dim), self.confidence)
        return torch.mean(torch.sum(-true_dist * log_probs, dim=self.dim))

In [None]:
import torchvision.models as models
import timm

# Display the model names timm reports as available, to pick one for the next cell.
timm.list_models()

In [None]:
# timm model 'swsl_resnext50_32x4d' with pretrained weights and a 7-class output, moved to `device`.
model = timm.create_model('swsl_resnext50_32x4d', pretrained=True, num_classes=7).to(device)

# Keys must match the phase names iterated in train_model.
dataloaders = {'train': train_loader, 'val': val_loader}

criterion = LabelSmoothingLoss()  # defaults: classes=7, smoothing=0.5
optimizer = optim.AdamW(model.parameters(), lr=lr)
# mode='max' because train_model steps the scheduler with an accuracy value.
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=3, verbose=True)

In [None]:
# Run training; the returned model has the best validation-accuracy weights loaded.
model = train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs=num_epochs)

## Test

In [None]:
class TestDataset(Dataset):
    """Label-free dataset over a list of image paths, used for inference.

    Args:
        img_paths: sequence of image file paths.
        transform: callable invoked as ``transform(image=array)['image']``,
            or a falsy value to return the raw float32 RGB array.
    """
    def __init__(self, img_paths, transform):
        self.img_paths = img_paths
        self.transform = transform

    def __getitem__(self, index):
        """Return the (optionally transformed) image at ``index``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        img_path = self.img_paths[index]
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals an unreadable file by returning None.
            raise FileNotFoundError(f'Could not read image: {img_path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)

        if self.transform:
            image = self.transform(image=image)['image']
        return image

    def __len__(self):
        return len(self.img_paths)

In [None]:
# Directory holding the sample-submission CSV and the test image folder read below.
test_dir = './data/input/'

In [None]:
submission = pd.read_csv(os.path.join(test_dir, 'test_answer_sample_.csv'))
# Sorted so the prediction order is deterministic.
image_paths = sorted(glob.glob(os.path.join(test_dir, 'test/0', '*.jpg')))

# Reuse the validation pipeline (resize -> normalise -> tensor) instead of
# re-declaring it, so test preprocessing cannot drift from validation.
test_transform = get_transforms(need=('val',), mean=mean, std=std)['val']

test_dataset = TestDataset(image_paths, test_transform)

# Batch the test images like the validation loader; shuffle stays off so
# predictions keep the order of image_paths.
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    num_workers=num_workers,
    shuffle=False,
)

In [None]:
# Put the model in evaluation mode explicitly instead of relying on whatever
# mode the training cell left it in.
model.eval()

all_predictions = []
with torch.no_grad():
    for batch in test_loader:
        logits = model(batch.to(device))
        # Predicted class = index of the largest score per image.
        all_predictions.extend(logits.argmax(dim=-1).cpu().numpy())

submission['answer value'] = all_predictions

# Save the submission file.
submission.to_csv(os.path.join(test_dir, 'submission.csv'), index=False)
print('test inference is done!')