In [None]:


import torch
from torch import nn, optim
import os
from torchvision import datasets, transforms
import random

# Root folders handed to ImageFolder for the three dataset splits.
folder_path_train, folder_path_test, folder_path_val = 'train_path', 'test_path', 'val_path'

BATCH_SIZE = 32

# Training transform: resize to 256x256, then random colour jitter and random
# horizontal/vertical flips, then conversion to a tensor.
transform_aug = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ColorJitter(brightness=0.3, saturation=0.3),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ToTensor(),
])

# Evaluation transform: resize and tensor conversion only (no augmentation).
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])


def _load_split(root, tf):
    """Build the ImageFolder dataset for ``root`` and a shuffled DataLoader over it."""
    split_ds = datasets.ImageFolder(root, transform=tf)
    split_dl = torch.utils.data.DataLoader(split_ds, batch_size=BATCH_SIZE, shuffle = True)
    return split_ds, split_dl


# Only the training split uses the augmenting transform.
dataset_train_DS, dataset_train_DL = _load_split(folder_path_train, transform_aug)
dataset_test_DS, dataset_test_DL = _load_split(folder_path_test, transform)
dataset_val_DS, dataset_val_DL = _load_split(folder_path_val, transform)


# Quick sanity checks on the first training sample and the test-set size.
# Each dataset_train_DS[0] access re-applies the (random) training transform.
img_tensor, label = dataset_train_DS[0]
print(img_tensor.shape)

print(len(dataset_test_DS))
print(dataset_train_DS[0][0].shape)
print(dataset_train_DS[0])


# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print(DEVICE)



In [None]:


class DepSepconv(nn.Module):
  """Two stacked conv -> BatchNorm -> ELU stages used as the network's building block.

  Stage 1 (``self.deptwise``) maps ``input_chanel`` -> ``input_chanel`` channels.
  Stage 2 (``self.pointwise``) is a bias-free 1x1 convolution mapping
  ``input_chanel`` -> ``output_chanel`` channels.

  With the default arguments stage 1 is a *dense* 1x1 convolution (``groups=1``),
  i.e. not a depthwise convolution despite the attribute name. Pass
  ``depthwise=True`` (typically with ``kernel_size=3, padding=1``) to get a
  per-channel spatial convolution instead. The defaults are kept so existing
  models and their state dicts are unaffected.

  Args:
    input_chanel: number of input channels.
    output_chanel: number of output channels.
    stride: stride of the stage-1 convolution.
    padding: padding of the stage-1 convolution.
    kernel_size: kernel size of the stage-1 convolution (default 1).
    depthwise: if True, stage 1 uses ``groups=input_chanel`` (one filter per channel).
  """
  def __init__(self, input_chanel, output_chanel, stride = 1, padding = 0, kernel_size = 1, depthwise = False):
    super().__init__()

    # groups=1 is a dense convolution; groups=input_chanel makes it depthwise.
    groups = input_chanel if depthwise else 1

    self.deptwise = nn.Sequential(
        nn.Conv2d(input_chanel, input_chanel, kernel_size = kernel_size,
                  stride = stride, padding = padding, groups = groups),
        nn.BatchNorm2d(input_chanel),
        nn.ELU(inplace = True))

    self.pointwise = nn.Sequential(
        nn.Conv2d(input_chanel, output_chanel, kernel_size = 1, bias = False),
        nn.BatchNorm2d(output_chanel),
        nn.ELU(inplace = True))

  def forward(self, x):
    """Apply stage 1 followed by stage 2 and return the result."""
    x = self.deptwise(x)
    x = self.pointwise(x)

    return x


class MobileNet_transform(nn.Module):
  """Convolutional classifier built from ``DepSepconv`` blocks.

  Layout:
    * ``conv1``: 3 -> 32*alpha channels, 3x3 conv (stride 1, padding 1) + BatchNorm + ELU.
    * ``conv2`` .. ``conv5``: two ``DepSepconv`` blocks followed by a 2x2 stride-2
      convolution + BatchNorm + ELU; output widths 64, 128, 256, 512 (times alpha).
    * ``conv6``: two ``DepSepconv`` blocks, output width 1024*alpha.
    * global average pooling to 1x1, flatten, then a four-layer MLP head with
      ELU and dropout that ends in ``num_classes`` outputs.

  Every channel width is ``int(base * alpha)``.

  Args:
    alpha: width multiplier applied to every base channel count.
    num_classes: size of the final linear layer's output.
  """
  def __init__(self, alpha, num_classes):
    super().__init__()

    # Scaled channel widths, computed once.
    c32, c64, c128, c256, c512, c1024 = (
        int(base*alpha) for base in (32, 64, 128, 256, 512, 1024))

    def down_stage(c_in, c_out):
      # Two DepSepconv blocks, then a strided 2x2 conv that halves height and width.
      return nn.Sequential(DepSepconv(c_in, c_out),
                           DepSepconv(c_out, c_out),
                           nn.Conv2d(c_out, c_out, kernel_size = 2, stride = 2, bias = False),
                           nn.BatchNorm2d(c_out),
                           nn.ELU(inplace = True))

    self.conv1 = nn.Sequential(
        nn.Conv2d(3, c32, kernel_size = 3, stride = 1, padding = 1, bias = False),
        nn.BatchNorm2d(c32),
        nn.ELU(inplace = True))

    self.conv2 = down_stage(c32, c64)
    self.conv3 = down_stage(c64, c128)
    self.conv4 = down_stage(c128, c256)
    self.conv5 = down_stage(c256, c512)

    self.conv6 = nn.Sequential(DepSepconv(c512, c1024),
                               DepSepconv(c1024, c1024))

    self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))

    self.fc = nn.Sequential(
        nn.Linear(c1024, c256), nn.ELU(inplace = True), nn.Dropout(p = 0.25),
        nn.Linear(c256, c256), nn.ELU(inplace = True), nn.Dropout(p = 0.5),
        nn.Linear(c256, c64), nn.ELU(inplace = True), nn.Dropout(p = 0.25),
        nn.Linear(c64, num_classes))

    # Re-initialise every convolution weight (including those inside DepSepconv).
    conv_layers = (m for m in self.modules() if isinstance(m, nn.Conv2d))
    for layer in conv_layers:
      nn.init.kaiming_normal_(layer.weight, mode = "fan_in", nonlinearity = "relu")

  def forward(self, x):
    """Run the six conv stages, pool globally, flatten and apply the MLP head."""
    for stage in (self.conv1, self.conv2, self.conv3,
                  self.conv4, self.conv5, self.conv6):
      x = stage(x)

    pooled = self.avg_pool(x)
    flat = torch.flatten(pooled, 1)

    return self.fc(flat)





In [None]:
import torch, gc
from torch.optim.lr_scheduler import ReduceLROnPlateau
import time
from tqdm import tqdm

from matplotlib import pyplot as plt

# Training hyper-parameters: number of epochs and the Adam learning rate.
epoch, LR = 50, 0.0001

# Loss function handed to Train() as the criterion.
errorfunc = nn.CrossEntropyLoss()

# Run the garbage collector, then clear PyTorch's CUDA cache before training.
gc.collect()
torch.cuda.empty_cache()

def Train(model, train_DL, val_DL, criterion, optimizer, scheduler):
  """Run the train/validate cycle and collect per-epoch metrics.

  The number of epochs is read from the module-level ``epoch`` variable. Each
  epoch performs one optimisation pass over ``train_DL`` with the model in
  train mode, one pass over ``val_DL`` with the model in eval mode and gradient
  tracking disabled, then calls ``scheduler.step`` with the validation loss and
  prints a one-line summary.

  Returns:
    (loss_zip, acc_zip) where ``loss_zip`` is ``[train_losses, val_losses]`` and
    ``acc_zip`` is ``[train_accuracies, val_accuracies]`` (accuracies in percent);
    every inner list holds one value per epoch.
  """
  tr_losses, tr_accs = [], []
  va_losses, va_accs = [], []

  for e in range(epoch):
    epoch_start = time.time()

    # Optimisation pass.
    model.train()
    train_loss, train_acc = loss_epoch(model, train_DL, criterion, optimizer, scheduler)
    tr_losses.append(train_loss)
    tr_accs.append(train_acc)

    # Validation pass: no optimizer is given, so no weights are updated.
    model.eval()
    with torch.no_grad():
      val_loss, val_acc = loss_epoch(model, val_DL, criterion)
    va_losses.append(val_loss)
    va_accs.append(val_acc)

    # The scheduler is driven by the validation loss.
    scheduler.step(val_loss)

    print(f"epoch : {e+1}/{epoch}, train loss : {round(train_loss, 5)} , validation loss : {round(val_loss, 5)}, train accuracy : {round(train_acc, 2)}% , validation accuracy : {round(val_acc, 2)}%, time : {round(time.time() - epoch_start)}s")
    print("-"*130)

  return [tr_losses, va_losses], [tr_accs, va_accs]


def loss_epoch(model, DL, criterion, optimizer = None, scheduler = None):
  """Run one pass over ``DL`` and return the mean loss and the accuracy.

  When ``optimizer`` is given, every batch is followed by a backward pass and an
  optimizer step (training). When it is ``None`` the pass only evaluates.
  Switching the model between train/eval mode and disabling gradients is left
  to the caller.

  Args:
    model: network called as ``model(x_batch)``; its output is arg-maxed over dim 1.
    DL: data loader yielding ``(x_batch, y_batch)``; ``len(DL.dataset)`` is used
      as the sample count.
    criterion: loss called as ``criterion(y_hat, y_batch)``.
    optimizer: optimizer to step after each batch, or ``None`` for evaluation.
    scheduler: accepted for backward compatibility and not used here. Previously
      training was silently skipped unless a scheduler was also supplied.

  Returns:
    (loss_e, acc_e): loss averaged over all samples, and accuracy in percent.
  """
  Not = len(DL.dataset)
  rloss = 0
  rcorrect = 0
  for x_batch, y_batch in tqdm(DL, leave = False):
    x_batch = x_batch.to(DEVICE)
    y_batch = y_batch.to(DEVICE)
    y_hat = model(x_batch)
    loss = criterion(y_hat, y_batch)
    # Train whenever an optimizer is supplied; a scheduler is not required.
    if optimizer is not None:
      optimizer.zero_grad()
      loss.backward()   # backpropagation
      optimizer.step()  # parameter update
    # Weight the batch loss by the batch size so the epoch mean is per-sample.
    loss_b = loss.item()*x_batch.shape[0]
    rloss = rloss + loss_b
    pred = y_hat.argmax(dim = 1)
    correct_b = torch.sum(pred == y_batch).item()
    rcorrect = rcorrect + correct_b
  loss_e = rloss/Not
  acc_e = rcorrect/Not*100

  return loss_e, acc_e


# Build the network (alpha = 0.85, 14 classes), its optimizer and LR scheduler.
model = MobileNet_transform(0.85, 14).to(DEVICE)
# Alternative kept for reference: load a saved checkpoint instead of training.
# NOTE(review): save_model_path is not defined in the visible cells.
#load_model = MobileNet_transform(0.85, 14).to(DEVICE)
#load_model.load_state_dict(torch.load(save_model_path, map_location = DEVICE))
optimizer = optim.Adam(model.parameters(), lr = LR)
scheduler = ReduceLROnPlateau(optimizer, mode = 'min', factor=0.75, patience = 3)

loss_history, acc_history = Train(model, dataset_train_DL, dataset_val_DL, errorfunc, optimizer, scheduler)


def _draw_history(ax, curves, train_label, val_label, title, y_label):
  """Plot the train and validation curves of one metric against the epoch number."""
  epochs_axis = range(1, epoch+1)
  ax.plot(epochs_axis, curves[0], label=train_label)
  ax.plot(epochs_axis, curves[1], label=val_label)
  ax.set_title(title)
  ax.set_xlabel('epoch')
  ax.set_ylabel(y_label)
  ax.legend()
  ax.grid(True)


# Left panel: loss curves. Right panel: accuracy curves.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))

_draw_history(ax1, loss_history, 'train loss', 'validation loss', 'loss graph', 'loss')
_draw_history(ax2, acc_history, 'train accuracy', 'validation accuracy', 'accuracy graph', 'accuracy')

plt.show()




In [None]:
from tqdm import tqdm

#print(DEVICE)
def Test(model, test_DL):
  """Evaluate ``model`` on ``test_DL`` and print the accuracy.

  The model is put in eval mode and run without gradient tracking. The sample
  count is taken from ``test_DL.dataset`` (the loader that was passed in), so
  the reported accuracy is correct for any loader, not only the global test
  loader.

  Args:
    model: network called as ``model(x_batch)``; its output is arg-maxed over dim 1.
    test_DL: data loader yielding ``(x_batch, y_batch)``.
  """
  model.eval()
  total = len(test_DL.dataset)
  rcorrect = 0
  with torch.no_grad():
    for x_batch, y_batch in tqdm(test_DL, leave = False):
      x_batch = x_batch.to(DEVICE)
      y_batch = y_batch.to(DEVICE)
      y_hat = model(x_batch)
      pred = y_hat.argmax(dim=1)
      corrects_b = torch.sum(pred == y_batch).item()
      rcorrect += corrects_b
  # Report 0% for an empty dataset instead of dividing by zero.
  accuracy = rcorrect/total*100 if total else 0.0
  print(f"Test accuracy : {rcorrect}/{total} ({round(accuracy, 2)}%)")
#print(pred == y_batch)
#print(y_batch)


def Test_plot(model, test_DL):
    """Show up to six images from the first batch of ``test_DL`` with their predictions.

    Each panel is titled ``"<predicted class> (<true class>)"``, in green when
    the two match and red otherwise. Class names come from
    ``test_DL.dataset.classes``.

    Args:
        model: network called as ``model(x_batch)``; its output is arg-maxed over dim 1.
        test_DL: data loader yielding ``(x_batch, y_batch)``.
    """
    model.eval()
    with torch.no_grad():
        x_batch, y_batch = next(iter(test_DL))
        x_batch = x_batch.to(DEVICE)
        y_hat = model(x_batch)
        pred = y_hat.argmax(dim=1)

    # Bring everything back to the host and use plain ints as class-list indices.
    x_batch = x_batch.to("cpu")
    pred_ids = pred.to("cpu").tolist()
    true_ids = y_batch.tolist()
    classes = test_DL.dataset.classes

    plt.figure(figsize=(12,4))
    # The grid has six slots; a smaller batch fills only as many as it has.
    for idx in range(min(6, len(true_ids))):
        plt.subplot(2,3, idx+1, xticks=[], yticks=[])
        # (channel, row, col) -> (row, col, channel), the layout imshow expects
        plt.imshow(x_batch[idx].permute(1,2,0).squeeze(), cmap="gray")
        pred_class = classes[pred_ids[idx]]
        true_class = classes[true_ids[idx]]
        plt.title(f"{pred_class} ({true_class})", color="g" if pred_class==true_class else "r")

def count_params(model):
    """Return the total element count of ``model``'s parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total



In [None]:

# Print the overall accuracy of the trained model on the test loader.
Test(model, dataset_test_DL)
# Disabled alternative: report on a checkpoint-loaded model instead.
#print(count_params(load_model))
#Test(model, dataset_test_DL)

# Show a few test images titled with predicted and true class.
Test_plot(model, dataset_test_DL)
#Test_plot(model, dataset_test_DL)

# Number of trainable parameters in the model.
print("parameter : ", count_params(model))

In [None]:
import numpy as np
import seaborn as sns
from sklearn.metrics import confusion_matrix

# Collect the true labels and predictions for the whole test loader.
model.eval()
y_true = []
y_pred = []
with torch.no_grad():
  for x_batch, y_batch in tqdm(dataset_test_DL, leave = True):
    x_batch = x_batch.to(DEVICE)
    pred = model(x_batch).argmax(dim=1)

    # Labels are only stored, so they are not moved to DEVICE and back.
    y_true.extend(y_batch.tolist())
    y_pred.extend(pred.to('cpu').tolist())

# Rows are true classes, columns are predicted classes.
cm = confusion_matrix(y_true, y_pred)

print("\n")
# fmt='d' prints integer counts; the default '.2g' shows counts >= 100 as e.g. 1.2e+02.
sns.heatmap(cm, annot = True, fmt = 'd', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()
