Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ repos:
args: [ --fix, --exit-non-zero-on-fix ]
- id: ruff
args: [ --select, "I", --fix, --exit-non-zero-on-fix ]
files: '^python/ray/serve/|^python/ray/train|^python/ray/data|^python/ray/_private/|^python/ray/llm/|^python/ray/tune/'
files: '^python/ray/serve/|^python/ray/train|^python/ray/data|^python/ray/_private/|^python/ray/llm/|^python/ray/tune/|^python/ray/cloudpickle/|^python/ray/dag/|^python/ray/includes/|^python/ray/internal/|^python/ray/ray_operator/|^python/ray/scripts/|^python/ray/serve/generated/serve_pb2.py|^python/ray/streaming/|^python/ray/tests/|^python/ray/util/|^python/ray/workers/|^python/ray/workflow/'

- repo: https://github.com/jsh9/pydoclint
rev: "0.6.6"
Expand Down
15 changes: 1 addition & 14 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,12 @@ afterray = ["psutil", "setproctitle"]
# python/ray/cloudpickle/*
# doc/*
# python/ray/__init__.py
# python/ray/setup-dev.py
# For the rest we will gradually remove them from the blacklist as we
# reformat the code to follow the style guide.
[tool.ruff.lint.per-file-ignores]
"doc/*" = ["I"]
"python/ray/__init__.py" = ["I"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep this __init__.py ?

and also remove L59? which says setup-dev.py needs to be exluded?

"python/ray/setup-dev.py" = ["I"]
"python/ray/cloudpickle/*" = ["I"]
"python/ray/dag/*.py" = ["I"]
"python/ray/includes/*" = ["I"]
"python/ray/internal/*" = ["I"]
"python/ray/ray_operator/*" = ["I"]
"python/ray/scripts/*" = ["I"]
"python/ray/serve/generated/serve_pb2.py" = ["I"]
"python/ray/streaming/*" = ["I"]
"python/ray/tests/*" = ["I"]
"python/ray/util/*" = ["I"]
"python/ray/workers/*" = ["I"]
"python/ray/workflow/*" = ["I"]
"python/ray/dag/__init__.py" = ["I"]
"rllib/*" = ["I"]
"release/*" = ["I"]

Expand Down
13 changes: 6 additions & 7 deletions python/ray/dag/class_node.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
from typing import Any, Dict, List, Optional, Tuple, Union
from weakref import ReferenceType

import ray
from ray.dag.dag_node import DAGNode
from ray.dag.input_node import InputNode
from ray.dag.format_utils import get_dag_node_str
from ray.dag.constants import (
PARENT_CLASS_NODE_KEY,
PREV_CLASS_METHOD_CALL_KEY,
BIND_INDEX_KEY,
IS_CLASS_METHOD_OUTPUT_KEY,
PARENT_CLASS_NODE_KEY,
PREV_CLASS_METHOD_CALL_KEY,
)
from ray.dag.dag_node import DAGNode
from ray.dag.format_utils import get_dag_node_str
from ray.dag.input_node import InputNode
from ray.util.annotations import DeveloperAPI

from typing import Any, Dict, List, Union, Tuple, Optional


@DeveloperAPI
class ClassNode(DAGNode):
Expand Down
6 changes: 3 additions & 3 deletions python/ray/dag/collective_node.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from typing import Any, Dict, List, Union, Tuple, Optional, TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

if TYPE_CHECKING:
import torch

import ray
from ray.dag import (
DAGNode,
ClassMethodNode,
DAGNode,
)
from ray.dag.constants import COLLECTIVE_OPERATION_KEY, IS_CLASS_METHOD_OUTPUT_KEY
from ray.experimental.channel import ChannelContext
from ray.experimental.channel.torch_tensor_type import Communicator, TorchTensorType
from ray.experimental.util.types import (
_CollectiveOp,
AllGatherOp,
AllReduceOp,
ReduceScatterOp,
_CollectiveOp,
)
from ray.util.annotations import DeveloperAPI

Expand Down
106 changes: 51 additions & 55 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,84 @@
import weakref
import asyncio
import logging
import threading
import time
import traceback
import uuid
import weakref
from collections import defaultdict
from contextlib import nullcontext
from dataclasses import dataclass, asdict
from dataclasses import asdict, dataclass
from typing import (
Any,
Dict,
List,
Tuple,
Union,
Optional,
Set,
Tuple,
Union,
)
import logging
import threading
import time
import uuid
import traceback

from ray.experimental.channel.auto_transport_type import (
AutoTransportType,
TypeHintResolver,
)
import ray
import ray.exceptions
from ray.dag.dag_operation_future import GPUFuture, DAGOperationFuture, ResolvedFuture
from ray.experimental.channel.cached_channel import CachedChannel
from ray.experimental.channel.communicator import Communicator
from ray.dag.constants import (
RAY_CGRAPH_ENABLE_NVTX_PROFILING,
RAY_CGRAPH_ENABLE_TORCH_PROFILING,
RAY_CGRAPH_VISUALIZE_SCHEDULE,
)
import ray
from ray.dag.dag_node_operation import (
_build_dag_node_operation_graph,
_DAGNodeOperation,
_DAGNodeOperationType,
_DAGOperationGraphNode,
_extract_execution_schedule,
_generate_actor_to_execution_schedule,
_generate_overlapped_execution_schedule,
_visualize_execution_schedule,
)
from ray.dag.dag_operation_future import DAGOperationFuture, GPUFuture, ResolvedFuture
from ray.exceptions import (
RayCgraphCapacityExceeded,
RayTaskError,
RayChannelError,
RayChannelTimeoutError,
)
from ray.experimental.compiled_dag_ref import (
CompiledDAGRef,
CompiledDAGFuture,
_process_return_vals,
RayTaskError,
)
from ray.experimental.channel import (
AwaitableBackgroundReader,
AwaitableBackgroundWriter,
ChannelContext,
ChannelInterface,
ChannelOutputType,
ReaderInterface,
SynchronousReader,
WriterInterface,
SynchronousWriter,
AwaitableBackgroundReader,
AwaitableBackgroundWriter,
CompiledDAGArgs,
CompositeChannel,
IntraProcessChannel,
ReaderInterface,
SynchronousReader,
SynchronousWriter,
WriterInterface,
)
from ray.util.annotations import DeveloperAPI

from ray.experimental.channel.accelerator_context import AcceleratorContext
from ray.experimental.channel.auto_transport_type import (
AutoTransportType,
TypeHintResolver,
)
from ray.experimental.channel.cached_channel import CachedChannel
from ray.experimental.channel.communicator import Communicator
from ray.experimental.channel.shared_memory_channel import (
SharedMemoryType,
)
from ray.experimental.channel.torch_tensor_type import TorchTensorType

from ray.experimental.channel.torch_tensor_accelerator_channel import (
_init_communicator,
_destroy_communicator,
_init_communicator,
)

from ray.dag.dag_node_operation import (
_DAGNodeOperation,
_DAGNodeOperationType,
_DAGOperationGraphNode,
_build_dag_node_operation_graph,
_extract_execution_schedule,
_generate_actor_to_execution_schedule,
_generate_overlapped_execution_schedule,
_visualize_execution_schedule,
from ray.experimental.channel.torch_tensor_type import TorchTensorType
from ray.experimental.compiled_dag_ref import (
CompiledDAGFuture,
CompiledDAGRef,
_process_return_vals,
)

from ray.util.annotations import DeveloperAPI
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

from ray.experimental.channel.accelerator_context import AcceleratorContext

logger = logging.getLogger(__name__)

# Keep tracking of every compiled dag created during the lifetime of
Expand Down Expand Up @@ -370,6 +365,7 @@ def _device_context_manager():
return nullcontext()

import torch

from ray.experimental.channel.accelerator_context import AcceleratorContext

device = ChannelContext.get_current().torch_device
Expand Down Expand Up @@ -1091,9 +1087,9 @@ def _preprocess(self) -> None:
This function is idempotent.
"""
from ray.dag import (
DAGNode,
ClassMethodNode,
CollectiveOutputNode,
DAGNode,
FunctionNode,
InputAttributeNode,
InputNode,
Expand Down Expand Up @@ -1491,8 +1487,8 @@ def _check_leaf_nodes(self) -> None:
Check if there are leaf nodes in the DAG and raise an error if there are.
"""
from ray.dag import (
DAGNode,
ClassMethodNode,
DAGNode,
)

leaf_nodes: List[DAGNode] = []
Expand Down Expand Up @@ -1565,11 +1561,11 @@ def _get_or_compile(
outputs for the DAG.
"""
from ray.dag import (
ClassMethodNode,
DAGNode,
InputNode,
InputAttributeNode,
InputNode,
MultiOutputNode,
ClassMethodNode,
)

if self.input_task_idx is None:
Expand Down Expand Up @@ -2789,11 +2785,11 @@ def _visualize_ascii(self) -> str:
"""

from ray.dag import (
ClassMethodNode,
DAGNode,
InputAttributeNode,
InputNode,
MultiOutputNode,
ClassMethodNode,
DAGNode,
)

# Check that the DAG has been compiled
Expand Down Expand Up @@ -3097,11 +3093,11 @@ def visualize(
"You can install it by running `pip install graphviz`."
)
from ray.dag import (
ClassMethodNode,
DAGNode,
InputAttributeNode,
InputNode,
MultiOutputNode,
ClassMethodNode,
DAGNode,
)

# Check that the DAG has been compiled
Expand Down
1 change: 1 addition & 0 deletions python/ray/dag/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os

import pytest

import ray
Expand Down
3 changes: 2 additions & 1 deletion python/ray/dag/context.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from dataclasses import dataclass
import os
import threading
from dataclasses import dataclass
from typing import Optional

from ray.util.annotations import DeveloperAPI

# The context singleton on this process.
Expand Down
30 changes: 14 additions & 16 deletions python/ray/dag/dag_node.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
import asyncio
import copy
from ray.experimental.channel.auto_transport_type import AutoTransportType
from ray.experimental.channel.torch_tensor_type import TorchTensorType
import ray
from ray.dag.base import DAGNodeBase
from ray.dag.py_obj_scanner import _PyObjScanner
from ray.util.annotations import DeveloperAPI

import uuid
from itertools import chain

from typing import (
Optional,
Union,
List,
Tuple,
Dict,
Any,
TypeVar,
Callable,
Dict,
List,
Literal,
Optional,
Tuple,
TypeVar,
Union,
)
import uuid
import asyncio

import ray
from ray.dag.base import DAGNodeBase
from ray.dag.compiled_dag_node import build_compiled_dag_from_ray_dag
from ray.dag.py_obj_scanner import _PyObjScanner
from ray.experimental.channel import ChannelOutputType
from ray.experimental.channel.auto_transport_type import AutoTransportType
from ray.experimental.channel.communicator import Communicator
from ray.experimental.channel.torch_tensor_type import TorchTensorType
from ray.experimental.util.types import Device
from ray.util.annotations import DeveloperAPI

T = TypeVar("T")

Expand Down
10 changes: 5 additions & 5 deletions python/ray/dag/dag_node_operation.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from functools import total_ordering
from enum import Enum
from typing import Set, Tuple, List, Dict, Optional
import copy
import logging
import ray
import heapq
import logging
from collections import defaultdict
from enum import Enum
from functools import total_ordering
from typing import Dict, List, Optional, Set, Tuple

import ray

logger = logging.getLogger(__name__)

Expand Down
6 changes: 3 additions & 3 deletions python/ray/dag/dag_operation_future.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar, Dict
from ray.util.annotations import DeveloperAPI
from ray.experimental.channel.accelerator_context import AcceleratorContext
from typing import Any, Dict, Generic, TypeVar

from ray.experimental.channel.accelerator_context import AcceleratorContext
from ray.util.annotations import DeveloperAPI

T = TypeVar("T")

Expand Down
1 change: 0 additions & 1 deletion python/ray/dag/function_node.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import Any, Dict, List


import ray
from ray.dag.dag_node import DAGNode
from ray.dag.format_utils import get_dag_node_str
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dag/input_node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Union, Optional
from typing import Any, Dict, List, Optional, Union

from ray.dag import DAGNode
from ray.dag.format_utils import get_dag_node_str
Expand Down
Loading