Skip to content

Commit

Permalink
Sets up discord alerting for run failures in dagster (#1776)
Browse files Browse the repository at this point in the history
* Sets up discord alerting for run failures in dagster

* Update deployment

* clean up

* Improved comments
  • Loading branch information
ravenac95 committed Jul 11, 2024
1 parent 3469347 commit 6019d9d
Show file tree
Hide file tree
Showing 15 changed files with 195 additions and 44 deletions.
13 changes: 9 additions & 4 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ DAGSTER_HOME=/tmp/dagster-home
# This is used to put generated dbt profiles for dagster in a specific place
DAGSTER_DBT_TARGET_BASE_DIR=/tmp/dagster-home/generated-dbt
DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1
# Used when loading dlt assets into a staging area
DAGSTER_STAGING_BUCKET_URL=

# Used when loading dlt assets into a staging area. It should be set to a GCS
# bucket that dlt writes to when transferring data into BigQuery.
DAGSTER_STAGING_BUCKET_URL=some-bucket

# Uncomment the next two vars to use gcp secrets (you'll need to have gcp
# secrets configured)
# secrets configured). Unfortunately, at this time, if you don't have access to
# the official OSO GCP account, uncommenting these will likely not work. The GCP
# secrets prefix should likely match the dagster deployment's search prefix in
# flux.
#DAGSTER_USE_LOCAL_SECRETS=False
#DAGSTER_GCP_SECRETS_PREFIX=dagster
#DAGSTER_GCP_SECRETS_PREFIX=dagster
2 changes: 1 addition & 1 deletion ops/helm-charts/oso-dagster/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: oso-dagster
description: Extension of the dagster template

type: application
version: 0.2.1
version: 0.2.2
appVersion: "1.0.0"
dependencies:
- name: dagster
Expand Down
4 changes: 3 additions & 1 deletion ops/helm-charts/oso-dagster/templates/config-map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ data:
DAGSTER_PG_HOST: "{{ .Values.pg.host }}"
DAGSTER_USE_LOCAL_SECRETS: "False"
DAGSTER_GCP_SECRETS_PREFIX: "{{ .Values.global.serviceAccountName }}"
DAGSTER_STAGING_BUCKET_URL: "{{ .Values.configMap.secretPrefix }}-{{ .Values.secretmanagerKeys.stagingBucketUrl }}"
DAGSTER_STAGING_BUCKET_URL: "{{ .Values.configMap.secretPrefix }}-{{ .Values.secretmanagerKeys.stagingBucketUrl }}"
DAGSTER_DISCORD_WEBHOOK_URL: "{{ .Values.configMap.secretPrefix }}-{{ .Values.secretmanagerKeys.discordWebhookUrl }}"
DAGSTER_ALERTS_BASE_URL: "{{ .Values.alerts.baseUrl }}"
15 changes: 9 additions & 6 deletions ops/helm-charts/oso-dagster/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ configMap:
name: "dagster-extra-env-config-map"
secretPrefix: "gcp:secretmanager:dagster"
secretmanagerKeys:
dbUser: "db-user/versions/1"
dbHost: "db-host/versions/1"
dbPort: "db-port/versions/1"
dbName: "db-name/versions/1"
impersonateServiceAccount: "dbt-impersonate-service-account/versions/1"
dbUser: "db-user/versions/latest"
dbHost: "db-host/versions/latest"
dbPort: "db-port/versions/latest"
dbName: "db-name/versions/latest"
impersonateServiceAccount: "dbt-impersonate-service-account/versions/latest"
stagingBucketUrl: "staging-bucket-url/versions/latest"
discordWebhookUrl: "discord-webhook-url/versions/latest"
pg:
port: "5432"
host: "127.0.0.1"
host: "127.0.0.1"
alerts:
baseUrl: ""
2 changes: 2 additions & 0 deletions ops/k8s-apps/production/dagster/custom-helm-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ spec:
serviceAccountName: production-dagster
configMap:
secretPrefix: "gcp:secretmanager:production-dagster"
alerts:
baseUrl: "https://dagster.opensource.observer"
dagster:
global:
serviceAccountName: production-dagster
Expand Down
23 changes: 20 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dagster-embedded-elt = "^0.23.11"
google-cloud-secret-manager = "^2.20.0"
dlt = { extras = ["filesystem"], version = "^0.4.12" }
gcsfs = "^2024.6.1"
discord-webhook = "^1.3.1"


[tool.poetry.scripts]
Expand Down
16 changes: 16 additions & 0 deletions warehouse/oso_dagster/assets/fake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dagster import asset, op, job
from .. import constants

# Everything below is only defined when `constants.enable_tests` is truthy.
# NOTE(review): presumably these exist to trigger run failures on demand so
# failure handling can be exercised -- confirm with the author.
if constants.enable_tests:

    # Asset whose computation raises unconditionally.
    @asset(compute_kind="fake")
    def fake_failing_asset() -> None:
        raise Exception("This fake asset only ever fails")

    # Op that raises unconditionally when executed.
    @op(tags={"kind": "fake"})
    def fake_failing_op() -> None:
        raise Exception("This fake op only ever fails")

    # Job whose only step is the always-failing op above.
    @job()
    def fake_failing_job():
        fake_failing_op()
8 changes: 7 additions & 1 deletion warehouse/oso_dagster/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ def get_project_id():
staging_bucket_url = os.getenv("DAGSTER_STAGING_BUCKET_URL")
profile_name = os.getenv("DAGSTER_DBT_PROFILE_NAME", "opensource_observer")
gcp_secrets_prefix = os.getenv("DAGSTER_GCP_SECRETS_PREFIX", "")
use_local_secrets = os.getenv("DAGSTER_USE_LOCAL_SECRETS", "True") == "True"
use_local_secrets = os.getenv("DAGSTER_USE_LOCAL_SECRETS", "true").lower() in [
"true",
"1",
]
discord_webhook_url = os.getenv("DAGSTER_DISCORD_WEBHOOK_URL")
enable_tests = os.getenv("DAGSTER_ENABLE_TESTS", "false").lower() in ["true", "1"]
dagster_alerts_base_url = os.getenv("DAGSTER_ALERTS_BASE_URL", "")

dbt_profiles_dir = get_profiles_dir()
dbt_target_base_dir = os.getenv("DAGSTER_DBT_TARGET_BASE_DIR") or ""
Expand Down
20 changes: 19 additions & 1 deletion warehouse/oso_dagster/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@
from .schedules import schedules
from .cbt import CBTResource
from .factories import load_all_assets_from_package
from .utils.secrets import LocalSecretResolver, GCPSecretResolver
from .utils import (
LocalSecretResolver,
GCPSecretResolver,
LogAlertManager,
DiscordWebhookAlertManager,
)
from .resources import BigQueryDataTransferResource
from . import assets
from .factories.alerts import setup_alert_sensor

from dagster_embedded_elt.dlt import DagsterDltResource

Expand Down Expand Up @@ -52,6 +58,18 @@ def load_definitions():

io_manager = PolarsBigQueryIOManager(project=project_id)

# Setup an alert sensor
alert_manager = LogAlertManager()
if constants.discord_webhook_url:
alert_manager = DiscordWebhookAlertManager(constants.discord_webhook_url)
alerts = setup_alert_sensor(
"alerts",
constants.dagster_alerts_base_url,
alert_manager,
)

asset_factories = asset_factories + alerts

# Each of the dbt environments needs to be setup as a resource to be used in
# the dbt assets
resources = {
Expand Down
68 changes: 68 additions & 0 deletions warehouse/oso_dagster/factories/alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from dagster import (
run_failure_sensor,
RunFailureSensorContext,
DefaultSensorStatus,
op,
job,
RunRequest,
RunConfig,
Config,
OpExecutionContext,
DagsterEventType,
)
from ..utils import AlertManager
from .common import AssetFactoryResponse


class AlertOpConfig(Config):
    """Config for the alert op: identifies the run to report on."""

    # ID of the run whose event records the alert op inspects.
    run_id: str


def setup_alert_sensor(
    name: str, base_url: str, alert_manager: AlertManager
) -> AssetFactoryResponse:
    """Build a run-failure sensor that reports failed runs via ``alert_manager``.

    Three definitions are created and wired together:

    * an op (``{name}_alert_op``) that reads the failed run's event records,
      counts its step failures and sends a single alert message,
    * a job (``{name}_alert_job``) that wraps that op, and
    * a run-failure sensor (``name``) that requests the job for a failed run,
      passing the failed run's ID through the op config.

    Args:
        name: Name of the sensor; also the prefix of the op and job names.
        base_url: Base URL used to build the run link in the alert message.
            A trailing slash is ignored.
        alert_manager: Destination for the alert message.

    Returns:
        An ``AssetFactoryResponse`` with no assets, carrying the sensor and
        the alert job.
    """
    alert_op_name = f"{name}_alert_op"
    alert_job_name = f"{name}_alert_job"
    # Strip a trailing slash so the link never contains "//runs/...".
    runs_url = base_url.rstrip("/") + "/runs"

    @op(name=alert_op_name)
    def failure_op(context: OpExecutionContext, config: AlertOpConfig) -> None:
        context.log.info(config.run_id)
        instance = context.instance
        stats = instance.get_run_stats(config.run_id)
        context.log.info(stats)
        records = instance.get_records_for_run(config.run_id).records
        context.log.info(records)

        # Not every record carries an event log entry / dagster event.
        dagster_events = [
            record.event_log_entry.dagster_event
            for record in records
            if record.event_log_entry and record.event_log_entry.dagster_event
        ]
        step_failures = [
            event
            for event in dagster_events
            if event.is_failure and event.event_type == DagsterEventType.STEP_FAILURE
        ]

        alert_manager.alert(
            f"{len(step_failures)} failed steps in run ({runs_url}/{config.run_id})"
        )

    @job(name=alert_job_name)
    def failure_job():
        failure_op()

    @run_failure_sensor(
        name=name, default_status=DefaultSensorStatus.RUNNING, request_job=failure_job
    )
    def failure_sensor(context: RunFailureSensorContext):
        failed_run = context.dagster_run

        # The sensor declares no `monitored_jobs`, so it also sees failures of
        # the alert job itself. Requesting another alert run for those would
        # repeat for as long as alerting is broken (e.g. the webhook is
        # unreachable), so they are skipped here.
        if failed_run.job_name == alert_job_name:
            context.log.warning(
                f"Alert job run {failed_run.run_id} failed; not re-alerting."
            )
            return

        yield RunRequest(
            run_key=failed_run.run_id,
            run_config=RunConfig(
                ops={
                    alert_op_name: {
                        "config": {
                            "run_id": failed_run.run_id,
                        }
                    }
                }
            ),
        )

    return AssetFactoryResponse([], sensors=[failure_sensor], jobs=[failure_job])
8 changes: 8 additions & 0 deletions warehouse/oso_dagster/factories/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ class AssetFactoryResponse:
jobs: List[JobDefinition] = field(default_factory=lambda: [])
checks: List[AssetChecksDefinition] = field(default_factory=lambda: [])

def __add__(self, other: "AssetFactoryResponse") -> "AssetFactoryResponse":
    """Combine two responses into a new one, field by field.

    Each field of the result is a fresh list holding this response's items
    followed by ``other``'s.
    """
    merged_assets = [*self.assets, *other.assets]
    merged_sensors = [*self.sensors, *other.sensors]
    merged_checks = [*self.checks, *other.checks]
    merged_jobs = [*self.jobs, *other.jobs]
    return AssetFactoryResponse(
        assets=merged_assets,
        sensors=merged_sensors,
        checks=merged_checks,
        jobs=merged_jobs,
    )


type EarlyResourcesAssetDecoratedFunction[**P] = Callable[
P, AssetFactoryResponse | AssetsDefinition
Expand Down
33 changes: 6 additions & 27 deletions warehouse/oso_dagster/factories/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@
import pkgutil

from dagster import (
SensorDefinition,
JobDefinition,
AssetChecksDefinition,
load_assets_from_modules,
)

from .common import AssetFactoryResponse, AssetList, EarlyResourcesAssetFactory
from .common import AssetFactoryResponse, EarlyResourcesAssetFactory


def load_all_assets_from_package(
Expand All @@ -27,37 +24,19 @@ def load_all_assets_from_package(
modules.append(module)
factories = load_assets_factories_from_modules(modules, early_resources)
asset_defs = load_assets_from_modules(modules)
factory_assets = list(factories.assets)
factory_assets.extend(asset_defs)
return AssetFactoryResponse(
assets=factory_assets,
sensors=factories.sensors,
jobs=factories.jobs,
checks=factories.checks,
)
return factories + AssetFactoryResponse(asset_defs)


def load_assets_factories_from_modules(
modules: List[ModuleType],
early_resources: Dict[str, Any],
) -> AssetFactoryResponse:
assets: AssetList = []
sensors: List[SensorDefinition] = []
jobs: List[JobDefinition] = []
checks: List[AssetChecksDefinition] = []
all = AssetFactoryResponse([])
for module in modules:
for _, obj in module.__dict__.items():
if isinstance(obj, EarlyResourcesAssetFactory):
resp = obj(**early_resources)
assets.extend(resp.assets)
sensors.extend(resp.sensors)
jobs.extend(resp.jobs)
checks.extend(resp.checks)
all = all + resp
elif isinstance(obj, AssetFactoryResponse):
assets.extend(obj.assets)
sensors.extend(obj.sensors)
jobs.extend(obj.jobs)
checks.extend(obj.checks)
return AssetFactoryResponse(
assets=assets, sensors=sensors, jobs=jobs, checks=checks
)
all = all + obj
return all
1 change: 1 addition & 0 deletions warehouse/oso_dagster/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
from .retry import *
from .common import *
from .types import *
from .alerts import *
25 changes: 25 additions & 0 deletions warehouse/oso_dagster/utils/alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging
from discord_webhook import DiscordWebhook

logger = logging.getLogger(__name__)


class AlertManager:
    """Common interface for alert sinks; subclasses override ``alert``."""

    def alert(self, message: str):
        """Deliver ``message`` to the alert destination.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError


class LogAlertManager(AlertManager):
    """Alert manager that writes each alert to this module's logger as an error."""

    def alert(self, message: str):
        """Log ``message`` at ERROR level."""
        # Use the module-level logger rather than the root `logging.error`:
        # records then carry this module's name, and no implicit
        # `logging.basicConfig()` is triggered when the root logger has no
        # handlers.
        logger.error(message)


class DiscordWebhookAlertManager(AlertManager):
    """Alert manager that sends each alert through a Discord webhook."""

    def __init__(self, url: str):
        # Webhook URL that every alert is sent to.
        self._url = url

    def alert(self, message: str):
        """Send ``message`` as the content of a Discord webhook request."""
        DiscordWebhook(url=self._url, content=message).execute()

0 comments on commit 6019d9d

Please sign in to comment.