diff --git a/.bazelrc b/.bazelrc
new file mode 100644
index 00000000..bfeef20c
--- /dev/null
+++ b/.bazelrc
@@ -0,0 +1,25 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Use host-OS-specific config lines from bazelrc files.
+build --enable_platform_specific_config=true
+
+# The project requires C++ >= 14. By default Bazel adds `-std=c++0x` which
+# disables C++14 features, even if the compiler defaults to C++ >= 14.
+build:linux --cxxopt=-std=c++14
+build:macos --cxxopt=-std=c++14
+# Protobuf and gRPC require (or soon will require) C++14 to compile the "host"
+# targets, such as protoc and the grpc plugin.
+build:linux --host_cxxopt=-std=c++14
+build:macos --host_cxxopt=-std=c++14
diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 00000000..e75830d0
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,19 @@
+[report]
+# Regexes for lines to exclude from consideration
+exclude_also =
+    # Don't complain about missing debug-only code:
+    def __repr__
+    # Don't complain if tests don't hit defensive assertion code:
+    raise AssertionError
+    raise NotImplementedError
+    if __name__ == .__main__.:
+    # Don't complain about abstract methods, they aren't run:
+    @(abc\.)?abstractmethod
+ignore_errors = True
+# Files to ignore
+omit =
+    # Exclude samples
+    buildflow/samples/*
+    buildflow/cli/*
+    # Exclude skipped / prototype code
+    buildflow/runtime/ray_io/duckdb_io*
diff --git a/.gitignore b/.gitignore
index 96cd00cc..ceab0464 100644
--- a/.gitignore
+++ b/.gitignore
@@ -133,3 +133,13 @@ dmypy.json
 
 # local vscode config
 .vscode/
+
+### Automatically added by Hedron's Bazel Compile Commands Extractor: https://github.com/hedronvision/bazel-compile-commands-extractor
+# Ignore the `external` link (that is added by `bazel-compile-commands-extractor`). The link differs between macOS/Linux and Windows, so it shouldn't be checked in. The pattern must not end with a trailing `/` because it's a symlink on macOS/Linux.
+/external
+# Ignore links to Bazel's output. The pattern needs the `*` because people can change the name of the directory into which your repository is cloned (changing the `bazel-` symlink), and must not end with a trailing `/` because it's a symlink on macOS/Linux.
+/bazel-*
+# Ignore generated output. Although valuable (after all, the primary purpose of `bazel-compile-commands-extractor` is to produce `compile_commands.json`!), it should not be checked in.
+/compile_commands.json
+# Ignore the directory in which `clangd` stores its local index.
+/.cache/ diff --git a/BUILD b/BUILD new file mode 100644 index 00000000..66c44dc3 --- /dev/null +++ b/BUILD @@ -0,0 +1,33 @@ +load("@com_github_grpc_grpc//bazel:cython_library.bzl", "pyx_library") +load("//:defs.bzl", "copy_to_workspace") + +package(default_visibility = ["//visibility:private"]) + +copy_to_workspace( + name = "cp_pubsub_so", + file = "buildflow/core/io/gcp/strategies/_cython/pubsub_source.so", + output_dir = "buildflow/core/io/gcp/strategies/_cython", +) + +pyx_library( + name = "cython_pubsub_source", + srcs = [ + "buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx", + ], + deps = [ + ":c_pubsub", + ], +) + +cc_library( + name = "c_pubsub", + srcs = [ + "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp", + ], + hdrs = [ + "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h", + ], + deps = [ + "@google_cloud_cpp//:pubsub", + ], +) diff --git a/WORKSPACE b/WORKSPACE new file mode 100644 index 00000000..4890a561 --- /dev/null +++ b/WORKSPACE @@ -0,0 +1,51 @@ +workspace(name = "com_github_buildflow") + +# Add the necessary Starlark functions to fetch google-cloud-cpp. +load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") + +# Fetch the Google Cloud C++ libraries. +# NOTE: Update this version and SHA256 as needed. +http_archive( + name = "google_cloud_cpp", + sha256 = "7204805106be2164b2048a965b3cc747dd8bd9193c52d9b572c07606ea72ab7e", + strip_prefix = "google-cloud-cpp-2.13.0", + url = "https://github.com/googleapis/google-cloud-cpp/archive/v2.13.0.tar.gz", +) + +# Load indirect dependencies due to +# https://github.com/bazelbuild/bazel/issues/1943 +load("@google_cloud_cpp//bazel:google_cloud_cpp_deps.bzl", "google_cloud_cpp_deps") + +google_cloud_cpp_deps() + +load("@com_google_googleapis//:repository_rules.bzl", "switched_rules_by_language") + +switched_rules_by_language( + name = "com_google_googleapis_imports", + cc = True, + grpc = True, +) + +load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps") + +grpc_deps() + +load("@com_github_grpc_grpc//bazel:grpc_extra_deps.bzl", "grpc_extra_deps") + +grpc_extra_deps() + +# Hedron's Compile Commands Extractor for Bazel +# https://github.com/hedronvision/bazel-compile-commands-extractor +http_archive( + name = "hedron_compile_commands", + strip_prefix = "bazel-compile-commands-extractor-ed994039a951b736091776d677f324b3903ef939", + + # Replace the commit hash in both places (below) with the latest, rather than using the stale one here. + # Even better, set up Renovate and let it do the work for you (see "Suggestion: Updates" in the README). + url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/ed994039a951b736091776d677f324b3903ef939.tar.gz", + # When you first run this tool, it'll recommend a sha256 hash to put here with a message like: "DEBUG: Rule 'hedron_compile_commands' indicated that a canonical reproducible form can be obtained by modifying arguments sha256 = ..." 
+) + +load("@hedron_compile_commands//:workspace_setup.bzl", "hedron_compile_commands_setup") + +hedron_compile_commands_setup() diff --git a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py index a0b7afe7..db0d34d3 100644 --- a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py +++ b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py @@ -246,6 +246,9 @@ async def process_element(element): # PUSH await sink.push(batch_results) + # NOTE: we need this to ensure that we can call the snapshot method for + # drains and autoscale checks + # await asyncio.sleep(0.1) except Exception: logging.exception( "failed to process batch, messages will not be acknowledged" diff --git a/buildflow/core/app/runtime/actors/process_pool.py b/buildflow/core/app/runtime/actors/process_pool.py index d3620a2f..e8d02c7f 100644 --- a/buildflow/core/app/runtime/actors/process_pool.py +++ b/buildflow/core/app/runtime/actors/process_pool.py @@ -106,6 +106,7 @@ async def add_replicas(self, num_replicas: int): if self._status == RuntimeStatus.RUNNING: for _ in range(self.options.num_concurrency): replica.ray_actor_handle.run.remote() + self.replicas.append(replica) self.num_replicas_gauge.set(len(self.replicas)) @@ -138,8 +139,6 @@ def run(self): if self._status != RuntimeStatus.IDLE: raise RuntimeError("Can only start an Idle Runtime.") self._status = RuntimeStatus.RUNNING - for replica in self.replicas: - replica.ray_actor_handle.run.remote() async def drain(self): logging.info(f"Draining ProcessorPool({self.processor.processor_id})...") diff --git a/buildflow/core/app/runtime/actors/runtime.py b/buildflow/core/app/runtime/actors/runtime.py index 028e59d2..cf8ce501 100644 --- a/buildflow/core/app/runtime/actors/runtime.py +++ b/buildflow/core/app/runtime/actors/runtime.py @@ -120,6 +120,8 @@ async def drain(self) -> bool: ray.kill(processor_pool.actor_handle) for processor_pool in self._processor_pool_refs ] + # Kill the runtime actor to force the job to exit. 
+ ray.actor.exit_actor() else: logging.warning("Draining Runtime...") logging.warning("-- Attempting to drain again will force stop the runtime.") @@ -214,9 +216,11 @@ async def _scale_processor( num_replicas_delta = target_num_replicas - current_num_replicas if num_replicas_delta > 0: - processor_pool.actor_handle.add_replicas.remote(num_replicas_delta) + await processor_pool.actor_handle.add_replicas.remote(num_replicas_delta) elif num_replicas_delta < 0: - processor_pool.actor_handle.remove_replicas.remote(abs(num_replicas_delta)) + await processor_pool.actor_handle.remove_replicas.remote( + abs(num_replicas_delta) + ) return processor_snapshot async def _runtime_autoscale_loop(self): diff --git a/buildflow/core/io/gcp/providers/pubsub_providers.py b/buildflow/core/io/gcp/providers/pubsub_providers.py index 71e47770..957a5729 100644 --- a/buildflow/core/io/gcp/providers/pubsub_providers.py +++ b/buildflow/core/io/gcp/providers/pubsub_providers.py @@ -58,6 +58,7 @@ def __init__( # source-only options batch_size: int = 1000, include_attributes: bool = False, + use_cpp_source: bool = False, # pulumi-only options ack_deadline_seconds: int = 10 * 60, message_retention_duration: str = "1200s", @@ -68,6 +69,7 @@ def __init__( # source-only options self.batch_size = batch_size self.include_attributes = include_attributes + self.use_cpp_source = use_cpp_source # pulumi-only options self.ack_deadline_seconds = ack_deadline_seconds self.message_retention_duration = message_retention_duration @@ -78,6 +80,7 @@ def source(self): project_id=self.project_id, batch_size=self.batch_size, include_attributes=self.include_attributes, + use_cpp_source=self.use_cpp_source, ) def pulumi_resources(self, type_: Optional[Type]): diff --git a/buildflow/core/io/gcp/pubsub.py b/buildflow/core/io/gcp/pubsub.py index c27be14b..47961cfa 100644 --- a/buildflow/core/io/gcp/pubsub.py +++ b/buildflow/core/io/gcp/pubsub.py @@ -60,6 +60,8 @@ class GCPPubSubSubscription(GCPPrimtive): subscription_name: PubSubSubscriptionName # required fields topic_id: PubSubTopicID + # Optional fields + use_cpp_source: bool = False @classmethod def from_gcp_options( @@ -87,6 +89,7 @@ def source_provider(self) -> GCPPubSubSubscriptionProvider: project_id=self.project_id, subscription_name=self.subscription_name, topic_id=self.topic_id, + use_cpp_source=self.use_cpp_source, ) # NOTE: Subscriptions do not support sinks, but we "implement" it here to diff --git a/buildflow/core/io/gcp/strategies/_cython/__init__.py b/buildflow/core/io/gcp/strategies/_cython/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx b/buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx new file mode 100644 index 00000000..74604796 --- /dev/null +++ b/buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx @@ -0,0 +1,50 @@ +# distutils: language=c++ +# distutils: sources = pubsub_stream.cpp + +from libcpp.string cimport string +from libcpp.vector cimport vector + + +cdef extern from "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h" namespace "buildflow": + cdef cppclass CPubSubData: + string data + string ack_id + + CPubSubData(string data, string ack_id) except + + + cdef cppclass CPubSubStream: + string subscription_id + + CPubSubStream(string) except + + vector[CPubSubData] pull() + void ack(vector[string]) + + +cdef class PyPubSubData: + cdef CPubSubData *thisptr + def __cinit__(self, string data, string ack_id): + self.thisptr = new CPubSubData(data, ack_id) + + def 
__dealloc__(self):
+        del self.thisptr
+
+    def ack_id(self):
+        return self.thisptr.ack_id
+
+    def data(self):
+        return self.thisptr.data
+
+cdef class PyPubSubStream:
+    cdef CPubSubStream *thisptr
+    def __cinit__(self, string subscription_id):
+        self.thisptr = new CPubSubStream(subscription_id)
+    def __dealloc__(self):
+        del self.thisptr
+    def pull(self):
+        cdata = self.thisptr.pull()
+        out_list = []
+        for i in range(cdata.size()):
+            out_list.append(PyPubSubData(cdata[i].data, cdata[i].ack_id))
+        return out_list
+    def ack(self, vector[string] ack_ids):
+        return self.thisptr.ack(ack_ids)
diff --git a/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp b/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp
new file mode 100644
index 00000000..aa09afe6
--- /dev/null
+++ b/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp
@@ -0,0 +1,49 @@
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "google/pubsub/v1/pubsub.grpc.pb.h"
+#include "google/pubsub/v1/pubsub.pb.h"
+#include "grpc++/grpc++.h"
+
+#include "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h"
+
+buildflow::CPubSubStream::CPubSubStream(std::string sub_id) {
+  subscription_id = sub_id;
+  creds = grpc::GoogleDefaultCredentials();
+  channel = grpc::CreateChannel("pubsub.googleapis.com", creds);
+  stub = std::make_unique<google::pubsub::v1::Subscriber::Stub>(channel);
+  context = std::make_unique<grpc::ClientContext>();
+  pubsub_stream = stub->StreamingPull(context.get());
+}
+
+std::vector<buildflow::CPubSubData> buildflow::CPubSubStream::pull() {
+  if (!is_stream_initialized) {
+    google::pubsub::v1::StreamingPullRequest request;
+    request.set_subscription(subscription_id);
+    request.set_stream_ack_deadline_seconds(10);
+    pubsub_stream->Write(request);
+    is_stream_initialized = true;
+  }
+  google::pubsub::v1::StreamingPullResponse response;
+  pubsub_stream->Read(&response);
+
+  std::vector<CPubSubData> v;
+  for (const auto &message : response.received_messages()) {
+    v.push_back(CPubSubData(message.message().data(), message.ack_id()));
+  }
+  return v;
+}
+
+void buildflow::CPubSubStream::ack(std::vector<std::string> ack_ids) {
+  google::pubsub::v1::StreamingPullRequest request;
+  for (const auto &ack_id : ack_ids) {
+    request.add_ack_ids(ack_id);
+  }
+  pubsub_stream->Write(request);
+}
+
+buildflow::CPubSubData::CPubSubData(std::string d, std::string a) {
+  data = d;
+  ack_id = a;
+}
diff --git a/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h b/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h
new file mode 100644
index 00000000..20b9e606
--- /dev/null
+++ b/buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h
@@ -0,0 +1,43 @@
+#ifndef PUBSUB_STREAM_H
+#define PUBSUB_STREAM_H
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "google/pubsub/v1/pubsub.grpc.pb.h"
+#include "google/pubsub/v1/pubsub.pb.h"
+#include "grpc++/grpc++.h"
+
+namespace buildflow {
+
+class CPubSubData {
+public:
+  CPubSubData(std::string data, std::string ack_id);
+  std::string data;
+  std::string ack_id;
+};
+
+class CPubSubStream {
+private:
+  std::string subscription_id;
+  std::shared_ptr<grpc::ChannelCredentials> creds;
+  std::shared_ptr<grpc::Channel> channel;
+  std::unique_ptr<google::pubsub::v1::Subscriber::Stub> stub;
+  std::unique_ptr<
+      grpc::ClientReaderWriter<google::pubsub::v1::StreamingPullRequest,
+                               google::pubsub::v1::StreamingPullResponse>>
+      pubsub_stream;
+  std::unique_ptr<grpc::ClientContext> context;
+  // Will be initialized upon first call to pull()
+  bool is_stream_initialized = false;
+
+public:
+  CPubSubStream(std::string subscription_id);
+  void run();
+  std::vector<CPubSubData> pull();
+  void ack(std::vector<std::string> ack_ids);
+};
+};  // namespace buildflow
+
+#endif
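The PyPubSubStream / PyPubSubData wrappers above are what the Python strategy layer drives. A minimal usage sketch follows, assuming the extension has already been built and copied to buildflow/core/io/gcp/strategies/_cython/pubsub_source.so and that the subscription path is replaced with a real one; the wrapper trades in C++ std::string, so values are bytes on the Python side.

```python
# Sketch: driving the Cython Pub/Sub wrapper directly (assumes the .so was
# built via Bazel and that the subscription below actually exists).
from buildflow.core.io.gcp.strategies._cython import pubsub_source

stream = pubsub_source.PyPubSubStream(
    b"projects/my-project/subscriptions/my-subscription"  # placeholder path
)

# pull() performs a blocking StreamingPull read and returns PyPubSubData items.
messages = stream.pull()
payloads = [message.data() for message in messages]  # payloads are bytes

# Acknowledge what was received; ack ids are bytes as well.
stream.ack([message.ack_id() for message in messages])
```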
diff --git a/buildflow/core/io/gcp/strategies/pubsub_strategies.py b/buildflow/core/io/gcp/strategies/pubsub_strategies.py
index b4d5b3e9..9969468e 100644
--- a/buildflow/core/io/gcp/strategies/pubsub_strategies.py
+++ b/buildflow/core/io/gcp/strategies/pubsub_strategies.py
@@ -9,6 +9,7 @@
 
 from buildflow.core import utils
+from buildflow.core.io.gcp.strategies._cython import pubsub_source
 from buildflow.core.io.utils.clients import gcp_clients
 from buildflow.core.io.utils.schemas import converters
 from buildflow.core.strategies.sink import Batch, SinkStrategy
@@ -48,6 +49,7 @@ def __init__(
         project_id: GCPProjectID,
         batch_size: int = 1000,
         include_attributes: bool = False,
+        use_cpp_source: bool = False,
     ):
         super().__init__(strategy_id="gcp-pubsub-subscription-source")
         # configuration
@@ -55,8 +57,14 @@
         self.project_id = project_id
         self.batch_size = batch_size
         self.include_attributes = include_attributes
+        self.use_cpp_source = use_cpp_source
         # setup
-        self.subscriber_client = gcp_clients.get_async_subscriber_client(project_id)
+        if not self.use_cpp_source:
+            self.subscriber_client = gcp_clients.get_async_subscriber_client(project_id)
+        else:
+            self.cpp_subscriber = pubsub_source.PyPubSubStream(
+                self.subscription_id.encode("utf-8")
+            )
         self.publisher_client = gcp_clients.get_async_publisher_client(project_id)
 
         # initial state
@@ -64,7 +72,7 @@ def subscription_id(self) -> PubSubSubscriptionID:
         return f"projects/{self.project_id}/subscriptions/{self.subscription_name}"  # noqa: E501
 
-    async def pull(self) -> PullResponse:
+    async def _python_pull(self) -> PullResponse:
         try:
             response = await self.subscriber_client.pull(
                 subscription=self.subscription_id,
@@ -92,7 +100,22 @@
 
         return PullResponse(payloads, _PubsubAckInfo(ack_ids))
 
-    async def ack(self, ack_info: _PubsubAckInfo, success: bool):
+    async def _cpp_pull(self) -> PullResponse:
+        messages = self.cpp_subscriber.pull()
+        payloads = []
+        ack_ids = []
+        for message in messages:
+            payloads.append(message.data())
+            ack_ids.append(message.ack_id())
+        return PullResponse(payloads, _PubsubAckInfo(ack_ids))
+
+    async def pull(self) -> PullResponse:
+        if self.use_cpp_source:
+            return await self._cpp_pull()
+        else:
+            return await self._python_pull()
+
+    async def _python_ack(self, ack_info: _PubsubAckInfo, success: bool):
         if ack_info.ack_ids:
             if success:
                 await self.subscriber_client.acknowledge(
@@ -108,6 +131,18 @@
                     ack_deadline_seconds=ack_deadline_seconds,
                 )
 
+    async def _cpp_ack(self, ack_info: _PubsubAckInfo, success: bool):
+        if ack_info.ack_ids:
+            # TODO: need to implement nack for cpp source
+            if success:
+                self.cpp_subscriber.ack(ack_info.ack_ids)
+
+    async def ack(self, ack_info: _PubsubAckInfo, success: bool):
+        if self.use_cpp_source:
+            return await self._cpp_ack(ack_info, success)
+        else:
+            return await self._python_ack(ack_info, success)
+
     async def backlog(self) -> int:
         split_sub = self.subscription_id.split("/")
         project = split_sub[1]
diff --git a/buildflow/core/io/local/empty.py b/buildflow/core/io/local/empty.py
new file mode 100644
index 00000000..2a4be96f
--- /dev/null
+++ b/buildflow/core/io/local/empty.py
@@ -0,0 +1,18 @@
+import dataclasses
+
+from buildflow.config.cloud_provider_config import LocalOptions
+from buildflow.core.io.local.providers.empty_provider import EmptyProvider
+from buildflow.core.io.primitive import LocalPrimtive
+
+
+@dataclasses.dataclass
+class Empty(LocalPrimtive):
+    @classmethod
+    def from_local_options(
+        cls,
+        local_options: LocalOptions,
+    ) -> "Empty":
+        return cls()
+
+    def sink_provider(self) -> EmptyProvider:
+        return EmptyProvider()
diff --git a/buildflow/core/io/local/providers/empty_provider.py b/buildflow/core/io/local/providers/empty_provider.py
new file mode 100644
index 00000000..34f3254b
--- /dev/null
+++ b/buildflow/core/io/local/providers/empty_provider.py
@@ -0,0 +1,7 @@
+from buildflow.core.io.local.strategies.empty_strategies import EmptySink
+from buildflow.core.providers.provider import SinkProvider
+
+
+class EmptyProvider(SinkProvider):
+    def sink(self):
+        return EmptySink()
diff --git a/buildflow/core/io/local/strategies/empty_strategies.py b/buildflow/core/io/local/strategies/empty_strategies.py
new file mode 100644
index 00000000..d607c629
--- /dev/null
+++ b/buildflow/core/io/local/strategies/empty_strategies.py
@@ -0,0 +1,16 @@
+from typing import Any, Callable, Type
+
+from buildflow.core.io.utils.schemas import converters
+from buildflow.core.strategies.sink import SinkStrategy
+
+
+class EmptySink(SinkStrategy):
+    def __init__(self):
+        super().__init__(strategy_id="local-empty-sink")
+
+    def push_converter(self, user_defined_type: Type) -> Callable[[Any], Any]:
+        return converters.identity()
+
+    async def push(self, elements: Any):
+        # Just drop the elements on the floor
+        pass
diff --git a/defs.bzl b/defs.bzl
new file mode 100644
index 00000000..6119e836
--- /dev/null
+++ b/defs.bzl
@@ -0,0 +1,31 @@
+COPY_TEMPLATE = """\
+#!/bin/bash
+FROM_PATH="$PWD/{copy_from}"
+TO_PATH="$BUILD_WORKSPACE_DIRECTORY/{to}/"
+echo "Copying from $FROM_PATH to $TO_PATH"
+cp -f $FROM_PATH $TO_PATH
+"""
+
+def _copy_to_workspace_impl(ctx):
+    copy_file = ctx.file.file
+    copy_runfile_path = copy_file.short_path
+    copy_to = ctx.attr.file.label.package + ctx.attr.output_dir
+    script = ctx.actions.declare_file(ctx.label.name)
+    script_content = COPY_TEMPLATE.format(
+        copy_from = copy_runfile_path,
+        to = copy_to,
+    )
+
+    ctx.actions.write(script, script_content, is_executable = True)
+    runfiles = ctx.runfiles(files = [copy_file])
+
+    return [DefaultInfo(executable = script, runfiles = runfiles)]
+
+copy_to_workspace = rule(
+    implementation = _copy_to_workspace_impl,
+    attrs = {
+        "file": attr.label(allow_single_file = True),
+        "output_dir": attr.string(mandatory = False),
+    },
+    executable = True,
+)
diff --git a/infra-as-code.py b/infra-as-code.py
deleted file mode 100644
index 57c804e0..00000000
--- a/infra-as-code.py
+++ /dev/null
@@ -1,64 +0,0 @@
-from dataclasses import dataclass
-from datetime import datetime
-from sqlalchemy.orm import Session
-
-from buildflow import Node, Depends
-
-# buildflow.resources.clients exposes all of the ClientResource types (for Depends API)
-from buildflow.resources.clients import PostgresClientPool
-
-# buildflow.resources.io exposes all of the IOResource types (for Provider API)
-from buildflow.resources.io import BigQueryTable, GCPPubSubSubscription
-
-
-@dataclass
-class TaxiOutput:
-    ride_id: str
-    point_idx: int
-    latitude: float
-    longitude: float
-    timestamp: datetime
-    meter_reading: float
-    meter_increment: float
-    ride_status: str
-    passenger_count: int
-
-
-# Create a new Node with all the defaults set
-app = Node()
-
-# Define the source and sink
-pubsub_source = GCPPubSubSubscription(
-    topic_id="projects/pubsub-public-data/topics/taxirides-realtime",
-    project_id="daring-runway-374503",
-    subscription_name="taxiride-sub",
-)
-bigquery_sink = BigQueryTable(
-    table_id="daring-runway-374503.taxi_ride_benchmark.buildflow_temp",
-    include_dataset=False,
-    destroy_protection=False,
-)
-# Define the postgres client pool
-db_pool = 
PostgresClientPool( - host="localhost", - port=5432, - user="postgres", - password="postgres", - database="postgres", - max_size=10, -) - - -# using imperative api DOES NOT REQUIRE all schema types to be provided. -@app.processor(source=pubsub_source, sink=bigquery_sink) -def my_processor(pubsub_message, db: Session = Depends(db_pool)): - return pubsub_message - - -if __name__ == "__main__": - app.run( - disable_usage_stats=True, - apply_infrastructure=False, - destroy_infrastructure=False, - start_node_server=True, - ) diff --git a/infra-from-code.py b/infra-from-code.py deleted file mode 100644 index 04413d1b..00000000 --- a/infra-from-code.py +++ /dev/null @@ -1,57 +0,0 @@ -from dataclasses import dataclass -from datetime import datetime -from sqlalchemy.orm import Session - -from buildflow import Node, Setup, InfraOptions - -# buildflow.resources exposes all of the declarative types (for Provider & Depends API). -# they use the io & client submodules to provide the actual implementations. -from buildflow.resources import RelationalDB, DataWarehouse, Queue - - -# Setup(Resource) will return the client/resource instance, not io/resource. -# Setup(fn) will return the result of calling the function. - -Resource.setup() -Resource.client() - -# using this option REQUIRES that all schemas are provided as type annotations. - - -@dataclass -class TaxiOutput: - ride_id: str - point_idx: int - latitude: float - longitude: float - timestamp: datetime - meter_reading: float - meter_increment: float - ride_status: str - passenger_count: int - - -# declarative types could have "local" implementations for testing. -# this is akin to our in-memory metrics vs prometheus. -app = Node(infra_config=InfraOptions(resource_provider="local")) - - -# using declarative api REQUIRES all schema types to be provided. -# Users can optionally pass an id for other resources to reference. -# We could also pass along any args, kwargs to the underlying resource(s). -@app.processor(source=Queue(id=...), sink=DataWarehouse(id=...)) -def my_processor( - # Depends(Resource) will also return the client/resource instance, not io/resource. - pubsub_message: TaxiOutput, - db: Session = Setup(RelationalDB(id=...)), -) -> TaxiOutput: - return pubsub_message - - -if __name__ == "__main__": - app.run( - disable_usage_stats=True, - apply_infrastructure=False, - destroy_infrastructure=False, - start_node_server=True, - ) diff --git a/pyproject.toml b/pyproject.toml deleted file mode 100644 index 239c2cb6..00000000 --- a/pyproject.toml +++ /dev/null @@ -1,90 +0,0 @@ -[build-system] -requires = ["setuptools>=64.0.0", "wheel"] -build-backend = "setuptools.build_meta" - -[project] -name = "buildflow" -version = "0.1.2" -authors = [ - { name = "Caleb Van Dyke", email = "caleb@launchflow.com" }, - { name = "Josh Tanke", email = "josh@launchflow.com" }, -] -description = "buildflow is a unified **batch** and **streaming** framework that turns any python function into a scalable data pipeline that can read from our supported IO resources." -readme = "README.md" -requires-python = ">=3.7" -dependencies = [ - # TODO: split up AWS and GCP dependencies. 
- "boto3", - "dacite", - "duckdb==0.6.0", - "google-auth", - "google-cloud-bigquery", - "google-cloud-bigquery-storage", - "google-cloud-monitoring", - "google-cloud-pubsub", - "google-cloud-storage", - # TODO: https://github.com/grpc/grpc/issues/31885 - "grpcio<1.51.1", - "fastparquet", - "opentelemetry-api", - "opentelemetry-sdk", - "opentelemetry-exporter-otlp", - "opentelemetry-exporter-jaeger", - "pandas", - "pulumi==3.35.3", - "pulumi_gcp", - "pyarrow", - "pydantic < 2.0.2", - "pyyaml", - "ray[default] > 2.0.0", - "ray[serve] > 2.0.0", - "typer", - "redis", -] -classifiers = [ - "Development Status :: 2 - Pre-Alpha", - "Intended Audience :: Developers", - "Topic :: Software Development", - "License :: OSI Approved :: Apache Software License", -] - -[project.optional-dependencies] -dev = [ - "moto", - "pytest", - "pytest-cov", - "pytest-timeout", - "ruff", - "black", - "pre-commit", - "setuptools", - "wheel" -] - -[tool.setuptools.packages] -find = {} # Scan the project directory with the default parameters - -[project.scripts] -buildflow = "buildflow.cli.main:main" - -[tool.coverage.report] -# Regexes for lines to exclude from consideration -exclude_also = [ - # Don't complain about missing debug-only code: - "def __repr__", - # Don't complain if tests don't hit defensive assertion code: - "raise AssertionError", - "raise NotImplementedError", - "if __name__ == .__main__.:", - # Don't complain about abstract methods, they aren't run: - "@(abc\\.)?abstractmethod", -] -ignore_errors = true -# Files to ignore -omit = [ - # Exclude samples - "buildflow/samples/*", - "buildflow/cli/*", - # Exclude skipped / prototype code - "buildflow/runtime/ray_io/duckdb_io*" -] diff --git a/sandbox.py b/sandbox.py deleted file mode 100644 index edcbe5f0..00000000 --- a/sandbox.py +++ /dev/null @@ -1,78 +0,0 @@ -from dataclasses import dataclass -from datetime import datetime - -from buildflow import Node, RuntimeConfig -from buildflow.resources.io import GCPPubSubSubscription, BigQueryTable - - -@dataclass -class TaxiOutput: - ride_id: str - point_idx: int - latitude: float - longitude: float - timestamp: datetime - meter_reading: float - meter_increment: float - ride_status: str - passenger_count: int - - -# Create a new Node with all the defaults set -runtime_config = RuntimeConfig.IO_BOUND(autoscale=True, max_replicas=4) -# infra_config = InfraConfig( -# schema_validation=SchemaValidation.LOG_WARNING, -# require_confirmation=False, -# log_level="WARNING", -# ) -# app = Node(runtime_config=runtime_config, infra_config=infra_config) - -# Create a new Node -app = Node(runtime_config=runtime_config) - -# Define the source and sink -pubsub_source = GCPPubSubSubscription( - topic_id="projects/pubsub-public-data/topics/taxirides-realtime", - project_id="daring-runway-374503", - subscription_name="taxiride-sub", -) -bigquery_sink = BigQueryTable( - table_id="daring-runway-374503.taxi_ride_benchmark.buildflow_temp", - include_dataset=False, - destroy_protection=False, -) - - -# Attach a processor to the Node -@app.processor( - source=pubsub_source, - sink=bigquery_sink, - num_cpus=0.5, - num_concurrency=8, -) -def my_processor(pubsub_message: TaxiOutput) -> TaxiOutput: - # print('Process: ', pubsub_message) - # should_fail = random.randint(0, 1) - # if should_fail: - # raise ValueError("Randomly failing") - return pubsub_message - - -if __name__ == "__main__": - app.run( - disable_usage_stats=True, - # runtime-only options - block_runtime=True, - debug_run=False, - # infra-only options. 
- apply_infrastructure=False, - # Ad hoc infra is really nice for quick demos / tests - destroy_infrastructure=False, - # server options - start_node_server=True, - ) - - # these should also work: - # app.plan() - # app.apply() - # app.destroy() diff --git a/sandbox2.py b/sandbox2.py deleted file mode 100644 index 519768ba..00000000 --- a/sandbox2.py +++ /dev/null @@ -1,64 +0,0 @@ -import dataclasses -from datetime import datetime - -from buildflow import Node, ResourcesConfig -from buildflow.resources import DataWarehouseTable, Topic - - -@dataclasses.dataclass -class TaxiOutput: - ride_id: str - point_idx: int - latitude: float - longitude: float - timestamp: datetime - meter_reading: float - meter_increment: float - ride_status: str - passenger_count: int - - -resource_config = ResourcesConfig.default() -# resource_config.gcp.default_project_id = "daring-runway-374503" -# Create a new Node with custom resource config -app = Node(resource_config=resource_config) - - -# Define the source and sink -source = Topic( - resource_id="tanke_topic_id", - resource_provider="gcp", -) -sink = DataWarehouseTable( - resource_id="tanke_table_id", - resource_provider="gcp", -) - - -# Attach a processor to the Node -@app.processor(source=source, sink=sink) -def my_processor(pubsub_message: TaxiOutput) -> TaxiOutput: - return pubsub_message - - -# print(dataclasses.asdict(my_processor.source().resource_type())) -# print(dataclasses.asdict(my_processor.sink().resource_type())) - -# if __name__ == "__main__": -# app.run( -# disable_usage_stats=True, -# # runtime-only options -# block_runtime=True, -# debug_run=False, -# # infra-only options. -# apply_infrastructure=False, -# # Ad hoc infra is really nice for quick demos / tests -# destroy_infrastructure=False, -# # server options -# start_node_server=True, -# ) - -# these should also work: -# app.plan() -# app.apply() -# app.destroy() diff --git a/sandbox3.py b/sandbox3.py deleted file mode 100644 index 19d98455..00000000 --- a/sandbox3.py +++ /dev/null @@ -1,48 +0,0 @@ -from buildflow.core.io import AnalysisTable -from buildflow.core.app.flow import Flow -from buildflow.core.options import ( - FlowOptions, - AutoscalerOptions, - RuntimeOptions, - InfraOptions, -) -from buildflow.core.io.gcp import GCPPubSubSubscription, BigQueryTable -from sandbox4 import TaxiOutput - -# Create a new Flow -app = Flow( - flow_options=FlowOptions( - infra_options=InfraOptions.default(), - runtime_options=RuntimeOptions( - processor_options={}, - autoscaler_options=AutoscalerOptions( - enable_autoscaler=False, - min_replicas=1, - max_replicas=10, - log_level="INFO", - ), - num_replicas=10, - log_level="INFO", - ), - ) -) - - -# Define the source and sink -pubsub_source = GCPPubSubSubscription( - topic_id="projects/pubsub-public-data/topics/taxirides-realtime", - project_id="daring-runway-374503", - subscription_name="taxiride-sub", -) -bigquery_sink = AnalysisTable(table_name="tanke_table") - - -# Attach a processor to the Flow -@app.pipeline( - source=pubsub_source, - sink=bigquery_sink, - num_cpus=1.0, - num_concurrency=8, -) -def my_processor(pubsub_message: TaxiOutput) -> TaxiOutput: - return pubsub_message diff --git a/sandbox4.py b/sandbox4.py deleted file mode 100644 index 6979fac7..00000000 --- a/sandbox4.py +++ /dev/null @@ -1,15 +0,0 @@ -import dataclasses -from datetime import datetime - - -@dataclasses.dataclass -class TaxiOutput: - ride_id: str - point_idx: int - latitude: float - longitude: float - timestamp: datetime - meter_reading: float - meter_increment: 
float
-    ride_status: str
-    passenger_count: int
diff --git a/setup.py b/setup.py
new file mode 100644
index 00000000..754145f6
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,121 @@
+import os
+import subprocess
+import sys
+from typing import List
+
+
+ROOT_DIR = os.path.dirname(__file__)
+
+
+# Calls Bazel in PATH, falling back to the standard user installation path
+# (~/bin/bazel) if it isn't found.
+def run_bazel(cmd_args: List[str], *args, **kwargs):
+    home = os.path.expanduser("~")
+    first_candidate = os.getenv("BAZEL_PATH", "bazel")
+    candidates = [first_candidate]
+    candidates.append(os.path.join(home, "bin", "bazel"))
+    result = None
+    for i, cmd in enumerate(candidates):
+        try:
+            result = subprocess.check_call([cmd] + cmd_args, *args, **kwargs)
+            break
+        except IOError:
+            if i >= len(candidates) - 1:
+                raise
+    return result
+
+
+def build():
+    bazel_env = dict(os.environ, PYTHON3_BIN_PATH=sys.executable)
+
+    run_bazel(
+        ["build", "//:cython_pubsub_source"],
+        env=bazel_env,
+    )
+    run_bazel(
+        ["run", "//:cp_pubsub_so"],
+        env=bazel_env,
+    )
+
+
+if __name__ == "__main__":
+    import setuptools
+    import setuptools.command.build_ext
+
+    class build_ext(setuptools.command.build_ext.build_ext):
+        def run(self):
+            build()
+
+    class BinaryDistribution(setuptools.Distribution):
+        def has_ext_modules(self):
+            return True
+
+
+setuptools.setup(
+    name="buildflow",
+    version="0.1.2",
+    python_requires=">=3.7",
+    author="LaunchFlow",
+    author_email="founders@launchflow.com",
+    description="Build your entire system in minutes using Python.",  # noqa: E501
+    long_description=open(
+        os.path.join(ROOT_DIR, "README.md"), "r", encoding="utf-8"
+    ).read(),
+    url="http://www.buildflow.dev",
+    classifiers=[
+        "Intended Audience :: Developers",
+        "Topic :: Software Development",
+        "License :: OSI Approved :: Apache Software License",
+        "Programming Language :: Python :: 3.7",
+        "Programming Language :: Python :: 3.8",
+        "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
+    ],
+    packages=["buildflow"],
+    cmdclass={"build_ext": build_ext},
+    distclass=BinaryDistribution,
+    entry_points={"console_scripts": ["buildflow = buildflow.cli.main:main"]},
+    setup_requires=["cython >= 0.29.32", "wheel"],
+    install_requires=[
+        # TODO: split up AWS and GCP dependencies.
+        "boto3",
+        "dacite",
+        "duckdb==0.6.0",
+        "google-auth",
+        "google-cloud-bigquery",
+        "google-cloud-bigquery-storage",
+        "google-cloud-monitoring",
+        "google-cloud-pubsub",
+        "google-cloud-storage",
+        # TODO: https://github.com/grpc/grpc/issues/31885
+        "grpcio<1.51.1",
+        "fastparquet",
+        "opentelemetry-api",
+        "opentelemetry-sdk",
+        "opentelemetry-exporter-otlp",
+        "opentelemetry-exporter-jaeger",
+        "pandas",
+        "pulumi==3.35.3",
+        "pulumi_gcp",
+        "pyarrow",
+        "pydantic < 2.0.2",
+        "pyyaml",
+        "ray[default] > 2.0.0",
+        "ray[serve] > 2.0.0",
+        "typer",
+        "redis",
+    ],
+    extras_require={
+        "dev": [
+            "moto",
+            "pytest",
+            "pytest-cov",
+            "pytest-timeout",
+            "ruff",
+            "black",
+            "pre-commit",
+            "setuptools",
+            "wheel",
+        ]
+    },
+)
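The new setup.py defers the native build to Bazel: its build_ext command calls build(), which runs the two targets defined in the BUILD file at the repository root. For local iteration on the C++/Cython source, the same steps can be driven by hand; a sketch, assuming bazel is on PATH:

```python
# Sketch of the two Bazel steps that setup.py's build() performs.
import subprocess

# Compile the Cython extension and its google-cloud-cpp / gRPC dependencies.
subprocess.check_call(["bazel", "build", "//:cython_pubsub_source"])

# cp_pubsub_so is the copy_to_workspace target from defs.bzl; running it copies
# the built pubsub_source.so into buildflow/core/io/gcp/strategies/_cython/.
subprocess.check_call(["bazel", "run", "//:cp_pubsub_so"])
```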