From 2fb365a5c9743a5f0e259a09bea8ad02117c2bb9 Mon Sep 17 00:00:00 2001 From: Raymond Cheng Date: Mon, 15 Jul 2024 07:14:09 -0700 Subject: [PATCH] feat: add a ClickhouseResource in Dagster (#1796) * Loads config from environment variables --- warehouse/oso_dagster/constants.py | 10 +-- warehouse/oso_dagster/definitions.py | 8 ++- warehouse/oso_dagster/resources/__init__.py | 3 +- warehouse/oso_dagster/resources/clickhouse.py | 72 +++++++++++++++++++ 4 files changed, 85 insertions(+), 8 deletions(-) create mode 100644 warehouse/oso_dagster/resources/clickhouse.py diff --git a/warehouse/oso_dagster/constants.py b/warehouse/oso_dagster/constants.py index c71f6b23f..381c8824f 100644 --- a/warehouse/oso_dagster/constants.py +++ b/warehouse/oso_dagster/constants.py @@ -1,8 +1,7 @@ import os from pathlib import Path - import requests - +from .utils.common import ensure from .utils.dbt import ( get_profiles_dir, load_dbt_manifests, @@ -34,7 +33,7 @@ def get_project_id(): except Exception: raise Exception("GOOGLE_PROJECT_ID must be set if you're not in GCP") -staging_bucket_url = os.getenv("DAGSTER_STAGING_BUCKET_URL") +staging_bucket = ensure(os.getenv("DAGSTER_STAGING_BUCKET_URL"), "Missing DAGSTER_STAGING_BUCKET_URL") profile_name = os.getenv("DAGSTER_DBT_PROFILE_NAME", "opensource_observer") gcp_secrets_prefix = os.getenv("DAGSTER_GCP_SECRETS_PREFIX", "") use_local_secrets = os.getenv("DAGSTER_USE_LOCAL_SECRETS", "true").lower() in [ @@ -50,6 +49,7 @@ def get_project_id(): dbt_profiles_dir = get_profiles_dir() dbt_target_base_dir = os.getenv("DAGSTER_DBT_TARGET_BASE_DIR") or "" +PRODUCTION_DBT_TARGET = "production" main_dbt_manifests = load_dbt_manifests( dbt_target_base_dir, main_dbt_project_dir, @@ -67,5 +67,7 @@ def get_project_id(): ), parse_projects=os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD", "0") == "1", ) - verbose_logs = os.getenv("DAGSTER_VERBOSE_LOGS", "false").lower() in ["true", "1"] +CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST") +CLICKHOUSE_USER = 
os.getenv("CLICKHOUSE_USER") +CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD") diff --git a/warehouse/oso_dagster/definitions.py b/warehouse/oso_dagster/definitions.py index b0f93b663..3351700d9 100644 --- a/warehouse/oso_dagster/definitions.py +++ b/warehouse/oso_dagster/definitions.py @@ -16,7 +16,7 @@ LogAlertManager, DiscordWebhookAlertManager, ) -from .resources import BigQueryDataTransferResource +from .resources import BigQueryDataTransferResource, ClickhouseResource from . import assets from .factories.alerts import setup_alert_sensor @@ -36,14 +36,15 @@ def load_definitions(): ) # A dlt destination for gcs staging to bigquery - assert constants.staging_bucket_url is not None - dlt_gcs_staging = filesystem(bucket_url=constants.staging_bucket_url) + assert constants.staging_bucket is not None + dlt_gcs_staging = filesystem(bucket_url=constants.staging_bucket) dlt = DagsterDltResource() bigquery = BigQueryResource(project=project_id) bigquery_datatransfer = BigQueryDataTransferResource( project=os.environ.get("GOOGLE_PROJECT_ID") ) + clickhouse = ClickhouseResource() gcs = GCSResource(project=project_id) cbt = CBTResource( bigquery=bigquery, @@ -77,6 +78,7 @@ def load_definitions(): "cbt": cbt, "bigquery": bigquery, "bigquery_datatransfer": bigquery_datatransfer, + "clickhouse": clickhouse, "io_manager": io_manager, "dlt": dlt, "secrets": secret_resolver, diff --git a/warehouse/oso_dagster/resources/__init__.py b/warehouse/oso_dagster/resources/__init__.py index 3c53d407f..77f0474e8 100644 --- a/warehouse/oso_dagster/resources/__init__.py +++ b/warehouse/oso_dagster/resources/__init__.py @@ -1,3 +1,4 @@ # ruff: noqa: F403 -from .bq_dts import * \ No newline at end of file +from .bq_dts import * +from .clickhouse import * \ No newline at end of file diff --git a/warehouse/oso_dagster/resources/clickhouse.py b/warehouse/oso_dagster/resources/clickhouse.py new file mode 100644 index 000000000..68bb580c6 --- /dev/null +++ 
# File: warehouse/oso_dagster/resources/clickhouse.py
"""Dagster resource that hands out Clickhouse Connect clients.

Note: This code is predominantly copied from the BigQueryResource.
It simply returns a Clickhouse Connect Client.
"""

from contextlib import contextmanager
from typing import Iterator, Optional

import clickhouse_connect
from clickhouse_connect.driver.client import Client
from dagster import (
    ConfigurableResource,
    resource,
)
from pydantic import Field

from ..constants import CLICKHOUSE_HOST, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD
from ..utils.common import ensure


class ClickhouseResource(ConfigurableResource):
    """Resource for interacting with Clickhouse.

    Every field is optional. A field left unset (or empty) falls back to
    the matching ``CLICKHOUSE_*`` constant, which ``constants.py`` reads
    from the environment variable of the same name.

    Examples:
        .. code-block:: python

            @asset
            def tables(clickhouse: ClickhouseResource):
                with clickhouse.get_client() as client:
                    client.query(...)

            defs = Definitions(
                assets=[tables],
                resources={
                    "clickhouse": ClickhouseResource()
                }
            )
    """

    host: Optional[str] = Field(
        default=None,
        description="Clickhouse host.",
    )

    user: Optional[str] = Field(
        default=None,
        description="Clickhouse username.",
    )

    password: Optional[str] = Field(
        default=None,
        description="Clickhouse password.",
    )

    @contextmanager
    def get_client(self) -> Iterator[Client]:
        """Context manager to create a Clickhouse Client.

        The host, username and password come from this resource's fields,
        falling back to the ``CLICKHOUSE_HOST`` / ``CLICKHOUSE_USER`` /
        ``CLICKHOUSE_PASSWORD`` constants. The connection is always opened
        with ``secure=True``.

        Yields:
            A Clickhouse Connect client. It is closed when the ``with``
            block exits, so it should not be used after that point.

        Raises:
            Whatever ``ensure`` raises when a setting is missing from both
            the resource config and the constants (presumably an exception
            carrying the "Missing ..." message -- confirm in utils.common).
        """
        host = ensure(self.host or CLICKHOUSE_HOST, "Missing CLICKHOUSE_HOST")
        username = ensure(self.user or CLICKHOUSE_USER, "Missing CLICKHOUSE_USER")
        password = ensure(
            self.password or CLICKHOUSE_PASSWORD, "Missing CLICKHOUSE_PASSWORD"
        )
        client = clickhouse_connect.get_client(
            host=host,
            username=username,
            password=password,
            secure=True,
        )
        try:
            yield client
        finally:
            # Release the client's connections even if the caller's block
            # raised; previously the client was never closed.
            client.close()


@resource(
    config_schema=ClickhouseResource.to_config_schema(),
    description="Dagster resource for connecting to Clickhouse.",
)
def clickhouse_resource(context):
    """Function-style Dagster resource that yields a Clickhouse client.

    Builds a ``ClickhouseResource`` from the resource context's config and
    yields a client from ``get_client()``; the client is cleaned up when
    the generator is resumed/closed after the ``yield``.
    """
    # Named differently from the enclosing function to avoid shadowing it.
    configured = ClickhouseResource.from_resource_context(context)
    with configured.get_client() as client:
        yield client