# 事前準備

In [1]:
!pip install pytorch_lightning torchmetrics tensorboard

[0m

In [2]:
# ライブラリのインポート
import os
import random
from glob import glob
from warnings import filterwarnings

import argparse

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split

import cv2
from PIL import Image

import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim
import torch.nn.functional as F
from torch.optim import optimizer
import torchvision
from torchvision import datasets, transforms, models

import pytorch_lightning as pl
import torchmetrics
from torchmetrics import F1, Accuracy, MetricCollection, Precision, Recall

import tensorboard

import warnings

filterwarnings('ignore')

In [3]:
# Model name
# NOTE(review): no visible cell reads this variable; the Net class calls
# models.mobilenet_v3_large directly. Confirm whether a later cell uses it.
model_name = 'mobilenet_v3_large'

In [4]:
# Hyperparameter settings.
# Parsed with an empty argument list so the defaults below are used inside the notebook.
# NOTE(review): in the visible cells only image_size, num_classes, batch_size and lr are
# read. epochs, the scheduler options (mode/factor/lr_patience/threshold) and patience
# are defined here but never consumed (the Trainer is built with max_epochs=10 and no
# scheduler or early-stopping callback is configured) - confirm whether that is intended.
parser = argparse.ArgumentParser()
parser.add_argument('--image_size', type=int, default=224,
                    help='side length (pixels) images are resized to')
parser.add_argument('--num_classes', type=int, default=2,
                    help='number of output classes')
parser.add_argument('--epochs', type=int, default=100,
                    help='number of training epochs')
parser.add_argument('--batch_size', type=int, default=16,
                    help='mini-batch size')
parser.add_argument('--lr', type=float, default=1e-4,
                    help='learning rate')
parser.add_argument('--mode', type=str, default='max',
                    help="scheduler: direction of the monitored metric ('min' or 'max')")
parser.add_argument('--factor', type=float, default=0.5,
                    help='scheduler: factor the learning rate is multiplied by on decay')
parser.add_argument('--lr_patience', type=int, default=5,
                    help='scheduler: epochs without improvement before decaying the lr')
parser.add_argument('--threshold', type=float, default=1e-5,
                    help='scheduler: minimum change that counts as an improvement')
parser.add_argument('--patience', type=int, default=20,
                    help='early stopping: epochs without improvement before stopping')
opt = parser.parse_args(args=[])
print(opt)

Namespace(batch_size=16, epochs=100, factor=0.5, image_size=224, lr=0.0001, lr_patience=5, mode='max', num_classes=2, patience=20, threshold=1e-05)


# 前処理

In [5]:
# Transform that trims the same fraction of the width off the left and right edges
class SideCrop():
    """Crop an equal fraction of the width from both sides of an image.

    The full height is kept. The input only needs a ``size`` attribute that
    unpacks to ``(width, height)`` and a ``crop(box)`` method (the notebook
    uses it inside a torchvision ``Compose`` before ``Resize``).

    Args:
        ratio: Fraction of the width removed from EACH side. Must satisfy
            ``0 <= ratio < 0.5``. Defaults to 0.05 (5% per side).

    Raises:
        ValueError: If ``ratio`` is outside ``[0, 0.5)``.
    """

    def __init__(self, ratio: float = 0.05):
        # At 0.5 or more the left edge would meet or pass the right edge.
        if not 0 <= ratio < 0.5:
            raise ValueError(f"ratio must satisfy 0 <= ratio < 0.5, got {ratio}")
        self.ratio = ratio

    def __call__(self, image):
        """Return ``image`` cropped horizontally by ``ratio`` on each side."""
        width, height = image.size
        margin = self.ratio * width
        # Box order is (left, upper, right, lower); the margin may be fractional and
        # is handed to image.crop unchanged.
        return image.crop((margin, 0, width - margin, height))

    def __repr__(self):
        return f"{self.__class__.__name__}(ratio={self.ratio})"

In [6]:
# Transform pipelines, keyed by dataset split name
_target_size = (opt.image_size, opt.image_size)
# Per-channel mean / std handed to Normalize (presumably the statistics that match the
# pretrained backbone - TODO confirm).
_channel_mean = [0.485, 0.456, 0.406]
_channel_std = [0.229, 0.224, 0.225]

# Training: side crop + resize, then random flips / affine / rotation, then tensor + normalize
_train_steps = [
    SideCrop(),
    transforms.Resize(_target_size),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomAffine(degrees=[-10, 10], translate=(0.1, 0.1), scale=(0.5, 1.5)),
    transforms.RandomRotation(degrees=10),
    transforms.ToTensor(),
    transforms.Normalize(_channel_mean, _channel_std),
]

# Validation: the same crop / resize / normalize, without any random augmentation
_val_steps = [
    SideCrop(),
    transforms.Resize(_target_size),
    transforms.ToTensor(),
    transforms.Normalize(_channel_mean, _channel_std),
]

transform = {
    'train': transforms.Compose(_train_steps),
    'val': transforms.Compose(_val_steps),
}

In [7]:
# Build one ImageFolder dataset per split; each split is read from <data_dir>/<split>
# and paired with the transform pipeline stored under the same key.
data_dir = "/kaggle/input/chest-xray-pneumonia/chest_xray"
image_datasets = dict(
    (split, datasets.ImageFolder(os.path.join(data_dir, split), transform[split]))
    for split in ("train", "val")
)

In [8]:
class CXDataModule(pl.LightningDataModule):
    """Lightning data module serving the pre-built "train" and "val" datasets.

    Args:
        image_datasets: Mapping with at least the keys "train" and "val", each
            holding a dataset accepted by ``DataLoader``.
        batch_size: Batch size used for both loaders.
    """

    def __init__(self, image_datasets: dict, batch_size: int = 32):
        super().__init__()
        self.batch_size = batch_size
        self.image_datasets = image_datasets

    def _loader_for(self, split: str, **loader_kwargs):
        """Create a DataLoader over ``image_datasets[split]`` with the shared batch size."""
        return DataLoader(
            self.image_datasets[split], batch_size=self.batch_size, **loader_kwargs
        )

    def train_dataloader(self):
        """Training loader; samples are shuffled."""
        return self._loader_for("train", shuffle=True)

    def val_dataloader(self):
        """Validation loader; DataLoader's default ordering is used."""
        return self._loader_for("val")

In [9]:
# Wrap the train/val datasets in the data module, using the configured batch size
data_module = CXDataModule(image_datasets=image_datasets, batch_size=opt.batch_size)

# モデルの定義

In [10]:
class Net(pl.LightningModule):
    """MobileNetV3-Large classifier built from torchvision's pretrained weights.

    The last classifier layer is replaced by a new ``nn.Linear`` with
    ``num_classes`` outputs. Training uses cross-entropy loss and Adam.

    Args:
        lr: Learning rate passed to Adam.
        num_classes: Output size of the replacement final layer.
        weight_decay: Weight decay passed to Adam. Defaults to 1e-4, the value
            that was previously hard-coded.
    """

    # Classifier parameters that remain trainable, in addition to every parameter
    # whose name contains "features".
    _TRAINABLE_CLASSIFIER_PARAMS = (
        "classifier.0.weight", "classifier.0.bias",
        "classifier.3.weight", "classifier.3.bias",
    )

    def __init__(self, lr: float, num_classes: int, weight_decay: float = 1e-4):
        super().__init__()

        self.lr = lr
        self.num_classes = num_classes
        self.weight_decay = weight_decay
        self.loss_fn = nn.CrossEntropyLoss()

        # Load the pretrained model and swap its final layer for one sized to our
        # classes. The input width is read from the layer being replaced.
        self.model = models.mobilenet_v3_large(pretrained=True)
        head_in_features = self.model.classifier[3].in_features
        self.model.classifier[3] = nn.Linear(head_in_features, self.num_classes)

        # Fine-tuning: parameters under "features" and the listed classifier
        # parameters are trainable; anything else is frozen.
        # NOTE(review): these names presumably cover every parameter of this
        # architecture, which would make the freeze a safeguard only - confirm.
        for name, param in self.model.named_parameters():
            param.requires_grad = (
                "features" in name or name in self._TRAINABLE_CLASSIFIER_PARAMS
            )

        self.train_acc = torchmetrics.Accuracy()
        self.val_acc = torchmetrics.Accuracy()
        self.test_acc = torchmetrics.Accuracy()

    def forward(self, x):
        """Return the wrapped model's raw outputs for the batch ``x``."""
        return self.model(x)

    def _shared_step(self, batch):
        """Unpack ``(images, target)``, run the model and return (outputs, target, loss)."""
        images, target = batch
        outputs = self(images)
        return outputs, target, self.loss_fn(outputs, target)

    # Processing for a training batch
    def training_step(self, batch, batch_idx):
        outputs, target, loss = self._shared_step(batch)
        # Update the metric, then log the metric OBJECT so Lightning computes and
        # resets it once per epoch instead of averaging per-batch values.
        self.train_acc(outputs, target)
        self.log('train_loss', loss, on_step=False, on_epoch=True)
        self.log('train_acc', self.train_acc, on_step=False, on_epoch=True)
        return loss

    # Processing for a validation batch
    def validation_step(self, batch, batch_idx):
        outputs, target, loss = self._shared_step(batch)
        self.val_acc(outputs, target)
        self.log('val_loss', loss, on_step=False, on_epoch=True)
        self.log('val_acc', self.val_acc, on_step=False, on_epoch=True)
        return loss

    # Processing for a test batch
    def test_step(self, batch, batch_idx):
        outputs, target, loss = self._shared_step(batch)
        # Logging was previously commented out, which left self.test_acc unused and
        # test runs without any reported values.
        self.test_acc(outputs, target)
        self.log('test_loss', loss, on_step=False, on_epoch=True)
        self.log('test_acc', self.test_acc, on_step=False, on_epoch=True)
        return loss

    def configure_optimizers(self):
        """Return a single Adam optimizer over all parameters of the wrapped model."""
        return optim.Adam(
            self.model.parameters(), lr=self.lr, weight_decay=self.weight_decay
        )

In [11]:
# Seed the random number generators (including the GPU ones)
pl.seed_everything(0)

# Instantiate the model
net = Net(lr=opt.lr, num_classes=opt.num_classes)

# Build the trainer
# NOTE(review): max_epochs is hard-coded to 10 here; opt.epochs (default 100) and
# opt.patience are not used - confirm which is intended.
# NOTE(review): `gpus=1` requires a GPU and is, as far as I know, only accepted by
# pytorch_lightning 1.x - confirm against the installed version.
trainer = pl.Trainer(max_epochs=10, gpus=1)

# Run training
trainer.fit(net, datamodule=data_module)

Downloading: "https://download.pytorch.org/models/mobilenet_v3_large-8738ca79.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v3_large-8738ca79.pth


  0%|          | 0.00/21.1M [00:00<?, ?B/s]

Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

In [12]:
# Load the TensorBoard notebook extension and open it on the lightning_logs/ directory
%load_ext tensorboard
%tensorboard --logdir lightning_logs/