In [None]:
import os
import zipfile
from google.colab import files, drive

# # Keggle API
# uploaded = files.upload()

# # Move kaggle.json to .kaggle & Authorization
# !mkdir -p ~/.kaggle
# !mv kaggle.json ~/.kaggle/

# !chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download Outside of Google Drive
# !kaggle datasets download -d dougandrade/dog-emotions-5-classes

In [None]:
drive.mount('/content/drive')
os.chdir('/content/drive/MyDrive/YAI/24Summer_Toy_Project')

In [None]:
# # Unzip
# with zipfile.ZipFile('/content/drive/MyDrive/YAI/24Summer_Toy_Project/preprocessed_images.zip', 'r') as zip_ref:
#     zip_ref.extractall(path='/content/drive/MyDrive/YAI/24Summer_Toy_Project')

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.models import EfficientNet_V2_S_Weights, efficientnet_v2_s
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data.dataset import random_split
from PIL import Image
from torchvision import datasets, models
from copy import deepcopy
import cv2
import glob
import argparse
import time
import json
import tensorflow as tf

In [None]:
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPUs를 검색 및 해결
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.TPUStrategy(tpu)
    print("Running on TPU")
except ValueError:
    print("No TPU found, using default strategy")
    strategy = tf.distribute.get_strategy()  # TPU가 없는 경우 기본 전략 사용 (예: CPU/GPU)

# Import Datasets

In [None]:
# 분할된 데이터셋을 저장할 경로
output_root_dir = '/content/drive/MyDrive/YAI/24Summer_Toy_Project/preprocessed_images'
train_dir = os.path.join(output_root_dir, 'train')
val_dir = os.path.join(output_root_dir, 'val')
test_dir = os.path.join(output_root_dir, 'test')

In [None]:
# 데이터셋 로드 함수
def load_npy_file(file_path):
    return np.load(file_path)
def create_dataset_from_directory(directory, batch_size=32, shuffle=True):
    file_paths = []
    labels = []
    class_names = sorted(os.listdir(directory))

    for label, class_name in enumerate(class_names):
        class_dir = os.path.join(directory, class_name)
        for file_name in os.listdir(class_dir):
            if file_name.endswith('.npy'):
                file_paths.append(os.path.join(class_dir, file_name))
                labels.append(label)

    file_paths = np.array(file_paths)
    labels = np.array(labels)

    dataset = tf.data.Dataset.from_tensor_slices((file_paths, labels))

    def load_and_preprocess_image(file_path, label):
        image = tf.numpy_function(load_npy_file, [file_path], tf.float32)
        image = tf.reshape(image, [300, 300, 3])  # 원본 이미지의 예상 shape을 지정합니다.
        image = tf.image.resize(image, (224, 224))  # 이미지 크기 조정 (224x224)
        image = image / 255.0  # 정규화
        return image, label

    dataset = dataset.map(load_and_preprocess_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

    if shuffle:
        dataset = dataset.shuffle(buffer_size=len(file_paths))

    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

    return dataset, class_names  # 이 부분에서 dataset과 class_names를 반환합니다.


In [None]:
class NumpyDataset(Dataset):
    def __init__(self, directory, transform=None):
        self.data = []
        self.labels = []
        self.class_names = sorted(os.listdir(directory))  # 클래스 이름
        self.classes = {i: class_name for i, class_name in enumerate(self.class_names)}  # 클래스 인덱스를 이름으로 매핑

        for class_name in self.class_names:
            class_dir = os.path.join(directory, class_name)
            if os.path.isdir(class_dir):
                for file_name in os.listdir(class_dir):
                    file_path = os.path.join(class_dir, file_name)
                    if file_path.endswith('.npy'):
                        img_array = np.load(file_path)
                        label = self.class_names.index(class_name)
                        self.data.append(img_array)
                        self.labels.append(label)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img = self.data[idx]
        label = self.labels[idx]

        # 만약 transform이 설정되어 있으면, transform을 적용
        if self.transform:
            img = self.transform(img)


        return img, label





In [None]:
trainset = NumpyDataset(train_dir)
valset = NumpyDataset(val_dir)
testset = NumpyDataset(test_dir)

# Model Define

In [None]:
net = models.efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.DEFAULT)

# self.classifier = nn.Sequential(
#     nn.Dropout(p=dropout, inplace=True),
#     nn.Linear(lastconv_output_channels, num_classes),
# )
net.classifier[1] = nn.Linear(net.classifier[1].in_features, args.out_dim)
if hasattr(args, 'dropout_rate'):
    net.classifier.add_module("dropout", nn.Dropout(args.dropout_rate)) # dropout 사용할 경우 모델 아키텍처에 추가

# GPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
net = net.to(device)

# Training & Validation

In [None]:
def train(net, optimizer, criterion, args):
    trainloader = torch.utils.data.DataLoader(trainset,
                                              batch_size=args.train_batch_size,
                                              shuffle=True, num_workers=2)
    net.train()

    correct = 0
    total = 0
    train_loss = 0.0

    for i, data in enumerate(trainloader, 0):
        optimizer.zero_grad()

        # get the inputs
        inputs, labels = data

        inputs, labels = inputs.to(device), labels.to(device)
        outputs = net(inputs)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_loss = train_loss / len(trainloader)
    train_acc = 100 * correct / total
    return net, train_loss, train_acc

In [None]:
def validate(net, criterion, args):
    valloader = torch.utils.data.DataLoader(valset,
                                            batch_size=args.test_batch_size,
                                            shuffle=False, num_workers=2)
    net.eval()

    correct = 0
    total = 0
    val_loss = 0

    with torch.no_grad():
        for data in valloader:
            images, labels = data

            images, labels = images.to(device), labels.to(device)
            outputs = net(images)

            loss = criterion(outputs, labels)

            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        val_loss = val_loss / len(valloader)
        val_acc = 100 * correct / total
    return val_loss, val_acc

In [None]:
def test(net, args):
    testloader = torch.utils.data.DataLoader(testset,
                                             batch_size=args.test_batch_size,
                                             shuffle=False, num_workers=2)
    net.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for data in testloader:
            images, labels = data

            images, labels = images.to(device), labels.to(device)

            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        test_acc = 100 * correct / total
    return test_acc

### Experiment

In [None]:
def experiment(args):

    net = models.efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.DEFAULT)

    # self.classifier = nn.Sequential(
    #     nn.Dropout(p=dropout, inplace=True),
    #     nn.Linear(lastconv_output_channels, num_classes),
    # )
    net.classifier[1] = nn.Linear(net.classifier[1].in_features, args.out_dim)
    if hasattr(args, 'dropout_rate'):
        net.classifier.add_module("dropout", nn.Dropout(args.dropout_rate)) # dropout 사용할 경우 모델 아키텍처에 추가

    # GPU
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    net = net.to(device)

    criterion = nn.CrossEntropyLoss()
    if args.optim == 'SGD':
        optimizer = optim.SGD(net.parameters(), lr=args.lr, weight_decay=args.l2)
    elif args.optim == 'RMSprop':
        optimizer = optim.RMSprop(net.parameters(), lr=args.lr, weight_decay=args.l2)
    elif args.optim == 'Adam':
        optimizer = optim.Adam(net.parameters(), lr=args.lr, weight_decay=args.l2)
    else:
        raise ValueError('In-valid optimizer choice')

    # 스케줄러 정의
    if args.scheduler == 'ReduceLROnPlateau':
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.5, patience=5)
    elif args.scheduler == 'StepLR':
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)
    else:
        scheduler = None

    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []

    best_acc = 0.0

    for epoch in range(args.epoch):  # loop over the dataset multiple times
        ts = time.time()
        net, train_loss, train_acc = train(net, optimizer, criterion, args)
        val_loss, val_acc = validate(net, criterion, args)
        te = time.time()

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)

        print('Epoch {}, Acc(train/val): {:2.2f}/{:2.2f}, Loss(train/val) {:2.2f}/{:2.2f}. Took {:2.2f} sec'.format(epoch, train_acc, val_acc, train_loss, val_loss, te-ts))

        # 스케줄러 업데이트
        if scheduler is not None:
            if args.scheduler == 'ReduceLROnPlateau':
                scheduler.step(val_loss)
            else:
                scheduler.step()

        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(net.state_dict(), 'your_best_model.pth')
            print(f'Best model saved with accuracy: {best_acc:2.2f}')

    result = {}
    result['train_losses'] = train_losses
    result['val_losses'] = val_losses
    result['train_accs'] = train_accs
    result['val_accs'] = val_accs
    result['train_acc'] = train_acc
    result['val_acc'] = val_acc
    result['best_acc'] = best_acc
    return vars(args), result

In [None]:
import hashlib
import json
from os import listdir
from os.path import isfile, join
import pandas as pd

def save_exp_result(setting, result):
    exp_name = setting['exp_name']
    del setting['epoch']
    del setting['test_batch_size']

    hash_key = hashlib.sha1(str(setting).encode()).hexdigest()[:6]
    filename = './results/{}-{}.json'.format(exp_name, hash_key)
    result.update(setting)
    with open(filename, 'w') as f:
        json.dump(result, f)


def load_exp_result(exp_name):
    dir_path = './results'
    filenames = [f for f in listdir(dir_path) if isfile(join(dir_path, f)) if '.json' in f]
    list_result = []
    for filename in filenames:
        if exp_name in filename:
            with open(join(dir_path, filename), 'r') as infile:
                results = json.load(infile)
                list_result.append(results)
    df = pd.DataFrame(list_result) # .drop(columns=[])
    return df

### Experiemt Parameters

In [None]:
# ====== Random Seed Initialization ====== #
seed = 123
np.random.seed(seed)
torch.manual_seed(seed)

parser = argparse.ArgumentParser()
args = parser.parse_args("")
args.exp_name = "exp1_lr_model_code"

# ====== Model ====== #
args.out_dim = 5
args.act = 'relu'

# ====== Regularization ======= #
args.l2 = 0.00001

# ====== Optimizer & Training ====== #
args.optim = 'RMSprop' #'RMSprop' #SGD, RMSprop, ADAM...
#args.lr = 0.0015
args.lr_decay = 0.95
#args.scheduler = 'ReduceLROnPlateau'
args.epoch = 10

args.dropout_rate = 0.2

args.train_batch_size = 64
args.test_batch_size = 128

In [None]:
# ====== Experiment Variable ====== #
name_var1 = 'lr'
name_var2 = 'scheduler'
list_var1 = [0.1,0.01,0.001]
list_var2 = ['ReduceLROnPlateau', 'StepLR']


setattr(trainset, 'transform', None)
setattr(valset, 'transform', None)
setattr(testset, 'transform', None)

for var1 in list_var1:
    for var2 in list_var2:
        setattr(args, name_var1, var1)
        setattr(args, name_var2, var2)
        print(args)

        setting, result = experiment(deepcopy(args))
        # save_exp_result(setting, result)

In [None]:
# 학습 완료 후 your_best_model.pth 로드
net.load_state_dict(torch.load('your_best_model.pth'))

# 테스트 실행
test_acc = test(net, args)
print(f'Test accuracy: {test_acc:.2f}%')