In [1]:
# https://github.com/rasbt/dora-from-scratch
# https://magazine.sebastianraschka.com/p/lora-and-dora-from-scratch

from utils import EarlyStopping, train_loop_with_resume
import time
import numpy as np
import torch
from torch.utils.data import DataLoader, random_split
import torch.nn.functional as F
from torch import nn, optim
from torch.optim.lr_scheduler import CosineAnnealingLR
from torchvision import datasets, transforms

# When a CUDA device is present, release cached allocator memory left over
# from earlier work in this kernel and set the cuDNN deterministic flag.
if torch.cuda.is_available():
    torch.cuda.empty_cache()
    torch.backends.cudnn.deterministic = True

In [2]:
###########################
#### Settings
###########################
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 128
TRAIN_RATIO = 0.8
# Fixed seed for the train/validation split. Training can resume from a
# checkpoint saved by an earlier kernel session, so the split must be the same
# on every run; otherwise samples the checkpoint was trained on can end up in
# the validation subset and inflate the validation accuracy.
SPLIT_SEED = 123

###########################
#### mnist dataset
###########################
labelled_set = datasets.MNIST(root='../../data', train=True, transform=transforms.ToTensor(), download=False)
test_set = datasets.MNIST(root='../../data', train=False, transform=transforms.ToTensor(), download=False)

# Split the labelled data into training and validation subsets.
train_size = int(TRAIN_RATIO * len(labelled_set))
valid_size = len(labelled_set) - train_size
train_set, valid_set = random_split(
    labelled_set,
    [train_size, valid_size],
    # A dedicated generator keeps the split independent of the global RNG state.
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
# Evaluation loaders do not need shuffling.
valid_loader = DataLoader(valid_set, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)

# Sanity-check the tensor shapes of one training batch.
check_batch = next(iter(train_loader))
print(f"批次数据维度: {check_batch[0].shape}, 标签维度: {check_batch[1].shape}")

批次数据维度: torch.Size([128, 1, 28, 28]), 标签维度: torch.Size([128])


In [4]:
#############################
#### Hyperparameters
#############################
SEED = 123
# Seed torch's global RNG before the model below is constructed.
torch.manual_seed(SEED)
LEARNING_RATE = 5e-3  # learning rate handed to the Adam optimizers
NUM_EPOCHS = 50  # epoch budget; also used as T_max of the cosine scheduler

#############################
#### Architecture
#############################
n_features = 28 * 28  # MNIST images are 28x28 pixels
n_classes = 10  # number of output logits
n_hidden1 = 256  # width of the first hidden layer
n_hidden2 = 64  # width of the second hidden layer


#############################
#### Perceptron Model
#############################
class MultiLayerPerceptron(nn.Module):
    """Fully connected classifier: two ReLU hidden layers and a linear output.

    Args:
        in_features: Number of values in each flattened input sample.
        hidden1: Width of the first hidden layer.
        hidden2: Width of the second hidden layer.
        out_features: Number of output logits.
        rank: Accepted for signature compatibility; not used by this class.
        alpha: Accepted for signature compatibility; not used by this class.
    """

    def __init__(self, in_features, hidden1, hidden2, out_features, rank=4, alpha=1.0):
        super().__init__()
        # Layers are created in fc1 -> fc2 -> fc3 order; other cells look them
        # up by these attribute names.
        widths = (in_features, hidden1, hidden2, out_features)
        self.fc1 = nn.Linear(widths[0], widths[1])
        self.fc2 = nn.Linear(widths[1], widths[2])
        self.fc3 = nn.Linear(widths[2], widths[3])

    def forward(self, x):
        """Flatten every sample in the batch and return the raw logits."""
        batch_size = x.size(0)
        flat = x.view(batch_size, -1)
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)


# Build the baseline MLP together with its optimizer, loss, LR schedule and
# early-stopping helper, then hand everything to the project-local training loop.
model_pretrained = MultiLayerPerceptron(n_features, n_hidden1, n_hidden2, n_classes).to(DEVICE)
optimizer_pretrained = optim.Adam(model_pretrained.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()
# Cosine annealing with a period of NUM_EPOCHS scheduler steps and a floor of 1e-6.
scheduler = CosineAnnealingLR(optimizer_pretrained, T_max=NUM_EPOCHS, eta_min=1e-6)
# NOTE(review): EarlyStopping comes from utils; mode="max" presumably means the
# monitored metric (validation accuracy) should increase — confirm against utils.
early_stopping = EarlyStopping(patience=3, delta=1e-4, mode="max")
best_ckpt_file_path = "best_pretrained.pth"
# Train. Judging by its name and the logged output, train_loop_with_resume
# restores state from best_ckpt_file_path when that file exists — confirm in utils.
accuracy, discord = train_loop_with_resume(
    model_pretrained,
    train_loader,
    valid_loader,
    criterion,
    optimizer_pretrained,
    scheduler,
    early_stopping,
    best_ckpt_file_path,
    NUM_EPOCHS,
    DEVICE,
)
# `discord` is indexed below as a mapping from metric name to a per-epoch history.
print(f"训练过程损失: {discord['train_loss']}")
print(f"训练过程准确率: {discord['train_acc']}")
print(f"验证过程损失: {discord['valid_loss']}")
print(f"验证过程准确率: {discord['valid_acc']}")

Checkpoint loaded from best_pretrained.pth, resume from epoch 14
加载训练点模型成功，当前准确率为0.9958，从第14个epoch开始训练...

Epoch 15/50 - --------------------------------------------------


                                                                                                   

当前验证准确率: 0.9959
当前学习率: 1.00e-06
更新最佳验证准确率: 0.9959
Checkpoint saved to best_pretrained.pth

Epoch 16/50 - --------------------------------------------------


                                                                                                   

当前验证准确率: 0.9959
当前学习率: 5.56e-05

Epoch 17/50 - --------------------------------------------------


                                                                                                   

当前验证准确率: 0.9960
当前学习率: 2.17e-04
更新最佳验证准确率: 0.9960
Checkpoint saved to best_pretrained.pth

Epoch 18/50 - --------------------------------------------------


                                                                                                   

当前验证准确率: 0.9957
早停触发!
训练过程损失: [0.0085473161439877, 0.007274437322029067, 0.0074007072673897105, 0.007215604644453076]
训练过程准确率: [0.998125, 0.9983125, 0.9982708333333333, 0.9984791666666667]
验证过程损失: [0.018689847316398906, 0.018686501401321342, 0.0184339152694835, 0.018809774229303]
验证过程准确率: [0.9959166666666667, 0.9959166666666667, 0.996, 0.9956666666666667]




In [12]:
class LoRALayer(nn.Module):
    """Low-rank adapter that maps ``x`` to ``alpha * (x @ A @ B)``.

    ``A`` has shape (in_features, rank) and is drawn from a standard normal
    scaled by ``sqrt(1 / rank)``. ``B`` has shape (rank, out_features) and
    starts at zero, so a freshly built layer outputs zeros.

    Args:
        in_features: Size of the last dimension of the input.
        out_features: Size of the last dimension of the output.
        rank: Inner dimension shared by ``A`` and ``B``.
        alpha: Scalar multiplier applied to the low-rank product.
    """

    def __init__(self, in_features, out_features, rank, alpha):
        super().__init__()
        init_scale = torch.sqrt(torch.tensor(1 / rank))
        self.A = nn.Parameter(init_scale * torch.randn(in_features, rank))
        self.B = nn.Parameter(torch.zeros(rank, out_features))
        self.alpha = alpha

    def forward(self, x):
        """Return the scaled low-rank projection of ``x``."""
        projected = torch.matmul(torch.matmul(x, self.A), self.B)
        return self.alpha * projected


class LinearWithLoRA(nn.Module):
    """A frozen ``nn.Linear`` combined with a trainable ``LoRALayer``.

    The wrapped layer's parameters get ``requires_grad = False``; the adapter
    stored in ``self.lora`` is the only trainable part.

    Args:
        linear_layer: Layer to wrap. It is frozen in place, not copied.
        rank: Rank passed to the ``LoRALayer``.
        alpha: Scaling factor passed to the ``LoRALayer``.
    """

    def __init__(self, linear_layer: nn.Linear, rank: int = 4, alpha: float = 1.0):
        super().__init__()
        self.linear = linear_layer
        # Freeze the original linear layer.
        for frozen in linear_layer.parameters():
            frozen.requires_grad = False
        self.lora = LoRALayer(
            linear_layer.in_features, linear_layer.out_features, rank, alpha
        )

    def forward(self, x):
        """Apply the layer with the merged weight ``W + alpha * (A @ B)^T``.

        The mathematically equivalent two-pass form
        ``self.linear(x) + self.lora(x)`` is deliberately not used here.
        """
        adapter = self.lora
        # nn.Linear stores its weight as (out_features, in_features), so the
        # (in_features, out_features) product A @ B must be transposed.
        delta = adapter.alpha * (adapter.A @ adapter.B).T
        merged = delta + self.linear.weight
        return F.linear(x, merged, self.linear.bias)


import copy

# Adapt a deep copy so that model_pretrained keeps its own, unwrapped layers.
model_lora = copy.deepcopy(model_pretrained)
# Swap each linear layer for its LoRA-wrapped counterpart. The wrappers are
# built in fc1 -> fc2 -> fc3 order and then assigned back in one unpacking.
model_lora.fc1, model_lora.fc2, model_lora.fc3 = (
    LinearWithLoRA(layer, rank=4, alpha=1.0)
    for layer in (model_lora.fc1, model_lora.fc2, model_lora.fc3)
)
model_lora

MultiLayerPerceptron(
  (fc1): LinearWithLoRA(
    (linear): Linear(in_features=784, out_features=128, bias=True)
    (lora): LoRALayer()
  )
  (fc2): LinearWithLoRA(
    (linear): Linear(in_features=128, out_features=256, bias=True)
    (lora): LoRALayer()
  )
  (fc3): LinearWithLoRA(
    (linear): Linear(in_features=256, out_features=10, bias=True)
    (lora): LoRALayer()
  )
)

In [None]:
def save_lora_parameters(model, filename="lora_parameters.pth"):
    """Write the ``A``, ``B`` and ``alpha`` of every ``LoRALayer`` in ``model`` to disk.

    Args:
        model: Module whose sub-modules are searched for ``LoRALayer`` instances.
        filename: Destination path passed to ``torch.save``.

    The saved object is a flat dict keyed by ``"<module name>.A"``,
    ``"<module name>.B"`` and ``"<module name>.alpha"``.
    """
    adapters = (
        (prefix, layer)
        for prefix, layer in model.named_modules()
        if isinstance(layer, LoRALayer)
    )
    payload = {}
    for prefix, layer in adapters:
        payload.update({
            prefix + ".A": layer.A,
            prefix + ".B": layer.B,
            prefix + ".alpha": layer.alpha,
        })
    torch.save(payload, filename)
    print(f"LoRA参数已保存到 {filename}")

In [11]:

# Move the model first so the optimizer is built from parameters that already
# live on DEVICE.
model_lora.to(DEVICE)
# Only hand trainable parameters to the optimizer; any parameter with
# requires_grad switched off (the frozen base layers) is left out.
optimizer_lora = optim.Adam(
    [p for p in model_lora.parameters() if p.requires_grad], lr=LEARNING_RATE
)
# Start the best-accuracy tracker explicitly so the cell runs on a fresh kernel
# and never compares against a value left over from an earlier run.
best_acc = float("-inf")
# NOTE(review): train_epoch / validate_epoch are not defined or imported in the
# visible cells — confirm where they come from.
print("开始训练带LoRA的模型...")
for epoch in range(NUM_EPOCHS):
    print(f"\nEpoch {epoch + 1}/{NUM_EPOCHS}")
    print("-" * 50)
    train_loss, train_acc = train_epoch(
        model_lora, train_loader, optimizer_lora, criterion, DEVICE
    )
    valid_loss, valid_acc = validate_epoch(model_lora, valid_loader, criterion, DEVICE)
    # Checkpoint whenever validation accuracy improves on the best seen so far.
    if valid_acc > best_acc:
        best_acc = valid_acc
        torch.save(model_lora.state_dict(), "best_model_lora.pth")
        print(f"最佳验证准确率: {best_acc:.4f}，模型已保存")

开始训练带LoRA的模型...

Epoch 1/15
--------------------------------------------------


                                                                                                   

最佳验证准确率: 0.9769，模型已保存

Epoch 2/15
--------------------------------------------------


                                                                                                   

最佳验证准确率: 0.9774，模型已保存

Epoch 3/15
--------------------------------------------------


                                                                                                   

最佳验证准确率: 0.9782，模型已保存

Epoch 4/15
--------------------------------------------------


                                                                                                   


Epoch 5/15
--------------------------------------------------


                                                                                                   

最佳验证准确率: 0.9788，模型已保存

Epoch 6/15
--------------------------------------------------


                                                                                                   


Epoch 7/15
--------------------------------------------------


                                                                                                   


Epoch 8/15
--------------------------------------------------


                                                                                                   


Epoch 9/15
--------------------------------------------------


                                                                                                   

最佳验证准确率: 0.9789，模型已保存

Epoch 10/15
--------------------------------------------------


                                                                                                   


Epoch 11/15
--------------------------------------------------


                                                                                                   


Epoch 12/15
--------------------------------------------------


                                                                                                   


Epoch 13/15
--------------------------------------------------


                                                                                                   


Epoch 14/15
--------------------------------------------------


                                                                                                   


Epoch 15/15
--------------------------------------------------


                                                                                                   

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import OrderedDict


class LinearWithLoRA(nn.Module):
    """带LoRA的线性层"""

    def __init__(self, linear_layer, rank=4, alpha=1.0):
        super().__init__()
        self.linear = linear_layer
        # 冻结原始线性层的参数
        for param in self.linear.parameters():
            param.requires_grad = False

        # LoRA参数
        std_dev = 1.0 / rank ** 0.5
        self.lora_A = nn.Parameter(torch.randn(linear_layer.in_features, rank) * std_dev)
        self.lora_B = nn.Parameter(torch.zeros(rank, linear_layer.out_features))
        self.alpha = alpha

    def forward(self, x):
        # 计算合并后的权重矩阵
        lora_weight = self.alpha * (self.lora_A @ self.lora_B).T
        combined_weight = self.linear.weight + lora_weight
        return F.linear(x, combined_weight, self.linear.bias)


class MultiLayerPerceptronWithLoRA(nn.Module):
    """Three-layer MLP whose linear layers are each wrapped in ``LinearWithLoRA``.

    Args:
        in_features: Number of values in each flattened input sample.
        hidden1: Width of the first hidden layer.
        hidden2: Width of the second hidden layer.
        out_features: Number of output logits.
        rank: LoRA rank used for every layer.
        alpha: LoRA scaling factor used for every layer.
    """

    def __init__(self, in_features, hidden1, hidden2, out_features, rank=4, alpha=1.0):
        super().__init__()
        # Each base linear layer is created here and wrapped immediately.
        self.fc1 = LinearWithLoRA(nn.Linear(in_features, hidden1), rank, alpha)
        self.fc2 = LinearWithLoRA(nn.Linear(hidden1, hidden2), rank, alpha)
        self.fc3 = LinearWithLoRA(nn.Linear(hidden2, out_features), rank, alpha)

        # Keep the constructor arguments so the model can be rebuilt from a checkpoint.
        self.config = {
            'in_features': in_features,
            'hidden1': hidden1,
            'hidden2': hidden2,
            'out_features': out_features,
            'rank': rank,
            'alpha': alpha
        }

    def forward(self, x):
        """Flatten every sample in the batch and return the raw logits."""
        x = x.view(x.size(0), -1)  # flatten the input
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        x = self.fc3(x)
        return x

    def save_lora_parameters(self, filepath):
        """Save only the LoRA factors, plus ``self.config``, to ``filepath``.

        The factors are written as detached CPU tensors so the file carries no
        autograd state and can be read on a machine without the original device.
        """
        lora_state_dict = {
            name: param.detach().cpu()
            for name, param in self.get_lora_state_dict().items()
        }

        torch.save({
            'lora_state_dict': lora_state_dict,
            'config': self.config
        }, filepath)
        print(f"LoRA参数已保存到: {filepath}")

    def load_lora_parameters(self, filepath):
        """Load LoRA factors written by ``save_lora_parameters`` into this model.

        Raises:
            KeyError: If a saved name refers to a module this model does not have.
            RuntimeError: If a saved tensor's shape does not match the parameter.
        """
        # Read onto the CPU; copy_ below moves the values to each parameter's device.
        checkpoint = torch.load(filepath, map_location="cpu")
        lora_state_dict = checkpoint['lora_state_dict']

        # Build the name -> module lookup once rather than once per entry.
        modules = dict(self.named_modules())
        # Copy in place instead of rebinding new nn.Parameter objects, so an
        # optimizer that already holds these parameters keeps updating the model.
        with torch.no_grad():
            for name, param in lora_state_dict.items():
                # Split "<module name>.<parameter name>".
                module_name, param_name = name.rsplit('.', 1)
                getattr(modules[module_name], param_name).copy_(param)

        print(f"LoRA参数已从 {filepath} 加载")

    def get_lora_state_dict(self):
        """Return the live LoRA parameters keyed by ``"<module name>.lora_A"`` / ``".lora_B"``.

        The values are the model's own ``nn.Parameter`` objects, not copies.
        """
        lora_state_dict = {}
        for name, module in self.named_modules():
            if isinstance(module, LinearWithLoRA):
                lora_state_dict[f"{name}.lora_A"] = module.lora_A
                lora_state_dict[f"{name}.lora_B"] = module.lora_B
        return lora_state_dict


# Helper: build a LoRA model from a pretrained plain MLP.
def create_lora_model_from_pretrained(pretrained_model, rank=4, alpha=1.0):
    """Return a ``MultiLayerPerceptronWithLoRA`` carrying the pretrained weights.

    Args:
        pretrained_model: Model exposing linear layers ``fc1``, ``fc2`` and
            ``fc3``; the new model's layer sizes are read from them.
        rank: LoRA rank for the new model.
        alpha: LoRA scaling factor for the new model.

    Returns:
        A new LoRA model whose wrapped ``linear`` layers hold the state of the
        corresponding pretrained layers.
    """
    lora_model = MultiLayerPerceptronWithLoRA(
        in_features=pretrained_model.fc1.in_features,
        hidden1=pretrained_model.fc1.out_features,
        hidden2=pretrained_model.fc2.out_features,
        out_features=pretrained_model.fc3.out_features,
        rank=rank,
        alpha=alpha,
    )

    # Copy each pretrained layer's state into the matching wrapped layer.
    for layer_name in ("fc1", "fc2", "fc3"):
        pretrained_layer = getattr(pretrained_model, layer_name)
        wrapped_layer = getattr(lora_model, layer_name)
        wrapped_layer.linear.load_state_dict(pretrained_layer.state_dict())

    return lora_model


# Usage example.
# NOTE(review): inside a notebook kernel __name__ is "__main__", so this demo
# runs whenever the cell is executed. It rebinds top-level names such as
# `criterion` and writes .pth files to the working directory.
if __name__ == "__main__":
    # 1. Create and train a model
    model = MultiLayerPerceptronWithLoRA(784, 256, 128, 10, rank=4, alpha=1.0)

    # Synthetic training data: one random batch of 32 samples
    x = torch.randn(32, 784)
    y = torch.randint(0, 10, (32,))

    # Simulated training run
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(5):
        optimizer.zero_grad()
        output = model(x)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()
        print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}")

    # 2. Different ways of saving parameters

    # Option 1: save the full model state
    torch.save(model.state_dict(), 'full_model.pth')
    print("完整模型已保存")

    # Option 2: save only the LoRA parameters
    model.save_lora_parameters('lora_only.pth')

    # Option 3: save the parts separately (recommended for production use)
    # Save the base model (it can be shared by several LoRA adapters)
    base_state_dict = {}
    for name, module in model.named_modules():
        if isinstance(module, LinearWithLoRA):
            base_state_dict[f"{name}.linear.weight"] = module.linear.weight
            base_state_dict[f"{name}.linear.bias"] = module.linear.bias

    torch.save({
        'base_state_dict': base_state_dict,
        'config': model.config
    }, 'base_model.pth')
    print("基础模型已保存")

    # 3. Loading example
    print("\n=== 加载测试 ===")

    # Create a new model and load the full state into it
    new_model = MultiLayerPerceptronWithLoRA(784, 256, 128, 10, rank=4, alpha=1.0)
    new_model.load_state_dict(torch.load('full_model.pth'))
    print("完整模型加载成功")

    # Check that both models produce the same output
    with torch.no_grad():
        original_output = model(x)
        loaded_output = new_model(x)
        print(f"输出差异: {torch.max(torch.abs(original_output - loaded_output)).item():.8f}")

    # 4. Show parameter statistics
    total_params = sum(p.numel() for p in model.parameters())
    # Counts parameters with requires_grad=True.
    lora_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"\n参数统计:")
    print(f"总参数数量: {total_params:,}")
    print(f"LoRA参数数量: {lora_params:,}")
    print(f"可训练参数比例: {lora_params / total_params:.2%}")