From ff5d65145a505258fbffb4e478365c7946a51a8b Mon Sep 17 00:00:00 2001 From: Caleb Van Dyke Date: Mon, 17 Jul 2023 12:56:31 -0500 Subject: [PATCH 1/4] fix: introduce a c++ implementation of pubsub source --- .bazelrc | 25 ++++ .coveragerc | 21 +++ .gitignore | 10 ++ BUILD | 33 +++++ WORKSPACE | 51 ++++++++ .../pipeline_pattern/pull_process_push.py | 3 + buildflow/core/app/runtime/actors/runtime.py | 2 + .../core/io/gcp/providers/pubsub_providers.py | 3 + buildflow/core/io/gcp/pubsub.py | 3 + .../io/gcp/strategies/_cython/__init__.py | 0 .../gcp/strategies/_cython/pubsub_source.pyx | 50 ++++++++ .../gcp/strategies/_cython/pubsub_stream.cpp | 49 +++++++ .../io/gcp/strategies/_cython/pubsub_stream.h | 43 +++++++ .../io/gcp/strategies/pubsub_strategies.py | 41 +++++- defs.bzl | 31 +++++ infra-as-code.py | 64 --------- infra-from-code.py | 57 --------- pyproject.toml | 90 ------------- sandbox.py | 78 ----------- sandbox2.py | 64 --------- sandbox3.py | 48 ------- sandbox4.py | 15 --- setup.py | 121 ++++++++++++++++++ 23 files changed, 483 insertions(+), 419 deletions(-) create mode 100644 .bazelrc create mode 100644 .coveragerc create mode 100644 BUILD create mode 100644 WORKSPACE create mode 100644 buildflow/core/io/gcp/strategies/_cython/__init__.py create mode 100644 buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx create mode 100644 buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp create mode 100644 buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h create mode 100644 defs.bzl delete mode 100644 infra-as-code.py delete mode 100644 infra-from-code.py delete mode 100644 pyproject.toml delete mode 100644 sandbox.py delete mode 100644 sandbox2.py delete mode 100644 sandbox3.py delete mode 100644 sandbox4.py create mode 100644 setup.py diff --git a/.bazelrc b/.bazelrc new file mode 100644 index 00000000..bfeef20c --- /dev/null +++ b/.bazelrc @@ -0,0 +1,25 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Use host-OS-specific config lines from bazelrc files. +build --enable_platform_specific_config=true + +# The project requires C++ >= 14. By default Bazel adds `-std=c++0x` which +# disables C++14 features, even if the compilers defaults to C++ >= 14 +build:linux --cxxopt=-std=c++14 +build:macos --cxxopt=-std=c++14 +# Protobuf and gRPC require (or soon will require) C++14 to compile the "host" +# targets, such as protoc and the grpc plugin. +build:linux --host_cxxopt=-std=c++14 +build:macos --host_cxxopt=-std=c++14 diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 00000000..e75830d0 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,21 @@ +[report] +# Regexes for lines to exclude from consideration +exclude_also = [ + # Don't complain about missing debug-only code: + "def __repr__", + # Don't complain if tests don't hit defensive assertion code: + "raise AssertionError", + "raise NotImplementedError", + "if __name__ == .__main__.:", + # Don't complain about abstract methods, they aren't run: + "@(abc\\.)?abstractmethod", +] +ignore_errors = True +# Files to ignore +omit = [ + # Exclude samples + "buildflow/samples/*", + "buildflow/cli/*", + # Exclude skipped / prototype code + "buildflow/runtime/ray_io/duckdb_io*", +] diff --git a/.gitignore b/.gitignore index 96cd00cc..ceab0464 100644 --- a/.gitignore +++ b/.gitignore @@ -133,3 +133,13 @@ dmypy.json # local vscode config .vscode/ + +### Automatically added by Hedron's Bazel Compile Commands Extractor: https://github.com/hedronvision/bazel-compile-commands-extractor +# Ignore the `external` link (that is added by `bazel-compile-commands-extractor`). The link differs between macOS/Linux and Windows, so it shouldn't be checked in. The pattern must not end with a trailing `/` because it's a symlink on macOS/Linux. +/external +# Ignore links to Bazel's output. The pattern needs the `*` because people can change the name of the directory into which your repository is cloned (changing the `bazel-` symlink), and must not end with a trailing `/` because it's a symlink on macOS/Linux. +/bazel-* +# Ignore generated output. Although valuable (after all, the primary purpose of `bazel-compile-commands-extractor` is to produce `compile_commands.json`!), it should not be checked in. +/compile_commands.json +# Ignore the directory in which `clangd` stores its local index. +/.cache/ diff --git a/BUILD b/BUILD new file mode 100644 index 00000000..66c44dc3 --- /dev/null +++ b/BUILD @@ -0,0 +1,33 @@ +load("@com_github_grpc_grpc//bazel:cython_library.bzl", "pyx_library") +load("//:defs.bzl", "copy_to_workspace") + +package(default_visibility = ["//visibility:private"]) + +copy_to_workspace( + name = "cp_pubsub_so", + file = "buildflow/core/io/gcp/strategies/_cython/pubsub_source.so", + output_dir = "buildflow/core/io/gcp/strategies/_cython", +) + +pyx_library( + name = "cython_pubsub_source", + srcs = [ + "buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx", + ], + deps = [ + ":c_pubsub", + ], +) + +cc_library( + name = "c_pubsub", + srcs = [ + "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp", + ], + hdrs = [ + "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h", + ], + deps = [ + "@google_cloud_cpp//:pubsub", + ], +) diff --git a/WORKSPACE b/WORKSPACE new file mode 100644 index 00000000..4890a561 --- /dev/null +++ b/WORKSPACE @@ -0,0 +1,51 @@ +workspace(name = "com_github_buildflow") + +# Add the necessary Starlark functions to fetch google-cloud-cpp. +load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") + +# Fetch the Google Cloud C++ libraries. +# NOTE: Update this version and SHA256 as needed. +http_archive( + name = "google_cloud_cpp", + sha256 = "7204805106be2164b2048a965b3cc747dd8bd9193c52d9b572c07606ea72ab7e", + strip_prefix = "google-cloud-cpp-2.13.0", + url = "https://github.com/googleapis/google-cloud-cpp/archive/v2.13.0.tar.gz", +) + +# Load indirect dependencies due to +# https://github.com/bazelbuild/bazel/issues/1943 +load("@google_cloud_cpp//bazel:google_cloud_cpp_deps.bzl", "google_cloud_cpp_deps") + +google_cloud_cpp_deps() + +load("@com_google_googleapis//:repository_rules.bzl", "switched_rules_by_language") + +switched_rules_by_language( + name = "com_google_googleapis_imports", + cc = True, + grpc = True, +) + +load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps") + +grpc_deps() + +load("@com_github_grpc_grpc//bazel:grpc_extra_deps.bzl", "grpc_extra_deps") + +grpc_extra_deps() + +# Hedron's Compile Commands Extractor for Bazel +# https://github.com/hedronvision/bazel-compile-commands-extractor +http_archive( + name = "hedron_compile_commands", + strip_prefix = "bazel-compile-commands-extractor-ed994039a951b736091776d677f324b3903ef939", + + # Replace the commit hash in both places (below) with the latest, rather than using the stale one here. + # Even better, set up Renovate and let it do the work for you (see "Suggestion: Updates" in the README). + url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/ed994039a951b736091776d677f324b3903ef939.tar.gz", + # When you first run this tool, it'll recommend a sha256 hash to put here with a message like: "DEBUG: Rule 'hedron_compile_commands' indicated that a canonical reproducible form can be obtained by modifying arguments sha256 = ..." +) + +load("@hedron_compile_commands//:workspace_setup.bzl", "hedron_compile_commands_setup") + +hedron_compile_commands_setup() diff --git a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py index a0b7afe7..db0d34d3 100644 --- a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py +++ b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py @@ -246,6 +246,9 @@ async def process_element(element): # PUSH await sink.push(batch_results) + # NOTE: we need this to ensure that we can call the snapshot method for + # drains and autoscale checks + # await asyncio.sleep(0.1) except Exception: logging.exception( "failed to process batch, messages will not be acknowledged" diff --git a/buildflow/core/app/runtime/actors/runtime.py b/buildflow/core/app/runtime/actors/runtime.py index 028e59d2..d9207e25 100644 --- a/buildflow/core/app/runtime/actors/runtime.py +++ b/buildflow/core/app/runtime/actors/runtime.py @@ -120,6 +120,8 @@ async def drain(self) -> bool: ray.kill(processor_pool.actor_handle) for processor_pool in self._processor_pool_refs ] + # Kill the runtime actor to force the job to exit. + ray.actor.exit_actor() else: logging.warning("Draining Runtime...") logging.warning("-- Attempting to drain again will force stop the runtime.") diff --git a/buildflow/core/io/gcp/providers/pubsub_providers.py b/buildflow/core/io/gcp/providers/pubsub_providers.py index 71e47770..957a5729 100644 --- a/buildflow/core/io/gcp/providers/pubsub_providers.py +++ b/buildflow/core/io/gcp/providers/pubsub_providers.py @@ -58,6 +58,7 @@ def __init__( # source-only options batch_size: int = 1000, include_attributes: bool = False, + use_cpp_source: bool = False, # pulumi-only options ack_deadline_seconds: int = 10 * 60, message_retention_duration: str = "1200s", @@ -68,6 +69,7 @@ def __init__( # source-only options self.batch_size = batch_size self.include_attributes = include_attributes + self.use_cpp_source = use_cpp_source # pulumi-only options self.ack_deadline_seconds = ack_deadline_seconds self.message_retention_duration = message_retention_duration @@ -78,6 +80,7 @@ def source(self): project_id=self.project_id, batch_size=self.batch_size, include_attributes=self.include_attributes, + use_cpp_source=self.use_cpp_source, ) def pulumi_resources(self, type_: Optional[Type]): diff --git a/buildflow/core/io/gcp/pubsub.py b/buildflow/core/io/gcp/pubsub.py index c27be14b..47961cfa 100644 --- a/buildflow/core/io/gcp/pubsub.py +++ b/buildflow/core/io/gcp/pubsub.py @@ -60,6 +60,8 @@ class GCPPubSubSubscription(GCPPrimtive): subscription_name: PubSubSubscriptionName # required fields topic_id: PubSubTopicID + # Optional fields + use_cpp_source: bool = False @classmethod def from_gcp_options( @@ -87,6 +89,7 @@ def source_provider(self) -> GCPPubSubSubscriptionProvider: project_id=self.project_id, subscription_name=self.subscription_name, topic_id=self.topic_id, + use_cpp_source=self.use_cpp_source, ) # NOTE: Subscriptions do not support sinks, but we "implement" it here to diff --git a/buildflow/core/io/gcp/strategies/_cython/__init__.py b/buildflow/core/io/gcp/strategies/_cython/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx b/buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx new file mode 100644 index 00000000..74604796 --- /dev/null +++ b/buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx @@ -0,0 +1,50 @@ +# distutils: language=c++ +# distutils: sources = pubsub_stream.cpp + +from libcpp.string cimport string +from libcpp.vector cimport vector + + +cdef extern from "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h" namespace "buildflow": + cdef cppclass CPubSubData: + string data + string ack_id + + CPubSubData(string data, string ack_id) except + + + cdef cppclass CPubSubStream: + string subscription_id + + CPubSubStream(string) except + + vector[CPubSubData] pull() + void ack(vector[string]) + + +cdef class PyPubSubData: + cdef CPubSubData *thisptr + def __cinit__(self, string data, string ack_id): + self.thisptr = new CPubSubData(data, ack_id) + + def __dealloc__(self): + del self.thisptr + + def ack_id(self): + return self.thisptr.ack_id + + def data(self): + return self.thisptr.data + +cdef class PyPubSubStream: + cdef CPubSubStream *thisptr + def __cinit__(self, string subscription_id): + self.thisptr = new CPubSubStream(subscription_id) + def __dealloc__(self): + del self.thisptr + def pull(self): + cdata = self.thisptr.pull() + out_list = [] + for i in range(cdata.size()): + out_list.append(PyPubSubData(cdata[i].data, cdata[i].ack_id)) + return out_list + def ack(self, vector[string] ack_ids): + return self.thisptr.ack(ack_ids) diff --git a/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp b/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp new file mode 100644 index 00000000..aa09afe6 --- /dev/null +++ b/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp @@ -0,0 +1,49 @@ +#include +#include +#include + +#include "google/pubsub/v1/pubsub.grpc.pb.h" +#include "google/pubsub/v1/pubsub.pb.h" +#include "grpc++/grpc++.h" + +#include "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h" + +buildflow::CPubSubStream::CPubSubStream(std::string sub_id) { + subscription_id = sub_id; + creds = grpc::GoogleDefaultCredentials(); + channel = grpc::CreateChannel("pubsub.googleapis.com", creds); + stub = std::make_unique(channel); + context = std::make_unique(); + pubsub_stream = stub->StreamingPull(context.get()); +} + +std::vector buildflow::CPubSubStream::pull() { + if (!is_stream_initialized) { + google::pubsub::v1::StreamingPullRequest request; + request.set_subscription(subscription_id); + request.set_stream_ack_deadline_seconds(10); + pubsub_stream->Write(request); + is_stream_initialized = true; + } + google::pubsub::v1::StreamingPullResponse response; + pubsub_stream->Read(&response); + + std::vector v; + for (const auto &message : response.received_messages()) { + v.push_back(CPubSubData(message.message().data(), message.ack_id())); + } + return v; +} + +void buildflow::CPubSubStream::ack(std::vector ack_ids) { + google::pubsub::v1::StreamingPullRequest request; + for (const auto &ack_id : ack_ids) { + request.add_ack_ids(ack_id); + } + pubsub_stream->Write(request); +} + +buildflow::CPubSubData::CPubSubData(std::string d, std::string a) { + data = d; + ack_id = a; +} diff --git a/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h b/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h new file mode 100644 index 00000000..20b9e606 --- /dev/null +++ b/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h @@ -0,0 +1,43 @@ +#ifndef PUBSUB_STREAM_H +#define PUBSUB_STREAM_H + +#include +#include +#include + +#include "google/pubsub/v1/pubsub.grpc.pb.h" +#include "google/pubsub/v1/pubsub.pb.h" +#include "grpc++/grpc++.h" + +namespace buildflow { + +class CPubSubData { +public: + CPubSubData(std::string data, std::string ack_id); + std::string data; + std::string ack_id; +}; + +class CPubSubStream { +private: + std::string subscription_id; + std::shared_ptr creds; + std::shared_ptr channel; + std::unique_ptr stub; + std::unique_ptr< + grpc::ClientReaderWriter> + pubsub_stream; + std::unique_ptr context; + // Will be initialized upon first call to pull() + bool is_stream_initialized = false; + +public: + CPubSubStream(std::string subscription_id); + void run(); + std::vector pull(); + void ack(std::vector ack_ids); +}; +}; // namespace buildflow + +#endif diff --git a/buildflow/core/io/gcp/strategies/pubsub_strategies.py b/buildflow/core/io/gcp/strategies/pubsub_strategies.py index b4d5b3e9..9969468e 100644 --- a/buildflow/core/io/gcp/strategies/pubsub_strategies.py +++ b/buildflow/core/io/gcp/strategies/pubsub_strategies.py @@ -9,6 +9,7 @@ from buildflow.core import utils +from buildflow.core.io.gcp.strategies._cython import pubsub_source from buildflow.core.io.utils.clients import gcp_clients from buildflow.core.io.utils.schemas import converters from buildflow.core.strategies.sink import Batch, SinkStrategy @@ -48,6 +49,7 @@ def __init__( project_id: GCPProjectID, batch_size: int = 1000, include_attributes: bool = False, + use_cpp_source: bool = False, ): super().__init__(strategy_id="gcp-pubsub-subscription-source") # configuration @@ -55,8 +57,14 @@ def __init__( self.project_id = project_id self.batch_size = batch_size self.include_attributes = include_attributes + self.use_cpp_source = use_cpp_source # setup - self.subscriber_client = gcp_clients.get_async_subscriber_client(project_id) + if not self.use_cpp_source: + self.subscriber_client = gcp_clients.get_async_subscriber_client(project_id) + else: + self.cpp_subscriber = pubsub_source.PyPubSubStream( + self.subscription_id.encode("utf-8") + ) self.publisher_client = gcp_clients.get_async_publisher_client(project_id) # initial state @@ -64,7 +72,7 @@ def __init__( def subscription_id(self) -> PubSubSubscriptionID: return f"projects/{self.project_id}/subscriptions/{self.subscription_name}" # noqa: E501 - async def pull(self) -> PullResponse: + async def _pyton_pull(self) -> PullResponse: try: response = await self.subscriber_client.pull( subscription=self.subscription_id, @@ -92,7 +100,22 @@ async def pull(self) -> PullResponse: return PullResponse(payloads, _PubsubAckInfo(ack_ids)) - async def ack(self, ack_info: _PubsubAckInfo, success: bool): + async def _cpp_pull(self) -> PullResponse: + messages = self.cpp_subscriber.pull() + payloads = [] + ack_ids = [] + for message in messages: + payloads.append(message.data()) + ack_ids.append(message.ack_id()) + return PullResponse(payloads, _PubsubAckInfo(ack_ids)) + + async def pull(self) -> PullResponse: + if self.use_cpp_source: + return await self._cpp_pull() + else: + return await self._pyton_pull() + + async def _python_ack(self, ack_info: _PubsubAckInfo, success: bool): if ack_info.ack_ids: if success: await self.subscriber_client.acknowledge( @@ -108,6 +131,18 @@ async def ack(self, ack_info: _PubsubAckInfo, success: bool): ack_deadline_seconds=ack_deadline_seconds, ) + async def _cpp_ack(self, ack_info: _PubsubAckInfo, success: bool): + if ack_info.ack_ids: + # TODO: need to implement nack for cpp source + if success: + self.cpp_subscriber.ack(ack_info.ack_ids) + + async def ack(self, ack_info: _PubsubAckInfo, success: bool): + if self.use_cpp_source: + return await self._cpp_ack(ack_info, success) + else: + return await self._python_ack(ack_info, success) + async def backlog(self) -> int: split_sub = self.subscription_id.split("/") project = split_sub[1] diff --git a/defs.bzl b/defs.bzl new file mode 100644 index 00000000..6119e836 --- /dev/null +++ b/defs.bzl @@ -0,0 +1,31 @@ +COPY_TEMPLATE = """\ +#!/bin/bash +FROM_PATH="$PWD/{copy_from}" +TO_PATH="$BUILD_WORKSPACE_DIRECTORY/{to}/" +echo "Copying from $FROM_PATH to $TO_PATH" +cp -f $FROM_PATH $TO_PATH +""" + +def _copy_to_workspace_impl(ctx): + copy_file = ctx.file.file + copy_runfile_path = copy_file.short_path + copy_to = ctx.attr.file.label.package + ctx.attr.output_dir + script = ctx.actions.declare_file(ctx.label.name) + script_content = COPY_TEMPLATE.format( + copy_from = copy_runfile_path, + to = copy_to, + ) + + ctx.actions.write(script, script_content, is_executable = True) + runfiles = ctx.runfiles(files = [copy_file]) + + return [DefaultInfo(executable = script, runfiles = runfiles)] + +copy_to_workspace = rule( + implementation = _copy_to_workspace_impl, + attrs = { + "file": attr.label(allow_single_file = True), + "output_dir": attr.string(mandatory = False), + }, + executable = True, +) diff --git a/infra-as-code.py b/infra-as-code.py deleted file mode 100644 index 57c804e0..00000000 --- a/infra-as-code.py +++ /dev/null @@ -1,64 +0,0 @@ -from dataclasses import dataclass -from datetime import datetime -from sqlalchemy.orm import Session - -from buildflow import Node, Depends - -# buildflow.resources.clients exposes all of the ClientResource types (for Depends API) -from buildflow.resources.clients import PostgresClientPool - -# buildflow.resources.io exposes all of the IOResource types (for Provider API) -from buildflow.resources.io import BigQueryTable, GCPPubSubSubscription - - -@dataclass -class TaxiOutput: - ride_id: str - point_idx: int - latitude: float - longitude: float - timestamp: datetime - meter_reading: float - meter_increment: float - ride_status: str - passenger_count: int - - -# Create a new Node with all the defaults set -app = Node() - -# Define the source and sink -pubsub_source = GCPPubSubSubscription( - topic_id="projects/pubsub-public-data/topics/taxirides-realtime", - project_id="daring-runway-374503", - subscription_name="taxiride-sub", -) -bigquery_sink = BigQueryTable( - table_id="daring-runway-374503.taxi_ride_benchmark.buildflow_temp", - include_dataset=False, - destroy_protection=False, -) -# Define the postgres client pool -db_pool = PostgresClientPool( - host="localhost", - port=5432, - user="postgres", - password="postgres", - database="postgres", - max_size=10, -) - - -# using imperative api DOES NOT REQUIRE all schema types to be provided. -@app.processor(source=pubsub_source, sink=bigquery_sink) -def my_processor(pubsub_message, db: Session = Depends(db_pool)): - return pubsub_message - - -if __name__ == "__main__": - app.run( - disable_usage_stats=True, - apply_infrastructure=False, - destroy_infrastructure=False, - start_node_server=True, - ) diff --git a/infra-from-code.py b/infra-from-code.py deleted file mode 100644 index 04413d1b..00000000 --- a/infra-from-code.py +++ /dev/null @@ -1,57 +0,0 @@ -from dataclasses import dataclass -from datetime import datetime -from sqlalchemy.orm import Session - -from buildflow import Node, Setup, InfraOptions - -# buildflow.resources exposes all of the declarative types (for Provider & Depends API). -# they use the io & client submodules to provide the actual implementations. -from buildflow.resources import RelationalDB, DataWarehouse, Queue - - -# Setup(Resource) will return the client/resource instance, not io/resource. -# Setup(fn) will return the result of calling the function. - -Resource.setup() -Resource.client() - -# using this option REQUIRES that all schemas are provided as type annotations. - - -@dataclass -class TaxiOutput: - ride_id: str - point_idx: int - latitude: float - longitude: float - timestamp: datetime - meter_reading: float - meter_increment: float - ride_status: str - passenger_count: int - - -# declarative types could have "local" implementations for testing. -# this is akin to our in-memory metrics vs prometheus. -app = Node(infra_config=InfraOptions(resource_provider="local")) - - -# using declarative api REQUIRES all schema types to be provided. -# Users can optionally pass an id for other resources to reference. -# We could also pass along any args, kwargs to the underlying resource(s). -@app.processor(source=Queue(id=...), sink=DataWarehouse(id=...)) -def my_processor( - # Depends(Resource) will also return the client/resource instance, not io/resource. - pubsub_message: TaxiOutput, - db: Session = Setup(RelationalDB(id=...)), -) -> TaxiOutput: - return pubsub_message - - -if __name__ == "__main__": - app.run( - disable_usage_stats=True, - apply_infrastructure=False, - destroy_infrastructure=False, - start_node_server=True, - ) diff --git a/pyproject.toml b/pyproject.toml deleted file mode 100644 index 239c2cb6..00000000 --- a/pyproject.toml +++ /dev/null @@ -1,90 +0,0 @@ -[build-system] -requires = ["setuptools>=64.0.0", "wheel"] -build-backend = "setuptools.build_meta" - -[project] -name = "buildflow" -version = "0.1.2" -authors = [ - { name = "Caleb Van Dyke", email = "caleb@launchflow.com" }, - { name = "Josh Tanke", email = "josh@launchflow.com" }, -] -description = "buildflow is a unified **batch** and **streaming** framework that turns any python function into a scalable data pipeline that can read from our supported IO resources." -readme = "README.md" -requires-python = ">=3.7" -dependencies = [ - # TODO: split up AWS and GCP dependencies. - "boto3", - "dacite", - "duckdb==0.6.0", - "google-auth", - "google-cloud-bigquery", - "google-cloud-bigquery-storage", - "google-cloud-monitoring", - "google-cloud-pubsub", - "google-cloud-storage", - # TODO: https://github.com/grpc/grpc/issues/31885 - "grpcio<1.51.1", - "fastparquet", - "opentelemetry-api", - "opentelemetry-sdk", - "opentelemetry-exporter-otlp", - "opentelemetry-exporter-jaeger", - "pandas", - "pulumi==3.35.3", - "pulumi_gcp", - "pyarrow", - "pydantic < 2.0.2", - "pyyaml", - "ray[default] > 2.0.0", - "ray[serve] > 2.0.0", - "typer", - "redis", -] -classifiers = [ - "Development Status :: 2 - Pre-Alpha", - "Intended Audience :: Developers", - "Topic :: Software Development", - "License :: OSI Approved :: Apache Software License", -] - -[project.optional-dependencies] -dev = [ - "moto", - "pytest", - "pytest-cov", - "pytest-timeout", - "ruff", - "black", - "pre-commit", - "setuptools", - "wheel" -] - -[tool.setuptools.packages] -find = {} # Scan the project directory with the default parameters - -[project.scripts] -buildflow = "buildflow.cli.main:main" - -[tool.coverage.report] -# Regexes for lines to exclude from consideration -exclude_also = [ - # Don't complain about missing debug-only code: - "def __repr__", - # Don't complain if tests don't hit defensive assertion code: - "raise AssertionError", - "raise NotImplementedError", - "if __name__ == .__main__.:", - # Don't complain about abstract methods, they aren't run: - "@(abc\\.)?abstractmethod", -] -ignore_errors = true -# Files to ignore -omit = [ - # Exclude samples - "buildflow/samples/*", - "buildflow/cli/*", - # Exclude skipped / prototype code - "buildflow/runtime/ray_io/duckdb_io*" -] diff --git a/sandbox.py b/sandbox.py deleted file mode 100644 index edcbe5f0..00000000 --- a/sandbox.py +++ /dev/null @@ -1,78 +0,0 @@ -from dataclasses import dataclass -from datetime import datetime - -from buildflow import Node, RuntimeConfig -from buildflow.resources.io import GCPPubSubSubscription, BigQueryTable - - -@dataclass -class TaxiOutput: - ride_id: str - point_idx: int - latitude: float - longitude: float - timestamp: datetime - meter_reading: float - meter_increment: float - ride_status: str - passenger_count: int - - -# Create a new Node with all the defaults set -runtime_config = RuntimeConfig.IO_BOUND(autoscale=True, max_replicas=4) -# infra_config = InfraConfig( -# schema_validation=SchemaValidation.LOG_WARNING, -# require_confirmation=False, -# log_level="WARNING", -# ) -# app = Node(runtime_config=runtime_config, infra_config=infra_config) - -# Create a new Node -app = Node(runtime_config=runtime_config) - -# Define the source and sink -pubsub_source = GCPPubSubSubscription( - topic_id="projects/pubsub-public-data/topics/taxirides-realtime", - project_id="daring-runway-374503", - subscription_name="taxiride-sub", -) -bigquery_sink = BigQueryTable( - table_id="daring-runway-374503.taxi_ride_benchmark.buildflow_temp", - include_dataset=False, - destroy_protection=False, -) - - -# Attach a processor to the Node -@app.processor( - source=pubsub_source, - sink=bigquery_sink, - num_cpus=0.5, - num_concurrency=8, -) -def my_processor(pubsub_message: TaxiOutput) -> TaxiOutput: - # print('Process: ', pubsub_message) - # should_fail = random.randint(0, 1) - # if should_fail: - # raise ValueError("Randomly failing") - return pubsub_message - - -if __name__ == "__main__": - app.run( - disable_usage_stats=True, - # runtime-only options - block_runtime=True, - debug_run=False, - # infra-only options. - apply_infrastructure=False, - # Ad hoc infra is really nice for quick demos / tests - destroy_infrastructure=False, - # server options - start_node_server=True, - ) - - # these should also work: - # app.plan() - # app.apply() - # app.destroy() diff --git a/sandbox2.py b/sandbox2.py deleted file mode 100644 index 519768ba..00000000 --- a/sandbox2.py +++ /dev/null @@ -1,64 +0,0 @@ -import dataclasses -from datetime import datetime - -from buildflow import Node, ResourcesConfig -from buildflow.resources import DataWarehouseTable, Topic - - -@dataclasses.dataclass -class TaxiOutput: - ride_id: str - point_idx: int - latitude: float - longitude: float - timestamp: datetime - meter_reading: float - meter_increment: float - ride_status: str - passenger_count: int - - -resource_config = ResourcesConfig.default() -# resource_config.gcp.default_project_id = "daring-runway-374503" -# Create a new Node with custom resource config -app = Node(resource_config=resource_config) - - -# Define the source and sink -source = Topic( - resource_id="tanke_topic_id", - resource_provider="gcp", -) -sink = DataWarehouseTable( - resource_id="tanke_table_id", - resource_provider="gcp", -) - - -# Attach a processor to the Node -@app.processor(source=source, sink=sink) -def my_processor(pubsub_message: TaxiOutput) -> TaxiOutput: - return pubsub_message - - -# print(dataclasses.asdict(my_processor.source().resource_type())) -# print(dataclasses.asdict(my_processor.sink().resource_type())) - -# if __name__ == "__main__": -# app.run( -# disable_usage_stats=True, -# # runtime-only options -# block_runtime=True, -# debug_run=False, -# # infra-only options. -# apply_infrastructure=False, -# # Ad hoc infra is really nice for quick demos / tests -# destroy_infrastructure=False, -# # server options -# start_node_server=True, -# ) - -# these should also work: -# app.plan() -# app.apply() -# app.destroy() diff --git a/sandbox3.py b/sandbox3.py deleted file mode 100644 index 19d98455..00000000 --- a/sandbox3.py +++ /dev/null @@ -1,48 +0,0 @@ -from buildflow.core.io import AnalysisTable -from buildflow.core.app.flow import Flow -from buildflow.core.options import ( - FlowOptions, - AutoscalerOptions, - RuntimeOptions, - InfraOptions, -) -from buildflow.core.io.gcp import GCPPubSubSubscription, BigQueryTable -from sandbox4 import TaxiOutput - -# Create a new Flow -app = Flow( - flow_options=FlowOptions( - infra_options=InfraOptions.default(), - runtime_options=RuntimeOptions( - processor_options={}, - autoscaler_options=AutoscalerOptions( - enable_autoscaler=False, - min_replicas=1, - max_replicas=10, - log_level="INFO", - ), - num_replicas=10, - log_level="INFO", - ), - ) -) - - -# Define the source and sink -pubsub_source = GCPPubSubSubscription( - topic_id="projects/pubsub-public-data/topics/taxirides-realtime", - project_id="daring-runway-374503", - subscription_name="taxiride-sub", -) -bigquery_sink = AnalysisTable(table_name="tanke_table") - - -# Attach a processor to the Flow -@app.pipeline( - source=pubsub_source, - sink=bigquery_sink, - num_cpus=1.0, - num_concurrency=8, -) -def my_processor(pubsub_message: TaxiOutput) -> TaxiOutput: - return pubsub_message diff --git a/sandbox4.py b/sandbox4.py deleted file mode 100644 index 6979fac7..00000000 --- a/sandbox4.py +++ /dev/null @@ -1,15 +0,0 @@ -import dataclasses -from datetime import datetime - - -@dataclasses.dataclass -class TaxiOutput: - ride_id: str - point_idx: int - latitude: float - longitude: float - timestamp: datetime - meter_reading: float - meter_increment: float - ride_status: str - passenger_count: int diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..754145f6 --- /dev/null +++ b/setup.py @@ -0,0 +1,121 @@ +import os +import subprocess +import sys +from typing import List + + +ROOT_DIR = os.path.dirname(__file__) + + +# Calls Bazel in PATH, falling back to the standard user installation path +# (~/bin/bazel) if it isn't found. +def run_bazel(cmd_args: List[str], *args, **kwargs): + home = os.path.expanduser("~") + first_candidate = os.getenv("BAZEL_PATH", "bazel") + candidates = [first_candidate] + candidates.append(os.path.join(home, "bin", "bazel")) + result = None + for i, cmd in enumerate(candidates): + try: + result = subprocess.check_call([cmd] + cmd_args, *args, **kwargs) + break + except IOError: + if i >= len(candidates) - 1: + raise + return result + + +def build(): + bazel_env = dict(os.environ, PYTHON3_BIN_PATH=sys.executable) + + run_bazel( + ["build", "//:cython_pubsub_source"], + env=bazel_env, + ) + run_bazel( + ["run", "//:cp_pubsub_so"], + env=bazel_env, + ) + + +if __name__ == "__main__": + import setuptools + import setuptools.command.build_ext + + class build_ext(setuptools.command.build_ext.build_ext): + def run(self): + build() + + class BinaryDistribution(setuptools.Distribution): + def has_ext_modules(self): + return True + + +setuptools.setup( + name="buildflow", + version="0.1.2", + python_requires=">=3.7", + author="LaunchFlow", + author_email="founders@launchflow.com", + description="Build your entire system in minutes using Python.", # noqa: E501 + long_description=open( + os.path.join(ROOT_DIR, "README.md"), "r", encoding="utf-8" + ).read(), + url="http://www.buildflow.dev", + classifiers=[ + "Intended Audience :: Developers", + "Topic :: Software Development", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + ], + packages=["buildflow"], + cmdclass={"build_ext": build_ext}, + distclass=BinaryDistribution, + entry_points={"console_scripts": ["buildflow = buildflow.cli.main:main"]}, + setup_requires=["cython >= 0.29.32", "wheel"], + install_requires=[ + # TODO: split up AWS and GCP dependencies. + "boto3", + "dacite", + "duckdb==0.6.0", + "google-auth", + "google-cloud-bigquery", + "google-cloud-bigquery-storage", + "google-cloud-monitoring", + "google-cloud-pubsub", + "google-cloud-storage", + # TODO: https://github.com/grpc/grpc/issues/31885 + "grpcio<1.51.1", + "fastparquet", + "opentelemetry-api", + "opentelemetry-sdk", + "opentelemetry-exporter-otlp", + "opentelemetry-exporter-jaeger", + "pandas", + "pulumi==3.35.3", + "pulumi_gcp", + "pyarrow", + "pydantic < 2.0.2", + "pyyaml", + "ray[default] > 2.0.0", + "ray[serve] > 2.0.0", + "typer", + "redis", + ], + extras_require={ + "def": [ + "moto", + "pytest", + "pytest-cov", + "pytest-timeout", + "ruff", + "black", + "pre-commit", + "setuptools", + "wheel", + ] + }, +) From 1cfdfc0e178b4595ae1dbb37d8af8de11f3d214c Mon Sep 17 00:00:00 2001 From: Caleb Van Dyke Date: Mon, 17 Jul 2023 15:29:10 -0500 Subject: [PATCH 2/4] temp --- .../pipeline_pattern/pull_process_push.py | 252 +++++++++--------- .../core/app/runtime/actors/process_pool.py | 6 +- buildflow/core/app/runtime/actors/runtime.py | 6 +- buildflow/core/io/local/empty.py | 18 ++ .../core/io/local/providers/empty_provider.py | 7 + .../io/local/strategies/empty_strategies.py | 16 ++ buildflow/core/options/runtime_options.py | 2 +- 7 files changed, 182 insertions(+), 125 deletions(-) create mode 100644 buildflow/core/io/local/empty.py create mode 100644 buildflow/core/io/local/providers/empty_provider.py create mode 100644 buildflow/core/io/local/strategies/empty_strategies.py diff --git a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py index db0d34d3..e77bfd4f 100644 --- a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py +++ b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py @@ -81,7 +81,7 @@ def __init__( # initial runtime state self._status = RuntimeStatus.IDLE - self._num_running_threads = 0 + self._running_tasks = [] self._replica_id = replica_id self._last_snapshot_time = time.monotonic() # metrics @@ -147,139 +147,149 @@ def __init__( }, ) - async def run(self): - pid = os.getpid() - proc = psutil.Process(pid) - if self._status == RuntimeStatus.IDLE: - logging.info("Starting PullProcessPushActor...") - self._status = RuntimeStatus.RUNNING - elif self._status == RuntimeStatus.DRAINING: - raise RuntimeError("Cannot run a PullProcessPushActor that is draining.") - - logging.debug("Starting Thread...") - self._num_running_threads += 1 - - raw_process_fn = self.processor.process - full_arg_spec = inspect.getfullargspec(raw_process_fn) - output_type = None - input_type = None - if "return" in full_arg_spec.annotations: - output_type = full_arg_spec.annotations["return"] + async def _run_until_complete(self): + print("IN HERE") + try: + pid = os.getpid() + proc = psutil.Process(pid) + raw_process_fn = self.processor.process + full_arg_spec = inspect.getfullargspec(raw_process_fn) + output_type = None + input_type = None + if "return" in full_arg_spec.annotations: + output_type = full_arg_spec.annotations["return"] + if ( + hasattr(output_type, "__origin__") + and ( + output_type.__origin__ is list + or output_type.__origin__ is tuple + ) + and hasattr(output_type, "__args__") + ): + # We will flatten the return type if the outter most type is a tuple + # or list. + output_type = output_type.__args__[0] if ( - hasattr(output_type, "__origin__") - and (output_type.__origin__ is list or output_type.__origin__ is tuple) - and hasattr(output_type, "__args__") + len(full_arg_spec.args) > 1 + and full_arg_spec.args[1] in full_arg_spec.annotations ): - # We will flatten the return type if the outter most type is a tuple or - # list. - output_type = output_type.__args__[0] - if ( - len(full_arg_spec.args) > 1 - and full_arg_spec.args[1] in full_arg_spec.annotations - ): - input_type = full_arg_spec.annotations[full_arg_spec.args[1]] - source = self.processor.source() - sink = self.processor.sink() - pull_converter = source.pull_converter(input_type) - push_converter = sink.push_converter(output_type) - process_fn = raw_process_fn - if not inspect.iscoroutinefunction(raw_process_fn): - # Wrap the raw process function in an async function to make our calls below - # easier - async def wrapped_process_fn(x): - return raw_process_fn(x) + input_type = full_arg_spec.annotations[full_arg_spec.args[1]] + source = self.processor.source() + sink = self.processor.sink() + pull_converter = source.pull_converter(input_type) + push_converter = sink.push_converter(output_type) + process_fn = raw_process_fn + if not inspect.iscoroutinefunction(raw_process_fn): + # Wrap the raw process function in an async function to make our calls + # below easier + async def wrapped_process_fn(x): + return raw_process_fn(x) - process_fn = wrapped_process_fn + process_fn = wrapped_process_fn + + async def process_element(element): + results = await process_fn(pull_converter(element)) + if isinstance(results, (list, tuple)): + return [push_converter(result) for result in results] + else: + return push_converter(results) + + while self._status == RuntimeStatus.RUNNING: + print(f"DO NOT SUBMIT: running... {self._replica_id}") + proc.cpu_percent() + # PULL + total_start_time = time.monotonic() + try: + response = await source.pull() + except Exception: + logging.exception("pull failed") + continue + if not response.payload: + self._pull_percentage_counter.empty_inc() + cpu_percent = proc.cpu_percent() + if cpu_percent > 0.0: + self.cpu_percentage.inc(cpu_percent) + else: + self.cpu_percentage.empty_inc() + continue + # PROCESS + process_success = True + process_start_time = time.monotonic() + self._pull_percentage_counter.inc( + len(response.payload) / self.max_batch_size + ) + try: + coros = [] + for element in response.payload: + coros.append(process_element(element)) + flattened_results = await asyncio.gather(*coros) + batch_results = [] + for results in flattened_results: + if isinstance(results, list): + batch_results.extend(results) + else: + batch_results.append(results) - async def process_element(element): - results = await process_fn(pull_converter(element)) - if isinstance(results, (list, tuple)): - return [push_converter(result) for result in results] - else: - return push_converter(results) + batch_process_time_millis = ( + time.monotonic() - process_start_time + ) * 1000 + self.batch_time_counter.inc( + (time.monotonic() - process_start_time) * 1000 + ) + element_process_time_millis = batch_process_time_millis / len( + batch_results + ) + self.process_time_counter.inc(element_process_time_millis) - while self._status == RuntimeStatus.RUNNING: - proc.cpu_percent() - # PULL - total_start_time = time.monotonic() - try: - response = await source.pull() - except Exception: - logging.exception("pull failed") - continue - if not response.payload: - self._pull_percentage_counter.empty_inc() + # PUSH + await sink.push(batch_results) + # NOTE: we need this to ensure that we can call the snapshot method + # for drains and autoscale checks + # await asyncio.sleep(0.1) + except Exception: + logging.exception( + "failed to process batch, messages will not be acknowledged" + ) + process_success = False + finally: + # ACK + try: + await source.ack(response.ack_info, process_success) + except Exception: + # This can happen if there is network failures for w/e reason + # we want to try and catch here so our runtime loop + # doesn't die. + logging.exception("failed to ack batch, will continue") + continue + self.num_events_processed.inc(len(response.payload)) + # DONE -> LOOP + self.total_time_counter.inc( + (time.monotonic() - total_start_time) * 1000 + ) cpu_percent = proc.cpu_percent() if cpu_percent > 0.0: + # Ray doesn't like it when we try to set a metric to 0 self.cpu_percentage.inc(cpu_percent) else: self.cpu_percentage.empty_inc() - continue - # PROCESS - process_success = True - process_start_time = time.monotonic() - self._pull_percentage_counter.inc( - len(response.payload) / self.max_batch_size - ) - try: - coros = [] - for element in response.payload: - coros.append(process_element(element)) - flattened_results = await asyncio.gather(*coros) - batch_results = [] - for results in flattened_results: - if isinstance(results, list): - batch_results.extend(results) - else: - batch_results.append(results) - batch_process_time_millis = ( - time.monotonic() - process_start_time - ) * 1000 - self.batch_time_counter.inc( - (time.monotonic() - process_start_time) * 1000 - ) - element_process_time_millis = batch_process_time_millis / len( - batch_results - ) - self.process_time_counter.inc(element_process_time_millis) - - # PUSH - await sink.push(batch_results) - # NOTE: we need this to ensure that we can call the snapshot method for - # drains and autoscale checks - # await asyncio.sleep(0.1) - except Exception: - logging.exception( - "failed to process batch, messages will not be acknowledged" - ) - process_success = False - finally: - # ACK - try: - await source.ack(response.ack_info, process_success) - except Exception: - # This can happen if there is network failures for w/e reason - # we want to try and catch here so our runtime loop - # doesn't die. - logging.exception("failed to ack batch, will continue") - continue - self.num_events_processed.inc(len(response.payload)) - # DONE -> LOOP - self.total_time_counter.inc((time.monotonic() - total_start_time) * 1000) - cpu_percent = proc.cpu_percent() - if cpu_percent > 0.0: - # Ray doesn't like it when we try to set a metric to 0 - self.cpu_percentage.inc(cpu_percent) - else: - self.cpu_percentage.empty_inc() + if not self._running_tasks: + self._status = RuntimeStatus.IDLE + logging.info("PullProcessPushActor Complete.") - self._num_running_threads -= 1 - if self._num_running_threads == 0: - self._status = RuntimeStatus.IDLE - logging.info("PullProcessPushActor Complete.") + logging.debug("Thread Complete.") + except Exception as e: + print("DO NOT SUBMIT: ", e) + logging.exception("Thread failed.") - logging.debug("Thread Complete.") + async def run(self): + if self._status == RuntimeStatus.IDLE: + logging.info("Starting PullProcessPushActor...") + self._status = RuntimeStatus.RUNNING + elif self._status == RuntimeStatus.DRAINING: + raise RuntimeError("Cannot run a PullProcessPushActor that is draining.") + logging.debug("Starting Thread...") + self._running_tasks.append(self._run_until_complete()) async def status(self): # TODO: Have this method count the number of active threads diff --git a/buildflow/core/app/runtime/actors/process_pool.py b/buildflow/core/app/runtime/actors/process_pool.py index d3620a2f..db54614a 100644 --- a/buildflow/core/app/runtime/actors/process_pool.py +++ b/buildflow/core/app/runtime/actors/process_pool.py @@ -96,18 +96,22 @@ async def create_replica(self): raise NotImplementedError("create_replica must be implemented by subclasses.") async def add_replicas(self, num_replicas: int): + print("DO NOT SUBMIT: adding replicas: ", num_replicas) if self._status != RuntimeStatus.RUNNING: raise RuntimeError( "Can only add replicas to a processor pool that is running." ) + replica_run_coros = [] for _ in range(num_replicas): replica = await self.create_replica() if self._status == RuntimeStatus.RUNNING: for _ in range(self.options.num_concurrency): - replica.ray_actor_handle.run.remote() + replica_run_coros.append(replica.ray_actor_handle.run.remote()) self.replicas.append(replica) + await asyncio.wait(replica_run_coros) + self.num_replicas_gauge.set(len(self.replicas)) async def remove_replicas(self, num_replicas: int): diff --git a/buildflow/core/app/runtime/actors/runtime.py b/buildflow/core/app/runtime/actors/runtime.py index d9207e25..cf8ce501 100644 --- a/buildflow/core/app/runtime/actors/runtime.py +++ b/buildflow/core/app/runtime/actors/runtime.py @@ -216,9 +216,11 @@ async def _scale_processor( num_replicas_delta = target_num_replicas - current_num_replicas if num_replicas_delta > 0: - processor_pool.actor_handle.add_replicas.remote(num_replicas_delta) + await processor_pool.actor_handle.add_replicas.remote(num_replicas_delta) elif num_replicas_delta < 0: - processor_pool.actor_handle.remove_replicas.remote(abs(num_replicas_delta)) + await processor_pool.actor_handle.remove_replicas.remote( + abs(num_replicas_delta) + ) return processor_snapshot async def _runtime_autoscale_loop(self): diff --git a/buildflow/core/io/local/empty.py b/buildflow/core/io/local/empty.py new file mode 100644 index 00000000..2a4be96f --- /dev/null +++ b/buildflow/core/io/local/empty.py @@ -0,0 +1,18 @@ +import dataclasses + +from buildflow.config.cloud_provider_config import LocalOptions +from buildflow.core.io.local.providers.empty_provider import EmptryProvider +from buildflow.core.io.primitive import LocalPrimtive + + +@dataclasses.dataclass +class Empty(LocalPrimtive): + @classmethod + def from_local_options( + cls, + local_options: LocalOptions, + ) -> "Empty": + return cls() + + def sink_provider(self) -> EmptryProvider: + return EmptryProvider() diff --git a/buildflow/core/io/local/providers/empty_provider.py b/buildflow/core/io/local/providers/empty_provider.py new file mode 100644 index 00000000..34f3254b --- /dev/null +++ b/buildflow/core/io/local/providers/empty_provider.py @@ -0,0 +1,7 @@ +from buildflow.core.io.local.strategies.empty_strategies import EmptySink +from buildflow.core.providers.provider import SinkProvider + + +class EmptryProvider(SinkProvider): + def sink(self): + return EmptySink() diff --git a/buildflow/core/io/local/strategies/empty_strategies.py b/buildflow/core/io/local/strategies/empty_strategies.py new file mode 100644 index 00000000..d607c629 --- /dev/null +++ b/buildflow/core/io/local/strategies/empty_strategies.py @@ -0,0 +1,16 @@ +from typing import Any, Callable, Type + +from buildflow.core.io.utils.schemas import converters +from buildflow.core.strategies.sink import SinkStrategy + + +class EmptySink(SinkStrategy): + def __init__(self): + super().__init__(strategy_id="local-empty-sink") + + def push_converter(self, user_defined_type: Type) -> Callable[[Any], Any]: + return converters.identity() + + async def push(self, elements: Any): + # Just drop the elements on the floor + pass diff --git a/buildflow/core/options/runtime_options.py b/buildflow/core/options/runtime_options.py index e4f875cf..f0854150 100644 --- a/buildflow/core/options/runtime_options.py +++ b/buildflow/core/options/runtime_options.py @@ -27,7 +27,7 @@ class AutoscalerOptions(Options): min_replicas: int max_replicas: int log_level: str - autoscale_frequency_secs: int = 60 + autoscale_frequency_secs: int = 30 pipeline_backlog_burn_threshold: int = 60 pipeline_cpu_percent_target: int = 25 From c1aa3acf0e3b27c78fef105adeea7f665ec03653 Mon Sep 17 00:00:00 2001 From: Caleb Van Dyke Date: Mon, 17 Jul 2023 16:13:13 -0500 Subject: [PATCH 3/4] remove loop --- .../pipeline_pattern/pull_process_push.py | 252 +++++++++--------- .../core/app/runtime/actors/process_pool.py | 9 +- buildflow/core/options/runtime_options.py | 2 +- 3 files changed, 124 insertions(+), 139 deletions(-) diff --git a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py index e77bfd4f..db0d34d3 100644 --- a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py +++ b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py @@ -81,7 +81,7 @@ def __init__( # initial runtime state self._status = RuntimeStatus.IDLE - self._running_tasks = [] + self._num_running_threads = 0 self._replica_id = replica_id self._last_snapshot_time = time.monotonic() # metrics @@ -147,149 +147,139 @@ def __init__( }, ) - async def _run_until_complete(self): - print("IN HERE") - try: - pid = os.getpid() - proc = psutil.Process(pid) - raw_process_fn = self.processor.process - full_arg_spec = inspect.getfullargspec(raw_process_fn) - output_type = None - input_type = None - if "return" in full_arg_spec.annotations: - output_type = full_arg_spec.annotations["return"] - if ( - hasattr(output_type, "__origin__") - and ( - output_type.__origin__ is list - or output_type.__origin__ is tuple - ) - and hasattr(output_type, "__args__") - ): - # We will flatten the return type if the outter most type is a tuple - # or list. - output_type = output_type.__args__[0] - if ( - len(full_arg_spec.args) > 1 - and full_arg_spec.args[1] in full_arg_spec.annotations - ): - input_type = full_arg_spec.annotations[full_arg_spec.args[1]] - source = self.processor.source() - sink = self.processor.sink() - pull_converter = source.pull_converter(input_type) - push_converter = sink.push_converter(output_type) - process_fn = raw_process_fn - if not inspect.iscoroutinefunction(raw_process_fn): - # Wrap the raw process function in an async function to make our calls - # below easier - async def wrapped_process_fn(x): - return raw_process_fn(x) + async def run(self): + pid = os.getpid() + proc = psutil.Process(pid) + if self._status == RuntimeStatus.IDLE: + logging.info("Starting PullProcessPushActor...") + self._status = RuntimeStatus.RUNNING + elif self._status == RuntimeStatus.DRAINING: + raise RuntimeError("Cannot run a PullProcessPushActor that is draining.") - process_fn = wrapped_process_fn + logging.debug("Starting Thread...") + self._num_running_threads += 1 - async def process_element(element): - results = await process_fn(pull_converter(element)) - if isinstance(results, (list, tuple)): - return [push_converter(result) for result in results] - else: - return push_converter(results) + raw_process_fn = self.processor.process + full_arg_spec = inspect.getfullargspec(raw_process_fn) + output_type = None + input_type = None + if "return" in full_arg_spec.annotations: + output_type = full_arg_spec.annotations["return"] + if ( + hasattr(output_type, "__origin__") + and (output_type.__origin__ is list or output_type.__origin__ is tuple) + and hasattr(output_type, "__args__") + ): + # We will flatten the return type if the outter most type is a tuple or + # list. + output_type = output_type.__args__[0] + if ( + len(full_arg_spec.args) > 1 + and full_arg_spec.args[1] in full_arg_spec.annotations + ): + input_type = full_arg_spec.annotations[full_arg_spec.args[1]] + source = self.processor.source() + sink = self.processor.sink() + pull_converter = source.pull_converter(input_type) + push_converter = sink.push_converter(output_type) + process_fn = raw_process_fn + if not inspect.iscoroutinefunction(raw_process_fn): + # Wrap the raw process function in an async function to make our calls below + # easier + async def wrapped_process_fn(x): + return raw_process_fn(x) - while self._status == RuntimeStatus.RUNNING: - print(f"DO NOT SUBMIT: running... {self._replica_id}") - proc.cpu_percent() - # PULL - total_start_time = time.monotonic() - try: - response = await source.pull() - except Exception: - logging.exception("pull failed") - continue - if not response.payload: - self._pull_percentage_counter.empty_inc() - cpu_percent = proc.cpu_percent() - if cpu_percent > 0.0: - self.cpu_percentage.inc(cpu_percent) - else: - self.cpu_percentage.empty_inc() - continue - # PROCESS - process_success = True - process_start_time = time.monotonic() - self._pull_percentage_counter.inc( - len(response.payload) / self.max_batch_size - ) - try: - coros = [] - for element in response.payload: - coros.append(process_element(element)) - flattened_results = await asyncio.gather(*coros) - batch_results = [] - for results in flattened_results: - if isinstance(results, list): - batch_results.extend(results) - else: - batch_results.append(results) + process_fn = wrapped_process_fn - batch_process_time_millis = ( - time.monotonic() - process_start_time - ) * 1000 - self.batch_time_counter.inc( - (time.monotonic() - process_start_time) * 1000 - ) - element_process_time_millis = batch_process_time_millis / len( - batch_results - ) - self.process_time_counter.inc(element_process_time_millis) + async def process_element(element): + results = await process_fn(pull_converter(element)) + if isinstance(results, (list, tuple)): + return [push_converter(result) for result in results] + else: + return push_converter(results) - # PUSH - await sink.push(batch_results) - # NOTE: we need this to ensure that we can call the snapshot method - # for drains and autoscale checks - # await asyncio.sleep(0.1) - except Exception: - logging.exception( - "failed to process batch, messages will not be acknowledged" - ) - process_success = False - finally: - # ACK - try: - await source.ack(response.ack_info, process_success) - except Exception: - # This can happen if there is network failures for w/e reason - # we want to try and catch here so our runtime loop - # doesn't die. - logging.exception("failed to ack batch, will continue") - continue - self.num_events_processed.inc(len(response.payload)) - # DONE -> LOOP - self.total_time_counter.inc( - (time.monotonic() - total_start_time) * 1000 - ) + while self._status == RuntimeStatus.RUNNING: + proc.cpu_percent() + # PULL + total_start_time = time.monotonic() + try: + response = await source.pull() + except Exception: + logging.exception("pull failed") + continue + if not response.payload: + self._pull_percentage_counter.empty_inc() cpu_percent = proc.cpu_percent() if cpu_percent > 0.0: - # Ray doesn't like it when we try to set a metric to 0 self.cpu_percentage.inc(cpu_percent) else: self.cpu_percentage.empty_inc() + continue + # PROCESS + process_success = True + process_start_time = time.monotonic() + self._pull_percentage_counter.inc( + len(response.payload) / self.max_batch_size + ) + try: + coros = [] + for element in response.payload: + coros.append(process_element(element)) + flattened_results = await asyncio.gather(*coros) + batch_results = [] + for results in flattened_results: + if isinstance(results, list): + batch_results.extend(results) + else: + batch_results.append(results) - if not self._running_tasks: - self._status = RuntimeStatus.IDLE - logging.info("PullProcessPushActor Complete.") + batch_process_time_millis = ( + time.monotonic() - process_start_time + ) * 1000 + self.batch_time_counter.inc( + (time.monotonic() - process_start_time) * 1000 + ) + element_process_time_millis = batch_process_time_millis / len( + batch_results + ) + self.process_time_counter.inc(element_process_time_millis) + + # PUSH + await sink.push(batch_results) + # NOTE: we need this to ensure that we can call the snapshot method for + # drains and autoscale checks + # await asyncio.sleep(0.1) + except Exception: + logging.exception( + "failed to process batch, messages will not be acknowledged" + ) + process_success = False + finally: + # ACK + try: + await source.ack(response.ack_info, process_success) + except Exception: + # This can happen if there is network failures for w/e reason + # we want to try and catch here so our runtime loop + # doesn't die. + logging.exception("failed to ack batch, will continue") + continue + self.num_events_processed.inc(len(response.payload)) + # DONE -> LOOP + self.total_time_counter.inc((time.monotonic() - total_start_time) * 1000) + cpu_percent = proc.cpu_percent() + if cpu_percent > 0.0: + # Ray doesn't like it when we try to set a metric to 0 + self.cpu_percentage.inc(cpu_percent) + else: + self.cpu_percentage.empty_inc() - logging.debug("Thread Complete.") - except Exception as e: - print("DO NOT SUBMIT: ", e) - logging.exception("Thread failed.") + self._num_running_threads -= 1 + if self._num_running_threads == 0: + self._status = RuntimeStatus.IDLE + logging.info("PullProcessPushActor Complete.") - async def run(self): - if self._status == RuntimeStatus.IDLE: - logging.info("Starting PullProcessPushActor...") - self._status = RuntimeStatus.RUNNING - elif self._status == RuntimeStatus.DRAINING: - raise RuntimeError("Cannot run a PullProcessPushActor that is draining.") - logging.debug("Starting Thread...") - self._running_tasks.append(self._run_until_complete()) + logging.debug("Thread Complete.") async def status(self): # TODO: Have this method count the number of active threads diff --git a/buildflow/core/app/runtime/actors/process_pool.py b/buildflow/core/app/runtime/actors/process_pool.py index db54614a..e8d02c7f 100644 --- a/buildflow/core/app/runtime/actors/process_pool.py +++ b/buildflow/core/app/runtime/actors/process_pool.py @@ -96,21 +96,18 @@ async def create_replica(self): raise NotImplementedError("create_replica must be implemented by subclasses.") async def add_replicas(self, num_replicas: int): - print("DO NOT SUBMIT: adding replicas: ", num_replicas) if self._status != RuntimeStatus.RUNNING: raise RuntimeError( "Can only add replicas to a processor pool that is running." ) - replica_run_coros = [] for _ in range(num_replicas): replica = await self.create_replica() if self._status == RuntimeStatus.RUNNING: for _ in range(self.options.num_concurrency): - replica_run_coros.append(replica.ray_actor_handle.run.remote()) - self.replicas.append(replica) + replica.ray_actor_handle.run.remote() - await asyncio.wait(replica_run_coros) + self.replicas.append(replica) self.num_replicas_gauge.set(len(self.replicas)) @@ -142,8 +139,6 @@ def run(self): if self._status != RuntimeStatus.IDLE: raise RuntimeError("Can only start an Idle Runtime.") self._status = RuntimeStatus.RUNNING - for replica in self.replicas: - replica.ray_actor_handle.run.remote() async def drain(self): logging.info(f"Draining ProcessorPool({self.processor.processor_id})...") diff --git a/buildflow/core/options/runtime_options.py b/buildflow/core/options/runtime_options.py index f0854150..b748ee85 100644 --- a/buildflow/core/options/runtime_options.py +++ b/buildflow/core/options/runtime_options.py @@ -27,7 +27,7 @@ class AutoscalerOptions(Options): min_replicas: int max_replicas: int log_level: str - autoscale_frequency_secs: int = 30 + autoscale_frequency_secs: int = 10 pipeline_backlog_burn_threshold: int = 60 pipeline_cpu_percent_target: int = 25 From c0c14eb378683d0be534533818a06e57fec56e28 Mon Sep 17 00:00:00 2001 From: Caleb Van Dyke Date: Mon, 17 Jul 2023 16:13:21 -0500 Subject: [PATCH 4/4] change default --- buildflow/core/options/runtime_options.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildflow/core/options/runtime_options.py b/buildflow/core/options/runtime_options.py index b748ee85..e4f875cf 100644 --- a/buildflow/core/options/runtime_options.py +++ b/buildflow/core/options/runtime_options.py @@ -27,7 +27,7 @@ class AutoscalerOptions(Options): min_replicas: int max_replicas: int log_level: str - autoscale_frequency_secs: int = 10 + autoscale_frequency_secs: int = 60 pipeline_backlog_burn_threshold: int = 60 pipeline_cpu_percent_target: int = 25