## Container Preparation

### Load dependencies

In [1]:
# import docker
import requests
import torch
import time
import random

# client = docker.from_env()

# --- Configs ---

torch.cuda.empty_cache()
torch.cuda.synchronize()
NUM_CONTAINERS = 2
IMAGE_NAME = "moe_expert"
EXPERT_TIMEOUT = 20  # seconds



In [1]:
import torch
print(torch.utils.cmake_prefix_path)

/root/vllm_test/myllm/lib/python3.10/site-packages/torch/share/cmake


### Build Image

In [2]:
! docker build -t moe_expert ~/vllm_test_field/vllm/flask_docker_app/.

[1A[1B[0G[?25l[+] Building 0.0s (1/1)                                          docker:default
[34m => [internal] load build definition from Dockerfile                       0.0s
[0m[?25h

[1A[1A[0G[?25l[+] Building 0.2s (1/2)                                          docker:default
[34m => [internal] load build definition from Dockerfile                       0.0s
[0m[34m => => transferring dockerfile: 896B                                       0.0s
[0m => [internal] load metadata for docker.io/nvidia/cuda:12.1.1-base-ubuntu  0.2s
[?25h[1A[1A[1A[1A[0G[?25l[+] Building 0.2s (2/2)                                          docker:default
[34m => [internal] load build definition from Dockerfile                       0.0s
[0m[34m => => transferring dockerfile: 896B                                       0.0s
[0m[34m => [internal] load metadata for docker.io/nvidia/cuda:12.1.1-base-ubuntu  0.2s
[0m[?25h[1A[1A[1A[1A[0G[?25l[+] Building 0.3s (11/12)                                        docker:default
[34m => [internal] load build definition from Dockerfile                       0.0s
[0m[34m => => transferring dockerfile: 896B                       

## Launching instances

In [3]:
import subprocess

# --- Step 2: Launch Expert Containers ---
containers = []
layer = 23

experts_count_per_container = 60//NUM_CONTAINERS
for i in range(NUM_CONTAINERS):
    cmd = [
    "docker", "run", "-d",
    "--name", f"fused_moe_layer_{layer}_exp_{i*experts_count_per_container}_{(i+1)*experts_count_per_container-1}",
    "--gpus", "all",
    # "--rm",
    "-p", f"{5000+i}:5000",
    "-v", "/home/ubuntu/vllm_test_field/vllm/ipc_handler_demo/weights:/app/weights",
    "-e", f"RANK={i}",
    "-e", f"NUM_EXPERTS={experts_count_per_container}",
    "-e", f"GPU_IDX={0}",
    "-e", f"WEIGHT_PATH=/app/weights",
    "-e", f"LAYER={layer}",
    
    IMAGE_NAME
    ]
    try:
        subprocess.run(cmd, check=True)
        print(f"fused_moe_layer_{layer}_exp_{i*experts_count_per_container}_{(i+1)*experts_count_per_container-1}\n容器启动成功！")
    except subprocess.CalledProcessError as e:
        print(f"启动失败: {e}")
    




33d11858ae5d2899a68a829c25d74814b3e4b071afc316d07deeabb9e9f6f877
fused_moe_layer_23_exp_0_29
容器启动成功！
7425fe02fa31ff10d285e53a8a399ab425be567b446295d55229fe412f170234
fused_moe_layer_23_exp_30_59
容器启动成功！


### View if the instance is running correctly

In [4]:
! docker ps

CONTAINER ID   IMAGE        COMMAND                 CREATED         STATUS         PORTS                                         NAMES
330ffb383d8e   moe_expert   "python3 ./expert.py"   3 seconds ago   Up 3 seconds   0.0.0.0:5001->5000/tcp, [::]:5001->5000/tcp   fused_moe_layer_23_exp_30_59
ba41bfb0149a   moe_expert   "python3 ./expert.py"   4 seconds ago   Up 3 seconds   0.0.0.0:5000->5000/tcp, [::]:5000->5000/tcp   fused_moe_layer_23_exp_0_29


## Testing container function

### Load Inputs

In [1]:
import torch
import requests
def load_tensor(path):
    tensors = {}
    try:
        tensors["hidden_states"] = torch.load(path + "hidden_states.pt").to("cuda:0")
        tensors["w1"] = torch.load(path + "w1.pt").to("cuda:0")
        tensors["w2"] = torch.load(path + "w2.pt").to("cuda:0")
        tensors["topk_weights"] = torch.load(path + "topk_weights.pt").to("cuda:0")
        tensors["topk_ids"] = torch.load(path + "topk_ids.pt", map_location="cuda:0")
        tensors["expert_map"] = torch.load(path + "expert_map.pt").to("cuda:0")
        tensors["out_hidden_states"] = torch.load(path + "out_hidden_states.pt").to("cuda:0")
        tensors["final_hidden_states"] = torch.load(path + "final_hidden_states.pt").to("cuda:0")
    except FileNotFoundError as e:
        print(f"Error: File not found - {e}")
    except RuntimeError as e:
        print(f"Error: CUDA runtime issue - {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    return tensors

rank0 = load_tensor("/root/vllm_test/saved_tensors/rank_0/")
# rank1 = load_tensor("/home/ubuntu/vllm_test_field/saved_tensors/rank_1/")

### Prepare handler and meta data

In [2]:
rank0["topk_ids"]

tensor([[27, 49, 41,  2],
        [59,  3, 31, 41],
        [24, 11, 52, 48],
        [29, 53, 47, 13],
        [13, 11, 52, 16],
        [ 1, 11, 26, 17]], device='cuda:0', dtype=torch.int32)

In [1]:
import torch
import ctypes

# Load the shared lib
lib = ctypes.CDLL('/root/vllm_test/vllm/ipc_handler_demo/cuda_tools/libipc_tensor_tool.so')
lib.export_ipc_handle.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
lib.export_ipc_handle.restype = ctypes.c_int

def get_ipc_handle(tensor: torch.Tensor) -> bytes:
    
    meta={
        "shape": tensor.shape,
        "dtype": str(tensor.dtype),
        "device": int(tensor.device.index),
        "numel": tensor.numel(),
    }
    if not tensor.is_cuda:
        raise ValueError("Tensor must be on CUDA device")

    dev_ptr = tensor.data_ptr()
    out = ctypes.create_string_buffer(64)

    result = lib.export_ipc_handle(ctypes.c_void_p(dev_ptr), out)
    if result != 0:
        raise RuntimeError(f"export_ipc_handle failed with code {result}")

    return out.raw, meta  # This is the 64-byte IPC handle


In [None]:
TOKEN_NUM = 128
def create_input(token_num):
    topk_ids = torch.randint(0, 60, (token_num, 4), dtype=torch.int32).to("cuda")
    topk_weights = torch.randn(token_num, 4, dtype=torch.float32).to("cuda")
    hidden_states = torch.randn( token_num,2048, dtype=torch.bfloat16).to("cuda") # 
    print("topk_ids.shape", topk_ids.shape, topk_ids.dtype)
    print("topk_weights.shape", topk_weights.shape, topk_weights.dtype)
    print("hidden_states.shape", hidden_states.shape, hidden_states.dtype)
    return {"topk_ids":topk_ids,"topk_weights":topk_weights, "hidden_states":hidden_states }

inputs = create_input( TOKEN_NUM)

topk_ids.shape torch.Size([128, 4]) torch.int32
topk_weights.shape torch.Size([128, 4]) torch.float32
hidden_states.shape torch.Size([128, 2048]) torch.bfloat16


In [None]:
for k in inputs.keys():
    print(inputs[k])

tensor([[40, 33, 37,  0],
        [17,  9, 12, 11],
        [ 5,  6, 34, 19],
        [ 0, 57,  5, 47],
        [20, 25, 55,  4],
        [ 8,  5,  9, 26],
        [19,  7, 59,  4],
        [13, 18, 24, 46],
        [33, 27,  5, 48],
        [ 3, 36, 10, 53],
        [30,  9, 14, 55],
        [57, 30, 16, 48],
        [55, 13,  5,  2],
        [13,  7, 23, 13],
        [35, 10, 54, 33],
        [23, 42,  6, 45],
        [18,  6, 18, 43],
        [48,  0, 41,  8],
        [13, 28,  5, 10],
        [21, 52, 20, 14],
        [14, 11, 22, 54],
        [29, 33, 34, 13],
        [17,  4,  6, 54],
        [53, 34, 21,  3],
        [38, 37, 47, 37],
        [ 1, 17, 20,  6],
        [16, 13, 33, 36],
        [31, 28, 34, 48],
        [ 6, 16, 12, 41],
        [15, 38, 30, 17],
        [ 3, 24, 30, 56],
        [34, 48, 27, 57],
        [10, 13,  0, 34],
        [ 7, 28, 14, 17],
        [43, 53, 35, 24],
        [ 2, 13, 19, 53],
        [ 9, 44, 44,  0],
        [ 4, 16,  5, 11],
        [40,

In [4]:
hidden_states_handler, hidden_states_meta= get_ipc_handle(inputs["hidden_states"])
topk_ids_handler, topk_ids_meta = get_ipc_handle(inputs["topk_ids"])
topk_weights_handler, topk_weights_meta= get_ipc_handle(inputs["topk_weights"])

In [5]:
import requests
import json
# 构建 multipart/form-data 请求
files = {
    'byte_data': ('data.bin', byte_data, 'application/octet-stream')
}

data = {
    'map_data': json.dumps(map_data)
}

# 发送 POST 请求
response = requests.post(
    'http://localhost:1177/upload',
    files=files,
    data=data
)

restored_tensor = torch.tensor(response.json()['restored_tensor'], device='cuda:0')


In [5]:
import json
import requests
url1 = "http://localhost:5000/forward"
# url2 = "http://localhost:5001/forward"

response1 = requests.post(url1, 
                          data={
        'hidden_states_meta': json.dumps(hidden_states_meta),
        'topk_weights_meta': json.dumps(topk_weights_meta),
        'topk_ids_meta': json.dumps(topk_ids_meta),
},
                          
files={
        'topk_ids_handler': ('topk_ids_handler.bin', topk_ids_handler, 'application/octet-stream'),
        'hidden_states_handler': ('hidden_states_handler.bin', hidden_states_handler, 'application/octet-stream'),
        'topk_weights_handler': ('topk_weights_handler.bin', topk_weights_handler, 'application/octet-stream'),
})

# response2 = requests.post(url2, json={
#         "hidden_states":rank1["hidden_states"].cpu().tolist(),
#         "topk_weights": rank1["topk_weights"].cpu().tolist(),
#         "topk_ids": rank1["topk_ids"].cpu().tolist()
#         })

# output1 = torch.tensor(response1.json()["hidden_output"],dtype=torch.bfloat16,device="cuda:0")


ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

In [6]:
print( torch.equal(output1 , rank0["out_hidden_states"]) )
print( torch.equal(output2 , rank1["out_hidden_states"].cuda()) )

True
True


In [7]:
rank0["shared_output"] = torch.load("/home/ubuntu/vllm_test_field/saved_tensors/rank_0/shared_output.pt")
rank1["shared_output"] = torch.load("/home/ubuntu/vllm_test_field/saved_tensors/rank_1/shared_output.pt")
rank0_final_hidden_states = output1 + rank0["shared_output"]
rank1_final_hidden_states = output2 + rank1["shared_output"].cuda()
# all reduce 
reduced_result = rank0_final_hidden_states + rank1_final_hidden_states.to(rank0_final_hidden_states.device)
print( torch.equal(reduced_result, 
                   rank0["final_hidden_states"]) )

print( torch.equal(reduced_result, 
                   rank1["final_hidden_states"].cuda()) )

True
True


## Turn off the instances

In [8]:
def stop_fused_moe_containers():
    try:
        # 1. 获取所有名称包含 'fused_moe_layer' 的容器ID
        grep_cmd = "docker ps --filter 'name=fused_moe_layer' -q"
        container_ids = subprocess.check_output(grep_cmd, shell=True, text=True).strip().split('\n')
        
        # 2. 批量停止容器
        if container_ids and container_ids[0]:  # 如果有匹配的容器
            stop_cmd = f"docker stop {' '.join(container_ids)}"
            subprocess.run(stop_cmd, shell=True, check=True)
            print(f"已停止 {len(container_ids)} 个容器: {container_ids}")
        else:
            print("没有找到名称包含 'fused_moe_layer' 的容器")
            
    except subprocess.CalledProcessError as e:
        print(f"操作失败: {e}")

# 执行函数
stop_fused_moe_containers()

76350462c9e7
09770c2cbc63
已停止 2 个容器: ['76350462c9e7', '09770c2cbc63']
