In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn, optim
import torchvision
from torchvision import datasets, transforms, models
from PIL import Image
from torch.utils.data.sampler import SubsetRandomSampler

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

## Data directory & split size settings

In [None]:
# Paths
data_dir = './data'             # root folder of the labelled images read by ImageFolder
model_dir = 'animal_model.pth'  # file the trained model is saved to and loaded from
test_dir = './test_data'        # images for the visual check; set to './data' to sample from the training folder instead

# Hyper-parameters
valid_size = 0.3                    # fraction of the images held out for validation
learning_rate = 0.0005
avgpool = 512                       # number of features of the hidden layer in the new fully connected head
dropout = 0.2                       # dropout probability (meant for the fully connected head)
optimizer_type = 'NAdam'            # one of: 'Adam', 'AdaGrad', 'NAdam'
epochs = 15                         # number of training epochs
print_every = 5                     # evaluate and report every this many training steps
criterion = nn.CrossEntropyLoss()   # cross-entropy loss

# RandomResizedCrop(224) always emits 224x224 images, so the Resize(224) that
# used to follow it never changed anything and has been removed.
# NOTE(review): no ImageNet mean/std normalisation is applied although a
# pre-trained backbone is used - confirm whether that is intentional.
t_transforms = transforms.Compose([transforms.RandomResizedCrop(224),
                                   transforms.ToTensor()])

In [None]:
def load_split_train_test(data_dir, transform, valid_size, batch_size=16):
    """Create randomly split training and validation loaders over one image folder.

    Args:
        data_dir: Root folder handed to ``datasets.ImageFolder``.
        transform: Transform applied to every image of both splits.
        valid_size: Fraction (0..1) of the images assigned to the validation split.
        batch_size: Batch size of both loaders (defaults to the previous fixed value, 16).

    Returns:
        ``(train_loader, test_loader)``; the two loaders sample from disjoint
        index sets of the same folder.

    Raises:
        ValueError: If ``valid_size`` is outside the range [0, 1].
    """
    if not 0 <= valid_size <= 1:
        raise ValueError(f'valid_size must be between 0 and 1, got {valid_size!r}')

    # Honour the `transform` argument. The previous version silently used the
    # module-level `t_transforms` here, so the argument had no effect.
    train_data = datasets.ImageFolder(data_dir, transform=transform)
    test_data = datasets.ImageFolder(data_dir, transform=transform)

    # Shuffle all sample indices once, then cut the list at the split point.
    num_train = len(train_data)
    indices = list(range(num_train))
    np.random.shuffle(indices)

    split = int(np.floor(num_train * valid_size))
    train_idx, test_idx = indices[split:], indices[:split]

    # Each sampler only ever yields indices of its own subset.
    train_sampler = SubsetRandomSampler(train_idx)
    test_sampler = SubsetRandomSampler(test_idx)

    train_loader = torch.utils.data.DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
    test_loader = torch.utils.data.DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)

    return train_loader, test_loader

In [None]:
# Build the training and validation loaders from the configured folder and split ratio.
train_loader, test_loader = load_split_train_test(data_dir, t_transforms, valid_size)

# Class names as reported by the dataset; their count is the size of the network's output layer.
classes = train_loader.dataset.classes
output_len = len(classes)

# Print the classes seen by the training loader and by the test loader to check them.
print(classes)
print(test_loader.dataset.classes)

In [None]:
# Pick the compute device - the GPU when CUDA is available, the CPU otherwise - and show it.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

## resnet setting

In [None]:
# Start from a pre-trained ResNet-50.
model = models.resnet50(pretrained=True)

# Transfer learning: freeze every pre-trained weight so that only the new head is trained.
for param in model.parameters():
    param.requires_grad = False

# Replace the final fully connected layer with a small classifier head:
#   Linear(backbone features -> avgpool) -> ReLU -> Dropout(dropout)
#   -> Linear(avgpool -> output_len) -> LogSoftmax over the class dimension (dim=1).
# The input width is read from the layer being replaced instead of hard-coding 2048.
# The dropout probability is the `dropout` hyper-parameter; the previous code
# passed `valid_size` (the validation split fraction) by mistake.
# nn.CrossEntropyLoss applies log-softmax itself; feeding it log-probabilities
# gives the same loss, and the LogSoftmax layer is kept so the model keeps
# emitting log-probabilities.
model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features, avgpool),
    nn.ReLU(),
    nn.Dropout(dropout),
    nn.Linear(avgpool, output_len),
    nn.LogSoftmax(dim=1),
)

# Optimise only the parameters of the new head (everything else is frozen).
if optimizer_type == 'AdaGrad':
    optimizer = optim.Adagrad(model.fc.parameters(), lr=learning_rate)
elif optimizer_type == 'NAdam':
    optimizer = optim.NAdam(model.fc.parameters(), lr=learning_rate)
else:
    # 'Adam' - and, as before, any unrecognised name - uses Adam.
    optimizer = optim.Adam(model.fc.parameters(), lr=learning_rate)

# Move the network to the compute device.
model.to(device)

## epoch, sequence setting

In [None]:
# Show which optimizer was selected, together with its settings.
print(optimizer)

In [None]:
# Bookkeeping for the training loop.
steps = 0                            # number of training batches processed so far
running_loss = 0                     # training loss accumulated since the last report
train_losses, test_losses = [], []   # loss history, one entry per evaluation

In [None]:
# Train the classifier head for `epochs` full passes over the training split.
# Every `print_every` optimisation steps the model is evaluated on the held-out
# split and the losses are recorded for plotting.
#
# Changes compared with the previous version of this cell:
#   * the `break` after the first evaluation is gone - it ended every epoch
#     after only `print_every` batches, so most training images were never used;
#   * the recorded training loss is averaged over the `print_every` batches that
#     were accumulated (it used to be divided by len(train_loader), which did
#     not match the printed value);
#   * the validation loop uses its own variable names instead of overwriting
#     the current training batch.
model.train()  # make sure dropout is active, e.g. after an earlier model.eval()
for epoch in range(1, epochs + 1):
    for inputs, labels in train_loader:
        steps += 1
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        logps = model(inputs)
        loss = criterion(logps, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        if steps % print_every != 0:
            continue

        # Periodic evaluation on the held-out split.
        test_loss = 0
        accuracy = 0
        model.eval()
        with torch.no_grad():
            for val_inputs, val_labels in test_loader:
                val_inputs, val_labels = val_inputs.to(device), val_labels.to(device)
                val_logps = model(val_inputs)
                test_loss += criterion(val_logps, val_labels).item()
                # The most probable class is the arg-max of the log-probabilities.
                top_class = val_logps.argmax(dim=1)
                accuracy += (top_class == val_labels).float().mean().item()
        model.train()

        # Guard against an empty validation loader (valid_size == 0).
        n_val_batches = max(len(test_loader), 1)
        train_losses.append(running_loss/print_every)
        test_losses.append(test_loss/n_val_batches)
        print("Epoch {}/{}: ".format(epoch, epochs), "Train loss: {:.3f}..".format(running_loss/print_every),
              "Test loss: {:.3f}..".format(test_loss/n_val_batches), "Test accuracy: {:.3f}".format(accuracy/n_val_batches))
        running_loss = 0

## plot chart

In [None]:
%matplotlib inline
%config InlineBackend.figure_format='retina'

plt.plot(train_losses, label='training loss')
plt.plot(test_losses, label='validation loss')
plt.legend(frameon = False)

In [None]:
# Save the whole model object (not just a state_dict); the later cells reload it with torch.load(model_dir).
torch.save(model, model_dir)

In [None]:
def predict_image(device, model, trans, image):
    """Classify a single image and print the winning class with its confidence.

    Args:
        device: ``torch.device`` the input is moved to; the model is expected
            to live on the same device.
        model: Network called on a batch of one image; its output is passed
            through a softmax over dim 1.
        trans: Callable turning ``image`` into a tensor (a batch dimension is
            added here).
        image: The image to classify, in whatever form ``trans`` accepts.

    Returns:
        ``(index, score)``: position of the highest-scoring class and its
        softmax probability in percent.

    Side effects:
        Prints ``<class name>: <score>%`` using the module-level ``classes`` list.
    """
    image_tensor = trans(image).float()
    # Out-of-place unsqueeze: do not mutate the tensor returned by `trans`.
    batch = image_tensor.unsqueeze(0).to(device)

    # Inference only - no autograd graph is needed.
    with torch.no_grad():
        output = model(batch)

    # .cpu() is a no-op for CPU tensors, so it is safe for every device. The old
    # check `device == torch.device('cuda')` was False for indexed devices such
    # as 'cuda:0' and then called .numpy() on a CUDA tensor.
    scores = torch.nn.functional.softmax(output, dim=1).cpu().numpy()

    index = scores.argmax()
    score = scores[0][index]*100.0
    print(f'{classes[index]}: {score:.2f}%')

    return index, score

In [None]:
def get_random_images(dir, trans, num):
    """Draw one random batch of at most ``num`` transformed images from an image folder.

    Args:
        dir: Root folder handed to ``datasets.ImageFolder``.
        trans: Transform applied to every loaded image.
        num: Number of images to draw.

    Returns:
        ``(images, labels)`` - the first batch produced by the loader.
    """
    folder = datasets.ImageFolder(dir, trans)

    # Shuffle every position in the folder and keep the first `num` of them.
    positions = list(range(len(folder)))
    np.random.shuffle(positions)
    chosen = positions[:num]

    # batch_size == num, so the first batch already holds every chosen sample.
    batch_loader = torch.utils.data.DataLoader(
        folder,
        sampler=SubsetRandomSampler(chosen),
        batch_size=num,
    )
    images, labels = next(iter(batch_loader))

    return images, labels

In [None]:
# Reload the saved model for inference.
device = torch.device('cuda'if torch.cuda.is_available() else 'cpu')
# map_location keeps loading working when the file was written on a GPU
# machine and is read on a CPU-only one, and puts the model on `device`.
# NOTE(review): the file holds a whole pickled model; recent PyTorch releases
# may additionally need weights_only=False here - confirm for the installed version.
model = torch.load(model_dir, map_location=device)

model.eval()

to_pil = transforms.ToPILImage()
# The sampled images have already gone through t_transforms (random crop included).
# Converting each one straight back to a tensor classifies exactly the picture
# that is displayed; passing t_transforms again would predict on a second,
# different random crop.
to_tensor = transforms.ToTensor()
images, labels = get_random_images(test_dir, t_transforms, 5)
fig = plt.figure(figsize=(20,20))
classes = train_loader.dataset.classes

for ii in range(len(images)):
    image = to_pil(images[ii])
    index, probability = predict_image(device, model, to_tensor, image)
    sub = fig.add_subplot(1, len(images), ii+1)
    # The title shows the predicted class and whether it matches the true label.
    res = labels[ii].item() == index
    sub.set_title(classes[index]+':'+str(res))
    # Draw on this subplot explicitly instead of relying on the current axes.
    sub.axis('off')
    sub.imshow(image)
plt.show()

# gradio test

In [None]:
def gradio_predict(image):
    """Classify an image supplied by the Gradio interface.

    Args:
        image: The submitted image (a PIL image with the interface defined in
            this notebook), or ``None`` when nothing was submitted.

    Returns:
        A string ``'<class name>: <probability>%'``, or a short message when no
        image was provided.
    """
    import os  # local import: only needed for the cache check below

    if image is None:
        return 'No image provided.'

    device = torch.device('cuda'if torch.cuda.is_available() else 'cpu')

    # Loading the model from disk on every request is slow. Keep it on the
    # function object and reload only when the file or the device changes.
    cache_key = (os.path.getmtime(model_dir), str(device))
    cached = getattr(gradio_predict, '_model_cache', None)
    if cached is None or cached[0] != cache_key:
        # map_location lets a model saved on a GPU machine load on a CPU-only one.
        model = torch.load(model_dir, map_location=device)
        model.eval()
        gradio_predict._model_cache = (cache_key, model)
    else:
        model = cached[1]

    # The network expects 3-channel input; convert e.g. RGBA or greyscale images.
    if getattr(image, 'mode', 'RGB') != 'RGB':
        image = image.convert('RGB')

    index, probability = predict_image(device, model, t_transforms, image)

    return f'{classes[index]}: {probability:.2f}%'

In [None]:
import gradio as gr

# Web demo: the uploaded picture reaches gradio_predict as a PIL image and the
# returned text is shown as the result.
iface = gr.Interface(fn=gradio_predict, inputs=gr.Image(type="pil"), outputs="text")

# Start the demo; share=True additionally requests a shareable link.
iface.launch(share=True)