Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

temp pr #276

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions buildflow/core/app/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
from buildflow.core.credentials.aws_credentials import AWSCredentials
from buildflow.core.credentials.empty_credentials import EmptyCredentials
from buildflow.core.credentials.gcp_credentials import GCPCredentials
from buildflow.core.dependencies.dependency import (
Client,
Dependency,
KwargDependencies,
Sink,
)
from buildflow.core.options.flow_options import FlowOptions
from buildflow.core.options.runtime_options import AutoscalerOptions, ProcessorOptions
from buildflow.core.processor.patterns.collector import CollectorProcessor
Expand Down Expand Up @@ -315,6 +321,7 @@ def __init__(
flow_id: FlowID = "buildflow-app",
flow_options: Optional[FlowOptions] = None,
) -> None:
print("\n\nFLOW CREATED\n\n")
# Load the BuildFlow Config to get the default options
buildflow_config_dir = os.path.join(
_get_directory_path_of_caller(), ".buildflow"
Expand Down Expand Up @@ -378,6 +385,26 @@ def _background_tasks(
return provider.background_tasks(credentials)
return []

def _dependencies(
    self, full_arg_spec: type_inspect.FullArgSpec
) -> KwargDependencies:
    """Collect Dependency-typed defaults from a processor's signature.

    Scans every positional arg after the first (assumed to be ``self`` /
    the payload arg), and for each default value that is a ``Dependency``
    attaches the credentials for its primitive and records it by arg name.

    Args:
        full_arg_spec: the inspected signature of the processor callable.

    Returns:
        KwargDependencies mapping arg names to their attached dependencies.

    Raises:
        ValueError: if an arg has no default value or a ``None`` default.
    """
    dependencies = {}
    # FullArgSpec.defaults aligns with the *last* len(defaults) entries of
    # args, not with args[1:] — index from the tail to avoid misalignment
    # (and handle defaults being None when no arg has a default).
    defaults = full_arg_spec.defaults or ()
    num_without_default = len(full_arg_spec.args) - len(defaults)
    for i, arg_name in enumerate(full_arg_spec.args[1:], start=1):
        if i < num_without_default:
            raise ValueError(
                f"Processor arg dependency {arg_name} has no default value."
            )
        default_value = defaults[i - num_without_default]
        if default_value is None:
            raise ValueError(
                f"Processor arg dependency {arg_name} has no default value."
            )

        if isinstance(default_value, Dependency):
            credentials = self._get_credentials(
                default_value.primitive.primitive_type
            )
            default_value.attach_credentials(credentials)
            dependencies[arg_name] = default_value

    return KwargDependencies(dependencies)

# NOTE: The Flow class is responsible for converting Primitives into a Provider
def pipeline(
self,
Expand Down Expand Up @@ -743,6 +770,28 @@ def __init__(
)
outputs["sink_urn"] = sink_resource.urn

# Builds the dependencies' pulumi.CompositeResources
# (if they exist)
for (
arg_name,
dependency,
) in kwarg_dependencies.arg_name_to_dependency.items():
dependency_pulumi_provider = (
dependency.primitive.pulumi_provider()
)
if (
dependency_pulumi_provider is not None
and dependency.primitive not in primitive_cache
):
dependency_resource = _traverse_primitive_for_pulumi(
primitive=dependency.primitive,
type_=None,
credentials=dependency.credentials,
initial_opts=child_opts,
visited_primitives=primitive_cache,
)
outputs[f"{arg_name}_urn"] = dependency_resource.urn

self.register_outputs(outputs)

return PipelineComponentResource(
Expand All @@ -756,6 +805,8 @@ def background_tasks():
source_primitive, source_credentials
) + self._background_tasks(sink_primitive, sink_credentials)

kwarg_dependencies = self._dependencies(full_arg_spec)

# Dynamically define a new class with the same structure as Processor
class_name = f"PipelineProcessor{utils.uuid(max_len=8)}"
source_provider = source_primitive.source_provider()
Expand All @@ -771,6 +822,7 @@ def background_tasks():
"setup": setup,
"teardown": teardown,
"background_tasks": lambda self: background_tasks(),
"dependencies": kwarg_dependencies.materialize,
"__meta__": {
"source": source_primitive,
"sink": sink_primitive,
Expand Down Expand Up @@ -865,13 +917,37 @@ def __init__(
)
outputs["sink_urn"] = sink_resource.urn

# Builds the dependencies' pulumi.CompositeResources
# (if they exist)
for (
arg_name,
dependency,
) in kwarg_dependencies.arg_name_to_dependency.items():
dependency_pulumi_provider = (
dependency.primitive.pulumi_provider()
)
if (
dependency_pulumi_provider is not None
and dependency.primitive not in primitive_cache
):
dependency_resource = _traverse_primitive_for_pulumi(
primitive=dependency.primitive,
type_=None,
credentials=dependency.credentials,
initial_opts=child_opts,
visited_primitives=primitive_cache,
)
outputs[f"{arg_name}_urn"] = dependency_resource.urn

self.register_outputs(outputs)

return CollectorComponentResource(
processor_id=processor_id,
sink_primitive=sink_primitive,
)

kwarg_dependencies = self._dependencies(full_arg_spec)

def background_tasks():
return self._background_tasks(sink_primitive, sink_credentials)

Expand All @@ -889,6 +965,7 @@ def background_tasks():
"setup": setup,
"teardown": teardown,
"background_tasks": lambda self: background_tasks(),
"dependencies": kwarg_dependencies.materialize,
"__meta__": {
"sink": sink_primitive,
},
Expand Down Expand Up @@ -939,6 +1016,7 @@ def decorator_function(original_process_fn_or_class):
_, output_type = self._input_output_type(full_arg_spec)

processor_id = original_process_fn_or_class.__name__
primitive_cache = self._primitive_cache

def pulumi_resources_for_endpoint():
class EndpointComponentResource(pulumi.ComponentResource):
Expand All @@ -950,12 +1028,37 @@ def __init__(self, processor_id: str):
None,
)

child_opts = pulumi.ResourceOptions(parent=self)
outputs = {"processor_id": processor_id}

# Builds the dependencies' pulumi.CompositeResources
# (if they exist)
for (
arg_name,
dependency,
) in kwarg_dependencies.arg_name_to_dependency.items():
dependency_pulumi_provider = (
dependency.primitive.pulumi_provider()
)
if (
dependency_pulumi_provider is not None
and dependency.primitive not in primitive_cache
):
dependency_resource = _traverse_primitive_for_pulumi(
primitive=dependency.primitive,
type_=None,
credentials=dependency.credentials,
initial_opts=child_opts,
visited_primitives=primitive_cache,
)
outputs[f"{arg_name}_urn"] = dependency_resource.urn

self.register_outputs(outputs)

return EndpointComponentResource(processor_id=processor_id)

kwarg_dependencies = self._dependencies(full_arg_spec)

# Dynamically define a new class with the same structure as Processor
class_name = f"EndpointProcessor{utils.uuid(max_len=8)}"
adhoc_methods = {
Expand All @@ -968,6 +1071,7 @@ def __init__(self, processor_id: str):
"setup": setup,
"teardown": teardown,
"background_tasks": lambda self: [],
"dependencies": kwarg_dependencies.materialize,
"__meta__": {},
"__call__": original_process_fn_or_class,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ async def run(self) -> bool:
push_converter = self.sink.push_converter(output_type)
app = fastapi.FastAPI()
fastapi_method = None
if self.processor.endpoint().method == Method.GET:

endpoint = self.processor.endpoint()
if endpoint.method == Method.GET:
fastapi_method = app.get
elif self.processor.endpoint().method == Method.POST:
elif endpoint.method == Method.POST:
fastapi_method = app.post
else:
raise NotImplementedError(
f"Method {self.processor.endpoint().method} is not supported "
"for collectors."
f"Method {endpoint.method} is not supported " "for collectors."
)

@serve.deployment(
Expand Down Expand Up @@ -112,7 +113,8 @@ async def root(self, request: input_type) -> None:
sink = self.processor.sink()
self.num_events_processed_counter.inc()
start_time = time.monotonic()
output = await self.processor.process(request)
with self.processor.dependencies() as kwargs:
output = await self.processor.process(request, **kwargs)
if output is None:
# Exclude none results
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ async def run(self) -> bool:
input_type, output_type = process_types(self.processor)
app = fastapi.FastAPI()
fastapi_method = None
if self.processor.endpoint().method == Method.GET:

endpoint = self.processor.endpoint()
if endpoint.method == Method.GET:
fastapi_method = app.get
elif self.processor.endpoint().method == Method.POST:
elif endpoint.method == Method.POST:
fastapi_method = app.post
else:
raise NotImplementedError(
f"Method {self.processor.endpoint().method} is not supported "
"for endpoints."
f"Method {endpoint.method} is not supported " "for endpoints."
)

@serve.deployment(
Expand Down Expand Up @@ -108,7 +109,8 @@ def __init__(self, processor, run_id):
async def root(self, request: input_type) -> output_type:
self.num_events_processed_counter.inc()
start_time = time.monotonic()
output = await self.processor.process(request)
with self.processor.dependencies() as kwargs:
output = await self.processor.process(request, **kwargs)
self.process_time_counter.inc((time.monotonic() - start_time) * 1000)
return output

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ async def run(self):
push_converter = sink.push_converter(output_type)
process_fn = self.processor.process

async def process_element(element):
results = await process_fn(pull_converter(element))
async def process_element(element, **process_kwargs):
results = await process_fn(pull_converter(element), **process_kwargs)
if results is None:
# Exclude none results
return
Expand Down Expand Up @@ -201,9 +201,13 @@ async def process_element(element):
)
try:
coros = []
for element in response.payload:
coros.append(process_element(element))
flattened_results = await asyncio.gather(*coros)
# NOTE: process dependencies are acquired here (in context)
with self.processor.dependencies() as process_kwargs:
for element in response.payload:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 context block per element

coros.append(process_element(element, **process_kwargs))
flattened_results = await asyncio.gather(*coros)
# NOTE: process dependencies are released here (out of context)

batch_results = []
for results in flattened_results:
if results is None:
Expand Down
Empty file.
95 changes: 95 additions & 0 deletions buildflow/core/dependencies/dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import inspect
from contextlib import ExitStack, contextmanager
from typing import Any, Dict, List, Optional, Type

from buildflow.core.credentials import CredentialType
from buildflow.io.primitive import Primitive


class Dependency:
    """Base class for a processor argument dependency.

    A dependency wraps a Primitive and, once credentials are attached,
    can be materialized into a concrete runtime object (client, sink, ...)
    via the ``materialize`` context manager implemented by subclasses.
    """

    def __init__(self, primitive: Primitive):
        # The primitive this dependency is built from.
        self.primitive = primitive
        # Credentials are attached later by the Flow; None until then.
        self.credentials: Optional[CredentialType] = None

    def attach_credentials(self, credential_type: CredentialType) -> None:
        """Attach the credentials used when materializing this dependency."""
        self.credentials = credential_type

    @contextmanager
    def materialize(self):
        """Yield the runtime object for this dependency (subclass hook)."""
        raise NotImplementedError(
            f"materialize not implemented for {self.__class__.__name__}"
        )


class KwargDependencies:
    """A named collection of dependencies to be passed as keyword args."""

    def __init__(self, arg_name_to_dependency: Dict[str, Dependency]):
        # Maps a processor keyword-arg name to the Dependency that fills it.
        self.arg_name_to_dependency = arg_name_to_dependency

    @contextmanager
    def materialize(self):
        """Materialize every dependency and yield them as a kwargs dict.

        All dependency context managers are entered on a single ExitStack,
        so they are all released together when this context exits.
        """
        with ExitStack() as stack:
            kwargs = {}
            for arg_name, dependency in self.arg_name_to_dependency.items():
                kwargs[arg_name] = stack.enter_context(dependency.materialize())
            yield kwargs


# Python Magic - maybe not the best way to do this but should work as long as no
# threads change their annotations dynamically
class _AnnotationCapturer(type):
captured_annotation: Optional[Type] = None

def __call__(cls, *args, **kwargs):
print("CALLED")
frame = inspect.currentframe().f_back # Capture the frame immediately
instance = super().__call__(*args, **kwargs)
print("INSTANCE", instance)
for name, value in frame.f_locals.items():
if value is instance:
print("FOUND: ", name, value)
annotations = frame.f_locals.get("__annotations__", {})
instance.captured_annotation = annotations.get(name, None)
break
return instance


class Client(Dependency, metaclass=_AnnotationCapturer):
    """A dependency that materializes a client for its primitive.

    The client type is taken from the annotation captured at the
    instantiation site (via _AnnotationCapturer) and is built lazily on
    first materialization, then cached for subsequent uses.
    """

    def __init__(self, primitive: Primitive):
        super().__init__(primitive)
        # Filled in by the metaclass when the annotation can be captured.
        self.captured_annotation: Optional[Type] = None
        # Lazily-built client instance; cached across materializations.
        self.client: Optional[Any] = None

    @contextmanager
    def materialize(self):
        """Yield the (lazily created) client for this primitive."""
        if self.client is None:
            if self.credentials is None:
                raise ValueError(
                    "Cannot materialize client without credentials attached"
                )
            provider = self.primitive.client_provider()
            self.client = provider.client(
                self.credentials, self.captured_annotation
            )
        yield self.client


class Sink(Dependency):
    """A dependency that materializes a sink for its primitive."""

    def __init__(self, primitive: Primitive):
        super().__init__(primitive)
        # Lazily-built sink instance; cached across materializations.
        self.sink: Optional[Any] = None

    @contextmanager
    def materialize(self):
        """Yield the (lazily created) sink for this primitive."""
        if self.sink is None:
            if self.credentials is None:
                raise ValueError("Cannot materialize sink without credentials attached")
            self.sink = self.primitive.sink_provider().sink(self.credentials)
        yield self.sink
7 changes: 6 additions & 1 deletion buildflow/core/processor/processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import enum
from typing import List
from contextlib import contextmanager
from typing import Any, Dict, List

from buildflow.core.background_tasks.background_task import BackgroundTask

Expand All @@ -21,6 +22,10 @@ class ProcessorAPI:
def pulumi_program(self, preview: bool):
raise NotImplementedError("pulumi_program not implemented for Processor")

@contextmanager
def dependencies(self):
    # Context manager yielding the kwargs dict of materialized dependencies
    # for this processor; concrete processors override this (the Flow
    # decorators bind it to KwargDependencies.materialize).
    raise NotImplementedError("dependencies not implemented for Processor")

def setup(self):
raise NotImplementedError("setup not implemented")

Expand Down
Loading
Loading