Skip to content

Commit

Permalink
Add asset tags to many assets (#1766)
Browse files Browse the repository at this point in the history
* Adds tags to goldsky assets

* Add tags to gcs

* Adds tags to dbt resources

* More asset tags
  • Loading branch information
ravenac95 committed Jul 11, 2024
1 parent 4f4cb97 commit 3469347
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 138 deletions.
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,10 @@ DAGSTER_HOME=/tmp/dagster-home
# This is used to put generated dbt profiles for dagster in a specific place
DAGSTER_DBT_TARGET_BASE_DIR=/tmp/dagster-home/generated-dbt
DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1
# Used when loading dlt assets into a staging area
DAGSTER_STAGING_BUCKET_URL=

# Uncomment the next two vars to use gcp secrets (you'll need to have gcp
# secrets configured)
#DAGSTER_USE_LOCAL_SECRETS=False
#DAGSTER_GCP_SECRETS_PREFIX=dagster
19 changes: 17 additions & 2 deletions warehouse/oso_dagster/assets/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
import os
from typing import Any, Mapping, Dict, List, Sequence, Optional

from dagster import AssetExecutionContext, AssetKey, AssetsDefinition, Config
from dagster import (
AssetExecutionContext,
AssetKey,
AssetsDefinition,
Config,
)
from dagster_dbt import DbtCliResource, dbt_assets, DagsterDbtTranslator

from ..constants import main_dbt_manifests, main_dbt_project_dir, dbt_profiles_dir
Expand All @@ -11,11 +16,13 @@
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def __init__(
self,
environment: str,
prefix: Sequence[str],
internal_schema_map: Dict[str, List[str]],
):
self._prefix = prefix
self._internal_schema_map = internal_schema_map
self._environment = environment

def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
asset_key = super().get_asset_key(dbt_resource_props)
Expand All @@ -37,6 +44,14 @@ def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
final_key = asset_key
return final_key

def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
    """Return Dagster asset tags for a dbt resource.

    Tags the asset with the dbt materialization strategy (when one is
    configured on the resource) and with the environment string this
    translator was constructed with.
    """
    tags: Dict[str, str] = dict()
    # dbt exposes the materialization (e.g. "table", "view") under the
    # resource's "config" mapping; it may be absent for some node types.
    materialization = dbt_resource_props.get("config", {}).get("materialized")
    if materialization:
        tags["dbt/materialized"] = materialization
    # _environment is assigned in __init__ from the constructor argument.
    tags["opensource.observer/environment"] = self._environment
    return tags


class DBTConfig(Config):
full_refresh: bool = False
Expand All @@ -50,7 +65,7 @@ def generate_dbt_asset(
internal_map: Dict[str, List[str]],
):
print(f"Target[{target}] using profiles dir {dbt_profiles_dir}")
translator = CustomDagsterDbtTranslator(["dbt", target], internal_map)
translator = CustomDagsterDbtTranslator(target, ["dbt", target], internal_map)

@dbt_assets(
name=f"{target}_dbt",
Expand Down
29 changes: 13 additions & 16 deletions warehouse/oso_dagster/assets/farcaster.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
from ..factories import (
create_bq_dts_asset,
BqDtsAssetConfig,
bq_dts_asset,
)
from ..utils.bq_dts import BigQuerySourceConfig
from ..utils.common import TimeInterval, SourceMode

farcaster_data = create_bq_dts_asset(
BqDtsAssetConfig(
key_prefix="farcaster",
asset_name="bq_dts_source",
display_name="farcaster",
destination_project_id="opensource-observer",
destination_dataset_name="farcaster",
source_config=BigQuerySourceConfig(
source_project_id="glossy-odyssey-366820",
source_dataset_name="farcaster",
service_account=None
),
copy_interval=TimeInterval.Weekly,
copy_mode=SourceMode.Overwrite,
farcaster_data = bq_dts_asset(
key_prefix="farcaster",
asset_name="bq_dts_source",
display_name="farcaster",
destination_project_id="opensource-observer",
destination_dataset_name="farcaster",
source_config=BigQuerySourceConfig(
source_project_id="glossy-odyssey-366820",
source_dataset_name="farcaster",
service_account=None,
),
copy_interval=TimeInterval.Weekly,
copy_mode=SourceMode.Overwrite,
)
29 changes: 13 additions & 16 deletions warehouse/oso_dagster/assets/lens.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
from ..factories import (
create_bq_dts_asset,
BqDtsAssetConfig,
bq_dts_asset,
)
from ..utils.bq_dts import BigQuerySourceConfig
from ..utils.common import TimeInterval, SourceMode

farcaster_data = create_bq_dts_asset(
BqDtsAssetConfig(
key_prefix="lens",
asset_name="bq_dts_source",
display_name="lens",
destination_project_id="opensource-observer",
destination_dataset_name="lens_v2_polygon",
source_config=BigQuerySourceConfig(
source_project_id="lens-public-data",
source_dataset_name="v2_polygon",
service_account=None
),
copy_interval=TimeInterval.Weekly,
copy_mode=SourceMode.Overwrite,
farcaster_data = bq_dts_asset(
key_prefix="lens",
asset_name="bq_dts_source",
display_name="lens",
destination_project_id="opensource-observer",
destination_dataset_name="lens_v2_polygon",
source_config=BigQuerySourceConfig(
source_project_id="lens-public-data",
source_dataset_name="v2_polygon",
service_account=None,
),
copy_interval=TimeInterval.Weekly,
copy_mode=SourceMode.Overwrite,
)
52 changes: 41 additions & 11 deletions warehouse/oso_dagster/factories/bq_dts.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
from dagster_gcp import BigQueryResource
from .common import AssetFactoryResponse
from ..resources import BigQueryDataTransferResource
from ..utils.bq import ensure_dataset, DatasetOptions
from ..utils.bq_dts import ensure_bq_dts_transfer, BqDtsTransferConfig
from ..utils import (
ensure_dataset,
DatasetOptions,
ensure_bq_dts_transfer,
BqDtsTransferConfig,
unpack_config,
)


@dataclass(kw_only=True)
class BqDtsAssetConfig(BqDtsTransferConfig):
Expand All @@ -20,32 +26,56 @@ class BqDtsAssetConfig(BqDtsTransferConfig):
# Dagster remaining args
asset_kwargs: dict = field(default_factory=lambda: {})

def create_bq_dts_asset(asset_config: BqDtsAssetConfig):
@asset(name=asset_config.asset_name, key_prefix=asset_config.key_prefix, **asset_config.asset_kwargs)
def bq_dts_asset(
environment: str = "production"


@unpack_config(BqDtsAssetConfig)
def bq_dts_asset(asset_config: BqDtsAssetConfig):
tags = {
"opensource.observer/factory": "bigquery_dts",
"opensource.observer/environment": asset_config.environment,
"opensource.observer/type": "source",
}

@asset(
name=asset_config.asset_name,
key_prefix=asset_config.key_prefix,
compute_kind="bigquery_dts",
tags=tags,
**asset_config.asset_kwargs,
)
def _bq_dts_asset(
context: AssetExecutionContext,
bigquery: BigQueryResource,
bigquery_datatransfer: BigQueryDataTransferResource
bigquery_datatransfer: BigQueryDataTransferResource,
) -> MaterializeResult:
context.log.info(f"Materializing a BigQuery Data Transfer asset called {asset_config.asset_name}")
context.log.info(
f"Materializing a BigQuery Data Transfer asset called {asset_config.asset_name}"
)
with bigquery.get_client() as bq_client:
ensure_dataset(
bq_client,
DatasetOptions(
dataset_ref=bq_client.dataset(dataset_id=asset_config.destination_dataset_name),
dataset_ref=bq_client.dataset(
dataset_id=asset_config.destination_dataset_name
),
is_public=True,
),
)
context.log.info(f"Ensured dataset named {asset_config.destination_dataset_name}")
context.log.info(
f"Ensured dataset named {asset_config.destination_dataset_name}"
)

with bigquery_datatransfer.get_client() as bq_dts_client:
ensure_bq_dts_transfer(bq_dts_client, asset_config, context.log)
context.log.info(f"Ensured transfer config named {asset_config.display_name}")
context.log.info(
f"Ensured transfer config named {asset_config.display_name}"
)

return MaterializeResult(
metadata={
"success": True,
}
)

return AssetFactoryResponse([bq_dts_asset])
return AssetFactoryResponse([_bq_dts_asset])
44 changes: 37 additions & 7 deletions warehouse/oso_dagster/factories/gcs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import re
from typing import Optional, Sequence
from typing import Optional, Sequence, Dict
from dataclasses import dataclass, field

import arrow
Expand All @@ -22,8 +22,14 @@
from dagster_gcp import BigQueryResource, GCSResource

from .common import AssetFactoryResponse
from ..utils.bq import ensure_dataset, DatasetOptions
from ..utils.common import TimeInterval, SourceMode
from ..utils import (
ensure_dataset,
DatasetOptions,
TimeInterval,
SourceMode,
add_tags,
add_key_prefix_as_tag,
)


@dataclass(kw_only=True)
Expand All @@ -50,6 +56,10 @@ class BaseGCSAsset:
# Dagster remaining arguments
asset_kwargs: dict = field(default_factory=lambda: {})

tags: Dict[str, str] = field(default_factory=lambda: {})

environment: str = "production"


@dataclass(kw_only=True)
class IntervalGCSAsset(BaseGCSAsset):
Expand All @@ -69,7 +79,28 @@ def interval_gcs_import_asset(config: IntervalGCSAsset):
# Find all of the "intervals" in the bucket and load them into the `raw_sources` dataset
# Run these sources through a secondary dbt model into `clean_sources`

@asset(name=config.name, key_prefix=config.key_prefix, **config.asset_kwargs)
tags = {
"opensource.observer/factory": "gcs",
"opensource.observer/environment": config.environment,
}

# Extend with additional tags
tags.update(config.tags)

tags = add_key_prefix_as_tag(tags, config.key_prefix)

@asset(
name=config.name,
key_prefix=config.key_prefix,
tags=add_tags(
tags,
{
"opensource.observer/type": "source",
},
),
compute_kind="gcs",
**config.asset_kwargs,
)
def gcs_asset(
context: AssetExecutionContext, bigquery: BigQueryResource, gcs: GCSResource
) -> MaterializeResult:
Expand Down Expand Up @@ -198,12 +229,12 @@ def gcs_asset(

asset_config = config

@op(name=f"{config.name}_clean_up_op")
@op(name=f"{config.name}_clean_up_op", tags=tags)
def gcs_clean_up_op(context: OpExecutionContext, config: dict):
context.log.info(f"Running clean up for {asset_config.name}")
print(config)

@job(name=f"{config.name}_clean_up_job")
@job(name=f"{config.name}_clean_up_job", tags=tags)
def gcs_clean_up_job():
gcs_clean_up_op()

Expand All @@ -216,7 +247,6 @@ def gcs_clean_up_job():
def gcs_clean_up_sensor(
context: SensorEvaluationContext, gcs: GCSResource, asset_event: EventLogEntry
):
print("EVENT!!!!!!")
yield RunRequest(
run_key=context.cursor,
run_config=RunConfig(
Expand Down
Loading

0 comments on commit 3469347

Please sign in to comment.