### 先都加载到cpu上

In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0,2"
os.environ["TOKENIZERS_PARALLELISM"] = "False"
from modeling_mixtral import MixtralForCausalLM
from transformers import AutoTokenizer
import torch
import torch.nn as nn
from typing import Optional
import json

def get_model(model_name, device_map, dtype=torch.bfloat16):
    llm = MixtralForCausalLM.from_pretrained(
        model_name,
        device_map=device_map,
        use_cache=True,
        torch_dtype=dtype,
    ) 
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id

    return llm, tokenizer

with open('../path.json', 'r') as f:
    path = json.load(f)
    model_name = path['mixtral']
    # threshold_path = path[threshold_path_name]

with open("../quantize/device_map.json", "r") as f:
    device_map = json.load(f)

dtype = torch.float16
llm, tokenizer = get_model(model_name, 'cpu', dtype=dtype)

  from .autonotebook import tqdm as notebook_tqdm
Loading checkpoint shards: 100%|██████████| 19/19 [00:06<00:00,  3.01it/s]


In [2]:
from hqq.core.quantize import *
from hqq.models.hf.mixtral import MixtralHQQ

q3_config    = BaseQuantizeConfig(nbits=2, group_size=64)
quant_config      = {'block_sparse_moe.experts.w3'   : q3_config}

#### 先放CUDA量化，然后再传回CPU
MixtralHQQ.quantize_model(llm, quant_config=quant_config, compute_dtype=dtype, device='cuda:0')
HQQLinear.set_backend(HQQBackend.PYTORCH)

backend       = "gemlite" #'torchao_int4' #"torchao_int4" (4-bit only) or "gemlite" (4-bit + 2-bit)
#Optimize
from hqq.utils.patching import prepare_for_inference
prepare_for_inference(llm, backend=backend, verbose=True)
#Load GemLite cache
if(backend == 'gemlite'):
	import gemlite
	gemlite.core.GEMLITE_TRITON_RESTRICT_M = True
	gemlite.core.GemLiteLinear.load_config('/tmp/gemlite_config.json')
	
llm.to('cpu')
print(llm.model.layers[0].block_sparse_moe.experts[0].w1.device)
print(llm.model.layers[0].block_sparse_moe.experts[0].w3.device)

100%|██████████| 32/32 [00:00<00:00, 117.97it/s]
100%|██████████| 32/32 [01:11<00:00,  2.23s/it]
100%|██████████| 32/32 [00:17<00:00,  1.79it/s]


cuda
cuda:0


In [3]:
from typing import Tuple, Optional
import torch
import torch.nn as nn
import threading
import json
import torch.nn.functional as F
from queue import Queue

class CachedMLP(nn.Module):
    def __init__(self, input_dim: int, hidden_dim: int, dtype, sparsity: float = 0.2):
        super(CachedMLP, self).__init__()
        self.sparsity = sparsity
        self.activenum = int((1 - sparsity) * hidden_dim)
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.dtype = dtype

        print("active neural num ",self.activenum)

        self.activation = nn.SiLU()

        # GPU 缓存张量
        self.w1_gpu = torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cuda:0')
        self.w2_gpu = torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cuda:0')
        self.w3_gpu = None

        # 第二个专家的 GPU 缓存张量
        self.w1_gpu_expert1 = torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cuda:0')
        self.w2_gpu_expert1 = torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cuda:0')
        self.w3_gpu_expert1 = None

        # Pinned Memory 缓冲区
        self.register_buffer('sparse_w1_cpu', torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cpu'))
        self.register_buffer('sparse_w2_cpu', torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cpu'))

        self.sparse_w1_cpu = self.sparse_w1_cpu.pin_memory()
        self.sparse_w2_cpu = self.sparse_w2_cpu.pin_memory()

        # 第二个专家的 Pinned Memory 缓冲区
        self.register_buffer('sparse_w1_cpu_expert1', torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cpu'))
        self.register_buffer('sparse_w2_cpu_expert1', torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cpu'))

        self.sparse_w1_cpu_expert1 = self.sparse_w1_cpu_expert1.pin_memory()
        self.sparse_w2_cpu_expert1 = self.sparse_w2_cpu_expert1.pin_memory()

        self.expert0_weight = torch.tensor(0)
        self.expert1_weight = torch.tensor(0)

        # 统计信息
        self.load_from_cpu_time = 0.0
        self.load_from_cpu_calls = 0

    def load_from_cpu(self, cpu_mlp, cpu_mlp_expert1, stream: torch.cuda.Stream):
        """
        从CPU加载参数，并使用指定的CUDA流进行异步复制到GPU。
        
        参数:
            cpu_mlp: 包含CPU上参数的字典（第一个专家）。
            cpu_mlp_expert1: 包含CPU上参数的字典（第二个专家）。
            stream: 用于数据传输的CUDA流。
        """
        # 生成随机索引
        random_indices = torch.randperm(cpu_mlp['w1'].data.size(0))[:self.activenum]
        # sorted_indices = torch.sort(random_indices).values

        # 从CPU加载参数（第一个专家）
        self.sparse_w1_cpu.copy_(cpu_mlp['w1'].data[random_indices, :])
        self.sparse_w2_cpu.copy_(cpu_mlp['w2'].data[random_indices, :])
        # 从CPU加载参数（第二个专家）
        self.sparse_w1_cpu_expert1.copy_(cpu_mlp_expert1['w1'].data[random_indices, :])
        self.sparse_w2_cpu_expert1.copy_(cpu_mlp_expert1['w2'].data[random_indices, :])
        # 异步复制到GPU
        with torch.cuda.stream(stream):
            self.w1_gpu.copy_(self.sparse_w1_cpu, non_blocking=True)
            self.w2_gpu.copy_(self.sparse_w2_cpu, non_blocking=True)
            self.w1_gpu_expert1.copy_(self.sparse_w1_cpu_expert1, non_blocking=True)
            self.w2_gpu_expert1.copy_(self.sparse_w2_cpu_expert1, non_blocking=True)
        
        # 直接赋值 w3_gpu 和 w3_gpu_expert1
        # 固定在GPU上的w3
        self.w3_gpu = cpu_mlp['w3']
        self.w3_gpu_expert1 = cpu_mlp_expert1['w3']

    def load_expert_weights(self, expert_weights):
        self.expert0_weight = expert_weights[0]
        self.expert1_weight = expert_weights[1]

    def forward(self, hidden_states):
        """
        根据hidden_states， 分别计算两个专家的输出
        """
        # 第一个专家的计算
        w3_output = self.w3_gpu(hidden_states)[:, :self.activenum]
        w1_output = self.activation(torch.matmul(hidden_states, self.w1_gpu.T))
        # w2 = self.w2_gpu.T
        hidden_states_expert0 = torch.matmul(w1_output * w3_output, self.w2_gpu)

        # 第二个专家的计算
        w3_output_expert1 = self.w3_gpu_expert1(hidden_states)[:, :self.activenum]
        w1_output_expert1 = self.activation(torch.matmul(hidden_states, self.w1_gpu_expert1.T))
        # w2_expert1 = self.w2_gpu_expert1.T
        hidden_states_expert1 = torch.matmul(w1_output_expert1 * w3_output_expert1, self.w2_gpu_expert1)

        final_hidden_states = hidden_states_expert0* self.expert0_weight + hidden_states_expert1* self.expert1_weight
        
        return final_hidden_states
                        
def convert_mixtral_to_cached_mlp(llm, dtype, sparsity=0.9):
    ### 其他部分存放在GPU上
    llm.model.embed_tokens.cuda(0)
    for i in range(len(llm.model.layers)):
        llm.model.layers[i].self_attn.cuda(0)
        llm.model.layers[i].input_layernorm.cuda(0)
        llm.model.layers[i].post_attention_layernorm.cuda(0)
        llm.model.layers[i].block_sparse_moe.gate.cuda(0)
        for j in range(len(llm.model.layers[0].block_sparse_moe.experts)):
            llm.model.layers[i].block_sparse_moe.experts[j].w3.cuda(0)
    ### 第0层的专家存放在GPU上
    for j in range(len(llm.model.layers[0].block_sparse_moe.experts)):
        llm.model.layers[0].block_sparse_moe.experts[j].cuda(0)

    llm.model.norm.cuda(0)
    llm.lm_head.cuda(0)
    
    # 创建两个共享的CachedMLP实例
    buffer0 = CachedMLP(
        input_dim=llm.config.hidden_size,
        hidden_dim=llm.config.intermediate_size,
        dtype=dtype,
        sparsity=sparsity
    )
    buffer1 = CachedMLP(
        input_dim=llm.config.hidden_size,
        hidden_dim=llm.config.intermediate_size,
        dtype=dtype,
        sparsity=sparsity
    )
    cached_mlps = [buffer0, buffer1]
    
    for i, layer in enumerate(llm.model.layers):
        if i==0:
            continue
        # 将专家的forward方法替换为PipelineLLM管理的方式
        for j, expert in enumerate(layer.block_sparse_moe.experts):
            expert.cpu_mlp = {
                "w1": expert.w1.cpu().weight,
                "w2": expert.w2.cpu().weight.T.contiguous(),
                "w3": expert.w3,
            }
    return llm, cached_mlps

class PipelineLLM:
    def __init__(self, llm, cached_mlps):
        """
        初始化 PipelineLLM，替换模型每一层的 forward 方法。
        
        参数:
            llm: 原始的大模型
            cached_mlps: 两个 CachedMLP 实例列表
        """
        self.llm = llm
        self.cached_mlps = cached_mlps  # [buffer0, buffer1]
        self.num_layers = len(llm.model.layers)
        self.lock = threading.Lock()
        self.use_buffer0 = True  # 标记当前使用哪个缓冲区

        # 创建两个共享的CUDA流
        self.stream0 = torch.cuda.Stream()
        self.stream1 = torch.cuda.Stream()

        # 初始化加载第一个和第二个层的参数
        # self._load_layer(1, buffer_index=0, expert_ids=torch.tensor([0, 1]))
        # self._load_layer(1, buffer_index=1, expert_ids=torch.tensor([0, 1]))
        self.top_k = 2
        self.activation = nn.SiLU()

        self._replace_forward_methods()

        # 用于统计时间的变量
        self.total_prefill_time = 0.0
        self.total_decode_time = 0.0

    def _load_layer(self, layer_idx, buffer_index, expert_ids, expert_weights=torch.tensor([0, 0])):
        """
        加载指定层的参数到指定的缓冲区。
        
        参数:
            layer_idx: 层的索引
            buffer_index: 缓冲区的索引（0 或 1）
        """
        layer = self.llm.model.layers[layer_idx]
        expert0 = layer.block_sparse_moe.experts[expert_ids[0]]
        expert1 = layer.block_sparse_moe.experts[expert_ids[1]]
        # if layer_idx == 1:
        #     print(expert_ids[0].data, expert_ids[1].data, '{:.3f}, {:.3f}'.format(expert_weights[0], expert_weights[1]))

        cpu_mlp = expert0.cpu_mlp
        cpu_mlp_expert1 = expert1.cpu_mlp
        buffer = self.cached_mlps[buffer_index]
        stream = self.stream0 if buffer_index == 0 else self.stream1

        buffer.load_expert_weights(expert_weights)
        # 异步加载参数
        buffer.load_from_cpu(cpu_mlp, cpu_mlp_expert1, stream)

    def _replace_forward_methods(self):
        """
        替换模型每一层的 forward 方法，添加参数预加载逻辑和注意力计算。
        """
        for i, layer in enumerate(self.llm.model.layers):
            def new_forward(hidden_states: torch.Tensor,
                        attention_mask: Optional[torch.Tensor] = None,
                        position_ids: Optional[torch.LongTensor] = None,
                        past_key_value: Optional[Tuple[torch.Tensor]] = None,
                        output_attentions: Optional[bool] = False,
                        output_router_logits: Optional[bool] = False,
                        use_cache: Optional[bool] = False,
                        cache_position: Optional[torch.LongTensor] = None,
                        layer=layer,
                        layer_idx=i):
                with self.lock:
                    batch_size, sequence_length, hidden_dim = hidden_states.shape
                    
                    if sequence_length == 1:
                        #### decode phase ####
                        # 选择当前使用的缓冲区
                        current_buffer = self.cached_mlps[0] if self.use_buffer0 else self.cached_mlps[1]

                        next_buffer_index = 1 if self.use_buffer0 else 0

                        next_layer_idx = layer_idx + 1

                        if next_layer_idx < self.num_layers:
                            # 预加载下一层的参数
                            next_layer = self.llm.model.layers[next_layer_idx]
                            router = next_layer.block_sparse_moe.gate

                            # batch_size, sequence_length, hidden_dim = hidden_states.shape
                            hidden_states_flat = hidden_states.view(-1, hidden_dim)
                            # router_logits: (batch * sequence_length, n_experts)
                            router_logits = router(hidden_states_flat)

                            routing_weights = F.softmax(router_logits, dim=1, dtype=torch.float)
                            routing_weights, selected_experts = torch.topk(routing_weights, self.top_k, dim=-1)
                            routing_weights /= routing_weights.sum(dim=-1, keepdim=True)

                            hidden_states = hidden_states_flat.reshape(batch_size, sequence_length, hidden_dim)

                            self._load_layer(
                                next_layer_idx,
                                buffer_index=next_buffer_index,
                                expert_ids=selected_experts[0],
                                expert_weights=routing_weights[0]
                            )

                        # 切换缓冲区
                        self.use_buffer0 = not self.use_buffer0

                    # 处理当前层
                    residual = hidden_states
                    hidden_states = layer.input_layernorm(hidden_states)

                    # Self Attention
                    hidden_states, self_attn_weights, present_key_value = layer.self_attn(
                        hidden_states=hidden_states,
                        attention_mask=attention_mask,
                        position_ids=position_ids,
                        past_key_value=past_key_value,
                        output_attentions=output_attentions,
                        use_cache=use_cache,
                        cache_position=cache_position,
                    )
                    hidden_states = residual + hidden_states

                    # Fully Connected
                    residual = hidden_states
                    hidden_states = layer.post_attention_layernorm(hidden_states)
                    
                    if sequence_length > 1:
                        # print("in prefill layer ", layer_idx)
                        # 对于prefill阶段，仅将experts加载到GPU计算
                        experts = layer.block_sparse_moe.experts

                        # 将experts移动到GPU
                        for expert in experts:
                            expert.cuda(0)

                        # 在GPU上进行MoE计算（gate保持在CPU）
                        final_hidden_states, router_logits = layer.block_sparse_moe(hidden_states)

                        # 计算完成后将experts移回CPU
                        if layer_idx != 0:
                            for expert in experts:
                                expert.to('cpu')
                    else:
                        # batch_size, sequence_length, hidden_dim = hidden_states.shape
                        hidden_states_flat = hidden_states.view(-1, hidden_dim)
                        # print("in decode layer", layer_idx)
                        if layer_idx > 0:
                            ### 使用当前缓冲区进行 MLP 计算 ###
                            final_hidden_states = current_buffer(hidden_states_flat)
                        else:
                            ### 根据router计算需要使用的专家 ###
                            cur_layer = layer
                            router = cur_layer.block_sparse_moe.gate
                            # router_logits: (batch * sequence_length, n_experts)
                            router_logits = router(hidden_states_flat)

                            routing_weights = F.softmax(router_logits, dim=1, dtype=torch.float)
                            routing_weights, selected_experts = torch.topk(routing_weights, self.top_k, dim=-1)
                            routing_weights /= routing_weights.sum(dim=-1, keepdim=True)
                            # we cast back to the input dtype
                            routing_weights = routing_weights.to(hidden_states_flat.dtype)

                            first_expert, second_expert = selected_experts[0][0], selected_experts[0][1]

                            final_hidden_states_expert0 = cur_layer.block_sparse_moe.experts[first_expert](
                                hidden_states_flat) * routing_weights[0][0]

                            final_hidden_states_expert1 = cur_layer.block_sparse_moe.experts[second_expert](
                                hidden_states_flat) * routing_weights[0][1]

                            # 将两个专家的结果相加
                            final_hidden_states = final_hidden_states_expert0 + final_hidden_states_expert1

                        final_hidden_states = final_hidden_states.reshape(batch_size, sequence_length, hidden_dim)

                    hidden_states = residual + final_hidden_states

                    outputs = (hidden_states,)

                    if output_attentions:
                        outputs += (self_attn_weights,)

                    if use_cache:
                        outputs += (present_key_value,)

                    return outputs

            # 替换forward方法
            layer.forward = new_forward

In [4]:
llm, cached_mlps = convert_mixtral_to_cached_mlp(llm, dtype, sparsity=0.8)

# 创建流水线模型
PLLM = PipelineLLM(llm, cached_mlps)

active neural num  2867
active neural num  2867


### 测试时间开销

In [8]:
import json
from datasets import load_dataset, Dataset
from transformers import GenerationConfig


input_length = 1
MAX_LENGTH = input_length
output_length = 32
test_samples = 10
device_id = 0

with open("../path.json", "r") as f:
    paths = json.load(f)
    fineweb_path = paths["fineweb"]

def preprocess_data(data, tokenizer):
	# 使用 tokenizer 将文本数据转换为模型输入
	inputs = tokenizer(data, padding="max_length", truncation=True, max_length=MAX_LENGTH, return_tensors="pt")
	inputs["labels"] = inputs.input_ids.clone()
	return inputs

filt_type = fineweb_path.split('.')[-1]
fineweb = load_dataset(filt_type, data_files=fineweb_path) #726000
fineweb_text = fineweb['train']['text']

# 预热（避免第一次运行时的额外开销）
for text in fineweb_text[:5] :
    inputs = preprocess_data(text, tokenizer)
    with torch.no_grad():
        output = llm(input_ids=inputs["input_ids"].cuda(device_id), attention_mask=inputs["attention_mask"].cuda(device_id))

generated_all = 0
prefill_time, decode_time = 0, 0
print("output length is {}".format(output_length))
for text in fineweb_text[:test_samples] :
    inputs = preprocess_data(text, tokenizer)

    # 预热（避免第一次运行时的额外开销）
    with torch.no_grad():
        output = llm(input_ids=inputs["input_ids"].cuda(device_id), attention_mask=inputs["attention_mask"].cuda(device_id))

    # 测试时间
    start_event = torch.cuda.Event(enable_timing=True)
    end_event = torch.cuda.Event(enable_timing=True)

    # 开始计时
    torch.cuda.synchronize()
    start_event.record()

    # 前向传播
    with torch.no_grad():
        output = llm.generate(
            input_ids=inputs["input_ids"].cuda(device_id),
            attention_mask=inputs["attention_mask"].cuda(device_id),
            max_length=input_length + output_length,  # 总长度为输入长度 + 输出长度
            generation_config=GenerationConfig(do_sample=False),
            pad_token_id=tokenizer.pad_token_id, 
            # cache_implementation="static" ## moe not support
        )

    # 结束计时
    end_event.record()
    torch.cuda.synchronize()

    # 计算时间
    elapsed_time = start_event.elapsed_time(end_event) / 1000  # 转换为秒
    decode_time += elapsed_time
    print(f"Generated output length: {len(output[0]) - input_length}", f"Time taken: {elapsed_time:.4f} seconds")
    # print(output)
    print(tokenizer.batch_decode(output, skip_special_tokens=True))

    generated_all += len(output[0]) - input_length

timepertoken = (decode_time) / (generated_all)
# print("decode time:", '{:.4f}'.format((decode_time) /test_samples), ' s')
print("decode phase speed:", '{:.4f}'.format(1/timepertoken) , ' token/s')

output length is 32
Generated output length: 32 Time taken: 4.6101 seconds
Generated output length: 32 Time taken: 4.5986 seconds
['fortunate Douglas repeatedly: history tro history subsequently and history repeatedly history history history andihood history repeatedlyihood fucking history especially sacihood history especiallyihood desired fucking history history history']
Generated output length: 32 Time taken: 4.4382 seconds
['lightsЇ responsibility🌍 columns convenient occasional BAS occasional entity convenient entity🌍 legitimate D vess consistent please subs Du oppongest consistent pseudoEntities Mix legitimate purposes alleged legitimate consistent concert']
Generated output length: 3 Time taken: 0.4483 seconds
['pizza Chain']
Generated output length: 32 Time taken: 4.4258 seconds
['nels mock Mock mock mock WARRAN delen whilst independ Route JuRouting facimation mockmock whilst consist Jun laug noten arteFMT daily session Sm laugihoodtagonrupalagma consistent']
Generated output l

In [6]:

########################################################################
#Save gemlite cache
if(backend == 'gemlite'):
	gemlite.core.GemLiteLinear.cache_config('/tmp/gemlite_config.json') 

#### torch.profile

trace-offloading-r.json是最优，就是做完一个index就传一个

In [7]:
import json
from datasets import load_dataset, Dataset
from transformers import GenerationConfig

input_length = 1
MAX_LENGTH = input_length
output_length = 2
test_samples = 4

with open("../path.json", "r") as f:
    paths = json.load(f)
    fineweb_path = paths["fineweb"]

def preprocess_data(data, tokenizer):
	# 使用 tokenizer 将文本数据转换为模型输入
	inputs = tokenizer(data, padding="max_length", truncation=True, max_length=MAX_LENGTH, return_tensors="pt")
	inputs["labels"] = inputs.input_ids.clone()
	return inputs

fineweb = load_dataset("parquet",data_files=fineweb_path) #726000
fineweb_text = fineweb['train']['text'][:test_samples] 

print("output length is {}".format(output_length))
text = fineweb_text[0]
inputs = preprocess_data(text, tokenizer)

# cached_mlp.clear_load_from_cpu_stats()
with torch.profiler.profile(
    activities=[
        torch.profiler.ProfilerActivity.CPU,
        torch.profiler.ProfilerActivity.CUDA,
    ]
) as p:
    # 前向传播
    with torch.no_grad():
        output = llm.generate(
            input_ids=inputs["input_ids"].cuda(),
            attention_mask=inputs["attention_mask"].cuda(),
            max_length=input_length + output_length,  # 总长度为输入长度 + 输出长度
            generation_config=GenerationConfig(do_sample=False),
            pad_token_id=tokenizer.eos_token_id
        )
print(p.key_averages().table(
    sort_by="self_cpu_time_total", row_limit=-1))
p.export_chrome_trace("./offloading-hqq2-2.json")

output length is 2
-------------------------------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  
                                                   Name    Self CPU %      Self CPU   CPU total %     CPU total  CPU time avg     Self CUDA   Self CUDA %    CUDA total  CUDA time avg    # of Calls  
-------------------------------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  
                                            aten::index        39.35%     106.719ms        39.93%     108.282ms     287.220us       1.010ms         0.42%       1.010ms       2.680us           377  
                                            aten::copy_        16.02%      43.441ms        17.80%      48.267ms      53.158us     203.157ms        84.09%     203.157ms     223.741us       

加载到GPU上

In [1]:
from transformers import MixtralForCausalLM, AutoTokenizer
import torch
import torch.nn as nn
from typing import Optional
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1,2"
import json

def get_model(model_name, device_map, dtype=torch.bfloat16):
    llm = MixtralForCausalLM.from_pretrained(
        model_name,
        device_map=device_map,
        use_cache=True,
        torch_dtype=dtype,
    ) 
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id

    return llm, tokenizer

with open('../path.json', 'r') as f:
    path = json.load(f)
    model_name = path['mixtral']
    # threshold_path = path[threshold_path_name]

with open('../quantize/device_map_1.json', 'r') as f:
    device_map = json.load(f)

dtype = torch.float16
llm, tokenizer = get_model(model_name, device_map, dtype=dtype)

  from .autonotebook import tqdm as notebook_tqdm
Loading checkpoint shards: 100%|██████████| 19/19 [00:25<00:00,  1.36s/it]


#### 只传一个专家的版本

In [None]:
from typing import Tuple, Optional
import torch
import torch.nn as nn
import threading
import json
from queue import Queue

class CachedMLP(nn.Module):
    def __init__(self, input_dim: int, hidden_dim: int, dtype, sparsity: float = 0.2):
        super(CachedMLP, self).__init__()
        self.sparsity = sparsity
        self.activenum = int((1 - sparsity) * hidden_dim)
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.dtype = dtype

        # GPU 缓存张量
        self.w1_gpu = torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cuda:0')
        self.w2_gpu = torch.empty((self.input_dim, self.activenum), dtype=self.dtype, device='cuda:0')
        self.w3_gpu = torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cuda:0')

        # Pinned Memory 缓冲区
        self.register_buffer('sparse_w1_cpu', torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cpu'))
        self.register_buffer('sparse_w2_cpu', torch.empty((self.input_dim, self.activenum), dtype=self.dtype, device='cpu'))
        self.register_buffer('sparse_w3_cpu', torch.empty((self.activenum, self.input_dim), dtype=self.dtype, device='cpu'))
        self.sparse_w1_cpu = self.sparse_w1_cpu.pin_memory()
        self.sparse_w2_cpu = self.sparse_w2_cpu.pin_memory()
        self.sparse_w3_cpu = self.sparse_w3_cpu.pin_memory()

        # 统计信息
        self.load_from_cpu_time = 0.0
        self.load_from_cpu_calls = 0

    def load_from_cpu(self, cpu_mlp, stream: torch.cuda.Stream):
        """
        从CPU加载参数，并使用指定的CUDA流进行异步复制到GPU。
        
        参数:
            cpu_mlp: 包含CPU上参数的字典。
            stream: 用于数据传输的CUDA流。
        """
        # 从CPU加载参数
        self.sparse_w1_cpu.copy_(cpu_mlp['w1'].data[:self.activenum, :])
        self.sparse_w2_cpu.copy_(cpu_mlp['w2'].data[:, :self.activenum])
        self.sparse_w3_cpu.copy_(cpu_mlp['w3'].data[:self.activenum, :])

        # 异步复制到GPU
        with torch.cuda.stream(stream):
            self.w1_gpu.copy_(self.sparse_w1_cpu, non_blocking=True)
            self.w2_gpu.copy_(self.sparse_w2_cpu, non_blocking=True)
            self.w3_gpu.copy_(self.sparse_w3_cpu, non_blocking=True)

    def get_load_from_cpu_stats(self):
        if self.load_from_cpu_calls == 0:
            return 0.0, 0.0
        avg_time = self.load_from_cpu_time / self.load_from_cpu_calls
        return self.load_from_cpu_time, avg_time

    def clear_load_from_cpu_stats(self):
        self.load_from_cpu_time = 0.0
        self.load_from_cpu_calls = 0

def convert_mixtral_to_cached_mlp(llm, dtype, sparsity=0.9):
    ### 其他部分存放在GPU上
    llm.model.embed_tokens.cuda()
    for i in range(len(llm.model.layers)):
        llm.model.layers[i].self_attn.cuda()
        llm.model.layers[i].input_layernorm.cuda()
        llm.model.layers[i].post_attention_layernorm.cuda()
        llm.model.layers[i].block_sparse_moe.gate.cuda()
    ### 第0层的专家存放在GPU上
    for j in range(len(llm.model.layers[0].block_sparse_moe.experts)):
        llm.model.layers[0].block_sparse_moe.experts[j].cuda()

    llm.model.norm.cuda()
    llm.lm_head.cuda()
    
    # 创建两个共享的CachedMLP实例
    buffer0 = CachedMLP(
        input_dim=llm.config.hidden_size,
        hidden_dim=llm.config.intermediate_size,
        dtype=dtype,
        sparsity=sparsity
    )
    buffer1 = CachedMLP(
        input_dim=llm.config.hidden_size,
        hidden_dim=llm.config.intermediate_size,
        dtype=dtype,
        sparsity=sparsity
    )
    cached_mlps = [buffer0, buffer1]
    
    for i, layer in enumerate(llm.model.layers):
        if i==0:
            continue
        # 将专家的forward方法替换为PipelineLLM管理的方式
        for j, expert in enumerate(layer.block_sparse_moe.experts):
            expert.cpu_mlp = {
                "w1": expert.w1.cpu().weight,
                "w2": expert.w2.cpu().weight,
                "w3": expert.w3.cpu().weight,
            }
            # 替换forward方法为直接调用CachedMLP的forward（需要在pipelineLLM里面替换)
            # expert.forward = lambda x, cached_mlp=cached_mlp, cpu_mlp=expert.cpu_mlp: cached_mlp(x, cpu_mlp)
    return llm, cached_mlps

class PipelineLLM:
    def __init__(self, llm, cached_mlps):
        """
        初始化 PipelineLLM，替换模型每一层的 forward 方法。
        
        参数:
            llm: 原始的大模型
            cached_mlps: 两个 CachedMLP 实例列表
        """
        self.llm = llm
        self.cached_mlps = cached_mlps  # [buffer0, buffer1]
        self.num_layers = len(llm.model.layers)
        self.lock = threading.Lock()
        self.use_buffer0 = True  # 标记当前使用哪个缓冲区

        # 创建两个共享的CUDA流
        self.stream0 = torch.cuda.Stream()
        self.stream1 = torch.cuda.Stream()

        # 初始化加载第一个和第二个层的参数
        self._load_layer(1, buffer_index=0)
        self._load_layer(1, buffer_index=1)

        self.activation = nn.GELU()

        self._replace_forward_methods()
    
    def _load_layer(self, layer_idx, buffer_index):
        """
        加载指定层的参数到指定的缓冲区。
        
        参数:
            layer_idx: 层的索引
            buffer_index: 缓冲区的索引（0 或 1）
        """
        layer = self.llm.model.layers[layer_idx]
        expert = layer.block_sparse_moe.experts[0]
        cpu_mlp = expert.cpu_mlp
        buffer = self.cached_mlps[buffer_index]
        stream = self.stream0 if buffer_index == 0 else self.stream1

        # 异步加载参数
        buffer.load_from_cpu(cpu_mlp, stream)

    def _replace_forward_methods(self):
        """
        替换模型每一层的 forward 方法，添加参数预加载逻辑和注意力计算。
        """
        for i, layer in enumerate(self.llm.model.layers):
            def new_forward(hidden_states: torch.Tensor,
                            attention_mask: Optional[torch.Tensor] = None,
                            position_ids: Optional[torch.LongTensor] = None,
                            past_key_value: Optional[Tuple[torch.Tensor]] = None,
                            output_attentions: Optional[bool] = False,
                            output_router_logits: Optional[bool] = False,
                            use_cache: Optional[bool] = False,
                            cache_position: Optional[torch.LongTensor] = None,
                            layer_idx=i):
                # print(f"in layer {layer_idx}")
                with self.lock:
                    # 选择当前使用的缓冲区
                    current_buffer = self.cached_mlps[0] if self.use_buffer0 else self.cached_mlps[1]
                    current_stream = self.stream0 if self.use_buffer0 else self.stream1

                    # 切换缓冲区用于下一次
                    next_buffer_index = 1 if self.use_buffer0 else 0
                    next_buffer = self.cached_mlps[next_buffer_index]
                    next_stream = self.stream1 if self.use_buffer0 else self.stream0

                    # 预加载下一层的参数
                    next_layer_idx = layer_idx + 1
                    if next_layer_idx < self.num_layers:
                        self._load_layer(next_layer_idx, buffer_index=next_buffer_index)
                    
                    # 切换缓冲区
                    self.use_buffer0 = not self.use_buffer0

                    # 处理当前层
                    residual = hidden_states
                    hidden_states = layer.input_layernorm(hidden_states)

                    # Self Attention
                    hidden_states, self_attn_weights, present_key_value = layer.self_attn(
                        hidden_states=hidden_states,
                        attention_mask=attention_mask,
                        position_ids=position_ids,
                        past_key_value=past_key_value,
                        output_attentions=output_attentions,
                        use_cache=use_cache,
                        cache_position=cache_position,
                    )
                    hidden_states = residual + hidden_states

                    # Fully Connected
                    residual = hidden_states
                    hidden_states = layer.post_attention_layernorm(hidden_states)

                    # 使用当前缓冲区进行 MLP 计算
                    batch_size, sequence_length, hidden_dim = hidden_states.shape
                    hidden_states = hidden_states.view(-1, hidden_dim)
                    # 仅使用第一个专家
                    expert_layer = layer.block_sparse_moe.experts[0]

                    w3_output = torch.matmul(hidden_states, current_buffer.w3_gpu.T)
                    w1_output = self.activation(torch.matmul(hidden_states, current_buffer.w1_gpu.T))
                    w2 = current_buffer.w2_gpu.T
                    final_hidden_states = torch.matmul(w1_output * w3_output, w2)

                    final_hidden_states = final_hidden_states.reshape(batch_size, sequence_length, hidden_dim)

                    hidden_states = residual + final_hidden_states

                    outputs = (hidden_states,)

                    if output_attentions:
                        outputs += (self_attn_weights,)

                    if use_cache:
                        outputs += (present_key_value,)

                    return outputs
            # 替换 forward 方法
            layer.forward = new_forward

    def _async_load(self, layer_idx, buffer_index):
        """
        异步加载 MLP 参数到指定缓冲区，使用共享的CUDA流。
        """
        self._load_layer(layer_idx, buffer_index)

# 将模型转换为使用CachedMLP的版本
llm, cached_mlps = convert_mixtral_to_cached_mlp(llm, dtype, sparsity=0.9)

# 创建流水线模型
pipeline_llm = PipelineLLM(llm, cached_mlps).llm
