In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
import os
import timm
from torchvision import transforms
from sklearn.preprocessing import LabelEncoder

class DualNPYDataset(Dataset):
    def __init__(self, csv_file, root_dir_rgb, root_dir_vessel, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir_rgb = root_dir_rgb
        self.root_dir_vessel = root_dir_vessel
        self.transform = transform

        # 初始化LabelEncoder并对标签进行编码
        self.label_encoder = LabelEncoder()
        self.encoded_labels = self.label_encoder.fit_transform(self.annotations.iloc[:, 2].values)

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):
        # 加载RGB图像
        img_name_rgb = os.path.join(self.root_dir_rgb, str(self.annotations.iloc[index, -1]) + '.npy')
        image_rgb = np.load(img_name_rgb).astype(np.float32)  # 假设RGB图像是3通道的

        # 加载血管分割后的二值图像
        img_name_vessel = os.path.join(self.root_dir_vessel, str(self.annotations.iloc[index, -1]) + '.npy')
        image_vessel = np.load(img_name_vessel).astype(np.float32)  # 假设血管分割图像是单通道的

        # 扩展二值图像的维度并与RGB图像合并为4通道
        image_vessel = np.expand_dims(image_vessel, axis=0)  # 从(H, W)变为(1, H, W)
        image = np.concatenate((image_rgb, image_vessel), axis=0)  # 合并为(4, H, W)

        if self.transform:
            image = self.transform(image)

        label = self.encoded_labels[index]
        label = torch.tensor(label, dtype=torch.long)
        return image, label

# 定义图像转换
transform = transforms.Compose([
    transforms.ToTensor(),  # 直接从(H, W, C)转为(C, H, W)
    transforms.Normalize(mean=[0.485, 0.456, 0.406, 0.5], std=[0.229, 0.224, 0.225, 0.5])  # 归一化，假设第四个通道的均值和标准差为0.5
])

class DualInputResNet50(nn.Module):
    def __init__(self, num_classes):
        super(DualInputResNet50, self).__init__()
        # 定义ResNet50模型
        self.resnet = timm.create_model('resnet50', pretrained=True, num_classes=0)
        
        # 修改第一个卷积层的输入通道为4
        self.resnet.conv1 = nn.Conv2d(4, 64, kernel_size=7, stride=2, padding=3, bias=False)
        
        # 定义分类器
        self.classifier = nn.Sequential(
            nn.Linear(2048, 512),  # ResNet50的输出特征大小为2048
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.resnet(x)
        x = self.classifier(x)
        return x

# 初始化数据集
train_dataset = DualNPYDataset(csv_file='/kaggle/input/double-chanel-resnet50/train.csv',
                               root_dir_gaussian='/kaggle/input/double-chanel-resnet50/resnet50_gaus_train/resnet50_gaus_train',
                               root_dir_unet='/kaggle/input/double-chanel-resnet50/resnet50_predicted_train/resnet50_predicted_train',
                               transform=transform)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

validation_dataset = DualNPYDataset(csv_file='/kaggle/input/double-chanel-resnet50/validation.csv',
                               root_dir_rgb='/kaggle/input/double-chanel-resnet50/resnet50_rgb_validation',
                               root_dir_vessel='/kaggle/input/double-chanel-resnet50/resnet50_vessel_validation',
                               transform=transform)

validation_loader = DataLoader(dataset=validation_dataset, batch_size=batch_size, shuffle=False)

test_dataset = DualNPYDataset(csv_file='/kaggle/input/double-chanel-resnet50/test.csv',
                               root_dir_rgb='/kaggle/input/double-chanel-resnet50/resnet50_rgb_test',
                               root_dir_vessel='/kaggle/input/double-chanel-resnet50/resnet50_vessel_test',
                               transform=transform)

test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# 模型初始化
num_classes = len(np.unique(train_dataset.annotations['labels']))
model = DualInputResNet50(num_classes=num_classes).to(device)

# 创建一个权重数组，给第三类更高的权重
weights = torch.tensor([1.0, 1.0, 1.0, 1.0], dtype=torch.float32).to(device)

# 使用加权损失函数
criterion = nn.CrossEntropyLoss(weight=weights)

optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-2)

# 学习率调度器
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

In [3]:
def extract_features(loader, model, device):
    model.eval()
    features = []
    labels = []

    with torch.no_grad():
        for (image_gaussian, image_unet), targets in loader:
            image_gaussian, image_unet = image_gaussian.to(device), image_unet.to(device)
            output = model((image_gaussian, image_unet))
            features.append(output.cpu().numpy())
            labels.append(targets.cpu().numpy())

    features = np.concatenate(features)
    labels = np.concatenate(labels)
    return features, labels

In [4]:
def train_model(model, train_loader, criterion, optimizer, device, num_epochs):
    model.train()  # 设置模型为训练模式
    for epoch in range(num_epochs):
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            # 前向传播
            outputs = model(images)
            loss = criterion(outputs, labels)

            # 反向传播及优化
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        epoch_loss = running_loss / len(train_loader)
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}')

        # 更新学习率
        scheduler.step()

In [5]:
def evaluate_model(model, validation_loader, criterion, device):
    model.eval()  # 设置模型为评估模式
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in validation_loader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    avg_loss = val_loss / len(validation_loader)
    accuracy = correct / total
    print(f'Validation Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}')
    return avg_loss, accuracy

In [6]:
model_path = '/kaggle/working/trained_model.pth'

In [9]:
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

# 提取特征
train_features, train_labels = extract_features(train_loader, model, device)
validation_features, validation_labels = extract_features(validation_loader, model, device)
test_features, test_labels = extract_features(test_loader, model, device)

# 训练 SVM
svm_clf = SVC(kernel='linear', C=1, probability=True)  # 设置 probability=True 以输出概率
svm_clf.fit(train_features, train_labels)

# 验证 SVM
validation_predictions_svm = svm_clf.predict(validation_features)
validation_accuracy_svm = accuracy_score(validation_labels, validation_predictions_svm)
print(f'SVM 验证准确率: {validation_accuracy_svm:.4f}')

# 测试 SVM
test_predictions_svm = svm_clf.predict(test_features)
test_accuracy_svm = accuracy_score(test_labels, test_predictions_svm)
print(f'SVM 测试准确率: {test_accuracy_svm:.4f}')

SVM 验证准确率: 0.962
SVM 测试准确率: 0.954


In [10]:
from sklearn.neural_network import MLPClassifier

# 训练 MLP
mlp_clf = MLPClassifier(hidden_layer_sizes=(128, 64), max_iter=500, random_state=42)
mlp_clf.fit(train_features, train_labels)

# 验证 MLP
validation_predictions_mlp = mlp_clf.predict(validation_features)
validation_accuracy_mlp = accuracy_score(validation_labels, validation_predictions_mlp)
print(f'MLP 验证准确率: {validation_accuracy_mlp:.4f}')

# 测试 MLP
test_predictions_mlp = mlp_clf.predict(test_features)
test_accuracy_mlp = accuracy_score(test_labels, test_predictions_mlp)
print(f'MLP 测试准确率: {test_accuracy_mlp:.4f}')

MLP 验证准确率: 0.932
MLP 测试准确率: 0.936


In [11]:
import xgboost as xgb

# 转换数据为 DMatrix 格式
dtrain = xgb.DMatrix(train_features, label=train_labels)
dvalid = xgb.DMatrix(validation_features, label=validation_labels)
dtest = xgb.DMatrix(test_features, label=test_labels)

# 设置参数
params = {
    'objective': 'multi:softprob',  # 用于多分类
    'num_class': num_classes,
    'eval_metric': 'mlogloss',
    'max_depth': 6,
    'eta': 0.1,
    'seed': 42
}

# 训练 XGBoost
xgb_clf = xgb.train(params, dtrain, num_boost_round=100, evals=[(dvalid, 'validation')])

# 验证 XGBoost
validation_predictions_xgb = xgb_clf.predict(dvalid)
validation_accuracy_xgb = accuracy_score(validation_labels, np.argmax(validation_predictions_xgb, axis=1))
print(f'XGBoost 验证准确率: {validation_accuracy_xgb:.4f}')

# 测试 XGBoost
test_predictions_xgb = xgb_clf.predict(dtest)
test_accuracy_xgb = accuracy_score(test_labels, np.argmax(test_predictions_xgb, axis=1))
print(f'XGBoost 测试准确率: {test_accuracy_xgb:.4f}')

XGB 验证准确率: 0.955
XGB 测试准确率: 0.935


In [1]:
from sklearn.ensemble import VotingClassifier

# 创建 VotingClassifier
voting_clf = VotingClassifier(
    estimators=[
        ('svm', svm_clf),
        ('mlp', mlp_clf),
        ('xgb', xgb.XGBClassifier(objective='multi:softprob', n_estimators=100, max_depth=6, learning_rate=0.1))
    ],
    voting='soft',  # 使用软投票
    weights=[1, 1, 1]  # 根据需要调整权重
)

# 训练 VotingClassifier
voting_clf.fit(train_features, train_labels)

# 在测试集上进行预测
test_predictions_voting = voting_clf.predict(test_features)

# 计算最终准确率
voting_accuracy = accuracy_score(test_labels, test_predictions_voting)
print(f'Voting Classifier 测试准确率: {voting_accuracy:.4f}')

Voting Classifier 测试准确率: 0.992
