In [None]:
!pip install timm

In [None]:
import warnings
warnings.filterwarnings('ignore')

from glob import glob
import pandas as pd
import numpy as np 
from tqdm import tqdm
import cv2

import os
import timm
import random

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torchvision.transforms as transforms
from sklearn.metrics import f1_score, accuracy_score
import time


# Use the GPU when one is visible, otherwise fall back to the CPU so the
# notebook does not crash on CPU-only machines.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Read the training and test metadata tables.
train_y = pd.read_csv("../input/dacon-cv-data/open/train_df.csv")
test_df = pd.read_csv("../input/dacon-cv-data/open/test_df.csv")

# Map every distinct value of the "label" column to an integer id; ids are
# assigned in sorted order of the label values.
label_unique = {
    name: class_id
    for class_id, name in enumerate(sorted(np.unique(train_y["label"])))
}

# Integer-encoded training labels, in the same row order as train_y.
train_labels = [label_unique[name] for name in train_y["label"]]

In [None]:
class config:
    """Static settings shared by the preprocessing and model code."""

    # Side length (pixels) that the transforms resize every image to.
    IMAGE_SIZE = 380
    # Per-channel mean / std handed to A.Normalize.
    MEAN_NORMAL = [0.485, 0.456, 0.406]
    STD_NORMAL = [0.229, 0.224, 0.225]

    # Name of the timm backbone. This now matches the architecture that
    # Network builds and that the checkpoints are named after
    # ('efficientnet_b4'); the previous value ('efficientnet_b3') did not.
    pre_trained_model = 'efficientnet_b4'
    # First CUDA device when available, CPU otherwise.
    DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

def train_transform():
    """Build the training pipeline.

    Steps, in order: resize to a square of ``config.IMAGE_SIZE``, a
    hue/saturation/value shift applied with probability 0.8, normalisation
    with the configured mean/std, and conversion to a tensor.
    """
    side = config.IMAGE_SIZE
    steps = [
        A.Resize(side, side),
        A.HueSaturationValue(p=0.8),
        A.Normalize(mean=config.MEAN_NORMAL, std=config.STD_NORMAL),
        ToTensorV2(p=1),
    ]
    return A.Compose(steps)


def test_transform():
    return A.Compose([
        A.Resize(config.IMAGE_SIZE, config.IMAGE_SIZE),
        A.Normalize(mean = config.MEAN_NORMAL,
                    std = config.STD_NORMAL),
        ToTensorV2(p = 1),
    ])

In [None]:
class CustomDataset(Dataset):
    """Dataset that loads images listed in a dataframe and pairs them with labels.

    Args:
        dataframe: Table whose column at position 1 holds each image's file
            name, relative to ``root_dir``.
        root_dir: Directory that contains the image files.
        labels: Indexable collection of labels, aligned with the dataframe rows.
        transforms: Optional callable invoked as ``transforms(image=img)`` that
            returns a mapping with an ``'image'`` entry.
    """

    def __init__(self, dataframe, root_dir, labels, transforms = None):
        self.dataframe = dataframe
        self.root_dir = root_dir
        self.transforms = transforms
        self.labels = labels

    def __len__(self):
        """Return the number of rows in the dataframe."""
        return len(self.dataframe)

    def get_img(self, path):
        """Read the image at ``path`` and return it in RGB channel order.

        Raises:
            FileNotFoundError: If ``cv2.imread`` returns ``None``, i.e. it
                could not read the file.
        """
        img_bgr = cv2.imread(path)
        if img_bgr is None:
            # Fail with the offending path instead of an opaque
            # "'NoneType' object is not subscriptable" further down.
            raise FileNotFoundError(f"Could not read image: {path}")
        # cvtColor returns a fresh contiguous array. The previous
        # ``[:, :, ::-1]`` view had a negative stride, which cannot be
        # converted to a tensor when no transform is applied.
        return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``."""
        if torch.is_tensor(idx):
            idx = idx.tolist()
        # Column 1 of the dataframe is read positionally as the file name.
        img_name = os.path.join(self.root_dir,
                                self.dataframe.iloc[idx, 1])
        image = self.get_img(img_name)
        if self.transforms:
            image = self.transforms(image = image)['image']

        labels = self.labels[idx]

        return image, labels

In [None]:
class Network(nn.Module):
    """Thin wrapper around a timm classification backbone.

    Args:
        model_name: Architecture name passed to ``timm.create_model``.
        num_classes: Number of output classes requested from timm.
        pretrained: Forwarded to timm's ``pretrained`` argument.

    The defaults reproduce the previously hard-coded model, so ``Network()``
    behaves exactly as before.
    """

    def __init__(self, model_name='efficientnet_b4', num_classes=88, pretrained=True):
        super().__init__()
        # The attribute must stay named ``model``: it prefixes every key of
        # the state_dict written to / read from the checkpoints.
        self.model = timm.create_model(model_name, pretrained=pretrained, num_classes=num_classes)

    def forward(self, x):
        """Return the backbone's output for the input batch ``x``."""
        return self.model(x)

In [None]:
# Training hyper-parameters.
batch_size, epochs = 32, 50

# Training split: augmented images paired with the encoded training labels.
train_dataset = CustomDataset(
    dataframe=train_y,
    root_dir='../input/dacon-cv-data/open/train/train',
    labels=np.array(train_labels),
    transforms=train_transform(),
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Test split: a placeholder label array is passed so the dataset interface is
# the same as for training; order is kept fixed (no shuffling).
placeholder_labels = np.array(["tmp"] * len(test_df))
test_dataset = CustomDataset(
    dataframe=test_df,
    root_dir='../input/dacon-cv-data/open/test/test',
    labels=placeholder_labels,
    transforms=test_transform(),
)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
def score_function(real, pred):
    """Return the macro-averaged F1 score of ``pred`` against ``real``."""
    return f1_score(real, pred, average="macro")

# Train NUM_RUNS independent models on the full training set. After the last
# epoch of each run the weights are saved to
# torch_model_effic4_state_dict_<run>.pth.
NUM_RUNS = 5
for count in range(1, NUM_RUNS + 1):
    model = Network().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()
    # Loss scaling for mixed-precision training.
    scaler = torch.cuda.amp.GradScaler()
    for epoch in range(epochs):
        start = time.time()
        train_loss = 0
        train_pred = []
        # Deliberately NOT named ``train_y``: that name is the training
        # DataFrame, and rebinding it here would break re-running the cell
        # that builds train_dataset.
        train_true = []
        model.train()
        for images, targets in train_loader:
            optimizer.zero_grad()
            # The loader already yields tensors; move/cast them instead of
            # re-wrapping with torch.tensor(), which copies and warns.
            x = images.to(device, dtype=torch.float32)
            y = targets.to(device, dtype=torch.long)
            with torch.cuda.amp.autocast():
                pred = model(x)
                # The loss is computed inside autocast, as in the PyTorch
                # AMP training recipe.
                loss = criterion(pred, y)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Running mean of the per-batch loss over the epoch.
            train_loss += loss.item() / len(train_loader)
            train_pred += pred.argmax(1).tolist()
            train_true += y.tolist()

        train_f1 = score_function(train_true, train_pred)

        TIME = time.time() - start
        print(f'epoch : {epoch+1}/{epochs}    time : {TIME:.0f}s/{TIME*(epochs-epoch-1):.0f}s')
        print(f'TRAIN    loss : {train_loss:.5f}    f1 : {train_f1:.5f}')
    torch.save(model.state_dict(), f'torch_model_effic4_state_dict_{count}.pth')

In [None]:
# Rebuild the architecture and load the weights saved by the first training run.
model = Network().to(device)
# map_location lets the checkpoint load on whichever device is in use now.
model.load_state_dict(torch.load('./torch_model_effic4_state_dict_1.pth', map_location=device))
# Put BatchNorm (and any Dropout) layers into inference mode. Without this the
# model predicts in training mode and the outputs depend on batch statistics.
model.eval()
f_pred = []

with torch.no_grad():
    for batch in test_loader:
        # batch[0] is the image tensor; batch[1] holds placeholder labels.
        x = batch[0].to(device, dtype=torch.float32)
        with torch.cuda.amp.autocast():
            pred = model(x)
        # Predicted class id for every image, in loader order.
        f_pred.extend(pred.argmax(1).tolist())

In [None]:
# Invert the label -> id mapping so predicted ids can be turned back into labels.
label_decoder = {class_id: name for name, class_id in label_unique.items()}
f_result = [label_decoder[class_id] for class_id in f_pred]

# Fill the sample submission's "label" column with the decoded predictions
# and write it out without the index column.
submission = pd.read_csv("../input/dacon-cv-data/open/sample_submission.csv").assign(label=f_result)
submission.to_csv("submit.csv", index=False)