<a href="https://colab.research.google.com/github/nanpolend/machine-learning/blob/master/%E9%A0%90%E8%A8%93%E7%B7%B4%E5%92%8Ctensorflow%E8%A8%93%E7%B7%B4%E5%9C%96%E8%A1%A8%E5%92%8C%E8%B6%85%E5%8F%83%E6%95%B8%E8%AA%BF%E6%95%B4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ========== Colab環境設置 ==========
!pip install torch==2.3.0+cu121 torchvision==0.18.0+cu121 --extra-index-url https://download.pytorch.org/whl/cu121
!pip install tensorboard

# ========== 導入庫 ==========
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.optim import SGD
from torch.optim.lr_scheduler import OneCycleLR
from torch.cuda.amp import GradScaler, autocast
from torch.utils.tensorboard import SummaryWriter
import matplotlib.pyplot as plt
import numpy as np
import os

# ========== 超參數配置 ==========
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
BATCH_SIZE = 256
LR = 0.1
EPOCHS = 100  # 配合早停機制
PATIENCE = 10
MODEL_SAVE_PATH = '/content/best_model.pth'
DATA_PATH = '/content/data'

# ========== 數據增強配置 ==========
class CIFAR10Enhanced(torchvision.datasets.CIFAR10):
    """可視化數據增強擴展類"""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def show_augmentation(self, num_samples=4):
        indices = np.random.choice(len(self), num_samples)
        fig, axes = plt.subplots(1, num_samples, figsize=(15, 3))
        for i, idx in enumerate(indices):
            img, label = self[idx]
            img = inv_normalize(img).numpy().transpose((1, 2, 0))
            axes[i].imshow(img)
            axes[i].set_title(f'標籤: {self.classes[label]}')
            axes[i].axis('off')
        plt.show()

# ========== 數據預處理 ==========
CIFAR_MEAN = (0.4914, 0.4822, 0.4465)
CIFAR_STD = (0.2023, 0.1994, 0.2010)

train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.RandomApply([transforms.GaussianBlur(3)], p=0.3),  # 高斯模糊
    transforms.ToTensor(),
    transforms.Normalize(CIFAR_MEAN, CIFAR_STD),
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.1), value='random')  # 隨機擦除
])

test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(CIFAR_MEAN, CIFAR_STD)
])

# 反標準化轉換
inv_normalize = transforms.Normalize(
    mean=[-m/s for m, s in zip(CIFAR_MEAN, CIFAR_STD)],
    std=[1/s for s in CIFAR_STD]
)

# ========== 數據載入與可視化 ==========
os.makedirs(DATA_PATH, exist_ok=True)

train_dataset = CIFAR10Enhanced(
    root=DATA_PATH, train=True, download=True, transform=train_transform)
test_dataset = CIFAR10Enhanced(
    root=DATA_PATH, train=False, download=True, transform=test_transform)

print("🎨 數據增強可視化：")
train_dataset.show_augmentation()

train_loader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True,
    num_workers=2, pin_memory=True, persistent_workers=True
)
test_loader = DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False,
    num_workers=2, pin_memory=True, persistent_workers=True
)

# ========== 模型定義 ==========
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block=BasicBlock, num_blocks=[2,2,2,2], num_classes=10):
        super().__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.adaptive_avg_pool2d(out, (1, 1))
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

model = ResNet(num_blocks=[2,2,2,2]).to(DEVICE)
print(f"🔄 模型已加載到 {DEVICE}")

# ========== 訓練配置 ==========
class LabelSmoothingCrossEntropy(nn.Module):
    def __init__(self, smoothing=0.1):
        super().__init__()
        self.smoothing = smoothing

    def forward(self, logits, labels):
        confidence = 1. - self.smoothing
        log_probs = F.log_softmax(logits, dim=-1)
        nll_loss = -log_probs.gather(dim=-1, index=labels.unsqueeze(1))
        nll_loss = nll_loss.squeeze(1)
        smooth_loss = -log_probs.mean(dim=-1)
        loss = confidence * nll_loss + self.smoothing * smooth_loss
        return loss.mean()

criterion = LabelSmoothingCrossEntropy(smoothing=0.1)
optimizer = SGD(model.parameters(), lr=LR, momentum=0.9, weight_decay=5e-4)
scheduler = OneCycleLR(
    optimizer,
    max_lr=LR,
    epochs=EPOCHS,
    steps_per_epoch=len(train_loader),
    pct_start=0.2
)
scaler = GradScaler()
writer = SummaryWriter('/content/logs')

# ========== TTA測試函數 ==========
def tta_predict(model, image, n_aug=4):
    model.eval()
    with torch.no_grad():
        outputs = model(image)
        outputs += model(torch.flip(image, [3]))  # 水平翻轉

        for _ in range(n_aug-2):
            aug_img = transforms.functional.resized_crop(
                image,
                top=np.random.randint(0, 8),
                left=np.random.randint(0, 8),
                height=24,
                width=24,
                size=(32, 32)
            )
            outputs += model(aug_img)
    return outputs / n_aug

# ========== 訓練函數 ==========
def train_epoch(epoch):
    model.train()
    total_loss = 0.0

    for batch_idx, (images, labels) in enumerate(train_loader):
        images = images.to(DEVICE, non_blocking=True)
        labels = labels.to(DEVICE, non_blocking=True)

        optimizer.zero_grad(set_to_none=True)

        with autocast():
            outputs = model(images)
            loss = criterion(outputs, labels)

        scaler.scale(loss).backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        scaler.step(optimizer)
        scaler.update()
        scheduler.step()

        total_loss += loss.item() * images.size(0)

        if batch_idx == 0:
            with torch.no_grad():
                images_inv = inv_normalize(images)
                img_grid = torchvision.utils.make_grid(images_inv.cpu())
                writer.add_image('訓練圖像', img_grid, epoch)

    return total_loss / len(train_dataset)

# ========== 驗證函數 ==========
def validate(epoch):
    model.eval()
    total_loss = 0.0
    correct = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(DEVICE)
            labels = labels.to(DEVICE)

            outputs = tta_predict(model, images)  # 使用TTA
            loss = criterion(outputs, labels)

            total_loss += loss.item() * images.size(0)
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()

    return total_loss / len(test_dataset), correct / len(test_dataset)

# ========== 主訓練循環 ==========
best_acc = 0.0
patience_counter = 0

print("\n🚀 訓練啟動:")
for epoch in range(1, EPOCHS + 1):
    train_loss = train_epoch(epoch)
    val_loss, val_acc = validate(epoch)

    writer.add_scalars('損失', {'訓練': train_loss, '驗證': val_loss}, epoch)
    writer.add_scalar('準確率/驗證', val_acc, epoch)
    writer.add_scalar('學習率', optimizer.param_groups[0]['lr'], epoch)

    print(f"週期 {epoch:03d} | "
          f"訓練損失: {train_loss:.4f} | "
          f"驗證損失: {val_loss:.4f} | "
          f"驗證準確率: {val_acc:.2%} | "
          f"學習率: {optimizer.param_groups[0]['lr']:.2e}")

    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), MODEL_SAVE_PATH)
        patience_counter = 0
        print(f"💾 模型已保存 (準確率 {val_acc:.2%})")
    else:
        patience_counter += 1
        if patience_counter >= PATIENCE:
            print("🛑 提前停止訓練")
            break

writer.close()

# ========== 最終測試 ==========
print("\n🔍 最終測試:")
model.load_state_dict(torch.load(MODEL_SAVE_PATH))
final_loss, final_acc = validate(0)
print(f"🏆 測試準確率: {final_acc:.2%}")

# ========== TensorBoard啟動指令 ==========
print("\n🔬 啟動TensorBoard：")
!tensorboard --logdir=/content/logs --port=6006