本次对比学习实验以pytorch作为基础框架。

In [3]:
import numpy as np
import torch
import matplotlib
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from torch import nn
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

定义多个用于存储信息的超参数
    device：模型运行所在设备
    batch_size：批样本大小
    loss_list: 损失函数列表
    transform: 数据加载过程中对数据进行处理的方法
通过torch中包含的数据集加载方式加载MNIST数据集

In [4]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
batch_size = 256
loss_list = []

transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize([0.5], [0.5])])

train_data = datasets.MNIST(root='../data', train=True, transform=transform, download=True)
test_data = datasets.MNIST(root='../data', train=False, transform=transform, download=True)
epochs = 40
learning_rate = 7e-4

定义函数以获取数据集中对应的数据用于构建自定义数据集

In [5]:
def get_data(dataset):
    x_data, y_data = [], []
    for i in range(len(dataset)):
        x, y = dataset[i]
        x_data.append(x)
        y_data.append(y)
    return x_data, y_data

定义了一个自定义的数据集类 my_dataset，继承自 PyTorch 的 Dataset 类，主要用于从给定的输入数据和标签数据中创建一个数据集对象。
    获取数据样本
        根据给定的索引，从输入数据和标签数据中提取相应的样本 x 和标签 y。
        随机生成两个索引 idx1 和 idx2，分别用于选择与当前样本 x 具有相同标签和不同标签的样本。
        使用 while 循环确保 idx1 处的样本与当前样本标签 y 相同，而 idx2 处的样本标签与 y 不同。
        最终返回一个包含三个样本和一个标签的元组，即当前样本 x、与 x 标签相同的样本、与 x 标签不同的样本和标签 y。

In [6]:
class my_dataset(Dataset):
    def __init__(self, x_data, y_data):
        self.x_data = x_data
        self.y_data = y_data

    def __len__(self):
        return len(self.x_data)

    def __getitem__(self, idx):
        x = self.x_data[idx]
        y = self.y_data[idx]
        possibility = np.random.rand()
        idx1 = np.random.randint(0, len(self.y_data) - 1)
        idx2 = np.random.randint(0, len(self.y_data) - 1)
        while self.y_data[idx1] != y:
            idx1 = np.random.randint(0, len(self.y_data) - 1)
        while self.y_data[idx2] == y:
            idx2 = np.random.randint(0, len(self.y_data) - 1)
        return x, self.x_data[idx1],self.x_data[idx2],y

通过函数加载数据集中的数据后，创建自定义数据集的实例并放入DataLoader中用于训练/测试

In [7]:
train_x_data, train_y_data = get_data(train_data)
test_x_data, test_y_data = get_data(test_data)
train_dataset = my_dataset(train_x_data, train_y_data)
test_dataset = my_dataset(test_x_data, test_y_data)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

构建孪生神经网络（Siamese类）用于对比学习任务
    网络结构
        卷积层和激活函数：
            第一层：nn.Conv2d(1, 4, kernel_size=3, padding=1, stride=1) 和 nn.ReLU()。
            第二层：nn.Conv2d(4, 8, kernel_size=3, padding=1, stride=1) 和 nn.ReLU()。
            第三层：nn.Conv2d(8, 16, kernel_size=3, padding=1, stride=1) 和 nn.ReLU()。
            第四层：nn.Conv2d(16, 4, kernel_size=3, padding=1, stride=1) 和 nn.ReLU()。
        批量归一化层：
            在每个卷积层后面，都有相应的批量归一化层：nn.BatchNorm2d(4), nn.BatchNorm2d(8), nn.BatchNorm2d(16), nn.BatchNorm2d(4)。
            扁平化层：
            nn.Flatten()：将多维的卷积输出展平为一维。
        Dropout层：
            nn.Dropout(0.2)：防止过拟合。
        全连接层：
            nn.Linear(3136, out_features=4096)：将展平后的特征映射到一个高维空间。

In [8]:
class Siamese(nn.Module):
    def __init__(self, output_size):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 4, kernel_size=3, padding=1, stride=1),
            nn.ReLU(),
            nn.BatchNorm2d(4),
            nn.Conv2d(4, 8, kernel_size=3, padding=1, stride=1),
            nn.ReLU(),
            nn.BatchNorm2d(8),
            nn.Conv2d(8, 16, kernel_size=3, padding=1, stride=1),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.Conv2d(16, 4, kernel_size=3, padding=1, stride=1),
            nn.BatchNorm2d(4),
            nn.ReLU(),
            nn.Flatten(),
            nn.Dropout(0.2),
            nn.Linear(3136, out_features=4096),
        )

    def forward(self, x):
        x = self.features(x)
        return x

定义三元组损失函数
    输入一个包括锚示例、正示例、负示例的数据三元组，通过优化锚示例与正示例的距离小于锚示例与负示例的距离，实现样本之间的相似性计算。
    计算公式如下：
        $L = max(d(a,p)-d(a,n)+margin,0)$


In [9]:
def triplet_loss(anchor, positive, negative, margin=1.0):
    distance_positive = F.pairwise_distance(anchor, positive, p=2)
    distance_negative = F.pairwise_distance(anchor, negative, p=2)
    losses = F.relu(distance_positive - distance_negative + margin)
    return losses.mean()

定义名为AutoEDcoder的类实现了一个卷积神经网络编码器部分，用于从输入图像中提取高维特征表示
    编码器结构
        卷积层和激活函数：
            第一层：nn.Conv2d输入通道为1，输出通道为3，卷积核大小为3，步幅为2，填充为1。
            第二层：nn.Conv2d输入通道为3，输出通道为9，卷积核大小为3，步幅为2，填充为1。
            第三层：nn.Conv2d输入通道为9，输出通道为18，卷积核大小为3，步幅为2，填充为1。
        批量归一化层：
            在每个卷积层后面，都有相应的批量归一化层：nn.BatchNorm2d(3), nn.BatchNorm2d(9), nn.BatchNorm2d(18)。
        激活函数：
            每个批量归一化层后面，跟着一个 LeakyReLU 激活函数：nn.LeakyReLU()。
        扁平化层：
            nn.Flatten()：将多维的卷积输出展平为一维。
        全连接层：
            nn.Linear(18 * 4 * 4, 512)：将展平后的特征映射到一个 512 维的空间。
            nn.Linear(512, 1024)：进一步将特征映射到一个 1024 维的空间。

In [10]:
class AutoEDcoder(nn.Module):
    def __init__(self):
        super(AutoEDcoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=3, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(3),
            nn.LeakyReLU(),
            nn.Conv2d(3, 9, 3, 2, 1),
            nn.BatchNorm2d(9),
            nn.LeakyReLU(),
            nn.Conv2d(9, 18, 3, 2, 1),
            nn.BatchNorm2d(18),
            nn.LeakyReLU(),
            nn.Flatten(),
            nn.Linear(18 * 4 * 4, 512),
            nn.Linear(512,1024)
        )

    def forward(self, x):
        # Encoder
        code = self.encoder(x)
        return code

定义函数用于训练相关模型
    在训练过程中，首先确保模型、数据位于同一个设备，然后通过在每个epoch中遍历整个训练数据集，并通过梯度下降进行优化，以此完成模型的训练。
    同时在训练过程中，在每个epoch训练结束时输出并记录该epoch总损失值，用于后续的模型性能评估。

In [11]:
def train(model, train_loader, optimizer, epochs, device):
    model = model.to(device)
    for epoch in range(epochs):
        total_loss = 0
        for i, (x1, x2, x3, y) in enumerate(train_loader):
            x1, x2, x3,y = x1.to(device), x2.to(device), x3.to(device),y.to(device)
            pred1 = model(x1)
            pred2 = model(x2)
            pred3 = model(x3)
            loss = triplet_loss(pred1, pred2, pred3,margin=10.0)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.detach().cpu().numpy()
        loss_list.append(total_loss)
        print(f'epoch:{epoch + 1} total_loss:{total_loss}')

eval函数用于评估相关的模型性能
    评估方式如下：
        模型设置与训练损失绘制：
            设置模型为评估模式，以禁用Dropout层等在训练模式下的功能。
            使用matplotlib库绘制训练过程中记录的损失值，展示损失随训练周期（epochs）的变化趋势。
        3D t-SNE可视化：
            使用t-SNE（t-Distributed Stochastic Neighbor Embedding）算法将高维数据降维到三维。
            创建一个3D图像窗口，用于绘制降维后的数据点。
            遍历测试数据集，将数据传递给模型并获取预测结果。
            对预测结果进行t-SNE降维，并对结果进行标准化处理，以确保数据点在图像中的分布均匀。
            在3D图像中绘制降维后的数据点，并为每个数据点标注其标签值。
        2D t-SNE可视化：
            使用t-SNE算法将高维数据降维到二维。
            遍历测试数据集，将数据传递给模型并获取预测结果。
            对预测结果进行t-SNE降维，并对结果进行标准化处理。
            在2D图像中绘制降维后的数据点，并为每个数据点着色，以反映其标签值。

In [12]:
def eval(model, device, test_loader, epochs):
    model = model.to(device)
    model.eval()
    x = [i for i in range(epochs)]
    plt.plot(x, loss_list)
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.show()
    tsne3d = TSNE(n_components=3, init='pca', perplexity=30., random_state=0, learning_rate=300)
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    with torch.no_grad():
        num=0
        for x, _, _, y in test_loader:
            x, y = x.to(device), y.detach().cpu().numpy()
            y_pred = model(x)
            np_y_pred = y_pred.detach().cpu().numpy()
            result = tsne3d.fit_transform(np_y_pred)
            x_min, x_max = result.min(0), result.max(0)
            result = (result - x_min) / (x_max - x_min)
            ax.scatter(result[:, 0], result[:, 1], result[:, 2], c=y, cmap='jet')
            for i, txt in enumerate(y):
                ax.text(result[i, 0], result[i, 1], result[i, 2], str(txt))
            num += 1
            if num == 3:
                break
        plt.show()
    tsne2d = TSNE(n_components=2, init='pca', perplexity=30., random_state=0, learning_rate=300)
    with (torch.no_grad()):
        num = 0
        for x, _, _, y in test_loader:
            x, y = x.to(device), y.detach().cpu().numpy()
            y_pred = model(x)
            np_y_pred = y_pred.detach().cpu().numpy()
            result = tsne2d.fit_transform(np_y_pred)
            x_min, x_max = result.min(0), result.max(0)
            result = (result - x_min) / (x_max - x_min)
            plt.scatter(result[:, 0], result[:, 1], c=y, cmap='jet')
            num+=1
            if num==3:
                break
        plt.show()

对孪生网络模型进行训练，并将对应的训练结果保存。

In [13]:
print('===========Siamese===========')
model = Siamese(output_size=10)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
train(model, train_loader, optimizer, epochs, device)
# torch.save(model, '实现了一个卷积神经网络编码器部分，用于从输入图像中提取高维特征表示')
eval(model, device, test_loader, epochs)

在置空损失函数列表后，对自动编码器的编码器部分进行训练。训练中使用到的超参数同孪生网络训练中超参数一致。

In [14]:
loss_list = []
net = AutoEDcoder()
print('===========AutoEDcoder===========')
trainer = torch.optim.Adam(net.parameters(), lr=learning_rate)
train(net, train_loader, trainer, epochs, device)
eval(net, device, test_loader, epochs)