-
Notifications
You must be signed in to change notification settings - Fork 344
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Airflow DAG(에러 발생시 Slack 메세지 전송 (#179)
- Loading branch information
Showing
1 changed file
with
39 additions
and
45 deletions.
There are no files selected for viewing
84 changes: 39 additions & 45 deletions
84
01-batch-serving(airflow)/dags/05-python-operator-with-slack-noti.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,47 +1,41 @@ | ||
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator | ||
|
||
# Slack Webhook 제공 Operator 를 먼저 정의합니다 | ||
# 1. Connection ID (실습에서 Webserver 를 통해 생성한 값) | ||
SLACK_DAG_CONN_ID = "my_webhook" | ||
|
||
|
||
# 2. Webhook 함수 정의 | ||
def send_message(slack_msg): | ||
return SlackWebhookOperator( | ||
task_id="slack_webhook", | ||
slack_webhook_conn_id=SLACK_DAG_CONN_ID, | ||
message=slack_msg, | ||
username="Airflow-alert", | ||
) | ||
|
||
|
||
# 3. slack alert 함수 정의 | ||
# 메시지에 Slack ID 추가해 tag 가능 (ex. <@U022T50D4F5>) | ||
def task_fail_slack_alert(context): | ||
slack_msg = """ | ||
:red_circle: Task Failed. | ||
*Task*: {task} | ||
*Dag*: `{dag}` | ||
*Execution Time*: {exec_date} | ||
""".format( | ||
task=context.get("task_instance").task_id, | ||
dag=context.get("task_instance").dag_id, | ||
exec_date=context.get("execution_date"), | ||
# slack_notifier에 선언한 webhook 전송 함수를 활용하여 slack 알림을 제공하는 예제 | ||
|
||
from airflow import DAG | ||
from airflow.operators.python import PythonOperator | ||
from datetime import datetime, timedelta | ||
|
||
from airflow.exceptions import AirflowFailException | ||
from utils.slack_notifier import task_fail_slack_alert, task_succ_slack_alert | ||
|
||
default_args = { | ||
'owner': 'kyle', | ||
'depends_on_past': False, | ||
'start_date': datetime(2024, 1, 1), | ||
'start_date': datetime(2024, 1, 4), | ||
'retires': 1, | ||
'retry_delay': timedelta(minutes=5), | ||
} | ||
|
||
|
||
def _handle_job_error() -> None: | ||
raise AirflowFailException("Raise Exception.") | ||
|
||
|
||
with DAG( | ||
dag_id='python_dag_with_slack_webhook', | ||
default_args=default_args, | ||
schedule_interval='30 0 * * *', | ||
tags=['my_dags'], | ||
catchup=False, | ||
on_failure_callback=task_fail_slack_alert, | ||
# on_success_callback=task_succ_slack_alert # 성공 알림 필요 시 추가 | ||
) as dag: | ||
execution_date = "{{ ds }}" | ||
|
||
send_slack_noti = PythonOperator( | ||
task_id='raise_exception_and_send_slack_noti', | ||
python_callable=_handle_job_error, | ||
op_args=[execution_date] | ||
) | ||
|
||
alert = send_message(slack_msg) | ||
|
||
return alert.execute(context=context) | ||
|
||
|
||
def task_succ_slack_alert(context): | ||
slack_msg = f""" | ||
:large_green_circle: Task SUCC. | ||
*Task*: {context.get("task_instance").task_id} | ||
*Dag*: {context.get("task_instance").dag_id} | ||
*Execution Time*: {context.get("execution_date")} | ||
""" | ||
|
||
alert = send_message(slack_msg) | ||
|
||
return alert.execute(context=context) | ||
send_slack_noti |