# Tanpopo1 表面付着物 EfficientNet CrossValidation

In [None]:
# Mount Google Drive in the Colab runtime
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive
import os
# Make the Tanpopo project folder the working directory
os.chdir('/content/drive/MyDrive/Tanpopo')

Mounted at /content/drive
/content/drive/MyDrive


In [None]:
from __future__ import print_function, division

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import torch.utils.data as data

import matplotlib.pyplot as plt
import glob
import time
import copy
from PIL import Image

plt.ion()

In [None]:
# Image size is 704x480 (the original note also mentions 88x60).
# img_size is the value handed to ImageTransform as its crop size.
img_size = 224

# Number of classes; the training cell later reassigns it from the class folders it finds.
class_num = 2

# Normalisation: per-channel mean and standard deviation (three channels)
mean = (0.5,) * 3
std = (0.5,) * 3

# Use the first CUDA device when one is available, otherwise the CPU
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Mini-batch size: picked as a string through the Colab form annotation, then converted to int
batch_size =  "16" #@param[8, 16, 32, 64, 128, 256]
batch_size = int(batch_size)

# Number of epochs: picked as a string through the Colab form annotation, then converted to int
epochs = "25" #@param[5, 8, 10, 15, 20, 22, 25, 27, 29, 30, 31, 32, 33, 35, 45, 60, 120]
epochs = int(epochs)

### 関数、クラスの定義

In [None]:
import random
from sklearn.model_selection import train_test_split
def make_filepath_list(folderpath, phase='train'):
    """
    Return the image file paths and the class names found under ``folderpath``.

    Every visible sub-directory of ``folderpath`` is treated as one class, and
    every file in it whose name ends in ``bmp`` is collected.

    Args:
        folderpath: Directory that contains one sub-directory per class.
            May be absolute or relative.
        phase: Unused; kept so existing callers that pass it keep working.

    Returns:
        A tuple ``(files_list, class_names)``.
        ``files_list`` holds the paths (with '/' separators) ordered by class
        and then by file name; ``class_names`` holds the sorted class
        directory names.
    """
    files_list = []
    class_names = []

    for top_dir in sorted(os.listdir(folderpath)):
        class_dir = os.path.join(folderpath, top_dir)

        # Hidden entries ('.DS_Store', '.ipynb_checkpoints', ...) and stray
        # files are not classes.
        if top_dir.startswith('.') or not os.path.isdir(class_dir):
            continue

        class_names.append(top_dir)

        # glob already returns paths that start with class_dir, so they must
        # not be joined onto folderpath a second time (that doubled the prefix
        # for relative folderpath values). Sorting makes the order, and
        # therefore any seeded split of this list, reproducible.
        pattern = os.path.join(glob.escape(class_dir), '*bmp')
        files_list += [path.replace('\\', '/') for path in sorted(glob.glob(pattern))]

    return files_list, class_names

In [None]:
class ImageTransform(object):
    """
    Image preprocessing, with one torchvision pipeline per phase.

    ``data_transform`` maps 'train', 'valid' and 'test' to a Compose object.
    The train pipeline adds random augmentation; 'valid' and 'test' use the
    same deterministic steps.

    Args:
        resize: Size passed to CenterCrop (images are first resized with Resize(256)).
        mean: Per-channel mean passed to Normalize.
        std: Per-channel standard deviation passed to Normalize.
    """
    def __init__(self, resize, mean, std):
        def _deterministic_pipeline():
            # Resize -> centre crop -> tensor -> normalise, no random steps
            return transforms.Compose([
                transforms.Resize(256),
                transforms.CenterCrop(resize),
                transforms.ToTensor(),
                transforms.Normalize(mean, std),
            ])

        # Data augmentation and preprocessing used for training
        train_steps = [
            transforms.Resize(256),             # resize
            transforms.CenterCrop(resize),      # crop the centre
            transforms.RandomRotation(45),      # random rotation
            # NOTE(review): called with default arguments; with torchvision's
            # defaults this appears to apply no jitter at all - confirm.
            transforms.ColorJitter(),
            transforms.RandomHorizontalFlip(),  # random left-right flip
            transforms.RandomVerticalFlip(),    # random up-down flip
            transforms.ToTensor(),
            # TODO (from the original notes): consider replacing with / adding ZCA whitening
            transforms.Normalize(mean, std),
        ]

        self.data_transform = dict(
            train=transforms.Compose(train_steps),
            valid=_deterministic_pipeline(),
            test=_deterministic_pipeline(),
        )

    def __call__(self, img, phase='train'):
        """Apply the pipeline registered for ``phase`` to ``img``."""
        pipeline = self.data_transform[phase]
        return pipeline(img)

In [None]:
from torchvision.io import read_image

class SurfaceObjectDataset(data.Dataset):
    """
    Dataset of surface-deposit images; subclasses PyTorch's Dataset.

    Each item is ``(image, label)`` where ``label`` is the position, inside
    ``classes``, of the name of the directory that directly contains the
    image file.

    Args:
        file_list: List of image file paths.
        classes: List of class names; a label is an index into this list.
        transform: Optional callable invoked as ``transform(img, phase)``.
            When None the RGB PIL image is returned unchanged.
        phase: Phase name forwarded to ``transform``.
    """
    def __init__(self, file_list, classes, transform=None, phase='train'):
        self.file_list = file_list
        self.transform = transform
        self.classes = classes
        self.phase = phase

        # Not used by this class; kept so existing attribute access keeps working.
        self.img = None
        self.label = None

    def __len__(self):
        """
        Return the number of images.
        """
        return len(self.file_list)

    @staticmethod
    def _label_from_path(img_path):
        """Return the name of the directory that directly contains ``img_path``."""
        # Derive the class from the parent folder instead of a fixed path
        # depth, so the dataset does not depend on where the data lives and
        # long class names are not truncated.
        normalised = img_path.replace('\\', '/')
        return os.path.basename(os.path.dirname(normalised))

    def __getitem__(self, index):
        """
        Return the preprocessed image and its numeric label for ``index``.

        Raises:
            ValueError: If the image's parent directory name is not in ``classes``.
        """
        img_path = self.file_list[index]

        # Convert inside the context manager so the pixel data is loaded and
        # the file handle is released; forcing RGB keeps the channel count in
        # line with the three-value mean/std used for normalisation.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert('RGB')

        # Class name -> numeric label
        label = self.classes.index(self._label_from_path(img_path))

        # Apply the preprocessing when one was supplied
        if self.transform is not None:
            img = self.transform(img, self.phase)

        return img, label

In [None]:
import time
import copy

def train_model(model, criterion, optimizer, scheduler, num_epochs=25,
                dataloaders=None, device=None):
    """
    Train ``model`` and return it loaded with the best validation weights.

    Every epoch runs a 'train' phase followed by a 'valid' phase. The weights
    are snapshotted whenever a validation epoch has an accuracy at least as
    high as the best recorded one AND a loss strictly lower than the best
    recorded one.

    Args:
        model: Network to train; updated in place.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: LR scheduler stepped once per epoch, after the train phase.
        num_epochs: Number of epochs to run.
        dataloaders: Mapping with 'train' and 'valid' DataLoaders. When None,
            the notebook-level ``dataloaders_dict`` is used.
        device: Device the batches are moved to. When None, the device of the
            model's first parameter is used (CPU if it has no parameters).

    Returns:
        ``(model, best_acc)`` where ``best_acc`` is a 0-dim double tensor with
        the accuracy of the recorded best validation epoch.
    """
    if dataloaders is None:
        # Backward-compatible default: the dict built by the training cell
        dataloaders = dataloaders_dict
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    # Kept as a tensor so the return type is the same whether or not any
    # epoch was recorded as best (callers call ``.to('cpu')`` on it).
    best_acc = torch.zeros((), dtype=torch.double, device=device)
    best_loss = float('inf')

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))

        # Each epoch has a training phase and a validation phase
        for phase in ['train', 'valid']:
            is_training = phase == 'train'
            if is_training:
                model.train()  # training mode
            else:
                model.eval()   # evaluation mode

            loader = dataloaders[phase]
            n_samples = len(loader.dataset)

            running_loss = 0.0
            running_corrects = torch.zeros((), dtype=torch.long, device=device)

            for inputs, labels in loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Reset the parameter gradients
                optimizer.zero_grad()

                # Forward pass; gradients are tracked only while training
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward pass and optimisation only while training
                    if is_training:
                        loss.backward()
                        optimizer.step()

                # loss is a batch mean, so weight it by the batch size
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels)

            if is_training:
                scheduler.step()

            epoch_loss = running_loss / n_samples
            epoch_acc = running_corrects.double() / n_samples

            print(' {} Loss: {:.4f} Acc: {:.4f} '.format(
                phase, epoch_loss, epoch_acc), end='\t')

            # Snapshot the weights of the best validation epoch
            if phase == 'valid' and epoch_acc >= best_acc and epoch_loss < best_loss:
                best_acc = epoch_acc
                best_loss = epoch_loss
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))
    print('Best val Loss: {:4f}'.format(best_loss))

    # Load the weights of the best epoch
    model.load_state_dict(best_model_wts)
    return model, best_acc

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np

# Accuracy (one-vs-rest)
def class_accuracy(label, conf_mat):
    """
    Return the one-vs-rest accuracy of class ``label``.

    Computed as (diagonal entry + entries outside the class's row and column)
    divided by the sum of all entries of ``conf_mat``.
    """
    total = np.sum(conf_mat)
    on_diagonal = conf_mat[label][label]
    column_total = np.sum(conf_mat[:, label])
    row_total = np.sum(conf_mat[label])
    # Entries that lie in neither the row nor the column of `label`
    outside = total - (column_total + row_total - on_diagonal)
    return (on_diagonal + outside) / total
    
# Precision
def class_precision(label, conf_mat):
    """
    Return the precision of class ``label``.

    ``conf_mat`` follows the sklearn ``confusion_matrix`` layout produced in
    ``test_model``: rows are true labels, columns are predicted labels.
    Precision is therefore the diagonal entry divided by the COLUMN sum
    (everything predicted as ``label``).
    """
    return conf_mat[label][label] / np.sum(conf_mat[:, label])

# Recall
def class_recall(label, conf_mat):
    """
    Return the recall of class ``label``.

    With rows as true labels, recall is the diagonal entry divided by the ROW
    sum (everything that truly is ``label``).
    """
    return conf_mat[label][label] / np.sum(conf_mat[label])

# Return the test results
def test_model(model, test_dataloaders):
    """
    Evaluate ``model`` on ``test_dataloaders`` and compute per-class metrics.

    Uses the notebook-level ``device`` and ``class_num``.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_dataloaders: Iterable yielding ``(images, labels)`` batches.

    Returns:
        ``(conf_mat, Accuracy, Precision, Recall)``: the
        ``class_num`` x ``class_num`` confusion matrix and three arrays with
        one percentage per class.

    Raises:
        ValueError: If ``test_dataloaders`` yields no batches.
    """
    # Make sure dropout / batch-norm layers run in inference mode
    model.eval()

    label_batches = []
    prediction_batches = []

    with torch.no_grad():
        for images, labels in test_dataloaders:
            images = images.to(device)

            outputs = model(images)
            _, predicted = torch.max(outputs, 1)

            label_batches.append(labels.cpu())
            prediction_batches.append(predicted.cpu())

    if not label_batches:
        raise ValueError('test_dataloaders yielded no batches')

    # Concatenate once instead of growing a tensor batch by batch
    labels_sum = torch.cat(label_batches, dim=0).numpy()
    predicted_sum = torch.cat(prediction_batches, dim=0).numpy()

    # Confusion matrix. Passing the label list fixes its shape to
    # class_num x class_num even when a class never occurs, so the per-class
    # indexing below cannot go out of range.
    conf_mat = confusion_matrix(labels_sum, predicted_sum, labels=list(range(class_num)))

    Accuracy = np.array([class_accuracy(i, conf_mat) * 100 for i in range(class_num)])
    Precision = np.array([class_precision(i, conf_mat) * 100 for i in range(class_num)])
    Recall = np.array([class_recall(i, conf_mat) * 100 for i in range(class_num)])

    return conf_mat, Accuracy, Precision, Recall

### モデルの作成

In [None]:
# Load the model: EfficientNet-B0 initialised with the pretrained weights
model_efficientnet = models.efficientnet_b0(pretrained=True)

# Replace the last layer of the EfficientNet classifier with a new
# linear layer (1280 input features -> 5 output features)
model_efficientnet.classifier[1] = nn.Linear(1280, 5)

Downloading: "https://download.pytorch.org/models/efficientnet_b0_rwightman-3dd342df.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b0_rwightman-3dd342df.pth


  0%|          | 0.00/20.5M [00:00<?, ?B/s]

In [None]:
# Transfer learning: first switch gradient computation off for every parameter
for frozen_param in model_efficientnet.parameters():
    frozen_param.requires_grad_(False)

# Then re-enable it for the classifier only and collect those parameters in
# params_to_update, the list handed to the optimizer
params_to_update = []
for head_param in model_efficientnet.classifier.parameters():
    head_param.requires_grad_(True)
    params_to_update.append(head_param)

optimizer = optim.AdamW(params_to_update, lr=0.01, weight_decay=0.5)

criterion = nn.CrossEntropyLoss()

# StepLR with step_size=7 and gamma=0.5: the learning rate is multiplied by
# 0.5 every 7 scheduler steps
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.5)

### 学習

In [None]:
from torch.utils.data.dataset import Subset
from sklearn.model_selection import KFold
from  torch.utils.data import Dataset

# Collect the training/validation file paths and the class names
train_file_list, class_names = make_filepath_list('/content/drive/MyDrive/Tanpopo/TrainingData11', 'train')
#print('train_file_list: ', train_file_list)
#print('class_names: ', class_names)
class_num = len(class_names) # 5

# Two views over the same files: the 'train' view applies the random
# augmentation, the 'valid' view applies only the deterministic preprocessing.
# Validation folds are taken from the 'valid' view so that validation scores
# are not computed on randomly augmented images.
dataset = SurfaceObjectDataset(
        file_list = train_file_list, classes = class_names,
        transform = ImageTransform(img_size, mean, std),
        phase = 'train')
valid_source_dataset = SurfaceObjectDataset(
        file_list = train_file_list, classes = class_names,
        transform = ImageTransform(img_size, mean, std),
        phase = 'valid')

# Test data
test_file_list, class_names_test = make_filepath_list('/content/drive/MyDrive/Tanpopo/TestData11', 'test')
#print('test_file_list : ', test_file_list)
#print('class_names_test : ', class_names_test)

# Test labels must use the SAME name -> index mapping as training, otherwise
# the confusion matrix compares unrelated indices.
unknown_test_classes = sorted(set(class_names_test) - set(class_names))
if unknown_test_classes:
    raise ValueError('Test classes missing from the training data: {}'.format(unknown_test_classes))

# Build the test Dataset
test_dataset = SurfaceObjectDataset(
    file_list = test_file_list, classes = class_names,
    transform = ImageTransform(img_size, mean, std),
    phase = 'test')
# Build the test DataLoader
test_dataloader = data.DataLoader(
    test_dataset, batch_size = batch_size // 2, shuffle=False)
dataloaders_dict = {'test': test_dataloader}


# K-fold cross-validation (5 folds)
kf = KFold(n_splits=5, shuffle=True, random_state=1)

scores = []
test_accuracy = []
test_precision = []
test_recall = []
conf_mat = []

epochs = 25  # NOTE: overrides the value chosen in the form cell
PATH = '/content/drive/MyDrive/Tanpopo/model_efficientnet_weights.pth'
# Start from the previously saved weights; map_location lets a checkpoint
# saved on GPU load on a CPU-only runtime as well.
model_efficientnet.load_state_dict(torch.load(PATH, map_location=device))
model_efficientnet = model_efficientnet.to(device)

# Snapshot the starting state so that every fold begins from the same model,
# optimizer and learning-rate schedule. Without this, each fold would continue
# training the model of the previous fold, which has already seen the current
# fold's validation images.
# NOTE: run this cell on a fresh optimizer/scheduler (Restart & Run All);
# otherwise the snapshot captures already-used optimizer state.
initial_model_state = copy.deepcopy(model_efficientnet.state_dict())
initial_optimizer_state = copy.deepcopy(optimizer.state_dict())
initial_scheduler_state = copy.deepcopy(scheduler.state_dict())

for _fold, (train_index, valid_index) in enumerate(kf.split(train_file_list)):
    # Reset model, optimizer and scheduler to the common starting point
    model_efficientnet.load_state_dict(initial_model_state)
    optimizer.load_state_dict(copy.deepcopy(initial_optimizer_state))
    scheduler.load_state_dict(copy.deepcopy(initial_scheduler_state))

    # Build the Datasets of this fold
    train_dataset = Subset(dataset, train_index)
    valid_dataset = Subset(valid_source_dataset, valid_index)
    # Build the DataLoaders of this fold
    train_dataloader = data.DataLoader(
        train_dataset, batch_size = batch_size, shuffle=True)
    valid_dataloader = data.DataLoader(
        valid_dataset, batch_size = batch_size // 2, shuffle=False)

    dataloaders_dict['train'] = train_dataloader
    dataloaders_dict['valid'] = valid_dataloader

    print('-'*5, end='')
    print(_fold+1, end='')
    print('-'*5)
    model_efficientnet, score = train_model(model_efficientnet, criterion, optimizer, scheduler, num_epochs=epochs)
    scores.append(score.to('cpu'))

    conf, test_acc, test_prec, test_rec = test_model(model_efficientnet, dataloaders_dict['test'])
    test_accuracy.append(test_acc)
    test_precision.append(test_prec)
    test_recall.append(test_rec)
    conf_mat.append(conf)

-----1-----
Epoch 0/24
 train Loss: 1.1456 Acc: 0.6071 	 valid Loss: 1.1155 Acc: 0.5918 	
Epoch 1/24
 train Loss: 1.0208 Acc: 0.5918 	 valid Loss: 0.8325 Acc: 0.6531 	
Epoch 2/24
 train Loss: 1.0437 Acc: 0.6531 	 valid Loss: 1.3490 Acc: 0.5918 	
Epoch 3/24
 train Loss: 0.8985 Acc: 0.6735 	 valid Loss: 1.2599 Acc: 0.5306 	
Epoch 4/24
 train Loss: 0.9183 Acc: 0.6582 	 valid Loss: 0.9171 Acc: 0.7143 	
Epoch 5/24
 train Loss: 0.9301 Acc: 0.6327 	 valid Loss: 1.1216 Acc: 0.5102 	
Epoch 6/24
 train Loss: 1.0478 Acc: 0.6531 	 valid Loss: 1.0855 Acc: 0.6122 	
Epoch 7/24
 train Loss: 1.0141 Acc: 0.6327 	 valid Loss: 1.0564 Acc: 0.6122 	
Epoch 8/24
 train Loss: 1.0175 Acc: 0.6582 	 valid Loss: 1.0215 Acc: 0.6122 	
Epoch 9/24
 train Loss: 0.9389 Acc: 0.6582 	 valid Loss: 0.9201 Acc: 0.6122 	
Epoch 10/24
 train Loss: 0.7456 Acc: 0.7194 	 valid Loss: 0.9157 Acc: 0.6735 	
Epoch 11/24
 train Loss: 0.8234 Acc: 0.6786 	 valid Loss: 0.8449 Acc: 0.6735 	
Epoch 12/24
 train Loss: 0.8118 Acc: 0.7347 	 vali

In [None]:
def _print_metric_row(title, per_fold_values, n_classes):
    """
    Print ``title`` followed by one tab-separated 'mean±std' entry per class.

    ``per_fold_values`` holds one sequence per fold, indexed by class; the
    mean and standard deviation are taken across folds.
    """
    print(title, end='\t')
    for class_index in range(n_classes):
        values = [fold_values[class_index] for fold_values in per_fold_values]
        print(f'{np.mean(values):.1f}±{np.std(values):.2f}', end='\t')


# Validation
print('EfficientNet Result')
print(f'Validation Accuracy: 平均 {np.mean(scores)*100:.1f} 標準偏差 {np.std(scores)*100:.4f}')

# Test
print('Test results:')
print(class_names)

_print_metric_row('Accuracy:', test_accuracy, class_num)
_print_metric_row('\nPrecision:', test_precision, class_num)
_print_metric_row('\nRecall:', test_recall, class_num)

# Confusion matrix: for each row index, that row from every fold
print('\nConf Matrix:')
for i in range(class_num):
    print([r[i] for r in conf_mat])

EfficientNet Result
Validation Accuracy: 平均 88.2 標準偏差 3.9573
Test results:
['1Sputter', '2Fiber', '3Block', '4Bar', '5AGFragment']
Accuracy:	94.1±1.11	90.9±1.29	93.7±0.97	95.0±1.11	92.0±0.97	
Precision:	85.7±4.26	86.7±1.90	88.6±2.33	82.9±3.81	70.5±1.90	
Recall:	85.0±3.13	73.0±3.82	81.6±2.86	91.6±2.58	87.2±4.09	
Conf Matrix:
[array([18,  1,  2,  0,  0]), array([17,  3,  1,  0,  0]), array([17,  3,  1,  0,  0]), array([19,  1,  1,  0,  0]), array([19,  1,  1,  0,  0])]
[array([ 0, 19,  0,  1,  1]), array([ 1, 18,  0,  0,  2]), array([ 0, 18,  1,  1,  1]), array([ 0, 18,  1,  1,  1]), array([ 0, 18,  1,  0,  2])]
[array([ 1,  1, 18,  1,  0]), array([ 0,  1, 19,  1,  0]), array([ 0,  1, 19,  1,  0]), array([ 0,  1, 19,  1,  0]), array([ 1,  1, 18,  1,  0])]
[array([ 0,  2,  0, 17,  2]), array([ 1,  0,  1, 19,  0]), array([ 1,  2,  1, 17,  0]), array([ 1,  1,  1, 17,  1]), array([ 1,  1,  1, 17,  1])]
[array([ 1,  3,  2,  0, 15]), array([ 2,  3,  1,  0, 15]), array([ 2,  3,  1,  0, 15]), ar

In [None]:
# NOTE(review): this path and the commented-out save below name VGG16, while
# the rest of this notebook trains `model_efficientnet` - looks like a
# leftover from another notebook; confirm before enabling the save.
PATH = '/content/drive/MyDrive/Tanpopo/model_vgg16_weights.pth'
#torch.save(model_vgg16.state_dict(), PATH) # save the weights