In [4]:
import os
os.environ["http_proxy"] = "http://127.0.0.1:7897"
os.environ["https_proxy"] = "http://127.0.0.1:7897"

# hugging-face 国内镜像源
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'

In [None]:
import torch
from transformers import BertModel, DistilBertModel, DistilBertForQuestionAnswering
from transformers import DistilBertTokenizer, BertTokenizer
import torch.nn as nn

# 加载BERT教师模型与DistilBERT学生模型
teacher_model = BertModel.from_pretrained('bert-base-uncased')
student_model = DistilBertModel.from_pretrained('distilbert-base-uncased')

# 使用相同的Tokenizer进行词汇预处理
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
distil_tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

# 定义输入文本并进行编码
text = "Machine reading comprehension is essential for question-answering."
inputs = tokenizer(text, return_tensors="pt")
distil_inputs = distil_tokenizer(text, return_tensors="pt")

# 获取教师模型输出
with torch.no_grad():
    teacher_outputs = teacher_model(**inputs).last_hidden_state

# 学生模型的前向传播
student_outputs = student_model(**distil_inputs).last_hidden_state

# 定义蒸馏损失函数：使用均方误差（MSE）对齐学生与教师模型的输出
distillation_loss = nn.MSELoss()(student_outputs, teacher_outputs)

# 打印蒸馏损失
print("Distillation Loss:", distillation_loss.item())

KL散度评估

In [4]:
import torch.nn.functional as F
def kl_distillation_loss(student_logits, teacher_logits, T=2.0):

    # 计算软概率分布
    # 注意：F.kl_div 的输入期望是 log_softmax，目标是 softmax
    p_teacher = F.softmax(teacher_logits / T, dim=-1)
    p_student = F.log_softmax(student_logits / T, dim=-1)
    
    # 计算 KL 散度
    # reduction='batchmean' 是数学上标准的 KL 散度计算方式
    loss = F.kl_div(p_student, p_teacher, reduction='batchmean') * (T ** 2)
    return loss

# 4. 计算并打印损失
loss = kl_distillation_loss(student_outputs, teacher_outputs, T=2.0)
print("KL Distillation Loss:", loss.item())

KL Distillation Loss: 0.3852507472038269


循环蒸馏效果

In [5]:
from torch.optim import AdamW
from tqdm import tqdm
# 这里也可以用其他优化器
optimizer = AdamW(student_model.parameters(), lr=1e-5)
texts = ["Machine learning is the study of algorithms.",
         "Natural Language Processing involves understanding human languages."]
labels = ["It is a subset of AI.", "A field in AI focusing on language."]

# 蒸馏训练循环
for epoch in range(3):
    print(f"Epoch {epoch + 1}")
    total_loss = 0
    for text, label in zip(texts, labels):
        # 准备输入
        inputs = tokenizer(text, return_tensors="pt")
        distil_inputs = distil_tokenizer(text, return_tensors="pt")
        
        # 获取教师模型输出
        with torch.no_grad():
            teacher_outputs = teacher_model(**inputs).last_hidden_state
        
        # 获取学生模型输出
        student_outputs = student_model(**distil_inputs).last_hidden_state
        
        # 计算蒸馏损失
        loss = nn.MSELoss()(student_outputs, teacher_outputs)
        
        # 反向传播与优化
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        
        # 记录损失
        total_loss += loss.item()
    
    avg_loss = total_loss / len(texts)
    print(f"Average Distillation Loss: {avg_loss:.4f}")

Epoch 1
Average Distillation Loss: 0.0708
Epoch 2
Average Distillation Loss: 0.0597
Epoch 3
Average Distillation Loss: 0.0515


上面是直接进行加载dstill模型，下面要自己尝试进行distill操作

In [None]:
# 冻结和解冻BERT模型
import torch
from transformers import BertTokenizer, BertForSequenceClassification
from torch.optim import AdamW

# 加载预训练的BERT模型和分词器
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
model = BertForSequenceClassification.from_pretrained("bert-base-uncased")

# 小数据进行单层微调
sentences = ["The book is great!", "The movie was terrible."]
labels = [1, 0]  # 假设1代表积极，0代表消极

# 数据预处理
inputs = tokenizer(sentences, return_tensors="pt", padding=True, truncation=True)

# 冻结所有BERT的层
for param in model.bert.parameters():
    param.requires_grad = False

# 解冻特定的层
for param in model.bert.encoder.layer[-2:].parameters():
    param.requires_grad = True

# 定义优化器，仅优化解冻层的参数
optimizer = AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=2e-5)

# 训练过程示例
model.train()
for epoch in range(3):  # 训练3个周期
    outputs = model(**inputs, labels=torch.tensor(labels))
    loss = outputs.loss
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    print(f"Epoch {epoch+1} - Loss: {loss.item()}")

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch 1 - Loss: 0.7049428224563599
Epoch 2 - Loss: 0.6767551898956299
Epoch 3 - Loss: 0.5498147010803223


下面对学习率设置进行学习：
学习率决定了梯度更新的步长，如何避免模型错过最优解

In [3]:
import torch
from torch.utils.data import DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, get_linear_schedule_with_warmup
from torch.optim import AdamW

# 加载数据集（示例数据）
data = [
    ("The company posted a significant increase in quarterly revenue.", 0),
    ("New heart disease medication approved by FDA.", 1),
    ("Stock market affected by global events.", 0),
    ("Medical advancements in treating rare diseases.", 1)
]
labels = [item[1] for item in data]
texts = [item[0] for item in data]

# 实例化Tokenizer
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt")

# 将标签转换为张量
labels_tensor = torch.tensor(labels)

# 加载预训练的BERT模型并调整参数
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)

# 将模型设置为训练模式
model.train()

# 定义优化器和学习率调度器
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)
epochs = 3
total_steps = len(inputs["input_ids"]) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

# 将数据加载至DataLoader
train_data = DataLoader(list(zip(inputs["input_ids"], inputs["attention_mask"], labels_tensor)), batch_size=2)

# 微调BERT模型
for epoch in range(epochs):
    print(f"\nEpoch {epoch + 1}/{epochs}")
    total_loss = 0
    for batch in train_data:
        input_ids, attention_mask, labels = batch

        # 梯度清零
        optimizer.zero_grad()

        # 前向传播，获取损失
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        total_loss += loss.item()

        # 反向传播
        loss.backward()

        # 梯度裁剪，避免梯度爆炸
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # 参数更新
        optimizer.step()

        # 更新学习率
        scheduler.step()

    avg_loss = total_loss / len(train_data)
    print(f"Average training loss: {avg_loss:.4f}")

# 测试阶段：打印模型参数信息
print("\n部分模型参数示例：")
for name, param in model.named_parameters():
    if "classifier" in name:
        print(f"{name}: {param[:2]}")
        break

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.



Epoch 1/3
Average training loss: 0.6955

Epoch 2/3
Average training loss: 0.5635

Epoch 3/3
Average training loss: 0.4875

部分模型参数示例：
classifier.weight: tensor([[-0.0259, -0.0196, -0.0052,  ...,  0.0098,  0.0145, -0.0071],
        [ 0.0108,  0.0146,  0.0211,  ...,  0.0420, -0.0225,  0.0074]],
       grad_fn=<SliceBackward0>)


更加高级的微调方式，是对参数进行高效微调
[LoRA论文连接](https://arxiv.org/pdf/2106.09685)
[Pre-Tuning论文连接](https://arxiv.org/abs/2101.00190)

In [None]:
import os
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'

import torch
from transformers import BertTokenizer, BertModel
import torch.nn as nn
import torch.optim as optim

# 初始化BERT模型和tokenizer
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
model = BertModel.from_pretrained("bert-base-uncased")

# 定义LoRA插入函数
class LoRA(nn.Module):
    def __init__(self, input_dim, rank):
        super(LoRA, self).__init__()
        # 定义低秩矩阵
        self.low_rank_left = nn.Parameter(torch.randn(input_dim, rank))# A
        self.low_rank_right = nn.Parameter(torch.randn(rank, input_dim))# B
        self.scaling_factor = 1.0 / (rank ** 0.5)

    def forward(self, x):
        # 低秩矩阵的插入
        lora_update = torch.matmul(self.low_rank_left, self.low_rank_right) * self.scaling_factor
        return x + torch.matmul(x, lora_update)

# 将LoRA应用到模型的encoder层
for layer in model.encoder.layer:
    layer.attention.self.query = LoRA(layer.attention.self.query.in_features, rank=8)

# 定义Prefix Tuning类
class PrefixTuning(nn.Module):
    def __init__(self, model, prefix_length=10, hidden_size=768):
        super(PrefixTuning, self).__init__()
        # 创建前缀向量
        self.prefix_embeddings = nn.Parameter(torch.randn(prefix_length, hidden_size))
        self.prefix_length = prefix_length
        self.hidden_size = hidden_size
        self.model = model

    def forward(self, input_ids, attention_mask):
        # 获取输入嵌入
        original_embeddings = self.model.embeddings(input_ids)
        
        # 将前缀添加到输入
        batch_size = input_ids.size(0)
        prefix_embeddings = self.prefix_embeddings.unsqueeze(0).expand(batch_size, -1, -1)
        modified_embeddings = torch.cat([prefix_embeddings, original_embeddings], dim=1)
        
        # 调整attention mask
        extended_attention_mask = torch.cat([torch.ones(batch_size, self.prefix_length).to(attention_mask.device), attention_mask], dim=1)
        return self.model(inputs_embeds=modified_embeddings, attention_mask=extended_attention_mask)

# 将Prefix Tuning集成到BERT中
prefix_tuning = PrefixTuning(model)
optimizer = optim.Adam(prefix_tuning.parameters(), lr=1e-5)

# 准备示例数据
text = "LoRA and Prefix Tuning are efficient methods for adapting large models."
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
input_ids = inputs["input_ids"]
attention_mask = inputs["attention_mask"]

# 模型训练流程
prefix_tuning.train()
for epoch in range(3):  # 训练3个epoch
    optimizer.zero_grad()
    outputs = prefix_tuning(input_ids=input_ids, attention_mask=attention_mask)
    last_hidden_states = outputs.last_hidden_state
    loss = (last_hidden_states ** 2).mean()
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch + 1}, Loss: {loss.item()}")

# 测试流程
prefix_tuning.eval()
with torch.no_grad():
    outputs = prefix_tuning(input_ids=input_ids, attention_mask=attention_mask)
    print("Output Embeddings:", outputs.last_hidden_state)



'(MaxRetryError("HTTPSConnectionPool(host='hf-mirror.com', port=443): Max retries exceeded with url: /bert-base-uncased/resolve/main/tokenizer_config.json (Caused by ConnectTimeoutError(<HTTPSConnection(host='hf-mirror.com', port=443) at 0x768082c95640>, 'Connection to hf-mirror.com timed out. (connect timeout=10)'))"), '(Request ID: c9cc11ad-a5f9-403f-917e-fbcee580f4c2)')' thrown while requesting HEAD https://hf-mirror.com/bert-base-uncased/resolve/main/tokenizer_config.json
Retrying in 1s [Retry 1/5].
'(MaxRetryError("HTTPSConnectionPool(host='hf-mirror.com', port=443): Max retries exceeded with url: /bert-base-uncased/resolve/main/tokenizer_config.json (Caused by ConnectTimeoutError(<HTTPSConnection(host='hf-mirror.com', port=443) at 0x768082c94b90>, 'Connection to hf-mirror.com timed out. (connect timeout=10)'))"), '(Request ID: 7599ef6d-d70d-40b5-a8b9-c8933071e81e)')' thrown while requesting HEAD https://hf-mirror.com/bert-base-uncased/resolve/main/tokenizer_config.json
Retrying i

Epoch 1, Loss: 0.3534093499183655
Epoch 2, Loss: 0.2340371161699295
Epoch 3, Loss: 0.30949392914772034
Output Embeddings: tensor([[[-0.5007,  0.0067,  0.4987,  ..., -0.6998, -0.3988,  0.8621],
         [-0.3269,  0.9228, -0.1940,  ..., -0.4193,  0.0240,  0.1988],
         [-0.3774,  0.9690, -0.0234,  ..., -0.3832, -0.2031, -0.0308],
         ...,
         [-0.8675,  0.1132, -0.0237,  ..., -0.6927,  0.0622,  0.7820],
         [-0.7119,  0.6516,  0.4629,  ..., -0.1070,  0.0572,  0.9439],
         [-0.7524,  0.4928,  0.3584,  ..., -1.4589,  0.9578,  0.5290]]])


In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertTokenizer, BertModel

# 1. 设置环境与设备
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 2. 定义 SVD-LoRA 层 (创新点：将权重分解为 P * Sigma * Q)
class SVDLoRALinear(nn.Module):
    def __init__(self, original_layer, rank=8, alpha=16):
        super().__init__()
        self.original_layer = original_layer
        self.original_layer.weight.requires_grad = False  # 冻结原参数
        
        out_features = original_layer.out_features
        in_features = original_layer.in_features
        
        # SVD 分解结构：Delta W = U * diag(S) * V
        self.U = nn.Parameter(torch.randn(out_features, rank) * 0.01)
        self.S = nn.Parameter(torch.ones(rank))  # 奇异值对角线
        self.V = nn.Parameter(torch.randn(rank, in_features) * 0.01)
        
        self.rank = rank
        self.scaling = alpha / rank

    def forward(self, x):
        # 原始权重路径
        original_output = self.original_layer(x)
        
        # SVD 旁路路径: x @ (V.T @ diag(S) @ U.T)
        # 优化计算顺序: (x @ V.T) * S @ U.T
        lora_output = (x @ self.V.t()) * self.S
        lora_output = lora_output @ self.U.t()
        
        return original_output + lora_output * self.scaling

# 3. 定义集成模型 (Prefix-Tuning + SVD-LoRA)
class HybridSVDModel(nn.Module):
    def __init__(self, model_name="bert-base-uncased", prefix_len=10, lora_rank=8):
        super().__init__()
        self.bert = BertModel.from_pretrained(model_name)
        self.prefix_len = prefix_len
        hidden_size = self.bert.config.hidden_size
        
        # 注入 SVD-LoRA 到所有 Query 和 Value 层
        for layer in self.bert.encoder.layer:
            layer.attention.self.query = SVDLoRALinear(layer.attention.self.query, rank=lora_rank)
            layer.attention.self.value = SVDLoRALinear(layer.attention.self.value, rank=lora_rank)
            
        # Prefix Embedding (连续向量)
        self.prefix_embedding = nn.Parameter(torch.randn(prefix_len, hidden_size))
        
        # 任务头 (以分类任务为例)
        self.classifier = nn.Linear(hidden_size, 2) 

    def forward(self, input_ids, attention_mask):
        batch_size = input_ids.shape[0]
        
        # 1. 处理 Prefix
        raw_embeds = self.bert.embeddings(input_ids)
        prefix_embeds = self.prefix_embedding.unsqueeze(0).expand(batch_size, -1, -1)
        inputs_embeds = torch.cat([prefix_embeds, raw_embeds], dim=1)
        
        # 2. 扩展 Attention Mask
        prefix_mask = torch.ones(batch_size, self.prefix_len).to(device)
        full_mask = torch.cat([prefix_mask, attention_mask], dim=1)
        
        # 3. 经过 BERT (含 LoRA 旁路)
        outputs = self.bert(inputs_embeds=inputs_embeds, attention_mask=full_mask)
        
        # 取 [CLS] token 的输出进行分类
        # 注意：由于加了 prefix，[CLS] 的位置现在在 index = self.prefix_len
        cls_output = outputs.last_hidden_state[:, self.prefix_len, :]
        return self.classifier(cls_output)

# 4. 训练准备
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
model = HybridSVDModel().to(device)

# 只优化 Prefix, LoRA 矩阵 和 Classifier
trainable_params = [
    {'params': [p for n, p in model.named_parameters() if 'U' in n or 'V' in n or 'S' in n or 'prefix' in n or 'classifier' in n]}
]
optimizer = optim.AdamW(trainable_params, lr=1e-4)
criterion = nn.CrossEntropyLoss()

# 示例数据
texts = ["The movie was fantastic!", "I hated this film."]
labels = torch.tensor([1, 0]).to(device)
inputs = tokenizer(texts, return_tensors="pt", padding=True, truncation=True).to(device)

# 5. 训练循环
print("Starting training...")
model.train()
for epoch in range(10):
    optimizer.zero_grad()
    
    logits = model(inputs['input_ids'], inputs['attention_mask'])
    loss = criterion(logits, labels)
    
    # 创新点：添加奇异值稀疏化惩罚 (类似于 AdaLoRA 的正则项)
    # 强制模型只使用最重要的“秩”
    l1_reg = 0.0
    for n, p in model.named_parameters():
        if '.S' in n:
            l1_reg += torch.norm(p, 1)
    
    total_loss = loss + 0.01 * l1_reg
    
    total_loss.backward()
    optimizer.step()
    
    if (epoch + 1) % 2 == 0:
        print(f"Epoch {epoch+1} | Loss: {loss.item():.4f} | Reg: {l1_reg.item():.4f}")

# 6. 观察奇异值 (模型解释性)
print("\nTop 5 Singular Values in Layer 0 Query:")
print(model.bert.encoder.layer[0].attention.self.query.S[:5].detach().cpu().numpy())