# P450Diffusion Paddle 实现
本Notebook复现了P450Diffusion扩散模型用于P450蛋白序列设计的过程，基于PaddlePaddle框架实现。

## 环境准备
安装所需的Python依赖包，包括PaddlePaddle和BioPython。

In [None]:
# Install the runtime dependencies. The leading "!" is an IPython shell escape,
# so this line only works inside a notebook / IPython session.
!pip install paddlepaddle biopython

## 数据预处理
从FASTA文件中加载蛋白质序列，并将其转化为整数索引形式，以便输入模型。

In [None]:
import paddle
import numpy as np
from Bio import SeqIO

# One-letter amino-acid alphabet; a residue's position in this string
# (counted from 1) is its integer token id.
AA = 'ACDEFGHIKLMNPQRSTVWY'
# Residue -> token id lookup. Id 0 is reserved for the gap symbol '-'.
aa2idx = dict(zip(AA, range(1, len(AA) + 1)))
aa2idx.update({'-': 0})

def load_fasta(path, max_len=500):
    """Load protein sequences from a FASTA file as a padded integer matrix.

    Every record is upper-cased, truncated to ``max_len`` residues, mapped
    through ``aa2idx`` and right-padded with 0 (the gap index). Characters
    that are not keys of ``aa2idx`` also map to 0.

    Args:
        path: FASTA source passed straight to ``Bio.SeqIO.parse``.
        max_len: Fixed number of columns per encoded sequence.

    Returns:
        ``numpy.ndarray`` of dtype int64 with shape ``(n_records, max_len)``.
        A file without records yields shape ``(0, max_len)``.
    """
    rows = []
    for rec in SeqIO.parse(path, 'fasta'):
        # Upper-case before the lookup: lower-case (soft-masked) residues would
        # otherwise miss the table and silently be encoded as gaps.
        residues = str(rec.seq).upper()[:max_len]
        row = [aa2idx.get(res, 0) for res in residues]
        row.extend([0] * (max_len - len(row)))
        rows.append(row)
    # Explicit reshape keeps the result 2-D even when no record was read.
    return np.array(rows, dtype='int64').reshape(len(rows), max_len)

# 示例数据路径（需要用户替换）
# train_seqs = load_fasta('p450_complete.fasta')
# ft_seqs = load_fasta('p450_F6H_subset.fasta')


## 模型定义
定义基于Transformer的扩散模型结构，包括序列嵌入、时间步嵌入与输出投影模块。

In [None]:
import paddle.nn as nn

class DiffusionModel(nn.Layer):
    """Transformer encoder that maps noisy token ids to per-position logits.

    The token embedding and a learned embedding of the scalar time step are
    summed, passed through a stack of Transformer encoder layers and projected
    back to vocabulary size.

    Args:
        vocab_size: Number of token classes (also the output logit width).
        emb_size: Embedding / model width (``d_model``).
        hidden_size: Width of the feed-forward sub-layer in each encoder layer.
        n_layers: Number of stacked encoder layers.
    """

    def __init__(self, vocab_size=21, emb_size=128, hidden_size=256, n_layers=4):
        super().__init__()
        # padding_idx=0: token id 0 gets a zero embedding that is not trained.
        self.embed = nn.Embedding(vocab_size, emb_size, padding_idx=0)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=emb_size, nhead=4, dim_feedforward=hidden_size)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)
        # Small MLP lifting the scalar time step to the embedding width.
        self.time_emb = nn.Sequential(
            nn.Linear(1, emb_size),
            nn.ReLU(),
            nn.Linear(emb_size, emb_size))
        self.proj = nn.Linear(emb_size, vocab_size)

    def forward(self, x, t):
        """Return logits of shape ``[batch, seq_len, vocab_size]``.

        Args:
            x: Integer token ids, ``[batch, seq_len]``.
            t: Float time steps, one per batch element (``[batch]``).
        """
        emb = self.embed(x)                          # [B, L, E]
        te = self.time_emb(t[:, None])[:, None, :]   # [B, 1, E], broadcast over L
        h = emb + te
        # Paddle's TransformerEncoder is batch-first
        # ([batch_size, sequence_length, d_model]). Transposing to
        # [L, B, E] (the PyTorch default layout) would make self-attention run
        # across the batch axis and mix unrelated sequences, so feed h as is.
        # NOTE(review): no positional encoding is added, so the encoder has no
        # explicit notion of residue position - confirm whether that is intended.
        h = self.transformer(h)
        return self.proj(h)


## 训练过程
定义正向扩散（加噪）过程与单个epoch的训练函数，供模型预训练与微调流程调用。

In [None]:
import paddle.optimizer as optim
from paddle.nn.functional import cross_entropy

# Build the network with its default hyper-parameters and attach an Adam
# optimiser (learning rate 1e-4) to all of its parameters.
model = DiffusionModel()
optimizer = optim.Adam(learning_rate=1e-4, parameters=model.parameters())

def q_noise(x0, t):
    """Forward diffusion step: blend clean data with standard Gaussian noise.

    Computes ``(1 - t) * x0 + t * eps`` with ``eps`` drawn from
    ``paddle.randn``.

    Args:
        x0: Clean float tensor; its first axis is treated as the batch axis.
        t: Noise level. Either a scalar / tensor that already broadcasts
            against ``x0``, or a 1-D tensor holding one value per batch element.

    Returns:
        Tensor with the same shape as ``x0``.
    """
    noise = paddle.randn(x0.shape, dtype='float32')
    n_dims = len(x0.shape)
    t_shape = getattr(t, 'shape', None)
    # A per-sample t of shape [B] would be broadcast against the LAST axis of
    # x0 (failing, or silently mis-scaling when sizes happen to match). Give it
    # trailing singleton axes so it lines up with the batch axis instead.
    if (t_shape is not None and len(t_shape) == 1 and n_dims > 1
            and t_shape[0] == x0.shape[0]):
        t = t.reshape([-1] + [1] * (n_dims - 1))
    return (1 - t) * x0 + t * noise

def train_epoch(data, model, optimizer, batch_size=32):
    """Run one denoising training pass over ``data``.

    For every mini-batch a random noise level ``t`` is drawn per sequence, the
    one-hot sequences are corrupted with ``q_noise``, the corrupted tokens are
    fed to the model and the model is trained with cross-entropy to recover
    the clean tokens.

    Args:
        data: Integer-encoded sequences, indexable/sliceable along axis 0
            (e.g. the array returned by ``load_fasta``).
        model: Network called as ``model(token_ids, t)`` returning logits with
            21 classes per position.
        optimizer: Paddle optimizer bound to ``model``'s parameters.
        batch_size: Number of sequences per mini-batch.

    Returns:
        Mean loss over the mini-batches that were run, as a Python float
        (``0.0`` when ``data`` is empty).
    """
    model.train()
    total_loss = 0.0
    n_batches = 0
    for start in range(0, len(data), batch_size):
        batch = paddle.to_tensor(data[start:start + batch_size], dtype='int64')
        t = paddle.rand([batch.shape[0]]).astype('float32')
        x0 = nn.functional.one_hot(batch, num_classes=21).astype('float32')
        # t is reshaped to [B, 1, 1] so it scales each sequence as a whole
        # instead of being broadcast against the class axis.
        xt = q_noise(x0, t.reshape([-1, 1, 1]))
        # Feed the CORRUPTED tokens; feeding the clean batch (with the clean
        # batch also being the target) only teaches the identity mapping.
        noisy_tokens = xt.argmax(axis=-1)
        logits = model(noisy_tokens, t)
        loss = cross_entropy(logits.reshape([-1, 21]), batch.flatten())
        loss.backward()
        optimizer.step()
        optimizer.clear_grad()
        # float() works for both 0-D and 1-element loss tensors, whereas
        # loss.numpy()[0] fails on 0-D tensors.
        total_loss += float(loss)
        n_batches += 1
    # Average over the batches actually run; len(data) / batch_size is
    # fractional whenever the last batch is partial.
    return total_loss / n_batches if n_batches else 0.0


## 微调训练：固定关键位点
目标是让5个关键位点（fixed_pos）不受噪声扰动、保持为给定残基（fixed_aa）；下面的代码在计算损失时对这些位点做mask处理，以实现知识约束生成。

In [None]:
# Key sites that are held fixed during fine-tuning, and the token ids of the
# residues expected at those sites (T, F, A, M, A in the same order).
# NOTE(review): mask_loss uses these values directly as 0-based indices into
# the sequence axis - confirm they are not 1-based residue numbers.
fixed_pos = [113, 122, 219, 247, 316]
fixed_aa = [aa2idx[res] for res in 'TFAMA']

def mask_loss(logits, tgt):
    """Cross-entropy averaged over every position except the fixed key sites.

    The positions listed in ``fixed_pos`` are given rather than predicted, so
    they are left out of the loss. Overwriting all of their logits with one
    constant (the previous approach) produced a uniform distribution there:
    a constant ``log(num_classes)`` term with zero gradient that only inflated
    the reported loss. Dropping those positions keeps the gradient direction
    for all other positions and removes that offset.

    Args:
        logits: Float tensor ``[batch, seq_len, num_classes]``.
        tgt: Integer target token ids ``[batch, seq_len]``.

    Returns:
        Scalar tensor: mean cross-entropy over the non-fixed positions.

    Raises:
        IndexError: If ``seq_len`` is too short to contain every ``fixed_pos``.
    """
    seq_len = logits.shape[1]
    n_classes = logits.shape[-1]
    # 1.0 = position contributes to the loss, 0.0 = fixed key site.
    keep = np.ones([seq_len], dtype='float32')
    keep[fixed_pos] = 0.0
    keep = paddle.to_tensor(keep)
    per_token = cross_entropy(
        logits.reshape([-1, n_classes]), tgt.flatten(), reduction='none')
    # reshape copes with both [N] and [N, 1] per-token loss layouts.
    per_token = per_token.reshape([-1, seq_len])
    # Guard against a degenerate mask where every position is fixed.
    n_scored = paddle.clip(keep.sum() * per_token.shape[0], min=1.0)
    return (per_token * keep).sum() / n_scored


## 生成新序列
从随机噪声出发，利用微调后的模型进行序列采样生成。

In [None]:
def sample(model, seq_len=500, steps=50):
    """Generate one sequence by iteratively re-predicting from random tokens.

    Starts from random token ids (argmax of Gaussian noise over 21 classes)
    and, for ``t = 1, (steps-1)/steps, ..., 1/steps``, replaces the tokens with
    the argmax of the model's logits.

    Args:
        model: Network called as ``model(token_ids, t)``; it is switched to
            eval mode and left there.
        seq_len: Length of the generated sequence.
        steps: Number of refinement steps.

    Returns:
        The generated sequence as a string over ``AA``, with ``'-'`` for
        token id 0.
    """
    model.eval()
    # Inference only: without no_grad every step would build an autograd graph
    # that is never used.
    with paddle.no_grad():
        tokens = paddle.randn([1, seq_len, 21]).argmax(axis=-1)
        for step in range(steps, 0, -1):
            t = paddle.full([1], step / steps, dtype='float32')
            # Token ids are carried between steps directly; a one-hot
            # encoding followed by argmax would give the same ids.
            tokens = model(tokens, t).argmax(axis=-1)
    # NOTE(review): every step is a deterministic argmax with no noise
    # re-injection, and the fixed key sites are not clamped here - confirm
    # whether that is the intended sampler.
    ids = tokens.numpy()[0]
    return ''.join(AA[i - 1] if i > 0 else '-' for i in ids)

# print("生成序列样例：", sample(model))


## 序列分析与可视化（建议）
统计生成序列中关键位点是否匹配，绘制序列保守性logo或相似性分布图。