In [1]:
import os
import random
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms


class CustomContrastiveDataset(Dataset):
    def __init__(self, data_dir,base_transform=None,transform1=None, transform2=None):
        self.data_dir = data_dir
        self.base_transform=base_transform
        self.transform_pos = transform1
        self.transform_neg = transform2
        self.image_paths = self._load_image_paths(data_dir)
        self.patient_data = self._group_images_by_patient()

    def _load_image_paths(self, data_dir):
        return [os.path.join(data_dir, img_name) for img_name in os.listdir(data_dir) if img_name.endswith('.png')]
    
    def _group_images_by_patient(self):
        patient_dict = {}
        for img_path in self.image_paths:
            patient_id = self.get_patient_id(img_path)
            if patient_id not in patient_dict:
                patient_dict[patient_id] = []
            patient_dict[patient_id].append(img_path)
        return patient_dict
    
    def get_patient_id(self, img_name):
        return img_name.split('_')[0]
    
    def get_side_view(self, img_name):
        if 'r' in img_name:
            return 'r' 
        elif 'l' in img_name:
            return 'l'
        return None
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        img = Image.open(img_path).convert("L")
        image_name = os.path.basename(img_path)
        patient_id = self.get_patient_id(image_name)
        side_view = self.get_side_view(image_name)

        if random.random() > 0.5:
            positive_img = img
            if self.transform_pos:
                positive_img = self.transform_pos(img)
                img = self.base_transform(img) 
            return img, positive_img, 1
        else:
            other_patients = [pid for pid in self.patient_data if pid != patient_id]
            if other_patients:
                negative_patient_id = random.choice(other_patients)
                negative_img_path = random.choice(self.patient_data[negative_patient_id])
            else:
                all_images = self.patient_data[patient_id]
                negative_img_path = random.choice([img_path for img_path in all_images if self.get_side_view(img_path) != side_view])
            negative_img = Image.open(negative_img_path).convert("L")
            if self.transform_neg:
                negative_img = self.transform_neg(negative_img)
                img = self.base_transform(img) 
            return img, negative_img, 0

In [2]:
base_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

transform1 = transforms.Compose([
    base_transform,
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=15), # 你可以根据需要调整旋转角度
])

transform2 = transforms.Compose([
    base_transform,
    transforms.RandomRotation(degrees=5),  # 你可以根据需要调整旋转角度
])

In [3]:
dataset = CustomContrastiveDataset(data_dir=r"F:\Dataset\Train_tumor\train_phase2\rgb_all_png\train_extend_DBT_slice_rgb_patch3", base_transform=base_transform,transform1=transform1, transform2=transform2)

In [4]:
from torch.utils.data import DataLoader
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)


In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

class ContrastiveNet(nn.Module):
    def __init__(self):
        super(ContrastiveNet, self).__init__()
        self.feature_extractor = models.resnet50(pretrained=False)
        self.feature_extractor.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        # 移除最后的全连接层
        self.feature_extractor.fc = nn.Identity()  # 移除全连接层，只保留特征提取部分
        # 全连接层
        self.fc = nn.Sequential(
            nn.Linear(2048, 256),  # ResNet18 的输出特征维度是512
            nn.ReLU(inplace=True),
            nn.Linear(256, 128)  # 输出特征维度为128
        )

    def forward(self, x):
        # 提取特征
        x = self.feature_extractor(x)
        x = self.fc(x)
        # L2归一化
        x = F.normalize(x, p=2, dim=1)
        return x

In [6]:
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, feature1, feature2, label):
        distance = F.pairwise_distance(feature1, feature2)
        loss = torch.mean((1 - label) * torch.pow(distance, 2) +  # 正样本对
                          label * torch.pow(torch.clamp(self.margin - distance, min=0.0), 2))  # 负样本对
        return loss

class InfoNCELoss(nn.Module):
    def __init__(self, temperature=0.1):
        super(InfoNCELoss, self).__init__()
        self.temperature = temperature

    def forward(self, feature1, feature2,label):
        similarity_matrix = torch.matmul(feature1, feature2.T) / self.temperature
        pos_similarity = torch.diag(similarity_matrix)
        neg_similarity = torch.logsumexp(similarity_matrix, dim=1) - pos_similarity
        loss = -torch.mean((1 - label) * pos_similarity - label * neg_similarity)
        return loss

In [None]:
import torch.optim as optim
from torch.utils.data import DataLoader

model = ContrastiveNet()
criterion = ContrastiveLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def train(model, train_loader, criterion, optimizer, num_epochs=150):
    model.train()
    best_loss = float('inf')
    best_model_state = None
    for epoch in range(num_epochs):
        epoch_loss = 0.0
        for batch in train_loader:
            img1, img2, label = batch
            feature1 = model(img1)
            feature2 = model(img2)
            loss = criterion(feature1, feature2,label)  # 如果使用 ContrastiveLoss，需要传入 label
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        avg_epoch_loss = epoch_loss / len(train_loader)
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}")
        
        if avg_epoch_loss < best_loss:
            best_loss = avg_epoch_loss
            best_model_state = model.state_dict()  # 保存最佳模型状态
        else:
            best_model_state = None

        
        if best_model_state is not None:
            torch.save(best_model_state, "best_contrastive_model_resnet50.pth")
            print(f"Saved best model with loss: {best_loss:.4f}")
# 开始训练
train(model, train_loader, criterion, optimizer, num_epochs=100)


In [8]:
import os
import json
from PIL import Image
from torch.utils.data import Dataset
import cv2

class CustomDataset(Dataset):
    def __init__(self, dataset_path, json_path, transform=None, augment_transform=None, n=2):
        self.dataset_path = dataset_path
        self.transform = transform
        self.augment_transform = augment_transform
        self.n = n

        with open(json_path, 'r') as f:
            self.labels = json.load(f)

        self.image_files = [f for f in os.listdir(dataset_path) if f.endswith('.png')]
    
    def __len__(self):
        return len(self.image_files) * self.n
    
    def _extract_prefix(self, filename):
        parts = filename.split('_')
        if len(parts) >= 3:
            prefix = '_'.join(parts[:3])
            view_char = parts[2]
            return prefix,view_char
        return filename  # 如果不足三个 '_'，返回原文件名
    
    def __getitem__(self, idx):
        original_idx = idx % len(self.image_files)

        img_name = self.image_files[original_idx]

        img_path = os.path.join(self.dataset_path, img_name)

        image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)  # 读取灰度图像

        img_prefix,view_char = self._extract_prefix(img_name)
        if view_char.lower().startswith('r'):
            image = cv2.flip(image, 1)  # 水平翻转图像

        normalized_image = image.astype(np.float32) / 255.0
        normalized_image = (normalized_image * 255).astype(np.uint8)    
        rgb_image = cv2.applyColorMap(normalized_image, cv2.COLORMAP_JET)

        rgb_image = Image.fromarray(cv2.cvtColor(rgb_image, cv2.COLOR_BGR2RGB))

        label = -1  # 默认标签
        for key in self.labels:
            key_prefix,_ = self._extract_prefix(key)
            if key_prefix == img_prefix:
                label = self.labels[key]
                break
        if idx >= len(self.image_files) and self.augment_transform:
            # 对数据增强的样本应用增强变换
            rgb_image = self.augment_transform(rgb_image)
        elif self.transform:
            # 对原始样本应用默认变换
            rgb_image = self.transform(rgb_image)
        single_channel_image = rgb_image[1].unsqueeze(0)
        return single_channel_image, label

In [9]:
class BinaryClassificationNet(nn.Module):
    def __init__(self, feature_extractor, num_classes=2, hidden_dim=64):
        super(BinaryClassificationNet, self).__init__()
        self.feature_extractor = feature_extractor  # 对比学习的特征提取器
        self.classifier = nn.Sequential(
            nn.Linear(128, hidden_dim),  # 假设特征维度是128
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim, num_classes)  # 二分类任务
        )
        # 冻结特征提取器的参数
        for param in self.feature_extractor.parameters():
            param.requires_grad = False

    def forward(self, x):
        features = self.feature_extractor(x)
        features = features.view(features.size(0), -1)
        logits = self.classifier(features)
        return logits  # 使用BCEWithLogitsLoss时不需要Sigmoid


In [None]:
contrastive_model = ContrastiveNet()
contrastive_model.load_state_dict(torch.load("best_contrastive_model_resnet50.pth"))
feature_extractor = contrastive_model

In [11]:
binary_model = BinaryClassificationNet(feature_extractor)

In [20]:
#criterion = nn.BCEWithLogitsLoss()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(binary_model.parameters(), lr=0.0001)

In [28]:
dataset_path = r"F:\Dataset\Test\test_phase2\rgb_all_png\test_extend_DBT_slice_rgb_patch3_split"
json_path = r"F:\Dataset\Test\results_test.json"


default_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # 调整图片大小
    transforms.ToTensor(),          # 转换为Tensor
])

augment_transform = transforms.Compose([
    transforms.Resize((224, 224)),            # 调整图片大小
    transforms.RandomHorizontalFlip(p=0.5),  # 随机水平翻转
    transforms.RandomRotation(10),           # 随机旋转
    #transforms.ColorJitter(brightness=0.2, contrast=0.2),  # 随机调整亮度和对比度
    transforms.ToTensor(),                   # 转换为Tensor
])

dataset = CustomDataset(
    dataset_path=dataset_path,
    json_path=json_path,
    transform=default_transform,
    augment_transform=augment_transform,
    n=3
)

train_dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

val_dataset  = r"F:\Dataset\Val\val_phase2\rgb_all_png\val_extend_DBT_slice_rgb_patch3_split"
val_json = r"F:\Dataset\Val\results_val.json"

val_dataset =CustomDataset(
    dataset_path=val_dataset,
    json_path=val_json,
    transform=default_transform,
    augment_transform=augment_transform,
    n=2
)

val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [29]:
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score,
                             roc_auc_score, average_precision_score, matthews_corrcoef,
                             cohen_kappa_score, confusion_matrix, classification_report)
import numpy as np

def train_model(train_dataloader, val_dataloader, model, criterion, optimizer, num_epochs=10):
    best_auc = 0.0  # 用于保存最高的 AUC 值
    log_file = "training_log_resnet50.txt"  # 日志文件路径

    # 打开日志文件
    with open(log_file, "w") as f:
        f.write("Epoch\tTrain Loss\tTrain Accuracy\tTrain AUC\tTrain Precision\tTrain Sensitivity\tTrain Specificity\tTrain f1\tVal Accuracy\tVal AUC\tVal Precision\tVal Sensitivity\tVal Specificity\tVal f1\n")

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        train_preds = []
        train_labels = []

        for images, labels in train_dataloader:
            images = images.to(device)
            labels = labels.to(device)
            labels = labels.long()
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            preds = torch.argmax(outputs, dim=1)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            train_preds.extend(preds.detach().cpu().numpy())
            train_labels.extend(labels.cpu().numpy())

        
        train_loss = running_loss / len(train_dataloader)
        # train_accuracy = accuracy_score(train_labels, (np.array(train_preds) > 0.5).astype(int))
        # train_auc = roc_auc_score(train_labels, train_preds)
        # train_precision = precision_score(train_labels, (np.array(train_preds) > 0.5).astype(int))
        # train_sensitivity = recall_score(train_labels, (np.array(train_preds) > 0.5).astype(int))
        # train_specificity = recall_score(train_labels, (np.array(train_preds) > 0.5).astype(int), pos_label=0)
        # train_f1score = f1_score(train_labels, (np.array(train_preds) > 0.5).astype(int))
        train_accuracy = accuracy_score(train_labels, train_preds)
        train_auc = roc_auc_score(train_labels, train_preds)
        train_precision = precision_score(train_labels, train_preds)
        train_sensitivity = recall_score(train_labels, train_preds)
        train_specificity = recall_score(train_labels, train_preds, pos_label=0)
        train_f1score = f1_score(train_labels, train_preds)

        model.eval()
        val_preds = []
        val_labels = []

        with torch.no_grad():
            for images, labels in val_dataloader:
                images = images.to(device)
                labels = labels.to(device)
                labels = labels.long()
                outputs = model(images)
                preds = torch.argmax(outputs, dim=1)
                val_preds.extend(preds.detach().cpu().numpy())
                val_labels.extend(labels.cpu().numpy())

        # val_accuracy = accuracy_score(val_labels, (np.array(val_preds) > 0.5).astype(int))
        # val_auc = roc_auc_score(val_labels, val_preds)
        # val_precision = precision_score(val_labels, (np.array(val_preds) > 0.5).astype(int))
        # val_sensitivity = recall_score(val_labels, (np.array(val_preds) > 0.5).astype(int))
        # val_specificity = recall_score(val_labels, (np.array(val_preds) > 0.5).astype(int), pos_label=0)
        # val_f1score = f1_score(val_labels, (np.array(val_preds) > 0.5).astype(int))
        val_accuracy = accuracy_score(val_labels, val_preds)
        val_auc = roc_auc_score(val_labels, val_preds)
        val_precision = precision_score(val_labels, val_preds)
        val_sensitivity = recall_score(val_labels, val_preds)
        val_specificity = recall_score(val_labels, val_preds, pos_label=0)
        val_f1score = f1_score(val_labels, val_preds)

        print(f"Epoch [{epoch + 1}/{num_epochs}], "
              f"Train Loss: {train_loss:.4f}, "
              f"Train Accuracy: {train_accuracy:.4f}, "
              f"Train AUC: {train_auc:.4f}, "
              f"Train Precision: {train_precision:.4f}, "
              f"Val Accuracy: {val_accuracy:.4f}, "
              f"Val AUC: {val_auc:.4f}, "
              f"Val Precision: {val_precision:.4f}")

        # 保存指标到日志文件
        with open(log_file, "a") as f:
            f.write(f"{epoch + 1}\t{train_loss:.4f}\t{train_accuracy:.4f}\t{train_auc:.4f}\t{train_precision:.4f}\t{train_sensitivity:.4f}\t{train_specificity:.4f}\t{train_f1score:.4f}\t"
                    f"{val_accuracy:.4f}\t{val_auc:.4f}\t{val_precision:.4f}\t{val_sensitivity:.4f}\t{val_specificity:.4f}\t{val_f1score:.4f}\n")

        if val_auc > best_auc:
            best_auc = val_auc
            torch.save(model.state_dict(), "best_model_resnet50.pth")
            print(f"New best model saved with AUC: {best_auc:.4f}")

model = binary_model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

BinaryClassificationNet(
  (feature_extractor): ContrastiveNet(
    (feature_extractor): ResNet(
      (conv1): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_

In [30]:
train_model(train_dataloader, val_dataloader, model, criterion, optimizer, num_epochs=3000)

Epoch [1/3000], Train Loss: 0.6663, Train Accuracy: 0.5798, Train AUC: 0.5817, Train Precision: 0.6100, Val Accuracy: 0.5435, Val AUC: 0.5399, Val Precision: 0.5714
New best model saved with AUC: 0.5399
Epoch [2/3000], Train Loss: 0.6756, Train Accuracy: 0.5621, Train AUC: 0.5640, Train Precision: 0.5911, Val Accuracy: 0.5556, Val AUC: 0.5522, Val Precision: 0.5909
New best model saved with AUC: 0.5522
Epoch [3/3000], Train Loss: 0.6685, Train Accuracy: 0.5736, Train AUC: 0.5755, Train Precision: 0.6035, Val Accuracy: 0.5459, Val AUC: 0.5424, Val Precision: 0.5755
Epoch [4/3000], Train Loss: 0.6733, Train Accuracy: 0.5638, Train AUC: 0.5658, Train Precision: 0.5930, Val Accuracy: 0.5507, Val AUC: 0.5475, Val Precision: 0.5776
Epoch [5/3000], Train Loss: 0.6701, Train Accuracy: 0.5709, Train AUC: 0.5728, Train Precision: 0.6004, Val Accuracy: 0.5507, Val AUC: 0.5478, Val Precision: 0.5726
Epoch [6/3000], Train Loss: 0.6727, Train Accuracy: 0.5585, Train AUC: 0.5604, Train Precision: 0.5

KeyboardInterrupt: 