<a href="https://colab.research.google.com/github/ykitaguchi77/Laboratory_course/blob/master/10.1.%20Pytorch_Lightning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**画像のスクレイピング --> Lightning Pytorchで分類**



#**iCrawlerを用いてスクレイピング**

https://atmarkit.itmedia.co.jp/ait/articles/2010/28/news018.html

公式： https://icrawler.readthedocs.io/en/latest/builtin.html

In [None]:
!pip install icrawler
from icrawler.builtin import BingImageCrawler
import os

# List of keywords
keywords = ["cat", "dog"]
max_num = 150

for keyword in keywords:
    output_dir = f"/content/{keyword}"

    # Create the directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    crawler = BingImageCrawler(storage={"root_dir": output_dir})
    crawler.crawl(keyword=keyword, max_num=max_num)

##**Classify dog/cat using Pytorch Lightning**

https://tech.aru-zakki.com/from-pytorch-to-lightning/

In [None]:
import os
import json
import math
try:
    import japanize_matplotlib
except ModuleNotFoundError:
    !pip install japanize_matplotlib
    import japanize_matplotlib
import numpy as np
import time
import copy
import requests
from PIL import Image
from types import SimpleNamespace
from io import StringIO

import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import cm
import seaborn as sns
sns.set()

from tqdm import tqdm

from sklearn.metrics import mean_squared_error

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.utils.data as data
import torchvision
from torchvision.datasets import CIFAR10
from torchvision import transforms

def set_seed(seed):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        # GPUありの場合
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

def get_device():
    if torch.cuda.is_available():
        device = torch.device("cuda:0")
    # PytorchLightningでエラーが出るので、MPSはパス
    #elif torch.backends.mps.is_built():
    #    device = torch.device("mps:0")
    else:
        device = torch.device("cpu")
    return device

In [None]:
#################
# 画像をフォルダ整理  #
#################

import shutil
from sklearn.model_selection import train_test_split

# 画像のパスを取得
dog_paths = ['/content/dog/' + f for f in os.listdir('/content/dog/')]
cat_paths = ['/content/cat/' + f for f in os.listdir('/content/cat/')]

# 画像パスを結合
all_paths = dog_paths + cat_paths

# 訓練用と検証用に分割
train_paths, valid_paths = train_test_split(all_paths, test_size=0.2, random_state=42)

# ディレクトリを作成
def create_or_clean_dir(directory_path):
    if os.path.exists(directory_path):
        shutil.rmtree(directory_path)
    os.makedirs(directory_path)
create_or_clean_dir('/content/train/dog')
create_or_clean_dir('/content/train/cat')
create_or_clean_dir('/content/valid/dog')
create_or_clean_dir('/content/valid/cat')

# 画像をコピー
for path in train_paths:
  if 'dog' in path:
    shutil.copy(path, '/content/train/dog')
  elif 'cat' in path:
    shutil.copy(path, '/content/train/cat')

for path in valid_paths:
  if 'dog' in path:
    shutil.copy(path, '/content/valid/dog')
  elif 'cat' in path:
    shutil.copy(path, '/content/valid/cat')

In [None]:
import os
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import ImageFolder
import pytorch_lightning as pl
from pytorch_lightning import loggers as pl_loggers
from pytorch_lightning.callbacks import ProgressBar, LearningRateMonitor
from pytorch_lightning.core.datamodule import LightningDataModule
import torch.nn.functional as F
from torchmetrics import Accuracy

class CatDogDataModule(LightningDataModule):
    # コンストラクタでバッチサイズを初期化します。
    def __init__(self, batch_size=32):
      super().__init__()
      self.batch_size = batch_size
      # データ前処理のための変換を定義します。
      self.data_transform = transforms.Compose([
        transforms.Resize(256), # 画像を256x256にリサイズ
        transforms.CenterCrop(224), # 中心で224x224にクロップ
        transforms.ToTensor(), # 画像をテンソルに変換
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # 標準化
      ])

    # データセットをセットアップします。'fit'ステージまたはデフォルトで実行されます。
    def setup(self, stage=None):
      if stage == 'fit' or stage is None:
        # 訓練データセットを定義します。
        self.train_dataset = ImageFolder(root='/content/train', transform=self.data_transform)
        # 検証データセットを定義します。
        self.valid_dataset = ImageFolder(root='/content/valid', transform=self.data_transform)

    # 訓練データローダーを作成します。
    def train_dataloader(self):
      return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)

    # 検証データローダーを作成します。
    def val_dataloader(self):
      return DataLoader(self.valid_dataset, batch_size=self.batch_size, shuffle=False)

class Classifier(pl.LightningModule):
    # モデルを初期化します。
    def __init__(self):
      super().__init__()
      # ResNet50モデルを読み込み、事前訓練された重みを使用します。
      self.model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet50', pretrained=True)
      # 最後の全結合層を2クラス分類用に置き換えます。
      self.model.fc = torch.nn.Linear(self.model.fc.in_features, 2)

    # 順伝播を定義します。
    def forward(self, x):
      return self.model(x)

    # 訓練ステップを定義します。
    def training_step(self, batch, batch_idx):
      x, y = batch
      y_hat = self(x)
      loss = F.cross_entropy(y_hat, y)

      preds = torch.argmax(y_hat, dim=1)
      acc = (preds == y).float().mean()

      # 訓練の損失と正確さをログに記録します。
      self.log('train_loss', loss, prog_bar=True)
      self.log('train_acc', acc, prog_bar=True)

      return loss

    # 検証ステップを定義します。
    def validation_step(self, batch, batch_idx):
      x, y = batch
      y_hat = self(x)
      loss = F.cross_entropy(y_hat, y)

      preds = torch.argmax(y_hat, dim=1)
      acc = (preds == y).float().mean()

      # 検証の損失と正確さをログに記録します。
      self.log('val_loss', loss, prog_bar=True)
      self.log('val_acc', acc, prog_bar=True)

    # オプティマイザーを設定します。
    def configure_optimizers(self):
      return torch.optim.Adam(self.parameters(), lr=1e-3)


class PrintMetricsCallback(pl.Callback):
    # 訓練のエポックが終わる度にメトリクスを印刷します。
    def on_train_epoch_end(self, trainer, pl_module):
        # 訓練エポックにおけるログに記録されたメトリクスを取得して印刷します。
        metrics = trainer.logged_metrics
        print(f"Epoch: {trainer.current_epoch}, Train Loss: {metrics['train_loss'].item():.4f}, Train Acc: {metrics['train_acc'].item():.4f}")

    # 検証のエポックが終わる度にメトリクスを印刷します。
    def on_validation_epoch_end(self, trainer, pl_module):
        # 検証エポックにおけるログに記録されたメトリクスを取得して印刷します。
        metrics = trainer.logged_metrics
        print(f"\nEpoch: {trainer.current_epoch}, Validation Loss: {metrics['val_loss'].item():.4f}, Validation Acc: {metrics['val_acc'].item():.4f}")

# データモジュールを初期化します（バッチサイズ32）。
data_module = CatDogDataModule(batch_size=32)
# 分類器モデルを初期化します。
model = Classifier()

# TensorBoardでログを記録するためのロガーを初期化します。
logger = pl_loggers.TensorBoardLogger('logs/')

# トレーナーオブジェクトを初期化します。
trainer = pl.Trainer(
  max_epochs=3, # 最大エポック数を設定します。
  accelerator='gpu', # GPUを使用するように設定します。
  devices=1, # 使用するデバイスの数（GPUの数）を設定します。
  callbacks=[PrintMetricsCallback()], # コールバックを追加します。
  logger=logger  # ロガーを設定します。
)

# 訓練を開始します。
trainer.fit(model, datamodule=data_module)


In [None]:
import matplotlib.pyplot as plt
import random
import torch
from torchvision.utils import make_grid
from PIL import Image

# モデルを評価モードに設定
model.eval()

# 画像の視覚化のための逆変換を定義します
inverse_transform = transforms.Compose([
    transforms.Normalize(mean=[0., 0., 0.], std=[1/0.229, 1/0.224, 1/0.225]),
    transforms.Normalize(mean=[-0.485, -0.456, -0.406], std=[1., 1., 1.]),
])

# trainer.fit(model, datamodule=data_module) が既に実行されていると仮定します
valid_dataset = data_module.valid_dataset
# 検証データセットからランダムに30個のインデックスを選択
indices = random.sample(range(len(valid_dataset)), 30)

# 選択した画像とその予測結果を視覚化します
for i in indices:
    # 生の画像へのパスとラベルを取得
    path, y = valid_dataset.imgs[i]

    # 画像ファイルをPIL Imageとして開く
    x_raw = Image.open(path).convert('RGB')

    # 元の変換パイプラインを適用
    x_transformed = data_module.data_transform(x_raw).unsqueeze(0)  # バッチ次元を追加

    # 予測を実行
    y_hat = model(x_transformed)
    pred = torch.argmax(y_hat, dim=1)

    # 画像のインデックス、実際のラベル、予測ラベルを表示
    print(f"Image: {i}")
    print(f"Label: {y}")
    print(f"Prediction: {pred.item()}")

    # 視覚化のために逆変換を適用
    x_vis = inverse_transform(x_transformed.squeeze())  # バッチ次元を削除

    # テンソルを画像に変換して表示
    plt.imshow(x_vis.permute(1, 2, 0).numpy())  # テンソルを画像フォーマットに変更
    plt.axis('off')  # 軸をオフにする
    plt.show()
