In [None]:
import os 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.preprocessing import LabelEncoder
import torch
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

In [None]:
!mkdir train_set
!unzip -O cp949 /content/image.zip -d /content/train_set

In [None]:
# Read the CSV that describes the data set and report how many rows it contains.
dataframe = pd.read_csv('/content/data.csv')
print('Training set: {}'.format(len(dataframe)))

In [None]:
# Directory that holds the extracted image files.
train_dir = './train_set/new_image'


def _image_path(name):
    """Return '<train_dir>/<name>.jpg', leaving paths that were already expanded untouched."""
    prefix = train_dir + '/'
    if name.startswith(prefix):
        # Already expanded by an earlier run of this cell; expanding again
        # would yield '<train_dir>/<train_dir>/<name>.jpg.jpg'.
        return name
    return prefix + name + '.jpg'


# Turn the bare image names into file paths in one pass; safe to re-run.
dataframe.image = dataframe.image.apply(_image_path)

In [None]:
from sklearn.preprocessing import LabelEncoder
# Fit the encoder on the `state` column, then overwrite the column with the
# integer codes it assigns; `encoder` keeps the fitted mapping for later use.
encoder = LabelEncoder().fit(dataframe['state'])
dataframe['state'] = encoder.transform(dataframe['state'])

In [None]:
class setting(Dataset):
    """Dataset that pairs image files with their labels.

    Args:
        dataframe: DataFrame whose first column holds the image file paths.
        answer: DataFrame whose first column holds the label of the row at
            the same position in `dataframe`.
        transform: optional callable applied to the loaded RGB PIL image.
            When falsy, the PIL image is returned unchanged.
    """

    def __init__(self, dataframe, answer, transform=None):
        self.dataframe = dataframe
        self.transform = transform
        self.answer = answer

    def __getitem__(self, index):
        """Return the (image, label) pair stored at position `index`."""
        path = self.dataframe.iloc[index, 0]
        # Image.open is lazy; the context manager makes sure the underlying
        # file is closed once the pixel data has been copied by convert().
        with Image.open(path) as source:
            x = source.convert('RGB')
        if self.transform:
            x = self.transform(x)
        y = self.answer.iloc[index, 0]
        return x, y

    def __len__(self):
        """Number of samples, i.e. the number of rows in `dataframe`."""
        return self.dataframe.shape[0]

In [None]:
# Preprocessing applied to every image: resize, centre-crop to 224, convert to
# a tensor, then normalise each channel with the mean/std given below
# (presumably the usual ImageNet statistics for pretrained torchvision models).
transformer = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
def print_result(train_loss, val_loss, val_acc):
    """Print one line with the three epoch metrics, each rounded to four decimals."""
    template = 'train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}'
    print(template.format(train_loss, val_loss, val_acc))

In [None]:
def training(model, loss_function, optimizer, scheduler, num_epochs, patience_limit=8):
    """Train `model` with early stopping and return the best validation accuracy.

    Reads the module-level `train_loader`, `val_loader` and `device`, and
    reports each epoch through `print_result`. Whenever the mean validation
    loss improves, the model's state dict is written to './best_model.pth'.

    Args:
        model: module to train; it must already live on `device`.
        loss_function: callable `(predictions, targets) -> scalar loss tensor`.
        optimizer: optimizer that updates the trainable parameters of `model`.
        scheduler: object whose `step(metrics=...)` is called once per epoch
            with the mean validation loss.
        num_epochs: maximum number of epochs to run.
        patience_limit: training stops once more than this many consecutive
            epochs pass without a new best validation loss.

    Returns:
        Validation accuracy measured in the epoch with the lowest mean
        validation loss (0 if no epoch ever improved on the initial value).
    """
    best_valid_loss = float('inf')
    best_valid_acc = 0
    epochs_without_improvement = 0

    for epoch in range(num_epochs):
        print('-'*20)
        print('best_training epoch {}/{}'.format(epoch+1, num_epochs))
        print('-'*20)

        # Training
        train_losses = []
        model.train()
        for x, y in train_loader:
            optimizer.zero_grad()
            x, y = x.to(device), y.to(device)
            y_hat = model(x)
            loss = loss_function(y_hat, y)
            train_losses.append(loss.item())
            loss.backward()
            optimizer.step()

        # Validation
        val_losses = []
        correct = 0
        total = 0
        model.eval()
        with torch.no_grad():
            for x, y in val_loader:
                x, y = x.to(device), y.to(device)
                y_hat = model(x)
                loss = loss_function(y_hat, y)
                val_losses.append(loss.item())
                _, predicted = torch.max(y_hat, 1)
                total += len(y)
                correct += (predicted == y).sum().item()

        # Compute the epoch metrics once; empty loaders yield nan / 0.0
        # instead of a numpy warning or a ZeroDivisionError.
        train_loss = np.mean(train_losses) if train_losses else float('nan')
        val_loss = np.mean(val_losses) if val_losses else float('nan')
        val_acc = correct / total if total else 0.0

        # Save best_model
        if val_loss < best_valid_loss:
            torch.save(model.state_dict(), './best_model.pth')
            best_valid_loss = val_loss
            best_valid_acc = val_acc
            epochs_without_improvement = 0
        else:
            # Only epochs that fail to improve count towards early stopping.
            epochs_without_improvement += 1

        scheduler.step(metrics=val_loss)  # adjust learning rate (decay)

        # print the result of epoch
        print_result(train_loss, val_loss, val_acc)
        if epochs_without_improvement > patience_limit:
            print('Early stop.')
            return best_valid_acc

    print('Finish Training.')
    return best_valid_acc

In [None]:
# is_available must be *called*: the bare function object is always truthy,
# which would select CUDA even on machines without a GPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
training_samples = dataframe.shape[0]
test_size = 0.05
batch_size = 64
SEED = 42  # fixed seed so the shuffle, the split and the batch order repeat on every run

# Shuffle all rows, then split the image and label columns into stratified train/validation parts.
sample_dataframe = dataframe.sample(training_samples, random_state=SEED)
x_data = pd.DataFrame(sample_dataframe.image)
y_data = pd.DataFrame(sample_dataframe.state)
x_train, x_val, y_train, y_val = train_test_split(
    x_data, y_data, test_size=test_size, stratify=y_data, random_state=SEED)

train_set = setting(x_train, y_train, transform=transformer)
val_set = setting(x_val, y_val, transform=transformer)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True,
                          generator=torch.Generator().manual_seed(SEED))
# Validation metrics do not depend on sample order, so shuffling is unnecessary.
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

In [None]:
class net(torch.nn.Module):
    """Classifier that stacks two linear layers on top of `base_model`.

    The output of `base_model` (width `base_out_features`) goes through a
    ReLU, a 512-unit linear layer with ReLU, and a final linear layer that
    produces `num_classes` scores.
    """

    def __init__(self, base_model, base_out_features, num_classes):
        super().__init__()
        self.base_model = base_model
        self.linear1 = torch.nn.Linear(base_out_features, 512)
        self.output = torch.nn.Linear(512, num_classes)

    def forward(self, x):
        """Return the unnormalised class scores for the batch `x`."""
        features = F.relu(self.base_model(x))
        hidden = F.relu(self.linear1(features))
        return self.output(hidden)

# Load the pretrained ResNet50 and freeze all of its parameters so that only
# the layers added by `net` receive gradient updates.
res = torchvision.models.resnet50(pretrained=True)
for frozen_param in res.parameters():
    frozen_param.requires_grad = False

# Build the 3-class classifier on top of the frozen backbone and move it to the selected device.
model = net(
    base_model=res,
    base_out_features=res.fc.out_features,
    num_classes=3,
).to(device)

In [None]:
# Cross-entropy loss for the classifier output.
CE_loss = torch.nn.CrossEntropyLoss()

# Optimise only the parameters that still require gradients.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=0.0003)
# Plateau scheduler configured with factor 0.95 and patience 3.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer=optimizer, factor=0.95, patience=3, verbose=True)
EPOCHS = 30

In [None]:
# Run the training loop and show the validation accuracy recorded at the best validation loss.
val_acc_best = training(model, CE_loss, optimizer, scheduler, num_epochs=EPOCHS)
print(val_acc_best)