編集部注：2023年5月29日最終更新．コードの一部がお手元の書籍と異なる可能性がございます．正誤・更新情報は弊社ウェブサイトの[本書詳細ページ](https://www.yodosha.co.jp/jikkenigaku/book/9784758122634/index.html)をご参照ください．

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')
# IPython magics: change into the working folder for this section and list its contents.
%cd /content/drive/MyDrive/sec6/
%ls -a 

In [None]:
# Install pytorch-gradcam (presumably the provider of the `gradcam` module imported in the next cell).
!pip install pytorch-gradcam

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

import os
import scipy
import random
from tqdm import tqdm 
import glob

from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_curve

import torch
from torch import nn
from torch.utils.data import DataLoader
import torch.optim as optim

import torchvision
from torchvision import models
from torchvision import datasets
from torchvision import transforms as transforms

from gradcam.utils import visualize_cam
from gradcam import GradCAM, GradCAMpp


In [None]:
path = "/content/drive/MyDrive/sec6" # Base folder path
im_fd = "/cell_images/" # Image folder, given relative to the base path above

In [None]:
# Preview the first PNG found in the first class folder ('Uninfected').
folder = ['Uninfected','Parasitized']
# Indexing with [0] raises IndexError when the folder contains no PNG files.
file0 = glob.glob(path + im_fd + folder[0] +"/*.png")[0]
print(file0)
image = Image.open(file0) # Load the image file
plt.imshow(image) # Display it

In [None]:
np.array(image) # Show the image data as an array

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu" # Use the GPU when one is available, otherwise the CPU.
print(f"Using {device} device")

In [None]:
image_size = 128; # Image size; used as both width and height when resizing

In [None]:
SEED = 42


def seed_fix(seed):
    """Seed the random number generators used in this notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and CUDA), exports
    ``PYTHONHASHSEED``, and asks cuDNN / PyTorch for deterministic algorithms.

    Args:
        seed: Integer seed applied to every generator.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
    # This must be a call: assigning `= True` would replace the function with
    # a bool and leave deterministic mode switched off. warn_only=True makes
    # operations without a deterministic implementation warn instead of raise,
    # so the rest of the notebook keeps running.
    torch.use_deterministic_algorithms(True, warn_only=True)

    # Re-seed NumPy and `random` from PyTorch's initial seed, reduced to 32 bits.
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)


seed_fix(SEED)
# Dedicated, explicitly seeded generator object.
generator = torch.Generator()
generator.manual_seed(SEED)

In [None]:
# Load up to `usenumber` randomly chosen images per class into X (pixel arrays)
# and y (class index = position of the folder name in `folder`).
folder = ['Uninfected','Parasitized']
X = []
y = []
usenumber = 1000  # maximum number of images sampled from each class folder
for i, folname in enumerate(folder):
    # glob returns files in arbitrary filesystem order; sort them so the seeded
    # random sample below picks the same files on every machine.
    files1 = sorted(glob.glob(path + im_fd + folname + "/*.png"))
    f_number = len(files1)
    # Never request more files than exist (avoids a negative split length).
    n_use = min(usenumber, f_number)
    # Keep a random subset of n_use files; files2 holds the unused remainder.
    files1, files2 = torch.utils.data.random_split(files1, [n_use, f_number - n_use])
    for file in tqdm(files1):
        outcome = i
        with Image.open(file) as src:  # close the file handle promptly
            image = src.convert("RGB")  # force RGB mode
        image = image.resize((image_size, image_size))  # resize to a square
        data = np.asarray(image)  # image -> array
        X.append(data)  # collect all images in one list
        y.append(outcome)  # collect the matching class labels

In [None]:
# Convert to arrays: reorder the axes into a 4-D layout (image count, RGB channels, image height, image width) and divide by 255
X = np.array(X).astype(np.float32).transpose(0,3,1,2)/255
y = np.array(y)

In [None]:
# Total number of loaded images (first axis of X).
Nall = X.shape[0]

In [None]:
tensor_X = torch.tensor(X, dtype=torch.float32) # Convert the inputs to a float32 tensor
tensor_y = torch.tensor(y, dtype=torch.int64) # Convert the labels to an int64 tensor

In [None]:
dataset = torch.utils.data.TensorDataset(tensor_X,tensor_y)# Bundle the inputs and the targets into one dataset
n_train = int(Nall * 0.7) # Number of training samples
n_val = int(Nall * 0.2)   # Number of validation samples
n_test = Nall - n_train - n_val # Number of test samples (whatever remains)
train_x, val_x, test_x = torch.utils.data.random_split(dataset, [n_train, n_val,n_test]) # Randomly split the dataset
print("train =",n_train,",validation =",n_val,",Test =",n_test)

In [None]:
batch_size = 64 # Batch size
train_dataloader = DataLoader(train_x, batch_size=batch_size, shuffle=True)  # Wrap each split in a DataLoader; only the training split is shuffled
val_dataloader   = DataLoader(val_x,   batch_size=batch_size, shuffle=False)
test_dataloader  = DataLoader(test_x,  batch_size=1, shuffle=False)

In [None]:
class CNN(nn.Module):
    """Four-stage convolutional classifier.

    Every stage is Conv2d (3x3 kernel, 'same' padding) -> ReLU -> MaxPool2d(2, 2);
    the stages output 16, 16, 32 and 64 channels. The features then pass through
    dropout (p=0.2), are flattened, and a single linear layer produces
    ``output_size`` scores.

    Args:
        input_shape: (channels, height, width) of one input image.
        output_size: Number of output scores.
    """

    def __init__(self, input_shape=(3,128,128),output_size=2):
        super().__init__()

        def stage(n_in, n_out):
            # One convolution stage: convolution, activation, 2x2 max pooling.
            return nn.Sequential(
                nn.Conv2d(in_channels=n_in, out_channels=n_out, kernel_size=3, padding='same'),
                nn.ReLU(),
                nn.MaxPool2d(2, 2),
            )

        self.conv1 = stage(input_shape[0], 16)
        self.conv2 = stage(16, 16)
        self.conv3 = stage(16, 32)
        self.conv4 = stage(32, 64)
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(0.2)
        # Size of the flattened feature vector, measured with a dummy input.
        self.CNN_outshape = self._get_conv_output(input_shape)
        self.linear = nn.Linear(self.CNN_outshape, output_size)

    def _get_conv_output(self, shape):
        """Return the flattened feature count for a single input of ``shape``."""
        probe = torch.empty(1, *shape)
        features = self._forward_features(probe)
        return features.flatten(1).size(1)

    def _forward_features(self, x):
        """Apply the four convolution stages, dropout and flattening to ``x``."""
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = stage(x)
        return self.flatten(self.dropout(x))

    def forward(self, x):
        """Return the output scores for a batch of images ``x``."""
        features = self._forward_features(x)
        return self.linear(features.flatten(1))
model = CNN().to(device) # Build the network with its default arguments and move it to the selected device
print(model)

In [None]:
# Augmentation pipeline: random resized crop to image_size, random horizontal
# and vertical flips (each with probability 0.4), and a random rotation of up
# to 7.5 degrees in either direction.
transform = transforms.Compose([
    transforms.RandomResizedCrop((image_size,image_size)),
    transforms.RandomHorizontalFlip(p=0.4),
    transforms.RandomVerticalFlip(p=0.4),
    transforms.RandomRotation(degrees=[-7.5, 7.5])
    ]
) # Data augmentation

In [None]:
# Optimization setup: cross-entropy loss, SGD with momentum and weight decay
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay= 0.005)

def train(train_loader): #Training
    """Run one optimisation pass over ``train_loader``.

    Works on the module-level ``model``, ``criterion``, ``optimizer`` and
    ``device``.

    Args:
        train_loader: DataLoader yielding (images, labels) batches.

    Returns:
        Tuple of (loss averaged over batches, fraction of samples classified
        correctly).
    """
    model.train()
    n_samples = len(train_loader.dataset)
    loss_sum = 0
    n_correct = 0
    for batch_x, batch_y in train_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        optimizer.zero_grad()
        # Data augmentation (currently disabled):
        #batch_x = transform(batch_x)
        logits = model(batch_x)
        batch_loss = criterion(logits, batch_y)
        batch_loss.backward()
        optimizer.step()
        loss_sum += batch_loss.item()
        # Index of the largest score per sample is the predicted class.
        n_correct += logits.max(1)[1].eq(batch_y).sum().item()
    return loss_sum / len(train_loader), n_correct / n_samples

def valid(test_loader): #Validation
    """Evaluate the model on ``test_loader`` without updating its weights.

    Works on the module-level ``model``, ``criterion`` and ``device``.

    Args:
        test_loader: DataLoader yielding (images, labels) batches.

    Returns:
        Tuple of (loss averaged over batches, fraction of samples classified
        correctly).
    """
    model.eval()
    n_samples = len(test_loader.dataset)
    loss_sum = 0
    n_correct = 0
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            logits = model(batch_x)
            loss_sum += criterion(logits, batch_y).item()
            # Index of the largest score per sample is the predicted class.
            n_correct += logits.max(1)[1].eq(batch_y).sum().item()
    return loss_sum / len(test_loader), n_correct / n_samples

# Empty lists that collect the per-epoch training / validation metrics
acc_list = []
loss_list = []
val_loss_list = []
val_acc_list = []

In [None]:
nepoch = 300  # number of training epochs

# Fitting: each epoch trains on the training loader, evaluates on the
# validation loader, prints the metrics and appends them to the history lists.
for epoch in range(nepoch):
    loss, acc = train(train_dataloader)
    val_loss, val_acc = valid(val_dataloader)
    print('epoch %d, loss: %.4f acc: %.4f val_loss: %.4f val_acc: %.4f' % (epoch, loss,acc, val_loss, val_acc))
    loss_list.append(loss)
    acc_list.append(acc)
    val_loss_list.append(val_loss)
    val_acc_list.append(val_acc)

In [None]:
# Save the model (the whole model object is written to 'model.pt')
torch.save(model, 'model.pt')

In [None]:
# Load the saved model.
# The file holds a fully pickled model object, so weights_only=False is needed
# (PyTorch 2.6+ defaults to weights_only=True and would refuse to load it).
# map_location puts the model on the currently selected device, so a model
# saved on a GPU also loads on a CPU-only runtime.
# NOTE: unpickling can execute arbitrary code; only load files you created yourself.
model = torch.load('model.pt', map_location=device, weights_only=False)

In [None]:
print('正解率：',val_acc_list[-1]*100, '%')

# Plot the learning curves.
# The x-axis is taken from the recorded history instead of nepoch, so the plot
# still works when training was interrupted or the training cell ran more than once.
epochs = range(len(loss_list))
plt.plot(epochs, loss_list, 'r-', label='train_loss')
plt.plot(epochs, val_loss_list, 'b-', label='val_loss')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('loss')

plt.figure()
plt.plot(epochs, acc_list, 'b-', label='acc')
plt.plot(epochs, val_acc_list, 'g-', label='val_acc')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('acc')

モデル評価：テストデータへの当てはめ

In [None]:
#For Test
def test(test_loader):
    """Run the model over ``test_loader`` and collect its predictions.

    Works on the module-level ``model`` and ``device``.

    Args:
        test_loader: DataLoader yielding (images, labels) batches.

    Returns:
        Tuple ``(prob, pred, true)`` of 1-D float64 NumPy arrays with one entry
        per sample: the softmax probability of class 1, the predicted class
        index, and the true class index. All three are empty when the loader
        yields no batches.
    """
    probs, preds, trues = [], [], []
    model.eval()
    with torch.no_grad():
        for images, labels in test_loader:
            outputs = model(images.to(device))
            # The model emits one score per class, so the probability of
            # class 1 is the softmax over both scores; a sigmoid of the class-1
            # score alone would ignore the class-0 score.
            class_prob = torch.softmax(outputs, dim=1)[:, 1]
            predicted = outputs.max(1)[1]
            probs.append(class_prob.cpu().numpy())
            preds.append(predicted.cpu().numpy())
            trues.append(labels.reshape(-1).cpu().numpy())

    def _join(chunks):
        # The leading empty float64 array keeps the result float64 and makes
        # an empty loader return an empty array.
        return np.concatenate([np.empty(0), *chunks])

    return _join(probs), _join(preds), _join(trues)

# Evaluate on the test loader: class-1 probabilities, predicted labels, true labels.
prob, pred, true,  = test(test_dataloader)


In [None]:
# Show the first 20 test images with their predicted label, true label and predicted probability
plt.figure(figsize = (25, 25))
for i in range(20):
    plt.subplot(5, 4, i + 1)
    plt.axis("off")
    caption = "pred:"+str(pred[i].astype(np.uint8))+' - '+"true:"+str(true[i].astype(np.uint8))+'\n'+'Prob.(Y=1) = %.4f' % (prob[i])
    # Misclassified images get a red title; correct ones keep the default colour.
    title_style = {} if pred[i] == true[i] else {"color": "red"}
    plt.title(caption, **title_style)

    # Channels-first tensor -> height x width x channel uint8 image.
    tmp = test_x[i][0].to('cpu').detach().numpy().copy().transpose(1, 2, 0)
    img_pil = Image.fromarray((tmp*255).astype(np.uint8))
    plt.imshow(img_pil)




In [None]:
# Summarise correct and incorrect predictions in a 2x2 matrix
cmat = confusion_matrix(true, pred)
print(cmat)

# Compute accuracy, sensitivity, specificity and the predictive values
tn, fp, fn, tp = cmat.ravel()
n_total = tp + tn + fp + fn

acc = round((tp + tn) / n_total, 4)
sen = round(tp / (tp + fn), 4)  # sensitivity
spe = round(tn / (tn + fp), 4)  # specificity
ppv = round(tp / (tp + fp), 4)  # positive predictive value
npv = round(tn / (tn + fn), 4)  # negative predictive value
print("acc=",acc,"sen=",sen," ,spe=",spe," ,ppv=",ppv," ,npv=",npv)
  


In [None]:
# ROC curve

fpr, tpr, thresholds = roc_curve(true, prob)
plt.plot(fpr, tpr, marker='o')
plt.xlabel('1-Specificity')
plt.ylabel('Sensitivity')
plt.grid()
#plt.savefig(path+'/roc_curve.png')

# Youden index (sensitivity + specificity - 1); take the first threshold that maximises it.
Youden_index = tpr - fpr
index = np.flatnonzero(Youden_index == max(Youden_index))[0]

cutoff, sensitivity, specificity = thresholds[index], tpr[index], 1 - fpr[index]
print("Cutoff-value:",round(cutoff,4),"Sensitivity:",round(sensitivity,4),"Specificity: ",round(specificity,4))




In [None]:
#!pip install grad-cam -q
#!conda install grad-cam

Grad-CAM

In [None]:
# Grad-CAM

In [None]:
# Load the saved model.
# The file holds a fully pickled model object, so weights_only=False is needed
# (PyTorch 2.6+ defaults to weights_only=True and would refuse to load it).
# map_location puts the model on the currently selected device, so a model
# saved on a GPU also loads on a CPU-only runtime.
# NOTE: unpickling can execute arbitrary code; only load files you created yourself.
model = torch.load('model.pt', map_location=device, weights_only=False)
model.eval()

In [None]:
# Print the layer structure of the loaded model.
print(model)

In [None]:
# Layer inspected by Grad-CAM: element 1 of the last convolution stage (the ReLU that follows the final Conv2d).
target_layer = model.conv4[1]

In [None]:
# Build the Grad-CAM object from the model and the chosen target layer.
gradcam = GradCAM(model, target_layer)

In [None]:
image_number = 1  # index of the test image to explain
# Add the batch dimension directly on the tensor instead of converting
# Tensor -> NumPy -> Tensor; clone() keeps the dataset's own tensor untouched.
torch_img = test_x[image_number][0].unsqueeze(0).clone().to(device)
mask, _ = gradcam(torch_img)
heatmap, result = visualize_cam(mask, torch_img)
image0 = torch.squeeze(torch_img,dim=0)
# Original image and the visualize_cam result, side by side.
images = [image0.cpu(), result]
grid_image = torchvision.utils.make_grid(images, nrow=2)
transforms.ToPILImage()(grid_image)

Finetuning

In [None]:
# Get the VGG16 model with ImageNet weights. `weights=` replaces the
# deprecated `pretrained=True` flag and selects the same IMAGENET1K_V1 weights.
model = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)
model.classifier[6] = nn.Linear(in_features=4096,out_features=2)   # Keep the default input size of 4096; change the output size from the default 1000 to 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # Use the GPU when one is available
model = model.to(device)
print(model)

In [None]:
# Parameters trained during transfer learning: only the new output layer
update_param_names = ['classifier.6.weight', 'classifier.6.bias']
params_to_update = []
for name, param in model.named_parameters():
    trainable = name in update_param_names
    # Everything except the output layer gets no gradient computation.
    param.requires_grad = trainable
    if trainable:
        params_to_update.append(param)
        print(name)

In [None]:
# Loss function
criterion = nn.CrossEntropyLoss()
# Optimizer (receives only the parameters selected for updating)
optimizer = torch.optim.SGD(params=params_to_update, lr=0.001, momentum=0.9)



# Training
def train(train_loader):
    """Run one optimisation pass over ``train_loader``.

    Works on the module-level ``model``, ``criterion``, ``optimizer`` and
    ``device``.

    Args:
        train_loader: DataLoader yielding (images, labels) batches.

    Returns:
        Tuple of (loss averaged over batches, fraction of samples classified
        correctly).
    """
    model.train()
    n_samples = len(train_loader.dataset)
    loss_sum = 0
    n_correct = 0
    for batch_x, batch_y in train_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        optimizer.zero_grad()
        logits = model(batch_x)
        batch_loss = criterion(logits, batch_y)
        batch_loss.backward()
        optimizer.step()
        loss_sum += batch_loss.item()
        # Index of the largest score per sample is the predicted class.
        n_correct += logits.max(1)[1].eq(batch_y).sum().item()
    return loss_sum / len(train_loader), n_correct / n_samples



# Validation
def valid( test_loader):
    """Evaluate the model on ``test_loader`` without updating its weights.

    Works on the module-level ``model``, ``criterion`` and ``device``.

    Args:
        test_loader: DataLoader yielding (images, labels) batches.

    Returns:
        Tuple of (loss averaged over batches, fraction of samples classified
        correctly).
    """
    model.eval()
    n_samples = len(test_loader.dataset)
    loss_sum = 0
    n_correct = 0
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            logits = model(batch_x)
            loss_sum += criterion(logits, batch_y).item()
            # Index of the largest score per sample is the predicted class.
            n_correct += logits.max(1)[1].eq(batch_y).sum().item()
    return loss_sum / len(test_loader), n_correct / n_samples

# Empty lists that record the per-epoch loss and accuracy
# (the bare string on the next line only serves as a comment).
'''誤差(loss)を記録する空の配列を用意'''
acc_list = []
loss_list = []
val_loss_list = []
val_acc_list = []


In [None]:
nepoch = 50

# Training loop: the first epoch (epoch 0) does no training and only evaluates
for epoch in range(nepoch):
    if epoch == 0:
        # Baseline before any update: score the training data without training.
        loss, acc = valid(train_dataloader)
    else:
        loss, acc = train(train_dataloader)

    val_loss, val_acc = valid(val_dataloader)
    print('epoch %d, loss: %.4f acc: %.4f val_loss: %.4f val_acc: %.4f' % (epoch, loss,acc, val_loss, val_acc))

    # Record this epoch's metrics.
    loss_list.append(loss)
    acc_list.append(acc)
    val_loss_list.append(val_loss)
    val_acc_list.append(val_acc)

In [None]:
# Save the model (the whole model object is written to 'model_finetuning.pt')
torch.save(model, 'model_finetuning.pt')


In [None]:
print('正解率：',val_acc_list[-1]*100, '%')

# Show the results (the bare string on the next line only serves as a comment).
'''結果の表示'''
# The x-axis is taken from the recorded history instead of nepoch, so the plot
# still works when training was interrupted or the training cell ran more than once.
epochs = range(len(loss_list))
plt.plot(epochs, loss_list, 'r-', label='train_loss')
plt.plot(epochs, val_loss_list, 'b-', label='val_loss')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('loss')

plt.figure()
plt.plot(epochs, acc_list, 'b-', label='acc')
plt.plot(epochs, val_acc_list, 'g-', label='val_acc')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('acc')