In [None]:
!pip install einops

In [None]:
# Mount Google Drive (Colab only).
# NOTE(review): the mount point 'content/' is a relative path, so it resolves against the
# current working directory. The dataset paths defined below start with '/content/content/',
# i.e. they assume this cell is run with '/content' as the working directory — confirm.
from google.colab import drive
drive.mount('content/')

In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
%matplotlib inline

from torch import optim
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import os
import json
from torchvision import utils


from torch import nn
from torch import Tensor
from PIL import Image
from torchvision.transforms import Compose, Resize, ToTensor
from einops import rearrange, reduce, repeat
from einops.layers.torch import Rearrange, Reduce
from torchsummary import summary
import numpy as np
import pandas as pd
import time
import copy
import random
from tqdm.notebook import tqdm
import math

# Device configuration: use the GPU when CUDA is available, otherwise fall back to the CPU,
# and report which of the two was selected.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("GPU가 사용 가능합니다." if device.type == "cuda" else "GPU를 사용할 수 없습니다.")

In [None]:
# Root folders of the training and validation splits on the mounted drive.
trainpath = '/content/content/MyDrive/etc/aihub-meat-image/Training/'
valpath = '/content/content/MyDrive/etc/aihub-meat-image/Validation/'

# Image folder prefix per split; the grade is appended to this prefix when an image is loaded.
train_imagepath, val_imagepath = (
    os.path.join(split_root, '[image]cow_seg_') for split_root in (trainpath, valpath)
)


In [None]:
# Directories that contain the JSON label files, one folder per cow_seg_{1,2,3}.
train_labelpath = [os.path.join(trainpath, '[label]cow_seg_' + suffix) for suffix in ('1', '2', '3')]
val_labelpath = [os.path.join(valpath, '[label]cow_seg_' + suffix) for suffix in ('1', '2', '3')]


def _load_label_jsons(label_dirs):
  """Parse every ``*.json`` file located directly inside each directory of ``label_dirs``.

  Files are visited in sorted name order so the result does not depend on the
  arbitrary order returned by ``os.listdir``.

  Args:
    label_dirs: iterable of directory paths.

  Returns:
    List with one parsed JSON document per label file.
  """
  parsed = []
  for label_dir in label_dirs:
    for filename in sorted(os.listdir(label_dir)):
      if filename.endswith(".json"):
        # Explicit encoding so parsing does not depend on the platform's default locale.
        with open(os.path.join(label_dir, filename), encoding="utf-8") as f:
          parsed.append(json.load(f))
  return parsed


def _label_rows(json_records):
  """Extract ``[file_name, label, grade, gender]`` from each parsed label document.

  Only the first entry of ``label_info.shapes`` is read for label, grade and gender.
  """
  rows = []
  for record in json_records:
    info = record["label_info"]
    first_shape = info["shapes"][0]
    rows.append([
        info["image"]["file_name"],
        first_shape["label"],
        first_shape["grade"],
        first_shape["gender"],
    ])
  return rows


# Training split: parsed JSON documents, then one label row per document.
jsons = _load_label_jsons(train_labelpath)
train_labels = _label_rows(jsons)

# Validation split: same procedure (`jsons` is reused and ends up holding the validation documents).
jsons = _load_label_jsons(val_labelpath)
val_labels = _label_rows(jsons)

In [None]:
# Tabulate the extracted label rows, one DataFrame per split, and print both for inspection.
train_label_set, val_label_set = (
    pd.DataFrame(data=rows, columns=['file_name','label','grade','gender'])
    for rows in (train_labels, val_labels)
)
print(train_label_set, val_label_set, sep='\n')

def grade_encoding(x):
  """Map a raw grade value to its zero-based class index.

  Grades '1', '2' and '3' map to 0, 1 and 2. The value is normalized with
  ``str(x).strip()`` before the lookup, so integer grades (1, 2, 3) and
  whitespace-padded strings are encoded the same way as the plain string form
  instead of silently falling through to class 0.

  Args:
    x: raw grade value taken from the label table.

  Returns:
    int: class index in {0, 1, 2}; any unrecognized value yields 0.
  """
  # NOTE(review): unrecognized grades share class 0 with grade '1' — confirm this fallback is intended.
  return {'1': 0, '2': 1, '3': 2}.get(str(x).strip(), 0)

# Rows 0, 1 and 2 of the 3x3 identity matrix: row i is the one-hot vector of class i.
one_hot_labels = torch.eye(3)[[0,1,2]]
print(one_hot_labels)

# Add the integer class index derived from the 'grade' column to both label tables.
train_label_set['grade_encode'] = train_label_set['grade'].map(grade_encoding)
val_label_set['grade_encode'] = val_label_set['grade'].map(grade_encoding)

print(train_label_set, val_label_set, sep='\n')

In [None]:

class CustomImageDataset(Dataset):
  """Dataset yielding ``(image, label)`` pairs described by a label table.

  Args:
    labels: pandas DataFrame with one row per image. The file name is read from
      the first column, the folder suffix from the 'grade' column and the target
      from the 'grade_encode' column.
    img_dir: path prefix of the image folders; ``str(grade)`` is appended to it
      to obtain the folder that contains the image.
    transform: optional callable applied to the loaded PIL image.
    target_transform: optional callable applied to the label.
  """

  def __init__(self, labels, img_dir, transform=None, target_transform=None):
    self.img_dir = img_dir
    self.transform = transform
    self.target_transform = target_transform
    self.img_labels = labels

  def __len__(self):
    """Return the number of rows in the label table."""
    return len(self.img_labels)

  def __getitem__(self, idx):
    """Load the image of row ``idx`` and return it together with its encoded grade."""
    row = self.img_labels.iloc[idx]
    # Folder is the prefix with the grade appended; the file name is the first column.
    img_path = os.path.join(self.img_dir + str(row['grade']), row.iloc[0])
    # Always hand a 3-channel image to the transform: grayscale, palette or RGBA files
    # would otherwise produce tensors with a different channel count. The context
    # manager releases the file handle as soon as the pixels have been decoded.
    with Image.open(img_path) as img:
      image = img.convert('RGB')
    label = row['grade_encode']
    if self.transform:
      image = self.transform(image)
    if self.target_transform:
      label = self.target_transform(label)
    return image, label

# Input pipeline: resize every image to 224x224, then convert it to a tensor.
transformation = Compose([Resize([224,224]), ToTensor()])


In [None]:
# One dataset per split; both share the same image transform.
train_set = CustomImageDataset(labels=train_label_set, img_dir=train_imagepath, transform=transformation)
val_set = CustomImageDataset(labels=val_label_set, img_dir=val_imagepath, transform=transformation)

# Shuffled mini-batch loaders (16 images per training batch, 8 per validation batch).
train_dl = DataLoader(dataset=train_set, batch_size=16, shuffle=True, num_workers=4)
val_dl = DataLoader(dataset=val_set, batch_size=8, shuffle=True, num_workers=4)

In [None]:
# Start from the default pretrained ResNet-101 and replace its classifier head with a
# 3-way output (one logit per grade class).
weights = torchvision.models.ResNet101_Weights.DEFAULT
model = torchvision.models.resnet101(weights=weights)
# Read the feature width from the existing head instead of hard-coding it.
model.fc = nn.Linear(in_features=model.fc.in_features, out_features=3)
model.to(device)

# Preprocessing that matches the pretrained weights. The datasets were created earlier with
# the plain Resize/ToTensor pipeline, so merely rebinding `transformation` here would leave
# this preprocessing unused on a fresh run; attach it to both datasets explicitly. The data
# loaders reference these dataset objects, so they pick up the change when iterated.
transformation = weights.transforms()
train_set.transform = transformation
val_set.transform = transformation
print(transformation)

In [None]:
# Print a torchsummary overview of the model for a single 3x224x224 input.
summary(model,(3,224,224))

In [None]:
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Cross-entropy summed over the batch (not averaged); the epoch loop accumulates these
# sums and normalizes the total afterwards.
loss_func = nn.CrossEntropyLoss(reduction='sum')

# Adam over all model parameters.
opt = optim.Adam(model.parameters(), lr=0.01)

# Multiply the learning rate by 0.1 once the monitored value has stopped decreasing
# for more than 10 consecutive scheduler steps.
lr_scheduler = ReduceLROnPlateau(opt, mode='min', factor=0.1, patience=10)

In [None]:
def get_lr(opt):
    """Return the learning rate of the optimizer's first parameter group.

    Args:
        opt: object exposing an iterable ``param_groups`` of dict-like groups.

    Returns:
        The ``'lr'`` entry of the first group, or ``None`` when there are no groups.
    """
    first_group = next(iter(opt.param_groups), None)
    if first_group is None:
        return None
    return first_group['lr']

In [None]:
def metric_batch(output, target):
    """Count the correct predictions in one mini-batch.

    Args:
        output: tensor of per-class scores; the predicted class is the argmax over dim 1.
        target: tensor of class indices, reshaped to match the prediction tensor.

    Returns:
        int: number of samples whose predicted class equals the target.
    """
    predicted = torch.argmax(output, dim=1, keepdim=True)
    matches = torch.eq(predicted, target.view_as(predicted))
    return matches.sum().item()

def loss_batch(loss_func, output, target, opt=None):
    """Compute loss and metric for one mini-batch, optionally taking an optimizer step.

    Args:
        loss_func: callable returning a scalar loss tensor for ``(output, target)``.
        output: model output for the batch.
        target: ground-truth labels for the batch.
        opt: optimizer; when given, gradients are reset, the loss is back-propagated
            and one optimizer step is taken. When ``None`` nothing is updated.

    Returns:
        tuple: ``(loss value as a Python number, metric returned by metric_batch)``.
    """
    batch_loss = loss_func(output, target)
    batch_metric = metric_batch(output, target)

    # Evaluation: no optimizer, nothing to update.
    if opt is None:
        return batch_loss.item(), batch_metric

    # Training: one gradient step on this batch.
    opt.zero_grad()
    batch_loss.backward()
    opt.step()
    return batch_loss.item(), batch_metric

def loss_epoch(model, loss_func, dataset_dl, sanity_check=False, opt=None):
    """Run the model over a data loader once and return per-sample loss and metric.

    The accumulated loss and metric are divided by the number of samples that were
    actually processed. For a full pass over a loader that yields every sample this
    equals the dataset size; when ``sanity_check`` stops after the first batch (or the
    loader yields fewer samples than the dataset holds) the averages stay meaningful
    instead of being diluted by the full dataset length.

    Args:
        model: callable mapping an input batch to an output batch.
        loss_func: loss callable forwarded to ``loss_batch``.
        dataset_dl: iterable yielding ``(inputs, targets)`` batches.
        sanity_check: when ``True``, stop after the first batch.
        opt: optimizer forwarded to ``loss_batch``; ``None`` means evaluation only.

    Returns:
        tuple: ``(loss per sample, metric per sample)``.

    Raises:
        ValueError: if the loader yields no samples.
    """
    running_loss = 0.0
    running_metric = 0.0
    samples_seen = 0

    for xb, yb in dataset_dl:
        xb = xb.to(device)
        yb = yb.to(device)
        output = model(xb)
        loss_b, metric_b = loss_batch(loss_func, output, yb, opt)

        running_loss += loss_b
        if metric_b is not None:
            running_metric += metric_b
        samples_seen += len(yb)

        # A sanity check only needs a single batch.
        if sanity_check is True:
            break

    if samples_seen == 0:
        raise ValueError('loss_epoch received a data loader that yields no samples')

    loss = running_loss / samples_seen
    metric = running_metric / samples_seen
    return loss, metric

In [None]:
def train_val(model, params):
    """Train and validate ``model`` for a number of epochs, keeping the best weights.

    Each epoch runs one training pass and one validation pass (without gradients).
    Whenever the validation loss improves, the weights are copied in memory and saved
    to ``params['path2weights']``. After stepping the scheduler with the validation
    loss, a changed learning rate triggers a reload of the best weights so far.

    Args:
        model: the network to train.
        params: dict with the keys 'num_epochs', 'loss_func', 'optimizer', 'train_dl',
            'val_dl', 'sanity_check', 'lr_scheduler' and 'path2weights'.

    Returns:
        tuple: ``(model with the best weights loaded, loss_history, metric_history)``,
        where both histories are dicts with per-epoch lists under 'train' and 'val'.
    """
    num_epochs = params['num_epochs']
    criterion = params['loss_func']
    optimizer = params['optimizer']
    train_loader = params['train_dl']
    val_loader = params['val_dl']
    sanity_check = params['sanity_check']
    scheduler = params['lr_scheduler']
    weights_path = params['path2weights']

    loss_history = {phase: [] for phase in ('train', 'val')}
    metric_history = {phase: [] for phase in ('train', 'val')}

    best_loss = float('inf')
    best_model_wts = copy.deepcopy(model.state_dict())
    start_time = time.time()

    for epoch in range(num_epochs):
        lr_before = get_lr(optimizer)
        print('Epoch {}/{}, current lr= {}'.format(epoch, num_epochs-1, lr_before))

        # Training pass (updates the weights).
        model.train()
        train_loss, train_metric = loss_epoch(model, criterion, train_loader, sanity_check, optimizer)
        loss_history['train'].append(train_loss)
        metric_history['train'].append(train_metric)

        # Validation pass (no optimizer, no gradient tracking).
        model.eval()
        with torch.no_grad():
            val_loss, val_metric = loss_epoch(model, criterion, val_loader, sanity_check)
        loss_history['val'].append(val_loss)
        metric_history['val'].append(val_metric)

        # Keep a snapshot of the weights with the lowest validation loss seen so far.
        improved = val_loss < best_loss
        if improved:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            torch.save(model.state_dict(), weights_path)
            print('Copied best model weights!')

        # If the scheduler changed the learning rate, continue from the best snapshot.
        scheduler.step(val_loss)
        lr_after = get_lr(optimizer)
        if lr_before != lr_after:
            print('Loading best model weights!')
            model.load_state_dict(best_model_wts)

        elapsed_min = (time.time()-start_time)/60
        print('train loss: %.6f, val loss: %.6f, accuracy: %.2f, time: %.4f min' %(train_loss, val_loss, 100*val_metric, elapsed_min))
        print('-'*10)

    model.load_state_dict(best_model_wts)
    return model, loss_history, metric_history

In [None]:
# Training configuration consumed by train_val: epoch count, optimizer, loss function,
# data loaders, sanity-check switch, LR scheduler and the path for the best weights.
params_train = dict(
    num_epochs=20,
    optimizer=opt,
    loss_func=loss_func,
    train_dl=train_dl,
    val_dl=val_dl,
    sanity_check=False,
    lr_scheduler=lr_scheduler,
    path2weights='./models/weights.pt',
)

def createFolder(directory):
    """Create ``directory`` (including missing parents) if it does not exist yet.

    Creation is best-effort: an ``OSError`` is reported on stdout, together with the
    path and the underlying cause, instead of being raised.

    Args:
        directory: path of the folder to create.
    """
    try:
        # exist_ok avoids the race between checking for the folder and creating it.
        os.makedirs(directory, exist_ok=True)
    except OSError as err:
        print('Error: could not create directory {!r}: {}'.format(directory, err))
# Make sure the folder that params_train['path2weights'] points into exists before training.
createFolder('./models')

In [None]:
# Run training; train_val returns the model with the best weights loaded plus the
# per-epoch loss and metric histories for the 'train' and 'val' phases.
model, loss_hist, metric_hist = train_val(model, params_train)

Epoch 0/19, current lr= 0.01


In [None]:
num_epochs = params_train['num_epochs']


def _plot_history(title, history, ylabel):
    """Plot the per-epoch 'train' and 'val' curves of ``history`` against epochs 1..num_epochs."""
    plt.title(title)
    for phase in ('train', 'val'):
        plt.plot(range(1, num_epochs+1), history[phase], label=phase)
    plt.ylabel(ylabel)
    plt.xlabel('Training Epochs')
    plt.legend()
    plt.show()


# Train/validation loss per epoch.
_plot_history('Train-Val Loss', loss_hist, 'Loss')

# Train/validation accuracy per epoch.
_plot_history('Train-Val Accuracy', metric_hist, 'Accuracy')