# 测试多个NPU（最多8个）之间HCCL通信带宽的脚本

- **平均测试时间**：指某一算法先进行m次预热，再进行n次正式测试；计时只覆盖这n次正式测试，结果为平均每次迭代的耗时

- **算法带宽**：指申请的内存大小除以平均测试时间，即 $\text{算法带宽} = \dfrac{\text{内存大小}}{\text{平均测试时间}}$；该值综合反映了数据传输、计算和内存复制的开销

## I. 导入相关的包并定义初始化的变量

In [None]:
import json
import os
import re
import shutil
import stat
import subprocess
import time
root_path = "/root/workdir/hccl_test"
# Names of the collective-communication tests to run; each name selects the
# binary {root_path}/bin/{name}_test.
test_func = [
    "all_gather",
    "all_reduce",
    "alltoall",
    "alltoallv",
    "broadcast",
    "reduce",
    "reduce_scatter",
]
# HCCL communication buffer size in MB (exported as HCCL_BUFFSIZE).
hccl_buffer_size = 2048
# Start buffer size of the sweep, in bytes.
test_mem_start = 2048 * 1024 * 1024
# End buffer size of the sweep (inclusive), in bytes.
test_mem_stop = 2048 * 1024 * 1024
# Growth factor between consecutive buffer sizes in the sweep.
test_mem_factor = 2
# IDs of the NPUs taking part in the test (exported as HCCL_TEST_USE_DEVS).
npu_list = ["0", "1", "2", "3",]  # "4", "5", "6", "7"

## II. 进行HCCL测试

### 2.1 清理工作环境

In [None]:
# Start from an empty log tree: log/data receives the raw test output and
# log/prof the merged profiling timelines.
os.chdir(root_path)
# ignore_errors mirrors `rm -rf` (no error when the tree is absent) without
# going through the shell; exist_ok keeps the cell re-runnable.
shutil.rmtree(f"{root_path}/log", ignore_errors=True)
os.makedirs(f"{root_path}/log/data", exist_ok=True)
os.makedirs(f"{root_path}/log/prof", exist_ok=True)
print(os.getcwd())

### 2.2 定义相关函数

#### A. 格式化内存大小（字节数转换为可读单位）

In [None]:
def get_size(size):
    """Format a byte count as a human-readable string.

    The value is divided by 1024 until it is below 1024 or the largest unit
    (TB) is reached. A value below 1024 is printed as given (e.g. ``500B``);
    any value that was divided is shown as a float (e.g. ``2.0GB``).

    Args:
        size: Number of bytes.

    Returns:
        The scaled size followed by its unit, e.g. ``"2.0GB"``.
    """
    units = ['B', 'kB', 'MB', 'GB', 'TB']
    unit_index = 0
    # `>=` so exactly 1024 of a unit rolls over to the next one
    # (1 GiB -> "1.0GB", not "1024.0MB"); stop at the last unit instead of
    # indexing past the end of `units`.
    while size >= 1024 and unit_index < len(units) - 1:
        size = size / 1024.00
        unit_index += 1
    return f'{size}{units[unit_index]}'

#### B. 生成执行脚本

In [None]:
def get_script(func, mem_size):
    """Write a shell script that runs one HCCL test binary under mpirun.

    The script launches ``{root_path}/bin/{func}_test`` with
    ``len(npu_list)`` ranks and a single buffer size (``-b`` and ``-e`` are
    both ``mem_size``), redirecting stdout to
    ``{root_path}/log/data/{func}_{N}npu.log``.

    Args:
        func: Test name, e.g. ``"all_reduce"``.
        mem_size: Buffer size handed to the test binary (bytes, per the
            notebook's configuration comments).

    Returns:
        Path of the script WITHOUT the ``.sh`` suffix; callers append
        ``.sh`` (the script) or ``.log`` (the profiler log) themselves.
    """
    npus = len(npu_list)
    script_file = f"{root_path}/log/temp_run"
    # NOTE(review): the output log is named only by test and NPU count, so
    # running the same test at several buffer sizes overwrites earlier logs.
    cmd = (
        f"mpirun -n {npus} {root_path}/bin/{func}_test"
        f" -b {mem_size}"
        f" -e {mem_size}"
        f" -p {npus} > {root_path}/log/data/{func}_{npus}npu.log"
    )
    with open(f"{script_file}.sh", 'w') as file:
        # Shebang so the script can be exec'd directly (e.g. as the msprof
        # --application target), not only via `bash script.sh`.
        file.write("#!/bin/bash\n")
        file.write(cmd + "\n")
    os.chmod(f'{script_file}.sh', stat.S_IRWXU)
    return script_file

#### C. 获取运行时间

In [None]:
def get_time(start_time):
    """Return the wall-clock time elapsed since ``start_time`` as a string.

    Args:
        start_time: A ``time.time()`` timestamp taken earlier.

    Returns:
        ``"HH:MM:SS"``, or ``"D day HH:MM:SS"`` once at least one full day
        has passed. Fractions of a second are truncated.
    """
    # Clamp so a clock step backwards cannot produce negative fields.
    total = max(0, int(time.time() - start_time))
    # divmod carries exactly 60 s / 60 min / 24 h into the next unit, so one
    # minute is "00:01:00" rather than "00:00:60".
    minutes, second = divmod(total, 60)
    hours, minute = divmod(minutes, 60)
    day, hour = divmod(hours, 24)
    if day > 0:
        return f"{day} day {hour:02d}:{minute:02d}:{second:02d}"
    return f"{hour:02d}:{minute:02d}:{second:02d}"

#### D. 合并JSON时间线文件

In [None]:
def merge_json(src_path, dst_file):
    """Merge per-result msprof timeline JSON files into one trace file.

    Every sub-directory of ``src_path`` that contains a ``timeline``
    directory with at least one ``.json`` file is merged. Its device label
    is the first entry (in sorted order) whose name starts with
    ``device_<digits>``, or ``"null"`` if there is none. Its timeline is the
    last ``.json`` file in sorted order. Process and thread names are
    prefixed with the device label. ``process_sort_index`` events are
    renumbered so processes are ordered by their prefixed name.

    Args:
        src_path: Directory containing the msprof output sub-directories.
        dst_file: Path of the merged JSON file to write.

    Returns:
        dict mapping device label -> list of timeline files that were merged.
    """
    device_list = {}
    merge_timeline = []
    pid_list = {}
    for entry in sorted(os.listdir(src_path)):
        entry_path = f"{src_path}/{entry}"
        timeline_dir = f"{entry_path}/timeline"
        # Skip stray files and results without an exported timeline instead
        # of failing in os.listdir or on an empty listing.
        if not os.path.isdir(timeline_dir):
            continue
        timelines = sorted(
            name for name in os.listdir(timeline_dir) if name.endswith(".json")
        )
        if not timelines:
            continue
        cur_device = "null"
        for device in sorted(os.listdir(entry_path)):
            if re.match(r"device_\d+", device):
                cur_device = device
                break
        device_list.setdefault(cur_device, []).append(
            f"{timeline_dir}/{timelines[-1]}"
        )
    for device, device_files in device_list.items():
        for device_file in device_files:
            with open(device_file, 'r') as file:
                timeline = json.load(file)
            for node in timeline:
                if "name" in node and "args" in node:
                    name = node["name"]
                    args = node["args"]
                    if name in ("process_name", "thread_name") and "name" in args:
                        args["name"] = f"{device} {args['name']}"
                        if name == "process_name":
                            # NOTE(review): pids are not made unique across
                            # files; if two traces share a pid, the later
                            # process_name wins here.
                            pid_list[node["pid"]] = args["name"]
                merge_timeline.append(node)
    # The trace format's sort_index is an integer rank, so rank processes by
    # their device-prefixed name rather than storing the name string itself.
    ordered = sorted(pid_list.items(), key=lambda item: item[1])
    rank = {pid: index for index, (pid, _) in enumerate(ordered)}
    for node in merge_timeline:
        if (
            node.get("name") == "process_sort_index"
            and "args" in node
            and "sort_index" in node["args"]
            and node.get("pid") in rank
        ):
            node["args"]["sort_index"] = rank[node["pid"]]
    with open(dst_file, 'w') as file:
        json.dump(merge_timeline, file, indent=4)
    return device_list


#### E. 性能测试命令

In [None]:
def run_profiling(script_file, tmp_path):
    """Profile ``{script_file}.sh`` with msprof and return the msprof log path.

    Any previous output in ``tmp_path`` is removed first. msprof's stdout is
    written to ``{script_file}.log``. The exit status is not checked, which
    matches the previous shell-based behaviour.

    Args:
        script_file: Script path without the ``.sh`` suffix (as returned by
            ``get_script``).
        tmp_path: Directory msprof writes its raw results to.

    Returns:
        Path of the msprof stdout log, ``{script_file}.log``.
    """
    # https://www.hiascend.com/document/detail/zh/CANNCommunityEdition/80RC1alpha003/devaids/auxiliarydevtool/atlasprofiling_16_0012.html
    shutil.rmtree(tmp_path, ignore_errors=True)
    # Argument list (no shell): paths are passed verbatim, no quoting needed.
    cmd = [
        "msprof",
        f"--application={script_file}.sh",
        "--storage-limit=1024MB",
        "--host-sys=cpu,mem,network",
        "--sys-profiling=on",
        "--sys-cpu-profiling=on",
        "--sys-hardware-mem=on",
        "--sys-io-profiling=on",
        "--ai-core=on",
        "--aic-mode=task-based",
        f"--output={tmp_path}",
    ]
    log_file = f"{script_file}.log"
    with open(log_file, 'w') as log:
        subprocess.run(cmd, stdout=log, check=False)
    return log_file

### 2.3 执行测试脚本

#### A. 重新生成可执行文件

In [None]:
# Reset the log tree and rebuild the HCCL test binaries from scratch.
os.chdir(root_path)
shutil.rmtree(f"{root_path}/log", ignore_errors=True)
os.makedirs(f"{root_path}/log/data", exist_ok=True)
os.makedirs(f"{root_path}/log/prof", exist_ok=True)

# `make clean` is best-effort; a failing `make` stops here instead of
# letting the later cells run stale or missing binaries.
subprocess.run(["make", "clean"], check=False)
subprocess.run(["make"], check=True)

print(os.getcwd())

#### B. 重置处理列表

In [None]:
# Build the sweep: every test in test_func at every buffer size from
# test_mem_start up to test_mem_stop (inclusive), multiplying the size by
# test_mem_factor at each step.
if test_mem_start <= 0 or test_mem_factor <= 1:
    # Either condition would make the loop below never terminate.
    raise ValueError("test_mem_start must be > 0 and test_mem_factor > 1")
mem_list = []
mem_size = test_mem_start
while mem_size <= test_mem_stop:
    mem_list.extend([func, mem_size] for func in test_func)
    mem_size *= test_mem_factor
mem_count = len(mem_list)
print(f"mem_list len: {mem_count}")

In [None]:
# Manual override of the sweep list: keep only the [test, bytes] pairs that
# should run in this session (uncomment entries to add them back).
_two_gib = 2048 * 1024 * 1024
mem_list = [
    # ["all_gather", _two_gib],
    # ["all_reduce", _two_gib],
    # ["alltoall", _two_gib],
    # ["alltoallv", _two_gib],
    ["broadcast", _two_gib],
    # ["reduce", _two_gib],
    # ["reduce_scatter", _two_gib],
]
mem_count = len(mem_list)
print(f"mem_list len: {mem_count}")

#### C. 运行性能测试

In [None]:
# Profile every [test, size] pair in mem_list with msprof and keep one merged
# timeline per pair at log/prof/{test}_{size}.json.
os.environ['HCCL_BUFFSIZE'] = str(hccl_buffer_size)
os.environ['HCCL_TEST_USE_DEVS'] = ",".join(npu_list)

total = len(mem_list)
for i, (func, mem_size) in enumerate(mem_list):
    start_time = time.time()
    percent = f"{((i + 1.0) / total * 100):02.2f}%"
    # flush so the progress prefix is visible while the test is running.
    print(f'{i + 1:03d} / {total:03d} ({percent}) >> {func} in {get_size(mem_size)}...', end='', flush=True)
    # Output locations
    log_path = f"{root_path}/log"
    prof_path = f"{log_path}/prof"
    tmp_path = f"{log_path}/tmp"
    out_path = f"{prof_path}/{func}_{get_size(mem_size)}.json"
    # Generate the script, profile it, merge the per-device timelines
    script_file = get_script(func, mem_size)
    run_profiling(script_file, tmp_path)
    merge_json(tmp_path, out_path)
    print(f' {get_time(start_time)} done')
    # Clean up the raw msprof output and the per-run script/log; the merged
    # JSON in log/prof is kept. (A stray `break` previously stopped after the
    # first entry and made this cleanup unreachable.)
    shutil.rmtree(tmp_path, ignore_errors=True)
    for leftover in (f"{script_file}.sh", f"{script_file}.log"):
        if os.path.exists(leftover):
            os.remove(leftover)

#### D. 运行测试脚本

In [None]:
# Run every [test, size] pair in mem_list without profiling; each script
# writes its own output under log/data.
os.environ['HCCL_BUFFSIZE'] = str(hccl_buffer_size)
os.environ['HCCL_TEST_USE_DEVS'] = ",".join(npu_list)

total = len(mem_list)
for i, (func, mem_size) in enumerate(mem_list):
    start_time = time.time()
    percent = f"{((i + 1.0) / total * 100):02.2f}%"
    print(f'{i + 1:03d} / {total:03d} ({percent}) >> {func} in {get_size(mem_size)}...', end='', flush=True)
    # get_script() writes the .sh file and returns its path without suffix.
    result = subprocess.run(["bash", f"{get_script(func, mem_size)}.sh"], check=False)
    # Report a non-zero exit instead of claiming success.
    status = "done" if result.returncode == 0 else f"failed (exit {result.returncode})"
    print(f' {get_time(start_time)} {status}')