From cdec71dad85d4792dd6278ad06991485e4baaffb Mon Sep 17 00:00:00 2001 From: CalebTVanDyke Date: Thu, 21 Dec 2023 11:21:26 -0600 Subject: [PATCH 1/7] bump version for next release --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d9135ea0..944b19ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "buildflow" -version = "0.3.1" +version = "0.3.2" authors = [ { name = "Caleb Van Dyke", email = "caleb@launchflow.com" }, { name = "Josh Tanke", email = "josh@launchflow.com" }, From cf0be2d6f2433f40a0390cf83d84cf303514492d Mon Sep 17 00:00:00 2001 From: CalebTVanDyke Date: Thu, 21 Dec 2023 16:21:05 -0600 Subject: [PATCH 2/7] fix how we find num replicas for endpoints and collectors --- .../actors/collector_pattern/receive_process_push_ack.py | 7 ++++++- .../actors/endpoint_pattern/receive_process_respond.py | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py index 142b6a36..1eec2a77 100644 --- a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py @@ -158,7 +158,12 @@ async def snapshot(self) -> Snapshot: avg_process_time_millis=0, ) if self.collector_deployment is not None: - num_replicas = self.collector_deployment.num_replicas + num_replicas = ( + serve.status() + .applications.get(self.processor_group.group_id, {}) + .deployments.get(self.endpoint_deployment.name, {}) + .replica_states.get("RUNNING", 0) + ) else: num_replicas = 0 diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py index 966a400a..6499037f 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py @@ -146,7 +146,12 @@ async def snapshot(self) -> Snapshot: avg_process_time_millis=0, ) if self.endpoint_deployment is not None: - num_replicas = self.endpoint_deployment.num_replicas + num_replicas = ( + serve.status() + .applications.get(self.processor_group.group_id, {}) + .deployments.get(self.endpoint_deployment.name, {}) + .replica_states.get("RUNNING", 0) + ) else: num_replicas = 0 From 6c9b9f7a2722a91d1a9b9fa1b1d5658c53cb0995 Mon Sep 17 00:00:00 2001 From: CalebTVanDyke Date: Fri, 22 Dec 2023 08:03:54 -0600 Subject: [PATCH 3/7] update replica gauge for endpoint and collectors --- .../core/app/runtime/actors/collector_pattern/collector_pool.py | 1 + .../core/app/runtime/actors/endpoint_pattern/endpoint_pool.py | 1 + 2 files changed, 2 insertions(+) diff --git a/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py b/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py index 69351fa4..02cad6f4 100644 --- a/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py @@ -102,6 +102,7 @@ async def snapshot(self) -> ProcessorGroupSnapshot: if len(self.replicas) > 0: replica_snapshot = await self.replicas[0].ray_actor_handle.snapshot.remote() num_replicas = replica_snapshot.num_replicas + self.num_replicas_gauge.set(num_replicas) for pid, metrics in replica_snapshot.processor_snapshots.items(): total_events_processed_per_sec = metrics.events_processed_per_sec avg_process_time_millis_per_element = metrics.avg_process_time_millis diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py b/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py index 55a7b049..057a2ed1 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py @@ -103,6 +103,7 @@ async def snapshot(self) -> ProcessorGroupSnapshot: if len(self.replicas) > 0: replica_snapshot = await self.replicas[0].ray_actor_handle.snapshot.remote() num_replicas = replica_snapshot.num_replicas + self.num_replicas_gauge.set(num_replicas) for pid, metrics in replica_snapshot.processor_snapshots.items(): total_events_processed_per_sec = metrics.events_processed_per_sec avg_process_time_millis_per_element = metrics.avg_process_time_millis From ee42cd607f209d0fb8e392dfbf1a1ab4590ffaa9 Mon Sep 17 00:00:00 2001 From: CalebTVanDyke Date: Fri, 22 Dec 2023 08:11:43 -0600 Subject: [PATCH 4/7] update num replicas when we fetch the status --- .../actors/collector_pattern/collector_pool.py | 12 +++++++++++- .../runtime/actors/endpoint_pattern/endpoint_pool.py | 12 +++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py b/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py index 02cad6f4..a189b7c0 100644 --- a/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py @@ -2,6 +2,7 @@ from typing import Any, Dict, Type import ray +from ray import serve from buildflow.core import utils from buildflow.core.app.runtime._runtime import RunID @@ -95,6 +96,16 @@ async def create_replica(self): ray_actor_handle=replica_actor_handle, ) + async def status(self): + num_replicas = ( + serve.status() + .applications.get(self.processor_group.group_id, {}) + .deployments.get("FastAPIWrapper", {}) + .replica_states.get("RUNNING", 0) + ) + self.num_replicas_gauge.set(num_replicas) + return self._status + async def snapshot(self) -> ProcessorGroupSnapshot: parent_snapshot: ProcessorGroupSnapshot = await super().snapshot() num_replicas = 0 @@ -102,7 +113,6 @@ async def snapshot(self) -> ProcessorGroupSnapshot: if len(self.replicas) > 0: replica_snapshot = await self.replicas[0].ray_actor_handle.snapshot.remote() num_replicas = replica_snapshot.num_replicas - self.num_replicas_gauge.set(num_replicas) for pid, metrics in replica_snapshot.processor_snapshots.items(): total_events_processed_per_sec = metrics.events_processed_per_sec avg_process_time_millis_per_element = metrics.avg_process_time_millis diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py b/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py index 057a2ed1..3ec1a19d 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py @@ -2,6 +2,7 @@ from typing import Any, Dict, Type import ray +from ray import serve from buildflow.core import utils from buildflow.core.app.runtime._runtime import RunID @@ -96,6 +97,16 @@ async def create_replica(self): ray_actor_handle=replica_actor_handle, ) + async def status(self): + num_replicas = ( + serve.status() + .applications.get(self.processor_group.group_id, {}) + .deployments.get("FastAPIWrapper", {}) + .replica_states.get("RUNNING", 0) + ) + self.num_replicas_gauge.set(num_replicas) + return self._status + async def snapshot(self) -> ProcessorGroupSnapshot: parent_snapshot: ProcessorGroupSnapshot = await super().snapshot() num_replicas = 0 @@ -103,7 +114,6 @@ async def snapshot(self) -> ProcessorGroupSnapshot: if len(self.replicas) > 0: replica_snapshot = await self.replicas[0].ray_actor_handle.snapshot.remote() num_replicas = replica_snapshot.num_replicas - self.num_replicas_gauge.set(num_replicas) for pid, metrics in replica_snapshot.processor_snapshots.items(): total_events_processed_per_sec = metrics.events_processed_per_sec avg_process_time_millis_per_element = metrics.avg_process_time_millis From 89c1ee2f45143e1e893278e7640caae88b029332 Mon Sep 17 00:00:00 2001 From: CalebTVanDyke Date: Fri, 22 Dec 2023 08:38:10 -0600 Subject: [PATCH 5/7] redo --- .../collector_pattern/collector_pool.py | 10 +++----- .../receive_process_push_ack.py | 25 +++++++++++-------- .../actors/endpoint_pattern/endpoint_pool.py | 10 +++----- .../receive_process_respond.py | 24 ++++++++++-------- 4 files changed, 34 insertions(+), 35 deletions(-) diff --git a/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py b/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py index a189b7c0..09eb2d95 100644 --- a/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py @@ -97,13 +97,9 @@ async def create_replica(self): ) async def status(self): - num_replicas = ( - serve.status() - .applications.get(self.processor_group.group_id, {}) - .deployments.get("FastAPIWrapper", {}) - .replica_states.get("RUNNING", 0) - ) - self.num_replicas_gauge.set(num_replicas) + if len(self.replicas) > 0: + num_replicas = await self.replicas[0].ray_actor_handle.num_replicas.remote() + self.num_replicas_gauge.set(num_replicas) return self._status async def snapshot(self) -> ProcessorGroupSnapshot: diff --git a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py index 1eec2a77..7806f88b 100644 --- a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py @@ -150,6 +150,19 @@ async def drain(self) -> bool: async def status(self) -> RuntimeStatus: return self._status + async def num_replicas(self) -> int: + if self.endpoint_deployment is not None: + application = serve.status().applications.get(self.processor_group.group_id) + if application is None: + return 0 + + num_replicas = application.deployments.get( + self.endpoint_deployment.name, {} + ).replica_states.get("RUNNING", 0) + else: + num_replicas = 0 + return num_replicas + async def snapshot(self) -> Snapshot: processor_snapshots = {} for processor in self.processor_group.processors: @@ -157,19 +170,9 @@ async def snapshot(self) -> Snapshot: events_processed_per_sec=0, avg_process_time_millis=0, ) - if self.collector_deployment is not None: - num_replicas = ( - serve.status() - .applications.get(self.processor_group.group_id, {}) - .deployments.get(self.endpoint_deployment.name, {}) - .replica_states.get("RUNNING", 0) - ) - else: - num_replicas = 0 - return ReceiveProcessPushSnapshot( status=self._status, timestamp_millis=utils.timestamp_millis(), processor_snapshots=processor_snapshots, - num_replicas=num_replicas, + num_replicas=await self.num_replicas(), ) diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py b/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py index 3ec1a19d..3aca4e3a 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py @@ -98,13 +98,9 @@ async def create_replica(self): ) async def status(self): - num_replicas = ( - serve.status() - .applications.get(self.processor_group.group_id, {}) - .deployments.get("FastAPIWrapper", {}) - .replica_states.get("RUNNING", 0) - ) - self.num_replicas_gauge.set(num_replicas) + if len(self.replicas) > 0: + num_replicas = await self.replicas[0].ray_actor_handle.num_replicas.remote() + self.num_replicas_gauge.set(num_replicas) return self._status async def snapshot(self) -> ProcessorGroupSnapshot: diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py index 6499037f..6578b2c6 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py @@ -137,6 +137,19 @@ async def drain(self) -> bool: async def status(self) -> RuntimeStatus: return self._status + async def num_replicas(self) -> int: + if self.endpoint_deployment is not None: + application = serve.status().applications.get(self.processor_group.group_id) + if application is None: + return 0 + + num_replicas = application.deployments.get( + self.endpoint_deployment.name, {} + ).replica_states.get("RUNNING", 0) + else: + num_replicas = 0 + return num_replicas + async def snapshot(self) -> Snapshot: processor_snapshots = {} # TODO: need to figure out local metrics @@ -145,19 +158,10 @@ async def snapshot(self) -> Snapshot: events_processed_per_sec=0, avg_process_time_millis=0, ) - if self.endpoint_deployment is not None: - num_replicas = ( - serve.status() - .applications.get(self.processor_group.group_id, {}) - .deployments.get(self.endpoint_deployment.name, {}) - .replica_states.get("RUNNING", 0) - ) - else: - num_replicas = 0 return ReceiveProcessRespondSnapshot( status=self._status, timestamp_millis=utils.timestamp_millis(), processor_snapshots=processor_snapshots, - num_replicas=num_replicas, + num_replicas=await self.num_replicas(), ) From 7bc7232edbd57155ce31e662cf3e470c9cbb6046 Mon Sep 17 00:00:00 2001 From: CalebTVanDyke Date: Fri, 22 Dec 2023 08:48:36 -0600 Subject: [PATCH 6/7] fix ruff --- .../core/app/runtime/actors/collector_pattern/collector_pool.py | 1 - .../core/app/runtime/actors/endpoint_pattern/endpoint_pool.py | 1 - 2 files changed, 2 deletions(-) diff --git a/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py b/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py index 09eb2d95..e2d01672 100644 --- a/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/collector_pool.py @@ -2,7 +2,6 @@ from typing import Any, Dict, Type import ray -from ray import serve from buildflow.core import utils from buildflow.core.app.runtime._runtime import RunID diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py b/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py index 3aca4e3a..ec6dd609 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/endpoint_pool.py @@ -2,7 +2,6 @@ from typing import Any, Dict, Type import ray -from ray import serve from buildflow.core import utils from buildflow.core.app.runtime._runtime import RunID From c7c8e56743e457e5511ab24496e5dde6c6955e15 Mon Sep 17 00:00:00 2001 From: CalebTVanDyke Date: Fri, 22 Dec 2023 08:56:04 -0600 Subject: [PATCH 7/7] fix collector --- .../actors/collector_pattern/receive_process_push_ack.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py index 7806f88b..11731b21 100644 --- a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py @@ -151,13 +151,13 @@ async def status(self) -> RuntimeStatus: return self._status async def num_replicas(self) -> int: - if self.endpoint_deployment is not None: + if self.collector_deployment is not None: application = serve.status().applications.get(self.processor_group.group_id) if application is None: return 0 num_replicas = application.deployments.get( - self.endpoint_deployment.name, {} + self.collector_deployment.name, {} ).replica_states.get("RUNNING", 0) else: num_replicas = 0