diff --git a/.env.example b/.env.example
index 08c68a64..abf09c56 100644
--- a/.env.example
+++ b/.env.example
@@ -23,3 +23,10 @@ DAGSTER_HOME=/tmp/dagster-home
 # This is used to put generated dbt profiles for dagster in a specific place
 DAGSTER_DBT_TARGET_BASE_DIR=/tmp/dagster-home/generated-dbt
 DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1
+# Used when loading dlt assets into a staging area
+DAGSTER_STAGING_BUCKET_URL=
+
+# Uncomment the next two vars to use gcp secrets (you'll need to have gcp
+# secrets configured)
+#DAGSTER_USE_LOCAL_SECRETS=False
+#DAGSTER_GCP_SECRETS_PREFIX=dagster
\ No newline at end of file
diff --git a/warehouse/oso_dagster/assets/dbt.py b/warehouse/oso_dagster/assets/dbt.py
index ffabbdee..03e6bdcb 100644
--- a/warehouse/oso_dagster/assets/dbt.py
+++ b/warehouse/oso_dagster/assets/dbt.py
@@ -2,7 +2,12 @@
 import os
 from typing import Any, Mapping, Dict, List, Sequence, Optional
 
-from dagster import AssetExecutionContext, AssetKey, AssetsDefinition, Config
+from dagster import (
+    AssetExecutionContext,
+    AssetKey,
+    AssetsDefinition,
+    Config,
+)
 from dagster_dbt import DbtCliResource, dbt_assets, DagsterDbtTranslator
 
 from ..constants import main_dbt_manifests, main_dbt_project_dir, dbt_profiles_dir
@@ -11,11 +16,13 @@ class CustomDagsterDbtTranslator(DagsterDbtTranslator):
 
     def __init__(
         self,
+        environment: str,
         prefix: Sequence[str],
         internal_schema_map: Dict[str, List[str]],
     ):
         self._prefix = prefix
         self._internal_schema_map = internal_schema_map
+        self._environment = environment
 
     def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
         asset_key = super().get_asset_key(dbt_resource_props)
@@ -37,6 +44,14 @@ def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
             final_key = asset_key
         return final_key
 
+    def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
+        tags: Dict[str, str] = dict()
+        materialization = dbt_resource_props.get("config", {}).get("materialized")
+        if materialization:
+            tags["dbt/materialized"] = materialization
+        tags["opensource.observer/environment"] = self._environment
+        return tags
+
 
 class DBTConfig(Config):
     full_refresh: bool = False
@@ -50,7 +65,7 @@ def generate_dbt_asset(
     internal_map: Dict[str, List[str]],
 ):
     print(f"Target[{target}] using profiles dir {dbt_profiles_dir}")
-    translator = CustomDagsterDbtTranslator(["dbt", target], internal_map)
+    translator = CustomDagsterDbtTranslator(target, ["dbt", target], internal_map)
 
     @dbt_assets(
         name=f"{target}_dbt",
diff --git a/warehouse/oso_dagster/assets/farcaster.py b/warehouse/oso_dagster/assets/farcaster.py
index 040a846d..11174ffb 100644
--- a/warehouse/oso_dagster/assets/farcaster.py
+++ b/warehouse/oso_dagster/assets/farcaster.py
@@ -1,23 +1,20 @@
 from ..factories import (
-    create_bq_dts_asset,
-    BqDtsAssetConfig,
+    bq_dts_asset,
 )
 from ..utils.bq_dts import BigQuerySourceConfig
 from ..utils.common import TimeInterval, SourceMode
 
-farcaster_data = create_bq_dts_asset(
-    BqDtsAssetConfig(
-        key_prefix="farcaster",
-        asset_name="bq_dts_source",
-        display_name="farcaster",
-        destination_project_id="opensource-observer",
-        destination_dataset_name="farcaster",
-        source_config=BigQuerySourceConfig(
-            source_project_id="glossy-odyssey-366820",
-            source_dataset_name="farcaster",
-            service_account=None
-        ),
-        copy_interval=TimeInterval.Weekly,
-        copy_mode=SourceMode.Overwrite,
+farcaster_data = bq_dts_asset(
+    key_prefix="farcaster",
+    asset_name="bq_dts_source",
+    display_name="farcaster",
+    destination_project_id="opensource-observer",
+    destination_dataset_name="farcaster",
+    source_config=BigQuerySourceConfig(
+        source_project_id="glossy-odyssey-366820",
+        source_dataset_name="farcaster",
+        service_account=None,
     ),
+    copy_interval=TimeInterval.Weekly,
+    copy_mode=SourceMode.Overwrite,
 )
diff --git a/warehouse/oso_dagster/assets/lens.py b/warehouse/oso_dagster/assets/lens.py
index 3f031f0b..e747b437 100644
--- a/warehouse/oso_dagster/assets/lens.py
+++ b/warehouse/oso_dagster/assets/lens.py
@@ -1,23 +1,20 @@
 from ..factories import (
-    create_bq_dts_asset,
-    BqDtsAssetConfig,
+    bq_dts_asset,
 )
 from ..utils.bq_dts import BigQuerySourceConfig
 from ..utils.common import TimeInterval, SourceMode
 
-farcaster_data = create_bq_dts_asset(
-    BqDtsAssetConfig(
-        key_prefix="lens",
-        asset_name="bq_dts_source",
-        display_name="lens",
-        destination_project_id="opensource-observer",
-        destination_dataset_name="lens_v2_polygon",
-        source_config=BigQuerySourceConfig(
-            source_project_id="lens-public-data",
-            source_dataset_name="v2_polygon",
-            service_account=None
-        ),
-        copy_interval=TimeInterval.Weekly,
-        copy_mode=SourceMode.Overwrite,
+farcaster_data = bq_dts_asset(
+    key_prefix="lens",
+    asset_name="bq_dts_source",
+    display_name="lens",
+    destination_project_id="opensource-observer",
+    destination_dataset_name="lens_v2_polygon",
+    source_config=BigQuerySourceConfig(
+        source_project_id="lens-public-data",
+        source_dataset_name="v2_polygon",
+        service_account=None,
     ),
+    copy_interval=TimeInterval.Weekly,
+    copy_mode=SourceMode.Overwrite,
 )
diff --git a/warehouse/oso_dagster/factories/bq_dts.py b/warehouse/oso_dagster/factories/bq_dts.py
index 89b8cb28..9815712d 100644
--- a/warehouse/oso_dagster/factories/bq_dts.py
+++ b/warehouse/oso_dagster/factories/bq_dts.py
@@ -8,8 +8,14 @@
 from dagster_gcp import BigQueryResource
 
 from .common import AssetFactoryResponse
 from ..resources import BigQueryDataTransferResource
-from ..utils.bq import ensure_dataset, DatasetOptions
-from ..utils.bq_dts import ensure_bq_dts_transfer, BqDtsTransferConfig
+from ..utils import (
+    ensure_dataset,
+    DatasetOptions,
+    ensure_bq_dts_transfer,
+    BqDtsTransferConfig,
+    unpack_config,
+)
+
 
 @dataclass(kw_only=True)
 class BqDtsAssetConfig(BqDtsTransferConfig):
@@ -20,27 +26,51 @@ class BqDtsAssetConfig(BqDtsTransferConfig):
     # Dagster remaining args
     asset_kwargs: dict = field(default_factory=lambda: {})
 
-def create_bq_dts_asset(asset_config: BqDtsAssetConfig):
-    @asset(name=asset_config.asset_name, key_prefix=asset_config.key_prefix, **asset_config.asset_kwargs)
-    def bq_dts_asset(
+    environment: str = "production"
+
+
+@unpack_config(BqDtsAssetConfig)
+def bq_dts_asset(asset_config: BqDtsAssetConfig):
+    tags = {
+        "opensource.observer/factory": "bigquery_dts",
+        "opensource.observer/environment": asset_config.environment,
+        "opensource.observer/type": "source",
+    }
+
+    @asset(
+        name=asset_config.asset_name,
+        key_prefix=asset_config.key_prefix,
+        compute_kind="bigquery_dts",
+        tags=tags,
+        **asset_config.asset_kwargs,
+    )
+    def _bq_dts_asset(
         context: AssetExecutionContext,
         bigquery: BigQueryResource,
-        bigquery_datatransfer: BigQueryDataTransferResource
+        bigquery_datatransfer: BigQueryDataTransferResource,
     ) -> MaterializeResult:
-        context.log.info(f"Materializing a BigQuery Data Transfer asset called {asset_config.asset_name}")
+        context.log.info(
+            f"Materializing a BigQuery Data Transfer asset called {asset_config.asset_name}"
+        )
         with bigquery.get_client() as bq_client:
             ensure_dataset(
                 bq_client,
                 DatasetOptions(
-                    dataset_ref=bq_client.dataset(dataset_id=asset_config.destination_dataset_name),
+                    dataset_ref=bq_client.dataset(
+                        dataset_id=asset_config.destination_dataset_name
+                    ),
                     is_public=True,
                 ),
             )
-            context.log.info(f"Ensured dataset named {asset_config.destination_dataset_name}")
+            context.log.info(
+                f"Ensured dataset named {asset_config.destination_dataset_name}"
+            )
 
         with bigquery_datatransfer.get_client() as bq_dts_client:
             ensure_bq_dts_transfer(bq_dts_client, asset_config, context.log)
-            context.log.info(f"Ensured transfer config named {asset_config.display_name}")
+            context.log.info(
+                f"Ensured transfer config named {asset_config.display_name}"
+            )
 
         return MaterializeResult(
             metadata={
@@ -48,4 +78,4 @@
             }
         )
 
-    return AssetFactoryResponse([bq_dts_asset])
+    return AssetFactoryResponse([_bq_dts_asset])
diff --git a/warehouse/oso_dagster/factories/gcs.py b/warehouse/oso_dagster/factories/gcs.py
index 75befbfc..a11e3d32 100644
--- a/warehouse/oso_dagster/factories/gcs.py
+++ b/warehouse/oso_dagster/factories/gcs.py
@@ -1,5 +1,5 @@
 import re
-from typing import Optional, Sequence
+from typing import Optional, Sequence, Dict
 from dataclasses import dataclass, field
 
 import arrow
@@ -22,8 +22,14 @@
 from dagster_gcp import BigQueryResource, GCSResource
 
 from .common import AssetFactoryResponse
-from ..utils.bq import ensure_dataset, DatasetOptions
-from ..utils.common import TimeInterval, SourceMode
+from ..utils import (
+    ensure_dataset,
+    DatasetOptions,
+    TimeInterval,
+    SourceMode,
+    add_tags,
+    add_key_prefix_as_tag,
+)
 
 
 @dataclass(kw_only=True)
@@ -50,6 +56,10 @@ class BaseGCSAsset:
     # Dagster remaining arguments
     asset_kwargs: dict = field(default_factory=lambda: {})
 
+    tags: Dict[str, str] = field(default_factory=lambda: {})
+
+    environment: str = "production"
+
 
 @dataclass(kw_only=True)
 class IntervalGCSAsset(BaseGCSAsset):
@@ -69,7 +79,28 @@ def interval_gcs_import_asset(config: IntervalGCSAsset):
     # Find all of the "intervals" in the bucket and load them into the `raw_sources` dataset
     # Run these sources through a secondary dbt model into `clean_sources`
 
-    @asset(name=config.name, key_prefix=config.key_prefix, **config.asset_kwargs)
+    tags = {
+        "opensource.observer/factory": "gcs",
+        "opensource.observer/environment": config.environment,
+    }
+
+    # Extend with additional tags
+    tags.update(config.tags)
+
+    tags = add_key_prefix_as_tag(tags, config.key_prefix)
+
+    @asset(
+        name=config.name,
+        key_prefix=config.key_prefix,
+        tags=add_tags(
+            tags,
+            {
+                "opensource.observer/type": "source",
+            },
+        ),
+        compute_kind="gcs",
+        **config.asset_kwargs,
+    )
     def gcs_asset(
         context: AssetExecutionContext, bigquery: BigQueryResource, gcs: GCSResource
     ) -> MaterializeResult:
@@ -198,12 +229,12 @@ def gcs_asset(
 
     asset_config = config
 
-    @op(name=f"{config.name}_clean_up_op")
+    @op(name=f"{config.name}_clean_up_op", tags=tags)
     def gcs_clean_up_op(context: OpExecutionContext, config: dict):
         context.log.info(f"Running clean up for {asset_config.name}")
         print(config)
 
-    @job(name=f"{config.name}_clean_up_job")
+    @job(name=f"{config.name}_clean_up_job", tags=tags)
     def gcs_clean_up_job():
         gcs_clean_up_op()
 
@@ -216,7 +247,6 @@ def gcs_clean_up_job():
     def gcs_clean_up_sensor(
         context: SensorEvaluationContext, gcs: GCSResource, asset_event: EventLogEntry
     ):
-        print("EVENT!!!!!!")
         yield RunRequest(
             run_key=context.cursor,
             run_config=RunConfig(
diff --git a/warehouse/oso_dagster/factories/goldsky/assets.py b/warehouse/oso_dagster/factories/goldsky/assets.py
index b86fa1e5..35f29e19 100644
--- a/warehouse/oso_dagster/factories/goldsky/assets.py
+++ b/warehouse/oso_dagster/factories/goldsky/assets.py
@@ -49,7 +49,7 @@
 from .. import AssetFactoryResponse
 from .config import GoldskyConfig, GoldskyConfigInterface, SchemaDict
 from ..common import AssetDeps, AssetList
-from ...utils.gcs import batch_delete_blobs
+from ...utils import batch_delete_blobs, add_tags
 
 GenericExecutionContext = AssetExecutionContext | OpExecutionContext
 
@@ -407,78 +407,6 @@ def delete_all_gcs_files_in_prefix(
     return
 
 
-def blocking_update_pointer_table(
-    context: GenericExecutionContext,
-    config: GoldskyConfig,
-    bigquery: BigQueryResource,
-    job_id: str,
-    worker: str,
-    pointer_table: str,
-    new_checkpoint: GoldskyCheckpoint,
-    latest_checkpoint: GoldskyCheckpoint | None,
-    wildcard_path: str,
-):
-    with bigquery.get_client() as client:
-        dest_table_ref = client.get_dataset(
-            config.working_destination_dataset_name
-        ).table(f"{config.destination_table_name}_{worker}")
-        new = False
-        try:
-            client.get_table(dest_table_ref)
-        except NotFound:
-            # If the table doesn't exist just create it. An existing table will
-            # be there if a previous run happened to fail midway.
-            new = True
-
-        if not new:
-            context.log.info("Merging into worker table")
-            client.query_and_wait(
-                f"""
-                LOAD DATA OVERWRITE `{config.project_id}.{config.working_destination_dataset_name}.{config.destination_table_name}_{worker}_{job_id}`
-                FROM FILES (
-                    format = "PARQUET",
-                    uris = ["{wildcard_path}"]
-                );
-                """
-            )
-            tx_query = f"""
-                BEGIN TRANSACTION;
-                INSERT INTO `{config.project_id}.{config.working_destination_dataset_name}.{config.destination_table_name}_{worker}`
-                SELECT * FROM `{config.project_id}.{config.working_destination_dataset_name}.{config.destination_table_name}_{worker}_{job_id}`;
-
-                DELETE FROM `{pointer_table}` WHERE worker = '{worker}';
-
-                INSERT INTO `{pointer_table}` (worker, job_id, timestamp, checkpoint)
-                VALUES ('{worker}', '{new_checkpoint.job_id}', {new_checkpoint.timestamp}, {new_checkpoint.worker_checkpoint});
-                COMMIT TRANSACTION;
-            """
-            context.log.debug(f"query: {tx_query}")
-            client.query_and_wait(tx_query)
-            client.query_and_wait(
-                f"""
-                DROP TABLE `{config.project_id}.{config.working_destination_dataset_name}.{config.destination_table_name}_{worker}_{job_id}`;
-                """
-            )
-        else:
-            context.log.info("Creating new worker table")
-            query1 = f"""
-                LOAD DATA OVERWRITE `{config.project_id}.{config.working_destination_dataset_name}.{config.destination_table_name}_{worker}`
-                FROM FILES (
-                    format = "PARQUET",
-                    uris = ["{wildcard_path}"]
-                );
-            """
-            context.log.debug(f"query: {query1}")
-            client.query_and_wait(query1)
-            rows = client.query_and_wait(
-                f"""
-                INSERT INTO `{pointer_table}` (worker, job_id, timestamp, checkpoint)
-                VALUES ('{worker}', '{new_checkpoint.job_id}', {new_checkpoint.timestamp}, {new_checkpoint.worker_checkpoint});
-                """
-            )
-            context.log.info(rows)
-
-
 def decimal_convert(name: str, field: PolarsDataType):
     field = cast(polars.Decimal, field)
     if field.precision == 100 and field.scale == 0:
@@ -1013,7 +941,21 @@ def materialize_asset(
     deps = deps or []
     deps = cast(AssetDeps, deps)
 
-    @asset(name=asset_config.name, key_prefix=asset_config.key_prefix, deps=deps)
+
+    key_prefix = asset_config.key_prefix
+
+    tags: Dict[str, str] = {
+        "opensource.observer/factory": "goldsky",
+        "opensource.observer/environment": asset_config.environment,
+    }
+
+    if key_prefix:
+        group_name = key_prefix if isinstance(key_prefix, str) else "__".join(list(key_prefix))
+        tags["opensource.observer/group"] = group_name
+
+    @asset(name=asset_config.name, key_prefix=asset_config.key_prefix, deps=deps, compute_kind="goldsky", tags=add_tags(tags, {
+        "opensource.observer/type": "source",
+    }))
     def generated_asset(
         context: AssetExecutionContext,
         bigquery: BigQueryResource,
@@ -1025,7 +967,9 @@ def generated_asset(
 
     related_ops_prefix = "_".join(generated_asset.key.path)
 
-    @op(name=f"{related_ops_prefix}_clean_up_op")
+    @op(name=f"{related_ops_prefix}_clean_up_op", tags=add_tags(tags, {
+        "opensource.observer/op-type": "clean-up"
+    }))
     def goldsky_clean_up_op(
         context: OpExecutionContext,
         bigquery: BigQueryResource,
@@ -1037,7 +981,9 @@ def goldsky_clean_up_op(
         gs_asset = GoldskyAsset(gcs, bigquery, cbt, asset_config)
         gs_asset.clean_up(context.log)
 
-    @op(name=f"{related_ops_prefix}_backfill_op")
+    @op(name=f"{related_ops_prefix}_backfill_op", tags=add_tags(tags, {
+        "opensource.observer/op-type": "manual-backfill"
+    }))
     def goldsky_backfill_op(
         context: OpExecutionContext,
         bigquery: BigQueryResource,
@@ -1068,7 +1014,9 @@ def goldsky_backfill_op(
             pointer_table_suffix=op_input.backfill_label,
         )
 
-    @op(name=f"{related_ops_prefix}_files_stats_op")
+    @op(name=f"{related_ops_prefix}_files_stats_op", tags=add_tags(tags,{
+        "opensource.observer/op-type": "debug"
+    }))
    def goldsky_files_stats_op(
         context: OpExecutionContext,
         bigquery: BigQueryResource,
@@ -1128,7 +1076,9 @@ def goldsky_files_stats_op(
         # Log the metadata
         context.add_output_metadata({"bucket_stats": table_metadata})
 
-    @op(name=f"{related_ops_prefix}_load_schema_op")
+    @op(name=f"{related_ops_prefix}_load_schema_op", tags=add_tags(tags, {
+        "opensource.observer/op-type": "debug"
+    }))
     def goldsky_load_schema_op(
         context: OpExecutionContext,
         bigquery: BigQueryResource,
@@ -1157,19 +1107,19 @@ def goldsky_load_schema_op(
 
         context.add_output_metadata({"schema": table_metadata})
 
-    @job(name=f"{related_ops_prefix}_clean_up_job")
+    @job(name=f"{related_ops_prefix}_clean_up_job", tags=tags)
     def goldsky_clean_up_job():
         goldsky_clean_up_op()
 
-    @job(name=f"{related_ops_prefix}_files_stats_job")
+    @job(name=f"{related_ops_prefix}_files_stats_job", tags=tags)
     def goldsky_files_stats_job():
         goldsky_files_stats_op()
 
-    @job(name=f"{related_ops_prefix}_load_schema_job")
+    @job(name=f"{related_ops_prefix}_load_schema_job", tags=tags)
     def goldsky_load_schema_job():
         goldsky_load_schema_op()
 
-    @job(name=f"{related_ops_prefix}_backfill_job")
+    @job(name=f"{related_ops_prefix}_backfill_job", tags=tags)
     def goldsky_backfill_job():
         goldsky_backfill_op()
 
tags["opensource.observer/group"] = group_name + + @asset(name=asset_config.name, key_prefix=asset_config.key_prefix, deps=deps, compute_kind="goldsky", tags=add_tags(tags, { + "opensource.observer/type": "source", + })) def generated_asset( context: AssetExecutionContext, bigquery: BigQueryResource, @@ -1025,7 +967,9 @@ def generated_asset( related_ops_prefix = "_".join(generated_asset.key.path) - @op(name=f"{related_ops_prefix}_clean_up_op") + @op(name=f"{related_ops_prefix}_clean_up_op", tags=add_tags(tags, { + "opensource.observer/op-type": "clean-up" + })) def goldsky_clean_up_op( context: OpExecutionContext, bigquery: BigQueryResource, @@ -1037,7 +981,9 @@ def goldsky_clean_up_op( gs_asset = GoldskyAsset(gcs, bigquery, cbt, asset_config) gs_asset.clean_up(context.log) - @op(name=f"{related_ops_prefix}_backfill_op") + @op(name=f"{related_ops_prefix}_backfill_op", tags=add_tags(tags, { + "opensource.observer/op-type": "manual-backfill" + })) def goldsky_backfill_op( context: OpExecutionContext, bigquery: BigQueryResource, @@ -1068,7 +1014,9 @@ def goldsky_backfill_op( pointer_table_suffix=op_input.backfill_label, ) - @op(name=f"{related_ops_prefix}_files_stats_op") + @op(name=f"{related_ops_prefix}_files_stats_op", tags=add_tags(tags,{ + "opensource.observer/op-type": "debug" + })) def goldsky_files_stats_op( context: OpExecutionContext, bigquery: BigQueryResource, @@ -1128,7 +1076,9 @@ def goldsky_files_stats_op( # Log the metadata context.add_output_metadata({"bucket_stats": table_metadata}) - @op(name=f"{related_ops_prefix}_load_schema_op") + @op(name=f"{related_ops_prefix}_load_schema_op", tags=add_tags(tags, { + "opensource.observer/op-type": "debug" + })) def goldsky_load_schema_op( context: OpExecutionContext, bigquery: BigQueryResource, @@ -1157,19 +1107,19 @@ def goldsky_load_schema_op( context.add_output_metadata({"schema": table_metadata}) - @job(name=f"{related_ops_prefix}_clean_up_job") + @job(name=f"{related_ops_prefix}_clean_up_job", tags=tags) def goldsky_clean_up_job(): goldsky_clean_up_op() - @job(name=f"{related_ops_prefix}_files_stats_job") + @job(name=f"{related_ops_prefix}_files_stats_job", tags=tags) def goldsky_files_stats_job(): goldsky_files_stats_op() - @job(name=f"{related_ops_prefix}_load_schema_job") + @job(name=f"{related_ops_prefix}_load_schema_job", tags=tags) def goldsky_load_schema_job(): goldsky_load_schema_op() - @job(name=f"{related_ops_prefix}_backfill_job") + @job(name=f"{related_ops_prefix}_backfill_job", tags=tags) def goldsky_backfill_job(): goldsky_backfill_op() diff --git a/warehouse/oso_dagster/factories/goldsky/config.py b/warehouse/oso_dagster/factories/goldsky/config.py index ebd2faaf..ad8cfb1d 100644 --- a/warehouse/oso_dagster/factories/goldsky/config.py +++ b/warehouse/oso_dagster/factories/goldsky/config.py @@ -32,6 +32,7 @@ class GoldskyConfigInterface(TypedDict): project_id: str source_name: str destination_table_name: str + environment: NotRequired[str] pointer_size: NotRequired[int] max_objects_to_load: NotRequired[int] destination_dataset_name: NotRequired[str] @@ -62,6 +63,7 @@ class GoldskyConfig: project_id: str source_name: str destination_table_name: str + environment: str = "production" # Maximum number of objects we can load into a load job is 10000 so the # largest this can be is 10000. 
diff --git a/warehouse/oso_dagster/factories/sql.py b/warehouse/oso_dagster/factories/sql.py
index daaf83b4..3bc13a89 100644
--- a/warehouse/oso_dagster/factories/sql.py
+++ b/warehouse/oso_dagster/factories/sql.py
@@ -83,6 +83,8 @@ def sql_assets(
     source_credential_reference: SecretReference,
     sql_tables: List[TopLevelSQLTableOptions],
     group_name: str = "",
+    environment: str = "production",
+    asset_type: str = "source"
 ):
     """A convenience sql asset factory that should handle any basic incremental table or
     or full refresh sql source and configure a destination to the
@@ -95,7 +97,13 @@ def factory(
         project_id: str,
         dlt_gcs_staging: dlt.destinations.filesystem,
     ):
-        translator = PrefixedDltTranslator(source_name)
+
+        tags = {
+            "opensource.observer/environment": environment,
+            "opensource.observer/factory": "sql_dlt",
+            "opensource.observer/type": asset_type,
+        }
+        translator = PrefixedDltTranslator(source_name, tags)
 
         connection_string = secrets.resolve_as_str(source_credential_reference)
         credentials = ConnectionStringCredentials(connection_string)
@@ -119,10 +127,11 @@ def factory(
                 staging=dlt_gcs_staging,
                 progress="log",
             )
+            asset_def = _generate_asset_for_table(
+                source_name, credentials, pipeline, table, translator
+            )
             assets.append(
-                _generate_asset_for_table(
-                    source_name, credentials, pipeline, table, translator
-                )
+                asset_def
             )
 
         return AssetFactoryResponse(assets=assets)
@@ -136,12 +145,14 @@ class PrefixedDltTranslator(DagsterDltTranslator):
     def __init__(
         self,
         source_name: str,
+        tags: Dict[str, str],
        prefix: Optional[Sequence[str]] = None,
         include_deps: bool = False,
     ):
         self._prefix = prefix or cast(Sequence[str], [])
         self._source_name = source_name
         self._include_deps = include_deps
+        self._tags = tags.copy()
 
     def get_asset_key(self, resource: DltResource) -> AssetKey:
         key: List[str] = []
@@ -160,3 +171,7 @@ def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]:
         key.append("sources")
         key.append(resource.name)
         return [AssetKey(key)]
+
+    def get_tags(self, resource: DltResource):
+        # As of 2024-07-10 This doesn't work. We will make a PR upstream
+        return self._tags
diff --git a/warehouse/oso_dagster/utils/__init__.py b/warehouse/oso_dagster/utils/__init__.py
index 3e2e79c4..1a5be43c 100644
--- a/warehouse/oso_dagster/utils/__init__.py
+++ b/warehouse/oso_dagster/utils/__init__.py
@@ -5,3 +5,8 @@
 from .dbt import *
 from .gcs import *
 from .retry import *
+from .tags import *
+from .bq_dts import *
+from .retry import *
+from .common import *
+from .types import *
diff --git a/warehouse/oso_dagster/utils/tags.py b/warehouse/oso_dagster/utils/tags.py
new file mode 100644
index 00000000..1bcedf22
--- /dev/null
+++ b/warehouse/oso_dagster/utils/tags.py
@@ -0,0 +1,23 @@
+from typing import Mapping, Sequence
+
+
+def add_tags(
+    tags: Mapping[str, str], additional_tags: Mapping[str, str]
+) -> Mapping[str, str]:
+    new_tags = dict(tags)
+    new_tags.update(additional_tags)
+    return new_tags
+
+
+def add_key_prefix_as_tag(
+    tags: Mapping[str, str], key_prefix: Sequence[str] | str | None
+):
+    if key_prefix:
+        return add_tags(
+            tags, {"opensource.observer/group": key_prefix_to_group_name(key_prefix)}
+        )
+    return add_tags(tags, {})
+
+
+def key_prefix_to_group_name(key_prefix: Sequence[str] | str):
+    return key_prefix if isinstance(key_prefix, str) else "__".join(list(key_prefix))
diff --git a/warehouse/oso_dagster/utils/types.py b/warehouse/oso_dagster/utils/types.py
new file mode 100644
index 00000000..afe1c059
--- /dev/null
+++ b/warehouse/oso_dagster/utils/types.py
@@ -0,0 +1,54 @@
+from typing import Callable
+
+type ConfigCallable[T, **P] = Callable[P, T]
+
+
+def unpack_config[
+    R, T, **P
+](conf: ConfigCallable[T, P]) -> Callable[[Callable[[T], R]], Callable[P, R]]:
+    """This decorator allows a short hand method to create a simple interface to
+    a function such that all arguments and keyword arguments are derived from
+    some kind of configuration callable. Generally this would be something like
+    a dataclass that can be used to create factories, but any form of callable
+    can be used. This is a convenience function mostly for better type
+    annotations.
+
+    Args:
+        conf: The config callable
+
+    Example:
+
+        from dataclasses import dataclass
+
+        @dataclass(kw_only=True)
+        class FactoryConfig:
+            foo: str
+            bar: str = "bar"
+
+        class Example:
+            def __init__(self, foo: str, bar: str):
+                self.foo = foo
+                self.bar = bar
+
+            def show(self):
+                print(self.foo)
+                print(self.bar)
+
+        @unpack_config(FactoryConfig)
+        def my_factory(config: FactoryConfig):
+            # do something with the config
+            return Example(config.foo, config.bar)
+
+        # Later on this function can be called like this:
+        ex = my_factory(foo="foo")
+        ex.show()  # should output foo and bar
+    """
+
+    def _wrapper(f: Callable[[T], R]):
+        def _inner(*args: P.args, **kwargs: P.kwargs):
+            config = conf(*args, **kwargs)
+            return f(config)
+
+        return _inner
+
+    return _wrapper