diff --git a/buildflow/core/app/flow.py b/buildflow/core/app/flow.py index ed99ce09..da154631 100644 --- a/buildflow/core/app/flow.py +++ b/buildflow/core/app/flow.py @@ -309,6 +309,8 @@ def pipeline( sink: Optional[Primitive] = None, *, num_cpus: float = 1.0, + num_gpus: Optional[float] = None, + memory: Optional[float] = None, num_concurrency: int = 1, autoscale_options: AutoscalerOptions = AutoscalerOptions.default(), log_level: str = "INFO", @@ -329,6 +331,8 @@ def pipeline( sink_primitive=sink, processor_options=ProcessorOptions( num_cpus=num_cpus, + num_gpu=num_gpus, + memory=memory, num_concurrency=num_concurrency, log_level=log_level, autoscaler_options=autoscale_options, @@ -344,6 +348,8 @@ def collector( sink: Optional[Primitive] = None, *, num_cpus: float = 1.0, + num_gpus: Optional[float] = None, + memory: Optional[float] = None, autoscale_options: AutoscalerOptions = AutoscalerOptions.default(), log_level: str = "INFO", ): @@ -362,6 +368,8 @@ def collector( sink_primitive=sink, processor_options=ProcessorOptions( num_cpus=num_cpus, + num_gpu=num_gpus, + memory=memory, # Collectors always have a concurrency of 1 num_concurrency=1, log_level=log_level, @@ -376,6 +384,8 @@ def endpoint( method: Method, *, num_cpus: float = 1.0, + num_gpus: Optional[float] = None, + memory: Optional[float] = None, autoscale_options: AutoscalerOptions = AutoscalerOptions.default(), log_level: str = "INFO", ): @@ -384,7 +394,9 @@ def endpoint( method=method, processor_options=ProcessorOptions( num_cpus=num_cpus, - # Collectors always have a concurrency of 1 + num_gpu=num_gpus, + memory=memory, + # Endpoints always have a concurrency of 1 num_concurrency=1, log_level=log_level, autoscaler_options=autoscale_options, diff --git a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py index 83b95729..6ee55606 100644 --- 
a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py @@ -78,10 +78,19 @@ async def run(self) -> bool: f"Method {self.processor.endpoint().method} is not supported " "for collectors." ) + + # Some ProcessorOptions fields are optional, so we need to filter out Nones + ray_actor_options = { + "num_cpus": self.processor_options.num_cpus, + } + if self.processor_options.num_gpu is not None: + ray_actor_options["num_gpus"] = self.processor_options.num_gpu + if self.processor_options.memory is not None: + ray_actor_options["memory"] = self.processor_options.memory @serve.deployment( route_prefix=self.endpoint.route, - ray_actor_options={"num_cpus": self.processor_options.num_cpus}, + ray_actor_options=ray_actor_options, autoscaling_config={ "min_replicas": self.processor_options.autoscaler_options.min_replicas, "initial_replicas": self.processor_options.autoscaler_options.num_replicas, # noqa: E501 diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py index 00dcdfbe..81030c16 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py @@ -76,10 +76,19 @@ async def run(self) -> bool: f"Method {self.processor.endpoint().method} is not supported " "for endpoints." 
) + + # Some ProcessorOptions fields are optional, so we need to filter out Nones + ray_actor_options = { + "num_cpus": self.processor_options.num_cpus, + } + if self.processor_options.num_gpu is not None: + ray_actor_options["num_gpus"] = self.processor_options.num_gpu + if self.processor_options.memory is not None: + ray_actor_options["memory"] = self.processor_options.memory @serve.deployment( route_prefix=self.endpoint.route, - ray_actor_options={"num_cpus": self.processor_options.num_cpus}, + ray_actor_options=ray_actor_options, autoscaling_config={ "min_replicas": self.processor_options.autoscaler_options.min_replicas, "initial_replicas": self.processor_options.autoscaler_options.num_replicas, # noqa: E501 diff --git a/buildflow/core/app/runtime/actors/pipeline_pattern/pipeline_pool.py b/buildflow/core/app/runtime/actors/pipeline_pattern/pipeline_pool.py index 84b7fa00..3c554347 100644 --- a/buildflow/core/app/runtime/actors/pipeline_pattern/pipeline_pool.py +++ b/buildflow/core/app/runtime/actors/pipeline_pattern/pipeline_pool.py @@ -95,8 +95,17 @@ async def scale(self): # contain any runtime logic that applies to all Processor types. 
async def create_replica(self) -> ReplicaReference: replica_id = utils.uuid() + # Some options are optional, so we need to filter out None values + ray_actor_options = { + "num_cpus": self.options.num_cpus, + } + if self.options.num_gpu is not None: + ray_actor_options["num_gpus"] = self.options.num_gpu + if self.options.memory is not None: + ray_actor_options["memory"] = self.options.memory + replica_actor_handle = PullProcessPushActor.options( - num_cpus=self.options.num_cpus, + **ray_actor_options ).remote( self.run_id, self.processor, diff --git a/buildflow/core/options/runtime_options.py b/buildflow/core/options/runtime_options.py index 24c29db4..3f62f3cb 100644 --- a/buildflow/core/options/runtime_options.py +++ b/buildflow/core/options/runtime_options.py @@ -1,5 +1,5 @@ import dataclasses -from typing import Dict +from typing import Dict, Optional from buildflow.core.options._options import Options from buildflow.core.processor.processor import ProcessorID @@ -53,7 +53,11 @@ def __post_init__(self): # TODO: Add options for other pattern types, or merge into a single options object @dataclasses.dataclass class ProcessorOptions(Options): + # ray actor options num_cpus: float + num_gpu: Optional[float] + memory: Optional[float] + # Runtime API options num_concurrency: int log_level: str # the configuration of the autoscaler for this processor @@ -63,6 +67,8 @@ class ProcessorOptions(Options): def default(cls) -> "ProcessorOptions": return cls( num_cpus=1.0, + num_gpu=None, + memory=None, num_concurrency=1, log_level="INFO", autoscaler_options=AutoscalerOptions.default(),