Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: introduce a c++ implementation of pubsub source #216

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .bazelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Use host-OS-specific config lines from bazelrc files.
build --enable_platform_specific_config=true

# The project requires C++ >= 14. By default Bazel adds `-std=c++0x` which
# disables C++14 features, even if the compiler defaults to C++ >= 14.
build:linux --cxxopt=-std=c++14
build:macos --cxxopt=-std=c++14
# Protobuf and gRPC require (or soon will require) C++14 to compile the "host"
# targets, such as protoc and the grpc plugin.
build:linux --host_cxxopt=-std=c++14
build:macos --host_cxxopt=-std=c++14
21 changes: 21 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[report]
# NOTE: .coveragerc is INI format (parsed by configparser). Multi-line values
# are written as indented continuation lines; the previous TOML-style brackets
# and quoted strings would have been matched literally as part of the regexes.
# Regexes for lines to exclude from consideration:
#   - missing debug-only code (__repr__)
#   - defensive assertion code tests don't hit
#   - script entry points
#   - abstract methods (they aren't run)
exclude_also =
    def __repr__
    raise AssertionError
    raise NotImplementedError
    if __name__ == .__main__.:
    @(abc\.)?abstractmethod
ignore_errors = True
# Files to ignore: samples, CLI, and skipped / prototype code.
omit =
    buildflow/samples/*
    buildflow/cli/*
    buildflow/runtime/ray_io/duckdb_io*
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,13 @@ dmypy.json

# local vscode config
.vscode/

### Automatically added by Hedron's Bazel Compile Commands Extractor: https://github.com/hedronvision/bazel-compile-commands-extractor
# Ignore the `external` link (that is added by `bazel-compile-commands-extractor`). The link differs between macOS/Linux and Windows, so it shouldn't be checked in. The pattern must not end with a trailing `/` because it's a symlink on macOS/Linux.
/external
# Ignore links to Bazel's output. The pattern needs the `*` because people can change the name of the directory into which your repository is cloned (changing the `bazel-<workspace_name>` symlink), and must not end with a trailing `/` because it's a symlink on macOS/Linux.
/bazel-*
# Ignore generated output. Although valuable (after all, the primary purpose of `bazel-compile-commands-extractor` is to produce `compile_commands.json`!), it should not be checked in.
/compile_commands.json
# Ignore the directory in which `clangd` stores its local index.
/.cache/
33 changes: 33 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
load("@com_github_grpc_grpc//bazel:cython_library.bzl", "pyx_library")
load("//:defs.bzl", "copy_to_workspace")

package(default_visibility = ["//visibility:private"])

# Copies the compiled Cython extension module back into the source tree so the
# Python package can import it directly instead of through Bazel runfiles.
copy_to_workspace(
    name = "cp_pubsub_so",
    file = "buildflow/core/io/gcp/strategies/_cython/pubsub_source.so",
    output_dir = "buildflow/core/io/gcp/strategies/_cython",
)

# Cython wrapper that exposes the C++ streaming-pull client to Python.
pyx_library(
    name = "cython_pubsub_source",
    srcs = [
        "buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx",
    ],
    deps = [
        ":c_pubsub",
    ],
)

# C++ implementation of the Pub/Sub StreamingPull client, built against the
# google-cloud-cpp Pub/Sub protos/stubs (see WORKSPACE).
cc_library(
    name = "c_pubsub",
    srcs = [
        "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp",
    ],
    hdrs = [
        "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h",
    ],
    deps = [
        "@google_cloud_cpp//:pubsub",
    ],
)
51 changes: 51 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
workspace(name = "com_github_buildflow")

# Add the necessary Starlark functions to fetch google-cloud-cpp.
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

# Fetch the Google Cloud C++ libraries (provides the Pub/Sub protos and gRPC
# stubs used by //:c_pubsub).
# NOTE: Update this version and SHA256 as needed.
http_archive(
    name = "google_cloud_cpp",
    sha256 = "7204805106be2164b2048a965b3cc747dd8bd9193c52d9b572c07606ea72ab7e",
    strip_prefix = "google-cloud-cpp-2.13.0",
    url = "https://github.com/googleapis/google-cloud-cpp/archive/v2.13.0.tar.gz",
)

# Load indirect dependencies due to
# https://github.com/bazelbuild/bazel/issues/1943
load("@google_cloud_cpp//bazel:google_cloud_cpp_deps.bzl", "google_cloud_cpp_deps")

google_cloud_cpp_deps()

load("@com_google_googleapis//:repository_rules.bzl", "switched_rules_by_language")

# Generate C++/gRPC targets for the googleapis protos.
switched_rules_by_language(
    name = "com_google_googleapis_imports",
    cc = True,
    grpc = True,
)

load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps")

grpc_deps()

load("@com_github_grpc_grpc//bazel:grpc_extra_deps.bzl", "grpc_extra_deps")

grpc_extra_deps()

# Hedron's Compile Commands Extractor for Bazel (generates
# compile_commands.json for clangd; dev-tooling only).
# https://github.com/hedronvision/bazel-compile-commands-extractor
http_archive(
    name = "hedron_compile_commands",
    strip_prefix = "bazel-compile-commands-extractor-ed994039a951b736091776d677f324b3903ef939",

    # Replace the commit hash in both places (below) with the latest, rather than using the stale one here.
    # Even better, set up Renovate and let it do the work for you (see "Suggestion: Updates" in the README).
    url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/ed994039a951b736091776d677f324b3903ef939.tar.gz",
    # When you first run this tool, it'll recommend a sha256 hash to put here with a message like: "DEBUG: Rule 'hedron_compile_commands' indicated that a canonical reproducible form can be obtained by modifying arguments sha256 = ..."
    # TODO(review): pin the recommended sha256 here for reproducible builds.
)

load("@hedron_compile_commands//:workspace_setup.bzl", "hedron_compile_commands_setup")

hedron_compile_commands_setup()
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ async def process_element(element):

# PUSH
await sink.push(batch_results)
# NOTE: we need this to ensure that we can call the snapshot method for
# drains and autoscale checks
# await asyncio.sleep(0.1)
except Exception:
logging.exception(
"failed to process batch, messages will not be acknowledged"
Expand Down
3 changes: 1 addition & 2 deletions buildflow/core/app/runtime/actors/process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ async def add_replicas(self, num_replicas: int):
if self._status == RuntimeStatus.RUNNING:
for _ in range(self.options.num_concurrency):
replica.ray_actor_handle.run.remote()

self.replicas.append(replica)

self.num_replicas_gauge.set(len(self.replicas))
Expand Down Expand Up @@ -138,8 +139,6 @@ def run(self):
if self._status != RuntimeStatus.IDLE:
raise RuntimeError("Can only start an Idle Runtime.")
self._status = RuntimeStatus.RUNNING
for replica in self.replicas:
replica.ray_actor_handle.run.remote()

async def drain(self):
logging.info(f"Draining ProcessorPool({self.processor.processor_id})...")
Expand Down
8 changes: 6 additions & 2 deletions buildflow/core/app/runtime/actors/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ async def drain(self) -> bool:
ray.kill(processor_pool.actor_handle)
for processor_pool in self._processor_pool_refs
]
# Kill the runtime actor to force the job to exit.
ray.actor.exit_actor()
else:
logging.warning("Draining Runtime...")
logging.warning("-- Attempting to drain again will force stop the runtime.")
Expand Down Expand Up @@ -214,9 +216,11 @@ async def _scale_processor(

num_replicas_delta = target_num_replicas - current_num_replicas
if num_replicas_delta > 0:
processor_pool.actor_handle.add_replicas.remote(num_replicas_delta)
await processor_pool.actor_handle.add_replicas.remote(num_replicas_delta)
elif num_replicas_delta < 0:
processor_pool.actor_handle.remove_replicas.remote(abs(num_replicas_delta))
await processor_pool.actor_handle.remove_replicas.remote(
abs(num_replicas_delta)
)
return processor_snapshot

async def _runtime_autoscale_loop(self):
Expand Down
3 changes: 3 additions & 0 deletions buildflow/core/io/gcp/providers/pubsub_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
# source-only options
batch_size: int = 1000,
include_attributes: bool = False,
use_cpp_source: bool = False,
# pulumi-only options
ack_deadline_seconds: int = 10 * 60,
message_retention_duration: str = "1200s",
Expand All @@ -68,6 +69,7 @@ def __init__(
# source-only options
self.batch_size = batch_size
self.include_attributes = include_attributes
self.use_cpp_source = use_cpp_source
# pulumi-only options
self.ack_deadline_seconds = ack_deadline_seconds
self.message_retention_duration = message_retention_duration
Expand All @@ -78,6 +80,7 @@ def source(self):
project_id=self.project_id,
batch_size=self.batch_size,
include_attributes=self.include_attributes,
use_cpp_source=self.use_cpp_source,
)

def pulumi_resources(self, type_: Optional[Type]):
Expand Down
3 changes: 3 additions & 0 deletions buildflow/core/io/gcp/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class GCPPubSubSubscription(GCPPrimtive):
subscription_name: PubSubSubscriptionName
# required fields
topic_id: PubSubTopicID
# Optional fields
use_cpp_source: bool = False

@classmethod
def from_gcp_options(
Expand Down Expand Up @@ -87,6 +89,7 @@ def source_provider(self) -> GCPPubSubSubscriptionProvider:
project_id=self.project_id,
subscription_name=self.subscription_name,
topic_id=self.topic_id,
use_cpp_source=self.use_cpp_source,
)

# NOTE: Subscriptions do not support sinks, but we "implement" it here to
Expand Down
Empty file.
50 changes: 50 additions & 0 deletions buildflow/core/io/gcp/strategies/_cython/pubsub_source.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# distutils: language=c++
# distutils: sources = pubsub_stream.cpp

from libcpp.string cimport string
from libcpp.vector cimport vector


# Declarations mirroring buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h.
# These must stay in sync with the C++ header; `except +` converts thrown C++
# exceptions into Python exceptions.
cdef extern from "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h" namespace "buildflow":
    cdef cppclass CPubSubData:
        string data
        string ack_id

        CPubSubData(string data, string ack_id) except +

    cdef cppclass CPubSubStream:
        string subscription_id

        CPubSubStream(string) except +
        vector[CPubSubData] pull()
        void ack(vector[string])


cdef class PyPubSubData:
    """Python wrapper around a single pulled Pub/Sub message.

    Owns a heap-allocated CPubSubData and frees it in __dealloc__.
    """
    cdef CPubSubData *thisptr

    def __cinit__(self, string data, string ack_id):
        # Copies both strings into a new C++ object owned by this wrapper.
        self.thisptr = new CPubSubData(data, ack_id)

    def __dealloc__(self):
        del self.thisptr

    def ack_id(self):
        # Ack id used to acknowledge this message (C++ std::string -> bytes).
        return self.thisptr.ack_id

    def data(self):
        # Raw message payload (C++ std::string -> bytes).
        return self.thisptr.data

cdef class PyPubSubStream:
    """Python wrapper around the C++ Pub/Sub StreamingPull client.

    NOTE(review): pull() and ack() block while holding the GIL for the
    duration of the underlying gRPC call -- confirm callers isolate this in
    its own process/actor if concurrency is needed.
    """
    cdef CPubSubStream *thisptr

    def __cinit__(self, string subscription_id):
        # Opens the gRPC channel / stream; the C++ ctor may throw (except +).
        self.thisptr = new CPubSubStream(subscription_id)

    def __dealloc__(self):
        del self.thisptr

    def pull(self):
        # Blocking read of the next StreamingPullResponse; each received
        # message is copied into a PyPubSubData wrapper.
        cdata = self.thisptr.pull()
        out_list = []
        for i in range(cdata.size()):
            out_list.append(PyPubSubData(cdata[i].data, cdata[i].ack_id))
        return out_list

    def ack(self, vector[string] ack_ids):
        # Acknowledges previously pulled messages on the same stream.
        return self.thisptr.ack(ack_ids)
49 changes: 49 additions & 0 deletions buildflow/core/io/gcp/strategies/_cython/pubsub_stream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include <iostream>
#include <string>
#include <vector>

#include "google/pubsub/v1/pubsub.grpc.pb.h"
#include "google/pubsub/v1/pubsub.pb.h"
#include "grpc++/grpc++.h"

#include "buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h"

// Opens a gRPC channel to Pub/Sub with application-default credentials and
// starts the bidirectional StreamingPull RPC. The initial request that binds
// the stream to the subscription is sent lazily on the first pull().
//
// Members are built in the initializer list (declaration order) instead of
// default-construct + copy-assign; the subscription id is moved, not copied.
buildflow::CPubSubStream::CPubSubStream(std::string sub_id)
    : subscription_id(std::move(sub_id)),
      creds(grpc::GoogleDefaultCredentials()),
      channel(grpc::CreateChannel("pubsub.googleapis.com", creds)),
      // NewStub is the canonical factory for generated gRPC stubs.
      stub(google::pubsub::v1::Subscriber::NewStub(channel)),
      context(std::make_unique<grpc::ClientContext>()) {
  // The stream must outlive the context; both are owned by this object.
  pubsub_stream = stub->StreamingPull(context.get());
}

std::vector<buildflow::CPubSubData> buildflow::CPubSubStream::pull() {
if (!is_stream_initialized) {
google::pubsub::v1::StreamingPullRequest request;
request.set_subscription(subscription_id);
request.set_stream_ack_deadline_seconds(10);
pubsub_stream->Write(request);
is_stream_initialized = true;
}
google::pubsub::v1::StreamingPullResponse response;
pubsub_stream->Read(&response);

std::vector<CPubSubData> v;
for (const auto &message : response.received_messages()) {
v.push_back(CPubSubData(message.message().data(), message.ack_id()));
}
return v;
}

// Acknowledges previously pulled messages by writing their ack ids back on
// the same streaming-pull stream.
//
// A no-op for an empty id list (previously an empty StreamingPullRequest was
// sent over the wire). The parameter is taken by value, so the ids are moved
// into the request rather than copied.
void buildflow::CPubSubStream::ack(std::vector<std::string> ack_ids) {
  if (ack_ids.empty()) {
    return;
  }
  google::pubsub::v1::StreamingPullRequest request;
  for (auto &ack_id : ack_ids) {
    request.add_ack_ids(std::move(ack_id));
  }
  // NOTE(review): the Write() result is not checked; a failed ack will be
  // redelivered by the server after the ack deadline, so this is best-effort.
  pubsub_stream->Write(request);
}

// Value holder for a pulled message's payload and ack id. Members are moved
// in via the initializer list instead of default-construct + copy-assign.
buildflow::CPubSubData::CPubSubData(std::string d, std::string a)
    : data(std::move(d)), ack_id(std::move(a)) {}
43 changes: 43 additions & 0 deletions buildflow/core/io/gcp/strategies/_cython/pubsub_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef PUBSUB_STREAM_H
#define PUBSUB_STREAM_H

#include <memory>  // unique_ptr/shared_ptr members below (was transitive-only).
#include <string>
#include <utility>
#include <vector>

#include "google/pubsub/v1/pubsub.grpc.pb.h"
#include "google/pubsub/v1/pubsub.pb.h"
#include "grpc++/grpc++.h"

namespace buildflow {

// Value type holding a single pulled Pub/Sub message: the raw payload and the
// ack id needed to acknowledge it.
class CPubSubData {
 public:
  CPubSubData(std::string data, std::string ack_id);
  std::string data;
  std::string ack_id;
};

// Thin client for the Pub/Sub StreamingPull RPC. Not thread-safe: pull() and
// ack() share a single bidirectional stream.
class CPubSubStream {
 private:
  std::string subscription_id;
  std::shared_ptr<grpc::ChannelCredentials> creds;
  std::shared_ptr<grpc::Channel> channel;
  std::unique_ptr<google::pubsub::v1::Subscriber::Stub> stub;
  std::unique_ptr<
      grpc::ClientReaderWriter<google::pubsub::v1::StreamingPullRequest,
                               google::pubsub::v1::StreamingPullResponse>>
      pubsub_stream;
  std::unique_ptr<grpc::ClientContext> context;
  // Set on the first pull(): the initial request that names the subscription
  // must be written before the first read.
  bool is_stream_initialized = false;

 public:
  CPubSubStream(std::string subscription_id);
  // TODO(review): run() is declared but has no definition in
  // pubsub_stream.cpp; calling it fails at link time. Implement or remove.
  void run();
  std::vector<CPubSubData> pull();
  void ack(std::vector<std::string> ack_ids);
};

}  // namespace buildflow

#endif  // PUBSUB_STREAM_H
Loading
Loading