# Lab 01. 음성 데이터 -> waveshow / stft / MelSpectrogram 이미지 학습하기
---

# Custom Dataset

In [1]:
import os

from torch.utils.data import Dataset
from PIL import Image
import glob

class CustomDataset(Dataset) :
    def __init__(self, data_dir, transform=None):
        # data_dir = ./data/train/
        self.data_dir = glob.glob(os.path.join(data_dir, "*", "*.png"))
        self.transform = transform
        self.label_dict = {"MelSepctrogram" : 0 , "STFT" : 1, "waveshow" : 2}

    def __getitem__(self, item):
        image_path = self.data_dir[item]
        # image_path >>> .\data\train\waveshow\rock.00006_augmented_noise.png
        image = Image.open(image_path)
        image = image.convert("RGB")
        label_name = image_path.split("\\")[1]
        label = self.label_dict[label_name]

        if self.transform is not None :
            image = self.transform(image)

        return image ,label

    def __len__(self):
        return len(self.data_dir)

## Image Preprocessing

In [2]:
import os
import shutil
import glob
from sklearn.model_selection import train_test_split

class ImageMove :
    def __init__(self, org_folder):
        self.org_folder = org_folder
    def move_images(self):
        file_path_list = glob.glob(os.path.join(self.org_folder, "*", "*" , "*.png"))
        for file_path in file_path_list :
            folder_name = file_path.split("\\")[1]
            if folder_name == "MelSepctrogram" :
                shutil.move(file_path, "./ex_dataset/data/MelSepctrogram")
            elif folder_name == "STFT" :
                shutil.move(file_path, "./ex_dataset/data/STFT")
            elif folder_name == "waveshow" :
                shutil.move(file_path, "./ex_dataset/data/waveshow")

# test = ImageMove("./final_data/")
# test.move_images()

class ImageDataMove :
    def __init__(self, org_dir, train_dir, val_dir):
        self.org_dir = org_dir
        self.train_dir = train_dir
        self.val_dir = val_dir

    def move_images(self):
        # File Path List
        file_path_list01 = glob.glob(os.path.join(self.org_dir, "*", "waveshow", "*.png"))
        file_path_list02 = glob.glob(os.path.join(self.org_dir, "*", "STFT", "*.png"))
        file_path_list03 = glob.glob(os.path.join(self.org_dir, "*", "MelSepctrogram", "*.png"))

        # Data Split
        wa_train_data_list , wa_val_data_list = train_test_split(file_path_list01, test_size=0.2)
        st_train_data_list , st_val_data_list = train_test_split(file_path_list02, test_size=0.2)
        ms_train_data_list , ms_val_data_list = train_test_split(file_path_list03, test_size=0.2)

        # File Move
        self.move_file(wa_train_data_list, os.path.join(self.train_dir, "waveshow"))
        self.move_file(wa_val_data_list, os.path.join(self.val_dir, "waveshow"))
        self.move_file(st_train_data_list, os.path.join(self.train_dir, "STFT"))
        self.move_file(st_val_data_list, os.path.join(self.val_dir, "STFT"))
        self.move_file(ms_train_data_list, os.path.join(self.train_dir, "MelSepctrogram"))
        self.move_file(ms_val_data_list, os.path.join(self.val_dir, "MelSepctrogram"))

    def move_file(self, file_list, mov_dir):
        os.makedirs(mov_dir, exist_ok=True)
        for file_path in file_list:
            shutil.move(file_path, mov_dir)

# org_dir = "ex_dataset"
# train_dir = "./data/train"
# val_dir = "./data/val"
#
# move_temp = ImageDataMove(org_dir, train_dir, val_dir)
# move_temp.move_images

## Train

In [3]:
!pip install lion_pytorch



In [5]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.models import vgg11, VGG11_Weights, resnet50, resnet18
from torch.optim import AdamW
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import torch.nn as nn
from Lab01_CustomDataset import CustomDataset
from Lab02_CustomDataset import CustomDataset_ex02
from tqdm import tqdm

def train(model, train_loader, val_loader, epochs, DEVICE, optimizer, criterion) :
    best_val_acc = 0.0
    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []
    print("Train...")

    for epoch in range(epochs) :
        train_loss = 0.0
        val_loss = 0.0
        val_acc = 0.0
        train_acc = 0.0

        model.train()

        # tqdm
        train_loader_iter = tqdm(train_loader, desc=f"Epoch {epoch +1}/{epochs}", leave=False)

        for i, (data, target) in enumerate(train_loader_iter) :
            data = data.to(DEVICE)
            target = target.to(DEVICE)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

            # Acc
            _, pred = torch.max(output, 1)
            train_acc += (pred == target).sum().item()

            # Print the loss
            if i % 10 == 9 :
                train_loader_iter.set_postfix({"Loss" : loss.item()})

        train_loss /= len(train_loader)
        train_acc = train_acc / len(train_loader.dataset)

        # Eval
        model.eval()
        with torch.no_grad() :
            for data, target in val_loader :
                data = data.to(DEVICE)
                target = target.to(DEVICE)

                outputs = model(data)
                pred = outputs.argmax(dim=1, keepdim=True)
                val_acc += pred.eq(target.view_as(pred)).sum().item()
                val_loss += criterion(outputs, target).item()

        val_loss /= len(val_loader)
        val_acc = val_acc / len(val_loader.dataset)

        train_losses.append(train_loss)
        train_accs.append(train_acc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        # save the model with the best val acc
        if val_acc > best_val_acc :
            torch.save(model.state_dict(), '01_ex_best.pt')
            best_val_acc = val_acc

        print(f"Epoch [{epoch + 1}/{epochs}], "
              f"Train Loss : {train_loss :.4f}, "
              f"Train Acc : {train_acc:.4f}, "
              f"Val Loss : {val_loss :.4f},"
              f" Val Acc : {val_acc :.4f}  ")

    return model, train_losses, val_losses, train_accs, val_accs

# Train Loss : 0.4077, Train Acc : 0.8465,
def main() :
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # DEVICE_MPS = torch.device("mps")    # Mac m1/m2

    # model = vgg11(weights=VGG11_Weights.DEFAULT)
    model = resnet18(pretrained=True)
    num_features = model.fc.in_features
    model.fc = nn.Linear(num_features, 10)

    # VGG
    # num_feature = model.classifier[6].in_features
    # model.classifier[6] = nn.Linear(num_feature, 3)
    
    model.to(DEVICE)

    """
    # transforms
    1. aug 
    2. ToTensor 
    3. Normalize
    """
    train_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.ToTensor()
    ])

    val_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])

    # Dataset
    train_dataset = CustomDataset_ex02("./data_art/train/", transform=train_transform)
    val_dataset = CustomDataset_ex02("./data_art/val/", transform=val_transform)

    # Dataloader
    train_loader = DataLoader(train_dataset, batch_size=100, num_workers=4,
                            pin_memory=True, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=100, num_workers=4,
                            pin_memory=True, shuffle=False)
    # import time
    # import math
    # test = time.time()
    # math.factorial(100000)
    # test01 = time.time()
    # print(f"{test01 - test :.5f} sec")

    from lion_pytorch import Lion    # 라이언 옵티마이저 사용

    epochs = 20
    criterion = CrossEntropyLoss().to(DEVICE)
    optimizer = AdamW(model.parameters(), lr=0.001, weight_decay=1e-2)

    train(model, train_loader, val_loader, epochs, DEVICE, optimizer, criterion)

if __name__ == "__main__" :
    main()

Train...


Epoch 1/20:   0%|          | 0/14 [00:00<?, ?it/s]