In [35]:
import os
import glob
from PIL import Image
import torchvision.transforms as transforms
import torch

#### モデルの定義

In [43]:
import torch.nn as nn
import torch.nn.functional as F

class LeNet(nn.Module):
    def __init__(self, out=2):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, 1, padding=1)
        self.fc1 = nn.Linear(4*4*64, 500)
        self.dropout1 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(500, out)

    def forward(self, x):
        x = F.relu(self.conv1(x)) #  32*32*3 -> 32*32*16
        x = F.max_pool2d(x, 2, 2) #  32*32*16 -> 16*16*16
        x = F.relu(self.conv2(x)) #  16*16*16 -> 16*16*32
        x = F.max_pool2d(x, 2, 2) # 16*16*32 -> 8*8*32
        x = F.relu(self.conv3(x)) #  8*8*32 -> 8*8*64
        x = F.max_pool2d(x, 2, 2) #  8*8*64 -> 4*4*64
        x = x.view(-1, 4*4*64)
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = self.fc2(x)
        return x

#### 学習とテスト

In [141]:
from collections import defaultdict
import json
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from typing import List, Tuple, TypeVar

class TrainLogging:

    def __init__(self):
        self.log = []

    def stack(self, **kwargs):
        self.log.append(kwargs)

    def save(self, path: str):
        with open(path, "w") as f:
            json.dump(self.log, f, indent=4)


def process(trainloader, testloader, model, epochs: int, lr: float, lr_scheduling=None, log_savepath=None):

    log_dict = defaultdict(list)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    if lr_scheduling is not None:
        scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_scheduling)

    def train(trainloader) -> Tuple[float, float]:
        sum_loss, sum_correct, sum_dataN = 0.0, 0, 0
        for (inputs, labels) in trainloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            sum_loss += loss.item()
            _, predicted = outputs.max(1)
            sum_dataN += labels.size(0)
            sum_correct += (predicted == labels).sum().item()
            loss.backward()
            optimizer.step()
        train_loss = sum_loss*trainloader.batch_size/len(trainloader.dataset)
        train_acc = float(sum_correct/sum_dataN)
        return train_loss, train_acc

    def test(testloader) -> Tuple[float, float]:
        sum_loss, sum_correct, sum_dataN = 0.0, 0, 0
        for (inputs, labels) in testloader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            sum_loss += loss.item()
            _, predicted = outputs.max(1)
            sum_dataN += labels.size(0)
            sum_correct += (predicted == labels).sum().item()
        test_loss = sum_loss*testloader.batch_size/len(testloader.dataset)
        test_acc = float(sum_correct/sum_dataN)
        return test_loss, test_acc

    print("\n{0:<13}{1:<13}{2:<13}{3:<13}{4:<13}{5:<6}".format("epoch","train/loss","train/acc","test/loss","test/acc","lr"))
    logging = TrainLogging()
    for epoch in range(1, epochs + 1):
        train_loss, train_acc = train(trainloader)
        test_loss, test_acc = test(testloader)
        lr = optimizer.param_groups[-1]["lr"]
        print("{0:<13}{1:<13.5f}{2:<13.5f}{3:<13.5f}{4:<13.5f}{5:<6.6f}".format(epoch, train_loss, train_acc, test_loss, test_acc, lr))
        logging.stack(epoch=epoch, train_loss=train_loss, train_acc=train_acc, test_loss=test_loss, test_acc=test_acc, lr=lr)
        if lr_scheduling is not None: scheduler.step()
    if log_savepath is not None:
        logging.save(log_savepath)

    return model

#### 画像データの読み込みと学習データの生成

In [142]:
def load_dataset(dataset_dir, imsize=(32,32), img_name=False):
    dataset = []
    num_label = 2
    loader = transforms.Compose([transforms.Resize(imsize), transforms.ToTensor()])
    for label in range(num_label):
        path = os.path.join(dataset_dir, str(label), "*.jpg")
        files = glob.glob(path)
        for file in files:
            img = Image.open(file)
            img = loader(img)
            if img_name:
                img_name = file.split("/")[-1]
                dataset.append((img, label, img_name))
            else:
                dataset.append((img, label))
    return dataset

#### ex. 学習

In [143]:
train = load_dataset(dataset_dir="../demo/data/train_data")
test = load_dataset(dataset_dir="../demo/data/test_data")

In [144]:
batch_size = 8
trainloader = torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True, num_workers=2)
testloader = torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True, num_workers=2)

In [145]:
model = LeNet(2)
epochs = 10
lr = 0.01
model = process(trainloader, testloader, model, epochs, lr, log_savepath="./assets/log.json")


epoch        train/loss   train/acc    test/loss    test/acc     lr    
1            0.69449      0.48000      0.77340      0.52000      0.010000
2            0.68945      0.56000      0.77054      0.52000      0.010000
3            0.69363      0.52500      0.78207      0.46000      0.010000
4            0.67496      0.62000      0.76714      0.50000      0.010000
5            0.66429      0.60000      0.78069      0.50000      0.010000
6            0.68582      0.57500      0.78147      0.56000      0.010000
7            0.69212      0.54500      0.83639      0.48000      0.010000
8            0.65874      0.60500      0.79007      0.44000      0.010000
9            0.65422      0.58000      0.78153      0.48000      0.010000
10           0.62399      0.66000      0.89487      0.42000      0.010000


#### ex. 推論

In [146]:
batch_size = 8
test = load_dataset(dataset_dir="../demo/data/test_data", img_name=True)
testloader = torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True, num_workers=2)
unpredictable = []
for (inputs, labels, img_name) in testloader:
    outputs = model(inputs)
    _, predicted = outputs.max(1)
    unpredictable += list(np.array(list(img_name))[predicted != labels])

In [147]:
print(unpredictable)

['0_13.jpg', '1_7.jpg', '0_7.jpg', '1_5.jpg', '1_2.jpg', '1_1.jpg', '1_4.jpg', '1_23.jpg', '0_11.jpg', '1_0.jpg', '0_3.jpg', '1_9.jpg', '1_14.jpg', '1_22.jpg', '1_15.jpg', '0_20.jpg', '1_16.jpg', '1_19.jpg', '1_13.jpg', '0_5.jpg', '1_11.jpg', '0_17.jpg', '1_21.jpg', '0_23.jpg', '1_17.jpg', '1_12.jpg', '0_14.jpg']


#### ログの読み込み

In [55]:
def load_train_log(path: str):

    def load_losses(path: str) -> Tuple[List[float], List[float]]:
        with open(path) as f:
            logs = json.load(f)
        train_losses, test_losses = [], []
        for log in logs:
            train_losses.append(log["train_loss"])
            test_losses.append(log["test_loss"])
        return train_losses, test_losses

    def load_accs(path: str) -> Tuple[List[float], List[float]]:
        with open(path) as f:
            logs = json.load(f)
        train_accs, test_accs = [], []
        for log in logs:
            train_accs.append(log["train_acc"])
            test_accs.append(log["test_acc"])
        return train_accs, test_accs

    train_losses, test_losses = load_losses(path)
    train_accs, test_accs = load_accs(path)

    return train_losses, test_losses, train_accs, test_accs

#### 学習結果のグラフ可視化

In [56]:
import matplotlib.pyplot as plt

# lossのplot
def plot_loss(train_losses: List[float], test_losses: List[float], savepath: str):
    max_axisX = len(train_losses)
    plt.figure(figsize=(7,5))
    plt.plot(range(max_axisX), train_losses)
    plt.plot(range(max_axisX), test_losses, c='#ed7700')
    plt.ylim(bottom=0)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['train/loss', 'test/loss'])
    plt.grid()
    plt.savefig(savepath)
    plt.clf()

# accuracyのplot
def plot_acc(train_accs: List[float], test_accs: List[float], savepath: str):
    max_axisX = len(train_accs)
    plt.figure(figsize=(7,5))
    plt.plot(range(max_axisX), train_accs)
    plt.plot(range(max_axisX), test_accs, c='#ed7700')
    plt.ylim(bottom=0)
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.legend(['train/acc', 'test/acc'])
    plt.grid()
    plt.savefig(savepath)
    plt.clf()

#### ex. 可視化

In [57]:
train_losses, test_losses, train_accs, test_accs = load_train_log(path="./assets/log.json")
plot_loss(train_losses, test_losses, savepath="./assets/loss.png")
plot_acc(train_accs, test_accs, savepath="./assets/accuracy.png")

<matplotlib.figure.Figure at 0x7fe62eebdac8>

<matplotlib.figure.Figure at 0x7fe62eddff28>