In [None]:
import os
import cv2
import torch
import random
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay


In [None]:
# Hyper-parameters and global run settings
config = dict(
    model_name="resnet",   # either "resnet" or "vgg"
    num_epochs=15,
    batch_size=32,
    learning_rate=0.001,
    test_size=0.2,         # fraction of each class held out for testing
    sample_ratio=1.0,      # fraction of each class that is loaded at all
)

# Prefer the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Root folder of the dataset; each sub-folder is treated as one class.
dataset_path = "Aerial_Landscapes/"


In [None]:
def initialize_model(model_name, num_classes, use_pretrained=True):
    """Build a torchvision backbone whose final layer outputs ``num_classes`` logits.

    Args:
        model_name: ``"resnet"`` (ResNet-34) or ``"vgg"`` (VGG-16).
        num_classes: Size of the replacement classification layer.
        use_pretrained: Forwarded to the torchvision constructor as ``pretrained``.

    Returns:
        ``(model, input_size)`` where ``input_size`` is always 224.

    Raises:
        ValueError: If ``model_name`` is neither ``"resnet"`` nor ``"vgg"``.
    """
    # Reject unknown architectures before constructing anything.
    if model_name not in ("resnet", "vgg"):
        raise ValueError("不支持的模型名称，请选择 'resnet' 或 'vgg'")

    input_size = 224

    # NOTE(review): recent torchvision releases are said to deprecate `pretrained=`
    # in favour of `weights=` — confirm against the installed version.
    if model_name == "resnet":
        model = models.resnet34(pretrained=use_pretrained)
        # Swap the fully-connected head for one sized to our classes.
        model.fc = nn.Linear(model.fc.in_features, num_classes)
        return model, input_size

    model = models.vgg16(pretrained=use_pretrained)
    # Index 6 of the VGG classifier is replaced with a layer sized to our classes.
    old_head = model.classifier[6]
    model.classifier[6] = nn.Linear(old_head.in_features, num_classes)
    return model, input_size


In [None]:
def get_transforms(input_size=224):
    """Create the training and evaluation preprocessing pipelines.

    Both pipelines start with ``ToPILImage`` and end with ``ToTensor`` followed by
    per-channel normalisation. The training pipeline adds random augmentation
    (resized crop, horizontal flip, rotation up to 15 degrees); the evaluation
    pipeline is deterministic (resize + centre crop).

    Args:
        input_size: Side length of the square crop produced by both pipelines.

    Returns:
        ``(train_transform, test_transform)`` as ``transforms.Compose`` objects.
    """
    # Presumably the ImageNet statistics matching the pretrained backbones — TODO confirm.
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    def _to_normalised_tensor():
        # Fresh transform instances (and fresh stat lists) for each pipeline.
        return [
            transforms.ToTensor(),
            transforms.Normalize(list(channel_mean), list(channel_std)),
        ]

    augmentation_steps = [
        transforms.ToPILImage(),
        transforms.RandomResizedCrop(input_size),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
    ]
    deterministic_steps = [
        transforms.ToPILImage(),
        transforms.Resize(input_size),
        transforms.CenterCrop(input_size),
    ]

    train_transform = transforms.Compose(augmentation_steps + _to_normalised_tensor())
    test_transform = transforms.Compose(deterministic_steps + _to_normalised_tensor())
    return train_transform, test_transform


In [None]:
class CustomDataset(Dataset):
    """In-memory image dataset that converts BGR arrays to RGB on access.

    Args:
        images: Indexable collection of images, passed to ``cv2.cvtColor`` with
            ``COLOR_BGR2RGB`` (so they are expected in OpenCV's BGR layout).
        labels: Indexable collection of labels, aligned with ``images``.
        transform: Optional callable applied to the RGB image.
    """

    def __init__(self, images, labels, transform=None):
        self.images, self.labels = images, labels
        self.transform = transform

    def __len__(self):
        # The number of samples is defined by the image collection.
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``, transformed when a transform is set."""
        rgb_image = cv2.cvtColor(self.images[idx], cv2.COLOR_BGR2RGB)
        target = self.labels[idx]
        if not self.transform:
            return rgb_image, target
        return self.transform(rgb_image), target


In [None]:
def load_and_split_dataset(root_dir, test_size=0.2, sample_ratio=1.0):
    """Load every ``.jpg`` under ``root_dir/<class>/`` and split it per class.

    Each visible sub-directory of ``root_dir`` is one class; class indices follow
    the alphabetical order of the directory names. Within each class the file
    list is sorted, shuffled with a fixed seed (42), truncated to
    ``sample_ratio`` of the files and then split into train / test parts.

    Args:
        root_dir: Dataset root containing one folder per class.
        test_size: Fraction of the sampled files of each class used for testing.
        sample_ratio: Fraction of the files of each class that is used at all.

    Returns:
        ``((train_images, train_labels), (test_images, test_labels), classes)``
        where images are arrays as returned by ``cv2.imread`` and labels are
        integer class indices.
    """
    import warnings

    # Only real, non-hidden directories are classes. Stray files or folders such
    # as `.DS_Store` / `.ipynb_checkpoints` would otherwise become bogus classes
    # (or crash the per-class listing below).
    classes = sorted(
        entry for entry in os.listdir(root_dir)
        if not entry.startswith('.') and os.path.isdir(os.path.join(root_dir, entry))
    )
    class_to_idx = {cls_name: i for i, cls_name in enumerate(classes)}
    train_images, train_labels = [], []
    test_images, test_labels = [], []

    def _read_images(paths):
        """Read images from disk, dropping (with a warning) files OpenCV cannot decode."""
        loaded = []
        for path in paths:
            image = cv2.imread(path)
            if image is None:
                # cv2.imread signals failure by returning None instead of raising.
                warnings.warn(f"Skipping unreadable image: {path}")
                continue
            loaded.append(image)
        return loaded

    for cls_name in classes:
        cls_path = os.path.join(root_dir, cls_name)
        # os.listdir order is filesystem dependent; sorting first makes the seeded
        # shuffle (and therefore the split) reproducible across machines.
        img_files = sorted(
            os.path.join(cls_path, f) for f in os.listdir(cls_path) if f.endswith('.jpg')
        )
        random.seed(42)
        random.shuffle(img_files)

        n_samples = int(len(img_files) * sample_ratio)
        split = int(n_samples * (1 - test_size))
        imgs = img_files[:n_samples]

        cls_train = _read_images(imgs[:split])
        cls_test = _read_images(imgs[split:])

        # Label counts follow the images actually loaded so the lists stay aligned.
        train_images.extend(cls_train)
        train_labels.extend([class_to_idx[cls_name]] * len(cls_train))
        test_images.extend(cls_test)
        test_labels.extend([class_to_idx[cls_name]] * len(cls_test))

    return (train_images, train_labels), (test_images, test_labels), classes


In [None]:
from tqdm import tqdm

def train_model(model, device, train_loader, test_loader, criterion, optimizer,
                num_epochs=25, checkpoint_path='checkpoint.pth', patience=5):
    """
    Train ``model`` with per-epoch validation, checkpointing and early stopping.

    Every epoch runs one optimisation pass over ``train_loader`` (with a tqdm
    progress bar) and one evaluation pass over ``test_loader``. Whenever the test
    accuracy improves on the best value so far, the model's ``state_dict`` is
    saved to ``checkpoint_path``. Training stops once the test accuracy has not
    improved for ``patience`` consecutive epochs.

    Args:
        model: Network to train; must already be on ``device``.
        device: Device the batches are moved to.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Iterable of ``(inputs, labels)`` evaluation batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimiser bound to ``model``'s parameters.
        num_epochs: Maximum number of epochs.
        checkpoint_path: File the best ``state_dict`` is written to.
        patience: Number of non-improving epochs tolerated before stopping.

    Returns:
        ``(model, history)``. ``model`` holds the weights of the last epoch that
        ran (the best weights are in ``checkpoint_path``); ``history`` maps
        ``'train_loss'``, ``'train_acc'``, ``'test_loss'`` and ``'test_acc'`` to
        per-epoch lists of floats.
    """
    best_acc = 0.0
    epochs_no_improve = 0
    history = {
        'train_loss': [], 'train_acc': [],
        'test_loss': [], 'test_acc': []
    }

    for epoch in range(num_epochs):
        # ---- training phase (the progress bar wraps the real training loop) ----
        model.train()
        train_loss, correct_train, total_train = 0.0, 0, 0
        progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch")
        for inputs, labels in progress:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # The loss is weighted by batch size before averaging over samples.
            train_loss += loss.item() * inputs.size(0)
            preds = outputs.argmax(dim=1)
            correct_train += (preds == labels).sum().item()
            total_train += labels.size(0)

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
        epoch_train_loss = train_loss / max(total_train, 1)
        epoch_train_acc = correct_train / max(total_train, 1)
        history['train_loss'].append(epoch_train_loss)
        history['train_acc'].append(epoch_train_acc)

        # ---- validation phase ----
        model.eval()
        test_loss, correct_test, total_test = 0.0, 0, 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                test_loss += loss.item() * inputs.size(0)
                preds = outputs.argmax(dim=1)
                correct_test += (preds == labels).sum().item()
                total_test += labels.size(0)

        epoch_test_loss = test_loss / max(total_test, 1)
        epoch_test_acc = correct_test / max(total_test, 1)
        history['test_loss'].append(epoch_test_loss)
        history['test_acc'].append(epoch_test_acc)

        print(f"Epoch {epoch+1}: Train Acc={epoch_train_acc:.4f}, Test Acc={epoch_test_acc:.4f}")

        # ---- checkpointing / early stopping ----
        if epoch_test_acc > best_acc:
            best_acc = epoch_test_acc
            epochs_no_improve = 0
            torch.save(model.state_dict(), checkpoint_path)
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                # Break out of the *epoch* loop so training really stops here.
                print(f"⚠️ 早停触发于第 {epoch+1} 轮")
                break

    return model, history


In [None]:
def evaluate_model(model, test_loader, device, class_names):
    """Print a classification report and plot the confusion matrix for ``model``.

    Args:
        model: Trained network, already on ``device``.
        test_loader: Iterable of ``(inputs, labels)`` batches; labels are expected
            to be integer class indices in ``range(len(class_names))``.
        device: Device the inputs are moved to before the forward pass.
        class_names: Display names, indexed by class id.
    """
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.to(device))
            preds = outputs.argmax(dim=1)
            # Store plain Python ints for both lists so scikit-learn receives
            # homogeneous data regardless of which device the tensors live on.
            y_true.extend(labels.cpu().tolist())
            y_pred.extend(preds.cpu().tolist())

    # Pin the label set so the report and matrix keep one row per class even when
    # a class is absent from this split (otherwise target_names would mismatch).
    label_ids = list(range(len(class_names)))
    print(classification_report(y_true, y_pred, labels=label_ids, target_names=class_names))
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(xticks_rotation='vertical', cmap='Blues')


In [None]:
# Load the images from disk and split them per class into train / test subsets
(train_images, train_labels), (test_images, test_labels), classes = load_and_split_dataset(
    dataset_path, test_size=config["test_size"], sample_ratio=config["sample_ratio"]
)

# Build the network selected in the config and move it to the target device
model, input_size = initialize_model(config["model_name"], len(classes), use_pretrained=True)
model = model.to(device)

# Preprocessing / augmentation pipelines sized for the chosen network
train_transform, test_transform = get_transforms(input_size)

# Training data: augmented and reshuffled by the loader
train_dataset = CustomDataset(train_images, train_labels, transform=train_transform)
train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)

# Test data: deterministic preprocessing, fixed order
test_dataset = CustomDataset(test_images, test_labels, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=config["batch_size"], shuffle=False)

# Optimiser and loss function
optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
criterion = nn.CrossEntropyLoss()

# Train with early stopping (patience of 5 epochs)
model, history = train_model(
    model=model,
    device=device,
    train_loader=train_loader,
    test_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=config["num_epochs"],
    patience=5,
)

# Evaluation: collect predictions for the whole test set
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        # Plain Python ints for both lists keep the scikit-learn inputs homogeneous.
        y_true.extend(labels.cpu().tolist())
        y_pred.extend(preds.cpu().tolist())

# Report the three metrics: precision, recall and F1-score
report = classification_report(y_true, y_pred, target_names=classes, digits=4)
print("Evaluation Metrics (Precision, Recall, F1-score):\n")
print(report)

# Confusion-matrix visualisation
cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classes)
# Draw on an explicit axes: calling plt.figure() and then disp.plot() without `ax`
# leaves the sized figure blank, because disp.plot() creates its own figure.
fig, ax = plt.subplots(figsize=(12, 10))
disp.plot(ax=ax, xticks_rotation=45, cmap='Blues')
ax.set_title("Confusion Matrix")
ax.grid(False)
fig.tight_layout()
plt.show()


In [None]:
def run_and_evaluate(model_name, classes):
    """Train one architecture from scratch on the dataset and report its test metrics.

    The dataset is reloaded from ``dataset_path`` using the split settings in
    ``config``; a fresh pretrained model is fine-tuned with ``train_model`` and
    then evaluated on the test split. A classification report is printed and a
    confusion matrix is shown.

    Args:
        model_name: Architecture key understood by ``initialize_model``.
        classes: Class names, indexed by class id; also determines the size of
            the classification head.
    """
    # Load the data (the class list returned here is ignored in favour of `classes`)
    (train_images, train_labels), (test_images, test_labels), _ = load_and_split_dataset(
        dataset_path,
        test_size=config["test_size"],
        sample_ratio=config["sample_ratio"]
    )

    # Build the model
    model, input_size = initialize_model(
        model_name=model_name,
        num_classes=len(classes),
        use_pretrained=True
    )
    model = model.to(device)

    # Preprocessing / augmentation
    train_transform, test_transform = get_transforms(input_size)

    # Datasets and loaders
    train_dataset = CustomDataset(train_images, train_labels, train_transform)
    test_dataset = CustomDataset(test_images, test_labels, test_transform)
    train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=config["batch_size"], shuffle=False)

    # Loss function and optimiser
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])

    # Train.
    # NOTE(review): every call uses train_model's default checkpoint path, so each
    # run overwrites the previous model's checkpoint file — confirm this is intended.
    model, _ = train_model(
        model, device, train_loader, test_loader,
        criterion, optimizer,
        num_epochs=config["num_epochs"],
        patience=5
    )

    # Collect predictions on the test split
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            # Plain Python ints for both lists keep the scikit-learn inputs homogeneous.
            y_true.extend(labels.cpu().tolist())
            y_pred.extend(preds.cpu().tolist())

    report = classification_report(y_true, y_pred, target_names=classes, digits=4)
    print(f"===== {model_name.upper()} Evaluation Metrics =====\n")
    print(report)

    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classes)
    # Draw on an explicit axes: plt.figure() followed by disp.plot() without `ax`
    # would leave the sized figure empty and plot on a second, default-sized one.
    fig, ax = plt.subplots(figsize=(12, 10))
    disp.plot(ax=ax, xticks_rotation=45, cmap='Blues')
    ax.set_title(f"Confusion Matrix for {model_name.upper()}")
    ax.grid(False)
    fig.tight_layout()
    plt.show()


In [None]:
# Train and evaluate ResNet and VGG back to back so their metrics can be compared
print("🔍 正在比较 ResNet34 与 VGG16 的分类性能...\n")
for backbone in ("resnet", "vgg"):
    run_and_evaluate(backbone, classes)


In [None]:
import os
import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
from torchcam.methods import GradCAM

# 🔥 Grad-CAM heat-map overlay
def overlay_heatmap(img: np.ndarray, cam: np.ndarray, alpha: float = 0.5) -> np.ndarray:
    """Blend a JET-coloured activation map on top of an image.

    Args:
        img: Image array of shape ``(H, W, 3)``. If its maximum exceeds 1.0 it is
            divided by 255; otherwise it is used as is.
        cam: Activation map, scaled by 255 and cast to ``uint8`` before colouring
            (so values are presumably expected in ``[0, 1]``).
        alpha: Weight of the colour map in the blend.

    Returns:
        ``uint8`` array with the blend rescaled so its maximum becomes 255.

    NOTE(review): ``cv2.applyColorMap`` is documented to return BGR data, so
    ``img`` should be BGR too for a colour-consistent blend — confirm at call sites.
    """
    colour_map = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
    colour_map = np.float32(colour_map) / 255

    # Bring 0-255 images to the same 0-1 range as the colour map.
    base = np.float32(img) / 255 if img.max() > 1.0 else img

    # Match the colour map to the image's spatial size when they differ.
    height, width = base.shape[:2]
    if colour_map.shape[:2] != (height, width):
        colour_map = cv2.resize(colour_map, (width, height))

    blended = colour_map * alpha + base
    # Rescale by the peak so the brightest value maps to 255.
    blended = blended / np.max(blended)
    return np.uint8(255 * blended)

# 🎨 Undo the per-channel normalisation
def unnormalize(tensor, mean, std):
    """Return a copy of ``tensor`` with ``x * std + mean`` applied per channel.

    Channels are taken along the first dimension and paired with ``mean`` and
    ``std`` element by element (pairing stops at the shortest of the three).
    The input tensor is left untouched: the work happens on a clone, so views of
    a batch passed in by the caller are not modified as a side effect.

    Args:
        tensor: Tensor whose first dimension indexes channels.
        mean: Per-channel means used during normalisation.
        std: Per-channel standard deviations used during normalisation.

    Returns:
        A new tensor of the same shape holding the un-normalised values.
    """
    restored = tensor.clone()
    for channel, m, s in zip(restored, mean, std):
        # Each `channel` is a view into `restored`, so in-place ops update the copy.
        channel.mul_(s).add_(m)
    return restored

# 🧠 Grad-CAM: produce one heat-map visualisation per class
def apply_gradcam_all_classes(model, device, dataloader, model_name="resnet",
                              save_dir="gradcam_outputs", class_names=None):
    """Save and display a Grad-CAM overlay for the first sample of every class.

    For the first image of each ground-truth class found in ``dataloader`` the
    function computes a Grad-CAM map for the *predicted* class, writes
    ``<class>_original.png``, ``<class>_gradcam.png`` and ``<class>_compare.png``
    into ``save_dir`` and shows original and overlay side by side.

    Args:
        model: Trained network, already on ``device``.
        device: Device the inputs are moved to.
        dataloader: Iterable of ``(inputs, labels)`` batches whose inputs were
            normalised with mean ``[0.485, 0.456, 0.406]`` and std
            ``[0.229, 0.224, 0.225]``.
        model_name: ``"resnet"`` or ``"vgg"``; selects the Grad-CAM target layer.
        save_dir: Output directory, created when missing.
        class_names: Optional names indexed by class id. Defaults to the 15
            built-in names used previously.

    Raises:
        ValueError: If ``model_name`` is not supported.
    """
    if class_names is None:
        class_names = [
            "Agriculture", "Airport", "Beach", "City", "Desert", "Forest", "Grassland", "Highway",
            "Lake", "Mountain", "Parking", "Port", "Railway", "Residential", "River"
        ]
    class_id_to_name = list(class_names)
    total_target_classes = len(class_id_to_name)

    def _name_of(class_id):
        # Fall back to a generic name instead of raising IndexError on unknown ids.
        if 0 <= class_id < total_target_classes:
            return class_id_to_name[class_id]
        return f"class_{class_id}"

    os.makedirs(save_dir, exist_ok=True)
    model.eval()

    # ✅ Grad-CAM target layer
    if model_name == "resnet":
        target_layer = model.layer4[-1].conv2
    elif model_name == "vgg":
        target_layer = model.features[-1]
    else:
        raise ValueError("Unsupported model")

    cam_extractor = GradCAM(model, target_layer=target_layer)
    seen_classes = set()

    try:
        for inputs, labels in dataloader:
            inputs = inputs.to(device)

            for i in range(inputs.shape[0]):
                label = int(labels[i])
                if label in seen_classes:
                    continue  # this class already has a visualisation

                # Single-image forward pass (gradients enabled: Grad-CAM needs them).
                img_tensor = inputs[i].unsqueeze(0)
                output = model(img_tensor)
                class_idx = torch.argmax(output).item()

                # Compute the Grad-CAM map and rescale it to [0, 1]
                cam_tensor = cam_extractor(class_idx=class_idx, scores=output)[0]
                cam = cam_tensor.detach().cpu().numpy()
                if cam.ndim == 3:
                    cam = cam[0]
                cam = (cam - cam.min()) / (cam.max() - cam.min() + 1e-8)

                # Recover the displayable RGB image. A clone is passed so that the
                # batch tensor is never modified through a view.
                unnorm_img_tensor = unnormalize(img_tensor.squeeze(0).detach().cpu().clone(),
                                                mean=[0.485, 0.456, 0.406],
                                                std=[0.229, 0.224, 0.225])
                raw_image = np.clip(unnorm_img_tensor.permute(1, 2, 0).numpy(), 0, 1)

                # OpenCV colour maps are BGR, so blend against a BGR copy of the
                # image; mixing RGB with BGR swaps red and blue in the result.
                raw_bgr = np.ascontiguousarray(raw_image[..., ::-1])
                cam = cv2.resize(cam, (raw_image.shape[1], raw_image.shape[0]))
                result_bgr = overlay_heatmap(raw_bgr, cam)
                result_rgb = cv2.cvtColor(result_bgr, cv2.COLOR_BGR2RGB)

                # Save the images (cv2.imwrite expects BGR)
                orig_img = np.uint8(raw_image * 255)
                orig_bgr = cv2.cvtColor(orig_img, cv2.COLOR_RGB2BGR)

                cname = _name_of(label)
                cv2.imwrite(os.path.join(save_dir, f"{cname}_original.png"), orig_bgr)
                cv2.imwrite(os.path.join(save_dir, f"{cname}_gradcam.png"), result_bgr)
                cv2.imwrite(os.path.join(save_dir, f"{cname}_compare.png"), np.hstack((orig_bgr, result_bgr)))

                # Side-by-side display (matplotlib expects RGB)
                fig, axs = plt.subplots(1, 2, figsize=(10, 4))
                axs[0].imshow(orig_img)
                axs[0].set_title(f"{cname} - Original")
                axs[0].axis('off')
                axs[1].imshow(result_rgb)
                axs[1].set_title(f"{cname} - GradCAM (Pred: {_name_of(class_idx)})")
                axs[1].axis('off')
                plt.tight_layout()
                plt.show()
                plt.close(fig)  # avoid accumulating open figures across classes

                seen_classes.add(label)

            if len(seen_classes) >= total_target_classes:
                print("✅ 已为所有类别生成 Grad-CAM 可视化。")
                break
    finally:
        # Detach the Grad-CAM hooks so later use of the model is unaffected.
        # Looked up defensively in case the installed torchcam lacks the method.
        remove_hooks = getattr(cam_extractor, "remove_hooks", None)
        if callable(remove_hooks):
            remove_hooks()


In [None]:
# Take the architecture from the config: the global `model` was built from
# config["model_name"], and the Grad-CAM target layer must match that architecture.
apply_gradcam_all_classes(model, device, test_loader, model_name=config["model_name"])


In [None]:
# from torchcam.methods import GradCAM
# from torchvision.transforms.functional import to_pil_image
# from torchcam.utils import overlay_mask
# import matplotlib.pyplot as plt

# # 自动选择适配的 target layer
# target_layer = "layer4" if "resnet" in config["model_name"] else "features.29"
# cam_extractor = GradCAM(model, target_layer=target_layer)

# # 输出多张验证集中图像的 Grad-CAM 可视化
# n_samples = 8  # 可根据需要调整
# print(f"🎯 Grad-CAM 可视化前 {n_samples} 张验证图像：")
# count = 0

# for i, (img, label) in enumerate(val_loader):
#     if count >= n_samples:
#         break

#     img = img.to(device)
#     label = label.to(device)

#     output = model(img)
#     class_idx = output.squeeze(0).argmax().item()

#     # 获取激活图
#     activation_map = cam_extractor(class_idx, output)[0].cpu()

#     # 热力图叠加
#     result = overlay_mask(
#         to_pil_image(img.squeeze().cpu().detach().clamp(0, 1)),
#         to_pil_image(activation_map, mode='F'),
#         alpha=0.5
#     )

#     # 显示
#     plt.figure(figsize=(4, 4))
#     plt.title(f"Predicted Class: {class_idx}")
#     plt.imshow(result)
#     plt.axis('off')
#     plt.show()

#     count += 1
