In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import requests
import pickle
import os
import math

In [2]:
# 下载 CIFAR-10 数据集 (如果未下载)
def download_cifar10():
    url = "https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"
    filename = "cifar-10-python.tar.gz"
    if not os.path.exists(filename):
        print("Downloading CIFAR-10 dataset...")
        response = requests.get(url, stream=True)
        with open(filename, "wb") as f:
            f.write(response.content)
        print("Download complete.")

    # 解压数据集
    if not os.path.exists("cifar-10-batches-py"):
        import tarfile
        print("Extracting CIFAR-10 dataset...")
        with tarfile.open(filename, "r:gz") as tar:
            tar.extractall()
        print("Extraction complete.")

download_cifar10()

# 加载 CIFAR-10 数据集
def load_cifar10(data_dir="cifar-10-batches-py", train=True):
    if train:
        files = [f"data_batch_{i}" for i in range(1, 6)]
    else:
        files = ["test_batch"]

    data = []
    labels = []
    for file in files:
        with open(os.path.join(data_dir, file), "rb") as f:
            batch = pickle.load(f, encoding="bytes")
            data.append(batch[b"data"])
            labels.extend(batch[b"labels"])

    data = torch.tensor(data).view(-1, 3, 32, 32).float() / 255.0  # 归一化到 [0, 1]
    labels = torch.tensor(labels).long()
    return data, labels

# 加载训练集和测试集
train_data, train_labels = load_cifar10(train=True)
test_data, test_labels = load_cifar10(train=False)

print(f"Train data shape: {train_data.shape}, Train labels shape: {train_labels.shape}")
print(f"Test data shape: {test_data.shape}, Test labels shape: {test_labels.shape}")

  data = torch.tensor(data).view(-1, 3, 32, 32).float() / 255.0  # 归一化到 [0, 1]


Train data shape: torch.Size([50000, 3, 32, 32]), Train labels shape: torch.Size([50000])
Test data shape: torch.Size([10000, 3, 32, 32]), Test labels shape: torch.Size([10000])


In [3]:
class DataLoader:
    def __init__(self, data, labels, batch_size, shuffle=True):
        self.data = data
        self.labels = labels
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.num_samples = len(data)
        self.indices = torch.arange(self.num_samples)

    def __iter__(self):
        if self.shuffle:
            self.indices = self.indices[torch.randperm(self.num_samples)]
        for start in range(0, self.num_samples, self.batch_size):
            end = start + self.batch_size
            batch_indices = self.indices[start:end]
            yield self.data[batch_indices], self.labels[batch_indices]

# 定义训练集和测试集的数据加载器
train_loader = DataLoader(train_data, train_labels, batch_size=64, shuffle=True)
test_loader = DataLoader(test_data, test_labels, batch_size=64, shuffle=False)

In [4]:
class TransformerClassifier(nn.Module):
    def __init__(self, in_channels=3, patch_size=4, d_model=64, num_heads=4, num_classes=10, num_layers=6, dropout=0.1):
        super(TransformerClassifier, self).__init__()

        # Patch 数量和每个 patch 的特征维度
        self.patch_size = patch_size
        self.num_patches = (32 // patch_size) * (32 // patch_size)  # CIFAR-10 图像为 32x32
        self.flatten_dim = patch_size * patch_size * in_channels  # 每个 patch 的特征维度

        # Patch Embedding
        self.patch_embedding = nn.Linear(self.flatten_dim, d_model)  # 输入特征维度必须与 flatten_dim 匹配

        # 位置编码
        self.position_encoding = nn.Parameter(self._generate_positional_encoding(self.num_patches, d_model))

        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=num_heads, dim_feedforward=256, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # 分类头
        self.classifier = nn.Linear(d_model, num_classes)

    def _generate_positional_encoding(self, num_patches, d_model):
        pos = torch.arange(num_patches).unsqueeze(1)  # (num_patches, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))  # (d_model / 2)
        pe = torch.zeros(num_patches, d_model)
        pe[:, 0::2] = torch.sin(pos * div_term)  # 偶数位置
        pe[:, 1::2] = torch.cos(pos * div_term)  # 奇数位置
        return pe

    def forward(self, x):
        batch_size = x.shape[0]

        # 划分图像为 patches
        patches = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        patches = patches.permute(0, 2, 3, 1, 4, 5).reshape(batch_size, self.num_patches, -1)

        # 调试信息：检查 patches 的形状
        #print(f"Patches shape before embedding: {patches.shape}")

        # Patch Embedding
        patches = self.patch_embedding(patches)

        # 添加位置编码
        patches += self.position_encoding.unsqueeze(0)

        # Transformer Encoder
        patches = patches.permute(1, 0, 2)
        encoded_patches = self.transformer_encoder(patches)
        encoded_patches = encoded_patches.permute(1, 0, 2)

        # 分类头
        cls_token = encoded_patches[:, 0, :]
        logits = self.classifier(cls_token)

        return logits

In [5]:
# 训练函数
def train(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for x, y in dataloader:
        x, y = x.to(device), y.to(device)

        # 前向传播
        outputs = model(x)
        loss = criterion(outputs, y)

        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # 统计
        total_loss += loss.item()
        _, predicted = outputs.max(1)
        correct += predicted.eq(y).sum().item()
        total += y.size(0)

    return total_loss / 100, correct / total

# 测试函数
def test(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)

            # 前向传播
            outputs = model(x)
            loss = criterion(outputs, y)

            # 统计
            total_loss += loss.item()
            _, predicted = outputs.max(1)
            correct += predicted.eq(y).sum().item()
            total += y.size(0)

    return total_loss / 100, correct / total

In [6]:
# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 初始化模型
model = TransformerClassifier().to(device)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
num_epochs = 10
for epoch in range(num_epochs):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion, device)
    test_loss, test_acc = test(model, test_loader, criterion, device)

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
    print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}")



TypeError: object of type 'DataLoader' has no len()