In [1]:
import torch
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
from torch.utils.data import Dataset, DataLoader,random_split
from torch.optim.lr_scheduler import StepLR,ReduceLROnPlateau
import torchmetrics.functional as metrics
import os
import shutil
from torchvision import transforms
from PIL import Image

  warn(


In [2]:
# 학습 검증용 데이터
folder_path = '../data/train/train/'
target_data = []
img_data = []
for encoding_label,label in enumerate(os.listdir(folder_path)):
    label_path = os.path.join(folder_path, label)
    if os.path.isdir(label_path):  # 디렉토리인 경우에만 진입
        for img in os.listdir(folder_path+'/'+label):
            image_path = os.path.join(folder_path,label,img)
            if os.path.isfile(image_path):  # 파일인 경우에만 진입
                with open(image_path, 'rb') as file:
                    image = Image.open(file)
                    width, height = image.size
                    if width == 48 and height == 48:
                        image_array = np.array(image)
                        target_data.append(encoding_label)
                        img_data.append(image_array)

In [3]:
# 테스트용 데이터
folder_path = '../data/test/test/'
target_test = []
img_test = []
for encoding_label,label in enumerate(os.listdir(folder_path)):
    label_path = os.path.join(folder_path, label)
    if os.path.isdir(label_path):  # 디렉토리인 경우에만 진입
        for img in os.listdir(folder_path+'/'+label):
            image_path = os.path.join(folder_path,label,img)
            if os.path.isfile(image_path):  # 파일인 경우에만 진입
                with open(image_path, 'rb') as file:
                    image = Image.open(file)
                    width, height = image.size
                    if width == 48 and height == 48:
                        image_array = np.array(image)
                        target_test.append(encoding_label)
                        img_test.append(image_array)

In [4]:
# 이미지 데이터 정규화
x_data = np.array(img_data)/255.
x_data = x_data.reshape((-1,48*48))
print(x_data.shape)

(28709, 2304)


In [5]:
x_data_test = np.array(img_test)/255.
x_data_test = x_data_test.reshape((-1,48*48))
print(x_data_test.shape)

(7178, 2304)


In [6]:
target_data = pd.Series(target_data).replace({0:3, 5:4, 2:5, 3:2, 6:0, 4:6, 7:1})
target_data.value_counts()

2    7215
6    4965
4    4830
5    4097
3    3995
0    3171
1     436
Name: count, dtype: int64

In [7]:
target_test = pd.Series(target_test).replace({0:3, 5:4, 2:5, 3:2, 6:0, 4:6, 7:1})
target_test.value_counts()

2    1774
4    1247
6    1233
5    1024
3     958
0     831
1     111
Name: count, dtype: int64

In [8]:
# 데이터 클래스 생성
class DLdataset(Dataset):
    
    def __init__(self,x_data,y_data):
        super().__init__()
        self.feature = torch.FloatTensor(x_data)
        self.target = torch.LongTensor(y_data)
        
    def __len__(self):
        return self.target.shape[0]
    
    def __getitem__(self,idx):
        return self.feature[idx], self.target[idx]

In [9]:
# 데이터셋 생성
dataset = DLdataset(x_data,target_data)
dataset_test = DLdataset(x_data_test, target_test)

In [10]:
# 학습용, 검증용 데이터 준비
seed = torch.Generator().manual_seed(42)
trainDS, validDS = random_split(dataset, [0.8,0.2], generator=seed)


In [11]:
# 모델 클래스 정의
class Model(nn.Module):
    
    def __init__(self, IN, OUT):
        super().__init__()
        self.input = nn.Linear(IN, 128) 
        self.af = nn.ReLU()
        self.hidden = nn.Linear(128, 32)
        self.output = nn.Linear(32, OUT)
        
    def forward(self, x):
        y = self.input(x)
        y = self.af(y)
        y = self.hidden(y)
        y = self.af(y)
        y = self.output(y)
        
        return y

In [12]:
# 학습 준비

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

EPOCHS = 2000

IN = dataset.feature.shape[1]
OUT = pd.Series(target_data).nunique()

# 모델 생성
model  = Model(IN,OUT)

# 손실함수
LF = nn.CrossEntropyLoss().to(DEVICE)

# 옵티마이저
OPTIMIZER = torch.optim.Adam(model.parameters())

# 스케줄러
SCHEDULER = ReduceLROnPlateau(OPTIMIZER, mode = 'min', patience = 3)

In [13]:
def training(dataLoader):
    
    model.train()
    train_report=[[], []]
    for (feature, target) in dataLoader:

        feature, target = feature.to(DEVICE), target.to(DEVICE)
        
        # 학습
        pre_target = model(feature)
        
        # 손실계산
        loss = LF(pre_target, target)
        train_report[0].append(loss)
  
        # 성능 평가
        acc = metrics.accuracy(pre_target.argmax(dim=1), target, task = 'multiclass',num_classes=OUT)
        train_report[1].append(acc)
        
        # W,b업데이트
        OPTIMIZER.zero_grad()
        loss.backward()
        OPTIMIZER.step()

    loss_score = sum(train_report[0])/len(train_report[0])
    acc_score = sum(train_report[1])/len(train_report[1])
    print(f'[Train loss] ==> {loss_score}    [Train Accuracy] ==> {acc_score}')
    return loss_score, acc_score

In [14]:
def testing(dataLoader):
    
    model.eval()
    
    with torch.no_grad():
        test_report=[[], []]
        for (feature, target)  in dataLoader:
            # 배치크기만큼의 학습 데이터 준비
            feature, target = feature.to(DEVICE), target.to(DEVICE)
            
            # 학습
            pre_target = model(feature)
            
            # 손실계산
            loss = LF(pre_target, target)
            test_report[0].append(loss)
      
            # 성능 평가
            acc = metrics.accuracy(pre_target.argmax(dim=1), target, task = 'multiclass',num_classes=OUT)
            test_report[1].append(acc)
    
    loss_score = sum(test_report[0])/len(test_report[0])
    acc_score = sum(test_report[1])/len(test_report[1])

    print(f'[Test loss] ==> {loss_score}    [Test Accuracy] ==> {acc_score}')
    return loss_score, acc_score

In [15]:
# 기본 배치사이즈 32

BATCHLIST = [16, 32, 64, 128, 256]

for BATCH in BATCHLIST:
    trainDL = DataLoader(trainDS, batch_size=BATCH)
    validDL = DataLoader(validDS, batch_size=BATCH)
    testDL = DataLoader(dataset_test, batch_size=BATCH)


    min_loss = 100.0  # 초기 최소 손실 설정
    cnt = 0
    for eps in range(EPOCHS):
        print(f'[{eps+1}/{EPOCHS}]')
        # 학습
        train_loss, train_acc = training(trainDL)

        # 검증
        val_loss, val_acc = testing(validDL)
        
        # 최소 손실 업데이트
        if val_loss < min_loss:
            min_loss = val_loss
            cnt = 0
            torch.save(model.state_dict(), "my_trained_model3 "+str(BATCH)+".pth")

        else:
            cnt+=1

        # 조기 종료 기능 => 조건 : val_loss가 지정된 횟수 이상 개선이 안되면 학습 종료
        if SCHEDULER.num_bad_epochs >= SCHEDULER.patience or cnt >= 5:
            print(f"Early stopping at epoch {eps}")
            break

[1/2000]
[Train loss] ==> 1.7844654321670532    [Train Accuracy] ==> 0.2711089849472046
[Test loss] ==> 1.7113263607025146    [Test Accuracy] ==> 0.33454304933547974
[2/2000]
[Train loss] ==> 1.7097376585006714    [Train Accuracy] ==> 0.3212047219276428
[Test loss] ==> 1.6650176048278809    [Test Accuracy] ==> 0.34442630410194397
[3/2000]
[Train loss] ==> 1.6780332326889038    [Train Accuracy] ==> 0.33918002247810364
[Test loss] ==> 1.6474828720092773    [Test Accuracy] ==> 0.34947505593299866
[4/2000]
[Train loss] ==> 1.660597562789917    [Train Accuracy] ==> 0.3451862931251526
[Test loss] ==> 1.634196400642395    [Test Accuracy] ==> 0.3573092818260193
[5/2000]
[Train loss] ==> 1.6464006900787354    [Train Accuracy] ==> 0.3516713082790375
[Test loss] ==> 1.6202938556671143    [Test Accuracy] ==> 0.3653176426887512
[6/2000]
[Train loss] ==> 1.6318405866622925    [Train Accuracy] ==> 0.357372909784317
[Test loss] ==> 1.6150834560394287    [Test Accuracy] ==> 0.36479535698890686
[7/2000]

[Train loss] ==> 1.4432908296585083    [Train Accuracy] ==> 0.4439103901386261
[Test loss] ==> 1.5790684223175049    [Test Accuracy] ==> 0.38800111413002014
[2/2000]
[Train loss] ==> 1.4414982795715332    [Train Accuracy] ==> 0.44437840580940247
[Test loss] ==> 1.5785044431686401    [Test Accuracy] ==> 0.38448596000671387
[3/2000]
[Train loss] ==> 1.4393502473831177    [Train Accuracy] ==> 0.44487279653549194
[Test loss] ==> 1.5792584419250488    [Test Accuracy] ==> 0.38896092772483826
[4/2000]
[Train loss] ==> 1.437695026397705    [Train Accuracy] ==> 0.44444820284843445
[Test loss] ==> 1.578576922416687    [Test Accuracy] ==> 0.3879418969154358
[5/2000]
[Train loss] ==> 1.4357264041900635    [Train Accuracy] ==> 0.4458710551261902
[Test loss] ==> 1.5785578489303589    [Test Accuracy] ==> 0.3876022398471832
[6/2000]
[Train loss] ==> 1.4341462850570679    [Train Accuracy] ==> 0.44697311520576477
[Test loss] ==> 1.5774106979370117    [Test Accuracy] ==> 0.3870927095413208
[7/2000]
[Trai

In [16]:
# min_loss = 100.0  # 초기 최소 손실 설정
# cnt = 0
# for eps in range(EPOCHS):
#     print(f'[{eps+1}/{EPOCHS}]')
#     # 학습
#     train_loss, train_acc = training(trainDL)

#     # 검증
#     val_loss, val_acc = testing(validDL)
    
#     # 최소 손실 업데이트
#     if val_loss < min_loss:
#         min_loss = val_loss
#         cnt = 0
#         torch.save(model.state_dict(), "my_trained_model.pth")

#     else:
#         cnt+=1

#     # 조기 종료 기능 => 조건 : val_loss가 지정된 횟수 이상 개선이 안되면 학습 종료
#     if SCHEDULER.num_bad_epochs >= SCHEDULER.patience or cnt >= 5:
#         print(f"Early stopping at epoch {eps}")
#         break

In [17]:
training(trainDL)

[Train loss] ==> 1.4253636598587036    [Train Accuracy] ==> 0.4508265554904938


(tensor(1.4254, grad_fn=<DivBackward0>), tensor(0.4508))

In [18]:
testing(validDL)

[Test loss] ==> 1.5791363716125488    [Test Accuracy] ==> 0.38788267970085144


(tensor(1.5791), tensor(0.3879))

In [19]:
# predicting(testDL, 5)