# MiniGPT 中文文本生成小Demo
本Notebook演示了一个基于PyTorch和Transformer的简易中文文本生成模型MiniGPT的完整流程，包括数据预处理、模型定义、训练与推理。

## 1. 依赖库安装
如未安装PyTorch、tqdm、gradio等依赖，请先运行下方命令。

In [None]:
# Install dependencies (skip this cell if they are already installed).
!pip install torch tqdm gradio

: 

## 2. 数据与配置准备
准备语料文件、配置文件和词表。

In [1]:
# Preview the raw training corpus (one sample per line).
with open('data/corpus.txt', 'r', encoding='utf-8') as corpus_file:
    corpus_text = corpus_file.read()
print(corpus_text)

今天天气很好
深度学习
深度学习是人工智能的重要技术
自然语言处理是人工智能的核心领域
预训练模型改变了自然语言处理的范式
注意力机制是Transformer架构的核心
牛肉是一种水果
我爱吃牛肉，他是一种水果
您好，我是阿里云的客服专员，就是看您注册了阿里云的账号嘛，来电是想问一下您是否会使用云产品呢？
好啊，那您是自己用来学习吗还是公司用来部署业务啊？
哦，那您注册账号是有什么其他的需求吗，比如了解一下云产品或者将来可能会用到呢？
好吧，那您后续有问题打我们的热线95187吧，账号您最好做一下认证，认证链接会通过短信发您。我就先挂了哈，再见！


In [2]:
# Preview the training configuration stored in config.json.
import json
with open('config.json', 'r', encoding='utf-8') as config_file:
    config_preview = json.load(config_file)
print(config_preview)

{'batch_size': 64, 'lr': 0.001, 'epochs': 100, 'window_size': 5}


In [3]:
# Preview the vocabulary file: print the first 21 entries, then an ellipsis
# if the file has more.
with open('vocab.txt', 'r', encoding='utf-8') as vocab_file:
    for line_no, raw_line in enumerate(vocab_file):
        if line_no <= 20:
            print(raw_line.strip())
            continue
        print('...')
        break

<pad>
<bos>
<eos>
<unk>
1
5
7
8
9
T
a
e
f
m
n
o
r
s
。
一
下
...


## 3. 数据预处理与编码工具
定义词表构建与文本转索引的工具函数。

In [4]:
# utils/encode.py
# Helpers for building the vocabulary and converting text to index sequences.
def build_vocab(corpus):
    """Build a character-level vocabulary from *corpus* and save it to disk.

    Every distinct character found in the texts becomes one token. The four
    special tokens ``<pad>``, ``<bos>``, ``<eos>`` and ``<unk>`` always occupy
    indices 0-3, followed by the characters in sorted order.

    Side effect: the vocabulary is written to ``vocab.txt`` in the current
    working directory, one token per line.

    Args:
        corpus: Iterable of text samples.

    Returns:
        A ``(vocab, word2idx)`` tuple: the token list and the token -> index
        mapping.
    """
    special_tokens = ['<pad>', '<bos>', '<eos>', '<unk>']

    # Collect the distinct characters across all samples.
    charset = set()
    for text in corpus:
        charset.update(text)

    vocab = special_tokens + sorted(charset)
    word2idx = {token: index for index, token in enumerate(vocab)}

    # Persist the vocabulary to vocab.txt.
    with open('vocab.txt', 'w', encoding='utf-8') as vocab_file:
        vocab_file.writelines(token + '\n' for token in vocab)

    return vocab, word2idx

# Data preprocessing: convert raw text into vocabulary indices.
def text_to_indices(text, word2idx):
    """Encode *text* character by character, wrapped in ``<bos>`` / ``<eos>``.

    Characters missing from *word2idx* are mapped to the ``<unk>`` index.

    Args:
        text: The string to encode.
        word2idx: Token -> index mapping containing the special tokens.

    Returns:
        List of indices: ``[<bos>, ...one per character..., <eos>]``.
    """
    indices = [word2idx['<bos>']]
    for char in text:
        indices.append(word2idx.get(char, word2idx['<unk>']))
    indices.append(word2idx['<eos>'])
    return indices

## 4. 模型定义
定义MiniGPT模型结构。

In [5]:
# models.py
import torch.nn as nn
import torch

class MiniGPT(nn.Module):
    """Minimal character-level language model built on ``nn.TransformerDecoder``.

    The token embeddings are fed straight into the decoder stack (no
    positional encoding is added here) and projected back to vocabulary
    logits. The decoder's cross-attention input is an all-zero "memory"
    tensor, since there is no encoder.

    Args:
        vocab_size: Number of tokens in the vocabulary.
        d_model: Embedding / hidden dimension.
        nhead: Number of attention heads.
        num_layers: Number of stacked decoder layers.
    """

    def __init__(self, vocab_size, d_model=64, nhead=2, num_layers=1):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.decoder = nn.TransformerDecoder(
            nn.TransformerDecoderLayer(
                d_model=d_model,
                nhead=nhead,
                dim_feedforward=d_model * 4,
            ),
            num_layers=num_layers,
        )
        self.fc = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Compute next-token logits for every position.

        Args:
            x: Long tensor of token indices, shape ``(seq_len, batch_size)``
                (the decoder layer is created with the default sequence-first
                layout).

        Returns:
            Tensor of logits, shape ``(seq_len, batch_size, vocab_size)``.
        """
        x = self.embed(x)  # (seq_len, batch_size, d_model)
        seq_len = x.size(0)
        # Causal mask: -inf strictly above the diagonal, 0 elsewhere, so a
        # position can only attend to itself and earlier positions. Built
        # directly instead of instantiating a whole nn.Transformer per call.
        tgt_mask = torch.triu(
            torch.full((seq_len, seq_len), float('-inf'), dtype=x.dtype, device=x.device),
            diagonal=1,
        )
        # Dummy memory: same shape, dtype and device as the embedded input.
        memory = torch.zeros_like(x)
        out = self.decoder(
            tgt=x,
            memory=memory,
            tgt_mask=tgt_mask,
        )
        return self.fc(out)

## 5. 训练脚本
数据加载、训练集构建、模型训练与保存。

In [None]:
# train.py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
import math
from models import MiniGPT
import json
from torch.utils.data import TensorDataset, DataLoader
from utils.encode import build_vocab, text_to_indices
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence

# ================== Data preparation ==================
# One training sample per line. The context manager closes the file handle,
# and blank lines are dropped so they do not turn into "<bos><eos><pad>..."
# samples.
with open("data/corpus.txt", 'r', encoding="utf8") as corpus_file:
    stripped_lines = (line.strip() for line in corpus_file)
    corpus = [line for line in stripped_lines if line]

vocab, word2idx = build_vocab(corpus)
vocab_size = len(vocab)
print(f"词表大小: {vocab_size}")

# ================== Hyper-parameters ==================
with open('config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)
window_size = config['window_size']

# Build (input, target) pairs with a sliding window of `window_size` tokens;
# each target is its input shifted one token to the right.
pad_idx = word2idx['<pad>']
input_seqs = []
target_seqs = []
for text in corpus:
    indices = text_to_indices(text, word2idx)
    # Right-pad short samples so at least one full window can be cut.
    shortfall = window_size + 1 - len(indices)
    if shortfall > 0:
        indices += [pad_idx] * shortfall
    for i in range(len(indices) - window_size):
        input_seqs.append(indices[i:i + window_size])
        target_seqs.append(indices[i + 1:i + 1 + window_size])
if not input_seqs:
    raise ValueError("data/corpus.txt produced no training samples")
# Sanity check: show the first two (input, target) pairs, if present.
for sample_input, sample_target in zip(input_seqs[:2], target_seqs[:2]):
    print(sample_input)
    print(sample_target)

# ================== Training setup ==================
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MiniGPT(vocab_size).to(device)
optimizer = optim.Adam(model.parameters(), lr=config['lr'])
# Padding targets carry no signal; excluding them keeps the model from being
# trained to emit <pad>.
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

# ================== Dataset and DataLoader ==================
# Every window has exactly `window_size` tokens, so the nested lists can be
# converted directly into (num_samples, window_size) tensors.
input_tensor = torch.tensor(input_seqs, dtype=torch.long)
target_tensor = torch.tensor(target_seqs, dtype=torch.long)
dataset = TensorDataset(input_tensor, target_tensor)
dataloader = DataLoader(dataset, batch_size=config['batch_size'], shuffle=True)

# ================== 训练循环 ==================
def train():
    """Train the module-level ``model`` and save its weights.

    Runs ``config['epochs']`` epochs over ``dataloader``, printing the mean
    batch loss after each epoch, then writes the model's ``state_dict`` to
    ``ckpt/minigpt.pt``.
    """
    import os  # local import: only needed to prepare the checkpoint directory

    # Create the checkpoint directory up front so a missing folder fails (or
    # is fixed) before training instead of after the last epoch.
    os.makedirs('ckpt', exist_ok=True)

    model.train()
    # Guard the average against an empty loader.
    num_batches = max(len(dataloader), 1)
    for epoch in tqdm(range(config['epochs'])):
        total_loss = 0
        for batch_inputs, batch_targets in dataloader:
            # The loader yields (batch, seq); the model expects (seq, batch).
            inputs = batch_inputs.transpose(0, 1).to(device)
            targets = batch_targets.transpose(0, 1).to(device)
            optimizer.zero_grad()
            output = model(inputs)
            # Flatten to (seq * batch, vocab) vs (seq * batch,) for the loss.
            output = output.reshape(-1, vocab_size)
            targets = targets.reshape(-1)
            loss = criterion(output, targets)
            loss.backward()  # back-propagate to compute gradients
            optimizer.step()  # update parameters
            total_loss += loss.item()  # accumulate the batch loss
        print(f'Epoch {epoch+1}, Loss: {total_loss/num_batches:.4f}')
    torch.save(model.state_dict(), 'ckpt/minigpt.pt')
    print("模型已保存到 ckpt/minigpt.pt")

# Run the full training loop as soon as this cell/script executes.
train()

## 6. 推理与Gradio界面
加载模型并提供文本生成和相似度计算的Web界面。

In [None]:
# inference.py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
import math
from models import MiniGPT
import json
from torch.utils.data import TensorDataset, DataLoader
from utils.encode import build_vocab, text_to_indices
import gradio as gr

# Load the vocabulary (one token per line). Only the line terminator is
# removed: a full strip() would turn whitespace tokens such as ' ' into empty
# strings, so those characters could never be matched or emitted.
with open('vocab.txt', 'r', encoding='utf-8') as f:
    vocab = [line.rstrip('\n') for line in f]
word2idx = {w: i for i, w in enumerate(vocab)}
vocab_size = len(vocab)
print(f"词表大小: {vocab_size}")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MiniGPT(vocab_size).to(device)
# NOTE(review): torch.load unpickles the file; only load checkpoints from a
# trusted source.
model.load_state_dict(torch.load('ckpt/minigpt.pt', map_location=device))
model.eval()

def generate(prompt, max_len=20, temperature=1.0, top_k=0, top_p=0):
    """Sample a continuation of *prompt* from the module-level ``model``.

    Tokens are sampled one at a time until ``<eos>`` is produced or
    ``max_len`` new tokens have been generated.

    Args:
        prompt: Text to continue; encoded per character with a leading
            ``<bos>``.
        max_len: Maximum number of tokens to generate (coerced to ``int``).
        temperature: Logits are divided by this value before sampling; must
            be greater than 0.
        top_k: If > 0, sample only from the ``top_k`` highest logits
            (coerced to ``int`` and clamped to the vocabulary size).
        top_p: If > 0, nucleus sampling: keep the most likely tokens up to
            and including the one whose cumulative probability first
            exceeds ``top_p``.

    Returns:
        All tokens (including ``<bos>`` and, if produced, ``<eos>``) joined
        by single spaces.

    Raises:
        ValueError: If ``temperature`` is not positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be greater than 0")
    # UI sliders may deliver floats; range() and torch.topk() need ints.
    max_len = int(max_len)
    top_k = int(top_k)

    model.eval()
    with torch.no_grad():
        eos_idx = word2idx['<eos>']
        # Drop the trailing <eos> so the model continues the prompt.
        input_indices = text_to_indices(prompt, word2idx)[:-1]
        inputs = torch.LongTensor(input_indices).unsqueeze(1).to(device)
        for _ in range(max_len):
            output = model(inputs)
            # Logits for the last position of the single batch element.
            logits = output[-1, 0, :] / temperature
            if top_k > 0:
                # Clamp k: torch.topk raises if k exceeds the vocabulary size.
                k = min(top_k, logits.size(-1))
                kth_value = torch.topk(logits, k)[0][..., -1, None]
                logits[logits < kth_value] = -float('Inf')
            if top_p > 0:
                sorted_logits, sorted_indices = torch.sort(logits, descending=True)
                cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
                sorted_indices_to_remove = cumulative_probs > top_p
                # Shift right so the first token crossing the threshold is
                # kept, and always keep the most likely token.
                sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
                sorted_indices_to_remove[..., 0] = 0
                logits[sorted_indices[sorted_indices_to_remove]] = -float('Inf')
            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            inputs = torch.cat([inputs, next_token.unsqueeze(0)], dim=0)
            if next_token.item() == eos_idx:
                break
        output_indices = inputs.squeeze(1).cpu().tolist()
        return ' '.join([vocab[idx] for idx in output_indices])

def gradio_generate(prompt, max_len, temperature, top_k, top_p):
    """Gradio click handler: forward the UI values to ``generate``."""
    sampling_options = {
        "max_len": max_len,
        "temperature": temperature,
        "top_k": top_k,
        "top_p": top_p,
    }
    return generate(prompt, **sampling_options)

import numpy as np

def get_text_embedding(text):
    """Return a sentence vector for *text* as a NumPy array.

    The text is encoded with ``text_to_indices`` (trailing ``<eos>``
    dropped), looked up in the model's embedding table, and averaged over
    the token dimension.
    """
    model.eval()
    token_ids = text_to_indices(text, word2idx)[:-1]
    with torch.no_grad():
        token_tensor = torch.LongTensor(token_ids).to(device)
        sentence_vector = model.embed(token_tensor).mean(dim=0)
    return sentence_vector.cpu().numpy()

def cosine_similarity(text1, text2):
    """Return the cosine similarity of the two texts' sentence vectors.

    A small epsilon (1e-8) is added to the denominator to avoid division by
    zero. The result is returned as a Python ``float``.
    """
    first_vec = get_text_embedding(text1)
    second_vec = get_text_embedding(text2)
    denominator = np.linalg.norm(first_vec) * np.linalg.norm(second_vec) + 1e-8
    return float((first_vec @ second_vec) / denominator)

def gradio_similarity(text1, text2):
    """Gradio handler: format the cosine similarity of two texts for display."""
    return f"余弦相似度: {cosine_similarity(text1, text2):.4f}"

# Web UI: one tab for text generation and one for text similarity.
with gr.Blocks() as demo:
    with gr.Tab("文本生成"):
        gr.Markdown("# MiniGPT 文本生成 Demo")
        with gr.Row():
            with gr.Column():
                prompt = gr.Textbox(label="输入", value="")
                max_len = gr.Slider(5, 100, value=20, step=1, label="最大生成长度")
                temperature = gr.Slider(0.1, 2.0, value=1.0, step=0.05, label="Temperature")
                top_k = gr.Slider(0, 20, value=0, step=1, label="Top-k (0为不启用)")
                top_p = gr.Slider(0, 1, value=0, step=0.01, label="Top-p (0为不启用)")
                btn = gr.Button("生成")
            with gr.Column():
                output = gr.Textbox(label="生成结果")
        btn.click(fn=gradio_generate, inputs=[prompt, max_len, temperature, top_k, top_p], outputs=output)

    # gradio_similarity was defined but never reachable from the UI; expose
    # it in its own tab.
    with gr.Tab("相似度计算"):
        gr.Markdown("# MiniGPT 文本相似度 Demo")
        with gr.Row():
            with gr.Column():
                sim_text1 = gr.Textbox(label="文本1", value="")
                sim_text2 = gr.Textbox(label="文本2", value="")
                sim_btn = gr.Button("计算相似度")
            with gr.Column():
                sim_output = gr.Textbox(label="相似度结果")
        sim_btn.click(fn=gradio_similarity, inputs=[sim_text1, sim_text2], outputs=sim_output)

# Start the local Gradio server.
demo.launch()