# Data

In [1]:
import pandas as pd

# Load the training table, the test table and the sample submission, in that order.
train_df, test_df, submission_df = (
    pd.read_csv(csv_path)
    for csv_path in (
        "./data/train.csv",
        "./data/test.csv",
        "./data/sample_submission.csv",
    )
)

In [3]:
from sklearn.model_selection import train_test_split

# Always split from a freshly loaded copy of the full training table.
# Splitting `train_df` into itself made this cell non-idempotent: every re-run
# would carve another 10% out of the already reduced training set.
full_train_df = pd.read_csv("./data/train.csv")
label_columns = ["healthy", "multiple_diseases", "rust", "scab"]

# Hold out 10% for validation, stratified on the four target columns so both
# parts keep the same class balance; the seed is fixed for reproducibility.
train_df, valid_df = train_test_split(
    full_train_df,
    test_size=0.1,
    stratify=full_train_df[label_columns],
    random_state=42,
)

train_df.shape, valid_df.shape

((1474, 5), (164, 5))

## Dataset

In [4]:
import torch
import numpy as np
import torch.utils
import cv2

class ImageDataset(torch.utils.data.Dataset):
    """Dataset that loads one JPEG per row of a label table.

    Args:
        label_df: DataFrame whose first column holds the image ID. For
            non-test data, columns 1-4 hold the per-class target values
            (the label is the position of the largest of the four).
        image_dir: Directory that contains the ``<image_id>.jpg`` files.
        transform: Optional callable invoked as ``transform(image=image)``
            that returns a mapping with an ``"image"`` entry. A falsy value
            skips the transformation.
        is_test: When true, no label is read and ``None`` is returned in its
            place.
    """

    def __init__(self, label_df, image_dir, transform, is_test):
        super().__init__()
        self.label_df = label_df
        self.image_dir = image_dir
        self.transform = transform
        self.is_test = is_test

    def __len__(self):
        """Return the number of rows (images) in the label table."""
        return len(self.label_df)

    def __getitem__(self, index):
        """Load, colour-convert and transform the image at position ``index``.

        Returns:
            ``(image, label)`` where ``label`` is the integer class position,
            or ``(image, None)`` when ``is_test`` is true. Note that PyTorch's
            default collate function cannot batch ``None``, so test-mode
            samples need a custom ``collate_fn`` when used with a DataLoader.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        image_id = self.label_df.iloc[index, 0]  # first column is the image ID
        # Honour the configured directory instead of a hard-coded path.
        image_file_path = f"{self.image_dir}/{image_id}.jpg"

        image = cv2.imread(image_file_path)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f"Could not read image file: {image_file_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # OpenCV loads images as BGR

        if self.transform:
            image = self.transform(image=image)["image"]

        if self.is_test:
            return image, None

        # Position of the largest target value among the four class columns.
        label = int(np.argmax(self.label_df.iloc[index, 1:5].to_numpy()))
        return image, label

In [5]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

# for training
# Training pipeline: resize, random augmentations, then normalise and
# convert to a tensor. The order of the steps is significant.
train_steps = [
    A.Resize(450, 650),                                  # resize the image
    A.RandomBrightnessContrast(brightness_limit=0.2,     # brightness / contrast jitter
                               contrast_limit=0.2,
                               p=0.3),
    A.VerticalFlip(p=0.2),                               # flip top-to-bottom
    A.HorizontalFlip(p=0.5),                             # flip left-to-right
    A.ShiftScaleRotate(shift_limit=0.1,                  # shift, scale and rotate
                       scale_limit=0.2,
                       rotate_limit=30,
                       p=0.3),
    A.OneOf([A.Emboss(p=1),                              # one of emboss / sharpen / blur
             A.Sharpen(p=1),
             A.Blur(p=1)],
            p=0.3),
    A.PiecewiseAffine(p=0.3),                            # piecewise affine distortion
    A.Normalize(),                                       # normalise pixel values
    ToTensorV2(),                                        # convert to a torch tensor
]
train_transform = A.Compose(train_steps)

# Validation / test pipeline: deterministic steps only.
test_steps = [
    A.Resize(450, 650),   # resize the image
    A.Normalize(),        # normalise pixel values
    ToTensorV2(),         # convert to a torch tensor
]
test_transform = A.Compose(test_steps)

## DataLoader

In [6]:
from torch.utils.data import DataLoader

# Training data gets the augmenting transform; validation data gets the
# deterministic resize/normalise transform.
train_dataset = ImageDataset(train_df, image_dir="./data/images", transform=train_transform, is_test=False)
valid_dataset = ImageDataset(valid_df, image_dir="./data/images", transform=test_transform, is_test=False)

train_data_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
# The validation loader must wrap valid_dataset. It previously wrapped
# train_dataset, so "validation" metrics were computed on augmented training data.
valid_data_loader = DataLoader(valid_dataset, batch_size=4, shuffle=False)

# Model

In [29]:
# https://github.com/lukemelas/EfficientNet-PyTorch
# Many participants in this competition report that EfficientNet performs well: https://www.kaggle.com/c/plant-pathology-2020-fgvc7/discussion/140014
%pip install efficientnet-pytorch

Note: you may need to restart the kernel to use updated packages.


In [7]:
import torch.nn as nn
from efficientnet_pytorch import EfficientNet

# Alternative left by the author:
#   EfficientNet.from_pretrained("efficientnet-b7", num_classes=4)
model = EfficientNet.from_pretrained("efficientnet-b7")

# Replace the classifier with a two-layer head that ends in 4 outputs.
# The sizes are read from the pretrained classifier before it is replaced
# (the original note gives them as 2560 -> 1000 -> 4).
head_in_features = model._fc.in_features
head_hidden_features = model._fc.out_features
model._fc = nn.Sequential(
    nn.Linear(head_in_features, head_hidden_features),
    nn.ReLU(),
    nn.Dropout(p=0.5),
    nn.Linear(head_hidden_features, 4),
)

Loaded pretrained weights for efficientnet-b7


# Train & Evaluate

In [8]:
from tqdm.notebook import tqdm # 진행률 표시 막대 
from sklearn.metrics import roc_auc_score
from transformers import get_cosine_schedule_with_warmup

criterion = nn.CrossEntropyLoss()
# AdamW optimizer; the weight decay strength is set through `weight_decay`.
optimizer = torch.optim.AdamW(model.parameters(), lr=0.00006, weight_decay=0.0001)

epochs = 39
steps_per_epoch = len(train_data_loader)  # optimizer steps (batches) per epoch

# Scheduler: warm up for 3 epochs' worth of steps, then follow a cosine
# schedule over the total number of training steps. It is stepped once per batch.
scheduler = get_cosine_schedule_with_warmup(optimizer,
                                num_warmup_steps=steps_per_epoch*3, # steps needed to reach the configured learning rate
                                num_training_steps=steps_per_epoch*epochs) # steps needed to finish all training

for epoch in range(epochs):
    # ---- train ----
    model.train() # training mode
    epoch_train_loss = 0
    
    for images, labels in tqdm(train_data_loader):
        optimizer.zero_grad()
        # forward
        outputs = model(images)
        loss = criterion(outputs, labels)
        epoch_train_loss += loss.item()
        # backward
        loss.backward()
        optimizer.step()
        scheduler.step()

    print(f'Epoch [{epoch+1}/{epochs}] - train loss : {epoch_train_loss/len(train_data_loader):.4f}')

    # ---- evaluate ----
    model.eval()
    epoch_valid_loss = 0
    preds_list = []        # per-sample predicted probabilities (numpy rows)
    true_onehot_list = []  # per-sample one-hot ground truth (numpy rows)
    
    with torch.no_grad():
        for images, labels in valid_data_loader:
            outputs = model(images)
            loss = criterion(outputs, labels)
            epoch_valid_loss += loss.item()
            
            preds = torch.softmax(outputs, dim=1).cpu().numpy() # predicted probabilities
            # Convert the one-hot targets to numpy as well, so the metric
            # receives plain arrays rather than a list of torch tensors.
            true_onehot = torch.eye(4)[labels].cpu().numpy() # ground truth
            preds_list.extend(preds)
            true_onehot_list.extend(true_onehot)

    valid_roc_auc = roc_auc_score(np.array(true_onehot_list), np.array(preds_list))
    print(f'Epoch [{epoch+1}/{epochs}] - validation loss : {epoch_valid_loss/len(valid_data_loader):.4f} / 검증 데이터 ROC AUC : {valid_roc_auc:.4f}')  

  0%|          | 0/369 [00:00<?, ?it/s]

KeyboardInterrupt: 

# Submission

In [None]:
from torch.utils.data import DataLoader

def collate_test_images(batch):
    """Stack the images of a test batch into a single tensor.

    In test mode ImageDataset yields ``(image, None)``, and the default
    collate function cannot batch ``None``. Only the images are kept here, so
    iterating the loader yields image tensors directly. Bare image samples are
    accepted as well.
    """
    images = [sample[0] if isinstance(sample, (tuple, list)) else sample for sample in batch]
    return torch.stack(images)


test_batch_size = 4
test_dataset = ImageDataset(test_df, image_dir="./data/images", transform=test_transform, is_test=True)
# shuffle=False keeps predictions aligned with the row order of test_df.
test_data_loader = DataLoader(test_dataset,
                              batch_size=test_batch_size,
                              shuffle=False,
                              collate_fn=collate_test_images)

In [None]:
# One row of four class probabilities per test image.
preds = np.zeros((len(test_df), 4))

model.eval()
with torch.no_grad():
    for batch_index, images in enumerate(test_data_loader):
        # Rows of `preds` covered by this batch (the loader is not shuffled).
        row_start = batch_index * test_batch_size
        row_stop = row_start + test_batch_size

        probabilities = torch.softmax(model(images), dim=1)
        preds[row_start:row_stop] += probabilities.squeeze().numpy()

In [None]:
# Write the predicted class probabilities into the submission template and save it.
target_columns = ["healthy", "multiple_diseases", "rust", "scab"]
submission_df[target_columns] = preds
submission_df.to_csv("submission.csv", index=False)

# Label smoothing
- 일반화 성능을 위한 예측값 보정
$$(1 - \alpha) \cdot \text{preds} + \frac{\alpha}{K}$$

- $\alpha$: 레이블 스무딩 강도
- $\text{preds}$: 예측 확률값
- $K$: 타겟값 개수

In [9]:
def apply_label_smoothing(label_df, target, alpha, threshold):
    """Smooth over-confident prediction rows towards the uniform distribution.

    A row is smoothed when at least one of its target values exceeds
    ``threshold``; every value in that row becomes
    ``(1 - alpha) * value + alpha / K`` with ``K = len(target)``.
    Other rows are returned unchanged.

    Args:
        label_df: DataFrame containing the target columns. It is not modified.
        target: List of target column names.
        alpha: Smoothing strength.
        threshold: Rows with any value above this are smoothed.

    Returns:
        A new float DataFrame with the ``target`` columns and the same index
        as ``label_df``.
    """
    target_df = label_df[target]
    k = len(target)

    # Work on an explicit float copy so the caller's frame is never touched.
    values = target_df.to_numpy(dtype=float, copy=True)

    # Select rows by position with a boolean mask. The previous row loop used
    # the index *label* from iterrows() as an iloc *position*, which is only
    # correct for a default 0..n-1 index.
    confident_rows = (values > threshold).any(axis=1)
    values[confident_rows] = (1 - alpha) * values[confident_rows] + alpha / k

    return pd.DataFrame(values, index=target_df.index, columns=target_df.columns)

In [None]:
# Smoothing settings: only rows with a probability above `threshold` are adjusted.
target = ["healthy", "multiple_diseases", "rust", "scab"]
alpha = 0.001
threshold = 0.999

# Smooth a copy of the submission so the original predictions stay available.
submission_df_ls = submission_df.copy()
smoothed_targets = apply_label_smoothing(submission_df_ls, target, alpha, threshold)
submission_df_ls[target] = smoothed_targets
submission_df_ls.to_csv("submission_ls.csv", index=False)