In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
import torch.nn.init as init
import torchvision.transforms as transforms
from torchvision import models
from torchvision.models.feature_extraction import create_feature_extractor
from torch.utils.data import Dataset, DataLoader
import torchvision.datasets as datasets
from torchinfo import summary

import numpy as np
import matplotlib.pyplot as plt
import time
import os
from pathlib import Path
import glob
import cv2
from PIL import Image

In [None]:
# Load the data
folder_path = ""
# img_size = ( 256,256 )

# Sorted so the image order (and the labels later derived from `path`) is the
# same on every run; glob itself returns files in arbitrary order.
path = sorted( glob.glob( os.path.join( folder_path, "*.jpg" ) ) )
if not path:
    raise FileNotFoundError( "no .jpg files found in folder_path: {!r}".format( folder_path ) )

# cv2.imread returns None (instead of raising) when a file cannot be decoded.
bgr_images = [ cv2.imread( p ) for p in path ]
unreadable = [ p for p, img in zip( path, bgr_images ) if img is None ]
if unreadable:
    raise ValueError( "could not read image file(s): {}".format( unreadable ) )

# bgr_images = [ cv2.resize( img, img_size ) for img in bgr_images ]        # enable to resize the images

# OpenCV loads images as BGR uint8; convert each one to RGB and stack them into
# a single (N, H, W, 3) uint8 array. This skips the float32 -> uint8 -> PIL
# round trip, which only cost memory. np.stack fails loudly if the images do
# not all share the same shape.
data = np.stack([ cv2.cvtColor( img, cv2.COLOR_BGR2RGB ) for img in bgr_images ])
# Report the per-image shape (H, W, C) rather than the total element count.
print( "img num:{}, img size:{}".format( len(data), data.shape[1:] ) )

In [None]:
# Class index per image path: 0 when the path contains "Healthy",
# 1 when it contains "Damaged" (and not "Healthy"), 2 otherwise.
label = [
    next((idx for idx, keyword in enumerate(("Healthy", "Damaged")) if keyword in p), 2)
    for p in path
]

In [None]:
# Check for a GPU: use CUDA when it is available, otherwise fall back to the CPU

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

Dataloaderの作成

In [None]:
# Convert an HxWxC uint8 image to a CxHxW float tensor in [0, 1], then normalize
# with the ImageNet channel statistics. The pretrained torchvision ResNet weights
# were trained on inputs normalized this way, so fine-tuning should feed the same
# input distribution.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
class Mydatasets(torch.utils.data.Dataset):
    """Map-style dataset pairing each sample in ``data`` with its entry in ``label``.

    Args:
        label: sequence of integer class indices, one per sample.
        data: indexable collection of samples, same length as ``label``.
        transform: optional callable applied to a sample before it is returned.
            When ``None`` the sample is returned unchanged.

    Raises:
        ValueError: if ``label`` and ``data`` have different lengths.
    """

    def __init__(self, label, data, transform=None):
        # A length mismatch would otherwise surface later as an IndexError or
        # as silently ignored samples.
        if len(label) != len(data):
            raise ValueError(
                "label and data must have the same length (got {} and {})".format(
                    len(label), len(data)
                )
            )

        self.transform = transform
        self.labelset = label
        self.dataset = data

        self.datanum = len(label)

    def __len__(self):
        """Return the number of samples."""
        return self.datanum

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for ``idx``; the label is a ``torch.long`` tensor."""
        out_label = torch.tensor(self.labelset[idx], dtype=torch.long)
        out_data = self.dataset[idx]

        if self.transform is not None:
            out_data = self.transform(out_data)
        return out_data, out_label

In [None]:
from torch.utils.data import Dataset, DataLoader, random_split
batch_size = 100
dataset = Mydatasets(label, data, transform)

# 80/20 train/test split. The test size is taken as the remainder so the two
# lengths always sum to len(dataset); random_split raises ValueError otherwise
# (int(0.8*n) + int(0.2*n) + 1 only equals n for some dataset sizes).
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
# Shuffle only the training batches; the test loader keeps the subset order.
data_loader = DataLoader(train_dataset, batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size, shuffle=False)

In [None]:
# Load the pretrained ResNet18 and keep every child module except the last one
# (the final fully connected layer) as the feature-extracting backbone.
resnet18 = models.resnet18(pretrained=True)
backbone = nn.Sequential(*[layer for layer in resnet18.children()][:-1])

In [None]:
class Model(nn.Module):
    """Feature extractor followed by a single linear classification layer.

    Args:
        feature_extractor: module mapping an input batch to ``feature_dim``
            features per sample (any trailing singleton dimensions are
            flattened). Defaults to the notebook-level ``backbone``.
        num_classes: number of output logits. Defaults to 3.
        feature_dim: number of features produced per sample. Defaults to 512.
    """

    def __init__(self, feature_extractor=None, num_classes=3, feature_dim=512):
        super(Model, self).__init__()
        # Fall back to the module-level backbone so that Model() keeps working.
        self.backbone = backbone if feature_extractor is None else feature_extractor

        # Fully connected classification layer
        self.fc = nn.Linear(feature_dim, num_classes)

    def forward(self, x):
        """Return the class logits, shape (batch, num_classes), for input batch ``x``."""
        # Flatten everything after the batch dimension. Unlike reshape(-1, 512)
        # this never changes the batch size: a feature-size mismatch raises in
        # the linear layer instead of silently producing extra rows.
        features = torch.flatten(self.backbone(x), 1)

        # Fully connected classification layer
        return self.fc(features)

# Instantiate the model and move its parameters to the selected device
model = Model().to(device)


In [None]:
# Layer-by-layer summary of the model.
# Uses the torchinfo `summary` already imported at the top of the notebook, so
# the name is not shadowed by a same-named function from a second library
# (torchsummary) with a different signature.
# torchinfo expects input_size to include the batch dimension.
summary(model, input_size=(1, 3, 100, 100), device=device)

損失関数と最適化関数

In [None]:
# Cross-entropy loss on the class logits, optimized with Adam over all model parameters
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
# Training loop

num_epoch = 10

# Per-epoch history (plain Python floats, so they can be plotted directly)
train_loss_list = []
train_acc_list = []

test_loss_list = []
test_acc_list = []

for epoch in range(num_epoch):
    train_loss = 0.0
    train_acc = 0

    # Wall-clock time for the epoch (training + evaluation)
    start = time.time()

    model.train()
    for images, labels in data_loader:
        # Reset the gradients accumulated by the previous step
        optimizer.zero_grad()

        # Move the batch to the compute device
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)

        # The criterion returns the mean loss over the batch; weight it by the
        # batch size so that dividing by the dataset size below yields the true
        # per-sample mean (the last batch may be smaller than the others).
        loss = criterion( outputs, labels )
        train_loss += loss.item() * labels.size(0)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        # Predicted class = index of the largest logit
        predicted = outputs.argmax(dim=1)

        # Number of correct predictions; .item() keeps the counter a Python int
        # instead of a tensor living on the device
        train_acc += (predicted == labels).sum().item()

    # Mean loss and accuracy over the training set
    avg_train_loss = train_loss / len(data_loader.dataset)
    avg_train_acc = train_acc / len(data_loader.dataset)

    model.eval()
    test_loss = 0.0
    test_acc = 0

    # Evaluation only: no autograd graph is needed
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)

            loss = criterion(outputs, labels)
            test_loss += loss.item() * labels.size(0)

            predicted = outputs.argmax(dim=1)
            test_acc += (predicted == labels).sum().item()

    # Mean loss and accuracy over the test set
    avg_test_loss = test_loss / len(test_loader.dataset)
    avg_test_acc = test_acc / len(test_loader.dataset)

    # Report both splits with explicit labels
    elapsed = time.time() - start
    print (f'Epoch [{(epoch+1):2}/{num_epoch}] - {elapsed:0.2f}s - train_loss: {avg_train_loss:.5f} - train_acc: {avg_train_acc:.5f} - test_loss: {avg_test_loss:.5f} - test_acc: {avg_test_acc:.5f}')
    train_loss_list.append(avg_train_loss)
    train_acc_list.append(avg_train_acc)
    test_loss_list.append(avg_test_loss)
    test_acc_list.append(avg_test_acc)

In [None]:
# Collect the logits and true labels for the whole test set
pred_list = []
label_list = []
model.eval()
# Inference only: disable autograd so no graph is built for each batch
with torch.no_grad():
    for img, labels in test_loader:
        img = img.to(device)
        pred = model(img)

        label_list.extend(labels.tolist())

        pred_list.extend(pred.tolist())

# Predicted class per sample = index of the largest logit
pred = np.argmax(pred_list , axis=1 )
# Positions (in test_loader order) where the prediction differs from the label
diff_index = np.flatnonzero( np.asarray(label_list) != pred )


label:0 = 健康

label:1 = 損傷

label:2 = 死亡

In [None]:
# Show up to 30 misclassified test images with their true and predicted labels
print(f"{len(diff_index)}/{len(label_list)}")
plt.figure( figsize=(12,16) )

for i,diff in enumerate( diff_index[:30] ):
    plt.subplot( 6, 5, i+1 )
    plt.title(  "No.{}, True:{}, Pred:{}".format( diff, label_list[diff], pred[diff] ) )
    # `diff` is a position within the (unshuffled) test loader, i.e. within
    # test_dataset. Map it back to the row of the full `data` array through the
    # subset's indices; indexing `data` with `diff` directly shows an unrelated image.
    plt.imshow( data[ test_dataset.indices[diff] ] )
    plt.axis("off")

# Lay out the grid once, after all subplots exist
plt.tight_layout()
# NOTE(review): the file name says ResNet50 although the notebook loads ResNet18 - confirm the intended name
plt.savefig("ResNet50-5.png")
plt.show()

In [None]:
import requests

def line_notify(text):
    """Send ``text`` as a notification through an HTTP notify endpoint.

    The bearer token and the endpoint URL are read from the environment
    variables ``LINE_NOTIFY_TOKEN`` and ``LINE_NOTIFY_API`` so that no
    credential is stored in the notebook. When either is missing the
    notification is skipped with a printed message instead of raising.

    Args:
        text: message body; it is sent prefixed with a newline.

    Returns:
        None.
    """
    line_notify_token = os.environ.get('LINE_NOTIFY_TOKEN', '')
    line_notify_api = os.environ.get('LINE_NOTIFY_API', '')

    if not line_notify_token or not line_notify_api:
        print('line_notify: LINE_NOTIFY_TOKEN / LINE_NOTIFY_API is not set; notification skipped')
        return

    message = ('\n{0}'.format(text))

    payload = {'message': message}
    headers = {'Authorization': 'Bearer ' + line_notify_token}
    # Timeout so a stalled connection cannot hang the notebook indefinitely
    response = requests.post(line_notify_api, data=payload, headers=headers, timeout=10)
    # Surface rejected requests (e.g. an invalid token) instead of ignoring them
    if not response.ok:
        print('line_notify: request failed with HTTP status {}'.format(response.status_code))

line_notify('python')