Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

temp pr #276

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions buildflow/core/app/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
from buildflow.core.credentials.aws_credentials import AWSCredentials
from buildflow.core.credentials.empty_credentials import EmptyCredentials
from buildflow.core.credentials.gcp_credentials import GCPCredentials
from buildflow.core.dependencies.dependency import (
Client,
Dependency,
KwargDependencies,
Sink,
)
from buildflow.core.options.flow_options import FlowOptions
from buildflow.core.options.runtime_options import AutoscalerOptions, ProcessorOptions
from buildflow.core.processor.patterns.collector import CollectorProcessor
Expand Down Expand Up @@ -315,6 +321,7 @@ def __init__(
flow_id: FlowID = "buildflow-app",
flow_options: Optional[FlowOptions] = None,
) -> None:
print("\n\nFLOW CREATED\n\n")
# Load the BuildFlow Config to get the default options
buildflow_config_dir = os.path.join(
_get_directory_path_of_caller(), ".buildflow"
Expand Down Expand Up @@ -378,6 +385,27 @@ def _background_tasks(
return provider.background_tasks(credentials)
return []

def _dependencies(
    self, full_arg_spec: type_inspect.FullArgSpec
) -> KwargDependencies:
    """Collect Dependency-typed defaults from a processor's signature.

    Skips the first arg (self / the request payload). For every remaining
    arg whose default value is a Dependency, attaches the credentials for
    its primitive and the arg's type annotation so the dependency can be
    materialized at runtime.

    Raises:
        ValueError: if an arg has no default value (or a None default).
    """
    dependencies = {}
    defaults = full_arg_spec.defaults or ()
    # inspect aligns `defaults` with the *last* len(defaults) args, not
    # with args[1:] -- indexing defaults[i] directly mis-pairs (or
    # IndexErrors) whenever an arg lacks a default. Compute the offset
    # of args[1:] into the defaults tuple instead.
    num_without_default = len(full_arg_spec.args) - len(defaults) - 1
    for i, arg_name in enumerate(full_arg_spec.args[1:]):
        if i < num_without_default:
            # Arg has no default at all -- same contract violation the
            # None check below guards against.
            raise ValueError(
                f"Processor arg dependency {arg_name} has no default value."
            )
        default_value = defaults[i - num_without_default]
        if default_value is None:
            raise ValueError(
                f"Processor arg dependency {arg_name} has no default value."
            )

        if isinstance(default_value, Dependency):
            credentials = self._get_credentials(
                default_value.primitive.primitive_type
            )
            default_value.attach_credentials(credentials)
            default_value.attach_annotated_type(full_arg_spec.annotations[arg_name])
            dependencies[arg_name] = default_value

    return KwargDependencies(dependencies)

# NOTE: The Flow class is responsible for converting Primitives into a Provider
def pipeline(
self,
Expand Down Expand Up @@ -743,6 +771,28 @@ def __init__(
)
outputs["sink_urn"] = sink_resource.urn

# Builds the dependencies' pulumi.CompositeResources
# (if they exist)
for (
arg_name,
dependency,
) in kwarg_dependencies.arg_name_to_dependency.items():
dependency_pulumi_provider = (
dependency.primitive.pulumi_provider()
)
if (
dependency_pulumi_provider is not None
and dependency.primitive not in primitive_cache
):
dependency_resource = _traverse_primitive_for_pulumi(
primitive=dependency.primitive,
type_=None,
credentials=dependency.credentials,
initial_opts=child_opts,
visited_primitives=primitive_cache,
)
outputs[f"{arg_name}_urn"] = dependency_resource.urn

self.register_outputs(outputs)

return PipelineComponentResource(
Expand All @@ -756,6 +806,8 @@ def background_tasks():
source_primitive, source_credentials
) + self._background_tasks(sink_primitive, sink_credentials)

kwarg_dependencies = self._dependencies(full_arg_spec)

# Dynamically define a new class with the same structure as Processor
class_name = f"PipelineProcessor{utils.uuid(max_len=8)}"
source_provider = source_primitive.source_provider()
Expand All @@ -771,6 +823,7 @@ def background_tasks():
"setup": setup,
"teardown": teardown,
"background_tasks": lambda self: background_tasks(),
"dependencies": kwarg_dependencies.materialize,
"__meta__": {
"source": source_primitive,
"sink": sink_primitive,
Expand Down Expand Up @@ -865,13 +918,37 @@ def __init__(
)
outputs["sink_urn"] = sink_resource.urn

# Builds the dependencies' pulumi.CompositeResources
# (if they exist)
for (
arg_name,
dependency,
) in kwarg_dependencies.arg_name_to_dependency.items():
dependency_pulumi_provider = (
dependency.primitive.pulumi_provider()
)
if (
dependency_pulumi_provider is not None
and dependency.primitive not in primitive_cache
):
dependency_resource = _traverse_primitive_for_pulumi(
primitive=dependency.primitive,
type_=None,
credentials=dependency.credentials,
initial_opts=child_opts,
visited_primitives=primitive_cache,
)
outputs[f"{arg_name}_urn"] = dependency_resource.urn

self.register_outputs(outputs)

return CollectorComponentResource(
processor_id=processor_id,
sink_primitive=sink_primitive,
)

kwarg_dependencies = self._dependencies(full_arg_spec)

def background_tasks():
return self._background_tasks(sink_primitive, sink_credentials)

Expand All @@ -889,6 +966,7 @@ def background_tasks():
"setup": setup,
"teardown": teardown,
"background_tasks": lambda self: background_tasks(),
"dependencies": kwarg_dependencies.materialize,
"__meta__": {
"sink": sink_primitive,
},
Expand Down Expand Up @@ -939,6 +1017,7 @@ def decorator_function(original_process_fn_or_class):
_, output_type = self._input_output_type(full_arg_spec)

processor_id = original_process_fn_or_class.__name__
primitive_cache = self._primitive_cache

def pulumi_resources_for_endpoint():
class EndpointComponentResource(pulumi.ComponentResource):
Expand All @@ -950,12 +1029,37 @@ def __init__(self, processor_id: str):
None,
)

child_opts = pulumi.ResourceOptions(parent=self)
outputs = {"processor_id": processor_id}

# Builds the dependencies' pulumi.CompositeResources
# (if they exist)
for (
arg_name,
dependency,
) in kwarg_dependencies.arg_name_to_dependency.items():
dependency_pulumi_provider = (
dependency.primitive.pulumi_provider()
)
if (
dependency_pulumi_provider is not None
and dependency.primitive not in primitive_cache
):
dependency_resource = _traverse_primitive_for_pulumi(
primitive=dependency.primitive,
type_=None,
credentials=dependency.credentials,
initial_opts=child_opts,
visited_primitives=primitive_cache,
)
outputs[f"{arg_name}_urn"] = dependency_resource.urn

self.register_outputs(outputs)

return EndpointComponentResource(processor_id=processor_id)

kwarg_dependencies = self._dependencies(full_arg_spec)

# Dynamically define a new class with the same structure as Processor
class_name = f"EndpointProcessor{utils.uuid(max_len=8)}"
adhoc_methods = {
Expand All @@ -968,6 +1072,7 @@ def __init__(self, processor_id: str):
"setup": setup,
"teardown": teardown,
"background_tasks": lambda self: [],
"dependencies": kwarg_dependencies.materialize,
"__meta__": {},
"__call__": original_process_fn_or_class,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ async def run(self) -> bool:
push_converter = self.sink.push_converter(output_type)
app = fastapi.FastAPI()
fastapi_method = None
if self.processor.endpoint().method == Method.GET:

endpoint = self.processor.endpoint()
if endpoint.method == Method.GET:
fastapi_method = app.get
elif self.processor.endpoint().method == Method.POST:
elif endpoint.method == Method.POST:
fastapi_method = app.post
else:
raise NotImplementedError(
f"Method {self.processor.endpoint().method} is not supported "
"for collectors."
f"Method {endpoint.method} is not supported " "for collectors."
)

@serve.deployment(
Expand Down Expand Up @@ -112,7 +113,8 @@ async def root(self, request: input_type) -> None:
sink = self.processor.sink()
self.num_events_processed_counter.inc()
start_time = time.monotonic()
output = await self.processor.process(request)
with self.processor.dependencies() as kwargs:
output = await self.processor.process(request, **kwargs)
if output is None:
# Exclude none results
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ async def run(self) -> bool:
input_type, output_type = process_types(self.processor)
app = fastapi.FastAPI()
fastapi_method = None
if self.processor.endpoint().method == Method.GET:

endpoint = self.processor.endpoint()
if endpoint.method == Method.GET:
fastapi_method = app.get
elif self.processor.endpoint().method == Method.POST:
elif endpoint.method == Method.POST:
fastapi_method = app.post
else:
raise NotImplementedError(
f"Method {self.processor.endpoint().method} is not supported "
"for endpoints."
f"Method {endpoint.method} is not supported " "for endpoints."
)

@serve.deployment(
Expand Down Expand Up @@ -108,7 +109,8 @@ def __init__(self, processor, run_id):
async def root(self, request: input_type) -> output_type:
self.num_events_processed_counter.inc()
start_time = time.monotonic()
output = await self.processor.process(request)
with self.processor.dependencies() as kwargs:
output = await self.processor.process(request, **kwargs)
self.process_time_counter.inc((time.monotonic() - start_time) * 1000)
return output

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ async def run(self):
push_converter = sink.push_converter(output_type)
process_fn = self.processor.process

async def process_element(element):
results = await process_fn(pull_converter(element))
async def process_element(element, **process_kwargs):
results = await process_fn(pull_converter(element), **process_kwargs)
if results is None:
# Exclude none results
return
Expand Down Expand Up @@ -202,8 +202,10 @@ async def process_element(element):
try:
coros = []
for element in response.payload:
coros.append(process_element(element))
with self.processor.dependencies() as process_kwargs:
coros.append(process_element(element, **process_kwargs))
flattened_results = await asyncio.gather(*coros)

batch_results = []
for results in flattened_results:
if results is None:
Expand Down
Empty file.
81 changes: 81 additions & 0 deletions buildflow/core/dependencies/dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import inspect
from contextlib import ExitStack, contextmanager
from typing import Any, Dict, Generic, Optional, Type, TypeVar

from buildflow.core.credentials import CredentialType
from buildflow.io.primitive import Primitive


class Dependency:
    """Base class for processor arguments injected at runtime.

    Wraps a Primitive plus the credentials and annotated type (attached
    later via the ``attach_*`` methods) needed to materialize a concrete
    resource such as a client or sink.
    """

    def __init__(self, primitive: Primitive):
        self.primitive = primitive
        # Populated later by attach_annotated_type / attach_credentials.
        self.annotated_type: Optional[Type] = None
        self.credentials: Optional[CredentialType] = None

    def attach_credentials(self, credential_type: CredentialType) -> None:
        """Record the credentials to use when materializing this dependency."""
        self.credentials = credential_type

    def attach_annotated_type(self, annotated_type: Type) -> None:
        """Record the type annotation from the processor's signature."""
        self.annotated_type = annotated_type

    @contextmanager
    def materialize(self):
        """Yield the concrete resource; concrete subclasses must override."""
        raise NotImplementedError(
            f"materialize not implemented for {type(self).__name__}"
        )


class KwargDependencies:
    """A named set of Dependencies materialized together as keyword args."""

    def __init__(self, arg_name_to_dependency: Dict[str, Dependency]):
        self.arg_name_to_dependency = arg_name_to_dependency

    @contextmanager
    def materialize(self):
        """Enter every dependency's context and yield them as a kwargs dict.

        All contexts are held open for the duration of this context and
        unwound together on exit.
        """
        with ExitStack() as stack:
            kwargs = {}
            for arg_name, dependency in self.arg_name_to_dependency.items():
                kwargs[arg_name] = stack.enter_context(dependency.materialize())
            yield kwargs


P = TypeVar("P", bound=Primitive)


class Client(Generic[P], Dependency):
    """Dependency that materializes a client for its primitive."""

    def __init__(self, primitive: P):
        super().__init__(primitive)
        # Built lazily on first materialize() and cached for reuse.
        self.client: Optional[Any] = None

    @contextmanager
    def materialize(self):
        """Yield a client built by the primitive's client provider.

        Credentials must have been attached before the first use; the
        client is constructed once and reused by later contexts.
        """
        if self.client is None:
            if self.credentials is None:
                raise ValueError(
                    "Cannot materialize client without credentials attached"
                )
            self.client = self.primitive.client_provider().client(
                self.credentials, self.annotated_type
            )
        # No teardown is required today, so yield the cached client directly.
        yield self.client


class Sink(Dependency):
    """Dependency that materializes a sink for its primitive."""

    def __init__(self, primitive: Primitive):
        super().__init__(primitive)
        # Built lazily on first materialize() and cached for reuse.
        self.sink: Optional[Any] = None

    @contextmanager
    def materialize(self):
        """Yield a sink built by the primitive's sink provider.

        Credentials must have been attached before the first use; the
        sink is constructed once and reused by later contexts.
        """
        if self.sink is None:
            if self.credentials is None:
                raise ValueError("Cannot materialize sink without credentials attached")
            provider = self.primitive.sink_provider()
            self.sink = provider.sink(self.credentials)
        # No teardown is required today, so yield the cached sink directly.
        yield self.sink
7 changes: 6 additions & 1 deletion buildflow/core/processor/processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import enum
from typing import List
from contextlib import contextmanager
from typing import Any, Dict, List

from buildflow.core.background_tasks.background_task import BackgroundTask

Expand All @@ -21,6 +22,10 @@ class ProcessorAPI:
def pulumi_program(self, preview: bool):
raise NotImplementedError("pulumi_program not implemented for Processor")

@contextmanager
def dependencies(self):
    # Context manager yielding a dict of materialized dependency kwargs
    # for process(); concrete processors are expected to override this.
    raise NotImplementedError("dependencies not implemented for Processor")

def setup(self):
raise NotImplementedError("setup not implemented")

Expand Down
2 changes: 2 additions & 0 deletions buildflow/io/aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class S3Bucket(
S3BucketProvider,
# Background task provider type
None,
# Client provider type
None,
]
):
bucket_name: S3BucketName
Expand Down
2 changes: 2 additions & 0 deletions buildflow/io/aws/s3_file_change_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class S3FileChangeStream(
None,
# Background task provider type
None,
# Client provider type
None,
],
CompositePrimitive,
):
Expand Down
Loading