# MRZ CRNN Training

MRZ (Machine Readable Zone) 用の軽量 CRNN モデルを学習する。

## 概要
- **アーキテクチャ**: CNN + BiLSTM + CTC
- **入力**: 32x280 グレースケール画像
- **出力**: 44文字の MRZ テキスト
- **目標**: CER < 1%, モデルサイズ < 5MB

## 1. セットアップ

In [None]:
# GPU 確認
!nvidia-smi

In [None]:
# 必要なライブラリは Colab に事前インストール済み
import torch
import torch.nn as nn
import numpy as np
from pathlib import Path
import random
import time

print(f"PyTorch: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

## 2. 合成 MRZ データ生成

PIL を使って MRZ 行画像を合成生成する。
OCR-B フォントの代わりにモノスペースフォントを使用。

In [None]:
from PIL import Image, ImageDraw, ImageFont
import string

# MRZ で使用する文字セット
MRZ_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789<"

def generate_random_mrz_line() -> str:
    """
    ランダムな MRZ 行（44文字）を生成
    
    実際の MRZ フォーマットに近いパターンを生成:
    - Line 1: P<COUNTRY_CODE + NAME + FILLER
    - Line 2: PASSPORT_NO + CHECK + NATIONALITY + DOB + CHECK + SEX + EXPIRY + CHECK + OPTIONAL + CHECK
    """
    # ランダムに Line 1 か Line 2 形式を選択
    if random.random() < 0.5:
        # Line 1 形式: P<XXX + 名前 + <フィラー
        doc_type = random.choice(["P", "I", "A", "C"])
        country = "".join(random.choices(string.ascii_uppercase, k=3))
        name_len = random.randint(20, 35)
        name = "".join(random.choices(string.ascii_uppercase + "<", k=name_len))
        line = f"{doc_type}<{country}{name}"
        # 44文字にパディング
        line = line[:44].ljust(44, "<")
    else:
        # Line 2 形式: 数字とチェックディジット
        passport_no = "".join(random.choices(string.ascii_uppercase + string.digits, k=9))
        check1 = random.choice(string.digits)
        nationality = "".join(random.choices(string.ascii_uppercase, k=3))
        dob = "".join(random.choices(string.digits, k=6))
        check2 = random.choice(string.digits)
        sex = random.choice(["M", "F", "<"])
        expiry = "".join(random.choices(string.digits, k=6))
        check3 = random.choice(string.digits)
        optional = "".join(random.choices(string.ascii_uppercase + string.digits + "<", k=14))
        check4 = random.choice(string.digits)
        line = f"{passport_no}{check1}{nationality}{dob}{check2}{sex}{expiry}{check3}{optional}{check4}"
        line = line[:44]
    
    return line


def render_mrz_image(
    text: str,
    height: int = 32,
    font_size: int = 24,
    add_noise: bool = True
) -> np.ndarray:
    """
    MRZ テキストを画像にレンダリング
    
    Args:
        text: 44文字の MRZ テキスト
        height: 出力画像の高さ
        font_size: フォントサイズ
        add_noise: ノイズを追加するか
    
    Returns:
        グレースケール画像 (H, W)
    """
    # モノスペースフォントを使用（Colab 環境）
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf", font_size)
    except:
        font = ImageFont.load_default()
    
    # テキストサイズを計算
    dummy_img = Image.new("L", (1, 1))
    dummy_draw = ImageDraw.Draw(dummy_img)
    bbox = dummy_draw.textbbox((0, 0), text, font=font)
    text_width = bbox[2] - bbox[0]
    text_height = bbox[3] - bbox[1]
    
    # 画像を作成（白背景）
    padding = 4
    img_width = text_width + padding * 2
    img = Image.new("L", (img_width, height), color=255)
    draw = ImageDraw.Draw(img)
    
    # テキストを描画（黒文字）
    y_offset = (height - text_height) // 2
    draw.text((padding, y_offset), text, font=font, fill=0)
    
    # NumPy 配列に変換
    img_array = np.array(img)
    
    # ノイズ追加
    if add_noise and random.random() < 0.5:
        noise = np.random.normal(0, 5, img_array.shape)
        img_array = np.clip(img_array + noise, 0, 255).astype(np.uint8)
    
    return img_array


# テスト
sample_text = generate_random_mrz_line()
sample_img = render_mrz_image(sample_text)
print(f"Sample MRZ: {sample_text}")
print(f"Image shape: {sample_img.shape}")

In [None]:
# サンプル画像を表示
import matplotlib.pyplot as plt

fig, axes = plt.subplots(3, 1, figsize=(12, 4))
for i, ax in enumerate(axes):
    text = generate_random_mrz_line()
    img = render_mrz_image(text)
    ax.imshow(img, cmap="gray")
    ax.set_title(text, fontsize=8)
    ax.axis("off")
plt.tight_layout()
plt.show()

## 3. Dataset & DataLoader

In [None]:
from torch.utils.data import Dataset, DataLoader

# 文字セット定義
CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789<"
CHAR_TO_IDX = {c: i for i, c in enumerate(CHARS)}
IDX_TO_CHAR = {i: c for i, c in enumerate(CHARS)}
NUM_CLASSES = len(CHARS) + 1  # +1 for CTC blank

print(f"文字数: {len(CHARS)}")
print(f"クラス数 (blank含む): {NUM_CLASSES}")


def encode_text(text: str) -> list:
    """テキストを数値インデックスに変換"""
    return [CHAR_TO_IDX[c] for c in text if c in CHAR_TO_IDX]


def decode_output(indices: list) -> str:
    """
    CTC 出力をテキストにデコード
    連続する同一インデックスと blank を除去
    """
    result = []
    prev_idx = -1
    for idx in indices:
        if idx == len(CHARS):  # blank
            prev_idx = idx
            continue
        if idx != prev_idx and idx < len(CHARS):
            result.append(IDX_TO_CHAR[idx])
        prev_idx = idx
    return "".join(result)


class SyntheticMRZDataset(Dataset):
    """
    合成 MRZ データセット
    
    オンラインでランダムに MRZ 画像を生成する。
    epoch ごとに異なるデータが生成される。
    """
    
    def __init__(self, num_samples: int, max_width: int = 280):
        self.num_samples = num_samples
        self.max_width = max_width
    
    def __len__(self):
        return self.num_samples
    
    def __getitem__(self, idx):
        # ランダムな MRZ を生成
        text = generate_random_mrz_line()
        image = render_mrz_image(text, add_noise=True)
        
        # 正規化 (0-1)
        image = image.astype(np.float32) / 255.0
        
        # パディング（幅を max_width に統一）
        h, w = image.shape
        if w < self.max_width:
            pad_w = self.max_width - w
            image = np.pad(image, ((0, 0), (0, pad_w)), constant_values=1.0)
        elif w > self.max_width:
            image = image[:, :self.max_width]
        
        # テンソルに変換 (1, H, W)
        image_tensor = torch.from_numpy(image).unsqueeze(0)
        label = encode_text(text)
        
        return {
            "image": image_tensor,
            "label": torch.tensor(label, dtype=torch.long),
            "label_length": len(label),
            "text": text
        }


def collate_fn(batch):
    """バッチをまとめる（CTC Loss 用）"""
    images = torch.stack([item["image"] for item in batch])
    labels = torch.cat([item["label"] for item in batch])
    label_lengths = torch.tensor([item["label_length"] for item in batch])
    texts = [item["text"] for item in batch]
    return {
        "images": images,
        "labels": labels,
        "label_lengths": label_lengths,
        "texts": texts
    }

## 4. CRNN モデル

In [None]:
class CRNN(nn.Module):
    """
    CRNN (Convolutional Recurrent Neural Network) for MRZ OCR
    
    アーキテクチャ:
    - CNN Backbone: 特徴抽出（高さを1に圧縮）
    - BiLSTM: シーケンスモデリング
    - Linear: 文字分類（37クラス + CTC blank）
    
    入力: (B, 1, 32, W) - グレースケール画像
    出力: (T, B, 38) - 各タイムステップの文字確率
    """
    
    def __init__(self, num_classes: int = 38, hidden_size: int = 128):
        super().__init__()
        
        # CNN Backbone
        self.cnn = nn.Sequential(
            # Block 1: 32 -> 16
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            
            # Block 2: 16 -> 8
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            
            # Block 3: 8 -> 4
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d((2, 1)),
            
            # Block 4: 4 -> 2
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d((2, 1)),
            
            # Block 5: 2 -> 1
            nn.Conv2d(256, 256, kernel_size=(2, 1)),
            nn.BatchNorm2d(256),
            nn.ReLU(),
        )
        
        # BiLSTM
        self.lstm = nn.LSTM(
            input_size=256,
            hidden_size=hidden_size,
            num_layers=2,
            bidirectional=True,
            batch_first=False,
            dropout=0.1
        )
        
        # 出力層
        self.fc = nn.Linear(hidden_size * 2, num_classes)
    
    def forward(self, x):
        # CNN 特徴抽出: (B, 1, 32, W) -> (B, 256, 1, W')
        features = self.cnn(x)
        
        # 形状変換: (B, C, 1, W') -> (W', B, C)
        b, c, h, w = features.shape
        features = features.squeeze(2)
        features = features.permute(2, 0, 1)
        
        # BiLSTM: (T, B, 256) -> (T, B, hidden*2)
        lstm_out, _ = self.lstm(features)
        
        # 出力層: (T, B, hidden*2) -> (T, B, num_classes)
        output = self.fc(lstm_out)
        output = torch.log_softmax(output, dim=2)
        
        return output


def get_model_info(model):
    """モデル情報を取得"""
    total_params = sum(p.numel() for p in model.parameters())
    param_size = sum(p.numel() * p.element_size() for p in model.parameters())
    buffer_size = sum(b.numel() * b.element_size() for b in model.buffers())
    size_mb = (param_size + buffer_size) / 1024 / 1024
    return {"total_params": total_params, "size_mb": size_mb}


# モデル作成とテスト
model = CRNN(num_classes=NUM_CLASSES)
info = get_model_info(model)
print(f"パラメータ数: {info['total_params']:,}")
print(f"モデルサイズ: {info['size_mb']:.2f} MB")

# 推論テスト
x = torch.randn(1, 1, 32, 280)
output = model(x)
print(f"入力: {x.shape} -> 出力: {output.shape}")

## 5. 学習

In [None]:
from torch.optim import AdamW
from torch.optim.lr_scheduler import OneCycleLR

# ハイパーパラメータ
BATCH_SIZE = 64
EPOCHS = 100  # 30 → 100 に増加
LR = 1e-3
TRAIN_SAMPLES = 10000
VAL_SAMPLES = 1000
MAX_WIDTH = 280

# デバイス
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

# データセット
train_dataset = SyntheticMRZDataset(TRAIN_SAMPLES, MAX_WIDTH)
val_dataset = SyntheticMRZDataset(VAL_SAMPLES, MAX_WIDTH)

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=2
)
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_fn,
    num_workers=2
)

print(f"Train batches: {len(train_loader)}")
print(f"Val batches: {len(val_loader)}")

# モデル・損失関数・オプティマイザ
model = CRNN(num_classes=NUM_CLASSES).to(device)
criterion = nn.CTCLoss(blank=NUM_CLASSES - 1, zero_infinity=True)
optimizer = AdamW(model.parameters(), lr=LR)

# OneCycleLR: Warmup → 高学習率 → 低学習率
# 中盤で高い学習率により局所解を脱出しやすい
scheduler = OneCycleLR(
    optimizer,
    max_lr=LR * 10,           # 最大学習率 1e-2
    epochs=EPOCHS,
    steps_per_epoch=len(train_loader),
    pct_start=0.1,            # 最初の10%で warmup
    anneal_strategy='cos'     # cosine annealing
)

In [None]:
def train_epoch(model, dataloader, criterion, optimizer, scheduler, device):
    """
    1エポック分の学習
    
    OneCycleLR はバッチごとに学習率を更新するため、
    scheduler.step() をバッチループ内で呼び出す。
    """
    model.train()
    total_loss = 0.0
    
    for batch in dataloader:
        images = batch["images"].to(device)
        labels = batch["labels"].to(device)
        label_lengths = batch["label_lengths"]
        
        outputs = model(images)  # (T, B, C)
        T, B, C = outputs.shape
        input_lengths = torch.full((B,), T, dtype=torch.long)
        
        loss = criterion(outputs, labels, input_lengths, label_lengths)
        
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
        optimizer.step()
        scheduler.step()  # OneCycleLR: バッチごとに更新
        
        total_loss += loss.item()
    
    return total_loss / len(dataloader)


def validate(model, dataloader, device):
    """検証"""
    model.training = False  # set to inference mode
    total_chars = 0
    total_errors = 0
    correct = 0
    total = 0
    samples = []
    
    with torch.no_grad():
        for batch in dataloader:
            images = batch["images"].to(device)
            texts = batch["texts"]
            
            outputs = model(images)
            
            for i, text in enumerate(texts):
                probs = outputs[:, i, :]
                pred_indices = probs.argmax(dim=1).cpu().tolist()
                pred_text = decode_output(pred_indices)
                
                # CER 計算
                errors = sum(1 for a, b in zip(text, pred_text) if a != b)
                errors += abs(len(text) - len(pred_text))
                total_chars += len(text)
                total_errors += errors
                
                if text == pred_text:
                    correct += 1
                total += 1
                
                if len(samples) < 5:
                    samples.append({"gt": text, "pred": pred_text, "match": text == pred_text})
    
    model.training = True  # restore training mode
    
    cer = (total_errors / total_chars) * 100 if total_chars > 0 else 0
    accuracy = (correct / total) * 100 if total > 0 else 0
    
    return {"cer": cer, "accuracy": accuracy, "samples": samples}

In [None]:
# 学習ループ
print("=" * 60)
print("学習開始")
print("=" * 60)

best_cer = float("inf")
history = {"train_loss": [], "val_cer": [], "val_acc": [], "lr": []}

start_time = time.time()

for epoch in range(1, EPOCHS + 1):
    epoch_start = time.time()
    
    # 現在の学習率を記録
    current_lr = optimizer.param_groups[0]['lr']
    history["lr"].append(current_lr)
    
    # 学習（scheduler も渡す）
    train_loss = train_epoch(model, train_loader, criterion, optimizer, scheduler, device)
    
    # 検証
    val_result = validate(model, val_loader, device)
    
    epoch_time = time.time() - epoch_start
    
    # 履歴保存
    history["train_loss"].append(train_loss)
    history["val_cer"].append(val_result["cer"])
    history["val_acc"].append(val_result["accuracy"])
    
    # ログ（10エポックごと、または最初の10エポック）
    if epoch <= 10 or epoch % 10 == 0:
        print(f"Epoch {epoch:3d}/{EPOCHS} | "
              f"Loss: {train_loss:.4f} | "
              f"CER: {val_result['cer']:.2f}% | "
              f"Acc: {val_result['accuracy']:.1f}% | "
              f"LR: {current_lr:.2e} | "
              f"Time: {epoch_time:.1f}s")
    
    # ベストモデル保存
    if val_result["cer"] < best_cer:
        best_cer = val_result["cer"]
        torch.save({
            "epoch": epoch,
            "model_state_dict": model.state_dict(),
            "cer": best_cer,
            "accuracy": val_result["accuracy"]
        }, "best_model.pth")
        if epoch <= 10 or epoch % 10 == 0:
            print(f"  -> Best model saved (CER: {best_cer:.2f}%)")
    
    # サンプル表示（20エポックごと）
    if epoch % 20 == 0:
        print("\n  サンプル予測:")
        for s in val_result["samples"][:3]:
            mark = "OK" if s["match"] else "NG"
            print(f"    GT:   {s['gt']}")
            print(f"    Pred: {s['pred']} [{mark}]")
        print()

total_time = time.time() - start_time
print("\n" + "=" * 60)
print("学習完了")
print("=" * 60)
print(f"総学習時間: {total_time:.1f}秒 ({total_time/60:.1f}分)")
print(f"ベスト CER: {best_cer:.2f}%")

In [None]:
# 学習曲線をプロット
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

axes[0, 0].plot(history["train_loss"])
axes[0, 0].set_title("Training Loss")
axes[0, 0].set_xlabel("Epoch")
axes[0, 0].set_ylabel("Loss")
axes[0, 0].grid(True, alpha=0.3)

axes[0, 1].plot(history["val_cer"])
axes[0, 1].set_title("Validation CER (%)")
axes[0, 1].set_xlabel("Epoch")
axes[0, 1].set_ylabel("CER (%)")
axes[0, 1].axhline(y=1.0, color='r', linestyle='--', label='Target: 1%')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

axes[1, 0].plot(history["val_acc"])
axes[1, 0].set_title("Validation Accuracy (%)")
axes[1, 0].set_xlabel("Epoch")
axes[1, 0].set_ylabel("Accuracy (%)")
axes[1, 0].grid(True, alpha=0.3)

axes[1, 1].plot(history["lr"])
axes[1, 1].set_title("Learning Rate (OneCycleLR)")
axes[1, 1].set_xlabel("Epoch")
axes[1, 1].set_ylabel("LR")
axes[1, 1].set_yscale('log')
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# 最終結果サマリー
print("\n" + "=" * 60)
print("最終結果サマリー")
print("=" * 60)
print(f"最終 CER: {history['val_cer'][-1]:.2f}%")
print(f"最小 CER: {min(history['val_cer']):.2f}% (Epoch {history['val_cer'].index(min(history['val_cer'])) + 1})")
print(f"最終 Accuracy: {history['val_acc'][-1]:.1f}%")
print(f"目標達成: {'✅ CER < 1%' if min(history['val_cer']) < 1.0 else '❌ CER >= 1%'}")

## 6. ONNX エクスポート

In [None]:
# ベストモデルをロード
checkpoint = torch.load("best_model.pth")
model.load_state_dict(checkpoint["model_state_dict"])
model.training = False  # set to inference mode

print(f"Loaded model from epoch {checkpoint['epoch']}")
print(f"CER: {checkpoint['cer']:.2f}%, Accuracy: {checkpoint['accuracy']:.1f}%")

In [None]:
# ONNX エクスポート（dynamo=False で旧エクスポーター使用）
# PyTorch 2.9+ の dynamo エクスポーターに BatchNorm 関連のバグがあるため
model_cpu = model.cpu()
model_cpu.training = False
dummy_input = torch.randn(1, 1, 32, 280)

torch.onnx.export(
    model_cpu,
    dummy_input,
    "mrz_crnn.onnx",
    input_names=["image"],
    output_names=["output"],
    dynamic_axes={
        "image": {0: "batch", 3: "width"},
        "output": {0: "seq_len", 1: "batch"}
    },
    opset_version=17,
    dynamo=False  # 旧エクスポーター使用（BatchNorm バグ回避）
)

import os
onnx_size = os.path.getsize("mrz_crnn.onnx") / 1024 / 1024
print(f"ONNX モデルサイズ: {onnx_size:.2f} MB")

In [None]:
# 依存ライブラリをインストール（Colab 環境）
!pip install onnxscript onnxruntime -q

# ONNX モデルの検証
import onnxruntime as ort

session = ort.InferenceSession("mrz_crnn.onnx")

# テスト推論
test_input = np.random.randn(1, 1, 32, 280).astype(np.float32)
outputs = session.run(None, {"image": test_input})

print(f"ONNX 出力形状: {outputs[0].shape}")
print("ONNX モデル検証: OK")

## 7. Google Drive に保存（オプション）

In [None]:
# Google Drive をマウント
from google.colab import drive
drive.mount("/content/drive")

# モデルを保存
import shutil
save_dir = "/content/drive/MyDrive/mrz_ocr"
os.makedirs(save_dir, exist_ok=True)

shutil.copy("best_model.pth", f"{save_dir}/best_model.pth")
shutil.copy("mrz_crnn.onnx", f"{save_dir}/mrz_crnn.onnx")

print(f"モデルを保存しました: {save_dir}")

## まとめ

- **学習データ**: 合成 MRZ 画像 10,000枚
- **検証データ**: 合成 MRZ 画像 1,000枚
- **モデルサイズ**: ~3 MB
- **目標精度**: CER < 1%

次のステップ:
1. `mrz_crnn.onnx` をダウンロード
2. WASM 変換 (`onnxruntime-web`)
3. ブラウザでの推論テスト