In [2]:
import numpy as np

# Numerically stable softmax
def softmax(x, axis=-1):
    """Return the softmax of ``x`` along ``axis``.

    The maximum along ``axis`` is subtracted before exponentiating so that
    ``np.exp`` cannot overflow; the shift cancels out in the final ratio.

    Args:
        x: Array-like of scores.
        axis: Axis along which the result sums to 1 (default: last axis).

    Returns:
        Array of the same shape as ``x`` holding the normalized weights.
    """
    shifted = x - np.max(x, axis=axis, keepdims=True)
    exp_shifted = np.exp(shifted)
    normalizer = np.sum(exp_shifted, axis=axis, keepdims=True)
    return exp_shifted / normalizer

# 1. Build the input embedding table (tokens are represented by plain integer ids)
def get_embeddings(vocab_size, d_model):
    """Return a random ``(vocab_size, d_model)`` embedding table.

    Values are drawn from a standard normal using NumPy's global random
    state, so the table is only reproducible if the caller seeds
    ``np.random`` beforehand.
    """
    table_shape = (vocab_size, d_model)
    return np.random.randn(*table_shape)

# 2. Build the sinusoidal positional encoding
def get_positional_encoding(seq_len, d_model):
    """Return the ``(seq_len, d_model)`` sinusoidal positional encoding.

    Even columns hold ``sin(pos * w_i)`` and odd columns hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``.

    Args:
        seq_len: Number of positions (rows).
        d_model: Encoding width (columns); may be even or odd.

    Returns:
        Float array of shape ``(seq_len, d_model)``.
    """
    position = np.arange(seq_len).reshape(-1, 1)
    div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
    angles = position * div_term  # one column per even index of the encoding

    pe = np.zeros((seq_len, d_model))
    pe[:, 0::2] = np.sin(angles)
    # With an odd d_model there is one fewer odd column than even columns, so
    # only the first d_model // 2 frequencies get a cosine counterpart.
    pe[:, 1::2] = np.cos(angles[:, : d_model // 2])
    return pe

# 3. Scaled dot-product attention
def attention(query, key, value):
    """Compute ``softmax(Q K^T / sqrt(d_k)) V``.

    Only the last two axes take part in the matrix products, so the inputs
    may be unbatched ``(seq, d_k)``, batched ``(batch, seq, d_k)`` or carry
    further leading axes such as ``(batch, heads, seq, d_k)``.

    Args:
        query: Array of shape ``(..., q_len, d_k)``.
        key: Array of shape ``(..., kv_len, d_k)``.
        value: Array of shape ``(..., kv_len, d_v)``.

    Returns:
        Array of shape ``(..., q_len, d_v)``.
    """
    d_k = query.shape[-1]
    # Swap just the last two axes of ``key`` so that any number of leading
    # batch/head axes is supported (a fixed (0, 2, 1) permutation is rank-3 only).
    scores = np.matmul(query, np.swapaxes(key, -1, -2)) / np.sqrt(d_k)
    weights = softmax(scores, axis=-1)
    return np.matmul(weights, value)

# 4. Position-wise feed-forward network
def feed_forward(x, d_ff):
    """Apply a two-layer ReLU network to the last axis of ``x``.

    Fresh standard-normal weights and biases are drawn from NumPy's global
    random state on every call, in the order w1, b1, w2, b2.

    Args:
        x: Array whose last axis is the model width.
        d_ff: Width of the hidden layer.

    Returns:
        Array with the same shape as ``x``.
    """
    width = x.shape[-1]
    expand_w = np.random.randn(width, d_ff)
    expand_b = np.random.randn(d_ff)
    project_w = np.random.randn(d_ff, width)
    project_b = np.random.randn(width)

    # Expand to d_ff and apply ReLU (a per-position linear map, i.e. a 1x1 convolution)
    hidden = np.maximum(0, np.dot(x, expand_w) + expand_b)
    # Project back down to the input width (another 1x1 convolution)
    return np.dot(hidden, project_w) + project_b

# 5. Multi-head self-attention
def multi_head_attention(x, W_q, W_k, W_v, num_heads):
    """Run multi-head self-attention over ``x``.

    ``x`` is projected with ``W_q``/``W_k``/``W_v``, the projections are split
    into ``num_heads`` slices of width ``d_model // num_heads``, each slice is
    passed through ``attention``, and the concatenated result is multiplied
    by an output matrix ``W_O`` that is drawn afresh from NumPy's global
    random state on every call.

    Args:
        x: Array indexed as ``(batch, seq, d_model)``.
        W_q, W_k, W_v: Projection matrices applied to the last axis of ``x``.
        num_heads: Number of attention heads.

    Returns:
        Array of shape ``(batch, seq, d_model)``.
    """
    batch_size, seq_len, d_model = x.shape[0], x.shape[1], x.shape[-1]
    d_k = d_model // num_heads  # width of a single head
    split_shape = (batch_size, seq_len, num_heads, d_k)

    # Project once, then expose the head axis via reshape
    queries = np.dot(x, W_q).reshape(split_shape)
    keys = np.dot(x, W_k).reshape(split_shape)
    values = np.dot(x, W_v).reshape(split_shape)

    # Shape trace emitted on every call
    print(batch_size, seq_len, num_heads, d_k)

    # Attend independently within each head
    head_outputs = [
        attention(queries[:, :, h, :], keys[:, :, h, :], values[:, :, h, :])
        for h in range(num_heads)
    ]

    # Stitch the heads back together along the feature axis
    merged = np.concatenate(head_outputs, axis=-1)

    # Output projection W_O
    W_O = np.random.randn(d_model, d_model)
    return np.dot(merged, W_O)

# Layer normalization without learned gain/bias
def _layer_norm(x):
    """Standardize ``x`` over its last axis; zero-variance rows are left at 0."""
    centered = x - np.mean(x, axis=-1, keepdims=True)
    std = np.std(x, axis=-1, keepdims=True)
    # A constant row has std == 0; dividing by it would turn the row into NaN.
    # Its centered values are already 0, so dividing by 1 keeps it finite.
    std[std == 0] = 1.0
    return centered / std


# 6. Build one encoder layer
def encoder_layer(x, W_q, W_k, W_v, d_model, num_heads, d_ff):
    """Apply self-attention and a feed-forward block, each with residual + norm.

    Args:
        x: Input activations; the last axis is normalized.
        W_q, W_k, W_v: Projection matrices forwarded to ``multi_head_attention``.
        d_model: Model width (kept for signature compatibility; not read here).
        num_heads: Number of attention heads.
        d_ff: Hidden width forwarded to ``feed_forward``.

    Returns:
        Array with the same shape as ``x``.
    """
    # Multi-head self-attention, then residual connection + layer norm
    attn_output = multi_head_attention(x, W_q, W_k, W_v, num_heads)
    x = _layer_norm(x + attn_output)

    # Feed-forward network, then residual connection + layer norm
    ff_output = feed_forward(x, d_ff)
    return _layer_norm(x + ff_output)

# 7. Transformer encoder stack
def transformer_encoder(x, W_q, W_k, W_v, d_model, num_heads, d_ff, num_layers):
    """Add positional encodings to ``x`` and run it through ``num_layers`` layers.

    The same ``W_q``/``W_k``/``W_v`` matrices are handed to every layer.

    Args:
        x: Embedded input; ``x.shape[1]`` is used as the sequence length.
        W_q, W_k, W_v: Projection matrices forwarded to ``encoder_layer``.
        d_model: Model width, used for the positional encoding.
        num_heads: Number of attention heads.
        d_ff: Feed-forward hidden width.
        num_layers: How many times ``encoder_layer`` is applied.

    Returns:
        The output of the final encoder layer.
    """
    # Inject position information (broadcast over the leading axis)
    hidden = x + get_positional_encoding(x.shape[1], d_model)

    for _layer_idx in range(num_layers):
        hidden = encoder_layer(hidden, W_q, W_k, W_v, d_model, num_heads, d_ff)

    return hidden

# 8. Minimal decoder: one linear projection followed by softmax
def simple_decoder(encoder_output, vocab_size, d_model):
    """Project ``encoder_output`` onto the vocabulary and normalize.

    The ``(d_model, vocab_size)`` projection matrix is drawn from NumPy's
    global random state on every call (a stand-in for a linear layer).

    Returns:
        Softmax probabilities over the last axis, of width ``vocab_size``.
    """
    projection = np.random.randn(d_model, vocab_size)
    logits = np.dot(encoder_output, projection)
    return softmax(logits, axis=-1)

# Simulate Chinese-to-English translation

# Seed NumPy's global RNG so that the random embeddings and weights (and hence
# the printed encoder output) are identical on every Restart & Run All.
SEED = 42
np.random.seed(SEED)

# Hyperparameters: vocabulary of 10, width 512, 8 heads, d_ff = 2048, 6 layers
vocab_size = 10  # simplified: assume a vocabulary of 10 tokens
d_model = 512     # embedding width; per-head width d_k = d_model // num_heads = 512 // 8 = 64
num_heads = 8    # number of self-attention heads
d_ff = 2048      # hidden width of the feed-forward network
num_layers = 6   # number of Transformer encoder layers

# The source sentence is Chinese for "I like studying artificial intelligence".
# Its English translation as tokens: ['i', 'like', 'study', 'artificial', 'intelligence']
# i.e. the desired output is 'i like study artificial intelligence'.
target_seq = np.array([[5, 6, 7, 8, 9]])  # mock vocabulary ids of the English sentence

# Source-side ids: [0, 1, 2, 3, 4]
x = np.array([[0, 1, 2, 3, 4]])  # mock vocabulary ids of the Chinese sentence

# Look up the embedding of every source token
embeddings = get_embeddings(vocab_size, d_model)
x_embedded = embeddings[x]

# Initialise the projection matrices W_q, W_k, W_v
W_q = np.random.randn(d_model, d_model)
W_k = np.random.randn(d_model, d_model)
W_v = np.random.randn(d_model, d_model)

# Run the Transformer encoder
encoder_output = transformer_encoder(x_embedded, W_q, W_k, W_v, d_model, num_heads, d_ff, num_layers)

# Show the encoder output
print(encoder_output)


1 5 8 64
1 5 8 64
1 5 8 64
1 5 8 64
1 5 8 64
1 5 8 64
[[[-0.83330585 -0.35465986 -0.22141275 ... -1.25796649  0.97312025
   -0.0480882 ]
  [-0.83331168 -0.35466639 -0.22141783 ... -1.25796446  0.97310941
   -0.04809868]
  [-0.83330729 -0.35466209 -0.221414   ... -1.25796637  0.97311733
   -0.0480919 ]
  [-0.82366153 -0.34678935 -0.18550624 ... -1.27613497  1.03832769
   -0.05364538]
  [-0.8333002  -0.35465745 -0.22141123 ... -1.25796768  0.97312706
   -0.04808093]]]


In [3]:
# 8. Multi-head attention for the decoder (optional mask, optional cross-attention)
def masked_multi_head_attention(x, W_q, W_k, W_v, num_heads, mask=None, context=None):
    """Run multi-head attention with an optional additive mask.

    Queries are always projected from ``x``. Keys and values are projected
    from ``context`` when it is given (cross-attention) and from ``x``
    otherwise (self-attention, the original behavior). The output matrix
    ``W_O`` is drawn afresh from NumPy's global random state on every call.

    Args:
        x: Query-side array indexed as ``(batch, q_len, d_model)``.
        W_q, W_k, W_v: Projection matrices.
        num_heads: Number of attention heads.
        mask: Optional array broadcastable to ``(batch, q_len, kv_len)``;
            entries equal to 1 are blocked by adding ``-1e9`` to the score.
        context: Optional key/value-side array indexed as
            ``(batch, kv_len, d_model)``.

    Returns:
        Array of shape ``(batch, q_len, d_model)``.
    """
    d_model = x.shape[-1]
    d_k = d_model // num_heads  # width of a single head

    # Self-attention reads keys/values from x; cross-attention reads them from context
    kv_source = x if context is None else context

    # Project and expose the head axis
    queries = np.dot(x, W_q).reshape(x.shape[0], x.shape[1], num_heads, d_k)
    keys = np.dot(kv_source, W_k).reshape(kv_source.shape[0], kv_source.shape[1], num_heads, d_k)
    values = np.dot(kv_source, W_v).reshape(kv_source.shape[0], kv_source.shape[1], num_heads, d_k)

    # Attention output of every head
    heads = []
    for i in range(num_heads):
        scores = np.matmul(queries[:, :, i, :], keys[:, :, i, :].transpose(0, 2, 1)) / np.sqrt(d_k)

        # Apply the mask: blocked positions get a very large negative score,
        # so their softmax weight becomes 0
        if mask is not None:
            scores += (mask * -1e9)

        weights = softmax(scores, axis=-1)
        heads.append(np.matmul(weights, values[:, :, i, :]))

    # Concatenate the heads along the feature axis
    concat_heads = np.concatenate(heads, axis=-1)

    # Output projection W_O
    W_O = np.random.randn(d_model, d_model)
    return np.dot(concat_heads, W_O)

# 9. Decoder layer
def decoder_layer(x, encoder_output, W_q, W_k, W_v, W_cross_q, W_cross_k, W_cross_v, d_model, num_heads, d_ff, look_ahead_mask=None):
    """Apply masked self-attention, cross-attention and a feed-forward block.

    Each sub-block is followed by a residual connection and layer
    normalization over the last axis.

    Args:
        x: Decoder-side activations indexed as ``(batch, tgt_len, d_model)``.
        encoder_output: Encoder activations indexed as ``(batch, src_len, d_model)``;
            ``src_len`` may differ from ``tgt_len``.
        W_q, W_k, W_v: Projections for the masked self-attention.
        W_cross_q, W_cross_k, W_cross_v: Projections for the cross-attention.
        d_model: Model width (kept for signature compatibility; not read here).
        num_heads: Number of attention heads.
        d_ff: Hidden width forwarded to ``feed_forward``.
        look_ahead_mask: Optional mask for the self-attention step.

    Returns:
        Array with the same shape as ``x``.
    """
    def _norm(h):
        # Layer norm over the last axis; a zero-variance row stays at 0 instead of NaN
        centered = h - np.mean(h, axis=-1, keepdims=True)
        std = np.std(h, axis=-1, keepdims=True)
        std[std == 0] = 1.0
        return centered / std

    # Masked self-attention + residual + layer norm
    attn_output = masked_multi_head_attention(x, W_q, W_k, W_v, num_heads, look_ahead_mask)
    x = _norm(x + attn_output)

    # Cross-attention: queries come from the decoder state, keys and values
    # from the encoder output, so the result always has the decoder's length
    cross_attn_output = masked_multi_head_attention(
        x, W_cross_q, W_cross_k, W_cross_v, num_heads, context=encoder_output
    )
    x = _norm(x + cross_attn_output)

    # Feed-forward network + residual + layer norm
    ff_output = feed_forward(x, d_ff)
    x = _norm(x + ff_output)

    return x

# 10. Transformer decoder stack
def transformer_decoder(target, encoder_output, W_q, W_k, W_v, W_cross_q, W_cross_k, W_cross_v, d_model, num_heads, d_ff, num_layers):
    """Add positional encodings to ``target`` and run ``num_layers`` decoder layers.

    A look-ahead mask (1 strictly above the diagonal, 0 elsewhere) is built
    from ``target.shape[1]``, printed, and passed to every ``decoder_layer``
    call together with the same set of projection matrices.

    Returns:
        The output of the final decoder layer.
    """
    num_steps = target.shape[1]

    # Inject position information
    hidden = target + get_positional_encoding(num_steps, d_model)

    # Look-ahead mask: entry (i, j) is 1 when j > i, i.e. a future time step
    causal_mask = np.triu(np.ones((num_steps, num_steps)), k=1)

    print(causal_mask)

    for _layer_idx in range(num_layers):
        hidden = decoder_layer(
            hidden, encoder_output,
            W_q, W_k, W_v,
            W_cross_q, W_cross_k, W_cross_v,
            d_model, num_heads, d_ff, causal_mask,
        )

    return hidden

# 11. Full Transformer model
def transformer(x, target, vocab_size, d_model, num_heads, d_ff, num_layers):
    """Run the encoder-decoder pipeline with freshly drawn random weights.

    One embedding table is shared by the source ids, the target ids and the
    final projection onto the vocabulary. All weights come from NumPy's
    global random state, in the order: embeddings, W_q, W_k, W_v,
    W_cross_q, W_cross_k, W_cross_v.

    Args:
        x: Integer array of source token ids.
        target: Integer array of target token ids.
        vocab_size: Number of rows of the embedding table.
        d_model: Model width.
        num_heads: Number of attention heads.
        d_ff: Feed-forward hidden width.
        num_layers: Number of encoder layers and of decoder layers.

    Returns:
        Softmax probabilities over the vocabulary for every target position.
    """
    # Embed both sequences with the same table
    embeddings = get_embeddings(vocab_size, d_model)
    source_embedded = embeddings[x]
    target_embedded = embeddings[target]
    print(target_embedded.shape)

    # Six square projection matrices, drawn one after another in this order
    W_q, W_k, W_v, W_cross_q, W_cross_k, W_cross_v = (
        np.random.randn(d_model, d_model) for _ in range(6)
    )

    # Encoder pass
    memory = transformer_encoder(source_embedded, W_q, W_k, W_v, d_model, num_heads, d_ff, num_layers)

    # Decoder pass
    decoded = transformer_decoder(
        target_embedded, memory,
        W_q, W_k, W_v,
        W_cross_q, W_cross_k, W_cross_v,
        d_model, num_heads, d_ff, num_layers,
    )

    # Map back onto the vocabulary by reusing the embedding table
    logits = np.dot(decoded, embeddings.T)
    return softmax(logits, axis=-1)

# Example run
# NOTE: vocab_size, d_model, num_heads, d_ff and num_layers are the module-level
# values assigned in the earlier cell, so that cell must be executed first.
x = np.array([[0, 1, 2, 3, 4]])  # Chinese input sequence (token ids)
target = np.array([[5, 6, 7, 8, 9]])  # English target sequence; labelled "shifted right", but no start token is prepended here

output = transformer(x, target, vocab_size, d_model, num_heads, d_ff, num_layers)
print(output)

# Pick the vocabulary index with the highest probability at every position
output_index = np.argmax(output, axis=-1)
output_index

(1, 5, 512)
1 5 8 64
1 5 8 64
1 5 8 64
1 5 8 64
1 5 8 64
1 5 8 64
[[0. 1. 1. 1. 1.]
 [0. 0. 1. 1. 1.]
 [0. 0. 0. 1. 1.]
 [0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0.]]
[[[3.59618322e-13 1.81701812e-38 1.48694452e-22 5.58737314e-23
   2.87964283e-20 7.33349586e-24 1.00000000e+00 3.40895063e-23
   2.15758869e-11 1.59383056e-34]
  [3.59618322e-13 1.81701812e-38 1.48694451e-22 5.58737315e-23
   2.87964283e-20 7.33349586e-24 1.00000000e+00 3.40895064e-23
   2.15758868e-11 1.59383056e-34]
  [3.59618322e-13 1.81701812e-38 1.48694451e-22 5.58737315e-23
   2.87964283e-20 7.33349586e-24 1.00000000e+00 3.40895064e-23
   2.15758868e-11 1.59383056e-34]
  [3.59618322e-13 1.81701812e-38 1.48694451e-22 5.58737315e-23
   2.87964283e-20 7.33349586e-24 1.00000000e+00 3.40895064e-23
   2.15758868e-11 1.59383056e-34]
  [3.59618322e-13 1.81701812e-38 1.48694451e-22 5.58737315e-23
   2.87964283e-20 7.33349586e-24 1.00000000e+00 3.40895064e-23
   2.15758868e-11 1.59383056e-34]]]


array([[6, 6, 6, 6, 6]], dtype=int64)