## Lab4B: Running large-scale distributed ML simulation using AstraSim and STAGE

In this lab, you will use AstraSim and STAGE to simulate LLM training.

AstraSim is a modular simulator for distributed ML systems. It takes as input a compute graph in Chakra format, together with system and network configurations. After simulation, it reports performance metrics such as runtime and peak memory usage.

Because AstraSim consumes workloads in Chakra format, we also use another tool, STAGE, to generate Chakra workloads from model and parallelization-strategy specifications.

For reference, please consult the README/documentation of STAGE and AstraSim:
- STAGE README.MD: https://github.com/astra-sim/symbolic_tensor_graph/blob/recomp/README.md
- AstraSim System Config: https://astra-sim.github.io/astra-sim-docs/getting-started/argument-system-config.html
- AstraSim Network Config: https://astra-sim.github.io/astra-sim-docs/getting-started/argument-network-config.html#analytical-network-config

### Lab Structure and Tasks:
- Part 0: Environment Setup [0 pts]
- Part 1: Implementing New Network Topology [3 pts]
    + Task 1.1: Implement physical connectivity of 2D Mesh [1 pt]
    + Task 1.2: Implement xy routing over the 2D Mesh [2 pts]
- Part 2: Using STAGE+AstraSim for basic simulation [3 pts]
    + Task 2.1: Generate Chakra Workload with STAGE [1 pt]
    + Task 2.2: Generate AstraSim System/Network Configs [1 pt]
    + Task 2.3: Run AstraSim with generated workloads/configs, and Extract results [1 pt]
- Part 3: Find optimal parallel strategies for different systems/models [4 pts]
    + Task 3.1: Generate Parallel Strategies Design Space [1 pt]
    + Task 3.2: Perform Design Space Exploration in Batch [1 pt]
    + Task 3.3: Find optimal parallel strategies for each setup, and explain why [2 pts]

### Part 0: Environment Setup and Check [0 pts]

In this part, we check whether your environment is set up correctly. Run the following code block to compile AstraSim and run the sanity-check simulation (this may take around 5-10 minutes).

In [2]:
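!conda init
# Note: each `!` line runs in its own subshell, so `conda activate` below does not
# change the environment of the running Python kernel.
!conda activate ./.conda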

import os, subprocess
ASTRASIM_DIR = os.path.abspath('./env/astra-sim')
STAGE_DIR = os.path.abspath('./env/symbolic_tensor_graph')
WORKLOAD_DIR = os.path.abspath('./workloads')
CONFIG_DIR = os.path.abspath("./astrasim_configs")


def run_command(cmd, do_print=False):
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    outputs = list()
    for line in iter(process.stdout.readline, ''):
        if do_print:
            print(line, end='')
        outputs.append(line)

    process.stdout.close()
    process.wait()
    return outputs


def compile_astrasim():
    run_command(f'cd {ASTRASIM_DIR} && bash ./build/astra_analytical/build.sh -l && bash ./build/astra_analytical/build.sh', do_print=True)
    
    
def sanity_check(outputs):
    hit = 0
    for line in outputs:
        if "sys[0] finished, 3556872116 cycles, exposed communication 2637085435 cycles." in line:
            hit += 1
        if "[info] sys[0] peak memory usage: 19676528640" in line:
            hit += 1
    if hit == 2:
        print("Sanity Check PASSED, good to go")
        return True
    
    for line in outputs:
        print(line, end='')
    print("Sanity check FAILED: something went wrong with your environment; please ask a TA for help")
    assert False
    return False
    
    
sanity_check_stage_cmd = (
    f"python {STAGE_DIR}/main.py --output_dir {WORKLOAD_DIR} "
    f"--output_name sanity_check "
    f"--comm_group_file sanity_check.json "  # Ensure space before continuation
    f"--dp 2 "
    f"--tp 2 "
    f"--pp 2 "
    f"--sp 2 "
    f"--weight_sharded 0 "
    f"--activation_recompute 1 "
    f"--din 32000 "
    f"--dout 32000 "
    f"--dmodel 8192 "
    f"--dff 28672 "
    f"--head 64 "
    f"--seq 1024 "
    f"--batch 512 "
    f"--num_stacks 8 "
)

sanity_check_astrasim_cmd = (
    f"{ASTRASIM_DIR}/build/astra_analytical/build/bin/AstraSim_Analytical_Congestion_Unaware "
    f"--workload-configuration {WORKLOAD_DIR}/sanity_check "
    f"--comm-group-configuration {WORKLOAD_DIR}/sanity_check.json "
    f"--system-configuration {CONFIG_DIR}/sanity_check/system.json "
    f"--network-configuration {CONFIG_DIR}/sanity_check/network.yml "
    f"--remote-memory-configuration {CONFIG_DIR}/sanity_check/remote_memory.json"    
)

system_config_template = {
    "scheduling-policy": "LIFO",
    "endpoint-delay": 10,
    "active-chunks-per-dimension": 4,
    "preferred-dataset-splits": 10,
    "boost-mode": 0,
    "collective-optimization": "localBWAware",
    "track-local-mem": 1,
    "trace-enabled": 1,
    "roofline-enabled": 1,
    "all-reduce-implementation": None,      # to be overridden by the user
    "all-gather-implementation": None,      # to be overridden by the user
    "reduce-scatter-implementation": None,  # to be overridden by the user
    "all-to-all-implementation": None,      # to be overridden by the user
    "local-mem-bw": None,                   # to be overridden by the user
    "peak-perf": None                       # to be overridden by the user
}

network_config_template = {
    "topology": None,       # to be overridden by the user
    "npus_count": None,     # to be overridden by the user
    "bandwidth": None,      # to be overridden by the user
    "latency": None         # to be overridden by the user
}

compile_astrasim()
run_command(sanity_check_stage_cmd)
outputs = run_command(sanity_check_astrasim_cmd)
_ = sanity_check(outputs)


no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/condabin/conda
no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/bin/conda
no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/bin/conda-env
no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/bin/activate
no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/bin/deactivate
no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/etc/profile.d/conda.sh
no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/etc/fish/conf.d/conda.fish
no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/shell/condabin/Conda.psm1
no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/shell/condabin/conda-hook.ps1
no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/lib/python3.10/site-packages/xontrib/conda.xsh
no change     /usr/local/pace-apps/manual/packages/anaconda3/2023.03/etc/profile.d/conda.c

If you see “PASSED”, your environment is ready. Otherwise, check the output above to see what went wrong, or ask a TA for help.

### Part 1. Implementing New Network Topology

In this part, you will implement a 2D Mesh topology for ASTRA-sim's analytical network backend. Because building a new physical topology is costly and complicated, network topology design space exploration commonly relies on simulation-based analysis.

To implement a 2D Mesh topology in the analytical network simulator, you need to:
- Understand the physical connectivity of a 2D Mesh.
- Understand xy routing on a 2D Mesh.

#### Connectivity of 2D Mesh
<img src="./2d_mesh_connectivity.png" alt="2D Mesh connectivity" width="500">

A 2D Mesh network connects NPUs width-wise (x-axis) and height-wise (y-axis) using bidirectional links, as shown in the figure above. Note that the corner NPUs (0, 3, 12, 15) end up with only 2 outgoing/incoming links, whereas the other NPUs have either 3 (edge) or 4 (center) neighboring NPUs.
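To make the connectivity rule concrete, here is a small Python illustration (not the ASTRA-sim C++ API) that lists each NPU's neighbors in a `width × height` 2D Mesh. It assumes NPUs are numbered row-major as in the figure, i.e. NPU `id` sits at column `id % width` and row `id // width`; the function name `mesh2d_neighbors` is hypothetical.

```python
from typing import Dict, List


def mesh2d_neighbors(width: int, height: int) -> Dict[int, List[int]]:
    """Return the directly linked neighbors of every NPU in a 2D Mesh.

    Assumes row-major numbering: NPU id = y * width + x.
    Illustrative only; this is not the ASTRA-sim Mesh2D implementation.
    """
    neighbors: Dict[int, List[int]] = {}
    for npu in range(width * height):
        x, y = npu % width, npu // width
        adjacent = []
        if x > 0:
            adjacent.append(npu - 1)          # previous NPU in the same row (x-axis)
        if x < width - 1:
            adjacent.append(npu + 1)          # next NPU in the same row (x-axis)
        if y > 0:
            adjacent.append(npu - width)      # same column, previous row (y-axis)
        if y < height - 1:
            adjacent.append(npu + width)      # same column, next row (y-axis)
        neighbors[npu] = adjacent
    return neighbors


mesh = mesh2d_neighbors(4, 4)
print(mesh[0], mesh[1], mesh[5])  # corner, edge, center: [1, 4] [0, 2, 5] [4, 6, 1, 9]
```

Since every link is bidirectional, summing the neighbor-list lengths counts each link twice: for the 4×4 mesh this sum is 48, i.e. 24 bidirectional links.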

#### xy routing of 2D Mesh
<img src="./xy_routing.png" alt="2D Mesh xy routing" width="600">

To deliver a message, xy routing (1) first moves it along the x-axis until it reaches the destination's column, then (2) moves it along the y-axis until it reaches the destination. See the example figure above: a message from NPU 1 to NPU 7 takes the route 1 → 2 → 3 → 7, and a message from NPU 14 to NPU 5 takes the route 14 → 13 → 9 → 5.
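The two-phase rule can be sketched in a few lines of Python. This is an illustration only, not ASTRA-sim's `Mesh2D::route`; it assumes the same row-major numbering as the figure, and the function name `xy_route` is hypothetical.

```python
from typing import List


def xy_route(src: int, dst: int, width: int) -> List[int]:
    """Return the sequence of NPUs visited by xy routing in a 2D Mesh.

    Assumes row-major numbering (NPU id = y * width + x), as in the figure.
    Illustrative only; this is not the ASTRA-sim Mesh2D::route implementation.
    """
    x, y = src % width, src // width
    dst_x, dst_y = dst % width, dst // width
    path = [src]
    # (1) move along the x-axis until the destination column is reached
    while x != dst_x:
        x += 1 if dst_x > x else -1
        path.append(y * width + x)
    # (2) then move along the y-axis until the destination row is reached
    while y != dst_y:
        y += 1 if dst_y > y else -1
        path.append(y * width + x)
    return path


print(xy_route(1, 7, width=4))   # [1, 2, 3, 7]
print(xy_route(14, 5, width=4))  # [14, 13, 9, 5]
```

Because the x-offset is fully resolved before any y hop, the route between a given source and destination is always the same, and each consecutive pair in the path is a physical link of the mesh.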

#### Task 1.1: Implement physical connectivity of 2D Mesh [1 pt]
Now that you understand the basics of a 2D Mesh, you will implement this new network topology in ASTRA-sim's analytical network simulator. As a first step, implement the physical connectivity of the topology: go to [./part1/src/topology/Mesh2D.cpp](./part1/src/topology/Mesh2D.cpp) and implement the constructor (`Mesh2D::Mesh2D`).

#### Task 1.2: Implement xy routing over the 2D Mesh [2 pts]
After implementing the physical topology, implement the xy routing scheme in the `Mesh2D::route` method.

#### Submission:
Run the following code block to mark Tasks 1.1 and 1.2 as finished and to generate the outputs for submission.


In [2]:
## TODO: finish ./part1/src/topology/Mesh2D.cpp

# Set to True once you have finished the .cpp file.
part1_finished = True

# submission
assert part1_finished
results = run_command("cd ./part1 && bash run.sh", do_print=True)
with open("./submission/part1.stdout", "w") as f:
    for line in results:
        f.write(line)
print("generated and submitted")

-- The C compiler identification is GNU 12.3.0
-- The CXX compiler identification is GNU 12.3.0
-- Detecting C compiler ABI info
-- Detecting C compiler ABI info - done
-- Check for working C compiler: /usr/local/pace-apps/spack/packages/linux-rhel9-x86_64_v3/gcc-11.3.1/gcc-12.3.0-ukkkutsxfl5kpnnaxflpkq2jtliwthfz/bin/gcc - skipped
-- Detecting C compile features
-- Detecting C compile features - done
-- Detecting CXX compiler ABI info
-- Detecting CXX compiler ABI info - done
-- Check for working CXX compiler: /usr/local/pace-apps/spack/packages/linux-rhel9-x86_64_v3/gcc-11.3.1/gcc-12.3.0-ukkkutsxfl5kpnnaxflpkq2jtliwthfz/bin/g++ - skipped
-- Detecting CXX compile features
-- Detecting CXX compile features - done
-- Configuring done (0.9s)
-- Generating done (0.5s)
-- Build files have been written to: /home/hice1/jho88/ece8803_hml_lab4B/part1/build
[  7%] Building CXX object yaml-cpp/CMakeFiles/yaml-cpp.dir/src/convert.cpp.o
[  9%] Building CXX object yaml-cpp/CMakeFiles/y

### Part 2. Using STAGE+AstraSim for basic simulation

In this part, we will walk you through:
- How to use STAGE to generate transformer workloads in Chakra format
- How to write system/network configurations for AstraSim and run the simulation
- How to wrap these tool invocations in Python functions for later batched simulations


#### Part 2.1 Generate Chakra Workload with STAGE
In this part, we write a Python wrapper that assembles the command to call STAGE and runs it to generate workloads.

The following table lists STAGE's input arguments.

| Parameter                 | Description                         | Example Values           |
|---------------------------|-------------------------------------|--------------------------|
| `--output_dir`             | Folder to place output files        | `./output`               |
| `--output_name`            | Output file naming pattern          | `workload.%d.et`         |
| `--comm_group_file`        | Communication group config file     | `comm_group.json`        |
| `--chakra_schema_version`  | Chakra ET schema version            | `v0.0.4`, `v0.0.1`, `json`|
| `--dp`                     | Data parallelism degree             | `32`                     |
| `--tp`                     | Tensor parallelism degree           | `4`                      |
| `--pp`                     | Pipeline parallelism degree         | `2`                      |
| `--sp`                     | Sequence/Token parallelism degree   | `4`                      |
| `--weight_sharded`         | Shard weights (FSDP enabled)        | `True/False`             |
| `--activation_recompute`   | Enable activation recomputation     | `True/False`             |
| `--din`                    | Input embedding size                | `51200`                  |
| `--dout`                   | Output embedding size               | `25600`                  |
| `--dmodel`                 | Model feature size                  | `25600`                  |
| `--dff`                    | FFN feature size                    | `25600*4`                |
| `--batch`                  | Batch size                          | `512`                    |
| `--seq`                    | Sequence length                     | `1024`                   |
| `--head`                   | Number of attention heads           | `128`                    |
| `--num_stacks`             | Number of encoder stacks            | `32`                     |

We use the following dataclass, `STAGEInput`, to hold these input arguments in Python.

In [3]:
from dataclasses import dataclass

@dataclass(frozen=True, order=True)
class STAGEInput:
    workload_name: str
    batch: int
    dp: int
    tp: int
    pp: int
    sp: int
    weight_sharded: bool
    activation_recompute: bool
    dvocal: int
    dmodel: int
    dff: int
    seq: int
    head: int
    num_layers: int

Next, we write a Python wrapper that calls STAGE, taking its inputs as a `STAGEInput` object:

In [4]:
def call_stage(stage_input: STAGEInput):
    ## Task 2.1
    ##    Assemble the command to call STAGE according to spec from stage_input, and call it.
    ##    Please replace "??" with the correct inputs to make it run
    ##    Please refer to previous table to know how to correctly fill these fields.
    workload_dir = WORKLOAD_DIR
    
    cmd = (
        f"python {STAGE_DIR}/main.py" + " "
        f"--output_dir {workload_dir}" + " "
        f"--output_name {stage_input.workload_name}" + " "
        f"--comm_group_file {stage_input.workload_name}.json" + " "
        f"--num_stacks {stage_input.num_layers}" + " "
        f"--din {stage_input.dvocal}" + " "
        f"--dout {stage_input.dvocal}" + " "
        f"--dp {stage_input.dp}" + " "   
        f"--weight_sharded {1 if stage_input.weight_sharded else 0}" + " "
        
        # TODO: 
        f"--tp {stage_input.tp}" + " "            # tp
        f"--pp {stage_input.pp}" + " "            # pp
        f"--sp {stage_input.sp}" + " "            # sp
        f"--activation_recompute {1 if stage_input.activation_recompute else 0}" + " "   # activation_recompute (passed as 0/1, like --weight_sharded)
        f"--dmodel {stage_input.dmodel}" + " "            # dmodel
        f"--dff {stage_input.dff}" + " "            # dff
        f"--head {stage_input.head}" + " "            # head
        f"--seq {stage_input.seq}" + " "            # seq
        f"--batch {stage_input.batch}" + " "            # batch
    )
    
    ## Dont modify below this line
    run_command(cmd)
    generated_workload_path = os.path.join(workload_dir, stage_input.workload_name)
    return generated_workload_path

We can then generate workloads by instantiating `STAGEInput` and passing it to this function:

In [5]:
llama_70b_1_spec = STAGEInput("llama_70b_1", batch=512,     # llama70b, dp=4, tp=4, AR
                         dp=4, tp=4, pp=1, sp=1,
                         weight_sharded=False, activation_recompute=True,
                         dvocal=32000, dmodel=8192, dff=28672, seq=1024, head=64, num_layers=16)
llama_70b_2_spec = STAGEInput("llama_70b_2", batch=64,      # llama70b, dp=2, tp=2, pp=4, sp=2, AR+FSDP
                         dp=2, tp=2, pp=4, sp=2,
                         weight_sharded=True, activation_recompute=True,
                         dvocal=32000, dmodel=8192, dff=28672, seq=1024, head=64, num_layers=16)

llama_70b_1_chakra = call_stage(llama_70b_1_spec)
llama_70b_2_chakra = call_stage(llama_70b_2_spec)

# save generated chakra files for submission
os.makedirs("./submission/llama_70b_1", exist_ok=True)
os.makedirs("./submission/llama_70b_2", exist_ok=True)
os.system(f"cp {llama_70b_1_chakra}* ./submission/llama_70b_1")
os.system(f"cp {llama_70b_2_chakra}* ./submission/llama_70b_2")
print("Generated and saved for submission")

Generated and saved for submission
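Before pairing a workload with a system config, it helps to know how many NPUs the workload expects. The sketch below assumes that STAGE emits one rank per point of the dp × tp × pp × sp grid; this is inferred from the configurations in this lab (e.g. `llama_70b_1`, with dp=4 and tp=4, is simulated on the 8 × 2 = 16-NPU test system in Part 2.3), not taken from the STAGE documentation. `required_npus` is a hypothetical helper that works on any object with `dp`/`tp`/`pp`/`sp` attributes, including `STAGEInput`.

```python
from types import SimpleNamespace


def required_npus(spec) -> int:
    """Number of NPUs a STAGE workload spans.

    Assumption: one rank per (dp, tp, pp, sp) coordinate.
    Hypothetical helper for illustration.
    """
    return spec.dp * spec.tp * spec.pp * spec.sp


# Stand-ins with the same parallel degrees as llama_70b_1_spec and llama_70b_2_spec
spec_1 = SimpleNamespace(dp=4, tp=4, pp=1, sp=1)
spec_2 = SimpleNamespace(dp=2, tp=2, pp=4, sp=2)
print(required_npus(spec_1), required_npus(spec_2))  # 16 32
```

In the notebook, `required_npus(llama_70b_1_spec)` can be called directly on the `STAGEInput` objects above.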


#### Part 2.2 Generate AstraSim System/Network Configs
Unlike STAGE, whose inputs are passed directly as command-line arguments, AstraSim splits its inputs into system and network configurations, which it reads from JSON/YAML config files.

In `./astrasim_configs/examples`, we provide example config files for the [system](./astrasim_configs/examples/system.json) and the [network](./astrasim_configs/examples/network.yml). (You can ignore `remote_memory`: this lab does not use it, but it is good to know it exists.) For the meaning of each field, refer to the system/network config documentation linked at the top of this lab.

As with STAGE, we also write a Python wrapper for AstraSim that generates configs and runs simulations. The system/network configuration is specified through the following dataclass:



In [6]:
from typing import List

@dataclass(frozen=True)
class AstraSimConfig:
    class _TOPOLOGY_TYPE:
        RING = "Ring"
        FULLY_CONNECTED = "FullyConnected"
        SWITCH = "Switch"
    class _COLLECTIVE_IMPL_TYPE:
        DIRECT = "direct"
        HALVING_DOUBLING = "halvingDoubling"
        RING = "ring"
        
    topology: List[str]
    npus_count: List[int]
    bandwidth: List[float]
    latency: List[float]
    all_reduce_implementation: List[str]
    all_gather_implementation: List[str]
    reduce_scatter_implementation: List[str]
    all_to_all_implementation: List[str]
    local_mem_bw: float
    peak_perf: float

Next, we write the Python function that generates the AstraSim config files:

In [7]:
import copy
def generate_astrasim_config(astrasim_config: AstraSimConfig, name: str):
    ## Task 2.2: filling in system/network config dicts according to "astrasim_config"
    ##   Please complete each "TODO"s in this cell.
    ##   Please refer to the example config in "astrasim_configs/examples"
    ##   Fill values to each specific attributes in the dict, 
    ##   and it will be serialized into files with json/yaml
    
    config_dir = os.path.join(CONFIG_DIR, name)
    os.makedirs(config_dir, exist_ok=True)
    system_config = copy.deepcopy(system_config_template)   # get default values from templates
    network_config = copy.deepcopy(network_config_template)
    
    
    # system configs
    system_config["all-reduce-implementation"] = list()
    for dim in astrasim_config.all_reduce_implementation:
        system_config["all-reduce-implementation"].append(dim)
    ## TODO: system_config["all-gather-implementation"]
    system_config["all-gather-implementation"] = list()
    for dim in astrasim_config.all_gather_implementation:
        system_config["all-gather-implementation"].append(dim)

    ## TODO: system_config["reduce-scatter-implementation"]
    system_config["reduce-scatter-implementation"] = list()
    for dim in astrasim_config.reduce_scatter_implementation:
        system_config["reduce-scatter-implementation"].append(dim)

    ## TODO: system_config["all-to-all-implementation"]
    system_config["all-to-all-implementation"] = list()
    for dim in astrasim_config.all_to_all_implementation:
        system_config["all-to-all-implementation"].append(dim)
    
    system_config["local-mem-bw"] = astrasim_config.local_mem_bw
    
    ## TODO: system_config["peak-perf"]
    system_config["peak-perf"] = astrasim_config.peak_perf
    
    # network configs
    network_config["topology"] = list()
    for dim in astrasim_config.topology:
        network_config["topology"].append(dim)

    ## TODO: network_config["npus_count"]
    network_config["npus_count"] = list()
    for dim in astrasim_config.npus_count:
        network_config["npus_count"].append(int(dim))

    ## TODO: network_config["bandwidth"]
    network_config["bandwidth"] = list()
    for dim in astrasim_config.bandwidth:
        network_config["bandwidth"].append(dim) 

    ## TODO: network_config["latency"]
    network_config["latency"] = list()
    for dim in astrasim_config.latency:
        network_config["latency"].append(dim)

    # print(f"network_config = {network_config}")
    # print(f"astrasim_config = {astrasim_config}")
    
    ## Dont modify below this line
    # serialize configs
    import json, yaml
    with open(os.path.join(config_dir, "system.json"), "w") as f:
        json.dump(system_config, f)
    with open(os.path.join(config_dir, "network.yml"), "w") as f:
        yaml.dump(network_config, f)
    with open(os.path.join(config_dir, "remote_memory.json"), "w") as f:
        f.write("{\"memory-type\": \"NO_MEMORY_EXPANSION\"}\n")
    
    return config_dir

Similarly, we generate AstraSim configs by instantiating `AstraSimConfig` and passing it to this function:

In [8]:
_topology_types = AstraSimConfig._TOPOLOGY_TYPE
_collective_types = AstraSimConfig._COLLECTIVE_IMPL_TYPE

dgx_system_collective_implementation = [
        _collective_types.DIRECT, 
        _collective_types.HALVING_DOUBLING,
        _collective_types.RING,
        _collective_types.RING
]

dgx_2k_system = AstraSimConfig(
    topology=[_topology_types.FULLY_CONNECTED, 
              _topology_types.SWITCH, 
              _topology_types.RING, 
              _topology_types.RING],
    npus_count=[8, 16, 4, 4],
    bandwidth=[128.5714, 100, 25, 6.25],
    latency=[20, 300, 700, 1000],
    all_reduce_implementation=dgx_system_collective_implementation,
    all_gather_implementation=dgx_system_collective_implementation,
    reduce_scatter_implementation=dgx_system_collective_implementation,
    all_to_all_implementation=dgx_system_collective_implementation,
    
    local_mem_bw=3000,
    peak_perf=900
)

tpu_system_collective_implementation = [
        _collective_types.RING, 
        _collective_types.RING,
        _collective_types.RING,
        _collective_types.HALVING_DOUBLING
]

tpu_512_system = AstraSimConfig(
    topology=[_topology_types.RING, 
              _topology_types.RING, 
              _topology_types.RING, 
              _topology_types.SWITCH],
    npus_count=[4, 4, 4, 8],
    bandwidth=[100, 100, 100, 50],
    latency=[50, 50, 50, 2000],
    all_reduce_implementation=tpu_system_collective_implementation,
    all_gather_implementation=tpu_system_collective_implementation,
    reduce_scatter_implementation=tpu_system_collective_implementation,
    all_to_all_implementation=tpu_system_collective_implementation,
    
    local_mem_bw=2765,
    peak_perf=459
)

dgx_2k_sys_config_path = generate_astrasim_config(dgx_2k_system, "dgx_2k")
tpu_512_sys_config_path = generate_astrasim_config(tpu_512_system, "tpu_512")

# save generated system/network configs for submission
os.system(f"cp -r {dgx_2k_sys_config_path} ./submission")
os.system(f"cp -r {tpu_512_sys_config_path} ./submission")
print("Generated and saved for submission")

Generated and saved for submission
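Each per-dimension field of `AstraSimConfig` is a list with one entry per network dimension, and the total NPU count is the product of `npus_count`. The sketch below, a hypothetical helper named `total_npus`, checks that all per-dimension lists have the same length (an assumption that holds for every config in this lab) and returns that product. It uses a `SimpleNamespace` stand-in with the same values as `dgx_2k_system` so it runs on its own; in the notebook you can pass `dgx_2k_system` or `tpu_512_system` directly.

```python
import math
from types import SimpleNamespace

# Fields of AstraSimConfig that hold one entry per network dimension
PER_DIMENSION_FIELDS = (
    "topology", "npus_count", "bandwidth", "latency",
    "all_reduce_implementation", "all_gather_implementation",
    "reduce_scatter_implementation", "all_to_all_implementation",
)


def total_npus(config) -> int:
    """Check per-dimension list lengths, then return the product of npus_count.

    Hypothetical helper; works on AstraSimConfig or any object with these fields.
    """
    num_dims = len(config.topology)
    for field in PER_DIMENSION_FIELDS:
        values = getattr(config, field)
        if len(values) != num_dims:
            raise ValueError(f"{field} has {len(values)} entries, expected {num_dims}")
    return math.prod(config.npus_count)


dgx_collectives = ["direct", "halvingDoubling", "ring", "ring"]
dgx_like = SimpleNamespace(
    topology=["FullyConnected", "Switch", "Ring", "Ring"],
    npus_count=[8, 16, 4, 4],
    bandwidth=[128.5714, 100, 25, 6.25],
    latency=[20, 300, 700, 1000],
    all_reduce_implementation=dgx_collectives,
    all_gather_implementation=dgx_collectives,
    reduce_scatter_implementation=dgx_collectives,
    all_to_all_implementation=dgx_collectives,
)
print(total_npus(dgx_like))  # 2048
```

For the configs above, this gives 8 × 16 × 4 × 4 = 2048 NPUs for `dgx_2k_system` and 4 × 4 × 4 × 8 = 512 for `tpu_512_system`, consistent with their names.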


#### Part 2.3 Run AstraSim with generated workloads/configs, and Extract results

With the code from Parts 2.1 and 2.2, we can call STAGE to generate workloads and generate the system/network configs. An AstraSim simulation can then be run with the following command:

```sh
${ASTRA_SIM_BIN} \
  --workload-configuration=${WORKLOAD_CONFIG} \
  --comm-group-configuration=${COMM_GROUP_CONFIG} \
  --system-configuration=${SYSTEM_CONFIG} \
  --network-configuration=${NETWORK_CONFIG} \
  --remote-memory-configuration=${REMOTE_MEMORY_CONFIG}
```

After the simulation finishes, we need to extract the metrics we want from its output. [example_output](./example_output.stdout) shows the output of an AstraSim simulation. Pay attention to **lines 209-219**, which report the metrics for sys[0]:
```
[2025-03-26 16:10:12.487] [workload] [info] sys[0] finished, 3556872116 cycles, exposed communication 2637085435 cycles.
[2025-03-26 16:10:12.487] [statistics] [info] sys[0]. Post statistics processing start.
[2025-03-26 16:10:12.487] [statistics] [info] sys[0]. Post statistics processing end.
[2025-03-26 16:10:12.487] [statistics] [info] sys[0], Wall time: 3556872116
[2025-03-26 16:10:12.487] [statistics] [info] sys[0], GPU time: 919786681
[2025-03-26 16:10:12.487] [statistics] [info] sys[0], Comm time: 3521411134
[2025-03-26 16:10:12.487] [statistics] [info] sys[0], Compute bound percentage: 83.794%
[2025-03-26 16:10:12.487] [statistics] [info] sys[0], Average compute utilization: 85.661%
[2025-03-26 16:10:12.487] [statistics] [info] sys[0], Average memory utilization: 28.401%
[2025-03-26 16:10:12.487] [statistics] [info] sys[0], Average operation intensity: 3523.082
[2025-03-26 16:10:12.489] [workload] [info] sys[0] peak memory usage: 19676528640
```

In this part, you will complete the following code block, which takes the AstraSim output and extracts the metrics for sys[0]. The AstraSim output is passed as a list of strings, where each item is one complete line including its trailing `\n`.

In [9]:
import json
def extract_metric(astrasim_outputs: List[str]):
    ## Task 2.3: extract following metrics of sys[0] from astrasim outputs
    wall_time: int = 0
    gpu_time: int = 0
    comm_time: int = 0
    per_comp_bound: float = 0
    comp_util: float = 0
    mem_util: float = 0
    op_intensity: float = 0
    peak_memory_usage: int = 0
    
    ## TODO: 

    # print(f"astrasim_outputs = {astrasim_outputs}")

    for output in astrasim_outputs:
        list_output = output.split(" ")
        # skip lines too short to hold "[date] [time] [logger] [level] sys[N]..." (e.g. blank lines)
        if len(list_output) < 5:
            continue

        if (list_output[2] == '[statistics]' or list_output[2] == '[workload]') and 'sys[0]' in list_output[4]:
            further_split = output.split(": ")
            if len(further_split) == 2:
                # drop the trailing newline and any "%" before converting the value
                s = further_split[1].replace("\n", "").replace("%", "")

                if 'Wall time' in further_split[0]:
                    wall_time = int(s)
                elif 'GPU time' in further_split[0]:
                    gpu_time = int(s)
                elif 'Comm time' in further_split[0]:
                    comm_time = int(s)
                elif 'Compute bound percentage' in further_split[0]:
                    per_comp_bound = float(s)
                elif 'Average compute utilization' in further_split[0]:
                    comp_util = float(s)
                elif 'Average memory utilization' in further_split[0]:
                    mem_util = float(s)
                elif 'Average operation intensity' in further_split[0]:
                    op_intensity = float(s)
                elif 'peak memory usage' in further_split[0]:
                    peak_memory_usage = int(s)

    ## Dont modify below this line
    ret = {
        "wall_time": wall_time,
        "gpu_time": gpu_time,
        "comm_time": comm_time,
        "per_comp_bound": per_comp_bound,
        "comp_util": comp_util,
        "mem_util": mem_util,
        "op_intensity": op_intensity,
        "peak_memory_usage": peak_memory_usage
    }
    return ret

# test and save submission
with open("./example_output.stdout", "r") as f:
    example_outputs = f.readlines()
example_metrics = extract_metric(example_outputs)
with open("./submission/example_metrics.json", "w") as f:
    json.dump(example_metrics, f)
print("generated and saved for submission")

generated and saved for submission
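As an alternative to splitting on spaces, the sys[0] metric lines can also be matched with a regular expression. This is a sketch that assumes the log format in the excerpt above (`sys[0], <Metric name>: <value>` for statistics lines and `sys[0] peak memory usage: <value>` for memory); `parse_sys0_metrics` is a hypothetical name, and it keys the results by the metric names as printed rather than by the keys used in `extract_metric`.

```python
import re
from typing import Dict, List

# Assumed log format, taken from the example output above:
#   "... [info] sys[0], Wall time: 3556872116"
#   "... [info] sys[0], Compute bound percentage: 83.794%"
#   "... [info] sys[0] peak memory usage: 19676528640"
SYS0_METRIC = re.compile(
    r"\bsys\[0\],? (?P<name>[A-Za-z][A-Za-z ]*): (?P<value>[0-9.]+)%?\s*$"
)


def parse_sys0_metrics(lines: List[str]) -> Dict[str, float]:
    """Map each sys[0] metric name (as printed) to its numeric value.

    Hypothetical helper; lines that do not match the pattern are ignored.
    """
    metrics: Dict[str, float] = {}
    for line in lines:
        match = SYS0_METRIC.search(line)
        if match:
            metrics[match.group("name")] = float(match.group("value"))
    return metrics


sample = [
    "[2025-03-26 16:10:12.487] [statistics] [info] sys[0], Wall time: 3556872116\n",
    "[2025-03-26 16:10:12.487] [statistics] [info] sys[0], Compute bound percentage: 83.794%\n",
    "[2025-03-26 16:10:12.489] [workload] [info] sys[0] peak memory usage: 19676528640\n",
]
print(parse_sys0_metrics(sample))
```

All values come back as floats, so cycle and byte counts need `int(...)` to match the types used in `extract_metric`. In the notebook, `parse_sys0_metrics(example_outputs)` can serve as a cross-check on `extract_metric(example_outputs)`.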


With the extraction function in place, we can assemble the complete STAGE+AstraSim simulation pipeline:

In [10]:
def sim_pipeline(stage_spec, astrasim_spec, astrasim_config_name):
    workload_path = call_stage(stage_spec)
    astrasim_config_path = os.path.join(CONFIG_DIR, astrasim_config_name)
    
    # generate system/network configs only if they do not exist yet;
    # an existing config directory with the same name is reused as-is
    if not os.path.exists(astrasim_config_path):
        astrasim_config_path = generate_astrasim_config(astrasim_spec, astrasim_config_name)
    
    astrasim_cmd = (
        f"{ASTRASIM_DIR}/build/astra_analytical/build/bin/AstraSim_Analytical_Congestion_Unaware "
        f"--workload-configuration {workload_path} "
        f"--comm-group-configuration {workload_path}.json "
        f"--system-configuration {astrasim_config_path}/system.json "
        f"--network-configuration {astrasim_config_path}/network.yml "
        f"--remote-memory-configuration {astrasim_config_path}/remote_memory.json"  
    )
    
    outputs = run_command(astrasim_cmd)

    # save output for debug
    stdout_path = os.path.join("submission", "stdout", f"{os.path.split(workload_path)[-1]}_{os.path.split(astrasim_config_path)[-1]}.stdout")
    os.makedirs(os.path.join("submission", "stdout"), exist_ok=True)
    with open(stdout_path, "w") as f:
        for line in outputs:
            f.write(line)

    metrics = extract_metric(outputs)
    return metrics


# test pipeline
_collective_types, _topology_types = AstraSimConfig._COLLECTIVE_IMPL_TYPE, AstraSimConfig._TOPOLOGY_TYPE
test_stage_spec = STAGEInput("llama_70b_1", batch=512,     # llama70b, dp=4, tp=4, AR
                         dp=4, tp=4, pp=1, sp=1,
                         weight_sharded=False, activation_recompute=True,
                         dvocal=32000, dmodel=8192, dff=28672, seq=1024, head=64, num_layers=16)
test_coll_impl = [_collective_types.DIRECT, _collective_types.HALVING_DOUBLING]
test_astrasim_spec = AstraSimConfig(
    topology=[_topology_types.FULLY_CONNECTED, _topology_types.SWITCH],
    npus_count=[8, 2], bandwidth=[128.5714, 25], latency=[20, 100],
    all_reduce_implementation=test_coll_impl, all_gather_implementation=test_coll_impl, 
    all_to_all_implementation=test_coll_impl, reduce_scatter_implementation=test_coll_impl,
    local_mem_bw=3000, peak_perf=900
)

print(test_astrasim_spec)
test_metrics = sim_pipeline(test_stage_spec, test_astrasim_spec, "dgx_16")
# save submission
import json
with open("./submission/1.3.json", "w") as f:
    json.dump(test_metrics, f)
print("generated and saved for submission")

AstraSimConfig(topology=['FullyConnected', 'Switch'], npus_count=[8, 2], bandwidth=[128.5714, 25], latency=[20, 100], all_reduce_implementation=['direct', 'halvingDoubling'], all_gather_implementation=['direct', 'halvingDoubling'], reduce_scatter_implementation=['direct', 'halvingDoubling'], all_to_all_implementation=['direct', 'halvingDoubling'], local_mem_bw=3000, peak_perf=900)
generated and saved for submission


### Part 3: Find optimal parallel strategies for different systems/models

In this part, we use the STAGE+AstraSim pipeline built in Part 2 to explore different parallel strategies and find the best-performing one for each system/model combination. The following code block defines the target systems and models:
- System 1: 64-NPU 3D torus, 100 GB/s per link, 450 TOPS, 3 TB/s HBM bandwidth (fast NPU, slow interconnect)
- System 2: 64-NPU 3D torus, 2000 GB/s per link, 45 TOPS, 300 GB/s HBM bandwidth (slow NPU, fast interconnect)
- Workload 1: Llama70B, batch=512, seq=1024
- Workload 2: GPT175B, batch=512, seq=1024

In [11]:
_collective_types = AstraSimConfig._COLLECTIVE_IMPL_TYPE
_topology_types = AstraSimConfig._TOPOLOGY_TYPE

system_collective_implementation = [
        _collective_types.RING, 
        _collective_types.RING,
        _collective_types.RING
]

## System1: Fast NPU, Slow Interconnect
system1_spec = AstraSimConfig(
    topology=[_topology_types.RING, 
              _topology_types.RING, 
              _topology_types.RING],
    npus_count=[4, 4, 4],
    bandwidth=[100, 100, 100],
    latency=[50, 50, 50],
    all_reduce_implementation=system_collective_implementation,
    all_gather_implementation=system_collective_implementation,
    reduce_scatter_implementation=system_collective_implementation,
    all_to_all_implementation=system_collective_implementation,
    
    local_mem_bw=3000,
    peak_perf=450
)

## System2: Slow NPU, Fast Interconnect
system2_spec = AstraSimConfig(
    topology=[_topology_types.RING, 
              _topology_types.RING, 
              _topology_types.RING],
    npus_count=[4, 4, 4],
    bandwidth=[2000, 2000, 2000],
    latency=[50, 50, 50],
    all_reduce_implementation=system_collective_implementation,
    all_gather_implementation=system_collective_implementation,
    reduce_scatter_implementation=system_collective_implementation,
    all_to_all_implementation=system_collective_implementation,
    
    local_mem_bw=300,
    peak_perf=45
)

system1_config = generate_astrasim_config(system1_spec, "system_1")
system2_config = generate_astrasim_config(system2_spec, "system_2")

## Workload1: llama70b, batch=512, seq=1024
llama_70b_no_parallel_spec = STAGEInput(
    "llama70b",
    batch=512, seq=1024, 
    dp=1, tp=1, pp=1, sp=1, weight_sharded=False, activation_recompute=False, 
    dvocal=32000, dmodel=8192, dff=28672, head=64, num_layers=80
)

## Workload2: gpt175b, batch=512, seq=1024
gpt_175b_no_parallel_spec = STAGEInput(
    "gpt175b",
    batch=512, seq=1024, 
    dp=1, tp=1, pp=1, sp=1, weight_sharded=False, activation_recompute=False, 
    dvocal=50257, dmodel=12288, dff=49152, head=96, num_layers=96
)
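For a sense of scale, here is a rough parameter count for the two workloads, computed from the spec fields above. This is a sketch under stated assumptions, not STAGE's model: it assumes multi-head attention (4 x dmodel^2 per layer), a gated three-matrix MLP for Llama and a two-matrix MLP for GPT, no biases or norms, and untied input/output embeddings.

```python
# Rough parameter counts for the two Part 3 workloads.
# Assumptions (NOT taken from STAGE): standard decoder blocks with
# multi-head attention (Q, K, V, O = 4 * dmodel^2), no biases or norms,
# an MLP with 3 dmodel x dff matrices for Llama (gated) and 2 for GPT,
# and separate (untied) input embedding and output head.

def approx_params(dvocab: int, dmodel: int, dff: int, num_layers: int,
                  gated_mlp: bool) -> int:
    attn = 4 * dmodel * dmodel
    mlp = (3 if gated_mlp else 2) * dmodel * dff
    embeddings = 2 * dvocab * dmodel  # input embedding + output head
    return num_layers * (attn + mlp) + embeddings

# Values copied from llama_70b_no_parallel_spec and gpt_175b_no_parallel_spec.
llama_70b_params = approx_params(32000, 8192, 28672, 80, gated_mlp=True)
gpt_175b_params = approx_params(50257, 12288, 49152, 96, gated_mlp=False)

for name, n in [("llama70b", llama_70b_params), ("gpt175b", gpt_175b_params)]:
    print(f"{name}: ~{n / 1e9:.1f}B params")
```

This gives about 78.4B parameters for Llama70B and 175.2B for GPT175B. The Llama figure overshoots because the released Llama 2 70B model uses grouped-query attention; the spec here only gives `head=64`, so how STAGE models attention is not something this sketch checks.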

#### 3.1 Generate Parallel Strategies Design Space
The workload specs above set only the model parameters, without any parallelism. To find the optimal parallel strategy for each system, we expand each spec into the design space of all valid parallel strategies, summarized in the following table:

| Parameter                 | Description                         | Range                    |
|---------------------------|-------------------------------------|--------------------------|
| `--dp`                     | Data parallelism degree             | `{1, 2, 4, ..., 64}`  |
| `--tp`                     | Tensor parallelism degree           | `{1, 2, 4, ..., 64}`  |
| `--pp`                     | Pipeline parallelism degree         | `{1}`*       |
| `--sp`                     | Sequence/Token parallelism degree   | `{1, 2, 4, ..., 64}`  |
| `--weight_sharded`         | Shard weights (FSDP enabled)        | `{False}`*              |
| `--activation_recompute`   | Whether to use activation recomputation | `{True, False}`         |

**Constraint 1**: `dp*tp*pp*sp == 64`, since the system has 64 NPUs in total.

*Note: For simplicity, we do not search over FSDP or PP; `weight_sharded` is always **False** and `pp` is always **1** in this lab.

In the following code block, complete the function that generates the full parallel-strategy design space for Workloads 1 and 2.

In [12]:
import copy
from itertools import product
from dataclasses import replace

def _apply_parallel_strategies(workload_spec_no_parallel, dp, tp, pp, sp, activation_recompute):
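    # Every strategy must use exactly the 64 NPUs in the system (Constraint 1).
    assert dp * tp * pp * sp == 64, f"dp*tp*pp*sp must be 64, got {dp * tp * pp * sp}"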
    new_workload_name = f"{workload_spec_no_parallel.workload_name}_{dp}_{tp}_{pp}_{sp}_{False}_{activation_recompute}"

    workload_spec = replace(
        workload_spec_no_parallel,
        dp=dp,
        tp=tp,
        pp=pp,
        sp=sp,
        activation_recompute=activation_recompute,
        workload_name=new_workload_name,
        weight_sharded=False
    )
    return workload_spec
    

def expand_design_space(workload_spec_no_parallel: STAGEInput):
    ## Task 3.1: generate the whole parallel strategies design space, and return it as a list
    ##   Please use _apply_parallel_strategies to apply each parallel strategy so that it runs correctly.
    ##   hint: use itertools.product
    
    design_space = list()
    
    ## TODO: 
    # for xxxxxxx
    #    design_space.append(_apply_parallel_strategies(workload_spec_no_parallel, dp, tp, pp, sp, activation_recompute))
    possible_values = [1, 2, 4, 8, 16, 32, 64]
    pp = 1
    # Iterate over all combinations for dp, tp, sp, and activation recompute.
    for dp, tp, sp, activation_recompute in product(possible_values, possible_values, possible_values, [True, False]):
        if dp * tp * pp * sp == 64:
            design_space.append(
                _apply_parallel_strategies(
                    workload_spec_no_parallel, dp, tp, pp, sp, activation_recompute
                )
            )
    ## Dont modify below this line
    return design_space

llama_70b_design_space = expand_design_space(llama_70b_no_parallel_spec)
gpt_175b_design_space = expand_design_space(gpt_175b_no_parallel_spec)

print(f"Generate design space of size {len(llama_70b_design_space)}")


Generate design space of size 56




#### 3.2 Doing Design Space Exploration in Batch
After expansion, the design space of each workload should contain 56 points (if it does not, something is wrong).
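The 56 follows directly from the constraints: since 64 = 2^6 and pp = 1, choosing dp = 2^a, tp = 2^b, sp = 2^c means counting non-negative integer solutions of a + b + c = 6, which is C(8, 2) = 28, and the two values of `activation_recompute` double that. The sketch below enumerates dp and tp, derives sp, and checks the count against that closed form. It is an independent check, not a replacement for `expand_design_space`.

```python
# Why the design space has 56 points.
# 64 = 2**6, so dp = 2**a, tp = 2**b, sp = 2**c (pp = 1) requires a + b + c = 6:
# C(6 + 2, 2) = 28 solutions, doubled for activation_recompute in {True, False}.
from itertools import product
from math import comb

POWERS = [1, 2, 4, 8, 16, 32, 64]
TOTAL_NPUS = 64

# Enumerate (dp, tp) and derive sp, instead of filtering all 7**3 triples.
points = [
    (dp, tp, 1, TOTAL_NPUS // (dp * tp), ar)  # (dp, tp, pp, sp, activation_recompute)
    for dp, tp in product(POWERS, POWERS)
    if TOTAL_NPUS % (dp * tp) == 0
    for ar in (True, False)
]

print(len(points), 2 * comb(6 + 2, 2))  # prints: 56 56
```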

Although this is small compared to the design space in Lab 2, each simulation takes 10-20 minutes, so we recommend randomly sampling the design space (for example, 20 design points) to reduce the overall runtime.

The following code block provides an example of sampling. It is completely fine to keep the given sampling; it will not affect grading. However, if you prefer, change it to include the design points you believe are interesting and should be simulated.
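If you want to guarantee that certain points are simulated, one option is to keep a few hand-picked points and fill the rest randomly. `sample_with_must_haves` is a hypothetical helper, not part of the lab code; it works on plain tuples `(dp, tp, pp, sp, activation_recompute)` here instead of `STAGEInput` objects, and the two must-have points (pure DP and pure TP) are only an example choice.

```python
import random

def sample_with_must_haves(space, must_haves, k, seed=0):
    """Hypothetical helper: keep hand-picked points, fill up to k randomly."""
    must = list(dict.fromkeys(p for p in must_haves if p in space))  # dedupe, keep order
    if k < len(must):
        raise ValueError("k is smaller than the number of must-have points")
    rest = sorted(p for p in space if p not in must)  # sorted for reproducibility
    rng = random.Random(seed)  # local RNG, does not touch the global random state
    return must + rng.sample(rest, k - len(must))

# Example design space as tuples: (dp, tp, pp, sp, activation_recompute).
POWERS = [1, 2, 4, 8, 16, 32, 64]
space = [
    (dp, tp, 1, 64 // (dp * tp), ar)
    for dp in POWERS for tp in POWERS if dp * tp <= 64
    for ar in (True, False)
]

must = [(64, 1, 1, 1, False), (1, 64, 1, 1, False)]  # pure DP, pure TP
picked = sample_with_must_haves(space, must, k=20)
print(len(picked), picked[:2])
```

Using a local `random.Random(seed)` keeps the sample reproducible without depending on when the global `random.seed(0)` call runs.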

In [13]:
# Change as you wish, or keep it.

import random

# Sort and seed so that the sample is reproducible
llama_70b_design_space = sorted(llama_70b_design_space)
gpt_175b_design_space = sorted(gpt_175b_design_space)
random.seed(0)

llama_70b_design_space = random.sample(llama_70b_design_space, 20)
gpt_175b_design_space = random.sample(gpt_175b_design_space, 20)

Now use `sim_pipeline()` from Part 2 to simulate the sampled design points for the following combinations:
- llama70B @ system1
- gpt175B @ system2

For each combination, run all simulations and store the results in a dict whose keys are `STAGEInput` objects and whose values are the metrics returned by `sim_pipeline()`.

(The simulations take less than one hour in total.)
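The submit-then-`as_completed` pattern used for Task 3.2 can be factored into a small helper that also records failures instead of only printing them. `run_batch` and `fake_sim` are hypothetical names for illustration: `fake_sim` stands in for `sim_pipeline()`, and a `ThreadPoolExecutor` is used so the example runs anywhere.

```python
from concurrent.futures import Executor, ThreadPoolExecutor, as_completed

def run_batch(executor: Executor, fn, inputs, *args):
    """Run fn(item, *args) for every item; return (results, failures) dicts keyed by item."""
    futures = {executor.submit(fn, item, *args): item for item in inputs}
    results, failures = {}, {}
    for future in as_completed(futures):
        item = futures[future]
        try:
            results[item] = future.result()
        except Exception as exc:  # keep going if one simulation fails
            failures[item] = exc
    return results, failures

def fake_sim(n, system_name):
    """Stand-in for sim_pipeline(workload, spec, name); fails for n == 3."""
    if n == 3:
        raise RuntimeError("simulated failure")
    return {"system": system_name, "value": n * n}

with ThreadPoolExecutor(max_workers=4) as ex:
    ok, failed = run_batch(ex, fake_sim, range(5), "system_1")
print(sorted(ok), list(failed))
```

With the notebook's pool this would be called as `run_batch(pool, sim_pipeline, llama_70b_design_space, system1_spec, "system_1")`. A `ProcessPoolExecutor` requires the function and its arguments to be picklable, and using inputs as dict keys requires them to be hashable, which the notebook's dict keyed by `STAGEInput` already relies on.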


In [14]:
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import asdict
import os
    
pool = ProcessPoolExecutor(int(os.getenv('SLURM_CPUS_ON_NODE', os.cpu_count())))
    
llama_system1_results = dict()
gpt_system2_results = dict()

## TODO: Task 3.2. Implement batched simulation for the 2 combinations
##   and store the results of each combination as a dict in the variables above, keyed by STAGEInput with the resulting metrics as values
##   hint: you might need ProcessPoolExecutor to speed up the simulation through multi processing.
futures_llama = {
    pool.submit(sim_pipeline, workload, system1_spec, "system_1"): workload 
    for workload in llama_70b_design_space
}

futures_gpt = {
    pool.submit(sim_pipeline, workload, system2_spec, "system_2"): workload 
    for workload in gpt_175b_design_space
}

# Retrieve results for llama70B
for future in as_completed(futures_llama):
    workload = futures_llama[future]
    try:
        result = future.result()
        llama_system1_results[workload] = result
        print(f"llama70B configuration {workload.workload_name} completed.")
    except Exception as e:
        print(f"llama70B configuration {workload.workload_name} raised exception: {e}")

# Retrieve results for gpt175B
for future in as_completed(futures_gpt):
    workload = futures_gpt[future]
    try:
        result = future.result()
        gpt_system2_results[workload] = result
        print(f"gpt175B configuration {workload.workload_name} completed.")
    except Exception as e:
        print(f"gpt175B configuration {workload.workload_name} raised exception: {e}")

## Dont modify below this line
pool.shutdown(wait=True)

# save submission file
def serialize_results(results, filename):
    serialized = dict()
    for workload in results.keys():
        dp, tp, sp, pp, ar = workload.dp, workload.tp, workload.sp, workload.pp, workload.activation_recompute
        ar = 1 if ar else 0
        str_key = f"{dp}_{tp}_{sp}_{pp}_{ar}"
        metrics = results[workload]
        metrics['dp'] = dp
        metrics['tp'] = tp
        metrics['sp'] = sp
        metrics['pp'] = pp
        metrics['ar'] = ar
        serialized[str_key] = metrics
    with open(filename, "w") as f:
        json.dump(serialized, f, indent=4)

serialize_results(llama_system1_results, "submission/llama_system1.json")
serialize_results(gpt_system2_results, "submission/gpt_system2.json")
print("submission file saved")


llama70B configuration llama70b_8_1_1_8_False_False completed.
llama70B configuration llama70b_1_64_1_1_False_False completed.
llama70B configuration llama70b_8_8_1_1_False_False completed.
llama70B configuration llama70b_16_2_1_2_False_False completed.
llama70B configuration llama70b_2_2_1_16_False_False completed.
llama70B configuration llama70b_2_32_1_1_False_False completed.
llama70B configuration llama70b_32_1_1_2_False_False completed.
llama70B configuration llama70b_2_8_1_4_False_True completed.
llama70B configuration llama70b_2_8_1_4_False_False completed.
llama70B configuration llama70b_2_2_1_16_False_True completed.
llama70B configuration llama70b_1_8_1_8_False_True completed.
llama70B configuration llama70b_2_1_1_32_False_False completed.
llama70B configuration llama70b_1_1_1_64_False_False completed.
llama70B configuration llama70b_8_1_1_8_False_True completed.
llama70B configuration llama70b_1_8_1_8_False_False completed.
llama70B configuration llama70b_4_16_1_1_False_True

#### 3.3 Find optimal parallel strategies for each setup, and why?

In this section, answer the following questions based on the results of the DSE above.

- For each system-workload combination, which parallel strategy performs best if we assume **infinite** HBM capacity per NPU?
    + What trend do the DSE results show?
    + Explain why.
- Same question, but assume a **limited** HBM capacity of **80 GB**: which parallel strategy performs best?
    + What trend do the DSE results show?
    + Explain why.
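One way to extract these answers from the saved JSON files is to rank configurations by wall time, optionally filtering by a memory cap. The metric names `wall_time` and `peak_mem_gb` are assumptions; substitute the keys `sim_pipeline()` actually returns (converting memory to GB if needed). Keys follow the `dp_tp_sp_pp_ar` format written by `serialize_results`, and the toy numbers below are made up purely to exercise the code.

```python
import json

def rank_configs(serialized, time_key="wall_time", mem_key="peak_mem_gb", mem_limit_gb=None):
    """Return (key, metrics) pairs sorted by time, optionally under a memory cap.

    time_key / mem_key are ASSUMED metric names; adjust to sim_pipeline()'s output.
    """
    candidates = [
        (key, metrics) for key, metrics in serialized.items()
        if mem_limit_gb is None or metrics[mem_key] <= mem_limit_gb
    ]
    return sorted(candidates, key=lambda kv: kv[1][time_key])

def load_results(path):
    """Load a file written by serialize_results, e.g. "submission/llama_system1.json"."""
    with open(path) as f:
        return json.load(f)

# Toy data with made-up numbers (NOT simulation results), keyed dp_tp_sp_pp_ar.
toy = {
    "64_1_1_1_0": {"wall_time": 1.0, "peak_mem_gb": 300.0},
    "8_8_1_1_1": {"wall_time": 1.6, "peak_mem_gb": 50.0},
    "2_8_4_1_1": {"wall_time": 2.4, "peak_mem_gb": 20.0},
}

unlimited = rank_configs(toy)
limited = rank_configs(toy, mem_limit_gb=80)
print("best (infinite HBM):", unlimited[0][0])
print("best (80 GB HBM):", limited[0][0] if limited else None)
```

Printing the whole ranked list, not just the first entry, is what reveals the trend the questions ask about.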

Write your answer in the following text block.

##TODO: write your answer here.

In the infinite HBM capacity scenario:
1. Llama70B on System 1: the configuration "32_1_2_1_0" (keys are `dp_tp_sp_pp_ar`, so DP=32, TP=1, SP=2, PP=1, AR=0) has the lowest wall-clock time. Maximizing data parallelism while keeping tensor and sequence parallelism low minimizes traffic on System 1's slow links: DP needs only a gradient all-reduce per iteration, whereas TP and SP insert collectives into every layer of the forward and backward passes. With unlimited memory, replicating the model across more devices costs nothing, and with AR=0 no compute is spent on recomputation.

2. GPT175B on System 2: the configuration "16_4_1_1_0" (DP=16, TP=4, SP=1, PP=1, AR=0) has the lowest wall-clock time. Combining a moderate degree of data parallelism with modest tensor parallelism gives the best balance between distributing computation and paying inter-device communication cost. System 2's fast links make the per-layer collectives of TP relatively cheap, and with unlimited memory the replication cost of data parallelism carries no penalty, so this balanced split keeps the slow NPUs highly utilized.

In the limited HBM capacity (80 GB) scenario:
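1. Llama70B on System 1: "8_1_8_1_1" (DP=8, TP=1, SP=8, PP=1, AR=1) achieves a decent wall-clock time while using about 67 GB of memory, within the 80 GB limit. Compared with the unconstrained optimum, it trades data parallelism for sequence parallelism and enables activation recomputation; both reduce the per-NPU memory footprint while keeping the computation efficiently parallelized.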

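2. GPT175B on System 2: "4_8_2_1_1" (DP=4, TP=8, SP=2, PP=1, AR=1) achieves the best wall-clock time under the limit, using 42.74 GB of memory. With limited HBM, the best strategies shift parallelism away from data parallelism, which replicates the model on every NPU, toward tensor parallelism (which shards weights) and sequence parallelism (which shards activations), and enable activation recomputation. This keeps memory usage within the limit while still balancing runtime performance.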


These observations indicate that aggressive data parallelism with balanced splitting yields the best performance when memory is abundant, whereas a memory-constrained environment forces a trade-off between parallel efficiency and memory footprint, leading to a different "best" strategy for each system.

### Wrap up and Submission

Run the following code block to generate the submission file.

In [5]:
## TODO: fill in your name

first_name = "JunLiang"   # e.g. "Changhai"
last_name = "Ho"    # e.g. "Man"

assert first_name is not None
assert last_name is not None

run_command(f"tar -zcvf {first_name}_{last_name}_ECE_8803_HML_sp25_lab4B.tar.gz --exclude ./submission/stdout ./submission ./part1/src/topology/Mesh2D.cpp ./astrasim_configs ./lab4b.ipynb ")

print("Submission Packed!! Next step: Upload it to canvas!")

Submission Packed!! Next step: Upload it to canvas!
