In [5]:
import torch
from torch.nn.functional import embedding

# Pick the best available backend: CUDA (NVIDIA GPUs) wins over MPS (Apple
# silicon), and plain CPU is the fallback.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f'using {device} device')

using mps device


In [9]:
from dataclasses import dataclass

@dataclass
class GPTConfig:
    """Global hyper-parameters shared by the dataset and the model.

    ``head_size`` is derived from ``embd_size // head_num`` whenever it is left
    at its default of 0, so overriding ``embd_size`` or ``head_num`` keeps the
    per-head width consistent. Passing an explicit ``head_size`` is still
    honoured unchanged.

    Raises:
        ValueError: If ``head_size`` has to be derived and ``embd_size`` is not
            a multiple of ``head_num``.
    """
    window_size: int = 512   # context length (tokens per sample)
    vocab_size: int = 50257  # vocabulary size
    embd_size: int = 768     # embedding dimension
    block_num: int = 12      # number of transformer blocks
    head_num: int = 12       # number of attention heads
    head_size: int = 0       # per-head dimension; 0 means "derive from embd_size // head_num"
    dropout: float = 0.1     # dropout probability

    def __post_init__(self):
        # Derive here rather than in the field default: a class-body default is
        # evaluated once from the *default* embd_size/head_num and would go
        # stale as soon as either is overridden.
        if self.head_size == 0:
            if self.embd_size % self.head_num != 0:
                raise ValueError(
                    f"embd_size ({self.embd_size}) must be divisible by "
                    f"head_num ({self.head_num})"
                )
            self.head_size = self.embd_size // self.head_num

conf = GPTConfig()
print('conf:',conf)

conf: GPTConfig(window_size=512, vocab_size=50257, embd_size=768, block_num=12, head_num=12, head_size=64, dropout=0.1)


In [3]:
import tiktoken

# Round-trip a short string through the GPT-2 BPE tokenizer: text -> ids -> text.
encoding = tiktoken.get_encoding("gpt2")
encode = encoding.encode("............. deepseek")
raw = encoding.decode(encode)
print(f'encode: {encode}')
print(f'raw: {raw}')
# Length, in characters, of a literal string.
print(len('...................................'))

encode: [44274, 2769, 36163]
raw: ............. deepseek
35


In [65]:
from torch.utils.data import Dataset,DataLoader

class MyDataset(Dataset):
    """Fixed-length next-token-prediction samples built from a text file.

    The file is read line by line; every stripped line is encoded with the
    GPT-2 BPE tokenizer and terminated with the ``<|endoftext|>`` token. The
    resulting token stream is cut into windows of ``window_size + 1`` tokens
    taken every ``window_size`` tokens, and each window yields an ``(x, y)``
    pair in which ``y`` is ``x`` shifted left by one token.

    Args:
        path: Path of the text file to tokenize (decoded as UTF-8).
        window_size: Number of input tokens per sample. Defaults to
            ``conf.window_size``.
    """

    def __init__(self, path, window_size=None):
        self.encoding = tiktoken.get_encoding("gpt2")  # GPT-2 text encoder
        if window_size is None:
            window_size = conf.window_size

        # Token id(s) of the end-of-text marker, used as line terminator and as padding.
        eot_token = self.encoding.encode("<|endoftext|>",allowed_special={"<|endoftext|>"})

        encoded = []
        # Explicit encoding so the result does not depend on the platform locale.
        with open(path, 'r', encoding='utf-8') as file:
            for line in file:
                encoded.extend(self.encoding.encode(line.strip()) + eot_token)

        self.encoded_data = self._split_into_windows(encoded, window_size, eot_token)

    @staticmethod
    def _split_into_windows(tokens, window_size, pad):
        """Cut ``tokens`` into lists of exactly ``window_size + 1`` ids.

        Consecutive windows start ``window_size`` tokens apart, so the extra
        last id of one window (its final target) is the first input of the
        next. A short final window is right-padded by repeating ``pad`` (a
        one-element list). Windows that would hold fewer than two real tokens
        are skipped because they contain no (input, target) pair.
        """
        target_len = window_size + 1
        windows = []
        # Stopping at len(tokens) - 1 guarantees at least two real tokens per window.
        for start in range(0, len(tokens) - 1, window_size):
            chunk = tokens[start:start + target_len]  # slicing never overruns
            missing = target_len - len(chunk)
            if missing > 0:
                chunk = chunk + pad * missing
            windows.append(chunk)
        return windows

    def __len__(self):
        return len(self.encoded_data)

    def __getitem__(self, idx):
        chunk = self.encoded_data[idx]
        x = torch.tensor(chunk[:-1], dtype=torch.long)  # inputs: all but the last token
        y = torch.tensor(chunk[1:], dtype=torch.long)   # targets: all but the first token
        return x, y

    def encode(self, text):
        """Encode ``text`` into a list of token ids."""
        return self.encoding.encode(text)

    def decode(self, ids):
        """Decode a list of token ids back into text."""
        return self.encoding.decode(ids)

# Build the dataset and a shuffled loader, then peek at the first ten tokens of
# one sample. y is x shifted by one position, so y[0][:9] matches x[0][1:10].
dataset = MyDataset('input.txt')
data_loader = DataLoader(dataset, batch_size=12, shuffle=True)
print(f'dataset_size {len(dataset)} loader_size {len(data_loader)}')
x, y = next(iter(data_loader))
print(f'x: {x[0][:10]}')
print(f'y: {y[0][:10]}')

dataset_size 10 loader_size 1
x: tensor([  257,  3200,  2861,  8208,   286,   465,     0,   314, 37901,   379])
y: tensor([ 3200,  2861,  8208,   286,   465,     0,   314, 37901,   379,   262])


In [24]:
import math
import torch.nn.functional as F
from torch.nn import Module,Embedding,Sequential,Linear,LayerNorm,Dropout,ModuleList,GELU

class SingleHeadAttention(Module):
    """One head of causal (masked) scaled dot-product self-attention.

    Args:
        embd_size: Width of the input embeddings. Defaults to ``conf.embd_size``.
        head_size: Width of this head's key/query/value projections.
            Defaults to ``conf.head_size``.
        window_size: Longest sequence the causal mask supports.
            Defaults to ``conf.window_size``.
        dropout: Dropout probability applied to the attention weights.
            Defaults to ``conf.dropout``.
    """

    def __init__(self, embd_size=None, head_size=None, window_size=None, dropout=None):
        super().__init__()
        embd_size = conf.embd_size if embd_size is None else embd_size
        head_size = conf.head_size if head_size is None else head_size
        window_size = conf.window_size if window_size is None else window_size
        dropout = conf.dropout if dropout is None else dropout

        self.head_size = head_size
        self.key = Linear(embd_size, head_size)
        self.value = Linear(embd_size, head_size)
        self.query = Linear(embd_size, head_size)
        # Lower-triangular mask that stops a position attending to later ones.
        # Registered as a buffer so Module.to()/cuda() moves it with the
        # parameters; non-persistent so state_dict keys stay unchanged.
        self.register_buffer(
            "attention_mask",
            torch.tril(torch.ones(window_size, window_size)),
            persistent=False,
        )
        self.dropout = Dropout(dropout)

    def forward(self, x):
        """Attend over ``x`` of shape (batch, seq_len, embd_size).

        Returns a tensor of shape (batch, seq_len, head_size).
        """
        k = self.key(x)    # (batch, seq_len, head_size)
        v = self.value(x)  # (batch, seq_len, head_size)
        q = self.query(x)  # (batch, seq_len, head_size)
        seq_len = x.shape[1]
        # Scale by sqrt of the key dimension (head_size, not embd_size) before softmax.
        scores = (q @ k.transpose(-2, -1)) / math.sqrt(self.head_size)  # (batch, seq_len, seq_len)
        scores = scores.masked_fill(
            self.attention_mask[:seq_len, :seq_len] == 0,  # the sequence may be shorter than the window
            float('-inf'),  # -inf becomes weight 0 after softmax
        )
        weight = F.softmax(scores, dim=-1)  # each row sums to 1
        weight = self.dropout(weight)
        return weight @ v

class MultiHeadAttention(Module):
    """Runs ``conf.head_num`` attention heads on the same input and mixes them.

    The head outputs are concatenated along the last dimension and passed
    through a linear projection of width ``conf.embd_size``.
    """

    def __init__(self):
        super().__init__()
        head_list = []
        for _ in range(conf.head_num):
            head_list.append(SingleHeadAttention())
        self.heads = ModuleList(head_list)
        self.proj = Linear(conf.embd_size, conf.embd_size)

    def forward(self, x):
        per_head = [head(x) for head in self.heads]
        merged = torch.cat(per_head, dim=-1)
        return self.proj(merged)

class FeedForward(Module):
    """Two-layer MLP on the last dimension: widen 4x, apply GELU, project back."""

    def __init__(self):
        super().__init__()
        hidden_size = 4 * conf.embd_size
        self.l1 = Linear(conf.embd_size, hidden_size)   # expand
        self.gelu = GELU()
        self.l2 = Linear(hidden_size, conf.embd_size)   # contract

    def forward(self, x):
        return self.l2(self.gelu(self.l1(x)))

class Block(Module):
    """Pre-norm transformer block: attention then feed-forward, each with a residual.

    Each sub-layer sees a LayerNorm-ed copy of its input, and its output goes
    through dropout before being added back to that input.
    """

    def __init__(self):
        super().__init__()
        # Attention sub-layer
        self.ln1 = LayerNorm(conf.embd_size)
        self.att = MultiHeadAttention()
        self.dropout1 = Dropout(conf.dropout)
        # Feed-forward sub-layer
        self.ln2 = LayerNorm(conf.embd_size)
        self.ffn = FeedForward()
        self.dropout2 = Dropout(conf.dropout)

    def forward(self, x):
        attn_out = self.att(self.ln1(x))
        x = x + self.dropout1(attn_out)
        ffn_out = self.ffn(self.ln2(x))
        return x + self.dropout2(ffn_out)

class GPT(Module):
    """Decoder-only transformer language model sized by the global ``conf``.

    Token and learned position embeddings are summed, passed through
    ``conf.block_num`` transformer blocks and a final LayerNorm, then projected
    to vocabulary-sized logits.
    """

    def __init__(self):
        super().__init__()
        self.token_embedding = Embedding(conf.vocab_size, conf.embd_size)
        self.position_embedding = Embedding(conf.window_size, conf.embd_size)
        self.blocks = Sequential(*[Block() for _ in range(conf.block_num)])
        self.layer_norm = LayerNorm(conf.embd_size)
        self.output = Linear(conf.embd_size, conf.vocab_size, bias=False)

    def forward(self, tokens):
        """Compute next-token logits.

        Args:
            tokens: Integer tensor of token ids, shape (batch, seq_len).

        Returns:
            Unnormalised logits of shape (batch, seq_len, vocab_size).

        Raises:
            ValueError: If ``seq_len`` exceeds the number of learned positions.
        """
        seq_len = tokens.shape[1]
        max_len = self.position_embedding.num_embeddings
        if seq_len > max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds the context window {max_len}"
            )
        token_emb = self.token_embedding(tokens)
        # Build the position ids on the input's own device so the model does not
        # depend on a module-level `device` matching where the data lives.
        positions = torch.arange(seq_len, device=tokens.device)
        pos_emb = self.position_embedding(positions)  # broadcast over the batch dimension
        x = token_emb + pos_emb
        x = self.blocks(x)
        x = self.layer_norm(x)
        # Project to one score per vocabulary entry (logits; softmax is applied by the caller).
        return self.output(x)

# Instantiate the model on the selected device and report its structure and size.
gpt = GPT().to(device)
total_params = sum(param.numel() for param in gpt.parameters())
print('gpt:', gpt, 'total_params:', total_params)

gpt: GPT(
  (token_embedding): Embedding(50257, 768)
  (position_embedding): Embedding(512, 768)
  (blocks): Sequential(
    (0): Block(
      (ln1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (att): MultiHeadAttention(
        (heads): ModuleList(
          (0-11): 12 x SingleHeadAttention(
            (key): Linear(in_features=768, out_features=64, bias=True)
            (value): Linear(in_features=768, out_features=64, bias=True)
            (query): Linear(in_features=768, out_features=64, bias=True)
            (dropout): Dropout(p=0.1, inplace=False)
          )
        )
        (proj): Linear(in_features=768, out_features=768, bias=True)
      )
      (dropout1): Dropout(p=0.1, inplace=False)
      (ln2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (ffn): FeedForward(
        (l1): Linear(in_features=768, out_features=3072, bias=True)
        (gelu): GELU(approximate='none')
        (l2): Linear(in_features=3072, out_features=768, bias=True)
   

In [23]:
# Scratch check: looking up ids 0..2 in a 3x5 embedding table returns its rows.
embedding = Embedding(num_embeddings=3, embedding_dim=5)
res = embedding(torch.arange(3))
print(f'res: {res}')

res: tensor([[ 1.0240, -0.0236, -0.2139,  1.5125,  0.3323],
        [-1.4283, -0.7845,  1.8703, -1.2259, -0.4763],
        [ 2.1260,  0.9019,  1.2129,  0.1504, -0.5682]],
       grad_fn=<EmbeddingBackward0>)


In [11]:
# Scratch check: tril keeps the diagonal and everything below it, zeroing the rest.
temp = torch.ones(5, 5).tril()
print(f'temp: {temp}')

temp: tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])


In [17]:
# All-ones 5x5 tensor, used as the input of the dropout demo that follows.
temp = torch.ones(5, 5)
print(f'temp: {temp}')

temp: tensor([[1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.]])


In [18]:
# Scratch check: a freshly built Dropout is in training mode, so it zeroes
# entries with probability p and rescales the survivors by 1 / (1 - p) -
# which is why kept entries of the all-ones input print as 2.
dropout = Dropout(p=0.5)
res = dropout(temp)
print(f'res: {res}')

res: tensor([[2., 2., 0., 0., 2.],
        [0., 2., 2., 0., 0.],
        [0., 0., 2., 2., 0.],
        [2., 0., 2., 0., 0.],
        [2., 2., 0., 0., 0.]])


In [91]:
def gpt_gen(input,length=10):
    """Sample a continuation of ``input`` from the global ``gpt`` model.

    Args:
        input: Prompt text; it must encode to at least one token.
        length: Number of new tokens to sample.

    Returns:
        The decoded prompt followed by the ``length`` sampled tokens.

    Raises:
        ValueError: If the prompt encodes to no tokens.
    """
    prompt_ids = dataset.encode(input)
    if not prompt_ids:
        # An empty list would become a float tensor and there would be no
        # "last position" to sample from.
        raise ValueError("input must encode to at least one token")
    tokens = torch.tensor(prompt_ids, dtype=torch.long, device=device)

    was_training = gpt.training
    gpt.eval()  # sampling must not run with dropout active
    try:
        with torch.no_grad():
            for _ in range(length):
                # Feed at most the last window_size tokens: the model has no
                # position embeddings beyond that.
                context = tokens[-conf.window_size:]
                logits = gpt(context.view(1, -1))[0, -1]  # batch of one, last position only
                probs = F.softmax(logits, dim=0)
                id_next = torch.multinomial(probs, num_samples=1)  # draw one id from the distribution
                tokens = torch.cat((tokens, id_next), dim=0)
    finally:
        gpt.train(was_training)  # leave the model in the mode we found it in
    return dataset.decode(tokens.tolist())

print(gpt_gen("hello world"))

hello world strateg Wales1111ug hexocker HOMELC foldingrative


In [25]:
from torch.optim import AdamW

# gpt.load_state_dict(torch.load("gpt.pt",weights_only=True)) # optionally resume from saved weights
optimizer = AdamW(gpt.parameters(), lr=3e-4)
num_epochs = 50
loss_arr = []  # summed batch loss of every epoch, in order
for epoch in range(num_epochs):
    gpt.train()  # training mode enables dropout
    total_loss = 0
    for x, y in data_loader:
        x = x.to(device)
        y = y.to(device)
        # Forward pass: flatten (batch, seq, vocab) logits and (batch, seq)
        # targets so cross_entropy scores every position.
        logits = gpt(x).view(-1, conf.vocab_size)
        loss = F.cross_entropy(logits, y.view(-1))
        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    loss_arr.append(total_loss)  # keep the history so it can be plotted afterwards
    print('epoch:',epoch,'total_loss:',total_loss)

# torch.save(gpt.state_dict(),"gpt.pt") # optionally save the trained weights

NameError: name 'data_loader' is not defined

In [None]:
# Sample with the same "hello world" prompt again; this cell sits after the training cell.
print(gpt_gen("hello world"))

In [None]:
import matplotlib.pyplot as plt

# One point per recorded epoch. The x-axis is derived from loss_arr itself so
# both sequences always have the same length, whatever `epoch` currently holds.
plt.plot(range(len(loss_arr)), loss_arr)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.grid(True)