#  import

In [None]:
import random
import pandas as pd
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt

from sklearn import preprocessing
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import torchvision
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader, sampler

from tqdm.auto import tqdm

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import torchvision.models as models

from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings(action='ignore') 

In [None]:
# Pick the best available accelerator instead of hard-coding Apple's MPS backend,
# so the notebook also runs on CUDA machines and on CPU-only hosts.
if torch.cuda.is_available():
    device = torch.device('cuda')
elif getattr(torch.backends, 'mps', None) is not None and torch.backends.mps.is_available():
    # `torch.backends.mps` does not exist on older torch builds, hence the getattr guard.
    device = torch.device('mps')
else:
    device = torch.device('cpu')
device

# 하이퍼파라미터 세팅

In [None]:
# Hyperparameters shared by the rest of the notebook.
CFG = dict(
    # Input resolution, chosen for EfficientNet_B3.
    IMG_SIZE=300,
    # Epochs per training run (author's note: 110 epochs were performed in total).
    EPOCHS=30,
    LEARNING_RATE=0.001,
    # Author's note: batch sizes of 8 or less led to overfitting.
    BATCH_SIZE=16,
    SEED=42,
)

In [None]:
def seed_everything(seed):
    """Seed every random number generator used by the notebook.

    Seeds Python's `random`, NumPy, and PyTorch (CPU and CUDA), exports
    PYTHONHASHSEED, and configures cuDNN for deterministic behaviour.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may pick a different convolution algorithm on each
    # run, so it has to be disabled for runs to be reproducible.
    torch.backends.cudnn.benchmark = False
seed_everything(CFG['SEED']) # Fix the random seed using the configured value

# 데이터 전처리

In [None]:
# Load the training table, then patch the rows recorded under id 3896.
df = pd.read_csv('train.csv')

# The id-3896 row labelled 'Titian' is rewritten to image 3986 / 'Alfred Sisley'.
df.loc[
    df['id'].eq(3896) & df['artist'].eq('Titian'),
    ['img_path', 'id', 'artist'],
] = ['./train/3986.jpg', 3986, 'Alfred Sisley']

# Evaluated after the patch above: the id-3896 row labelled 'Edgar Degas' becomes 'Titian'.
df.loc[df['id'].eq(3896) & df['artist'].eq('Edgar Degas'), 'artist'] = 'Titian'

# Persist the corrected table and continue from the copy read back from disk.
df.to_csv('new_train_data.csv', index=False)
df = pd.read_csv('new_train_data.csv')
df.head()

In [None]:
# Replace artist names with integer class ids; the fitted encoder is kept in
# `encoder` so ids can be mapped back to names later.
encoder = preprocessing.LabelEncoder()
artist_names = df['artist'].values
df['artist'] = encoder.fit(artist_names).transform(artist_names)
del artist_names  # temporary only; leave the notebook namespace as it was
df.head(3)

In [None]:
# Hold out 20% of the rows for validation with a fixed seed. Only the frame is
# split: the labels travel inside it as the 'artist' column.
train_df, val_df = train_test_split(
    df,
    test_size=0.2,
    random_state=CFG['SEED'],
)

In [None]:
# Order both splits by id, then preview the first three rows of each.
train_df = train_df.sort_values(by='id')
val_df = val_df.sort_values(by='id')

display(train_df.head(3))
display(val_df.head(3))

In [None]:
# DataLoader

In [None]:
def get_data(df, infer=False):
    """Extract image paths (and labels, unless inferring) from a data frame.

    Args:
        df: Frame with an 'img_path' column and, when `infer` is False,
            an 'artist' column.
        infer: When True, only the image paths are returned.

    Returns:
        The 'img_path' values alone if `infer` is True, otherwise a tuple
        `(img_path values, artist values)`.
    """
    paths = df['img_path'].values
    return paths if infer else (paths, df['artist'].values)

In [None]:
# Unpack image paths and labels for each split.
train_img_paths, train_labels = get_data(train_df) # author's note: 4728 samples
val_img_paths, val_labels = get_data(val_df) # author's note: 1183 samples

# 나만의 데이터셋

In [None]:
# Notebook display only: echo the array of training image paths.
train_img_paths

In [None]:
class CustomDataset(Dataset):
    """Dataset that reads images from disk with OpenCV, optionally with labels.

    Args:
        img_paths: Indexable collection of image file paths.
        labels: Indexable collection of labels aligned with `img_paths`,
            or None for unlabeled (inference) data.
        transforms: Optional callable invoked as `transforms(image=...)`
            whose result is indexed with 'image'.
    """

    def __init__(self, img_paths, labels, transforms=None):
        self.img_paths = img_paths
        self.labels = labels
        self.transforms = transforms

    def __len__(self):
        """Return the number of image paths."""
        return len(self.img_paths)

    def __getitem__(self, index):
        """Load one sample.

        Returns:
            `(image, label)` when labels were provided, otherwise `image`.

        Raises:
            FileNotFoundError: If the image cannot be read from its path.
        """
        img_path = self.img_paths[index]
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising;
            # fail here with the offending path instead of an opaque
            # cvtColor error further down.
            raise FileNotFoundError(f'Could not read image: {img_path!r}')
        # OpenCV loads images as BGR; convert to RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transforms is not None:
            image = self.transforms(image=image)['image']

        if self.labels is None:
            return image
        return image, self.labels[index]

In [None]:
# Augmentation and preprocessing pipeline for the training data.
train_transform = A.Compose([
    # Author's note: the test images are 1/4-size random crops of the originals,
    # so the resize + random crop below reproduces that as closely as possible.
    A.Resize(CFG['IMG_SIZE']*2, CFG['IMG_SIZE']*2), # resize to twice IMG_SIZE per side
    A.RandomCrop(p=1, height=CFG['IMG_SIZE'], width=CFG['IMG_SIZE']),
    A.CoarseDropout(max_holes=4, max_height=64, max_width=64, p=0.5),
    # With probability 0.3, apply exactly one of the three degradations.
    A.OneOf([
         A.MotionBlur(p=1),
         A.OpticalDistortion(p=1),
         A.GaussNoise(p=1),
     ], p=0.3),
    A.VerticalFlip(p=0.5),
    A.HorizontalFlip(p=0.5),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225),
                max_pixel_value=255.0, always_apply=False, p=1.0),
    ToTensorV2()
])

In [None]:
# Preprocessing pipeline for the validation data (same resize + random crop as
# training, without the other augmentations).
valid_transform = A.Compose([
    A.Resize(CFG['IMG_SIZE']*2,CFG['IMG_SIZE']*2), # author's note: mimics the test data (a 1/4 enlargement of the original)
    A.RandomCrop(p=1, height=CFG['IMG_SIZE'], width=CFG['IMG_SIZE']),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225),
                max_pixel_value=255.0, always_apply=False, p=1.0),
    ToTensorV2()
])

# Preprocessing pipeline for the test data: resize straight to IMG_SIZE (no crop),
# normalize, and convert to a tensor.
test_transform = A.Compose([
    A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225),
                max_pixel_value=255.0, always_apply=False, p=1.0),
    ToTensorV2()
])

In [None]:
def make_weights(labels, nclasses):
    """Return one sampling weight per sample: 1 / (number of samples in its class).

    Every class therefore carries the same total weight, whatever its size.

    Args:
        labels: Array-like of class labels, one per sample.
        nclasses: Kept for backward compatibility. The classes are taken from
            `labels` directly, so label ids that are missing or not contiguous
            from 0 are still weighted correctly.

    Returns:
        Float array with the same shape as `labels`.
    """
    labels = np.asarray(labels)
    # `inverse` maps each sample to the position of its class in `counts`, so
    # the lookup never depends on the numeric value of the label id.
    _, inverse, counts = np.unique(labels, return_inverse=True, return_counts=True)
    return (1.0 / counts[inverse]).reshape(labels.shape)

# Per-sample sampling weights for the training labels, as a double-precision tensor.
weights = torch.DoubleTensor(
    make_weights(train_labels, len(np.unique(train_labels)))
)

In [None]:
# Notebook display only: number of per-sample weights.
len(weights)

In [None]:
# Training loader: indices come from a weighted sampler that draws len(weights)
# samples per pass, using the per-sample weights.
train_dataset = CustomDataset(
    img_paths=train_img_paths,
    labels=train_labels,
    transforms=train_transform,
)
train_loader = DataLoader(
    train_dataset,
    batch_size=CFG['BATCH_SIZE'],
    sampler=sampler.WeightedRandomSampler(weights=weights, num_samples=len(weights)),
    num_workers=0,
)

# Validation loader: fixed order, no sampling.
val_dataset = CustomDataset(
    img_paths=val_img_paths,
    labels=val_labels,
    transforms=valid_transform,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=False,
    num_workers=0,
)

In [None]:
# Build the unlabeled test dataset from the test table.
test_df = pd.read_csv('./test.csv')
test_df.head(3)

test_img_paths = get_data(test_df, infer=True)

test_dataset = CustomDataset(
    img_paths=test_img_paths,
    labels=None,
    transforms=test_transform,
)

# 시각화

**원본데이터 시각화**

In [None]:
# Show four randomly chosen raw training images.
# `high` is exclusive in np.random.randint, so passing len(...) itself keeps the
# last image selectable.
rand_list = np.random.randint(0, len(train_img_paths), 4)

figure, axes = plt.subplots(1, 4, figsize=(20, 15))
for i, idx in enumerate(rand_list):
    img = cv2.imread(train_img_paths[idx])
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    axes[i].imshow(img)

**훈련용 데이터 시각화**

In [None]:
# Show four randomly chosen training samples after the training transform.
# `high` is exclusive in np.random.randint, so passing len(...) itself keeps the
# last sample selectable.
rand_list = np.random.randint(0, len(train_dataset), 4)

figure, axes = plt.subplots(1, 4, figsize=(20, 15))
for i, idx in enumerate(rand_list):
    # permute(1, 2, 0) moves the first axis last (presumably CHW -> HWC for imshow).
    img = train_dataset[idx][0].permute(1, 2, 0)
    axes[i].imshow(img)

**검증용 데이터 시각화**

In [None]:
# Show four randomly chosen validation samples after the validation transform.
# `high` is exclusive in np.random.randint, so passing len(...) itself keeps the
# last sample selectable.
rand_list = np.random.randint(0, len(val_dataset), 4)

figure, axes = plt.subplots(1, 4, figsize=(20, 15))
for i, idx in enumerate(rand_list):
    # permute(1, 2, 0) moves the first axis last (presumably CHW -> HWC for imshow).
    img = val_dataset[idx][0].permute(1, 2, 0)
    axes[i].imshow(img)

**테스트용 데이터 시각화**

In [None]:
# Show four randomly chosen raw test images.
# `high` is exclusive in np.random.randint, so passing len(...) itself keeps the
# last image selectable.
rand_list = np.random.randint(0, len(test_img_paths), 4)

figure, axes = plt.subplots(1, 4, figsize=(20, 15))
for i, idx in enumerate(rand_list):
    img = cv2.imread(test_img_paths[idx])
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    axes[i].imshow(img)

# 모델 정의

In [None]:
class BaseModel(nn.Module):
    """EfficientNet-B3 backbone followed by a dropout + linear classification head.

    Args:
        num_classes: Number of output classes produced by the head.
    """

    def __init__(self, num_classes=50):
        super().__init__()
        # Pretrained torchvision EfficientNet-B3; the head below consumes its
        # 1000-dimensional output.
        self.backbone = models.efficientnet_b3(pretrained=True)
        head_layers = [
            nn.Dropout(p=0.3),
            nn.Linear(in_features=1000, out_features=num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Run the backbone, then the classification head, and return the result."""
        return self.classifier(self.backbone(x))

In [None]:
# Instantiate the classifier with 50 output classes.
model_Eff = BaseModel(num_classes=50)

# 훈련을 시켜봅시다!

In [None]:
def competition_metric(true, pred):
    """Return the macro-averaged F1 score of `pred` against `true`."""
    return f1_score(y_true=true, y_pred=pred, average="macro")

In [None]:
def validation(model, criterion, test_loader, device):
    """Evaluate `model` on a labeled loader.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        criterion: Loss callable invoked as `criterion(prediction, label)`.
        test_loader: Iterable yielding `(img, label)` batches.
        device: Device the batches are moved to.

    Returns:
        Tuple `(mean batch loss, macro F1 score)` over the whole loader.
    """
    model.eval()

    batch_losses = []
    predicted, expected = [], []

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for img, label in tqdm(iter(test_loader)):
            img = img.float().to(device)
            label = label.to(device)

            logits = model(img)
            batch_losses.append(criterion(logits, label).item())

            # The index of the largest output along dim 1 is the predicted class.
            predicted.extend(logits.argmax(1).detach().cpu().numpy().tolist())
            expected.extend(label.detach().cpu().numpy().tolist())

    val_f1 = competition_metric(expected, predicted)
    return np.mean(batch_losses), val_f1

In [None]:
def train(model, optimizer, train_loader, test_loader, scheduler, device):
    """Train `model` for CFG['EPOCHS'] epochs and return it with its best weights.

    After every epoch the model is scored on `test_loader`. The weights from
    the epoch with the highest validation F1 are snapshotted and loaded back
    into `model` before returning.

    Args:
        model: Network to train; it is moved to `device` and modified in place.
        optimizer: Optimizer bound to the model's parameters.
        train_loader: Iterable yielding `(img, label)` training batches.
        test_loader: Iterable yielding `(img, label)` validation batches.
        scheduler: Optional scheduler stepped once per epoch as
            `scheduler.step(metrics=val_score)`; pass None to disable.
        device: Device used for training and validation.

    Returns:
        The same `model` object, carrying the weights of the best-scoring epoch
        (or its current weights if no epoch produced a comparable score).
    """
    import copy  # local import: only needed for the best-weights snapshot

    model.to(device)

    criterion = nn.CrossEntropyLoss().to(device)

    # Start below any real score so the first scored epoch is always recorded.
    best_score = float('-inf')
    best_state = None

    for epoch in range(1, CFG['EPOCHS'] + 1):
        model.train()
        train_loss = []

        for img, label in tqdm(iter(train_loader)):
            img = img.float().to(device)
            label = label.to(device)

            optimizer.zero_grad()

            model_pred = model(img)

            loss = criterion(model_pred, label)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        tr_loss = np.mean(train_loss)

        val_loss, val_score = validation(model, criterion, test_loader, device)

        print(f'Epoch [{epoch}], Train Loss : [{tr_loss:.5f}] Val Loss : [{val_loss:.5f}] Val F1 Score : [{val_score:.5f}]')

        if scheduler is not None:
            scheduler.step(metrics=val_score)

        if best_score < val_score:
            best_score = val_score
            # Keeping a reference to `model` would only alias the live network,
            # which keeps training; deep-copy the weights so the snapshot stays
            # fixed at this epoch.
            best_state = copy.deepcopy(model.state_dict())

    if best_state is not None:
        model.load_state_dict(best_state)
    return model

In [None]:
# Switch the model to eval mode; as the cell's last expression, the call's
# return value is also what the notebook displays.
model_Eff.eval()

In [None]:
# Use the configured learning rate. The previous hard-coded 1e-08 ignored
# CFG['LEARNING_RATE'] and is too small for a fresh run to make progress.
optimizer_Eff = torch.optim.Adam(params=model_Eff.parameters(), lr=CFG['LEARNING_RATE'])

In [None]:
# Multiply the learning rate by 0.1 once the monitored metric has stopped
# improving for 3 epochs; mode='max' because a higher metric is better.
scheduler_Eff = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer=optimizer_Eff,
    mode='max',
    patience=3,
    factor=0.1,
    verbose=True,
)

In [None]:
# Run training with the validation loader as the evaluation set; the value
# returned by train() is kept for inference.
infer_model = train(model_Eff, optimizer_Eff, train_loader, val_loader,
                    scheduler_Eff, device=device)

# 예측해서 제출해보기

In [None]:
# Rebuild the test dataset and wrap it in a loader. The table is read from the
# same local path as the earlier test cell, so the notebook does not depend on
# a Kaggle-only directory.
test_df = pd.read_csv('./test.csv')
test_df.head(3)

test_img_paths = get_data(test_df, infer=True)

test_dataset = CustomDataset(test_img_paths, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [None]:
def inference(model, test_loader, device):
    """Predict a class index for every sample of an unlabeled loader.

    Args:
        model: Trained network; it is moved to `device` and put in eval mode.
        test_loader: Iterable yielding image batches (no labels).
        device: Device the batches are moved to.

    Returns:
        Flat list of predicted class indices, in loader order.
    """
    model.to(device)
    model.eval()

    predicted = []

    # Gradients are not needed for prediction.
    with torch.no_grad():
        for batch in tqdm(iter(test_loader)):
            logits = model(batch.float().to(device))
            predicted.extend(logits.argmax(1).detach().cpu().numpy().tolist())

    print('Done.')
    return predicted

In [None]:
# Predict class indices for the test set with the trained model.
preds = inference(infer_model, test_loader, device)

In [None]:
preds = encoder.inverse_transform(preds) # Map the LabelEncoder integer labels back to artist names

In [None]:
# Load the submission template from the local working directory, next to
# train.csv and test.csv, rather than from a Kaggle-only path.
submit = pd.read_csv('./sample_submission.csv')
submit.head()

In [None]:
# Fill the submission's 'artist' column with the predictions and preview it.
submit['artist'] = preds
submit.head()

In [None]:
# Write the submission to the working directory (a relative path, like the
# model checkpoint) instead of a Kaggle-only directory.
submit.to_csv('EFF_B3_300_110Epoch.csv', index=False)

# 모델 저장

In [None]:
# Save only the weights (state_dict) of the model returned by training.
torch.save(infer_model.state_dict(), 'EFF_B3_300_110Epoch.pth')