From 86cfd183e8e1db1952297c51b9b0a8cc4a147a9a Mon Sep 17 00:00:00 2001 From: tanke Date: Tue, 5 Sep 2023 11:27:50 -0700 Subject: [PATCH 1/2] temp --- buildflow/core/app/flow.py | 104 ++++++++ .../receive_process_push_ack.py | 12 +- .../receive_process_respond.py | 12 +- .../pipeline_pattern/pull_process_push.py | 14 +- buildflow/core/dependencies/__init__.py | 0 buildflow/core/dependencies/dependency.py | 95 ++++++++ buildflow/core/processor/processor.py | 7 +- buildflow/io/gcp/providers/pubsub_topic.py | 13 + buildflow/io/gcp/pubsub_topic.py | 6 + buildflow/io/postgres/__init__.py | 3 + buildflow/io/postgres/postgres.py | 158 ++++++++++++ buildflow/io/postgres/providers/__init__.py | 0 .../postgres/providers/postgres_provider.py | 225 ++++++++++++++++++ buildflow/io/primitive.py | 10 +- buildflow/io/provider.py | 10 +- buildflow/io/strategies/source.py | 5 +- buildflow/samples/service/main_two.py | 31 +++ 17 files changed, 685 insertions(+), 20 deletions(-) create mode 100644 buildflow/core/dependencies/__init__.py create mode 100644 buildflow/core/dependencies/dependency.py create mode 100644 buildflow/io/postgres/__init__.py create mode 100644 buildflow/io/postgres/postgres.py create mode 100644 buildflow/io/postgres/providers/__init__.py create mode 100644 buildflow/io/postgres/providers/postgres_provider.py create mode 100644 buildflow/samples/service/main_two.py diff --git a/buildflow/core/app/flow.py b/buildflow/core/app/flow.py index 58adbebc..85b05d81 100644 --- a/buildflow/core/app/flow.py +++ b/buildflow/core/app/flow.py @@ -21,6 +21,12 @@ from buildflow.core.credentials.aws_credentials import AWSCredentials from buildflow.core.credentials.empty_credentials import EmptyCredentials from buildflow.core.credentials.gcp_credentials import GCPCredentials +from buildflow.core.dependencies.dependency import ( + Client, + Dependency, + KwargDependencies, + Sink, +) from buildflow.core.options.flow_options import FlowOptions from buildflow.core.options.runtime_options import AutoscalerOptions, ProcessorOptions from buildflow.core.processor.patterns.collector import CollectorProcessor @@ -315,6 +321,7 @@ def __init__( flow_id: FlowID = "buildflow-app", flow_options: Optional[FlowOptions] = None, ) -> None: + print("\n\nFLOW CREATED\n\n") # Load the BuildFlow Config to get the default options buildflow_config_dir = os.path.join( _get_directory_path_of_caller(), ".buildflow" @@ -378,6 +385,26 @@ def _background_tasks( return provider.background_tasks(credentials) return [] + def _dependencies( + self, full_arg_spec: type_inspect.FullArgSpec + ) -> KwargDependencies: + dependencies = {} + for i, arg_name in enumerate(full_arg_spec.args[1:]): + default_value = full_arg_spec.defaults[i] + if default_value is None: + raise ValueError( + f"Processor arg dependency {arg_name} has no default value." 
+ ) + + if isinstance(default_value, Dependency): + credentials = self._get_credentials( + default_value.primitive.primitive_type + ) + default_value.attach_credentials(credentials) + dependencies[arg_name] = default_value + + return KwargDependencies(dependencies) + # NOTE: The Flow class is responsible for converting Primitives into a Provider def pipeline( self, @@ -743,6 +770,28 @@ def __init__( ) outputs["sink_urn"] = sink_resource.urn + # Builds the dependencies' pulumi.CompositeResources + # (if they exist) + for ( + arg_name, + dependency, + ) in kwarg_dependencies.arg_name_to_dependency.items(): + dependency_pulumi_provider = ( + dependency.primitive.pulumi_provider() + ) + if ( + dependency_pulumi_provider is not None + and dependency.primitive not in primitive_cache + ): + dependency_resource = _traverse_primitive_for_pulumi( + primitive=dependency.primitive, + type_=None, + credentials=dependency.credentials, + initial_opts=child_opts, + visited_primitives=primitive_cache, + ) + outputs[f"{arg_name}_urn"] = dependency_resource.urn + self.register_outputs(outputs) return PipelineComponentResource( @@ -756,6 +805,8 @@ def background_tasks(): source_primitive, source_credentials ) + self._background_tasks(sink_primitive, sink_credentials) + kwarg_dependencies = self._dependencies(full_arg_spec) + # Dynamically define a new class with the same structure as Processor class_name = f"PipelineProcessor{utils.uuid(max_len=8)}" source_provider = source_primitive.source_provider() @@ -771,6 +822,7 @@ def background_tasks(): "setup": setup, "teardown": teardown, "background_tasks": lambda self: background_tasks(), + "dependencies": kwarg_dependencies.materialize, "__meta__": { "source": source_primitive, "sink": sink_primitive, @@ -865,6 +917,28 @@ def __init__( ) outputs["sink_urn"] = sink_resource.urn + # Builds the dependencies' pulumi.CompositeResources + # (if they exist) + for ( + arg_name, + dependency, + ) in kwarg_dependencies.arg_name_to_dependency.items(): + dependency_pulumi_provider = ( + dependency.primitive.pulumi_provider() + ) + if ( + dependency_pulumi_provider is not None + and dependency.primitive not in primitive_cache + ): + dependency_resource = _traverse_primitive_for_pulumi( + primitive=dependency.primitive, + type_=None, + credentials=dependency.credentials, + initial_opts=child_opts, + visited_primitives=primitive_cache, + ) + outputs[f"{arg_name}_urn"] = dependency_resource.urn + self.register_outputs(outputs) return CollectorComponentResource( @@ -872,6 +946,8 @@ def __init__( sink_primitive=sink_primitive, ) + kwarg_dependencies = self._dependencies(full_arg_spec) + def background_tasks(): return self._background_tasks(sink_primitive, sink_credentials) @@ -889,6 +965,7 @@ def background_tasks(): "setup": setup, "teardown": teardown, "background_tasks": lambda self: background_tasks(), + "dependencies": kwarg_dependencies.materialize, "__meta__": { "sink": sink_primitive, }, @@ -939,6 +1016,7 @@ def decorator_function(original_process_fn_or_class): _, output_type = self._input_output_type(full_arg_spec) processor_id = original_process_fn_or_class.__name__ + primitive_cache = self._primitive_cache def pulumi_resources_for_endpoint(): class EndpointComponentResource(pulumi.ComponentResource): @@ -950,12 +1028,37 @@ def __init__(self, processor_id: str): None, ) + child_opts = pulumi.ResourceOptions(parent=self) outputs = {"processor_id": processor_id} + # Builds the dependencies' pulumi.CompositeResources + # (if they exist) + for ( + arg_name, + dependency, + ) 
in kwarg_dependencies.arg_name_to_dependency.items(): + dependency_pulumi_provider = ( + dependency.primitive.pulumi_provider() + ) + if ( + dependency_pulumi_provider is not None + and dependency.primitive not in primitive_cache + ): + dependency_resource = _traverse_primitive_for_pulumi( + primitive=dependency.primitive, + type_=None, + credentials=dependency.credentials, + initial_opts=child_opts, + visited_primitives=primitive_cache, + ) + outputs[f"{arg_name}_urn"] = dependency_resource.urn + self.register_outputs(outputs) return EndpointComponentResource(processor_id=processor_id) + kwarg_dependencies = self._dependencies(full_arg_spec) + # Dynamically define a new class with the same structure as Processor class_name = f"EndpointProcessor{utils.uuid(max_len=8)}" adhoc_methods = { @@ -968,6 +1071,7 @@ def __init__(self, processor_id: str): "setup": setup, "teardown": teardown, "background_tasks": lambda self: [], + "dependencies": kwarg_dependencies.materialize, "__meta__": {}, "__call__": original_process_fn_or_class, } diff --git a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py index 83b95729..87bad648 100644 --- a/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py +++ b/buildflow/core/app/runtime/actors/collector_pattern/receive_process_push_ack.py @@ -69,14 +69,15 @@ async def run(self) -> bool: push_converter = self.sink.push_converter(output_type) app = fastapi.FastAPI() fastapi_method = None - if self.processor.endpoint().method == Method.GET: + + endpoint = self.processor.endpoint() + if endpoint.method == Method.GET: fastapi_method = app.get - elif self.processor.endpoint().method == Method.POST: + elif endpoint.method == Method.POST: fastapi_method = app.post else: raise NotImplementedError( - f"Method {self.processor.endpoint().method} is not supported " - "for collectors." + f"Method {endpoint.method} is not supported " "for collectors." ) @serve.deployment( @@ -112,7 +113,8 @@ async def root(self, request: input_type) -> None: sink = self.processor.sink() self.num_events_processed_counter.inc() start_time = time.monotonic() - output = await self.processor.process(request) + with self.processor.dependencies() as kwargs: + output = await self.processor.process(request, **kwargs) if output is None: # Exclude none results return diff --git a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py index 00dcdfbe..ba429c92 100644 --- a/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py +++ b/buildflow/core/app/runtime/actors/endpoint_pattern/receive_process_respond.py @@ -67,14 +67,15 @@ async def run(self) -> bool: input_type, output_type = process_types(self.processor) app = fastapi.FastAPI() fastapi_method = None - if self.processor.endpoint().method == Method.GET: + + endpoint = self.processor.endpoint() + if endpoint.method == Method.GET: fastapi_method = app.get - elif self.processor.endpoint().method == Method.POST: + elif endpoint.method == Method.POST: fastapi_method = app.post else: raise NotImplementedError( - f"Method {self.processor.endpoint().method} is not supported " - "for endpoints." + f"Method {endpoint.method} is not supported " "for endpoints." 
) @serve.deployment( @@ -108,7 +109,8 @@ def __init__(self, processor, run_id): async def root(self, request: input_type) -> output_type: self.num_events_processed_counter.inc() start_time = time.monotonic() - output = await self.processor.process(request) + with self.processor.dependencies() as kwargs: + output = await self.processor.process(request, **kwargs) self.process_time_counter.inc((time.monotonic() - start_time) * 1000) return output diff --git a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py index 8b7650d7..826015c7 100644 --- a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py +++ b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py @@ -158,8 +158,8 @@ async def run(self): push_converter = sink.push_converter(output_type) process_fn = self.processor.process - async def process_element(element): - results = await process_fn(pull_converter(element)) + async def process_element(element, **process_kwargs): + results = await process_fn(pull_converter(element), **process_kwargs) if results is None: # Exclude none results return @@ -201,9 +201,13 @@ async def process_element(element): ) try: coros = [] - for element in response.payload: - coros.append(process_element(element)) - flattened_results = await asyncio.gather(*coros) + # NOTE: process dependencies are acquired here (in context) + with self.processor.dependencies() as process_kwargs: + for element in response.payload: + coros.append(process_element(element, **process_kwargs)) + flattened_results = await asyncio.gather(*coros) + # NOTE: process dependencies are released here (out of context) + batch_results = [] for results in flattened_results: if results is None: diff --git a/buildflow/core/dependencies/__init__.py b/buildflow/core/dependencies/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/buildflow/core/dependencies/dependency.py b/buildflow/core/dependencies/dependency.py new file mode 100644 index 00000000..19a39829 --- /dev/null +++ b/buildflow/core/dependencies/dependency.py @@ -0,0 +1,95 @@ +import inspect +from contextlib import ExitStack, contextmanager +from typing import Any, Dict, List, Optional, Type + +from buildflow.core.credentials import CredentialType +from buildflow.io.primitive import Primitive + + +class Dependency: + def __init__(self, primitive: Primitive): + self.primitive = primitive + self.credentials: Optional[CredentialType] = None + + def attach_credentials(self, credential_type: CredentialType) -> None: + self.credentials = credential_type + + @contextmanager + def materialize(self): + raise NotImplementedError( + f"materialize not implemented for {self.__class__.__name__}" + ) + + +class KwargDependencies: + def __init__(self, arg_name_to_dependency: Dict[str, Dependency]): + self.arg_name_to_dependency = arg_name_to_dependency + + @contextmanager + def materialize(self): + with ExitStack() as stack: + kwargs = { + arg_name: stack.enter_context(dependency.materialize()) + for arg_name, dependency in self.arg_name_to_dependency.items() + } + yield kwargs + + +# Python Magic - maybe not the best way to do this but should work as long as no +# threads change their annotations dynamically +class _AnnotationCapturer(type): + captured_annotation: Optional[Type] = None + + def __call__(cls, *args, **kwargs): + print("CALLED") + frame = inspect.currentframe().f_back # Capture the frame immediately + instance = super().__call__(*args, 
**kwargs) + print("INSTANCE", instance) + for name, value in frame.f_locals.items(): + if value is instance: + print("FOUND: ", name, value) + annotations = frame.f_locals.get("__annotations__", {}) + instance.captured_annotation = annotations.get(name, None) + break + return instance + + +class Client(Dependency, metaclass=_AnnotationCapturer): + def __init__(self, primitive: Primitive): + super().__init__(primitive) + self.captured_annotation: Optional[Type] = None + self.client: Optional[Any] = None + + @contextmanager + def materialize(self): + if self.client is None: + if self.credentials is None: + raise ValueError( + "Cannot materialize client without credentials attached" + ) + self.client = self.primitive.client_provider().client( + self.credentials, self.captured_annotation + ) + try: + yield self.client + finally: + # Optionally, implement any teardown logic here + pass + + +class Sink(Dependency): + def __init__(self, primitive: Primitive): + super().__init__(primitive) + self.sink: Optional[Any] = None + + @contextmanager + def materialize(self): + if self.sink is None: + if self.credentials is None: + raise ValueError("Cannot materialize sink without credentials attached") + self.sink = self.primitive.sink_provider().sink(self.credentials) + try: + yield self.sink + finally: + # Optionally, implement any teardown logic here + pass diff --git a/buildflow/core/processor/processor.py b/buildflow/core/processor/processor.py index e1d35c4a..f1bba3ee 100644 --- a/buildflow/core/processor/processor.py +++ b/buildflow/core/processor/processor.py @@ -1,5 +1,6 @@ import enum -from typing import List +from contextlib import contextmanager +from typing import Any, Dict, List from buildflow.core.background_tasks.background_task import BackgroundTask @@ -21,6 +22,10 @@ class ProcessorAPI: def pulumi_program(self, preview: bool): raise NotImplementedError("pulumi_program not implemented for Processor") + @contextmanager + def dependencies(self): + raise NotImplementedError("dependencies not implemented for Processor") + def setup(self): raise NotImplementedError("setup not implemented") diff --git a/buildflow/io/gcp/providers/pubsub_topic.py b/buildflow/io/gcp/providers/pubsub_topic.py index 3ea77914..03ba3f87 100644 --- a/buildflow/io/gcp/providers/pubsub_topic.py +++ b/buildflow/io/gcp/providers/pubsub_topic.py @@ -2,11 +2,13 @@ import pulumi import pulumi_gcp +from google.cloud import pubsub from buildflow.core.credentials import GCPCredentials from buildflow.core.types.gcp_types import GCPProjectID, PubSubTopicID, PubSubTopicName from buildflow.io.gcp.strategies.pubsub_strategies import GCPPubSubTopicSink from buildflow.io.provider import PulumiProvider, SinkProvider +from buildflow.io.utils.clients import gcp_clients class _PubSubTopic(pulumi.ComponentResource): @@ -64,6 +66,17 @@ def sink(self, credentials: GCPCredentials): topic_name=self.topic_name, ) + def client(self, credentials: GCPCredentials, client_type: Optional[Type]): + if client_type is not None and client_type != pubsub.PublisherClient: + raise NotImplementedError( + f"Client type {client_type} is not supported by this provider" + ) + clients = gcp_clients.GCPClients( + credentials=credentials, + quota_project_id=self.project_id, + ) + return clients.get_publisher_client() + def pulumi_resource( self, type_: Optional[Type], diff --git a/buildflow/io/gcp/pubsub_topic.py b/buildflow/io/gcp/pubsub_topic.py index 45a90aeb..c89af146 100644 --- a/buildflow/io/gcp/pubsub_topic.py +++ b/buildflow/io/gcp/pubsub_topic.py @@ -48,6 
+48,12 @@ def sink_provider(self) -> GCPPubSubTopicProvider: topic_name=self.topic_name, ) + def client_provider(self) -> GCPPubSubTopicProvider: + return GCPPubSubTopicProvider( + project_id=self.project_id, + topic_name=self.topic_name, + ) + def _pulumi_provider(self) -> GCPPubSubTopicProvider: return GCPPubSubTopicProvider( project_id=self.project_id, diff --git a/buildflow/io/postgres/__init__.py b/buildflow/io/postgres/__init__.py new file mode 100644 index 00000000..a99c3cf9 --- /dev/null +++ b/buildflow/io/postgres/__init__.py @@ -0,0 +1,3 @@ +# ruff: noqa +from .postgres import Postgres +from .utils import read_private_key_file, read_private_key_file_bytes diff --git a/buildflow/io/postgres/postgres.py b/buildflow/io/postgres/postgres.py new file mode 100644 index 00000000..e4f1ee70 --- /dev/null +++ b/buildflow/io/postgres/postgres.py @@ -0,0 +1,158 @@ +import dataclasses +from typing import Optional, Union + +from buildflow.core.utils import uuid +from buildflow.io.aws.s3 import S3Bucket +from buildflow.io.gcp.storage import GCSBucket +from buildflow.io.primitive import Primitive, PrimitiveType +from buildflow.io.provider import BackgroundTaskProvider, PulumiProvider, SinkProvider +from buildflow.io.snowflake.providers.table_provider import SnowflakeTableProvider +from buildflow.types.portable import FileFormat + +_DEFAULT_DATABASE_MANAGED = True +_DEFAULT_SCHEMA_MANAGED = True +_DEFAULT_SNOW_PIPE_MANAGED = True +_DEFAULT_STAGE_MANAGED = True +_DEFAULT_FLUSH_TIME_LIMIT_SECS = 60 + + +@dataclasses.dataclass +class SnowflakeTable( + Primitive[ + # Pulumi provider type + SnowflakeTableProvider, + # Source provider type + None, + # Sink provider type + SnowflakeTableProvider, + # Background task provider type + SnowflakeTableProvider, + ] +): + # TODO: make these types more concrete + # Required arguments + table: str + database: str + schema: str + bucket: Union[S3Bucket, GCSBucket] + # Arguments for authentication + account: Optional[str] + user: Optional[str] + private_key: Optional[str] + # Optional arguments to configure sink + # If snow pipe is provided, the sink will use snowpipe to load data + # Otherwise you should run `buildflow apply` to have a snow pipe created + # for you. + snow_pipe: Optional[str] = None + # If snowflake_stage is provided, the sink will use the provided stage + # for copying data. + # Otherwise you should run `buildflow apply` to have a stage created + # for you. + snowflake_stage: Optional[str] = None + + # Optional arguments to configure sink + # The maximium number of seconds to wait before flushing to SnowPipe. + flush_time_limit_secs: int = dataclasses.field( + default=_DEFAULT_FLUSH_TIME_LIMIT_SECS, init=False + ) + + # Optional arguments to configure pulumi. These can be set with the: + # .options(...) method + database_managed: bool = dataclasses.field( + default=_DEFAULT_DATABASE_MANAGED, init=False + ) + schema_managed: bool = dataclasses.field( + default=_DEFAULT_SCHEMA_MANAGED, init=False + ) + snow_pipe_managed: bool = dataclasses.field( + default=_DEFAULT_SNOW_PIPE_MANAGED, init=False + ) + stage_managed: bool = dataclasses.field(default=_DEFAULT_STAGE_MANAGED, init=False) + + def __post_init__(self): + if isinstance(self.bucket, S3Bucket): + self.primitive_type = PrimitiveType.AWS + elif isinstance(self.bucket, GCSBucket): + self.primitive_type = PrimitiveType.GCP + else: + raise ValueError( + "Bucket must be of type S3Bucket or GCSBucket. 
Got: " + f"{type(self.bucket)}" + ) + self.bucket.file_format = FileFormat.PARQUET + self.bucket.file_path = f"{uuid()}.parquet" + self.snow_pipe_managed = self.snow_pipe is None + if self.snow_pipe is None: + self.snow_pipe = "buildflow_managed_snow_pipe" + self.stage_managed = self.snowflake_stage is None + if self.snowflake_stage is None: + self.snowflake_stage = "buildflow_managed_snowflake_stage" + + def options( + self, + # Pulumi management options + managed: bool = False, + database_managed: bool = _DEFAULT_DATABASE_MANAGED, + schema_managed: bool = _DEFAULT_SCHEMA_MANAGED, + # Sink options + flush_time_limit_secs: int = _DEFAULT_FLUSH_TIME_LIMIT_SECS, + ) -> "SnowflakeTable": + to_ret = super().options(managed) + to_ret.database_managed = database_managed + to_ret.schema_managed = schema_managed + to_ret.flush_time_limit_secs = flush_time_limit_secs + return to_ret + + def sink_provider(self) -> SinkProvider: + return SnowflakeTableProvider( + table=self.table, + database=self.database, + schema=self.schema, + bucket=self.bucket, + snow_pipe=self.snow_pipe, + snowflake_stage=self.snowflake_stage, + database_managed=self.database_managed, + schema_managed=self.schema_managed, + snow_pipe_managed=self.snow_pipe_managed, + stage_managed=self.stage_managed, + account=self.account, + user=self.user, + private_key=self.private_key, + flush_time_secs=self.flush_time_limit_secs, + ) + + def _pulumi_provider(self) -> PulumiProvider: + return SnowflakeTableProvider( + table=self.table, + database=self.database, + schema=self.schema, + bucket=self.bucket, + snow_pipe=self.snow_pipe, + snowflake_stage=self.snowflake_stage, + database_managed=self.database_managed, + schema_managed=self.schema_managed, + snow_pipe_managed=self.snow_pipe_managed, + stage_managed=self.stage_managed, + account=self.account, + user=self.user, + private_key=self.private_key, + flush_time_secs=self.flush_time_limit_secs, + ) + + def background_task_provider(self) -> BackgroundTaskProvider: + return SnowflakeTableProvider( + table=self.table, + database=self.database, + schema=self.schema, + bucket=self.bucket._pulumi_provider(), + snow_pipe=self.snow_pipe, + snowflake_stage=self.snowflake_stage, + database_managed=self.database_managed, + schema_managed=self.schema_managed, + snow_pipe_managed=self.snow_pipe_managed, + stage_managed=self.stage_managed, + account=self.account, + user=self.user, + private_key=self.private_key, + flush_time_secs=self.flush_time_limit_secs, + ) diff --git a/buildflow/io/postgres/providers/__init__.py b/buildflow/io/postgres/providers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/buildflow/io/postgres/providers/postgres_provider.py b/buildflow/io/postgres/providers/postgres_provider.py new file mode 100644 index 00000000..e7f5110f --- /dev/null +++ b/buildflow/io/postgres/providers/postgres_provider.py @@ -0,0 +1,225 @@ +import dataclasses +from typing import List, Optional, Type, Union + +import pulumi +import pulumi_snowflake + +from buildflow.core.background_tasks.background_task import BackgroundTask +from buildflow.core.credentials.aws_credentials import AWSCredentials +from buildflow.core.credentials.gcp_credentials import GCPCredentials +from buildflow.io.aws.s3 import S3Bucket +from buildflow.io.gcp.storage import GCSBucket +from buildflow.io.provider import BackgroundTaskProvider, PulumiProvider, SinkProvider +from buildflow.io.snowflake.background_tasks.table_load_background_task import ( + SnowflakeUploadBackgroundTask, +) +from 
buildflow.io.snowflake.providers.schemas import type_to_snowflake_columns +from buildflow.io.snowflake.strategies.table_sink_startegy import SnowflakeTableSink +from buildflow.io.strategies.sink import SinkStrategy + + +class _SnowflakeTableSinkResource(pulumi.ComponentResource): + def __init__( + self, + table: str, + database: str, + schema: str, + bucket: Union[S3Bucket, GCSBucket], + snow_pipe: Optional[str], + snowflake_stage: Optional[str], + database_managed: bool, + schema_managed: bool, + snow_pipe_managed: bool, + stage_managed: bool, + account: str, + user: str, + private_key: str, + # pulumi_resource options (buildflow internal concept) + type_: Optional[Type], + credentials: AWSCredentials, + opts: pulumi.ResourceOptions, + ): + super().__init__( + "buildflow:snowflake:Table", + f"buildflow-{database}-{schema}-{table}", + None, + opts, + ) + + outputs = {} + + table_id = f"{database}.{schema}.{table}" + snowflake_provider = pulumi_snowflake.Provider( + resource_name=f"{table_id}.snowflake_provider", + account=account, + username=user, + private_key=private_key, + ) + if type_ is None: + raise ValueError( + "Please specify an output type so we can determine the expected schema " + "of the table." + ) + if hasattr(type_, "__args__"): + # Using a composite type hint like List or Optional + type_ = type_.__args__[0] + columns = type_to_snowflake_columns(type_) + pulumi_snowflake_cols = [] + for column in columns: + pulumi_snowflake_cols.append( + pulumi_snowflake.TableColumnArgs( + name=column.name, type=column.col_type, nullable=column.nullable + ) + ) + + self.database_resource = None + self.schema_resource = None + running_depends = [] + if database_managed: + self.database_resource = pulumi_snowflake.Database( + database, + opts=pulumi.ResourceOptions(parent=self, provider=snowflake_provider), + name=database, + ) + running_depends.append(self.database_resource) + outputs["snowflake.database"] = self.database_resource.name + if schema_managed: + schema_id = f"{database}.{schema}" + self.schema_resource = pulumi_snowflake.Schema( + schema_id, + opts=pulumi.ResourceOptions( + parent=self, + provider=snowflake_provider, + depends_on=running_depends, + ), + name=schema, + database=database, + ) + running_depends.append(self.schema_resource) + outputs["snowflake.schema"] = schema_id + + self.table_resource = pulumi_snowflake.Table( + table_id, + columns=pulumi_snowflake_cols, + database=database, + schema=schema, + name=table, + opts=pulumi.ResourceOptions( + parent=self, provider=snowflake_provider, depends_on=running_depends + ), + ) + outputs["snowflake.table"] = table_id + + self.stage_resource = None + if stage_managed: + snow_stage_id = f"{table_id}.{snowflake_stage}" + stage_credentials = None + if isinstance(credentials, AWSCredentials): + stage_credentials = ( + f"AWS_KEY_ID='{credentials.access_key_id}' " + f"AWS_SECRET_KEY='{credentials.secret_access_key}'" + ) + self.stage_resource = pulumi_snowflake.Stage( + snow_stage_id, + opts=pulumi.ResourceOptions( + parent=self, provider=snowflake_provider, depends_on=running_depends + ), + name=snowflake_stage, + database=database, + schema=schema, + copy_options="MATCH_BY_COLUMN_NAME = CASE_SENSITIVE", + file_format="TYPE = PARQUET", + url=bucket.bucket_url, + credentials=stage_credentials, + ) + outputs["snowflake.stage"] = snow_stage_id + running_depends.append(self.stage_resource) + self.snow_pipe_resource = None + if snow_pipe_managed: + copy_statement = ( + f'copy into "{database}"."{schema}"."{table}" ' + f'from 
@"{database}"."{schema}"."{snowflake_stage}";' + ) + snow_pipe_id = f"{table_id}.{snow_pipe}" + self.snow_pipe_resource = pulumi_snowflake.Pipe( + snow_pipe_id, + opts=pulumi.ResourceOptions( + parent=self, provider=snowflake_provider, depends_on=running_depends + ), + database=database, + schema=schema, + name=snow_pipe, + copy_statement=copy_statement, + ) + outputs["snowflake.pipe"] = snow_pipe_id + + +@dataclasses.dataclass +class SnowflakeTableProvider(SinkProvider, PulumiProvider, BackgroundTaskProvider): + # Information about the table + table: str + database: str + schema: str + # Options for configuring flushing + flush_time_secs: int + # Bucket for staging data for upload to snowflake + bucket: Union[S3Bucket, GCSBucket] + snow_pipe: Optional[str] + snowflake_stage: Optional[str] + # Options for configuring pulumi + database_managed: bool + schema_managed: bool + snow_pipe_managed: bool + stage_managed: bool + # Authentication information + account: str + user: str + private_key: str + + def sink(self, credentials: Union[AWSCredentials, GCPCredentials]) -> SinkStrategy: + return SnowflakeTableSink( + credentials=credentials, + bucket_sink=self.bucket.sink_provider().sink(credentials), + ) + + def background_tasks( + self, credentials: Union[AWSCredentials, GCPCredentials] + ) -> List[BackgroundTask]: + return [ + SnowflakeUploadBackgroundTask( + credentials=credentials, + bucket_name=self.bucket.bucket_name, + account=self.account, + user=self.user, + database=self.database, + schema=self.schema, + private_key=self.private_key, + pipe=self.snow_pipe, + flush_time_secs=self.flush_time_secs, + ) + ] + + def pulumi_resource( + self, + type_: Optional[Type], + credentials: Union[AWSCredentials, GCPCredentials], + opts: pulumi.ResourceOptions, + ) -> _SnowflakeTableSinkResource: + return _SnowflakeTableSinkResource( + table=self.table, + database=self.database, + schema=self.schema, + bucket=self.bucket, + snowflake_stage=self.snowflake_stage, + snow_pipe=self.snow_pipe, + database_managed=self.database_managed, + schema_managed=self.schema_managed, + snow_pipe_managed=self.snow_pipe_managed, + stage_managed=self.stage_managed, + account=self.account, + user=self.user, + private_key=self.private_key, + type_=type_, + credentials=credentials, + opts=opts, + ) diff --git a/buildflow/io/primitive.py b/buildflow/io/primitive.py index 5ff1aeda..a579455d 100644 --- a/buildflow/io/primitive.py +++ b/buildflow/io/primitive.py @@ -9,7 +9,7 @@ GCPOptions, LocalOptions, ) -from buildflow.io.provider import BTT, PPT, SIT, SOT +from buildflow.io.provider import BTT, CLT, PPT, SIT, SOT from buildflow.io.strategies._strategy import StategyType @@ -23,7 +23,7 @@ class PrimitiveType(enum.Enum): AGNOSTIC = "agnostic" -class Primitive(Generic[PPT, SOT, SIT, BTT]): +class Primitive(Generic[PPT, SOT, SIT, BTT, CLT]): primitive_type: PrimitiveType _managed: bool = False @@ -62,6 +62,12 @@ def sink_provider(self) -> SIT: f"Primitive.sink_provider() is not implemented for type: {type(self)}." ) + def client_provider(self) -> CLT: + """Return a client provider for this primitive.""" + raise NotImplementedError( + f"Primitive.client_provider() is not implemented for type: {type(self)}." 
+ ) + def background_task_provider(self) -> Optional[BTT]: """Return a background task provider for this primitive.""" return None diff --git a/buildflow/io/provider.py b/buildflow/io/provider.py index 1726e53e..ece89d65 100644 --- a/buildflow/io/provider.py +++ b/buildflow/io/provider.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Type, TypeVar +from typing import Any, List, Optional, Type, TypeVar import pulumi @@ -44,6 +44,14 @@ def sink(self, credentials: CredentialType) -> SinkStrategy: SIT = TypeVar("SIT", bound=SinkProvider) +class ClientProvider(ProviderAPI): + def client(self, credentials: CredentialType, client_type: Optional[Type]) -> Any: + raise NotImplementedError("client not implemented for Provider") + + +CLT = TypeVar("CLT", bound=ClientProvider) + + class BackgroundTaskProvider(ProviderAPI): def background_tasks(self, credentials: CredentialType) -> List[BackgroundTask]: raise NotImplementedError("background_task not implemented for Provider") diff --git a/buildflow/io/strategies/source.py b/buildflow/io/strategies/source.py index 23cc7546..ce418a8e 100644 --- a/buildflow/io/strategies/source.py +++ b/buildflow/io/strategies/source.py @@ -1,5 +1,5 @@ import dataclasses -from typing import Any, Callable, Iterable, Type +from typing import Any, Callable, Iterable, Type, TypeAlias from buildflow.core.credentials import CredentialType from buildflow.io.strategies._strategy import StategyType, Strategy, StrategyID @@ -47,3 +47,6 @@ async def teardown(self): This should perform any cleanup that is needed by the source. """ pass + + +Source: TypeAlias = SourceStrategy diff --git a/buildflow/samples/service/main_two.py b/buildflow/samples/service/main_two.py new file mode 100644 index 00000000..de457fdb --- /dev/null +++ b/buildflow/samples/service/main_two.py @@ -0,0 +1,31 @@ +from dataclasses import dataclass + +from buildflow import Flow +from buildflow.core.dependencies.dependency import Client +from buildflow.io.gcp import GCPPubSubTopic +from buildflow.io.local import File, Pulse + + +@dataclass +class InputRequest: + val: int + + +@dataclass +class OuptutResponse: + val: int + + +pubsub_topic = GCPPubSubTopic( + project_id="daring-runway-374503", topic_name="tanke-test-topic" +) + + +app = Flow() + + +@app.pipeline(source=Pulse([InputRequest(1)], 1), sink=File("output.txt", "csv")) +def my_pipeline_processor( + input: InputRequest, pubsub_client=Client(pubsub_topic) +) -> OuptutResponse: + return OuptutResponse(val=input.val + 1) From c6cb2b9ffb397c6279a41556db6ea21c8c76de9c Mon Sep 17 00:00:00 2001 From: tanke Date: Wed, 6 Sep 2023 10:48:58 -0700 Subject: [PATCH 2/2] temp --- buildflow/core/app/flow.py | 1 + .../pipeline_pattern/pull_process_push.py | 8 +- buildflow/core/dependencies/dependency.py | 34 +-- buildflow/io/aws/s3.py | 2 + buildflow/io/aws/s3_file_change_stream.py | 2 + buildflow/io/aws/sqs.py | 2 + buildflow/io/duckdb/duckdb.py | 2 + buildflow/io/gcp/bigquery_table.py | 2 + buildflow/io/gcp/gcs_file_change_stream.py | 2 + buildflow/io/gcp/pubsub_subscription.py | 2 + buildflow/io/gcp/pubsub_topic.py | 2 + buildflow/io/gcp/storage.py | 2 + buildflow/io/local/empty.py | 2 + buildflow/io/local/file.py | 2 + buildflow/io/local/file_change_stream.py | 2 + buildflow/io/local/pulse.py | 2 + buildflow/io/postgres/__init__.py | 1 - buildflow/io/postgres/postgres.py | 155 ++--------- .../postgres/providers/postgres_provider.py | 245 +++--------------- buildflow/io/primitive.py | 10 +- buildflow/io/snowflake/snowflake_table.py | 2 + pyproject.toml | 3 + 22 files 
changed, 102 insertions(+), 383 deletions(-) diff --git a/buildflow/core/app/flow.py b/buildflow/core/app/flow.py index 85b05d81..b4de421c 100644 --- a/buildflow/core/app/flow.py +++ b/buildflow/core/app/flow.py @@ -401,6 +401,7 @@ def _dependencies( default_value.primitive.primitive_type ) default_value.attach_credentials(credentials) + default_value.attach_annotated_type(full_arg_spec.annotations[arg_name]) dependencies[arg_name] = default_value return KwargDependencies(dependencies) diff --git a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py index 826015c7..9bb967a7 100644 --- a/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py +++ b/buildflow/core/app/runtime/actors/pipeline_pattern/pull_process_push.py @@ -201,12 +201,10 @@ async def process_element(element, **process_kwargs): ) try: coros = [] - # NOTE: process dependencies are acquired here (in context) - with self.processor.dependencies() as process_kwargs: - for element in response.payload: + for element in response.payload: + with self.processor.dependencies() as process_kwargs: coros.append(process_element(element, **process_kwargs)) - flattened_results = await asyncio.gather(*coros) - # NOTE: process dependencies are released here (out of context) + flattened_results = await asyncio.gather(*coros) batch_results = [] for results in flattened_results: diff --git a/buildflow/core/dependencies/dependency.py b/buildflow/core/dependencies/dependency.py index 19a39829..c60f257d 100644 --- a/buildflow/core/dependencies/dependency.py +++ b/buildflow/core/dependencies/dependency.py @@ -1,6 +1,6 @@ import inspect from contextlib import ExitStack, contextmanager -from typing import Any, Dict, List, Optional, Type +from typing import Any, Dict, Generic, Optional, Type, TypeVar from buildflow.core.credentials import CredentialType from buildflow.io.primitive import Primitive @@ -10,10 +10,14 @@ class Dependency: def __init__(self, primitive: Primitive): self.primitive = primitive self.credentials: Optional[CredentialType] = None + self.annotated_type: Optional[Type] = None def attach_credentials(self, credential_type: CredentialType) -> None: self.credentials = credential_type + def attach_annotated_type(self, annotated_type: Type) -> None: + self.annotated_type = annotated_type + @contextmanager def materialize(self): raise NotImplementedError( @@ -35,29 +39,12 @@ def materialize(self): yield kwargs -# Python Magic - maybe not the best way to do this but should work as long as no -# threads change their annotations dynamically -class _AnnotationCapturer(type): - captured_annotation: Optional[Type] = None - - def __call__(cls, *args, **kwargs): - print("CALLED") - frame = inspect.currentframe().f_back # Capture the frame immediately - instance = super().__call__(*args, **kwargs) - print("INSTANCE", instance) - for name, value in frame.f_locals.items(): - if value is instance: - print("FOUND: ", name, value) - annotations = frame.f_locals.get("__annotations__", {}) - instance.captured_annotation = annotations.get(name, None) - break - return instance +P = TypeVar("P", bound=Primitive) -class Client(Dependency, metaclass=_AnnotationCapturer): - def __init__(self, primitive: Primitive): +class Client(Generic[P], Dependency): + def __init__(self, primitive: P): super().__init__(primitive) - self.captured_annotation: Optional[Type] = None self.client: Optional[Any] = None @contextmanager @@ -67,9 +54,8 @@ def materialize(self): 
raise ValueError( "Cannot materialize client without credentials attached" ) - self.client = self.primitive.client_provider().client( - self.credentials, self.captured_annotation - ) + provider = self.primitive.client_provider() + self.client = provider.client(self.credentials, self.annotated_type) try: yield self.client finally: diff --git a/buildflow/io/aws/s3.py b/buildflow/io/aws/s3.py index 4244bb5c..a5f73865 100644 --- a/buildflow/io/aws/s3.py +++ b/buildflow/io/aws/s3.py @@ -21,6 +21,8 @@ class S3Bucket( S3BucketProvider, # Background task provider type None, + # Client provider type + None, ] ): bucket_name: S3BucketName diff --git a/buildflow/io/aws/s3_file_change_stream.py b/buildflow/io/aws/s3_file_change_stream.py index c41e144a..a9f4c7ea 100644 --- a/buildflow/io/aws/s3_file_change_stream.py +++ b/buildflow/io/aws/s3_file_change_stream.py @@ -23,6 +23,8 @@ class S3FileChangeStream( None, # Background task provider type None, + # Client provider type + None, ], CompositePrimitive, ): diff --git a/buildflow/io/aws/sqs.py b/buildflow/io/aws/sqs.py index 064079bd..cef6413a 100644 --- a/buildflow/io/aws/sqs.py +++ b/buildflow/io/aws/sqs.py @@ -19,6 +19,8 @@ class SQSQueue( SQSQueueProvider, # Background task provider type None, + # Client provider type + None, ] ): queue_name: SQSQueueName diff --git a/buildflow/io/duckdb/duckdb.py b/buildflow/io/duckdb/duckdb.py index 8566b3fb..714d56c8 100644 --- a/buildflow/io/duckdb/duckdb.py +++ b/buildflow/io/duckdb/duckdb.py @@ -18,6 +18,8 @@ class DuckDBTable( DuckDBProvider, # Background task provider type None, + # Client provider type + None, ] ): database: DuckDBDatabase diff --git a/buildflow/io/gcp/bigquery_table.py b/buildflow/io/gcp/bigquery_table.py index 3becb5eb..9834a5a0 100644 --- a/buildflow/io/gcp/bigquery_table.py +++ b/buildflow/io/gcp/bigquery_table.py @@ -24,6 +24,8 @@ class BigQueryTable( BigQueryTableProvider, # Background task provider type None, + # Client provider type + None, ] ): dataset: BigQueryDataset diff --git a/buildflow/io/gcp/gcs_file_change_stream.py b/buildflow/io/gcp/gcs_file_change_stream.py index 84a41993..ec5a9bb3 100644 --- a/buildflow/io/gcp/gcs_file_change_stream.py +++ b/buildflow/io/gcp/gcs_file_change_stream.py @@ -24,6 +24,8 @@ class GCSFileChangeStream( None, # Background task provider type None, + # Client provider type + None, ], CompositePrimitive, ): diff --git a/buildflow/io/gcp/pubsub_subscription.py b/buildflow/io/gcp/pubsub_subscription.py index 19e2aa89..dc552980 100644 --- a/buildflow/io/gcp/pubsub_subscription.py +++ b/buildflow/io/gcp/pubsub_subscription.py @@ -28,6 +28,8 @@ class GCPPubSubSubscription( None, # Background task provider type None, + # Client provider type + None, ] ): project_id: GCPProjectID diff --git a/buildflow/io/gcp/pubsub_topic.py b/buildflow/io/gcp/pubsub_topic.py index c89af146..b3719c7e 100644 --- a/buildflow/io/gcp/pubsub_topic.py +++ b/buildflow/io/gcp/pubsub_topic.py @@ -20,6 +20,8 @@ class GCPPubSubTopic( GCPPubSubTopicProvider, # Background task provider type None, + # Client provider type + GCPPubSubTopicProvider, ] ): project_id: GCPProjectID diff --git a/buildflow/io/gcp/storage.py b/buildflow/io/gcp/storage.py index 48d6337f..2a0aefba 100644 --- a/buildflow/io/gcp/storage.py +++ b/buildflow/io/gcp/storage.py @@ -24,6 +24,8 @@ class GCSBucket( GCSBucketProvider, # Background task provider type None, + # Client provider type + None, ] ): project_id: GCPProjectID diff --git a/buildflow/io/local/empty.py b/buildflow/io/local/empty.py index 
7db22eee..a76b995d 100644 --- a/buildflow/io/local/empty.py +++ b/buildflow/io/local/empty.py @@ -16,6 +16,8 @@ class Empty( EmptyProvider, # Background task provider type None, + # Client provider type + None, ] ): @classmethod diff --git a/buildflow/io/local/file.py b/buildflow/io/local/file.py index e930fa9f..e1143f9c 100644 --- a/buildflow/io/local/file.py +++ b/buildflow/io/local/file.py @@ -19,6 +19,8 @@ class File( FileProvider, # Background task provider type None, + # Client provider type + None, ] ): file_path: FilePath diff --git a/buildflow/io/local/file_change_stream.py b/buildflow/io/local/file_change_stream.py index 3e1822f5..14fa2baa 100644 --- a/buildflow/io/local/file_change_stream.py +++ b/buildflow/io/local/file_change_stream.py @@ -22,6 +22,8 @@ class LocalFileChangeStream( None, # Background task provider type None, + # Client provider type + None, ] ): file_path: FilePath diff --git a/buildflow/io/local/pulse.py b/buildflow/io/local/pulse.py index fa7aae54..e315bf63 100644 --- a/buildflow/io/local/pulse.py +++ b/buildflow/io/local/pulse.py @@ -17,6 +17,8 @@ class Pulse( None, # Background task provider type None, + # Client provider type + None, ] ): items: Iterable[Any] diff --git a/buildflow/io/postgres/__init__.py b/buildflow/io/postgres/__init__.py index a99c3cf9..9a8cc8e5 100644 --- a/buildflow/io/postgres/__init__.py +++ b/buildflow/io/postgres/__init__.py @@ -1,3 +1,2 @@ # ruff: noqa from .postgres import Postgres -from .utils import read_private_key_file, read_private_key_file_bytes diff --git a/buildflow/io/postgres/postgres.py b/buildflow/io/postgres/postgres.py index e4f1ee70..36a27b9d 100644 --- a/buildflow/io/postgres/postgres.py +++ b/buildflow/io/postgres/postgres.py @@ -1,158 +1,47 @@ import dataclasses -from typing import Optional, Union +from typing import Optional -from buildflow.core.utils import uuid -from buildflow.io.aws.s3 import S3Bucket -from buildflow.io.gcp.storage import GCSBucket -from buildflow.io.primitive import Primitive, PrimitiveType -from buildflow.io.provider import BackgroundTaskProvider, PulumiProvider, SinkProvider -from buildflow.io.snowflake.providers.table_provider import SnowflakeTableProvider -from buildflow.types.portable import FileFormat +from buildflow.io.postgres.providers.postgres_provider import PostgresProvider +from buildflow.io.primitive import Primitive -_DEFAULT_DATABASE_MANAGED = True -_DEFAULT_SCHEMA_MANAGED = True -_DEFAULT_SNOW_PIPE_MANAGED = True -_DEFAULT_STAGE_MANAGED = True -_DEFAULT_FLUSH_TIME_LIMIT_SECS = 60 +# TODO: Support Sessions, created like: +# dialect+driver://username:password@host:port/database_name @dataclasses.dataclass -class SnowflakeTable( +class Postgres( Primitive[ # Pulumi provider type - SnowflakeTableProvider, + None, # Source provider type None, # Sink provider type - SnowflakeTableProvider, + None, # Background task provider type - SnowflakeTableProvider, + None, + # Client provider type + PostgresProvider, ] ): - # TODO: make these types more concrete - # Required arguments - table: str - database: str - schema: str - bucket: Union[S3Bucket, GCSBucket] - # Arguments for authentication - account: Optional[str] + database_name: str + host: str + port: int user: Optional[str] - private_key: Optional[str] - # Optional arguments to configure sink - # If snow pipe is provided, the sink will use snowpipe to load data - # Otherwise you should run `buildflow apply` to have a snow pipe created - # for you. 
- snow_pipe: Optional[str] = None - # If snowflake_stage is provided, the sink will use the provided stage - # for copying data. - # Otherwise you should run `buildflow apply` to have a stage created - # for you. - snowflake_stage: Optional[str] = None - - # Optional arguments to configure sink - # The maximium number of seconds to wait before flushing to SnowPipe. - flush_time_limit_secs: int = dataclasses.field( - default=_DEFAULT_FLUSH_TIME_LIMIT_SECS, init=False - ) - - # Optional arguments to configure pulumi. These can be set with the: - # .options(...) method - database_managed: bool = dataclasses.field( - default=_DEFAULT_DATABASE_MANAGED, init=False - ) - schema_managed: bool = dataclasses.field( - default=_DEFAULT_SCHEMA_MANAGED, init=False - ) - snow_pipe_managed: bool = dataclasses.field( - default=_DEFAULT_SNOW_PIPE_MANAGED, init=False - ) - stage_managed: bool = dataclasses.field(default=_DEFAULT_STAGE_MANAGED, init=False) - - def __post_init__(self): - if isinstance(self.bucket, S3Bucket): - self.primitive_type = PrimitiveType.AWS - elif isinstance(self.bucket, GCSBucket): - self.primitive_type = PrimitiveType.GCP - else: - raise ValueError( - "Bucket must be of type S3Bucket or GCSBucket. Got: " - f"{type(self.bucket)}" - ) - self.bucket.file_format = FileFormat.PARQUET - self.bucket.file_path = f"{uuid()}.parquet" - self.snow_pipe_managed = self.snow_pipe is None - if self.snow_pipe is None: - self.snow_pipe = "buildflow_managed_snow_pipe" - self.stage_managed = self.snowflake_stage is None - if self.snowflake_stage is None: - self.snowflake_stage = "buildflow_managed_snowflake_stage" + password: Optional[str] def options( self, # Pulumi management options managed: bool = False, - database_managed: bool = _DEFAULT_DATABASE_MANAGED, - schema_managed: bool = _DEFAULT_SCHEMA_MANAGED, - # Sink options - flush_time_limit_secs: int = _DEFAULT_FLUSH_TIME_LIMIT_SECS, - ) -> "SnowflakeTable": + ) -> "Postgres": to_ret = super().options(managed) - to_ret.database_managed = database_managed - to_ret.schema_managed = schema_managed - to_ret.flush_time_limit_secs = flush_time_limit_secs return to_ret - def sink_provider(self) -> SinkProvider: - return SnowflakeTableProvider( - table=self.table, - database=self.database, - schema=self.schema, - bucket=self.bucket, - snow_pipe=self.snow_pipe, - snowflake_stage=self.snowflake_stage, - database_managed=self.database_managed, - schema_managed=self.schema_managed, - snow_pipe_managed=self.snow_pipe_managed, - stage_managed=self.stage_managed, - account=self.account, - user=self.user, - private_key=self.private_key, - flush_time_secs=self.flush_time_limit_secs, - ) - - def _pulumi_provider(self) -> PulumiProvider: - return SnowflakeTableProvider( - table=self.table, - database=self.database, - schema=self.schema, - bucket=self.bucket, - snow_pipe=self.snow_pipe, - snowflake_stage=self.snowflake_stage, - database_managed=self.database_managed, - schema_managed=self.schema_managed, - snow_pipe_managed=self.snow_pipe_managed, - stage_managed=self.stage_managed, - account=self.account, - user=self.user, - private_key=self.private_key, - flush_time_secs=self.flush_time_limit_secs, - ) - - def background_task_provider(self) -> BackgroundTaskProvider: - return SnowflakeTableProvider( - table=self.table, - database=self.database, - schema=self.schema, - bucket=self.bucket._pulumi_provider(), - snow_pipe=self.snow_pipe, - snowflake_stage=self.snowflake_stage, - database_managed=self.database_managed, - schema_managed=self.schema_managed, - 
snow_pipe_managed=self.snow_pipe_managed, - stage_managed=self.stage_managed, - account=self.account, + def client_provider(self) -> PostgresProvider: + return PostgresProvider( + database_name=self.database_name, + host=self.host, + port=self.port, user=self.user, - private_key=self.private_key, - flush_time_secs=self.flush_time_limit_secs, + password=self.password, ) diff --git a/buildflow/io/postgres/providers/postgres_provider.py b/buildflow/io/postgres/providers/postgres_provider.py index e7f5110f..54cb5c4e 100644 --- a/buildflow/io/postgres/providers/postgres_provider.py +++ b/buildflow/io/postgres/providers/postgres_provider.py @@ -1,225 +1,40 @@ import dataclasses -from typing import List, Optional, Type, Union +from typing import Optional, Type -import pulumi -import pulumi_snowflake +import psycopg2 -from buildflow.core.background_tasks.background_task import BackgroundTask -from buildflow.core.credentials.aws_credentials import AWSCredentials -from buildflow.core.credentials.gcp_credentials import GCPCredentials -from buildflow.io.aws.s3 import S3Bucket -from buildflow.io.gcp.storage import GCSBucket -from buildflow.io.provider import BackgroundTaskProvider, PulumiProvider, SinkProvider -from buildflow.io.snowflake.background_tasks.table_load_background_task import ( - SnowflakeUploadBackgroundTask, -) -from buildflow.io.snowflake.providers.schemas import type_to_snowflake_columns -from buildflow.io.snowflake.strategies.table_sink_startegy import SnowflakeTableSink -from buildflow.io.strategies.sink import SinkStrategy +from buildflow.core.credentials.empty_credentials import EmptyCredentials +from buildflow.io.provider import ClientProvider +from sqlalchemy.orm import Session -class _SnowflakeTableSinkResource(pulumi.ComponentResource): +class PostgresProvider(ClientProvider): def __init__( self, - table: str, - database: str, - schema: str, - bucket: Union[S3Bucket, GCSBucket], - snow_pipe: Optional[str], - snowflake_stage: Optional[str], - database_managed: bool, - schema_managed: bool, - snow_pipe_managed: bool, - stage_managed: bool, - account: str, - user: str, - private_key: str, - # pulumi_resource options (buildflow internal concept) - type_: Optional[Type], - credentials: AWSCredentials, - opts: pulumi.ResourceOptions, + *, + database_name: str, + host: str, + port: Optional[int] = None, + user: Optional[str] = None, + password: Optional[str] = None, ): - super().__init__( - "buildflow:snowflake:Table", - f"buildflow-{database}-{schema}-{table}", - None, - opts, - ) - - outputs = {} - - table_id = f"{database}.{schema}.{table}" - snowflake_provider = pulumi_snowflake.Provider( - resource_name=f"{table_id}.snowflake_provider", - account=account, - username=user, - private_key=private_key, - ) - if type_ is None: - raise ValueError( - "Please specify an output type so we can determine the expected schema " - "of the table." 
- ) - if hasattr(type_, "__args__"): - # Using a composite type hint like List or Optional - type_ = type_.__args__[0] - columns = type_to_snowflake_columns(type_) - pulumi_snowflake_cols = [] - for column in columns: - pulumi_snowflake_cols.append( - pulumi_snowflake.TableColumnArgs( - name=column.name, type=column.col_type, nullable=column.nullable - ) - ) - - self.database_resource = None - self.schema_resource = None - running_depends = [] - if database_managed: - self.database_resource = pulumi_snowflake.Database( - database, - opts=pulumi.ResourceOptions(parent=self, provider=snowflake_provider), - name=database, - ) - running_depends.append(self.database_resource) - outputs["snowflake.database"] = self.database_resource.name - if schema_managed: - schema_id = f"{database}.{schema}" - self.schema_resource = pulumi_snowflake.Schema( - schema_id, - opts=pulumi.ResourceOptions( - parent=self, - provider=snowflake_provider, - depends_on=running_depends, - ), - name=schema, - database=database, - ) - running_depends.append(self.schema_resource) - outputs["snowflake.schema"] = schema_id - - self.table_resource = pulumi_snowflake.Table( - table_id, - columns=pulumi_snowflake_cols, - database=database, - schema=schema, - name=table, - opts=pulumi.ResourceOptions( - parent=self, provider=snowflake_provider, depends_on=running_depends - ), - ) - outputs["snowflake.table"] = table_id - - self.stage_resource = None - if stage_managed: - snow_stage_id = f"{table_id}.{snowflake_stage}" - stage_credentials = None - if isinstance(credentials, AWSCredentials): - stage_credentials = ( - f"AWS_KEY_ID='{credentials.access_key_id}' " - f"AWS_SECRET_KEY='{credentials.secret_access_key}'" - ) - self.stage_resource = pulumi_snowflake.Stage( - snow_stage_id, - opts=pulumi.ResourceOptions( - parent=self, provider=snowflake_provider, depends_on=running_depends - ), - name=snowflake_stage, - database=database, - schema=schema, - copy_options="MATCH_BY_COLUMN_NAME = CASE_SENSITIVE", - file_format="TYPE = PARQUET", - url=bucket.bucket_url, - credentials=stage_credentials, - ) - outputs["snowflake.stage"] = snow_stage_id - running_depends.append(self.stage_resource) - self.snow_pipe_resource = None - if snow_pipe_managed: - copy_statement = ( - f'copy into "{database}"."{schema}"."{table}" ' - f'from @"{database}"."{schema}"."{snowflake_stage}";' - ) - snow_pipe_id = f"{table_id}.{snow_pipe}" - self.snow_pipe_resource = pulumi_snowflake.Pipe( - snow_pipe_id, - opts=pulumi.ResourceOptions( - parent=self, provider=snowflake_provider, depends_on=running_depends - ), - database=database, - schema=schema, - name=snow_pipe, - copy_statement=copy_statement, - ) - outputs["snowflake.pipe"] = snow_pipe_id - - -@dataclasses.dataclass -class SnowflakeTableProvider(SinkProvider, PulumiProvider, BackgroundTaskProvider): - # Information about the table - table: str - database: str - schema: str - # Options for configuring flushing - flush_time_secs: int - # Bucket for staging data for upload to snowflake - bucket: Union[S3Bucket, GCSBucket] - snow_pipe: Optional[str] - snowflake_stage: Optional[str] - # Options for configuring pulumi - database_managed: bool - schema_managed: bool - snow_pipe_managed: bool - stage_managed: bool - # Authentication information - account: str - user: str - private_key: str - - def sink(self, credentials: Union[AWSCredentials, GCPCredentials]) -> SinkStrategy: - return SnowflakeTableSink( - credentials=credentials, - bucket_sink=self.bucket.sink_provider().sink(credentials), - ) - - def 
background_tasks(
-        self, credentials: Union[AWSCredentials, GCPCredentials]
-    ) -> List[BackgroundTask]:
-        return [
-            SnowflakeUploadBackgroundTask(
-                credentials=credentials,
-                bucket_name=self.bucket.bucket_name,
-                account=self.account,
+        self.database_name = database_name
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+
+    def client(self, credentials: EmptyCredentials, client_type: Optional[Type]):
+        # psycopg2 exposes its connection class as psycopg2.extensions.connection.
+        if client_type == psycopg2.extensions.connection or client_type is None:
+            return psycopg2.connect(
+                database=self.database_name,
+                host=self.host,
+                port=self.port,
                 user=self.user,
-                database=self.database,
-                schema=self.schema,
-                private_key=self.private_key,
-                pipe=self.snow_pipe,
-                flush_time_secs=self.flush_time_secs,
+                password=self.password,
             )
-        ]
-
-    def pulumi_resource(
-        self,
-        type_: Optional[Type],
-        credentials: Union[AWSCredentials, GCPCredentials],
-        opts: pulumi.ResourceOptions,
-    ) -> _SnowflakeTableSinkResource:
-        return _SnowflakeTableSinkResource(
-            table=self.table,
-            database=self.database,
-            schema=self.schema,
-            bucket=self.bucket,
-            snowflake_stage=self.snowflake_stage,
-            snow_pipe=self.snow_pipe,
-            database_managed=self.database_managed,
-            schema_managed=self.schema_managed,
-            snow_pipe_managed=self.snow_pipe_managed,
-            stage_managed=self.stage_managed,
-            account=self.account,
-            user=self.user,
-            private_key=self.private_key,
-            type_=type_,
-            credentials=credentials,
-            opts=opts,
-        )
+        elif client_type == Session:
+            # Build the connection URL with URL.create so the optional parts
+            # (user, password, port) may be None; imported lazily so the
+            # psycopg2 path does not depend on it.
+            from sqlalchemy import create_engine
+            from sqlalchemy.engine import URL
+
+            url = URL.create(
+                drivername="postgresql+psycopg2",
+                username=self.user,
+                password=self.password,
+                host=self.host,
+                port=self.port,
+                database=self.database_name,
+            )
+            return Session(create_engine(url))
+        else:
+            raise ValueError(f"Unsupported client type {client_type}")
diff --git a/buildflow/io/primitive.py b/buildflow/io/primitive.py
index a579455d..b0b36954 100644
--- a/buildflow/io/primitive.py
+++ b/buildflow/io/primitive.py
@@ -24,7 +24,7 @@ class PrimitiveType(enum.Enum):
 
 
 class Primitive(Generic[PPT, SOT, SIT, BTT, CLT]):
-    primitive_type: PrimitiveType
+    primitive_type: PrimitiveType = PrimitiveType.AGNOSTIC
     _managed: bool = False
 
     def enable_managed(self):
@@ -101,7 +101,7 @@ def options(self) -> "Primitive":
         return copy.deepcopy(self)
 
 
-class GCPPrimtive(Primitive[PPT, SOT, SIT, BTT]):
+class GCPPrimtive(Primitive[PPT, SOT, SIT, BTT, CLT]):
     # TODO: We need to check the infra State to warn the user if the infra has not been
     # created yet.
     primitive_type = PrimitiveType.GCP
@@ -112,7 +112,7 @@ def from_gcp_options(cls, gcp_options: GCPOptions) -> "GCPPrimtive":
         raise NotImplementedError("GCPPrimtive.from_gcp_options() is not implemented.")
 
 
-class AWSPrimtive(Primitive[PPT, SOT, SIT, BTT]):
+class AWSPrimtive(Primitive[PPT, SOT, SIT, BTT, CLT]):
     primitive_type = PrimitiveType.AWS
 
     @classmethod
@@ -121,7 +121,7 @@ def from_aws_options(cls, aws_options: AWSOptions) -> "AWSPrimtive":
         raise NotImplementedError("AWSPrimtive.from_aws_options() is not implemented.")
 
 
-class AzurePrimtive(Primitive[PPT, SOT, SIT, BTT]):
+class AzurePrimtive(Primitive[PPT, SOT, SIT, BTT, CLT]):
     primitive_type = PrimitiveType.AZURE
 
     @classmethod
@@ -132,7 +132,7 @@ def from_azure_options(cls, azure_options: AzureOptions) -> "AzurePrimtive":
         )
 
 
-class LocalPrimtive(Primitive[PPT, SOT, SIT, BTT]):
+class LocalPrimtive(Primitive[PPT, SOT, SIT, BTT, CLT]):
     primitive_type = PrimitiveType.LOCAL
     # LocalPrimitives are never managed.
     managed: bool = False
diff --git a/buildflow/io/snowflake/snowflake_table.py b/buildflow/io/snowflake/snowflake_table.py
index e4f1ee70..d01d2ce9 100644
--- a/buildflow/io/snowflake/snowflake_table.py
+++ b/buildflow/io/snowflake/snowflake_table.py
@@ -27,6 +27,8 @@ class SnowflakeTable(
         SnowflakeTableProvider,
         # Background task provider type
         SnowflakeTableProvider,
+        # Client provider type
+        None,
     ]
 ):
     # TODO: make these types more concrete
diff --git a/pyproject.toml b/pyproject.toml
index f67483dd..a3f03b48 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -32,6 +32,10 @@ dependencies = [
     "opentelemetry-exporter-otlp",
     "opentelemetry-exporter-jaeger",
     "pandas",
+    # TODO: use psycopg2 instead of psycopg2-binary
+    # more context: https://www.psycopg.org/docs/install.html#psycopg-vs-psycopg-binary
+    "psycopg2-binary",
+    "sqlalchemy",
     "pulumi==3.35.3",
     "pulumi_aws",
     "pulumi_gcp",
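
Reviewer note (not part of the patch): below is a minimal end-to-end sketch of how the
new Client dependency is meant to be consumed once patch 2 lands. The Postgres
connection details and the processor itself are hypothetical; only Postgres, Client,
and the dependency-injection machinery come from the changes above. Note that the
injected kwarg must carry a type annotation, since flow.py reads
full_arg_spec.annotations[arg_name] and forwards it to PostgresProvider.client() to
choose between a raw psycopg2 connection and a SQLAlchemy Session.

    from dataclasses import dataclass

    from sqlalchemy import text
    from sqlalchemy.orm import Session

    from buildflow import Flow
    from buildflow.core.dependencies.dependency import Client
    from buildflow.io.local import File, Pulse
    from buildflow.io.postgres import Postgres


    @dataclass
    class InputRequest:
        val: int


    @dataclass
    class OutputResponse:
        val: int


    # Hypothetical connection details, for illustration only.
    postgres = Postgres(
        database_name="example_db",
        host="localhost",
        port=5432,
        user="postgres",
        password="postgres",
    )

    app = Flow()


    @app.pipeline(source=Pulse([InputRequest(1)], 1), sink=File("output.txt", "csv"))
    def my_processor(
        input: InputRequest,
        db: Session = Client(postgres),
    ) -> OutputResponse:
        # `db` is materialized by KwargDependencies.materialize() around each
        # batch and injected as a keyword argument.
        one = db.execute(text("SELECT 1")).scalar()
        return OutputResponse(val=input.val + one)

The same pattern applies to collector and endpoint processors, which wrap process()
in processor.dependencies() the same way in their respective runtime actors.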