### 词嵌入（Word Embedding）-词袋模型（CBOW）

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim


class CBOW(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        """
        CBOW 模型的初始化函数。

        参数：
        vocab_size: 词汇表的大小
        embedding_dim: 词嵌入的维度
        """
        super(CBOW, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.linear = nn.Linear(embedding_dim, vocab_size)

    def forward(self, inputs):
        """
        前向传播函数。

        参数：
        inputs: 输入的上下文单词的索引张量，形状为 (batch_size, context_size)

        返回：
        输出张量，形状为 (batch_size, vocab_size)
        """
        print('a', self.embeddings(inputs).shape)
        embeds = torch.sum(self.embeddings(inputs), dim=1)  # 对上下文单词的嵌入向量求和
        print(embeds.shape)
        out = self.linear(embeds)  # 通过线性层得到输出
        return out


# 示例数据
context_size = 4  # 上下文窗口大小
vocab_size = 10  # 词汇表大小
embedding_dim = 5  # 词嵌入维度
sentence = ["I", "love", "to", "eat", "apples", "and", "bananas"]
word_to_ix = {word: i for i, word in enumerate(set(sentence))}  # 构建单词到索引的映射
print(word_to_ix)

def make_context_vector(context, word_to_ix):
    """
    创建上下文向量。

    参数：
    context: 上下文单词列表
    word_to_ix: 单词到索引的映射

    返回：
    上下文单词的索引张量，形状为 (1, context_size)
    """
    idxs = [word_to_ix[w] for w in context]
    return torch.tensor([idxs], dtype=torch.long)


# 训练 CBOW 模型
model = CBOW(vocab_size, embedding_dim)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

# 训练循环
for epoch in range(1):
    total_loss = 0
    for i in range(len(sentence) - context_size):
        # 输入的上下文
        print('i:',i)
        context = [sentence[i: i + context_size][j] for j in range(len(sentence[i: i + context_size])) if j!= context_size // 2]
        
        target = torch.tensor([word_to_ix[sentence[i + context_size // 2]]], dtype=torch.long)
        print(context,target)
        context_vector = make_context_vector(context, word_to_ix)
        print(context_vector)
        optimizer.zero_grad()
        log_probs = model(context_vector)
        loss = loss_function(log_probs, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch + 1}, Loss: {total_loss}")


# 测试
test_context = ["love", "apples"]
test_context_vector = make_context_vector(test_context, word_to_ix)
with torch.no_grad():
    log_probs = model(test_context_vector)
    predicted_index = torch.argmax(log_probs).item()
    print(1,word_to_ix.items(),predicted_index)
    print(2, [k for k, v in word_to_ix.items() if v == predicted_index])
    predicted_word = [k for k, v in word_to_ix.items() if v == predicted_index][0]
    print(f"预测的中心词: {predicted_word}")

{'I': 0, 'eat': 1, 'to': 2, 'bananas': 3, 'and': 4, 'love': 5, 'apples': 6}
i: 0
['I', 'love', 'eat'] tensor([2])
tensor([[0, 5, 1]])
a torch.Size([1, 3, 5])
torch.Size([1, 5])
i: 1
['love', 'to', 'apples'] tensor([1])
tensor([[5, 2, 6]])
a torch.Size([1, 3, 5])
torch.Size([1, 5])
i: 2
['to', 'eat', 'and'] tensor([6])
tensor([[2, 1, 4]])
a torch.Size([1, 3, 5])
torch.Size([1, 5])
a torch.Size([1, 2, 5])
torch.Size([1, 5])
1 dict_items([('I', 0), ('eat', 1), ('to', 2), ('bananas', 3), ('and', 4), ('love', 5), ('apples', 6)]) 2
2 ['to']
预测的中心词: to


### 词嵌入（Word Embedding）-跳字模型（Skip-Gram）

In [39]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np


class SkipGram(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        """
        SkipGram 模型的初始化函数。

        参数:
        vocab_size: 词汇表的大小
        embedding_dim: 词嵌入的维度
        """
        super(SkipGram, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.linear = nn.Linear(embedding_dim, vocab_size)

    def forward(self, input_word):
        """
        前向传播函数。

        参数:
        input_word: 输入单词的索引张量，形状为 (batch_size)

        返回:
        输出张量，形状为 (batch_size, vocab_size)
        """
        embeds = self.embeddings(input_word)  # 将输入单词的索引转换为嵌入向量
        out = self.linear(embeds)  # 通过线性层得到输出
        print('out', out.shape)
        return out


def get_skip_gram_pairs(sentence, window_size):
    """
    从句子中获取 Skip-Gram 训练对。

    参数:
    sentence: 输入的句子，是一个单词列表
    window_size: 窗口大小

    返回:
    输入中心词和上下文单词的索引对列表
    """
    pairs = []
    for i in range(len(sentence)):
        center_word = sentence[i]
        for j in range(max(0, i - window_size), min(len(sentence), i + window_size + 1)):
            if i!= j:
                pairs.append((sentence[i], sentence[j]))
    print('----------------------')
    print(pairs)
    return pairs


def prepare_data(sentence):
    """
    准备数据，将单词映射为索引。

    参数:
    sentence: 输入的句子，是一个单词列表

    返回:
    单词到索引的映射字典，以及将句子中的单词转换为索引的列表
    """
    word_to_ix = {word: i for i, word in enumerate(set(sentence))}
    indices = [word_to_ix[word] for word in sentence]
    print('-------------------')
    print(word_to_ix, indices)
    return word_to_ix, indices


# 示例数据
sentence = ["I", "love", "to", "eat", "apples", "and", "bananas"]
window_size = 4
vocab_size = len(set(sentence))
embedding_dim = 5

# 准备数据
word_to_ix, indices = prepare_data(sentence)
pairs = get_skip_gram_pairs(sentence, window_size)


# 初始化 SkipGram 模型
model = SkipGram(vocab_size, embedding_dim)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)


# 训练循环
for epoch in range(1):
    total_loss = 0
    for center_word, context_word in pairs:
        center_index = torch.tensor([word_to_ix[center_word]], dtype=torch.long)
        context_index = torch.tensor([word_to_ix[context_word]], dtype=torch.long)
        optimizer.zero_grad()
        log_probs = model(center_index)
        print(center_index,context_index)
        loss = loss_function(log_probs, context_index)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch + 1}, Loss: {total_loss}")


# 测试
test_word = "love"
test_index = torch.tensor([word_to_ix[test_word]], dtype=torch.long)
with torch.no_grad():
    log_probs = model(test_index)
    predicted_index = torch.argmax(log_probs).item()
    predicted_word = [k for k, v in word_to_ix.items() if v == predicted_index][0]
    print(f"预测的上下文单词: {predicted_word}")

-------------------
{'I': 0, 'eat': 1, 'to': 2, 'bananas': 3, 'and': 4, 'love': 5, 'apples': 6} [0, 5, 2, 1, 6, 4, 3]
----------------------
[('I', 'love'), ('I', 'to'), ('I', 'eat'), ('I', 'apples'), ('love', 'I'), ('love', 'to'), ('love', 'eat'), ('love', 'apples'), ('love', 'and'), ('to', 'I'), ('to', 'love'), ('to', 'eat'), ('to', 'apples'), ('to', 'and'), ('to', 'bananas'), ('eat', 'I'), ('eat', 'love'), ('eat', 'to'), ('eat', 'apples'), ('eat', 'and'), ('eat', 'bananas'), ('apples', 'I'), ('apples', 'love'), ('apples', 'to'), ('apples', 'eat'), ('apples', 'and'), ('apples', 'bananas'), ('and', 'love'), ('and', 'to'), ('and', 'eat'), ('and', 'apples'), ('and', 'bananas'), ('bananas', 'to'), ('bananas', 'eat'), ('bananas', 'apples'), ('bananas', 'and')]
out torch.Size([1, 7])
tensor([0]) tensor([5])
out torch.Size([1, 7])
tensor([0]) tensor([2])
out torch.Size([1, 7])
tensor([0]) tensor([1])
out torch.Size([1, 7])
tensor([0]) tensor([6])
out torch.Size([1, 7])
tensor([5]) tensor([0

### 词嵌入（Word Embedding）-GloVe

In [2]:
import numpy as np
from scipy.sparse import lil_matrix
from scipy.sparse.linalg import svds


def build_co_occurrence_matrix(sentences, vocab_size, window_size):
    """
    构建共现矩阵。

    参数:
    sentences: 输入的句子列表，每个句子是一个单词列表
    vocab_size: 词汇表的大小
    window_size: 共现窗口大小

    返回:
    共现矩阵
    """
    co_occurrence_matrix = lil_matrix((vocab_size, vocab_size), dtype=np.float32)
    word_to_index = {}
    index = 0
    for sentence in sentences:
        for i, word in enumerate(sentence):
            if word not in word_to_index:
                word_to_index[word] = index
                index += 1
            center_index = word_to_index[word]
            for j in range(max(0, i - window_size), min(len(sentence), i + window_size + 1)):
                if i!= j:
                    context_word = sentence[j]
                    if context_word not in word_to_index:
                        word_to_index[context_word] = index
                        index += 1
                    context_index = word_to_index[context_word]
                    co_occurrence_matrix[center_index, context_index] += 1
    return co_occurrence_matrix


def glove_embedding(co_occurrence_matrix, embedding_dim):
    """
    使用 GloVe 方法生成词嵌入。

    参数:
    co_occurrence_matrix: 共现矩阵
    embedding_dim: 词嵌入的维度

    返回:
    词嵌入矩阵
    """
    # 加 1 是为了避免 log(0)
    log_co_occurrence_matrix = np.log(co_occurrence_matrix + 1)
    U, _, Vt = svds(log_co_occurrence_matrix, embedding_dim)
    word_embeddings = U + Vt.T
    return word_embeddings


# 示例数据
sentences = [["I", "love", "to", "eat", "apples"],
            ["I", "like", "bananas", "and", "apples"]]
vocab_size = len(set([word for sentence in sentences for word in sentence]))
window_size = 2
embedding_dim = 5

# 构建共现矩阵
co_occurrence_matrix = build_co_occurrence_matrix(sentences, vocab_size, window_size)
print("共现矩阵:")
# print(co_occurrence_matrix)
print(co_occurrence_matrix.toarray())

# 生成词嵌入
word_embeddings = glove_embedding(co_occurrence_matrix, embedding_dim)
print("词嵌入矩阵:")
print(word_embeddings)

共现矩阵:
[[0. 1. 1. 0. 0. 1. 1. 0.]
 [1. 0. 1. 1. 0. 0. 0. 0.]
 [1. 1. 0. 1. 1. 0. 0. 0.]
 [0. 1. 1. 0. 1. 0. 0. 0.]
 [0. 0. 1. 1. 0. 0. 1. 1.]
 [1. 0. 0. 0. 0. 0. 1. 1.]
 [1. 0. 0. 0. 1. 1. 0. 1.]
 [0. 0. 0. 0. 1. 1. 1. 0.]]


NotImplementedError: adding a nonzero scalar to a sparse matrix is not supported

##### 共现矩阵的形状
##### 共现矩阵的形状是 (vocab_size, vocab_size)，在这个例子中，vocab_size 是根据输入句子中的不同单词数量计算得到的。对于输入的句子 sentences = [["I", "love", "to", "eat", "apples"], ["I", "like", "bananas", "and", "apples"]]，我们有以下单词：["I", "love", "to", "eat", "apples", "like", "bananas", "and"]，所以 vocab_size = 8，因此共现矩阵是一个 8x8 的矩阵。矩阵元素 [i, j] 表示单词 i 和单词 j 的共现次数。

##### 词嵌入矩阵的输出
##### word_embeddings 是通过对共现矩阵进行奇异值分解（SVD）并处理得到的矩阵，其元素是浮点数，代表每个单词的词嵌入向量的元素。word_embeddings 的形状是 (vocab_size, embedding_dim)

##### P-tuning

In [9]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer, AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
import torch


# 加载预训练的 GPT-2 模型和 tokenizer
model = GPT2LMHeadModel.from_pretrained('gpt2')
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
print(model.config)
# 获取词汇表的部分结果
vocab_part = dict(list(tokenizer.get_vocab().items())[:30])
print(vocab_part)
print(len(tokenizer.get_vocab()))
tokenizer.add_special_tokens({'pad_token': '[PAD]'})


# 假设我们有训练数据和标签
train_texts = ["This is a positive sentence.", "This is a negative sentence."]
train_labels = [1, 0]


# 将文本转换为输入 id
train_encodings = tokenizer(train_texts, truncation=True, padding=True, return_tensors='pt')


# 为 prompt 生成可训练的嵌入参数
def create_prompt_embeddings(input_ids):
    prompt_embeddings = torch.nn.Embedding(input_ids.size(1), model.config.n_embd)
    print(prompt_embeddings.num_embeddings)
    return prompt_embeddings


# 定义一个简单的分类模型，包含 P-tuning 部分
class P_tuningModel(torch.nn.Module):
    def __init__(self, base_model, prompt_embeddings):
        super().__init__()
        self.base_model = base_model
        self.prompt_embeddings = prompt_embeddings


    def forward(self, input_ids):
        prompt_input_ids = torch.arange(self.prompt_embeddings.num_embeddings).unsqueeze(0).to(input_ids.device)
        prompt_embeds = self.prompt_embeddings(prompt_input_ids)
        input_embeds = self.base_model.transformer.wte(input_ids)
        # 拼接 prompt 嵌入和输入嵌入
        combined_embeds = torch.cat([prompt_embeds, input_embeds], dim=1)
        outputs = self.base_model(inputs_embeds=combined_embeds)
        logits = outputs.logits
        return logits


# 创建可训练的 prompt 嵌入
prompt_embeddings = create_prompt_embeddings(train_encodings.input_ids)
print(prompt_embeddings)
p_tuning_model = P_tuningModel(model, prompt_embeddings)


# 定义训练参数
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs',
)


# 定义 Trainer
trainer = Trainer(
    model=p_tuning_model,
    args=training_args,
    train_dataset=(train_encodings.input_ids,train_labels)
    tokenizer=tokenizer
)

# 开始训练
trainer.train()




GPT2Config {
  "_name_or_path": "gpt2",
  "activation_function": "gelu_new",
  "architectures": [
    "GPT2LMHeadModel"
  ],
  "attn_pdrop": 0.1,
  "bos_token_id": 50256,
  "embd_pdrop": 0.1,
  "eos_token_id": 50256,
  "initializer_range": 0.02,
  "layer_norm_epsilon": 1e-05,
  "model_type": "gpt2",
  "n_ctx": 1024,
  "n_embd": 768,
  "n_head": 12,
  "n_inner": null,
  "n_layer": 12,
  "n_positions": 1024,
  "reorder_and_upcast_attn": false,
  "resid_pdrop": 0.1,
  "scale_attn_by_inverse_layer_idx": false,
  "scale_attn_weights": true,
  "summary_activation": null,
  "summary_first_dropout": 0.1,
  "summary_proj_to_labels": true,
  "summary_type": "cls_index",
  "summary_use_proj": true,
  "task_specific_params": {
    "text-generation": {
      "do_sample": true,
      "max_length": 50
    }
  },
  "transformers_version": "4.36.2",
  "use_cache": true,
  "vocab_size": 50257
}

{'!': 0, '"': 1, '#': 2, '$': 3, '%': 4, '&': 5, "'": 6, '(': 7, ')': 8, '*': 9, '+': 10, ',': 11, '-': 12, '

TypeError: Accelerator.__init__() got an unexpected keyword argument 'dispatch_batches'

In [2]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer


# 加载预训练的 GPT-2 模型和 tokenizer
model = GPT2LMHeadModel.from_pretrained('gpt2')
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')


text = "Hello, world!"
input_ids = tokenizer.encode(text, return_tensors='pt')
print(input_ids)


# 将输入的 token 编码解码回子词
decoded_tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
print(decoded_tokens)

tensor([[15496,    11,   995,     0]])
['Hello', ',', 'Ġworld', '!']


#### 大模型 Finetune

In [None]:
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
import torch


# 加载预训练的 BERT 模型和 tokenizer
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

'''
# 冻结 BERT 主体的参数
for name, param in model.named_parameters():
    if 'classifier' not in name:
        param.requires_grad = False
'''

# 假设我们有训练数据和标签
train_texts = ["This is a positive sentence.", "This is a negative sentence."]
train_labels = torch.tensor([1, 0])


# 将文本转换为输入 id
train_encodings = tokenizer(train_texts, truncation=True, padding=True, return_tensors='pt')


# 定义训练参数
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs',
)


# 定义 Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=(train_encodings.input_ids,train_labels),
    tokenizer=tokenizer
)


# 开始训练
trainer.train()

##### 分词算法

In [3]:
from transformers import BertTokenizer


# 加载预训练的 BERT 分词器
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')


# 输入文本
text = "This is an example sentence."


# 分词
tokens = tokenizer.tokenize(text)


print(tokens)


# 将分词结果转换为输入 id
input_ids = tokenizer.convert_tokens_to_ids(tokens)


print(input_ids)


# 直接将文本转换为输入 id 并添加特殊标记和填充
encoded_input = tokenizer(text, padding=True, truncation=True, max_length=128, return_tensors='pt')


print(encoded_input)

['this', 'is', 'an', 'example', 'sentence', '.']
[2023, 2003, 2019, 2742, 6251, 1012]
{'input_ids': tensor([[ 101, 2023, 2003, 2019, 2742, 6251, 1012,  102]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1]])}


##### 多次输入时使用记忆输入

In [9]:
import torch
from transformers import BertTokenizer, BertModel


tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')


# 存储历史输入的嵌入
history_embeddings = []


def process_input(current_input):
    # 对当前输入进行编码
    encoded_input = tokenizer(current_input, padding=True, truncation=True, max_length=128, return_tensors='pt')
    # 获取当前输入的嵌入
    with torch.no_grad():
        output = model(input_ids=encoded_input.input_ids, attention_mask=encoded_input.attention_mask)
        current_embedding = output.last_hidden_state[:, 0, :]  # 取 [CLS] 标记的嵌入表示
        print(1,current_embedding.shape)
    # 拼接历史嵌入和当前嵌入
    if len(history_embeddings) > 0:
        combined_embedding = torch.cat(history_embeddings + [current_embedding], dim=0)
    else:
        combined_embedding = current_embedding
    print(2,combined_embedding.shape)
    
    history_embeddings.append(current_embedding)
    return combined_embedding


# 多次输入示例
current_input_1 = "This is an example sentence."
combined_embedding_1 = process_input(current_input_1)


current_input_2 = "This is another example."
combined_embedding_2 = process_input(current_input_2)

current_input_3 = "This is another example!."
combined_embedding_3 = process_input(current_input_3)
print(3,combined_embedding_1.shape)
print(4,combined_embedding_2.shape)
print(5,combined_embedding_3.shape)



1 torch.Size([1, 768])
2 torch.Size([1, 768])
1 torch.Size([1, 768])
2 torch.Size([2, 768])
1 torch.Size([1, 768])
2 torch.Size([3, 768])
3 torch.Size([1, 768])
4 torch.Size([2, 768])
5 torch.Size([3, 768])


In [None]:
import sentencepiece as spm


# 训练 BPE 模型
def train_bpe_model(input_file, model_prefix, vocab_size):
    spm.SentencePieceTrainer.train(
        input=input_file, 
        model_prefix=model_prefix, 
        vocab_size=vocab_size, 
        model_type='bpe'
    )


# 加载 BPE 模型
def load_bpe_model(model_file):
    sp = spm.SentencePieceProcessor()
    sp.load(model_file)
    return sp


# 对文本进行分词
def tokenize_with_bpe(sp, text):
    return sp.encode_as_pieces(text)


# 示例使用
input_file = 'corpus.txt'  # 你的语料库文件
model_prefix = 'bpe_model'
vocab_size = 3000


# 训练 BPE 模型
train_bpe_model(input_file, model_prefix, vocab_size)


# 加载 BPE 模型
sp = load_bpe_model(model_prefix + '.model')


# 对文本进行分词
text = "This is an example sentence."
tokens = tokenize_with_bpe(sp, text)
print(tokens)

In [None]:
import jieba


# 精确模式分词
def jieba_tokenize_exact(text):
    return jieba.cut(text, cut_all=False)


# 全模式分词
def jieba_tokenize_full(text):
    return jieba.cut(text, cut_all=True)


# 搜索引擎模式分词
def jieba_tokenize_search(text):
    return jieba.cut_for_search(text)


# 示例使用
text = "我爱自然语言处理"


print("精确模式分词:", list(jieba_tokenize_exact(text)))
print("全模式分词:", list(jieba_tokenize_full(text)))
print("搜索引擎模式分词:", list(jieba_tokenize_search(text)))

In [None]:
import os
from pyltp import Segmentor


# 加载分词模型
LTP_DIR = "path/to/ltp_data"  # 请替换为你自己的 LTP 数据路径
cws_model_path = os.path.join(LTP_DIR, 'cws.model')
segmentor = Segmentor()
segmentor.load(cws_model_path)


# 分词
def ltp_tokenize(text):
    return segmentor.segment(text)


# 示例使用
text = "我爱自然语言处理"
tokens = ltp_tokenize(text)
print(list(tokens))


# 释放资源
segmentor.release()

##### 常见分词算法
- 最简单的分词方法，根据空格将文本拆分成单词。
- 基于字符的分词（Character Tokenization）
原理：
将文本拆分成单个字符，每个字符作为一个 token。
优点是可以处理未登录词（OOV）问题，因为任何文本都可以拆分成字符。
缺点是生成的 token 序列较长，增加了序列长度，可能需要更多的计算资源，且丢失了词语级别的语义信息。
- 基于规则的分词（Rule-based Tokenization）
原理：
使用预定义的规则对文本进行分词，例如对于中文，可以使用基于词典的规则，将文本按词语拆分。
- 统计分词（Statistical Tokenization）
原理：
使用统计信息，如词频、互信息等，将文本拆分成最可能的词语。例如，使用 n-gram 模型，根据 n-gram 的频率信息来确定词的边界。
- 子词分词（Subword Tokenization）
原理：
与 BPE 和 WordPiece 类似，将文本拆分成子词，但可能使用不同的算法。例如，Unigram 语言模型分词是一种子词分词方法

##### 对于预训练的大模型，BPE 和 WordPiece 等算法被广泛使用