Skip to content

Commit

Permalink
feat: add a ClickhouseResource in Dagster (#1796)
Browse files Browse the repository at this point in the history
* Loads config from environment variables
  • Loading branch information
ryscheng committed Jul 15, 2024
1 parent a1d9dc7 commit 2fb365a
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 8 deletions.
10 changes: 6 additions & 4 deletions warehouse/oso_dagster/constants.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import os
from pathlib import Path

import requests

from .utils.common import ensure
from .utils.dbt import (
get_profiles_dir,
load_dbt_manifests,
Expand Down Expand Up @@ -34,7 +33,7 @@ def get_project_id():
except Exception:
raise Exception("GOOGLE_PROJECT_ID must be set if you're not in GCP")

staging_bucket_url = os.getenv("DAGSTER_STAGING_BUCKET_URL")
staging_bucket = ensure(os.getenv("DAGSTER_STAGING_BUCKET_URL"), "Missing DAGSTER_STAGING_BUCKET_URL")
profile_name = os.getenv("DAGSTER_DBT_PROFILE_NAME", "opensource_observer")
gcp_secrets_prefix = os.getenv("DAGSTER_GCP_SECRETS_PREFIX", "")
use_local_secrets = os.getenv("DAGSTER_USE_LOCAL_SECRETS", "true").lower() in [
Expand All @@ -50,6 +49,7 @@ def get_project_id():

dbt_profiles_dir = get_profiles_dir()
dbt_target_base_dir = os.getenv("DAGSTER_DBT_TARGET_BASE_DIR") or ""
PRODUCTION_DBT_TARGET = "production"
main_dbt_manifests = load_dbt_manifests(
dbt_target_base_dir,
main_dbt_project_dir,
Expand All @@ -67,5 +67,7 @@ def get_project_id():
),
parse_projects=os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD", "0") == "1",
)

verbose_logs = os.getenv("DAGSTER_VERBOSE_LOGS", "false").lower() in ["true", "1"]
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST")
CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER")
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD")
8 changes: 5 additions & 3 deletions warehouse/oso_dagster/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
LogAlertManager,
DiscordWebhookAlertManager,
)
from .resources import BigQueryDataTransferResource
from .resources import BigQueryDataTransferResource, ClickhouseResource
from . import assets
from .factories.alerts import setup_alert_sensor

Expand All @@ -36,14 +36,15 @@ def load_definitions():
)

# A dlt destination for gcs staging to bigquery
assert constants.staging_bucket_url is not None
dlt_gcs_staging = filesystem(bucket_url=constants.staging_bucket_url)
assert constants.staging_bucket is not None
dlt_gcs_staging = filesystem(bucket_url=constants.staging_bucket)

dlt = DagsterDltResource()
bigquery = BigQueryResource(project=project_id)
bigquery_datatransfer = BigQueryDataTransferResource(
project=os.environ.get("GOOGLE_PROJECT_ID")
)
clickhouse = ClickhouseResource()
gcs = GCSResource(project=project_id)
cbt = CBTResource(
bigquery=bigquery,
Expand Down Expand Up @@ -77,6 +78,7 @@ def load_definitions():
"cbt": cbt,
"bigquery": bigquery,
"bigquery_datatransfer": bigquery_datatransfer,
"clickhouse": clickhouse,
"io_manager": io_manager,
"dlt": dlt,
"secrets": secret_resolver,
Expand Down
3 changes: 2 additions & 1 deletion warehouse/oso_dagster/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# ruff: noqa: F403

from .bq_dts import *
from .bq_dts import *
from .clickhouse import *
72 changes: 72 additions & 0 deletions warehouse/oso_dagster/resources/clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import clickhouse_connect
from contextlib import contextmanager
from typing import Optional
from dagster import (
ConfigurableResource,
resource,
)
from pydantic import Field
from ..constants import CLICKHOUSE_HOST, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD
from ..utils.common import ensure

"""
Note: This code is predominantly copied from the BigQueryResource.
It simply returns a Clickhouse Connect client.
"""

class ClickhouseResource(ConfigurableResource):
    """Resource for interacting with Clickhouse.

    Each connection setting may be supplied as resource config. Any setting
    that is left unset (or empty) falls back to the corresponding
    ``CLICKHOUSE_HOST`` / ``CLICKHOUSE_USER`` / ``CLICKHOUSE_PASSWORD``
    constant imported from ``..constants``.

    Examples:
        .. code-block:: python

            @asset
            def tables(clickhouse: ClickhouseResource):
                with clickhouse.get_client() as client:
                    client.query(...)

            defs = Definitions(
                assets=[tables],
                resources={
                    "clickhouse": ClickhouseResource()
                }
            )
    """

    host: Optional[str] = Field(
        default=None,
        description="Clickhouse host.",
    )

    user: Optional[str] = Field(
        default=None,
        description="Clickhouse username.",
    )

    password: Optional[str] = Field(
        default=None,
        description="Clickhouse password.",
    )

    @contextmanager
    def get_client(self):
        """Context manager to create a Clickhouse Client.

        Yields:
            The client returned by ``clickhouse_connect.get_client``. It is
            closed when the ``with`` block exits, whether normally or through
            an exception, so callers must not keep using it afterwards.

        Raises:
            Whatever ``ensure`` raises when a setting is missing from both the
            resource config and the fallback constant.
            NOTE(review): ``ensure`` is defined in ``utils.common`` and is not
            visible here -- confirm the exact exception type.
        """
        # `or` (rather than an `is None` check) means an empty string in the
        # resource config also falls back to the module-level constant.
        host = ensure(self.host or CLICKHOUSE_HOST, "Missing CLICKHOUSE_HOST")
        username = ensure(self.user or CLICKHOUSE_USER, "Missing CLICKHOUSE_USER")
        password = ensure(
            self.password or CLICKHOUSE_PASSWORD, "Missing CLICKHOUSE_PASSWORD"
        )
        client = clickhouse_connect.get_client(
            host=host,
            username=username,
            password=password,
            secure=True,
        )
        try:
            yield client
        finally:
            # Release the underlying connection even if the caller's block
            # raised; without this the client is never closed.
            client.close()

@resource(
    description="Dagster resource for connecting to Clickhouse.",
    config_schema=ClickhouseResource.to_config_schema(),
)
def clickhouse_resource(context):
    """Function-style Dagster resource that yields a Clickhouse client.

    Builds a ``ClickhouseResource`` from the given resource context, opens a
    client through its ``get_client`` context manager, and yields that client.
    The context manager stays open until this generator is resumed or closed.
    """
    # Named differently from the function so the local does not shadow it.
    configured = ClickhouseResource.from_resource_context(context)
    with configured.get_client() as client:
        yield client

0 comments on commit 2fb365a

Please sign in to comment.