From 6019d9d1d1c62c3d8e26d9edfea2343f4c60644e Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Thu, 11 Jul 2024 11:42:26 -0700 Subject: [PATCH] Sets up discord alerting for run failures in dagster (#1776) * Sets up discord alerting for run failures in dagster * Update deployment * clean up * Improved comments --- .env.example | 13 ++-- ops/helm-charts/oso-dagster/Chart.yaml | 2 +- .../oso-dagster/templates/config-map.yaml | 4 +- ops/helm-charts/oso-dagster/values.yaml | 15 ++-- .../dagster/custom-helm-values.yaml | 2 + poetry.lock | 23 ++++++- pyproject.toml | 1 + warehouse/oso_dagster/assets/fake.py | 16 +++++ warehouse/oso_dagster/constants.py | 8 ++- warehouse/oso_dagster/definitions.py | 20 +++++- warehouse/oso_dagster/factories/alerts.py | 68 +++++++++++++++++++ warehouse/oso_dagster/factories/common.py | 8 +++ warehouse/oso_dagster/factories/loader.py | 33 ++------- warehouse/oso_dagster/utils/__init__.py | 1 + warehouse/oso_dagster/utils/alerts.py | 25 +++++++ 15 files changed, 195 insertions(+), 44 deletions(-) create mode 100644 warehouse/oso_dagster/assets/fake.py create mode 100644 warehouse/oso_dagster/factories/alerts.py create mode 100644 warehouse/oso_dagster/utils/alerts.py diff --git a/.env.example b/.env.example index abf09c568..b1c26c364 100644 --- a/.env.example +++ b/.env.example @@ -23,10 +23,15 @@ DAGSTER_HOME=/tmp/dagster-home # This is used to put generated dbt profiles for dagster in a specific place DAGSTER_DBT_TARGET_BASE_DIR=/tmp/dagster-home/generated-dbt DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1 -# Used when loading dlt assets into a staging area -DAGSTER_STAGING_BUCKET_URL= + +# Used when loading dlt assets into a staging area. It should be set to a GCS +# bucket that will be used to write to for dlt data transfers into bigquery. +DAGSTER_STAGING_BUCKET_URL=some-bucket # Uncomment the next two vars to use gcp secrets (you'll need to have gcp -# secrets configured) +# secrets configured). Unfortunately at this time, if you don't have access to +# the official oso gcp account uncommenting these will likely not work. The GCP +# secrets prefix should likely match the dagster deployment's search prefix in +# flux #DAGSTER_USE_LOCAL_SECRETS=False -#DAGSTER_GCP_SECRETS_PREFIX=dagster \ No newline at end of file +#DAGSTER_GCP_SECRETS_PREFIX=dagster \ No newline at end of file diff --git a/ops/helm-charts/oso-dagster/Chart.yaml b/ops/helm-charts/oso-dagster/Chart.yaml index 602470d12..d203027e2 100644 --- a/ops/helm-charts/oso-dagster/Chart.yaml +++ b/ops/helm-charts/oso-dagster/Chart.yaml @@ -3,7 +3,7 @@ name: oso-dagster description: Extension of the dagster template type: application -version: 0.2.1 +version: 0.2.2 appVersion: "1.0.0" dependencies: - name: dagster diff --git a/ops/helm-charts/oso-dagster/templates/config-map.yaml b/ops/helm-charts/oso-dagster/templates/config-map.yaml index 479e067d7..3d0ad3225 100644 --- a/ops/helm-charts/oso-dagster/templates/config-map.yaml +++ b/ops/helm-charts/oso-dagster/templates/config-map.yaml @@ -10,4 +10,6 @@ data: DAGSTER_PG_HOST: "{{ .Values.pg.host }}" DAGSTER_USE_LOCAL_SECRETS: "False" DAGSTER_GCP_SECRETS_PREFIX: "{{ .Values.global.serviceAccountName }}" - DAGSTER_STAGING_BUCKET_URL: "{{ .Values.configMap.secretPrefix }}-{{ .Values.secretmanagerKeys.stagingBucketUrl }}" \ No newline at end of file + DAGSTER_STAGING_BUCKET_URL: "{{ .Values.configMap.secretPrefix }}-{{ .Values.secretmanagerKeys.stagingBucketUrl }}" + DAGSTER_DISCORD_WEBHOOK_URL: "{{ .Values.configMap.secretPrefix }}-{{ .Values.secretmanagerKeys.discordWebhookUrl }}" + DAGSTER_ALERTS_BASE_URL: "{{ .Values.alerts.baseUrl }}" \ No newline at end of file diff --git a/ops/helm-charts/oso-dagster/values.yaml b/ops/helm-charts/oso-dagster/values.yaml index 4dbaf62ce..aee4de359 100644 --- a/ops/helm-charts/oso-dagster/values.yaml +++ b/ops/helm-charts/oso-dagster/values.yaml @@ -18,12 +18,15 @@ configMap: name: "dagster-extra-env-config-map" secretPrefix: "gcp:secretmanager:dagster" secretmanagerKeys: - dbUser: "db-user/versions/1" - dbHost: "db-host/versions/1" - dbPort: "db-port/versions/1" - dbName: "db-name/versions/1" - impersonateServiceAccount: "dbt-impersonate-service-account/versions/1" + dbUser: "db-user/versions/latest" + dbHost: "db-host/versions/latest" + dbPort: "db-port/versions/latest" + dbName: "db-name/versions/latest" + impersonateServiceAccount: "dbt-impersonate-service-account/versions/latest" stagingBucketUrl: "staging-bucket-url/versions/latest" + discordWebhookUrl: "discord-webhook-url/versions/latest" pg: port: "5432" - host: "127.0.0.1" \ No newline at end of file + host: "127.0.0.1" +alerts: + baseUrl: "" \ No newline at end of file diff --git a/ops/k8s-apps/production/dagster/custom-helm-values.yaml b/ops/k8s-apps/production/dagster/custom-helm-values.yaml index 05d86a217..a9e5b0f6d 100644 --- a/ops/k8s-apps/production/dagster/custom-helm-values.yaml +++ b/ops/k8s-apps/production/dagster/custom-helm-values.yaml @@ -11,6 +11,8 @@ spec: serviceAccountName: production-dagster configMap: secretPrefix: "gcp:secretmanager:production-dagster" + alerts: + baseUrl: "https://dagster.opensource.observer" dagster: global: serviceAccountName: production-dagster diff --git a/poetry.lock b/poetry.lock index 33e8fb8c9..330f9c863 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "agate" @@ -1276,6 +1276,23 @@ Pygments = ">=2.9.0,<3.0.0" [package.extras] toml = ["tomli (>=1.2.1)"] +[[package]] +name = "discord-webhook" +version = "1.3.1" +description = "Easily send Discord webhooks with Python" +optional = false +python-versions = ">=3.10,<4.0" +files = [ + {file = "discord_webhook-1.3.1-py3-none-any.whl", hash = "sha256:ede07028316de76d24eb811836e2b818b2017510da786777adcb0d5970e7af79"}, + {file = "discord_webhook-1.3.1.tar.gz", hash = "sha256:ee3e0f3ea4f3dc8dc42be91f75b894a01624c6c13fea28e23ebcf9a6c9a304f7"}, +] + +[package.dependencies] +requests = ">=2.28.1,<3.0.0" + +[package.extras] +async = ["httpx (>=0.23.0,<0.24.0)"] + [[package]] name = "distributed" version = "2024.6.2" @@ -3190,7 +3207,7 @@ files = [ {file = "msgpack-1.0.8-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5fbb160554e319f7b22ecf530a80a3ff496d38e8e07ae763b9e82fadfe96f273"}, {file = "msgpack-1.0.8-cp39-cp39-win32.whl", hash = "sha256:f9af38a89b6a5c04b7d18c492c8ccf2aee7048aff1ce8437c4683bb5a1df893d"}, {file = "msgpack-1.0.8-cp39-cp39-win_amd64.whl", hash = "sha256:ed59dd52075f8fc91da6053b12e8c89e37aa043f8986efd89e61fae69dc1b011"}, - {file = "msgpack-1.0.8-py3-none-any.whl", hash = "sha256:24f727df1e20b9876fa6e95f840a2a2651e34c0ad147676356f4bf5fbb0206ca"}, + {file = "msgpack-1.0.8.tar.gz", hash = "sha256:95c02b0e27e706e48d0e5426d1710ca78e0f0628d6e89d5b5a5b91a5f12274f3"}, ] [[package]] @@ -6342,4 +6359,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", [metadata] lock-version = "2.0" python-versions = "^3.12,<3.13" -content-hash = "720141c96fbf9b1faea9c1a9c281c2d0bf0e8c2053edcdaf9614ce908658bc23" +content-hash = "cd10cc4669ec998f4b89fe38446798bcae6862436badd38048905081e5445b5a" diff --git a/pyproject.toml b/pyproject.toml index 923f950be..ec6b47d44 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ dagster-embedded-elt = "^0.23.11" google-cloud-secret-manager = "^2.20.0" dlt = { extras = ["filesystem"], version = "^0.4.12" } gcsfs = "^2024.6.1" +discord-webhook = "^1.3.1" [tool.poetry.scripts] diff --git a/warehouse/oso_dagster/assets/fake.py b/warehouse/oso_dagster/assets/fake.py new file mode 100644 index 000000000..e1bbf6c98 --- /dev/null +++ b/warehouse/oso_dagster/assets/fake.py @@ -0,0 +1,16 @@ +from dagster import asset, op, job +from .. import constants + +if constants.enable_tests: + + @asset(compute_kind="fake") + def fake_failing_asset() -> None: + raise Exception("This fake asset only ever fails") + + @op(tags={"kind": "fake"}) + def fake_failing_op() -> None: + raise Exception("This fake op only ever fails") + + @job() + def fake_failing_job(): + fake_failing_op() diff --git a/warehouse/oso_dagster/constants.py b/warehouse/oso_dagster/constants.py index c8109ee8f..ec0649263 100644 --- a/warehouse/oso_dagster/constants.py +++ b/warehouse/oso_dagster/constants.py @@ -37,7 +37,13 @@ def get_project_id(): staging_bucket_url = os.getenv("DAGSTER_STAGING_BUCKET_URL") profile_name = os.getenv("DAGSTER_DBT_PROFILE_NAME", "opensource_observer") gcp_secrets_prefix = os.getenv("DAGSTER_GCP_SECRETS_PREFIX", "") -use_local_secrets = os.getenv("DAGSTER_USE_LOCAL_SECRETS", "True") == "True" +use_local_secrets = os.getenv("DAGSTER_USE_LOCAL_SECRETS", "true").lower() in [ + "true", + "1", +] +discord_webhook_url = os.getenv("DAGSTER_DISCORD_WEBHOOK_URL") +enable_tests = os.getenv("DAGSTER_ENABLE_TESTS", "false").lower() in ["true", "1"] +dagster_alerts_base_url = os.getenv("DAGSTER_ALERTS_BASE_URL", "") dbt_profiles_dir = get_profiles_dir() dbt_target_base_dir = os.getenv("DAGSTER_DBT_TARGET_BASE_DIR") or "" diff --git a/warehouse/oso_dagster/definitions.py b/warehouse/oso_dagster/definitions.py index 869471ec0..b0f93b663 100644 --- a/warehouse/oso_dagster/definitions.py +++ b/warehouse/oso_dagster/definitions.py @@ -10,9 +10,15 @@ from .schedules import schedules from .cbt import CBTResource from .factories import load_all_assets_from_package -from .utils.secrets import LocalSecretResolver, GCPSecretResolver +from .utils import ( + LocalSecretResolver, + GCPSecretResolver, + LogAlertManager, + DiscordWebhookAlertManager, +) from .resources import BigQueryDataTransferResource from . import assets +from .factories.alerts import setup_alert_sensor from dagster_embedded_elt.dlt import DagsterDltResource @@ -52,6 +58,18 @@ def load_definitions(): io_manager = PolarsBigQueryIOManager(project=project_id) + # Setup an alert sensor + alert_manager = LogAlertManager() + if constants.discord_webhook_url: + alert_manager = DiscordWebhookAlertManager(constants.discord_webhook_url) + alerts = setup_alert_sensor( + "alerts", + constants.dagster_alerts_base_url, + alert_manager, + ) + + asset_factories = asset_factories + alerts + # Each of the dbt environments needs to be setup as a resource to be used in # the dbt assets resources = { diff --git a/warehouse/oso_dagster/factories/alerts.py b/warehouse/oso_dagster/factories/alerts.py new file mode 100644 index 000000000..6c7c94b53 --- /dev/null +++ b/warehouse/oso_dagster/factories/alerts.py @@ -0,0 +1,68 @@ +from dagster import ( + run_failure_sensor, + RunFailureSensorContext, + DefaultSensorStatus, + op, + job, + RunRequest, + RunConfig, + Config, + OpExecutionContext, + DagsterEventType, +) +from ..utils import AlertManager +from .common import AssetFactoryResponse + + +class AlertOpConfig(Config): + run_id: str + + +def setup_alert_sensor(name: str, base_url: str, alert_manager: AlertManager): + @op(name=f"{name}_alert_op") + def failure_op(context: OpExecutionContext, config: AlertOpConfig) -> None: + context.log.info(config.run_id) + instance = context.instance + stats = instance.get_run_stats(config.run_id) + context.log.info(stats) + records = instance.get_records_for_run(config.run_id).records + context.log.info(records) + events = [ + record.event_log_entry for record in records if record.event_log_entry + ] + dagster_events = [ + event.dagster_event for event in events if event.dagster_event + ] + failures = [event for event in dagster_events if event.is_failure] + step_failures = [ + failure + for failure in failures + if failure.event_type in [DagsterEventType.STEP_FAILURE] + ] + + alert_manager.alert( + f"{len(step_failures)} failed steps in run ({base_url}/runs/{config.run_id})" + ) + + @job(name=f"{name}_alert_job") + def failure_job(): + failure_op() + + @run_failure_sensor( + name=name, default_status=DefaultSensorStatus.RUNNING, request_job=failure_job + ) + def failure_sensor(context: RunFailureSensorContext): + yield RunRequest( + run_key=context.dagster_run.run_id, + run_config=RunConfig( + ops={ + f"{name}_alert_op": { + "config": { + "run_id": context.dagster_run.run_id, + } + } + } + ), + ) + + return AssetFactoryResponse([], sensors=[failure_sensor], jobs=[failure_job]) diff --git a/warehouse/oso_dagster/factories/common.py b/warehouse/oso_dagster/factories/common.py index f6b4413a4..125e19e1a 100644 --- a/warehouse/oso_dagster/factories/common.py +++ b/warehouse/oso_dagster/factories/common.py @@ -37,6 +37,14 @@ class AssetFactoryResponse: jobs: List[JobDefinition] = field(default_factory=lambda: []) checks: List[AssetChecksDefinition] = field(default_factory=lambda: []) + def __add__(self, other: "AssetFactoryResponse") -> "AssetFactoryResponse": + return AssetFactoryResponse( + assets=list(self.assets) + list(other.assets), + sensors=list(self.sensors) + list(other.sensors), + checks=list(self.checks) + list(other.checks), + jobs=list(self.jobs) + list(other.jobs), + ) + type EarlyResourcesAssetDecoratedFunction[**P] = Callable[ P, AssetFactoryResponse | AssetsDefinition diff --git a/warehouse/oso_dagster/factories/loader.py b/warehouse/oso_dagster/factories/loader.py index 313e6e006..347b98563 100644 --- a/warehouse/oso_dagster/factories/loader.py +++ b/warehouse/oso_dagster/factories/loader.py @@ -4,13 +4,10 @@ import pkgutil from dagster import ( - SensorDefinition, - JobDefinition, - AssetChecksDefinition, load_assets_from_modules, ) -from .common import AssetFactoryResponse, AssetList, EarlyResourcesAssetFactory +from .common import AssetFactoryResponse, EarlyResourcesAssetFactory def load_all_assets_from_package( @@ -27,37 +24,19 @@ def load_all_assets_from_package( modules.append(module) factories = load_assets_factories_from_modules(modules, early_resources) asset_defs = load_assets_from_modules(modules) - factory_assets = list(factories.assets) - factory_assets.extend(asset_defs) - return AssetFactoryResponse( - assets=factory_assets, - sensors=factories.sensors, - jobs=factories.jobs, - checks=factories.checks, - ) + return factories + AssetFactoryResponse(asset_defs) def load_assets_factories_from_modules( modules: List[ModuleType], early_resources: Dict[str, Any], ) -> AssetFactoryResponse: - assets: AssetList = [] - sensors: List[SensorDefinition] = [] - jobs: List[JobDefinition] = [] - checks: List[AssetChecksDefinition] = [] + all = AssetFactoryResponse([]) for module in modules: for _, obj in module.__dict__.items(): if isinstance(obj, EarlyResourcesAssetFactory): resp = obj(**early_resources) - assets.extend(resp.assets) - sensors.extend(resp.sensors) - jobs.extend(resp.jobs) - checks.extend(resp.checks) + all = all + resp elif isinstance(obj, AssetFactoryResponse): - assets.extend(obj.assets) - sensors.extend(obj.sensors) - jobs.extend(obj.jobs) - checks.extend(obj.checks) - return AssetFactoryResponse( - assets=assets, sensors=sensors, jobs=jobs, checks=checks - ) + all = all + obj + return all diff --git a/warehouse/oso_dagster/utils/__init__.py b/warehouse/oso_dagster/utils/__init__.py index 1a5be43c2..d16d6624c 100644 --- a/warehouse/oso_dagster/utils/__init__.py +++ b/warehouse/oso_dagster/utils/__init__.py @@ -10,3 +10,4 @@ from .retry import * from .common import * from .types import * +from .alerts import * diff --git a/warehouse/oso_dagster/utils/alerts.py b/warehouse/oso_dagster/utils/alerts.py new file mode 100644 index 000000000..c71ebea29 --- /dev/null +++ b/warehouse/oso_dagster/utils/alerts.py @@ -0,0 +1,25 @@ +import logging +from discord_webhook import DiscordWebhook + +logger = logging.getLogger(__name__) + + +class AlertManager: + """Base class for an alert manager""" + + def alert(self, message: str): + raise NotImplementedError() + + +class LogAlertManager(AlertManager): + def alert(self, message: str): + logging.error(message) + + +class DiscordWebhookAlertManager(AlertManager): + def __init__(self, url: str): + self._url = url + + def alert(self, message: str): + wh = DiscordWebhook(url=self._url, content=message) + wh.execute()