In [None]:
import os
import time
import random

import timm
import torch
import albumentations as A
import pandas as pd
import numpy as np
import torch.nn as nn
from albumentations.pytorch import ToTensorV2
from torch.optim import Adam
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score

In [None]:
import cv2
import torch
import torch.nn.functional as F
import numpy as np
import albumentations as A
from augraphy import *
import matplotlib.pyplot as plt
import torchvision.transforms.functional as TF
from PIL import Image
import math
from tqdm import tqdm


In [None]:
# Seed every random number generator used by the notebook so that repeated
# runs start from the same state.
SEED = 123
# Exported for child processes; the hash seed of the already-running
# interpreter is fixed at start-up and is not changed by this assignment.
os.environ['PYTHONHASHSEED'] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
# cuDNN autotuning (benchmark=True) may select different convolution kernels
# from run to run, which defeats the seeding above. Disable it and request
# deterministic kernels so results are repeatable.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [None]:
# Compute device: the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Training configuration.
BATCH_SIZE = 32
EPOCHS = 10
LR = 5e-4
num_workers = 0        # worker processes used by the training DataLoader
pretrained_size = 380  # target side length (pixels) for the resize/pad transforms

In [None]:
class ImageDataset(Dataset):
    """Dataset that pairs image files with the targets listed in a CSV file.

    Each CSV row is unpacked as ``(file name, target)``, so the file must have
    exactly two columns in that order.

    Args:
        csv: Path of the CSV file that lists the samples.
        path: Directory that contains the image files named in the CSV.
        transform: Optional callable invoked as ``transform(image=array)`` and
            expected to return a mapping with an ``'image'`` entry
            (the albumentations calling convention).
    """

    def __init__(self, csv, path, transform=None):
        # Keep the rows as a plain array so a row can be unpacked by position.
        self.df = pd.read_csv(csv).values
        self.path = path
        self.transform = transform

    def __len__(self):
        """Return the number of rows read from the CSV."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load one sample and return ``(image, target)``.

        The image is always decoded as 3-channel RGB so that grayscale,
        palette or RGBA files produce the same array layout as regular RGB
        files. The file handle is closed as soon as the pixels are copied.
        """
        name, target = self.df[idx]
        with Image.open(os.path.join(self.path, name)) as pil_img:
            img = np.array(pil_img.convert('RGB'))
        if self.transform:
            img = self.transform(image=img)['image']
        return img, target

In [None]:
# Training transform: ImageNet mean/std normalisation followed by conversion
# to a tensor.
# NOTE(review): there is no resize/pad step here - presumably the training
# images were already brought to a common size offline; confirm.
trn_transform = A.Compose(
    [
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]
)

# Test transform: shrink/grow the longest side to `pretrained_size`, pad to a
# square with the constant value (255, 255, 255), then normalise and convert.
tst_transform = A.Compose(
    [
        A.LongestMaxSize(max_size=pretrained_size, always_apply=True),
        A.PadIfNeeded(
            min_height=pretrained_size,
            min_width=pretrained_size,
            border_mode=0,  # 0 == cv2.BORDER_CONSTANT
            value=(255, 255, 255),
        ),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]
)

# Test-time-augmentation transform: random 90-degree rotation and random flip
# first, then exactly the same resize / pad / normalise steps as `tst_transform`.
aug_test_transform = A.Compose(
    [
        A.RandomRotate90(),
        A.Flip(p=0.5),
        A.LongestMaxSize(max_size=pretrained_size, always_apply=True),
        A.PadIfNeeded(
            min_height=pretrained_size,
            min_width=pretrained_size,
            border_mode=0,  # 0 == cv2.BORDER_CONSTANT
            value=(255, 255, 255),
        ),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]
)

In [None]:
# Dataset definitions.

# Training images with the training-time transform.
trn_dataset = ImageDataset(
    csv="/data/ephemeral/home/data/augmentation/[12_v5_40배][0807]train_aug.csv",
    path="/data/ephemeral/home/data/augmentation/[12_v5_40배][0807]train_aug/",
    transform=trn_transform,
)

# Test images, listed by the sample-submission CSV, with the plain test transform.
tst_dataset = ImageDataset(
    csv="/data/ephemeral/home/data/sample_submission.csv",
    path="/data/ephemeral/home/data/test/",
    transform=tst_transform,
)

# The same test images again, but with the randomly augmenting transform.
aug_test_dataset = ImageDataset(
    csv="/data/ephemeral/home/data/sample_submission.csv",
    path="/data/ephemeral/home/data/test/",
    transform=aug_test_transform,
)

# Report the number of training and test samples.
print(f"{len(trn_dataset)} {len(tst_dataset)}")

In [None]:
# DataLoader definitions.

# Training batches: reshuffled every epoch, the final partial batch is kept.
trn_loader = DataLoader(
    dataset=trn_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=False,
    num_workers=num_workers,
    pin_memory=True,
)

# Test batches in CSV order (no shuffling), loaded in the main process.
tst_loader = DataLoader(
    dataset=tst_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=True
)

# Augmented test batches; the unshuffled order matches `tst_loader` sample for sample.
aug_test_dataloader = DataLoader(
    dataset=aug_test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=True
)

In [None]:
def training(loader, model, optimizer, loss_fn, device):
    """Train `model` for one pass over `loader` and return epoch metrics.

    Args:
        loader: Iterable of ``(image, targets)`` batches; must support ``len()``.
        model: Network to optimise; it is switched to training mode.
        optimizer: Optimizer stepped once per batch.
        loss_fn: Callable invoked as ``loss_fn(preds, targets)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Dict with ``train_loss`` (mean of the per-batch losses), ``train_acc``
        and ``train_f1`` (macro-averaged F1), computed over all batches seen.
    """
    model.train()
    running_loss = 0
    predicted_labels, true_labels = [], []

    progress = tqdm(loader)
    for batch_images, batch_targets in progress:
        batch_images = batch_images.to(device)
        batch_targets = batch_targets.to(device)

        # Clear the gradients left over from the previous step.
        model.zero_grad(set_to_none=True)

        logits = model(batch_images)
        batch_loss = loss_fn(logits, batch_targets)
        batch_loss.backward()
        optimizer.step()

        batch_loss_value = batch_loss.item()
        running_loss += batch_loss_value
        # The predicted class is the index of the largest output per sample.
        predicted_labels.extend(logits.argmax(dim=1).detach().cpu().numpy())
        true_labels.extend(batch_targets.detach().cpu().numpy())

        progress.set_description(f"Loss: {batch_loss_value:.4f}")

    return {
        "train_loss": running_loss / len(loader),
        "train_acc": accuracy_score(true_labels, predicted_labels),
        "train_f1": f1_score(true_labels, predicted_labels, average='macro'),
    }

In [None]:
# Architectures (timm model names) trained for the v12 checkpoint set.
model_names = [
    'efficientnet_b2',
    'resnext50_32x4d',
    'mobilenetv2_100',
    'efficientnet_b3',
    'efficientnet_b4',
]

# One "<model name>.pth" checkpoint per architecture, all in the same directory.
base_path = '/data/ephemeral/home/code/model_softvoting/[v12]aug5증강_모델'
model_paths = [os.path.join(base_path, name + ".pth") for name in model_names]

In [None]:
# Train every architecture in `model_names` starting from pretrained weights
# and save its final weights to the matching entry of `model_paths`.
for model_name, model_path in zip(model_names, model_paths):
    # Create the checkpoint directory *before* training: torch.save only runs
    # after all epochs, so a missing folder would otherwise lose the whole run.
    checkpoint_dir = os.path.dirname(model_path)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)

    model = timm.create_model(model_name, pretrained=True, num_classes=17).to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=LR)

    for epoch in range(EPOCHS):
        ret = training(trn_loader, model, optimizer, loss_fn, device=device)
        ret['epoch'] = epoch + 1

        # One "key: value" line per metric (the epoch number included).
        log = ""
        for k, v in ret.items():
            log += f"{k}: {v:.4f}\n"
        print(log)

    torch.save(model.state_dict(), model_path)

In [None]:
# Architectures (timm model names) trained for the v11 checkpoint set.
model_names = [
    'efficientnet_b2',
    'resnext50_32x4d',
    'mobilenetv2_100',
    'mobilenetv3_large_100',
    'efficientnet_b4',
]

# One "<model name>.pth" checkpoint per architecture, all in the same directory.
base_path = '/data/ephemeral/home/code/model_softvoting/[v11]aug5적용_모델'
model_paths = [os.path.join(base_path, name + ".pth") for name in model_names]

In [None]:
# Release cached GPU memory held by the allocator before starting a new
# round of training.
torch.cuda.empty_cache()

# Train every architecture in `model_names` starting from pretrained weights
# and save its final weights to the matching entry of `model_paths`.
for model_name, model_path in zip(model_names, model_paths):
    # Create the checkpoint directory *before* training: torch.save only runs
    # after all epochs, so a missing folder would otherwise lose the whole run.
    checkpoint_dir = os.path.dirname(model_path)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)

    model = timm.create_model(model_name, pretrained=True, num_classes=17).to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=LR)

    for epoch in range(EPOCHS):
        ret = training(trn_loader, model, optimizer, loss_fn, device=device)
        ret['epoch'] = epoch + 1

        # One "key: value" line per metric (the epoch number included).
        log = ""
        for k, v in ret.items():
            log += f"{k}: {v:.4f}\n"
        print(log)

    torch.save(model.state_dict(), model_path)

In [None]:
# Final submission ensemble: each entry pairs an architecture name with the
# checkpoint file that holds its trained weights.
_ensemble_members = [
    ('resnext50_32x4d', '/data/ephemeral/home/code/model_softvoting/[v12]aug5증강_모델/resnext50_32x4d.pth'),
    ('efficientnet_b3', '/data/ephemeral/home/code/model_softvoting/[v12]aug5증강_모델/efficientnet_b3.pth'),
    ('resnext50_32x4d', '/data/ephemeral/home/code/model_softvoting/[v11]aug5적용_모델/resnext50_32x4d.pth'),
    ('efficientnet_b2', '/data/ephemeral/home/code/model_softvoting/[v11]aug5적용_모델/efficientnet_b2.pth'),
    ('mobilenetv2_100', '/data/ephemeral/home/code/model_softvoting/[v11]aug5적용_모델/mobilenetv2_100.pth'),
    ('resnext50_32x4d', '/data/ephemeral/home/code/model_softvoting/resnext50_32x4d_sungmi.pth'),
]

# Parallel lists consumed by the inference loop.
model_names = [arch for arch, _ in _ensemble_members]
model_paths = [checkpoint for _, checkpoint in _ensemble_members]

# Inference device: the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# TTA settings: number of randomly augmented passes per test batch, in
# addition to the single un-augmented pass.
N_TTA = 20

# One array of averaged class probabilities per ensemble member; each array
# is the concatenation of that model's per-batch outputs.
all_model_preds = []

# The plain test loader plus N_TTA independent iterations of the augmented
# loader. Neither loader shuffles, so zipping them yields the same samples in
# every position, each with its own random augmentation.
loaders = [tst_loader] + [aug_test_dataloader] * N_TTA

# Load every ensemble member and predict with test-time augmentation.
for model_name, model_path in zip(model_names, model_paths):
    model = timm.create_model(model_name, pretrained=False, num_classes=17).to(device)
    # map_location keeps checkpoints that were saved on a GPU loadable when
    # `device` has fallen back to the CPU.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    model_preds = []
    with torch.no_grad():
        for batches in tqdm(zip(*loaders), total=len(tst_loader)):
            images, *aug_images = [batch_images.to(device) for batch_images, _ in batches]

            # Soft voting averages probabilities, not raw logits: logits from
            # different passes/architectures are not on a common scale, so
            # each forward pass is turned into a distribution first.
            probs = torch.softmax(model(images), dim=1)
            for aug_image in aug_images:
                probs = probs + torch.softmax(model(aug_image), dim=1)

            final_outputs = probs / (N_TTA + 1)
            model_preds.append(final_outputs.cpu().numpy())

    all_model_preds.append(np.concatenate(model_preds))

# Final prediction by soft voting: average the per-model probabilities and
# take the most probable class for every test image.
softvoting_preds = np.mean(all_model_preds, axis=0)
final_preds = np.argmax(softvoting_preds, axis=1)

# Build the submission frame from the test CSV rows and overwrite the target
# column with the ensemble predictions.
En_pred_df = pd.DataFrame(tst_dataset.df, columns=['ID', 'target'])
En_pred_df['target'] = final_preds

# Sanity check: the row order must match the sample submission file.
sample_submission_df = pd.read_csv('/data/ephemeral/home/data/sample_submission.csv')
assert (sample_submission_df['ID'] == En_pred_df['ID']).all()

# Create the output folder if needed so the write cannot fail on a missing
# directory after all the inference work has been done.
output_path = '/data/ephemeral/home/code/pred/[0808]test_aug_v12/[0810]vv4_tta20_2model_total_testaug_soft_pred.csv'
os.makedirs(os.path.dirname(output_path), exist_ok=True)
En_pred_df.to_csv(output_path, index=False)

print("Predictions saved for all models and ensemble.")