In [None]:
import torch
from torch import nn, optim
from torchvision import datasets, models, transforms

import matplotlib.pyplot as plt
import numpy as np
import time
import os
import sys
import torch.optim.lr_scheduler as lr_scheduler
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
#DEVICE = torch.device("cpu") #cpu 전환
DEVICE

In [None]:
# gpu 캐시 지우기
import gc
gc.collect()
torch.cuda.empty_cache()

## 데이터 전처리 및 로딩

In [None]:
train_transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.RandomHorizontalFlip(), #이미지 반전
    transforms.RandomAffine(degrees=90, shear=10),#이미지 0~90도 회전, 0~10% 늘이기
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.2), #밝기, 명도, 채도, 색조 랜덤 변경
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) #이미지데이터 Nomalization
])

val_transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])


In [None]:
train_DS = datasets.ImageFolder("./fish/20230504/20230504_data_set-1/train", train_transform)
val_DS = datasets.ImageFolder("./fish/20230504/20230504_data_set-1/val", val_transform)
test_DS = datasets.ImageFolder("./fish/20230504/20230504_data_set-1/test", test_transform)

train_DL = torch.utils.data.DataLoader(train_DS, batch_size=64, shuffle=True, num_workers=4)
val_DL = torch.utils.data.DataLoader(val_DS, batch_size=1, shuffle=True, num_workers=4)
test_DL = torch.utils.data.DataLoader(test_DS, batch_size=1, shuffle=False)


print(train_DS.class_to_idx)
print('학습 데이터 개수/Batch:', len(train_DL), ', ','학습 데이터 전체 개수:', len(train_DS) )
print('검증 데이터 개수/Batch:', len(val_DL), ', ','검증 데이터 전체 개수:', len(val_DS))
print('테스트 데이터 개수/Batch:', len(test_DL), ', ','테스트 데이터 전체 개수:', len(test_DS))

## 학습 모델-1 (class화해서 customaizng가능)

In [None]:
#MLP
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.fcs = nn.Sequential(nn.Linear(3*224*224, 1024),
                                 nn.Sigmoid(),
#                                  nn.ReLU(),
                                 nn.Linear(1024, 1024),
                                 nn.Sigmoid(),
#                                  nn.ReLU(),
                                 nn.Linear(1024, 1024),
                                 nn.Sigmoid(),
#                                  nn.ReLU(),
                                 nn.Dropout(p=0.5),
                                 nn.Linear(1024, 11))
#                                  *[i for _ in range(10) for i in [nn.Linear(1024, 1024),nn.ReLU()]],
    def forward(self,x):
        x = torch.flatten(x, start_dim=1)
        x = self.fcs(x)
        return x
    
    
   #CNN 
    class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Sequential(nn.Conv2d(3,32,3, padding=1),
                                   nn.ReLU())
        self.Maxpool1 = nn.MaxPool2d(2)
        self.conv2 = nn.Sequential(nn.Conv2d(32,64,3, padding=1),
                                   nn.ReLU())
        self.Maxpool2 = nn.MaxPool2d(2)
        self.conv3 = nn.Sequential(nn.Conv2d(64,128,3, padding=1),
                                  nn.ReLU())
        self.Maxpool3 = nn.MaxPool2d(2)
        self.fc = nn.Sequential(nn.Linear(128*28*28, 256),
                                 nn.ReLU(),
#                                  nn.Dropout(p=0.5),
                                 nn.Linear(256, 11))

    def forward(self, x):
        x = self.conv1(x)
        x = self.Maxpool1(x)
        x = self.conv2(x)
        x = self.Maxpool2(x)
        x = self.conv3(x)
        x = self.Maxpool3(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc(x)
        return x

In [None]:
# model = MLP().to(DEVICE)
model = CNN().to(DEVICE)

print(model)

### 학습 모델 -2 (class 설정안하고 기존에 있는 형태 바로 불러옴)

In [None]:
#ResNet

model = models.resnet101(pretrained=True)
num_features = model.fc.in_features
# 전이 학습(transfer learning): 모델의 출력 뉴런 수를 11개로 교체하여 마지막 레이어 다시 학습
# model.fc = nn.Sequential(
#     nn.Linear(num_features, 512),
#     nn.ReLU(inplace=True),
#     nn.Dropout(0.5),
#     nn.Linear(512, 11)
# )
model.fc = nn.Linear(num_features, 11)
model = model.to(DEVICE)

#VGGNet
model = models.vgg16(pretrained=True)
num_features = model.classifier[-1].in_features
# 전이 학습(transfer learning): 모델의 출력 뉴런 수를 11개로 교체하여 마지막 레이어 다시 학습
model.classifier[-1] = nn.Linear(num_features, 11)
model = model.to(DEVICE)

#MobileNet
model = models.mobilenet_v2(pretrained=True)

# 마지막 레이어 변경
num_features = model.classifier[1].in_features
model.classifier[1] = nn.Linear(num_features, 11)

model = model.to(DEVICE)

## 학습하는 함수 및 시작 코드

In [None]:
def train_with_val(model, train_DL, val_DL, creiterion, optimizer, EPOCH):
    
    train_loss_history, val_loss_history = [], []
    train_acc_history, val_acc_history= [], []

    for ep in range(EPOCH):
        scheduler.step()
        start_time = time.time()
        model.train()
        rcorr = 0 #running correct
        rloss = 0 #running loss
        for x_batch, y_batch in train_DL:
            x_batch = x_batch.to(DEVICE)
            y_batch = y_batch.to(DEVICE)
            #inference
            y_hat = model(x_batch)
            #loss
            loss = criterion(y_hat, y_batch)
            #update
            optimizer.zero_grad() # gradient 누적을 막기 위한 초기화
            loss.backward() # backpropagation
            optimizer.step() # weight update
            
            #loss,acc 계산
            loss_mini_batch = loss.item() * x_batch.shape[0] #batch loss # BATCH_SIZE를 곱하면 마지막 18개도 32개를 곱하니까..
            rloss += loss_mini_batch #running loss
            
            _, predicted = torch.max(y_hat,1)
            rcorr += torch.sum(predicted == y_batch).item() #얘는 그냥 각 데이터값 맞은 개수 다 더한거임
        #print loss
        train_rloss_e = rloss/len(train_DL.dataset) #epoch loss
        train_racc_e = (rcorr/len(train_DL.dataset)) *100 #epoch acc
        train_loss_history += [train_rloss_e] #append 개념
        train_acc_history += [train_racc_e]
        
        
        model.eval() #여기서 학습할때만 필요한 dropout이나 batchnorm등의 기능을 비활성화시킴
        rcorr = 0 #running correct
        rloss = 0 #running loss 
        for x_batch, y_batch in val_DL:
            x_batch = x_batch.to(DEVICE)
            y_batch = y_batch.to(DEVICE)
            
            with torch.no_grad(): #gradient 계산 비활성화(no backpropagation??)
                #inference
                y_hat = model(x_batch)
                #loss
                loss= criterion(y_hat, y_batch)
                
                loss_mini_batch = loss.item() * x_batch.shape[0]
                rloss += loss_mini_batch
                _, predicted = torch.max(y_hat, 1)
                rcorr += torch.sum(predicted == y_batch).item()
                
                val_rloss_e = rloss/len(val_DL.dataset)
                val_racc_e = (rcorr/len(val_DL.dataset))*100
                
        val_loss_history += [val_rloss_e]
        val_acc_history += [val_racc_e]
        
        
        print('# ep {} Train  /// Loss: {:.4f} /// Acc: {:.4f}%'.format(ep+1, train_rloss_e, train_racc_e))
        print('# ep {} Validation /// Loss: {:.4f} /// Acc: {:.4f}%'.format(ep+1, val_rloss_e, val_racc_e))
        print("time :", time.time() - start_time)
        print("-"*20)
    
    return train_loss_history, train_acc_history, val_loss_history, val_acc_history

In [None]:
#시작 코드

LR = 1e-2 
EPOCH = 20 
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=LR, momentum=0.9) 
# optimizer = optim.Adam(model.parameters(), lr=LR) 

train_loss_history, train_acc_history, val_loss_history, val_acc_history= train_with_val(model, train_DL, val_DL, criterion, optimizer, EPOCH)

## 모델 저장 및 불러오기

In [1]:
#모델 저장하기
save_model_path = "./fish/model/123.pt"
torch.save(model,save_model_path)

#모델 acurracy, Loss dataframe으로 저장하기, (학습 직후에만 가능함)
import pandas as pd
import numpy as np
df = pd.DataFrame({'train_loss_history': train_loss_history,
                   'train_acc_history': train_acc_history,
                   'val_loss_history': val_loss_history,
                   'val_acc_history': val_acc_history})
df.to_csv('./fish/model/history_123.csv', index=False)


#모델 불러오기
model=torch.load("./fish/model/1004.pt", map_location=DEVICE)

NameError: name 'torch' is not defined

## 모델 평가하기

In [None]:
#epoch별 acurracy, loss 구하기
import matplotlib.pyplot as plt

fig, ax2 = plt.subplots()
ax1 = ax2.twinx()

ax1.plot(range(1, len(train_acc_history)+1), train_acc_history, label='Train Acc', color='skyblue')
ax1.plot(range(1, len(train_acc_history)+1),val_acc_history, label='Val Acc', color='orange')
ax1.set_ylabel('Accuracy (%)')
ax1.set_ylim([0, 100])

ax2.plot(range(1, len(train_acc_history)+1), train_loss_history, linestyle='-',label='Train Loss', color='skyblue')
ax2.plot(range(1, len(train_acc_history)+1), val_loss_history, linestyle='-', label='Val Loss', color='orange')
ax2.set_ylabel('Loss')
ax2.set_xlabel('Epoch')
ax2.set_ylim([0, 10])
ax2.set_xlim([1, 10])
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
lines = lines1 + lines2
labels =   labels2
fig.legend(lines, labels, loc='upper right')
plt.xticks(np.arange(0, 21, 1))


plt.show()

In [None]:
#confusion matrix 및 사이킷런 classification_report 관련 함수
#테스트 데이터 넣어서 평가
def get_conf(model, test_DL):
    N = len(test_DL.dataset.classes)
    model.eval()
    with torch.no_grad():
        y_pred = []
        y_true = []
        confusion = torch.zeros(N,N)
        for x_batch, y_batch in test_DL:
            x_batch = x_batch.to(DEVICE)
            y_batch = y_batch.to(DEVICE)
            # inference
            y_hat = model(x_batch)
            # accuracy
            pred = y_hat.argmax(dim=1)
            y_pred.extend(pred.cpu().numpy())
            y_true.extend(y_batch.cpu().numpy())
            for i in range(y_batch.shape[0]):
                confusion[pred[i], y_batch[i]] += 1
            # confusion[pred, y_batch] += 1
        
    confusion = confusion.numpy()

    return confusion, y_pred, y_true #confusion은 confusion matrix에 사용, y_pred, y_true는 classes_report에 사용

#confusion matrix 그리기 함수
def plot_confusion_matrix(confusion, classes=None):
    N = confusion.shape[0]
    accuracy=np.trace(confusion)/np.sum(confusion) * 100
    
    confusion = confusion/np.sum(confusion, axis=0)
    plt.figure(figsize=(15,10))
    plt.imshow(confusion, cmap="Blues")
    plt.title("")
    plt.colorbar()

    for i in range(N):
        for j in range(N):
            plt.text(j,i, round(confusion[i,j],2), 
                     horizontalalignment="center", 
                     color="white" if confusion[i,j] > np.max(confusion) / 1.5 else "black")

    if classes is not None:
        plt.xticks(range(N), classes)
        plt.yticks(range(N), classes)
    else:
        plt.xticks(range(N))
        plt.yticks(range(N))

    plt.ylabel("Predicted label", fontsize=20)
    plt.xlabel(f"True label", fontsize=20)

In [None]:
#confusion matrix 실행
confusion, y_pred, y_true = get_conf(load_model, test_DL)
plot_confusion_matrix(confusion)

In [None]:
#사이킷런 classification_report 실행
from sklearn.metrics import classification_report
print(classification_report(y_true, y_pred, target_names=test_DS.classes))