In [38]:
import sys
import os
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.append(project_root)
from src.my_app import UNet, PreTrainDataset, create_optimized_dataloader
import torch
from PIL import Image
import torchvision.transforms as transforms
import os

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image
import os
import json
import numpy as np
import cv2
from tqdm import tqdm
import matplotlib.pyplot as plt
import multiprocessing as mp


def crop_labels_to_match(labels_to_crop, target_tensor):
    target_h, target_w = target_tensor.shape[2:]
    source_h, source_w = labels_to_crop.shape[2:]
    delta_h = (source_h - target_h) // 2
    delta_w = (source_w - target_w) // 2
    return labels_to_crop[:, :, delta_h:delta_h + target_h, delta_w:delta_w + target_w]
transform = transforms.Compose([
    # transforms.Resizeze((128, 128)),
    transforms.ToTensor()
])
# --- Dataset ---
test_doc_id_list = ['100241706', '100249371', '100249376', '100249416', '100249476', '100249537', '200003076', '200003803', '200003967', '200004107']
train_dataset = PreTrainDataset(
    input_path='../../kuzushiji-recognition/synthetic_images/input_images/',
    json_path='../../kuzushiji-recognition/synthetic_images/gt_json.json',
    test_doc_id_list=test_doc_id_list,
    test_mode=False,
    device=torch.device('cuda'),  # GPUを明示的に指定
    precompute_gt=True,  # 事前計算を有効化
    # num_workers=None
    transform=transform,  # 画像変換を追加
    target_width=300
)
test_dataset = PreTrainDataset(
    input_path='../../kuzushiji-recognition/synthetic_images/input_images/',
    json_path='../../kuzushiji-recognition/synthetic_images/gt_json.json',
    test_doc_id_list=test_doc_id_list,
    test_mode=True,
    device=torch.device('cuda'),  # GPUを明示的に指定
    precompute_gt=True,  # 事前計算を有効化
    # num_workers=4
    transform=transform,  # 画像変換を追加
    targetwidth=300
)

# 最適化されたDataLoaderの作成
train_dl = create_optimized_dataloader(train_dataset, batch_size=1, num_workers=min(mp.cpu_count(), 4))
test_dl = create_optimized_dataloader(test_dataset, batch_size=1, num_workers=min(mp.cpu_count(), 4))

# --- モデル、損失関数、最適化手法の定義 ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# device = torch.device("mps" if torch.cuda.is_available() else "cpu")
model = UNet(3, 4).to(device)
criterion = nn.MSELoss() # 回帰問題なのでMSE損失を使用
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# --- チェックポイントの設定 ---
checkpoint_dir = "../.checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

# 最良のモデルを追跡するための変数
best_test_loss = float('inf')
start_epoch = 0

# チェックポイントの読み込み（存在する場合）
checkpoint_path = os.path.join(checkpoint_dir, "latest_checkpoint.pth")
if os.path.exists(checkpoint_path):
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch']
    best_test_loss = checkpoint['best_test_loss']
    train_loss_history = checkpoint['train_loss_history']
    test_loss_history = checkpoint['test_loss_history']
    print(f"チェックポイントを読み込みました（エポック {start_epoch}）")

print("学習を開始します...")
num_epochs = 100 # エポック数を定義

# 損失の履歴を保存するリストを初期化
train_loss_history = []
test_loss_history = []

print("学習を開始します...")
for epoch in range(start_epoch, num_epochs):
    print(f'start epcoch')
    # --- 訓練フェーズ ---
    model.train() # モデルを訓練モードに設定
    train_loss_total = 0
    
    # tqdmでプログレスバーを表示
    train_bar = tqdm(train_dl, desc=f"Epoch {epoch+1}/{num_epochs} [Train]")
    for imgs, masks in train_bar:
        imgs, masks = imgs.to(device), masks.to(device)
        
        preds = model(imgs)
        cropped_masks = crop_labels_to_match(masks, preds)

        loss = criterion(preds, cropped_masks)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss_total += loss.item()
        # プログレスバーに現在のロスを表示
        train_bar.set_postfix(loss=loss.item())

    avg_train_loss = train_loss_total / len(train_dl)
    train_loss_history.append(avg_train_loss)

    # --- 評価フェーズ ---
    model.eval() # モデルを評価モードに設定
    test_loss_total = 0
    
    # 勾配計算を無効化して、メモリ効率を良くする
    with torch.no_grad():
        test_bar = tqdm(test_dl, desc=f"Epoch {epoch+1}/{num_epochs} [Test]")
        for imgs, masks in test_bar:
            imgs, masks = imgs.to(device), masks.to(device)
            preds = model(imgs)
            cropped_masks = crop_labels_to_match(masks, preds)
            
            loss = criterion(preds, cropped_masks)
            test_loss_total += loss.item()
            test_bar.set_postfix(loss=loss.item())

    avg_test_loss = test_loss_total / len(test_dl)
    test_loss_history.append(avg_test_loss)
    
    # 各エポックの最後に訓練ロスとテストロスを表示
    print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {avg_train_loss:.4f} | Test Loss: {avg_test_loss:.4f}")
    
    # 最新のチェックポイントを保存
    torch.save({
        'epoch': epoch + 1,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'train_loss_history': train_loss_history,
        'test_loss_history': test_loss_history,
        'best_test_loss': best_test_loss
    }, os.path.join(checkpoint_dir, "latest_checkpoint.pth"))
    
    # より良い性能が出た場合、ベストモデルとして保存
    if avg_test_loss < best_test_loss:
        best_test_loss = avg_test_loss
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'test_loss': avg_test_loss,
        }, os.path.join(checkpoint_dir, "best_model.pth"))
        print(f"新しいベストモデルを保存しました（Test Loss: {avg_test_loss:.4f}）")

print("学習が完了しました。")

# --- 損失の推移をグラフで表示 ---
plt.figure(figsize=(10, 5))
plt.plot(train_loss_history, label="Train Loss")
plt.plot(test_loss_history, label="Test Loss")
plt.title("Loss Trend")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.show()

Using device: cuda


FileNotFoundError: [Errno 2] No such file or directory: '../../kuzushiji-recognition/synthetic_images/input_images/'

# 学習済みモデルの出力を確認

## チェックポイントのロード動作確認

In [None]:
# --- チェックポイントの設定 ---.checkpoints/latest_checkpoint.pth
checkpoint_dir = '../.checkpoints'
checkpoint_path = os.path.join(checkpoint_dir, "latest_checkpoint.pth")

# 現在のデバイス状況を確認
device = torch.device("mps" if torch.mps.is_available() else "cpu")
print(f"Using device: {device}")

if os.path.exists(checkpoint_path):
    # デバイス間の互換性を保つための読み込み
    checkpoint = torch.load(checkpoint_path, map_location=device)
    
    print(f"チェックポイントを読み込みました: {checkpoint_path}")
    print(f"エポック: {checkpoint.get('epoch', 'N/A')}")
    print(f"ベストテストロス: {checkpoint.get('best_test_loss', 'N/A')}")
    print(f"訓練ロス履歴の長さ: {len(checkpoint.get('train_loss_history', []))}")
    print(f"テストロス履歴の長さ: {len(checkpoint.get('test_loss_history', []))}")
else:
    print(f"チェックポイントが見つかりません: {checkpoint_path}")
    checkpoint = None
    checkpoint = None

Using device: mps
チェックポイントを読み込みました: ../.checkpoints/latest_checkpoint.pth
エポック: 87
ベストテストロス: 82.97725561295073
訓練ロス履歴の長さ: 87
テストロス履歴の長さ: 87


In [37]:
# --- チェックポイントの設定 ---.checkpoints/latest_checkpoint.pth
# 現在のデバイス状況を確認
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("mps" if torch.mps.is_available() else "cpu")
print(f"Using device: {device}")
# device = torch.device("mps" if torch.cuda.is_available() else "cpu")
model = UNet(3, 4).to(device)
criterion = nn.MSELoss() # 回帰問題なのでMSE損失を使用
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# --- チェックポイントの設定 ---
checkpoint_dir = "../.checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

# 最良のモデルを追跡するための変数
best_test_loss = float('inf')
start_epoch = 0

# チェックポイントの読み込み（存在する場合）
# セル6のチェックポイント読み込み部分を修正
checkpoint_path = os.path.join(checkpoint_dir, "latest_checkpoint.pth")
if os.path.exists(checkpoint_path):
    # CUDAからMPS/CPUへの変換を明示的に行う
    if device.type == 'mps':
        checkpoint = torch.load(checkpoint_path, map_location=torch.device('mps'))
    else:
        checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))
    
    model.load_state_dict(checkpoint['model_state_dict'])
    
    # OptimizerもCPU/MPSに適応させる必要がある場合がある
    try:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    except RuntimeError as e:
        print(f"Optimizer状態の読み込みに失敗: {e}")
        print("新しいOptimizerで続行します")
    
    start_epoch = checkpoint['epoch']
    best_test_loss = checkpoint['best_test_loss']
    train_loss_history = checkpoint['train_loss_history']
    test_loss_history = checkpoint['test_loss_history']
    print(f"チェックポイントを読み込みました（エポック {start_epoch}）")

# 損失の履歴を保存するリストを初期化
train_loss_history = []
test_loss_history = []

print("学習を開始します...")
for epoch in range(start_epoch, num_epochs):
    print(f'start epcoch')
    # --- 訓練フェーズ ---
    model.train() # モデルを訓練モードに設定
    train_loss_total = 0
    
    # tqdmでプログレスバーを表示
    train_bar = tqdm(train_dl, desc=f"Epoch {epoch+1}/{num_epochs} [Train]")
    for imgs, masks in train_bar:
        imgs, masks = imgs.to(device), masks.to(device)
        
        preds = model(imgs)
        cropped_masks = crop_labels_to_match(masks, preds)

        loss = criterion(preds, cropped_masks)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss_total += loss.item()
        # プログレスバーに現在のロスを表示
        train_bar.set_postfix(loss=loss.item())

    avg_train_loss = train_loss_total / len(train_dl)
    train_loss_history.append(avg_train_loss)

Using device: mps
チェックポイントを読み込みました（エポック 87）
学習を開始します...


NameError: name 'num_epochs' is not defined

## 画像読み込みと torch tensor 変換

In [None]:

def load_image_as_tensor(image_path, device=None, target_size=None, normalize=True):
    """
    画像ファイルを読み込んでtorch.tensorに変換する
    
    Args:
        image_path (str): 画像ファイルのパス
        device (torch.device, optional): テンソルを配置するデバイス。Noneの場合は自動選択
        target_size (tuple, optional): リサイズするサイズ (height, width)
        normalize (bool): 正規化を行うかどうか (0-1の範囲に変換)
    
    Returns:
        torch.Tensor: 変換された画像テンソル (C, H, W)
    """
    
    # デバイスの自動選択
    if device is None:
        device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
    
    # ファイルの存在確認
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"画像ファイルが見つかりません: {image_path}")
    
    try:
        # 画像を読み込み
        image = Image.open(image_path)
        
        # RGBに変換（PNGのアルファチャンネルなどを除去）
        if image.mode != 'RGB':
            image = image.convert('RGB')
        
        # 変換パイプラインを構築
        transform_list = []
        
        # リサイズ
        if target_size is not None:
            transform_list.append(transforms.Resize(target_size))
        
        # テンソル変換
        transform_list.append(transforms.ToTensor())
        
        # 正規化（ToTensorで既に0-1に正規化されるが、ImageNetの標準化も可能）
        if normalize == 'imagenet':
            transform_list.append(transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            ))
        
        # 変換を実行
        transform = transforms.Compose(transform_list)
        tensor = transform(image)
        
        # デバイスに移動
        tensor = tensor.to(device)
        
        return tensor
        
    except Exception as e:
        raise RuntimeError(f"画像の読み込み/変換でエラーが発生しました: {e}")

# 使用例
def load_specific_image():
    """指定された画像を読み込む"""
    image_path = ".image_data/100241706_00002_2.jpg"
    
    # デバイス設定
    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
    print(f"Using device: {device}")
    
    try:
        # 画像をテンソルに変換
        image_tensor = load_image_as_tensor(
            image_path=image_path,
            device=device,
            target_size=(300, 300),  # 必要に応じてリサイズ
            normalize=True
        )
        
        print(f"画像が正常に読み込まれました")
        print(f"テンソルの形状: {image_tensor.shape}")
        print(f"データ型: {image_tensor.dtype}")
        print(f"値の範囲: {image_tensor.min():.3f} - {image_tensor.max():.3f}")
        print(f"デバイス: {image_tensor.device}")
        
        return image_tensor
        
    except FileNotFoundError:
        print(f"画像ファイルが見つかりません: {image_path}")
        print("現在のディレクトリから相対パスを確認してください")
        return None
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None

# バッチ用の関数
def add_batch_dimension(tensor):
    """
    テンソルにバッチ次元を追加 (C, H, W) -> (1, C, H, W)
    """
    return tensor.unsqueeze(0)

# 実行例
if __name__ == "__main__":
    # 画像を読み込み
    image_tensor = load_specific_image()
    
    if image_tensor is not None:
        # バッチ次元を追加（モデル推論用）
        batch_tensor = add_batch_dimension(image_tensor)
        print(f"バッチテンソルの形状: {batch_tensor.shape}")

## Dataset型形式確認