diff --git a/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py b/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py index 69351fa4..e2d01672 100644 --- a/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py @@ -95,6 +95,12 @@ async def create_replica(self): ray_actor_handle=replica_actor_handle, ) + async def status(self): + if len(self.replicas) > 0: + num_replicas = await self.replicas[0].ray_actor_handle.num_replicas.remote() + self.num_replicas_gauge.set(num_replicas) + return self._status + async def snapshot(self) -> ProcessorGroupSnapshot: parent_snapshot: ProcessorGroupSnapshot = await super().snapshot() num_replicas = 0 diff --git a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py index 1eec2a77..11731b21 100644 --- a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py @@ -150,6 +150,19 @@ async def drain(self) -> bool: async def status(self) -> RuntimeStatus: return self._status + async def num_replicas(self) -> int: + if self.collector_deployment is not None: + application = serve.status().applications.get(self.processor_group.group_id) + if application is None: + return 0 + + num_replicas = application.deployments.get( + self.collector_deployment.name, {} + ).replica_states.get("RUNNING", 0) + else: + num_replicas = 0 + return num_replicas + async def snapshot(self) -> Snapshot: processor_snapshots = {} for processor in self.processor_group.processors: @@ -157,19 +170,9 @@ async def snapshot(self) -> Snapshot: events_processed_per_sec=0, avg_process_time_millis=0, ) - if self.collector_deployment is not None: - num_replicas = ( - serve.status() - .applications.get(self.processor_group.group_id, {}) - .deployments.get(self.endpoint_deployment.name, {}) - .replica_states.get("RUNNING", 0) - ) - else: - num_replicas = 0 - return ReceiveProcessPushSnapshot( status=self._status, timestamp_millis=utils.timestamp_millis(), processor_snapshots=processor_snapshots, - num_replicas=num_replicas, + num_replicas=await self.num_replicas(), ) diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py b/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py index 55a7b049..ec6dd609 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py @@ -96,6 +96,12 @@ async def create_replica(self): ray_actor_handle=replica_actor_handle, ) + async def status(self): + if len(self.replicas) > 0: + num_replicas = await self.replicas[0].ray_actor_handle.num_replicas.remote() + self.num_replicas_gauge.set(num_replicas) + return self._status + async def snapshot(self) -> ProcessorGroupSnapshot: parent_snapshot: ProcessorGroupSnapshot = await super().snapshot() num_replicas = 0 diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py index 6499037f..6578b2c6 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py @@ -137,6 +137,19 @@ async def drain(self) -> bool: async def status(self) -> RuntimeStatus: return self._status + async def num_replicas(self) -> int: + if self.endpoint_deployment is not None: + application = serve.status().applications.get(self.processor_group.group_id) + if application is None: + return 0 + + num_replicas = application.deployments.get( + self.endpoint_deployment.name, {} + ).replica_states.get("RUNNING", 0) + else: + num_replicas = 0 + return num_replicas + async def snapshot(self) -> Snapshot: processor_snapshots = {} # TODO: need to figure out local metrics @@ -145,19 +158,10 @@ async def snapshot(self) -> Snapshot: events_processed_per_sec=0, avg_process_time_millis=0, ) - if self.endpoint_deployment is not None: - num_replicas = ( - serve.status() - .applications.get(self.processor_group.group_id, {}) - .deployments.get(self.endpoint_deployment.name, {}) - .replica_states.get("RUNNING", 0) - ) - else: - num_replicas = 0 return ReceiveProcessRespondSnapshot( status=self._status, timestamp_millis=utils.timestamp_millis(), processor_snapshots=processor_snapshots, - num_replicas=num_replicas, + num_replicas=await self.num_replicas(), )