diff --git a/buildflow/core/app/flow.py b/buildflow/core/app/flow.py
index 58adbebc..b4de421c 100644
--- a/buildflow/core/app/flow.py
+++ b/buildflow/core/app/flow.py
@@ -21,6 +21,12 @@
 from buildflow.core.credentials.aws_credentials import AWSCredentials
 from buildflow.core.credentials.empty_credentials import EmptyCredentials
 from buildflow.core.credentials.gcp_credentials import GCPCredentials
+from buildflow.core.dependencies.dependency import (
+    Client,
+    Dependency,
+    KwargDependencies,
+    Sink,
+)
 from buildflow.core.options.flow_options import FlowOptions
 from buildflow.core.options.runtime_options import AutoscalerOptions, ProcessorOptions
 from buildflow.core.processor.patterns.collector import CollectorProcessor
@@ -378,6 +385,27 @@ def _background_tasks(
             return provider.background_tasks(credentials)
         return []
 
+    def _dependencies(
+        self, full_arg_spec: type_inspect.FullArgSpec
+    ) -> KwargDependencies:
+        dependencies = {}
+        for i, arg_name in enumerate(full_arg_spec.args[1:]):
+            default_value = full_arg_spec.defaults[i]
+            if default_value is None:
+                raise ValueError(
+                    f"Processor arg dependency {arg_name} has no default value."
+                )
+
+            if isinstance(default_value, Dependency):
+                credentials = self._get_credentials(
+                    default_value.primitive.primitive_type
+                )
+                default_value.attach_credentials(credentials)
+                default_value.attach_annotated_type(full_arg_spec.annotations.get(arg_name))
+                dependencies[arg_name] = default_value
+
+        return KwargDependencies(dependencies)
+
     # NOTE: The Flow class is responsible for converting Primitives into a Provider
     def pipeline(
         self,
@@ -743,6 +771,28 @@ def __init__(
                     )
                     outputs["sink_urn"] = sink_resource.urn
 
+                    # Builds the dependencies' pulumi ComponentResources
+                    # (if they exist)
+                    for (
+                        arg_name,
+                        dependency,
+                    ) in kwarg_dependencies.arg_name_to_dependency.items():
+                        dependency_pulumi_provider = (
+                            dependency.primitive.pulumi_provider()
+                        )
+                        if (
+                            dependency_pulumi_provider is not None
+                            and dependency.primitive not in primitive_cache
+                        ):
+                            dependency_resource = _traverse_primitive_for_pulumi(
+                                primitive=dependency.primitive,
+                                type_=None,
+                                credentials=dependency.credentials,
+                                initial_opts=child_opts,
+                                visited_primitives=primitive_cache,
+                            )
+                            outputs[f"{arg_name}_urn"] = dependency_resource.urn
+
                     self.register_outputs(outputs)
 
             return PipelineComponentResource(
@@ -756,6 +806,8 @@ def background_tasks():
                 source_primitive, source_credentials
             ) + self._background_tasks(sink_primitive, sink_credentials)
 
+        kwarg_dependencies = self._dependencies(full_arg_spec)
+
         # Dynamically define a new class with the same structure as Processor
         class_name = f"PipelineProcessor{utils.uuid(max_len=8)}"
         source_provider = source_primitive.source_provider()
@@ -771,6 +823,7 @@
             "setup": setup,
             "teardown": teardown,
             "background_tasks": lambda self: background_tasks(),
+            "dependencies": kwarg_dependencies.materialize,
             "__meta__": {
                 "source": source_primitive,
                 "sink": sink_primitive,
             },
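A note on `_dependencies` above: it leans entirely on Python's standard argument introspection, so any keyword default that happens to be a `Dependency` instance becomes an injectable kwarg. A minimal, self-contained sketch of that discovery step (toy classes, and plain `inspect.getfullargspec` in place of BuildFlow's `type_inspect` wrapper; names are illustrative):

```python
import inspect


class Dependency:  # stand-in for buildflow.core.dependencies.dependency.Dependency
    pass


class FakeClient(Dependency):
    pass


def processor(payload, db=FakeClient(), retries=3):
    ...


spec = inspect.getfullargspec(processor)
# args[1:] skips the payload argument, mirroring _dependencies above. This works
# because the payload has no default, so defaults[i] lines up with args[1:][i].
for i, arg_name in enumerate(spec.args[1:]):
    default = spec.defaults[i]
    if isinstance(default, Dependency):
        print(f"{arg_name}: injectable dependency {default!r}")
    else:
        print(f"{arg_name}: plain default {default!r}")
```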
@@ -865,6 +918,28 @@ def __init__(
                     )
                     outputs["sink_urn"] = sink_resource.urn
 
+                    # Builds the dependencies' pulumi ComponentResources
+                    # (if they exist)
+                    for (
+                        arg_name,
+                        dependency,
+                    ) in kwarg_dependencies.arg_name_to_dependency.items():
+                        dependency_pulumi_provider = (
+                            dependency.primitive.pulumi_provider()
+                        )
+                        if (
+                            dependency_pulumi_provider is not None
+                            and dependency.primitive not in primitive_cache
+                        ):
+                            dependency_resource = _traverse_primitive_for_pulumi(
+                                primitive=dependency.primitive,
+                                type_=None,
+                                credentials=dependency.credentials,
+                                initial_opts=child_opts,
+                                visited_primitives=primitive_cache,
+                            )
+                            outputs[f"{arg_name}_urn"] = dependency_resource.urn
+
                     self.register_outputs(outputs)
 
             return CollectorComponentResource(
@@ -872,6 +947,8 @@
                 sink_primitive=sink_primitive,
             )
 
+        kwarg_dependencies = self._dependencies(full_arg_spec)
+
         def background_tasks():
             return self._background_tasks(sink_primitive, sink_credentials)
 
@@ -889,6 +966,7 @@ def background_tasks():
             "setup": setup,
             "teardown": teardown,
             "background_tasks": lambda self: background_tasks(),
+            "dependencies": kwarg_dependencies.materialize,
             "__meta__": {
                 "sink": sink_primitive,
             },
@@ -939,6 +1017,7 @@ def decorator_function(original_process_fn_or_class):
         _, output_type = self._input_output_type(full_arg_spec)
 
         processor_id = original_process_fn_or_class.__name__
+        primitive_cache = self._primitive_cache
 
         def pulumi_resources_for_endpoint():
             class EndpointComponentResource(pulumi.ComponentResource):
@@ -950,12 +1029,37 @@ def __init__(self, processor_id: str):
                     None,
                 )
 
+                child_opts = pulumi.ResourceOptions(parent=self)
                 outputs = {"processor_id": processor_id}
+
+                # Builds the dependencies' pulumi ComponentResources
+                # (if they exist)
+                for (
+                    arg_name,
+                    dependency,
+                ) in kwarg_dependencies.arg_name_to_dependency.items():
+                    dependency_pulumi_provider = (
+                        dependency.primitive.pulumi_provider()
+                    )
+                    if (
+                        dependency_pulumi_provider is not None
+                        and dependency.primitive not in primitive_cache
+                    ):
+                        dependency_resource = _traverse_primitive_for_pulumi(
+                            primitive=dependency.primitive,
+                            type_=None,
+                            credentials=dependency.credentials,
+                            initial_opts=child_opts,
+                            visited_primitives=primitive_cache,
+                        )
+                        outputs[f"{arg_name}_urn"] = dependency_resource.urn
+
                 self.register_outputs(outputs)
 
             return EndpointComponentResource(processor_id=processor_id)
 
+        kwarg_dependencies = self._dependencies(full_arg_spec)
+
         # Dynamically define a new class with the same structure as Processor
         class_name = f"EndpointProcessor{utils.uuid(max_len=8)}"
         adhoc_methods = {
@@ -968,6 +1072,7 @@ def __init__(self, processor_id: str):
             "setup": setup,
             "teardown": teardown,
             "background_tasks": lambda self: [],
+            "dependencies": kwarg_dependencies.materialize,
             "__meta__": {},
             "__call__": original_process_fn_or_class,
         }
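The hunks above only add entries to the `adhoc_methods` dicts; the surrounding, unchanged code presumably feeds each dict to `type()` to build the concrete processor class. A toy illustration of that dynamic-class pattern (hypothetical names, not BuildFlow's actual factory):

```python
# type(name, bases, namespace) builds a class at runtime from a plain dict,
# which is how a decorated function can grow methods like background_tasks.
def make_processor_class(class_name, process_fn):
    adhoc_methods = {
        "process": staticmethod(process_fn),
        "background_tasks": lambda self: [],
    }
    return type(class_name, (object,), adhoc_methods)


EchoProcessor = make_processor_class("EchoProcessor", lambda element: element)
print(EchoProcessor().process("hello"))  # -> hello
```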
+ f"Method {endpoint.method} is not supported " "for collectors." ) @serve.deployment( @@ -112,7 +113,8 @@ async def root(self, request: input_type) -> None: sink = self.processor.sink() self.num_events_processed_counter.inc() start_time = time.monotonic() - output = await self.processor.process(request) + with self.processor.dependencies() as kwargs: + output = await self.processor.process(request, **kwargs) if output is None: # Exclude none results return diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py index 00dcdfbe..ba429c92 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py @@ -67,14 +67,15 @@ async def run(self) -> bool: input_type, output_type = process_types(self.processor) app = fastapi.FastAPI() fastapi_method = None - if self.processor.endpoint().method == Method.GET: + + endpoint = self.processor.endpoint() + if endpoint.method == Method.GET: fastapi_method = app.get - elif self.processor.endpoint().method == Method.POST: + elif endpoint.method == Method.POST: fastapi_method = app.post else: raise NotImplementedError( - f"Method {self.processor.endpoint().method} is not supported " - "for endpoints." + f"Method {endpoint.method} is not supported " "for endpoints." ) @serve.deployment( @@ -108,7 +109,8 @@ def __init__(self, processor, run_id): async def root(self, request: input_type) -> output_type: self.num_events_processed_counter.inc() start_time = time.monotonic() - output = await self.processor.process(request) + with self.processor.dependencies() as kwargs: + output = await self.processor.process(request, **kwargs) self.process_time_counter.inc((time.monotonic() - start_time) * 1000) return output diff --git a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py index 8b7650d7..9bb967a7 100644 --- a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py +++ b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py @@ -158,8 +158,8 @@ async def run(self): push_converter = sink.push_converter(output_type) process_fn = self.processor.process - async def process_element(element): - results = await process_fn(pull_converter(element)) + async def process_element(element, **process_kwargs): + results = await process_fn(pull_converter(element), **process_kwargs) if results is None: # Exclude none results return @@ -202,8 +202,10 @@ async def process_element(element): try: coros = [] for element in response.payload: - coros.append(process_element(element)) + with self.processor.dependencies() as process_kwargs: + coros.append(process_element(element, **process_kwargs)) flattened_results = await asyncio.gather(*coros) + batch_results = [] for results in flattened_results: if results is None: diff --git a/buildflow/core/dependencies/__init__.py b/buildflow/core/dependencies/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/buildflow/core/dependencies/dependency.py b/buildflow/core/dependencies/dependency.py new file mode 100644 index 00000000..c60f257d --- /dev/null +++ b/buildflow/core/dependencies/dependency.py @@ -0,0 +1,81 @@ +import inspect +from contextlib import ExitStack, contextmanager +from typing import Any, Dict, Generic, Optional, Type, TypeVar + +from buildflow.core.credentials 
diff --git a/buildflow/core/dependencies/__init__.py b/buildflow/core/dependencies/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/buildflow/core/dependencies/dependency.py b/buildflow/core/dependencies/dependency.py
new file mode 100644
index 00000000..c60f257d
--- /dev/null
+++ b/buildflow/core/dependencies/dependency.py
@@ -0,0 +1,80 @@
+from contextlib import ExitStack, contextmanager
+from typing import Any, Dict, Generic, Optional, Type, TypeVar
+
+from buildflow.core.credentials import CredentialType
+from buildflow.io.primitive import Primitive
+
+
+class Dependency:
+    def __init__(self, primitive: Primitive):
+        self.primitive = primitive
+        self.credentials: Optional[CredentialType] = None
+        self.annotated_type: Optional[Type] = None
+
+    def attach_credentials(self, credential_type: CredentialType) -> None:
+        self.credentials = credential_type
+
+    def attach_annotated_type(self, annotated_type: Optional[Type]) -> None:
+        self.annotated_type = annotated_type
+
+    @contextmanager
+    def materialize(self):
+        raise NotImplementedError(
+            f"materialize not implemented for {self.__class__.__name__}"
+        )
+
+
+class KwargDependencies:
+    def __init__(self, arg_name_to_dependency: Dict[str, Dependency]):
+        self.arg_name_to_dependency = arg_name_to_dependency
+
+    @contextmanager
+    def materialize(self):
+        with ExitStack() as stack:
+            kwargs = {
+                arg_name: stack.enter_context(dependency.materialize())
+                for arg_name, dependency in self.arg_name_to_dependency.items()
+            }
+            yield kwargs
+
+
+P = TypeVar("P", bound=Primitive)
+
+
+class Client(Generic[P], Dependency):
+    def __init__(self, primitive: P):
+        super().__init__(primitive)
+        self.client: Optional[Any] = None
+
+    @contextmanager
+    def materialize(self):
+        if self.client is None:
+            if self.credentials is None:
+                raise ValueError(
+                    "Cannot materialize client without credentials attached"
+                )
+            provider = self.primitive.client_provider()
+            self.client = provider.client(self.credentials, self.annotated_type)
+        try:
+            yield self.client
+        finally:
+            # Optionally, implement any teardown logic here
+            pass
+
+
+class Sink(Dependency):
+    def __init__(self, primitive: Primitive):
+        super().__init__(primitive)
+        self.sink: Optional[Any] = None
+
+    @contextmanager
+    def materialize(self):
+        if self.sink is None:
+            if self.credentials is None:
+                raise ValueError("Cannot materialize sink without credentials attached")
+            self.sink = self.primitive.sink_provider().sink(self.credentials)
+        try:
+            yield self.sink
+        finally:
+            # Optionally, implement any teardown logic here
+            pass
diff --git a/buildflow/core/processor/processor.py b/buildflow/core/processor/processor.py
index e1d35c4a..f1bba3ee 100644
--- a/buildflow/core/processor/processor.py
+++ b/buildflow/core/processor/processor.py
@@ -1,5 +1,6 @@
 import enum
-from typing import List
+from contextlib import contextmanager
+from typing import Any, Dict, List
 
 from buildflow.core.background_tasks.background_task import BackgroundTask
 
@@ -21,6 +22,10 @@ class ProcessorAPI:
     def pulumi_program(self, preview: bool):
         raise NotImplementedError("pulumi_program not implemented for Processor")
 
+    @contextmanager
+    def dependencies(self):
+        raise NotImplementedError("dependencies not implemented for Processor")
+
     def setup(self):
         raise NotImplementedError("setup not implemented")
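To make the lifecycle concrete: `KwargDependencies.materialize` uses one `ExitStack` so every dependency's context manager is entered before the processor runs and exited, in reverse order, afterwards. A self-contained sketch of the same pattern with a toy dependency class (not BuildFlow's real wiring):

```python
from contextlib import ExitStack, contextmanager


class StaticDependency:
    """Toy stand-in for Dependency: yields a pre-built value."""

    def __init__(self, value):
        self.value = value

    @contextmanager
    def materialize(self):
        print(f"open {self.value!r}")
        try:
            yield self.value
        finally:
            print(f"close {self.value!r}")


@contextmanager
def materialize_kwargs(arg_name_to_dependency):
    # Mirrors KwargDependencies.materialize: one ExitStack keeps every
    # dependency open for the duration of the processor call.
    with ExitStack() as stack:
        yield {
            name: stack.enter_context(dep.materialize())
            for name, dep in arg_name_to_dependency.items()
        }


deps = {"db": StaticDependency("db-conn"), "topic": StaticDependency("publisher")}
with materialize_kwargs(deps) as kwargs:
    print(kwargs)  # {'db': 'db-conn', 'topic': 'publisher'}
```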
diff --git a/buildflow/io/aws/s3.py b/buildflow/io/aws/s3.py
index 4244bb5c..a5f73865 100644
--- a/buildflow/io/aws/s3.py
+++ b/buildflow/io/aws/s3.py
@@ -21,6 +21,8 @@ class S3Bucket(
         S3BucketProvider,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ]
 ):
     bucket_name: S3BucketName
diff --git a/buildflow/io/aws/s3_file_change_stream.py b/buildflow/io/aws/s3_file_change_stream.py
index c41e144a..a9f4c7ea 100644
--- a/buildflow/io/aws/s3_file_change_stream.py
+++ b/buildflow/io/aws/s3_file_change_stream.py
@@ -23,6 +23,8 @@ class S3FileChangeStream(
         None,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ],
     CompositePrimitive,
 ):
diff --git a/buildflow/io/aws/sqs.py b/buildflow/io/aws/sqs.py
index 064079bd..cef6413a 100644
--- a/buildflow/io/aws/sqs.py
+++ b/buildflow/io/aws/sqs.py
@@ -19,6 +19,8 @@ class SQSQueue(
         SQSQueueProvider,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ]
 ):
     queue_name: SQSQueueName
diff --git a/buildflow/io/duckdb/duckdb.py b/buildflow/io/duckdb/duckdb.py
index 8566b3fb..714d56c8 100644
--- a/buildflow/io/duckdb/duckdb.py
+++ b/buildflow/io/duckdb/duckdb.py
@@ -18,6 +18,8 @@ class DuckDBTable(
         DuckDBProvider,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ]
 ):
     database: DuckDBDatabase
diff --git a/buildflow/io/gcp/bigquery_table.py b/buildflow/io/gcp/bigquery_table.py
index 3becb5eb..9834a5a0 100644
--- a/buildflow/io/gcp/bigquery_table.py
+++ b/buildflow/io/gcp/bigquery_table.py
@@ -24,6 +24,8 @@ class BigQueryTable(
         BigQueryTableProvider,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ]
 ):
     dataset: BigQueryDataset
diff --git a/buildflow/io/gcp/gcs_file_change_stream.py b/buildflow/io/gcp/gcs_file_change_stream.py
index 84a41993..ec5a9bb3 100644
--- a/buildflow/io/gcp/gcs_file_change_stream.py
+++ b/buildflow/io/gcp/gcs_file_change_stream.py
@@ -24,6 +24,8 @@ class GCSFileChangeStream(
         None,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ],
     CompositePrimitive,
 ):
diff --git a/buildflow/io/gcp/providers/pubsub_topic.py b/buildflow/io/gcp/providers/pubsub_topic.py
index 3ea77914..03ba3f87 100644
--- a/buildflow/io/gcp/providers/pubsub_topic.py
+++ b/buildflow/io/gcp/providers/pubsub_topic.py
@@ -2,11 +2,13 @@
 
 import pulumi
 import pulumi_gcp
+from google.cloud import pubsub
 
 from buildflow.core.credentials import GCPCredentials
 from buildflow.core.types.gcp_types import GCPProjectID, PubSubTopicID, PubSubTopicName
 from buildflow.io.gcp.strategies.pubsub_strategies import GCPPubSubTopicSink
 from buildflow.io.provider import PulumiProvider, SinkProvider
+from buildflow.io.utils.clients import gcp_clients
 
 
 class _PubSubTopic(pulumi.ComponentResource):
@@ -64,6 +66,17 @@ def sink(self, credentials: GCPCredentials):
             topic_name=self.topic_name,
         )
 
+    def client(self, credentials: GCPCredentials, client_type: Optional[Type]):
+        if client_type is not None and client_type != pubsub.PublisherClient:
+            raise NotImplementedError(
+                f"Client type {client_type} is not supported by this provider"
+            )
+        clients = gcp_clients.GCPClients(
+            credentials=credentials,
+            quota_project_id=self.project_id,
+        )
+        return clients.get_publisher_client()
+
     def pulumi_resource(
         self,
         type_: Optional[Type],
diff --git a/buildflow/io/gcp/pubsub_subscription.py b/buildflow/io/gcp/pubsub_subscription.py
index 19e2aa89..dc552980 100644
--- a/buildflow/io/gcp/pubsub_subscription.py
+++ b/buildflow/io/gcp/pubsub_subscription.py
@@ -28,6 +28,8 @@ class GCPPubSubSubscription(
         None,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ]
 ):
     project_id: GCPProjectID
diff --git a/buildflow/io/gcp/pubsub_topic.py b/buildflow/io/gcp/pubsub_topic.py
index 45a90aeb..b3719c7e 100644
--- a/buildflow/io/gcp/pubsub_topic.py
+++ b/buildflow/io/gcp/pubsub_topic.py
@@ -20,6 +20,8 @@ class GCPPubSubTopic(
         GCPPubSubTopicProvider,
         # Background task provider type
         None,
+        # Client provider type
+        GCPPubSubTopicProvider,
     ]
 ):
     project_id: GCPProjectID
@@ -48,6 +50,12 @@ def sink_provider(self) -> GCPPubSubTopicProvider:
             topic_name=self.topic_name,
         )
 
+    def client_provider(self) -> GCPPubSubTopicProvider:
+        return GCPPubSubTopicProvider(
+            project_id=self.project_id,
+            topic_name=self.topic_name,
+        )
+
     def _pulumi_provider(self) -> GCPPubSubTopicProvider:
         return GCPPubSubTopicProvider(
             project_id=self.project_id,
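With `GCPPubSubTopic` now exposing a client provider, a processor can request a Pub/Sub publisher simply by declaring it as a default. A hedged sketch of what that usage might look like (the endpoint decorator arguments and all project/topic names are placeholders, not verified API):

```python
from buildflow import Flow
from buildflow.core.dependencies.dependency import Client
from buildflow.io.gcp import GCPPubSubTopic

topic = GCPPubSubTopic(project_id="my-project", topic_name="my-topic")
app = Flow()


@app.endpoint(route="/publish", method="POST")  # hypothetical decorator args
def publish(payload: dict, publisher=Client(topic)) -> dict:
    # publisher is materialized per request by dependencies(); the provider
    # above hands back a google.cloud.pubsub PublisherClient.
    publisher.publish("projects/my-project/topics/my-topic", b"hello")
    return {"status": "queued"}
```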
diff --git a/buildflow/io/gcp/storage.py b/buildflow/io/gcp/storage.py
index 48d6337f..2a0aefba 100644
--- a/buildflow/io/gcp/storage.py
+++ b/buildflow/io/gcp/storage.py
@@ -24,6 +24,8 @@ class GCSBucket(
         GCSBucketProvider,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ]
 ):
     project_id: GCPProjectID
diff --git a/buildflow/io/local/empty.py b/buildflow/io/local/empty.py
index 7db22eee..a76b995d 100644
--- a/buildflow/io/local/empty.py
+++ b/buildflow/io/local/empty.py
@@ -16,6 +16,8 @@ class Empty(
         EmptyProvider,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ]
 ):
     @classmethod
diff --git a/buildflow/io/local/file.py b/buildflow/io/local/file.py
index e930fa9f..e1143f9c 100644
--- a/buildflow/io/local/file.py
+++ b/buildflow/io/local/file.py
@@ -19,6 +19,8 @@ class File(
         FileProvider,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ]
 ):
     file_path: FilePath
diff --git a/buildflow/io/local/file_change_stream.py b/buildflow/io/local/file_change_stream.py
index 3e1822f5..14fa2baa 100644
--- a/buildflow/io/local/file_change_stream.py
+++ b/buildflow/io/local/file_change_stream.py
@@ -22,6 +22,8 @@ class LocalFileChangeStream(
         None,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ]
 ):
     file_path: FilePath
diff --git a/buildflow/io/local/pulse.py b/buildflow/io/local/pulse.py
index fa7aae54..e315bf63 100644
--- a/buildflow/io/local/pulse.py
+++ b/buildflow/io/local/pulse.py
@@ -17,6 +17,8 @@ class Pulse(
         None,
         # Background task provider type
         None,
+        # Client provider type
+        None,
     ]
 ):
     items: Iterable[Any]
diff --git a/buildflow/io/postgres/__init__.py b/buildflow/io/postgres/__init__.py
new file mode 100644
index 00000000..9a8cc8e5
--- /dev/null
+++ b/buildflow/io/postgres/__init__.py
@@ -0,0 +1,2 @@
+# ruff: noqa
+from .postgres import Postgres
diff --git a/buildflow/io/postgres/postgres.py b/buildflow/io/postgres/postgres.py
new file mode 100644
index 00000000..36a27b9d
--- /dev/null
+++ b/buildflow/io/postgres/postgres.py
@@ -0,0 +1,47 @@
+import dataclasses
+from typing import Optional
+
+from buildflow.io.postgres.providers.postgres_provider import PostgresProvider
+from buildflow.io.primitive import Primitive
+
+# TODO: Support Sessions, created like:
+# dialect+driver://username:password@host:port/database_name
+
+
+@dataclasses.dataclass
+class Postgres(
+    Primitive[
+        # Pulumi provider type
+        None,
+        # Source provider type
+        None,
+        # Sink provider type
+        None,
+        # Background task provider type
+        None,
+        # Client provider type
+        PostgresProvider,
+    ]
+):
+    database_name: str
+    host: str
+    port: int
+    user: Optional[str]
+    password: Optional[str]
+
+    def options(
+        self,
+        # Pulumi management options
+        managed: bool = False,
+    ) -> "Postgres":
+        to_ret = super().options(managed)
+        return to_ret
+
+    def client_provider(self) -> PostgresProvider:
+        return PostgresProvider(
+            database_name=self.database_name,
+            host=self.host,
+            port=self.port,
+            user=self.user,
+            password=self.password,
+        )
diff --git a/buildflow/io/postgres/providers/__init__.py b/buildflow/io/postgres/providers/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/buildflow/io/postgres/providers/postgres_provider.py b/buildflow/io/postgres/providers/postgres_provider.py
new file mode 100644
index 00000000..54cb5c4e
--- /dev/null
+++ b/buildflow/io/postgres/providers/postgres_provider.py
@@ -0,0 +1,44 @@
+from typing import Optional, Type
+
+import psycopg2
+from sqlalchemy import create_engine
+from sqlalchemy.orm import Session
+
+from buildflow.core.credentials.empty_credentials import EmptyCredentials
+from buildflow.io.provider import ClientProvider
+
+
+class PostgresProvider(ClientProvider):
+    def __init__(
+        self,
+        *,
+        database_name: str,
+        host: str,
+        port: Optional[int] = None,
+        user: Optional[str] = None,
+        password: Optional[str] = None,
+    ):
+        self.database_name = database_name
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+
+    def client(self, credentials: EmptyCredentials, client_type: Optional[Type]):
+        if client_type == psycopg2.extensions.connection or client_type is None:
+            return psycopg2.connect(
+                database=self.database_name,
+                host=self.host,
+                port=self.port,
+                user=self.user,
+                password=self.password,
+            )
+        elif client_type == Session:
+            # dialect+driver://username:password@host:port/database_name
+            engine = create_engine(
+                f"postgresql+psycopg2://{self.user}:{self.password}"
+                f"@{self.host}:{self.port}/{self.database_name}"
+            )
+            return Session(engine)
+        else:
+            raise ValueError(f"Unsupported client type {client_type}")
diff --git a/buildflow/io/primitive.py b/buildflow/io/primitive.py
index 5ff1aeda..b0b36954 100644
--- a/buildflow/io/primitive.py
+++ b/buildflow/io/primitive.py
@@ -9,7 +9,7 @@
     GCPOptions,
     LocalOptions,
 )
-from buildflow.io.provider import BTT, PPT, SIT, SOT
+from buildflow.io.provider import BTT, CLT, PPT, SIT, SOT
 from buildflow.io.strategies._strategy import StategyType
 
 
@@ -23,8 +23,8 @@ class PrimitiveType(enum.Enum):
     AGNOSTIC = "agnostic"
 
 
-class Primitive(Generic[PPT, SOT, SIT, BTT]):
-    primitive_type: PrimitiveType
+class Primitive(Generic[PPT, SOT, SIT, BTT, CLT]):
+    primitive_type: PrimitiveType = PrimitiveType.AGNOSTIC
     _managed: bool = False
 
     def enable_managed(self):
@@ -62,6 +62,12 @@ def sink_provider(self) -> SIT:
             f"Primitive.sink_provider() is not implemented for type: {type(self)}."
         )
 
+    def client_provider(self) -> CLT:
+        """Return a client provider for this primitive."""
+        raise NotImplementedError(
+            f"Primitive.client_provider() is not implemented for type: {type(self)}."
+        )
+
     def background_task_provider(self) -> Optional[BTT]:
         """Return a background task provider for this primitive."""
         return None
@@ -95,7 +101,7 @@ def options(self) -> "Primitive":
         return copy.deepcopy(self)
 
 
-class GCPPrimtive(Primitive[PPT, SOT, SIT, BTT]):
+class GCPPrimtive(Primitive[PPT, SOT, SIT, BTT, CLT]):
     # TODO: We need to check the infra State to warn the user if the infra has not been
     # created yet.
     primitive_type = PrimitiveType.GCP
@@ -106,7 +112,7 @@ def from_gcp_options(cls, gcp_options: GCPOptions) -> "GCPPrimtive":
         raise NotImplementedError("GCPPrimtive.from_gcp_options() is not implemented.")
 
 
-class AWSPrimtive(Primitive[PPT, SOT, SIT, BTT]):
+class AWSPrimtive(Primitive[PPT, SOT, SIT, BTT, CLT]):
     primitive_type = PrimitiveType.AWS
 
     @classmethod
@@ -115,7 +121,7 @@ def from_aws_options(cls, aws_options: AWSOptions) -> "AWSPrimtive":
         raise NotImplementedError("AWSPrimtive.from_aws_options() is not implemented.")
 
 
-class AzurePrimtive(Primitive[PPT, SOT, SIT, BTT]):
+class AzurePrimtive(Primitive[PPT, SOT, SIT, BTT, CLT]):
     primitive_type = PrimitiveType.AZURE
 
     @classmethod
@@ -126,7 +132,7 @@ def from_azure_options(cls, azure_options: AzureOptions) -> "AzurePrimtive":
         )
 
 
-class LocalPrimtive(Primitive[PPT, SOT, SIT, BTT]):
+class LocalPrimtive(Primitive[PPT, SOT, SIT, BTT, CLT]):
     primitive_type = PrimitiveType.LOCAL
     # LocalPrimitives are never managed.
     managed: bool = False
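A quick sketch of what the Postgres provider hands back for each supported `client_type` (connection details are placeholders; `credentials` is unused by this provider, so `None` is passed here for brevity):

```python
from sqlalchemy.orm import Session

from buildflow.io.postgres import Postgres

postgres = Postgres(
    database_name="mydb", host="localhost", port=5432, user="me", password="secret"
)
provider = postgres.client_provider()

# client_type=None (or psycopg2's connection class) -> a raw DB-API connection
conn = provider.client(credentials=None, client_type=None)

# client_type=Session -> a SQLAlchemy ORM session bound to a fresh engine
session = provider.client(credentials=None, client_type=Session)
```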
diff --git a/buildflow/io/provider.py b/buildflow/io/provider.py
index 1726e53e..ece89d65 100644
--- a/buildflow/io/provider.py
+++ b/buildflow/io/provider.py
@@ -1,4 +1,4 @@
-from typing import List, Optional, Type, TypeVar
+from typing import Any, List, Optional, Type, TypeVar
 
 import pulumi
 
@@ -44,6 +44,14 @@ def sink(self, credentials: CredentialType) -> SinkStrategy:
 SIT = TypeVar("SIT", bound=SinkProvider)
 
 
+class ClientProvider(ProviderAPI):
+    def client(self, credentials: CredentialType, client_type: Optional[Type]) -> Any:
+        raise NotImplementedError("client not implemented for Provider")
+
+
+CLT = TypeVar("CLT", bound=ClientProvider)
+
+
 class BackgroundTaskProvider(ProviderAPI):
     def background_tasks(self, credentials: CredentialType) -> List[BackgroundTask]:
         raise NotImplementedError("background_task not implemented for Provider")
diff --git a/buildflow/io/snowflake/snowflake_table.py b/buildflow/io/snowflake/snowflake_table.py
index e4f1ee70..d01d2ce9 100644
--- a/buildflow/io/snowflake/snowflake_table.py
+++ b/buildflow/io/snowflake/snowflake_table.py
@@ -27,6 +27,8 @@ class SnowflakeTable(
         SnowflakeTableProvider,
         # Background task provider type
         SnowflakeTableProvider,
+        # Client provider type
+        None,
     ]
 ):
     # TODO: make these types more concrete
diff --git a/buildflow/io/strategies/source.py b/buildflow/io/strategies/source.py
index 23cc7546..ce418a8e 100644
--- a/buildflow/io/strategies/source.py
+++ b/buildflow/io/strategies/source.py
@@ -1,5 +1,5 @@
 import dataclasses
-from typing import Any, Callable, Iterable, Type
+from typing import Any, Callable, Iterable, Type, TypeAlias
 
 from buildflow.core.credentials import CredentialType
 from buildflow.io.strategies._strategy import StategyType, Strategy, StrategyID
@@ -47,3 +47,6 @@ async def teardown(self):
         This should perform any cleanup that is needed by the source.
         """
         pass
+
+
+Source: TypeAlias = SourceStrategy
diff --git a/buildflow/samples/service/main_two.py b/buildflow/samples/service/main_two.py
new file mode 100644
index 00000000..de457fdb
--- /dev/null
+++ b/buildflow/samples/service/main_two.py
@@ -0,0 +1,31 @@
+from dataclasses import dataclass
+
+from buildflow import Flow
+from buildflow.core.dependencies.dependency import Client
+from buildflow.io.gcp import GCPPubSubTopic
+from buildflow.io.local import File, Pulse
+
+
+@dataclass
+class InputRequest:
+    val: int
+
+
+@dataclass
+class OutputResponse:
+    val: int
+
+
+pubsub_topic = GCPPubSubTopic(
+    project_id="daring-runway-374503", topic_name="tanke-test-topic"
+)
+
+
+app = Flow()
+
+
+@app.pipeline(source=Pulse([InputRequest(1)], 1), sink=File("output.txt", "csv"))
+def my_pipeline_processor(
+    input: InputRequest, pubsub_client=Client(pubsub_topic)
+) -> OutputResponse:
+    return OutputResponse(val=input.val + 1)
diff --git a/pyproject.toml b/pyproject.toml
index f67483dd..a3f03b48 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -32,6 +32,9 @@ dependencies = [
     "opentelemetry-exporter-otlp",
     "opentelemetry-exporter-jaeger",
     "pandas",
+    # TODO: use psycopg2 instead of psycopg2-binary
+    # more context: https://www.psycopg.org/docs/install.html#psycopg-vs-psycopg-binary
+    "psycopg2-binary",
     "pulumi==3.35.3",
     "pulumi_aws",
     "pulumi_gcp",
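Tying the new pieces together, a hedged end-to-end sketch: a pipeline that injects a SQLAlchemy `Session` through the new `Postgres` primitive (connection details and the `events` table are illustrative only):

```python
from sqlalchemy import text
from sqlalchemy.orm import Session

from buildflow import Flow
from buildflow.core.dependencies.dependency import Client
from buildflow.io.local import File, Pulse
from buildflow.io.postgres import Postgres

postgres = Postgres(
    database_name="mydb", host="localhost", port=5432, user="me", password="secret"
)

app = Flow()


@app.pipeline(source=Pulse([{"val": 1}], 1), sink=File("output.txt", "csv"))
def enrich(element: dict, db: Session = Client(postgres)) -> dict:
    # The Session annotation is captured by _dependencies() and forwarded to
    # PostgresProvider.client(), which selects the SQLAlchemy branch.
    count = db.execute(text("SELECT count(*) FROM events")).scalar_one()
    return {"val": element["val"], "count": count}
```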