In [390]:
import torch
from torch.nn import Parameter
import numpy as np
import torch.nn.functional as F

# MoE的主要步骤

MoE主要是将输入的token表示根据特定的路由方式，路由到不同的专家上去，以期望每个token能够选择到适合自己的专家。因此其核心的过程主要分为如下几步：

## 求解token应该找哪个专家
那么第一步，先把输入按数据并行的方式输入。例如\[bs, seq, d_model\]的输入logits，可以得到\[n_cores, to]

## 按照选择的专家，将输入表征从以卡的维度转换到专家维度(AllToAll)产生的来源。

- dispatch tensor。依赖dispatch tensor将token进行划分
- combine tensor输出概率


##  将每个专家的输出乘以概率，得到最终输出

# 定义
在计算开始前，我们先阐释下变量的定义。

In [391]:
n_cores = 2 # 卡数
tokens_per_core = 4 # 每张卡有多少bs
expert_num = 2 # 专家的数量
d_model=1 # 向量的隐藏层维度
expert_capacity =3 # 每个专家能够处理的token数目，表示专家的Bs

## 输入
一般来讲，MoE的输入维度为[bs, seq_length, d_model]。在此我们先随机初始化一个输入。为了方便观察在MoE中的路由机制，输入是怎么重新排布的，我们将输入截取到小数点后两位。

In [392]:
inputs = torch.tensor(
np.around(np.random.random((n_cores, tokens_per_core, d_model)), 2))
print(inputs)

tensor([[[0.9200],
         [0.2700],
         [0.2100],
         [0.6600]],

        [[0.5800],
         [0.6900],
         [0.7200],
         [0.5000]]], dtype=torch.float64)


# 路由机制

## 求解每个token应该选择哪些专家

首先根据输入，经过一个Linear层求得输入的每个token在每个专家上的概率。然后通过topk求得概率最高的几个专家。


$$
logits = inputs*weight \\ 
pro = softmax(logits) \\
pro, index = topk(pro)  \\
$$



In [356]:
logits = torch.softmax(torch.nn.Linear(d_model, expert_num)(inputs.type(torch.float32)), axis=-1)
logits = logits.type(torch.float64)
expert_gate, expert_index = logits.topk(1)

## 将超过专家自己容量的token进行丢弃

因为每个专家拥有自己的batch size，所以需要通过累计求和的方式，记录每个专家到第k个token为止，已经被多少个token选择的。需要通过mask的方式，将超过专家自己容量的token丢弃。

具体的实现步骤如下：

### 记录每个专家选择了哪几个token

此处的expert_index的shape为[num_cores, tokens_per_token, 1]。表示每个token选择了第几个专家。

In [357]:
expert_index = expert_index.squeeze(-1)

In [358]:
print("", expert_gate.shape, expert_index.shape, expert_index)

 torch.Size([2, 4, 1]) torch.Size([2, 4]) tensor([[1, 1, 1, 1],
        [1, 1, 1, 1]])


In [359]:
expert_mask = F.one_hot(expert_index, expert_num)

In [360]:
print(expert_mask.shape)
print(expert_mask)

torch.Size([2, 4, 2])
tensor([[[0, 1],
         [0, 1],
         [0, 1],
         [0, 1]],

        [[0, 1],
         [0, 1],
         [0, 1],
         [0, 1]]])


### 按专家的维度，累计每个专家token选择的数量

既然已经通过one_hot后的expert_mask知道了每个token选择了第几个专家，那么只需要按tokens_per_core的维度，对one_hot的索引进行累计求和，即可获得每个专家被索引的数量。

在此处引入了position_in_expert。position_in_expert描述了每个token选择的。表示每张卡，到第k个token为止，每个专家已经被选择的个数。那为什么乘以自己的mask呢？以token的角度，只需要看到自己选择的专家已经有的token数就好了。

In [361]:
position_in_expert = torch.cumsum(expert_mask, 1)*expert_mask

In [362]:
position_in_expert

tensor([[[0, 1],
         [0, 2],
         [0, 3],
         [0, 4]],

        [[0, 1],
         [0, 2],
         [0, 3],
         [0, 4]]])

我们为每个专家选择的tokens数目不能超过一定的上限，因此需要将他们进行限制。具体的做法就是先求每个专家已经拥有的token数目，如果从第k个token开始，某个专家的token数目已经超过了expert_capacity,那么对应的token当前选择的专家就会被置为0。

**从这里推断，目前论文中给的伪代码实现有些问题，就是如果当前设置的容量过小，那么顺序先的的token就会排到某个专家。**

In [363]:
expert_mask = expert_mask * torch.less(position_in_expert, expert_capacity)

In [397]:
print(expert_mask)
print(expert_mask.shape)

tensor([[[0, 1],
         [0, 1],
         [0, 0],
         [0, 0]],

        [[0, 1],
         [0, 1],
         [0, 0],
         [0, 0]]])
torch.Size([2, 4, 2])


expert_mask是独热编码表示，因此超过上限时，对应的值为0，就可以起到越界清零的作用。

In [393]:
expert_mask_flat = torch.sum(expert_mask, -1, keepdim=True)

In [399]:
expert_mask_flat

tensor([[[1],
         [1],
         [0],
         [0]],

        [[1],
         [1],
         [0],
         [0]]])

## 计算combine tensor
combine tensor是用来结合专家的输出和对应的路由概率的，对应的shape为
\[num_cores, tokens_per_core, num_experts, expert_capacity\]。
因为我们已经获得了每个token选择的专家概率以及对应的专家(top1)，所以需要将得到对应的专家的概率，并且将没选择到的专家置为0

In [368]:
print("Expert gate shape", expert_gate.shape)
print("Expert mask flatter shape", expert_mask_flat.shape)
print("expert_index shape:", expert_index.shape)
print("Position in expert shape:", position_in_expert.shape)

Expert gate shape torch.Size([2, 4, 1])
Expert mask flatter shape torch.Size([2, 4, 1])
expert_index shape: torch.Size([2, 4])
Position in expert shape: torch.Size([2, 4, 2])


expert_index表示每个token选择了第几个专家。

In [369]:
expert_index

tensor([[1, 1, 1, 1],
        [1, 1, 1, 1]])

先计算每个专家mask掉的词。对应的计算方法是输出*被mask掉的词

expert_gate * expert_mask_flat表示每个专家的概率*有效的token。即超过每个专家bs的token概率会被清零。

紧接着再* F.one_hot(expert_index, expert_num))， 表示将概率乘以独热编码的形式

表示token中选中的expert，以及对应的概率。其他不相关的experter专家数目会变成0。这样就形成了top1。

In [400]:
expert_outputs = \
(expert_gate * expert_mask_flat * F.one_hot(expert_index, expert_num))
expert_outputs.shape
print(expert_outputs)

tensor([[[0.0000, 0.8233],
         [0.0000, 0.7984],
         [0.0000, 0.0000],
         [0.0000, 0.0000]],

        [[0.0000, 0.8272],
         [0.0000, 0.8373],
         [0.0000, 0.0000],
         [0.0000, 0.0000]]], dtype=torch.float64, grad_fn=<MulBackward0>)


## CombinedTensor的含义

shape [num_cores, tokens_per_core, num_experts, expert_capacity]

表示的是从token视角到专家视角后，每个token选择的专家概率。

为什么还要把专家的输出乘以postion_in_experts?expert_mask是为了掩码掉超过每个专家的bs的输出的。因此将超过的置为0。

In [401]:
expert_mask.tolist()

[[[0, 1], [0, 1], [0, 0], [0, 0]], [[0, 1], [0, 1], [0, 0], [0, 0]]]

In [402]:
position_in_expert.tolist()

[[[0, 1], [0, 2], [0, 3], [0, 4]], [[0, 1], [0, 2], [0, 3], [0, 4]]]

In [403]:
masked_position_in_expert = position_in_expert*expert_mask

In [404]:
combined_tensor = expert_outputs.unsqueeze(-1)* \
F.one_hot(masked_position_in_expert, expert_capacity)
print(combined_tensor.shape)
# 表示 [num_cores, tokens_per_core, num_experts, expert_capacity]
pprint(combined_tensor.tolist())

torch.Size([2, 4, 2, 3])
[[[[0.0, 0.0, 0.0], [0.0, 0.8232826590538025, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.7984247207641602]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]]],
 [[[0.0, 0.0, 0.0], [0.0, 0.8271830677986145, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.837250828742981]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]]]]


In [405]:
#print(combined_tensor.shape)
pprint(combined_tensor.tolist())

[[[[0.0, 0.0, 0.0], [0.0, 0.8232826590538025, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.7984247207641602]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]]],
 [[[0.0, 0.0, 0.0], [0.0, 0.8271830677986145, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.837250828742981]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]]]]


## dispatch tensor
dispatch_tensor的是从conbined_tensor中转换而来的，存在值的cast为True

In [376]:
dispatch_tensor = combined_tensor.type(torch.bool)
pprint(dispatch_tensor.tolist())

[[[[False, False, False], [False, True, False]],
  [[False, False, False], [False, False, True]],
  [[False, False, False], [False, False, False]],
  [[False, False, False], [False, False, False]]],
 [[[False, False, False], [False, True, False]],
  [[False, False, False], [False, False, True]],
  [[False, False, False], [False, False, False]],
  [[False, False, False], [False, False, False]]]]


# SwitchLayer
论文中描述，是通过一个大的batchmul乘法将输出assign给对应的专家的。即根据上面routing获得的mask表示，将输入以专家的维度去进行重新排位置。那怎么进行重新排位置呢？这里的实现使用一个BatchMatmul矩阵实现的。

为什么需要这样一个转换？因为在进入MoE层之前，tensor的视角是每卡有多少token的，即[num_cores, tokens_per_core]。而MoE层的处理视角是按专家维度的，对应的视角为[num_experts, tokens_per_experts(expert_capacity)]。所以需要一个从两种映射维度之间的转换。这个转换就是Mask矩阵。

dispatch_tensor

>[num_cores, tokens_per_core, num_experts, expert_capacity]
>[i, j, k, n]

和输入inputs 

>[num_cores, tokens_per_core, d_model]
>[i, j, a]

输出为 : 

>[num_experts, num_cores, expert_capacity, d_model]
>[k, i, n, a]

## 一个小例子
如果我由一个[0.1, 0.2, 0.3]的一个小矩阵，那么怎么把它重新弄成[0.3, 0.2, 0.1]的一个矩阵呢？有下述的几种做法

### 利用MatMul的累加性质进行重新排布

即 [1, 3] x [3, 3] -> [1, 3]。利用矩阵的累加机制，进行logits的重新排位

In [377]:
x = torch.tensor([[0.1, 0.2, 0.3]]).type(torch.float32)
mask = torch.tensor([
    [0, 0, 1],
    [0, 1, 0],
    [1, 0, 0],
]).type(torch.float32)
print(x.shape)
print(mask.shape)

torch.Size([1, 3])
torch.Size([3, 3])


In [378]:
out = torch.einsum('ij,jk->ik', x, mask)

In [379]:
print(out)

tensor([[0.3000, 0.2000, 0.1000]])


In [380]:
### 利用gather操作
x = torch.tensor([[0.1, 0.2, 0.3]]).type(torch.float32)
mask = torch.tensor([[2, 1, 0]]).type(torch.int64)
print(x.shape)
print(mask.shape)

torch.Size([1, 3])
torch.Size([1, 3])


In [381]:
out = torch.gather(x, 1, mask)

In [382]:
print(out)

tensor([[0.3000, 0.2000, 0.1000]])


## 论文中实现版本：回到输入的重排布

通过修改dispatch_tensor的值，怎么发现里面会对同一个专家的相同位置进行求和的？

In [383]:
dispatch_tensor = combined_tensor.type(torch.bool).type(torch.float64)
# dispatch_tensor[0,1, 0, 1]=0
# dispatch_tensor[0,2, 1, 2]=1
expert_inputs = torch.einsum('ijkn,ija->kina', dispatch_tensor, inputs)

In [384]:
# 输入的inputs
pprint(inputs.shape)
pprint(inputs.tolist())

torch.Size([2, 4, 1])
[[[0.45], [0.27], [0.82], [0.23]], [[0.48], [0.56], [0.54], [0.75]]]


In [385]:
# 输出的mask矩阵
pprint(dispatch_tensor.shape)
dispatch_tensor.tolist()

torch.Size([2, 4, 2, 3])


[[[[0.0, 0.0, 0.0], [0.0, 1.0, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 1.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]]],
 [[[0.0, 0.0, 0.0], [0.0, 1.0, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 1.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
  [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]]]]

In [386]:
pprint(expert_inputs.shape)
pprint(expert_inputs.tolist())
print(expert_inputs)

torch.Size([2, 2, 3, 1])
[[[[0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0]]],
 [[[0.0], [0.45], [0.27]], [[0.0], [0.48], [0.56]]]]
tensor([[[[0.0000],
          [0.0000],
          [0.0000]],

         [[0.0000],
          [0.0000],
          [0.0000]]],


        [[[0.0000],
          [0.4500],
          [0.2700]],

         [[0.0000],
          [0.4800],
          [0.5600]]]], dtype=torch.float64)


In [341]:
torch.sum(inputs)

tensor(2.1000, dtype=torch.float64)

### 利用gather实现输入重排布

主要思路就是，利用position_in_expert，将inputs [num_cores, tokens_per_core, d_model]转换为

In [276]:
# 专家数 [num_cores, tokens_per_core, num_experts]
print(masked_position_in_expert.shape)
pprint(masked_position_in_expert.tolist())

torch.Size([4, 1])
[[1], [1], [1], [1]]


In [277]:
# 输入的inputs
# [num_cores, tokens_per_core, d_model]
pprint(inputs.shape)
pprint(inputs.tolist())

torch.Size([1, 4, 1])
[[[0.36], [0.96], [0.42], [0.36]]]


In [278]:
?torch.Tensor.scatter_

In [240]:
out_empty = torch.zeros((2, 2, 3, 2)).type(torch.float64)

In [241]:
masked_position_in_expert.shape

torch.Size([2, 4, 1])

In [147]:
masked_position_in_expert.unsqueeze(-1)

tensor([[[[1, 1],
          [0, 0]],

         [[0, 0],
          [1, 1]],

         [[0, 0],
          [2, 2]],

         [[0, 0],
          [0, 0]]],


        [[[0, 0],
          [1, 1]],

         [[0, 0],
          [2, 2]],

         [[1, 1],
          [0, 0]],

         [[2, 2],
          [0, 0]]]])

In [148]:
torch.scatter(inputs, 2, masked_position_in_expert.unsqueeze(-1).expand(2, 4, 2, 2), out_empty)

RuntimeError: Index tensor must have the same number of dimensions as self tensor

In [109]:
out = torch.gather(inputs, 1, masked_position_in_expert)
print(out.shape)

torch.Size([2, 4, 2])
