In [None]:
import random
class MoE_Activation_Stat:
    """Simulate uniform-random expert routing for a batch of tokens.

    Every token independently activates ``experts_per_token`` distinct experts
    out of ``n_experts``; the class tallies how many tokens hit each expert.
    """

    def __init__(self, stage, n_input_tokens, n_experts=1, experts_per_token=1):
        """Store the routing configuration.

        Args:
            stage: Caller-supplied stage label; stored but not used by this class.
            n_input_tokens: Number of tokens to route.
            n_experts: Total number of experts to choose from.
            experts_per_token: Number of distinct experts each token activates.
        """
        self.stage = stage
        self.n_input_tokens = n_input_tokens
        self.n_experts = n_experts
        self.experts_per_token = experts_per_token
        # {expert_id: token_count} produced by the most recent get_activated_experts() call.
        self.expert_token_count = {}

    def get_activated_experts(self, seed=None):
        """Route every token to random experts and count tokens per expert.

        Args:
            seed: Optional seed for reproducibility. When given, a private
                ``random.Random(seed)`` is used so the process-wide ``random``
                state is left untouched; when ``None`` the global generator
                is used.

        Returns:
            dict: ``{expert_id: token_count}`` containing only the experts that
            were activated at least once. A copy is returned, so mutating it
            does not affect the instance.

        Raises:
            ValueError: If ``experts_per_token`` is negative or larger than
                ``n_experts`` (distinct experts cannot be drawn in that case).
        """
        if not 0 <= self.experts_per_token <= self.n_experts:
            raise ValueError(
                f"experts_per_token ({self.experts_per_token}) must be between 0 "
                f"and n_experts ({self.n_experts})"
            )

        # A dedicated generator yields the same draws as random.seed(seed) would,
        # without reseeding the module-level generator as a side effect.
        rng = random if seed is None else random.Random(seed)

        self.expert_token_count.clear()
        expert_ids = range(self.n_experts)
        for _ in range(self.n_input_tokens):
            # Sample without replacement: a token never activates the same expert twice.
            for expert_id in rng.sample(expert_ids, self.experts_per_token):
                self.expert_token_count[expert_id] = self.expert_token_count.get(expert_id, 0) + 1

        return self.expert_token_count.copy()
# Demo: route 12 tokens across 12 experts with 2 experts per token.
moe = MoE_Activation_Stat(1, 12, 12, 2).get_activated_experts()
print(moe)
# The mapping is {expert_id: token_count}, so unpack the items in that order.
for expert_id, token_count in moe.items():
    print(expert_id, token_count)


In [None]:
import numpy as np
import matplotlib.pyplot as plt

def simulate_power_law_activation(n_numbers=600, n_activate=10, alpha=1.0, n_simulations=2000, seed=None):
    """Simulate repeated activations drawn from a power-law distribution.

    Index ``i`` (0-based) receives weight ``(i + 1) ** -alpha``. Each simulation
    round draws ``n_activate`` distinct indices according to the normalized
    weights.

    Args:
        n_numbers: Size of the index population.
        n_activate: Number of distinct indices activated per round.
        alpha: Power-law exponent; larger values give a more skewed distribution.
        n_simulations: Number of rounds to simulate.
        seed: Optional seed. When given, draws come from a private
            ``np.random.RandomState(seed)``; when ``None`` the global NumPy
            random state is used.

    Returns:
        tuple: ``(results, activation_counts, probabilities)`` where ``results``
        is a list with one array of activated indices per round,
        ``activation_counts`` is an array of length ``n_numbers`` with the
        number of rounds in which each index was activated, and
        ``probabilities`` is the normalized sampling distribution.
    """
    # Use float ranks: NumPy refuses to raise an integer array to a negative
    # integer power, so an integer alpha (e.g. alpha=2) would otherwise fail.
    ranks = np.arange(1, n_numbers + 1, dtype=float)
    probabilities = ranks ** (-alpha)
    probabilities = probabilities / probabilities.sum()  # normalize to sum to 1

    rng = np.random if seed is None else np.random.RandomState(seed)

    results = []
    activation_counts = np.zeros(n_numbers)
    for _ in range(n_simulations):
        # Sample without replacement: one round activates distinct indices.
        activated = rng.choice(n_numbers, size=n_activate, replace=False, p=probabilities)
        results.append(activated)
        activation_counts[activated] += 1

    return results, activation_counts, probabilities

def simulate_zipf_activation(n_numbers=600, n_activate=10, s=1.001, n_simulations=2000, seed=None):
    """Simulate repeated activations drawn from a truncated Zipf distribution.

    The sampling weights are the Zipf probability mass function evaluated at
    ranks ``1..n_numbers`` and renormalized to sum to 1.

    Args:
        n_numbers: Size of the index population.
        n_activate: Number of distinct indices activated per round.
        s: Zipf shape parameter passed to ``scipy.stats.zipf.pmf``.
        n_simulations: Number of rounds to simulate.
        seed: Optional seed. When given, draws come from a private
            ``np.random.RandomState(seed)``; when ``None`` the global NumPy
            random state is used.

    Returns:
        tuple: ``(results, activation_counts, probabilities)`` where ``results``
        is a list with one array of activated indices per round,
        ``activation_counts`` is an array of length ``n_numbers`` with the
        number of rounds in which each index was activated, and
        ``probabilities`` is the normalized sampling distribution.
    """
    # Imported locally because SciPy is only needed by this function.
    from scipy.stats import zipf

    ranks = np.arange(1, n_numbers + 1)
    probabilities = zipf.pmf(ranks, s)
    # The pmf is truncated at n_numbers, so renormalize the kept mass.
    probabilities = probabilities / probabilities.sum()

    rng = np.random if seed is None else np.random.RandomState(seed)

    results = []
    activation_counts = np.zeros(n_numbers)
    for _ in range(n_simulations):
        # Sample without replacement: one round activates distinct indices.
        activated = rng.choice(n_numbers, size=n_activate, replace=False, p=probabilities)
        results.append(activated)
        activation_counts[activated] += 1

    return results, activation_counts, probabilities

# Run both simulations with their default settings (Zipf first, then power law).
zipf_results, zipf_activation_counts, zipf_probs = simulate_zipf_activation()
results, counts, probs = simulate_power_law_activation()


def plot_activation_summary(probabilities, activation_counts, prob_title):
    """Plot sampling probabilities next to the observed activation counts.

    Args:
        probabilities: Per-index sampling probabilities (line plot, left panel).
        activation_counts: Per-index activation counts (bar plot, right panel).
        prob_title: Title of the probability panel.

    Returns:
        The created matplotlib figure.
    """
    fig, (ax_prob, ax_count) = plt.subplots(1, 2, figsize=(12, 4))

    ax_prob.plot(probabilities)
    ax_prob.set(title=prob_title, xlabel='index', ylabel='probability')

    # Every index is drawn; the bar count follows the data rather than a
    # hard-coded 600, and the title no longer claims only the top 50 are shown.
    ax_count.bar(range(len(activation_counts)), activation_counts)
    ax_count.set(title='activation counts', xlabel='index', ylabel='activation counts')

    # Applied per figure so both figures get a tidy layout.
    fig.tight_layout()
    return fig


plot_activation_summary(probs, counts, 'activation probability distribution')
plot_activation_summary(zipf_probs, zipf_activation_counts, 'activation zipf probability distribution')
plt.show()

In [None]:
class Request_SpAt_stat():
    """Table of cluster activation probabilities for every (layer, KV head) pair."""

    def __init__(self, n_kv_head, n_cluster, n_layer, prob_func="power_law"):
        """Fill ``activated_prob_table[layer][kv_head]`` with a length-``n_cluster`` array.

        Args:
            n_kv_head: Number of KV heads per layer.
            n_cluster: Number of clusters per KV head.
            n_layer: Number of layers.
            prob_func: ``"power_law"`` for weights ``rank ** -alpha`` normalized
                to 1, or ``"zipf"`` for an empirical estimate from 100000
                ``np.random.zipf`` samples (drawn separately for every entry).

        Raises:
            ValueError: If ``prob_func`` is neither of the two supported names
                (raised while building the first table entry).
        """
        self.alpha = 1.0
        self.s = 1.1
        self.n_kv_head = n_kv_head
        self.n_cluster = n_cluster
        self.n_layer = n_layer
        # Entries are built layer by layer, head by head.
        self.activated_prob_table = [
            [self._cluster_probabilities(prob_func) for _head in range(n_kv_head)]
            for _layer in range(n_layer)
        ]

    def _cluster_probabilities(self, prob_func):
        """Build one length-``n_cluster`` probability array for ``prob_func``."""
        if prob_func == "power_law":
            ranks = np.arange(1, self.n_cluster + 1)
            weights = ranks ** (-self.alpha)
            return weights / weights.sum()  # normalize

        if prob_func == "zipf":
            # Draw many samples and keep only those that map onto a cluster.
            samples = np.random.zipf(self.s, 100000)
            samples = samples[samples <= self.n_cluster]

            # Relative frequency of every observed rank; unobserved ranks stay at 0.
            observed, frequency = np.unique(samples, return_counts=True)
            estimate = np.zeros(self.n_cluster)
            estimate[observed - 1] = frequency[:self.n_cluster] / frequency.sum()
            return estimate

        raise ValueError(f"Invalid probability function: {prob_func}")

    def get_activated_prob(self, layer, kv_head, n_avaiable_clusters):
        """Return the first ``n_avaiable_clusters`` probabilities, renormalized to sum to 1."""
        available = self.activated_prob_table[layer][kv_head][:n_avaiable_clusters]
        return available / available.sum()

# Smoke check: normalized power-law probabilities of layer 0, KV head 0 over all 16 clusters.
Request_SpAt_stat(n_kv_head=2, n_cluster=16, n_layer=4).get_activated_prob(
    layer=0, kv_head=0, n_avaiable_clusters=16
)

In [None]:
## Define models and layer.
## Generate models
import sys
import os
# Put the project root on sys.path so that `src.*` imports resolve.
# When the notebook runs from inside the `src` directory, the project root is its parent.
current_dir = os.getcwd()
# Compare the last path component exactly: endswith('src') would also match
# directories such as 'my_src'.
if os.path.basename(current_dir) == 'src':
    project_root = os.path.dirname(current_dir)
else:
    # Otherwise assume the current directory already is the project root.
    project_root = current_dir
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from src.type import *
import copy
import random
import numpy as np


class FC_Layer:
    """Cost model of a layer described by the three dimensions (m, n, k)."""

    def __init__(self, stage, name, type, parallel_type, parallel_degree, m, n, k):
        """Record the layer identity and its per-device dimensions.

        Args:
            stage: Stage label, stored as given.
            name: Layer name; get_flops/get_size look for the substrings
                'relu' and 'glu' in it.
            type: Layer type, compared against ``LayerType`` members.
            parallel_type: 'tensor_row' divides ``k`` by ``parallel_degree``,
                'tensor_col' divides ``n``; any other value keeps both.
            parallel_degree: Divisor used by the two tensor-parallel modes.
            m, n, k: Unpartitioned dimensions.
        """
        self.stage = stage
        self.name = name
        self.type = type
        self.m = m
        # The division is only evaluated for the matching parallel mode.
        self.n = n // parallel_degree if parallel_type == 'tensor_col' else n
        self.k = k // parallel_degree if parallel_type == 'tensor_row' else k
        self.dbyte = 2  # bytes per element (bf16)

    def get_infos(self):
        """Return the (m, n, k) dimensions after partitioning."""
        return self.m, self.n, self.k

    def get_flops(self):
        """Return the operation count for this layer type.

        Fails with an assertion for layer types other than SOFTMAX, NORM,
        ACT, FC and MATMUL.
        """
        kind = self.type
        if kind in (LayerType.SOFTMAX, LayerType.NORM):
            return 5 * self.m * self.n

        if kind == LayerType.ACT:
            # Per-element cost depends on the activation named in the layer name.
            if 'relu' in self.name:
                per_element = 1
            elif 'glu' in self.name:
                per_element = 8 + 1
            else:
                per_element = 8
            return per_element * self.m * self.n

        if kind == LayerType.FC:
            return 2 * self.m * self.n * self.k

        if kind == LayerType.MATMUL:
            return 2 * self.m * self.n

        assert 0, "In Function \"get_flops\": Not support layer type"

    def get_size(self):
        """Return the byte sizes ``(in1, in2, out)`` of both inputs and the output."""
        first_input = self.dbyte * self.m * self.n
        no_weight_types = [LayerType.SOFTMAX, LayerType.NORM, LayerType.ACT, LayerType.MATMUL]

        if self.type not in no_weight_types:
            return first_input, self.dbyte * self.n * self.k, self.dbyte * self.m * self.k

        # These types produce an output as large as their first input; only the
        # gated activations (SwiGLU / GeGLU) read a second input of that size.
        second_input = first_input if 'glu' in self.name else 0
        return first_input, second_input, first_input


class Attention_Layer:
    """Cost model of one step of clustered sparse attention for a single KV head.

    ``type`` selects the step and must be one of "similarity", "score",
    "softmax" or "context".
    """

    def __init__(self, stage, name, type, parallel_type, parallel_degree, n_q_head, n_kv_head, head_dim, cluster_size, n_cluster, n_activated_clusters):
        """Record the configuration and derive the (m, n, k) dimensions.

        Args:
            stage: Stage label, stored as given.
            name: Layer name, stored as given.
            type: Attention step (see the class docstring).
            parallel_type: Accepted for signature parity; not used.
            parallel_degree: Number of devices shared by all KV heads.
            n_q_head: Number of query heads.
            n_kv_head: Number of KV heads.
            head_dim: Dimension of one head.
            cluster_size: Number of entries per cluster.
            n_cluster: Number of clusters of the request.
            n_activated_clusters: Number of clusters that are attended to.

        Raises:
            ValueError: If ``type`` is not a supported attention step.
        """
        self.stage = stage
        self.name = name
        self.type = type
        self.head_dim = head_dim
        self.cluster_size = cluster_size
        self.n_cluster = n_cluster
        self.n_activated_clusters = n_activated_clusters
        self.n_q_head = n_q_head
        self.n_kv_head = n_kv_head
        self.q_head_per_kv_head = n_q_head // n_kv_head
        # With fewer devices than KV heads the integer division yields 0, which
        # would divide by zero below; a head then occupies a single device.
        self.device_per_head = max(1, parallel_degree // n_kv_head)
        self.dbyte = 2
        # Derive m/n/k immediately so get_infos/get_flops/get_size work right
        # after construction, without a separate get_infos0() call.
        self.get_infos0()

    def get_infos0(self):
        """Compute and store the (m, n, k) dimensions of the selected step.

        Raises:
            ValueError: If ``self.type`` is not a supported attention step.
        """
        q_heads = self.q_head_per_kv_head
        # Size of the activated clusters handled by one device of this head.
        activated = self.n_activated_clusters * self.cluster_size * self.head_dim // self.device_per_head

        if self.type == "similarity":
            dims = (q_heads, self.head_dim, self.n_cluster * self.head_dim)
        elif self.type == "score":
            dims = (q_heads, self.head_dim, activated)
        elif self.type == "softmax":
            # Softmax has no second operand, so k is a placeholder of 1.
            dims = (q_heads, self.head_dim, 1)
        elif self.type == "context":
            dims = (q_heads, activated, self.head_dim)
        else:
            raise ValueError(f"Unsupported attention layer type: {self.type!r}")

        self.m, self.n, self.k = dims

    def get_infos(self):
        """Return the (m, n, k) dimensions of this step."""
        return self.m, self.n, self.k

    def get_flops(self):
        """Return the operation count: 5*m*n for softmax, 2*m*n*k otherwise."""
        if self.type == "softmax":
            return 5 * self.m * self.n
        return 2 * self.m * self.n * self.k

    def get_size(self):
        """Return the byte sizes ``(in1, in2, out)`` of both inputs and the output."""
        in1 = self.dbyte * self.m * self.n
        if self.type == "softmax":
            # No second input; the output is as large as the input.
            return in1, 0, in1
        return in1, self.dbyte * self.n * self.k, self.dbyte * self.m * self.k

# NOTE(review): this class is also defined in an earlier cell of this notebook;
# the later definition shadows the earlier one, so keep the copies in sync or drop one.
class MoE_Activation_Stat:
    """Simulate uniform-random expert routing for a batch of tokens.

    Every token independently activates ``experts_per_token`` distinct experts
    out of ``n_experts``; the class tallies how many tokens hit each expert.
    """

    def __init__(self, stage, n_input_tokens, n_experts=1, experts_per_token=1):
        """Store the routing configuration.

        Args:
            stage: Caller-supplied stage label; stored but not used by this class.
            n_input_tokens: Number of tokens to route.
            n_experts: Total number of experts to choose from.
            experts_per_token: Number of distinct experts each token activates.
        """
        self.stage = stage
        self.n_input_tokens = n_input_tokens
        self.n_experts = n_experts
        self.experts_per_token = experts_per_token
        # {expert_id: token_count} produced by the most recent get_activated_experts() call.
        self.expert_token_count = {}

    def get_activated_experts(self, seed=None):
        """Route every token to random experts and count tokens per expert.

        Args:
            seed: Optional seed for reproducibility. When given, a private
                ``random.Random(seed)`` is used so the process-wide ``random``
                state is left untouched; when ``None`` the global generator
                is used.

        Returns:
            dict: ``{expert_id: token_count}`` containing only the experts that
            were activated at least once. A copy is returned, so mutating it
            does not affect the instance.

        Raises:
            ValueError: If ``experts_per_token`` is negative or larger than
                ``n_experts`` (distinct experts cannot be drawn in that case).
        """
        if not 0 <= self.experts_per_token <= self.n_experts:
            raise ValueError(
                f"experts_per_token ({self.experts_per_token}) must be between 0 "
                f"and n_experts ({self.n_experts})"
            )

        # A dedicated generator yields the same draws as random.seed(seed) would,
        # without reseeding the module-level generator as a side effect.
        rng = random if seed is None else random.Random(seed)

        self.expert_token_count.clear()
        expert_ids = range(self.n_experts)
        for _ in range(self.n_input_tokens):
            # Sample without replacement: a token never activates the same expert twice.
            for expert_id in rng.sample(expert_ids, self.experts_per_token):
                self.expert_token_count[expert_id] = self.expert_token_count.get(expert_id, 0) + 1

        return self.expert_token_count.copy()

class Communication:
    """Cost model of a collective-communication operator."""

    def __init__(self, stage, type, parallel_degree, data_size, archi="GPU"):
        """Record the operator description.

        Args:
            stage: Stage label, stored as given.
            type: Collective type, compared against ``CommType`` members.
            parallel_degree: Number of participants.
            data_size: Size of the communicated data, as supplied by the caller.
            archi: Accepted but neither stored nor used.
        """
        self.stage, self.type = stage, type
        self.parallel_degree, self.data_size = parallel_degree, data_size

    def get_infos(self):
        """Return ``(stage, type, parallel_degree, data_size)``."""
        return (self.stage, self.type, self.parallel_degree, self.data_size)

    def get_flops(self):
        """Return the operation count of the collective.

        ALL_GATHER costs nothing; ALL_REDUCE costs ``data_size`` per additional
        participant. Any other type fails with an assertion.
        """
        collective = self.type
        if collective == CommType.ALL_GATHER:
            return 0
        if collective == CommType.ALL_REDUCE:
            extra_participants = self.parallel_degree - 1
            return self.data_size * extra_participants
        assert 0, "not support comm operator type"

    def get_size(self):
        """Return ``data_size`` multiplied by ``parallel_degree``."""
        total = self.data_size * self.parallel_degree
        return total

class Transformer:
    """Builds the list of layer cost models that make up one decoder block."""

    def __init__(self, modelinfos, requestinfos, tensor_parallel=8):
        """Read the model description.

        Args:
            modelinfos: Dict with the keys 'name', 'layers', 'n_experts',
                'experts_per_token', 'q_head', 'kv_head', 'dhead', 'dim',
                'hdim' and 'cluster_size'; 'hdim_moe' is optional and falls
                back to 'hdim'.
            requestinfos: Dict mapping a request id to a dict with the keys
                'n_cluster' and 'n_activated_clusters'.
            tensor_parallel: Tensor-parallel degree handed to the layers.
        """
        self.tensor_parallel = tensor_parallel
        self.decoder_blocks = []
        self.name = modelinfos['name']
        self.layers = modelinfos['layers']
        self.n_experts = modelinfos['n_experts']
        self.experts_per_token = modelinfos['experts_per_token']
        self.n_q_head = modelinfos['q_head']
        self.n_kv_head = modelinfos['kv_head']
        self.dhead = modelinfos['dhead']
        self.dim = modelinfos['dim']
        self.hdim = modelinfos['hdim']
        # Use the dedicated MoE hidden size when the model provides one; configs
        # without it keep using 'hdim'.
        self.hdim_moe = modelinfos.get('hdim_moe', modelinfos['hdim'])
        self.cluster_size = modelinfos['cluster_size']
        self.requestinfos = requestinfos

    def build(self):
        """Create the layers of one decoder block.

        One token per request is assumed for the dense layers (the batch size
        is the number of requests). The expert layers depend on a random
        routing of those tokens, so repeated calls can differ.

        Returns:
            list: The layer objects in execution order. The same list is also
            appended to ``self.decoder_blocks``.
        """
        batch = len(self.requestinfos)
        tp = self.tensor_parallel
        decoder_blocks = []

        # QKV projection
        decoder_blocks.append(FC_Layer('decoder', 'qkv', LayerType.FC, "tensor_col", tp, batch, self.dim, self.dhead * (self.n_q_head + 2 * self.n_kv_head)))

        # Sparse attention: the four steps are emitted once per (request, KV head).
        for request in self.requestinfos.values():
            for _ in range(self.n_kv_head):
                for step in ("similarity", "score", "softmax", "context"):
                    decoder_blocks.append(Attention_Layer(
                        'decoder', 'spat', step, None, tp,
                        self.n_q_head, self.n_kv_head, self.dhead, self.cluster_size,
                        request["n_cluster"], request["n_activated_clusters"]))

        # Output projection followed by the all-reduce of its partial results
        decoder_blocks.append(FC_Layer('decoder', 'proj', LayerType.FC, "tensor_row", tp, batch, self.dim, self.dim))
        decoder_blocks.append(Communication('decoder', CommType.ALL_REDUCE, tp, self.dim))

        # Attention norm
        decoder_blocks.append(FC_Layer('decoder', 'norm_attn', LayerType.NORM, None, tp, batch, self.dim, 0))

        # MoE router
        decoder_blocks.append(FC_Layer('decoder', 'moe_router', LayerType.FC, "tensor", 1, batch, self.dim, self.n_experts))
        # MoE_Activation_Stat expects (stage, n_input_tokens, n_experts,
        # experts_per_token); the stage label must come first, otherwise every
        # argument shifts by one position.
        moe_activation_stat = MoE_Activation_Stat('decoder', batch, self.n_experts, self.experts_per_token).get_activated_experts()

        # MoE FFN: one gate/act/up/mat/down group per activated expert, sized by
        # the number of tokens routed to that expert.
        for expert_id, token_count in moe_activation_stat.items():
            decoder_blocks.append(FC_Layer('decoder', f'expert_{expert_id}_moe_gate', LayerType.FC, "expert", tp, token_count, self.dim, self.hdim_moe))
            decoder_blocks.append(FC_Layer('decoder', f'expert_{expert_id}_moe_act', LayerType.ACT, None, tp, token_count, self.dim, self.hdim_moe))
            decoder_blocks.append(FC_Layer('decoder', f'expert_{expert_id}_moe_up', LayerType.FC, "expert", tp, token_count, self.dim, self.hdim_moe))
            decoder_blocks.append(FC_Layer('decoder', f'expert_{expert_id}_moe_mat', LayerType.MATMUL, "expert", tp, token_count, self.hdim_moe, 0))
            decoder_blocks.append(FC_Layer('decoder', f'expert_{expert_id}_moe_down', LayerType.FC, "expert", tp, token_count, self.hdim_moe, self.dim))
        decoder_blocks.append(Communication('decoder', CommType.ALL_GATHER, tp, self.dim))

        # MoE norm
        decoder_blocks.append(FC_Layer('decoder', 'moe_attn', LayerType.NORM, None, tp, batch, self.dim, 0))

        self.decoder_blocks.append(decoder_blocks)
        return decoder_blocks

In [None]:
# Model description in the key layout read by Transformer.__init__.
Model={"name":"llama4",
        "layers":12,
        "n_experts":12,
        "experts_per_token":1,
        "q_head":12,
        "dim":4096,
        "hdim":4096,
        "hdim_moe":128,
        "cluster_size":128,
        "kv_head":12,
        "dhead":64,
}
# One entry per request id.
Request={"1":{"n_cluster":12,"n_activated_clusters":12},
        "2":{"n_cluster":12,"n_activated_clusters":12},
        "3":{"n_cluster":12,"n_activated_clusters":12},
        "4":{"n_cluster":12,"n_activated_clusters":12},
        "5":{"n_cluster":12,"n_activated_clusters":12},
        "10":{"n_cluster":12,"n_activated_clusters":12},
        "7":{"n_cluster":12,"n_activated_clusters":12}}

# NOTE(review): `model` was not defined in any visible cell, so the loop below
# failed on a fresh kernel. Building it from Transformer.build() is presumably
# what was intended — confirm.
model = Transformer(Model, Request).build()
for layer in model:
    # Communication entries have no `name` attribute; fall back to the class name.
    print(getattr(layer, 'name', type(layer).__name__))

In [1]:
## Define models and layer.
## Generate models
import sys
import os
# Put the project root on sys.path so that `src.*` imports resolve.
# When the notebook runs from inside the `src` directory, the project root is its parent.
current_dir = os.getcwd()
# Compare the last path component exactly: endswith('src') would also match
# directories such as 'my_src'.
if os.path.basename(current_dir) == 'src':
    project_root = os.path.dirname(current_dir)
else:
    # Otherwise assume the current directory already is the project root.
    project_root = current_dir
if project_root not in sys.path:
    sys.path.insert(0, project_root)

import src.infra as infra
from src.config import *
# Model description handed to infra.Request_Batch and infra.System below.
Model = dict(
    name="llama4",
    n_block=12,
    n_experts=12,
    experts_per_token=1,
    q_head=12,
    dim=4096,
    hdim=4096,
    hdim_moe=128,
    cluster_size=128,
    n_kv_head=12,
    dhead=64,
)

ENERGY_TABLE = []
gpu = make_xpu_config(GPUType.A100a)

# Hand-written device description; sizes are spelled as powers of 1000 / 1024.
# The System below is built from gpu['GPU'] (the make_xpu_config result), not
# from this dict.
GPU = dict(
    name="llama4",
    NUM_DEVICE=8,
    FLOPS_PER_DEVICE=1800 * 1000 ** 3,
    OFF_MEM_BW_PER_DEVICE=8 * 1024 ** 4,
    L2_MEM_BW_PER_DEVICE=8 * 1024 ** 4,
    MEM_CAPACITY_PER_DEVICE=192 * 1024 ** 3,
    OFFLOADING_RATIO=0.67,
    CPU_COMM_BANDWIDTH=50 * 1024 ** 3,
    ENERGY_TABLE=ENERGY_TABLE,
)

# Ten requests with ids 0..9, each appended with the same two values (12, 12).
request_batch = infra.Request_Batch(0.01, Model)
for i in range(10):
    request_batch.append(i, 12, 12)

system = infra.System(Model, gpu['GPU'], request_batch=request_batch)


In [2]:
# Call system_setup() and then sim() on the System built in the previous cell.
# NOTE(review): the saved output of this cell is
#   AttributeError: 'GPU' object has no attribute 'served_request'
# so the recorded run did not complete. Nothing in this notebook sets that
# attribute; the fix presumably belongs in src.infra — confirm.
system.system_setup()
system.sim()

AttributeError: 'GPU' object has no attribute 'served_request'

In [None]:
# Show how many entries request_batch.request currently holds.
len(request_batch.request)

In [None]:
import numpy as np
# Rebuild the request batch with a different first argument (0.1) and different
# per-request values (64, 1024).
# NOTE(review): the meaning of these arguments is defined in src.infra, not in
# this notebook — confirm before relying on them.
request_batch = infra.Request_Batch(0.1, Model)
for i in range(10):
    request_batch.append(i, 64, 1024)
# Take the first element of whatever gen_activated_clusters(0, 0) returns.
# The next line assumes it supports element-wise `>` and `.sum()`
# (e.g. a NumPy array) — TODO confirm.
a=request_batch.gen_activated_clusters(0,0)[0]
# Count the entries greater than 10.
b=(a > 10).sum()
# NOTE(review): 10 and 0.23 are unexplained magic numbers; name them or document
# where they come from.
b/0.23