Support GPU-to-CPU synchronization dependency with HolisticTraceAnalysis #57

Open · wants to merge 5 commits into main
9 changes: 9 additions & 0 deletions .github/workflows/end_to_end_tests.yml
@@ -27,6 +27,15 @@ jobs:
sed -i '/param_bench/d' pyproject.toml
pip install .

- name: Install HTA
run: |
git clone https://github.com/facebookresearch/HolisticTraceAnalysis.git
cd HolisticTraceAnalysis
git checkout d731cc2e2249976c97129d409a83bd53d93051f6
git submodule update --init
pip install -r requirements.txt
pip install -e .

- name: Test chakra_trace_link Without Arguments
run: |
chakra_trace_link || [ $? -eq 2 ]
9 changes: 9 additions & 0 deletions .github/workflows/python_tests.yml
@@ -27,6 +27,15 @@ jobs:
sed -i '/param_bench/d' pyproject.toml
pip install .

- name: Install HTA
run: |
git clone https://github.com/facebookresearch/HolisticTraceAnalysis.git
cd HolisticTraceAnalysis
git checkout d731cc2e2249976c97129d409a83bd53d93051f6
git submodule update --init
pip install -r requirements.txt
pip install -e .

- name: Install Dependencies
run: |
pip install -r requirements-dev.txt
15 changes: 14 additions & 1 deletion USER_GUIDE.md
@@ -35,7 +35,19 @@ $ git checkout ea12ab702712e9986db85cd5773eb5902f28af2a
$ pip install .
```

### Step 4: Uninstalling Chakra
### Step 4: Install Holistic Trace Analysis
Installing Holistic Trace Analysis is required for the trace linker (`chakra_trace_link`).

```bash
$ git clone https://github.com/facebookresearch/HolisticTraceAnalysis.git
$ cd HolisticTraceAnalysis
$ git checkout d731cc2e2249976c97129d409a83bd53d93051f6
$ git submodule update --init
$ pip install -r requirements.txt
$ pip install -e .
```

### Step 5: Uninstalling Chakra
To uninstall Chakra, use the following command within the virtual environment.

```bash
@@ -49,6 +61,7 @@ Merge Chakra host execution trace and Chakra device execution trace to encode GP
$ chakra_trace_link \
--chakra-host-trace /path/to/chakra_host_trace \
--chakra-device-trace /path/to/chakra_device_trace \
--rank [RANK] \
--output-file /path/to/chakra_host_device_trace.json
```

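Not part of the diff: a quick optional sanity check after Step 4, as a minimal sketch that assumes the pinned HTA commit exposes the documented `hta.trace_analysis.TraceAnalysis` entry point.

```python
# Optional sanity check (not part of this PR): confirm the pinned
# HolisticTraceAnalysis commit is importable before running chakra_trace_link.
# Assumes HTA's documented entry point, hta.trace_analysis.TraceAnalysis.
from hta.trace_analysis import TraceAnalysis

print("HTA import OK:", TraceAnalysis.__name__)
```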
9 changes: 9 additions & 0 deletions src/converter/pytorch_converter.py
@@ -445,6 +445,15 @@ def convert_ctrl_dep_to_data_dep(
last_visited_non_gpu = current_node
last_visited_any = current_node

if json_node.sync_dep:
for sync_dep in json_node.sync_dep:
if sync_dep not in current_node.data_deps:
current_node.data_deps.append(sync_dep)
logging.info(
f"Node ID {current_node.id} now has an synchonization dependency on Node ID {sync_dep}"
)

# Add children to the stack
children_chakra_ids = [child.id for child in json_node.children]
for child_chakra_id in sorted(children_chakra_ids, reverse=True):
child_chakra_node = protobuf_node_map.get(child_chakra_id)
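As an aside on the hunk above: the loop promotes each synchronization dependency to a data dependency, de-duplicating along the way. A minimal standalone sketch of that logic, using a hypothetical `Node` stand-in rather than the converter's real JSON/protobuf node types:

```python
# Minimal sketch of the sync_dep -> data_deps merge performed above.
# "Node" is a hypothetical stand-in, not the converter's actual classes.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Node:
    id: int
    data_deps: List[int] = field(default_factory=list)
    sync_dep: Optional[List[int]] = None


def merge_sync_deps(json_node: Node, current_node: Node) -> None:
    # Each synchronization dependency becomes a data dependency,
    # skipping IDs that are already present to avoid duplicates.
    if json_node.sync_dep:
        for sync_dep in json_node.sync_dep:
            if sync_dep not in current_node.data_deps:
                current_node.data_deps.append(sync_dep)


json_node = Node(id=7, sync_dep=[3, 5])
current_node = Node(id=7, data_deps=[3])
merge_sync_deps(json_node, current_node)
print(current_node.data_deps)  # [3, 5]
```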
1 change: 1 addition & 0 deletions src/converter/pytorch_node.py
@@ -107,6 +107,7 @@ def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None:
self.exclusive_dur = node_data.get("exclusive_dur", 0)
self.ts = node_data.get("ts")
self.inter_thread_dep = node_data.get("inter_thread_dep")
self.sync_dep = node_data.get("sync_dep")
self.cat = node_data.get("cat")
self.stream = node_data.get("stream", 0)

13 changes: 13 additions & 0 deletions src/trace_link/chakra_device_trace_loader.py
@@ -16,6 +16,7 @@ def load(
) -> Tuple[
List[KinetoOperator],
Dict[int, List[KinetoOperator]],
Dict[int, List[KinetoOperator]],
Dict[int, KinetoOperator],
List[KinetoOperator],
Dict[int, KinetoOperator],
@@ -26,6 +27,7 @@
Dict[int, KinetoOperator],
List[KinetoOperator],
List[int],
Dict[int, KinetoOperator],
]:
"""
Load and process the Chakra device trace.
@@ -57,6 +59,7 @@
logging.debug("Chakra device trace has been loaded and processed successfully.")
return (
dev_data["kineto_cpu_ops"],
dev_data["kineto_tid_ops_map"],
dev_data["kineto_tid_cpu_ops_map"],
dev_data["kineto_correlation_cuda_runtime_map"],
dev_data["kineto_gpu_ops"],
@@ -68,6 +71,7 @@
dev_data["kineto_rf_id_to_kineto_op_map"],
dev_data["sorted_kineto_cpu_ops"],
dev_data["sorted_kineto_cpu_op_ts"],
dev_data["kineto_external_id_to_kineto_op_map"],
)

def construct_dev_data_structures(self, kineto_ops: List[KinetoOperator], trace_file: str) -> Dict:
@@ -90,13 +94,17 @@ def construct_dev_data_structures(self, kineto_ops: List[KinetoOperator], trace_
thread_info = {}

kineto_cpu_ops = []
kineto_tid_ops_map = {}
kineto_tid_cpu_ops_map = {}
kineto_correlation_cuda_runtime_map = {}
kineto_gpu_ops = []
kineto_id_arrow_op_map = {}
kineto_id_cuda_launch_op_map = {}
kineto_external_id_to_kineto_op_map = {}

for op in kineto_ops:
kineto_tid_ops_map.setdefault(op.tid, []).append(op)

if op.is_cpu_op():
kineto_cpu_ops.append(op)
kineto_tid_cpu_ops_map.setdefault(op.tid, []).append(op)
@@ -144,10 +152,14 @@ def construct_dev_data_structures(self, kineto_ops: List[KinetoOperator], trace_
thread_start_end[0] = min(thread_start_end[0], op.timestamp)
thread_start_end[1] = max(thread_start_end[1], op.timestamp + op.inclusive_dur)

if op.external_id is not None:
kineto_external_id_to_kineto_op_map[op.external_id] = op

kineto_rf_id_to_kineto_op_map = {op.rf_id: op for op in kineto_cpu_ops if op.rf_id is not None}

return {
"kineto_cpu_ops": kineto_cpu_ops,
"kineto_tid_ops_map": kineto_tid_ops_map,
"kineto_tid_cpu_ops_map": kineto_tid_cpu_ops_map,
"kineto_correlation_cuda_runtime_map": kineto_correlation_cuda_runtime_map,
"kineto_gpu_ops": kineto_gpu_ops,
@@ -159,6 +171,7 @@ def construct_dev_data_structures(self, kineto_ops: List[KinetoOperator], trace_
"kineto_rf_id_to_kineto_op_map": kineto_rf_id_to_kineto_op_map,
"sorted_kineto_cpu_ops": [],
"sorted_kineto_cpu_op_ts": [],
"kineto_external_id_to_kineto_op_map": kineto_external_id_to_kineto_op_map,
}

def calculate_exclusive_dur(self, kineto_tid_cpu_ops_map: Dict[int, List[KinetoOperator]]) -> None:
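For context on the two new structures introduced above (`kineto_tid_ops_map` and `kineto_external_id_to_kineto_op_map`), here is a minimal standalone sketch with a hypothetical `Op` stand-in for `KinetoOperator`:

```python
# Standalone sketch of the two new lookup structures built above:
# per-thread operator lists and an external_id -> operator map.
# "Op" is a hypothetical minimal stand-in for KinetoOperator.
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Op:
    tid: int
    external_id: Optional[int] = None
    name: str = ""


def build_indexes(ops: List[Op]) -> Tuple[Dict[int, List[Op]], Dict[int, Op]]:
    tid_ops_map: Dict[int, List[Op]] = {}
    external_id_map: Dict[int, Op] = {}
    for op in ops:
        # Every operator is grouped by thread ID, regardless of category.
        tid_ops_map.setdefault(op.tid, []).append(op)
        # Only operators that carry an external_id are indexed by it.
        if op.external_id is not None:
            external_id_map[op.external_id] = op
    return tid_ops_map, external_id_map


ops = [Op(tid=1, external_id=10, name="aten::mm"), Op(tid=2, name="cudaLaunchKernel")]
tid_ops_map, external_id_map = build_indexes(ops)
print(sorted(tid_ops_map), sorted(external_id_map))  # [1, 2] [10]
```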
7 changes: 5 additions & 2 deletions src/trace_link/kineto_operator.py
@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from et_replay.lib.execution_trace import Node as PyTorchOperator

@@ -22,6 +22,7 @@ class KinetoOperator:
host_op (Optional[PyTorchOperator]): Corresponding PyTorch operator object.
parent_host_op_id (Optional[int]): ID of the parent PyTorch operator.
inter_thread_dep (Optional[int]): Identifier for inter-thread dependencies.
sync_dep (List[KinetoOperator]): List of KinetoOperator objects that have dependencies on this operator.
stream (Optional[int]): CUDA stream identifier associated with the operator.
rf_id (Optional[int]): Record function identifier.
correlation (int): Identifier used to correlate CUDA runtime and GPU operations.
@@ -48,6 +49,7 @@ def __init__(self, kineto_op: Dict[str, Any]) -> None:
self.host_op: Optional[PyTorchOperator] = None
self.parent_host_op_id: Optional[int] = None
self.inter_thread_dep: Optional[int] = None
self.sync_dep: List[KinetoOperator] = []
self.stream: Optional[int] = kineto_op.get("args", {}).get("stream", None)
self.rf_id: Optional[int] = kineto_op.get("args", {}).get("Record function id", None)
self.correlation: int = kineto_op.get("args", {}).get("correlation", -1)
@@ -59,13 +61,14 @@ def __repr__(self) -> str:
Returns
str: A string representation of the KinetoOperator.
"""
sync_dep_ids = [op.id for op in self.sync_dep]
return (
f"KinetoOperator(id={self.id}, category={self.category}, name={self.name}, "
f"phase={self.phase}, inclusive_dur={self.inclusive_dur}, "
f"exclusive_dur={self.exclusive_dur}, timestamp={self.timestamp}, "
f"external_id={self.external_id}, ev_idx={self.ev_idx}, tid={self.tid}, "
f"parent_host_op_id={self.parent_host_op_id}, inter_thread_dep={self.inter_thread_dep}, "
f"stream={self.stream}, rf_id={self.rf_id}, correlation={self.correlation})"
f"sync_dep={sync_dep_ids}, stream={self.stream}, rf_id={self.rf_id}, correlation={self.correlation})"
)

def is_cpu_op(self) -> bool:
3 changes: 2 additions & 1 deletion src/trace_link/trace_link.py
@@ -18,6 +18,7 @@ def main() -> None:
"Merging-PyTorch-and-Kineto-Traces"
)
)
parser.add_argument("--rank", type=int, required=True, help="Rank for the input traces")
parser.add_argument(
"--chakra-host-trace",
type=str,
@@ -43,7 +44,7 @@
logging.basicConfig(level=args.log_level.upper())

linker = TraceLinker()
linker.link(args.chakra_host_trace, args.chakra_device_trace, args.output_file)
linker.link(args.rank, args.chakra_host_trace, args.chakra_device_trace, args.output_file)


if __name__ == "__main__":