-
Notifications
You must be signed in to change notification settings - Fork 91
/
jetstream.py
90 lines (78 loc) · 2.88 KB
/
jetstream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
from datetime import timedelta, datetime
from operators.gcp_container_operator import GKEPodOperator
# Defaults handed to the DAG via ``default_args``; individual tasks may
# override any of these by passing the same keyword explicitly.
default_args = dict(
    owner="ascholtz@mozilla.com",
    # Recipients of the failure/retry e-mails enabled below.
    email=[
        "ascholtz@mozilla.com",
        "ssuh@mozilla.com",
        "tdsmith@mozilla.com",
    ],
    depends_on_past=False,
    start_date=datetime(2020, 3, 12),
    email_on_failure=True,
    email_on_retry=True,
    # Two retries, spaced half an hour apart.
    retries=2,
    retry_delay=timedelta(minutes=30),
)
with DAG("jetstream", default_args=default_args, schedule_interval="0 4 * * *") as dag:
    # Daily DAG (cron "0 4 * * *"): wait on five tasks in other DAGs, run the
    # jetstream container, then run it again to export statistics as JSON.

    # Image used by both pod tasks; built from
    # https://github.com/mozilla/jetstream
    jetstream_image = "gcr.io/moz-fx-data-experiments/jetstream:latest"

    # Main analysis run.
    # NOTE(review): "{{ds}}" is only substituted if GKEPodOperator lists
    # `arguments` among its templated fields -- confirm in the operator.
    jetstream = GKEPodOperator(
        task_id="jetstream",
        name="jetstream",
        image=jetstream_image,
        email=[
            "ascholtz@mozilla.com",
            "ssuh@mozilla.com",
            "tdsmith@mozilla.com",
        ],
        arguments=["--date={{ds}}"],
        dag=dag,
    )

    # Follow-up run of the same image with the JSON export sub-command.
    jetstream_export_json = GKEPodOperator(
        task_id="jetstream_export_json",
        name="jetstream_export_json",
        image=jetstream_image,
        email=[
            "ascholtz@mozilla.com",
            "ssuh@mozilla.com",
            "tdsmith@mozilla.com",
        ],
        arguments=["export_statistics_to_json"],
        dag=dag,
    )

    # --- Sensors on tasks in other DAGs -----------------------------------
    # `execution_delta` is the offset between this DAG's execution date and
    # the external DAG run to look at.
    # NOTE(review): the deltas presumably mirror the upstream schedules
    # (e.g. 2h => upstream at 02:00) -- verify against those DAGs.

    wait_for_clients_daily_export = ExternalTaskSensor(
        task_id="wait_for_clients_daily",
        external_dag_id="bqetl_main_summary",
        external_task_id="telemetry_derived__clients_daily__v6",
        execution_delta=timedelta(hours=2),
        dag=dag,
    )

    wait_for_main_summary_export = ExternalTaskSensor(
        task_id="wait_for_main_summary",
        external_dag_id="bqetl_main_summary",
        external_task_id="telemetry_derived__main_summary__v4",
        execution_delta=timedelta(hours=2),
        dag=dag,
    )

    wait_for_search_clients_daily = ExternalTaskSensor(
        task_id="wait_for_search_clients_daily",
        external_dag_id="bqetl_search",
        external_task_id="search_derived__search_clients_daily__v8",
        execution_delta=timedelta(hours=1),
        dag=dag,
    )

    wait_for_bq_events = ExternalTaskSensor(
        task_id="wait_for_bq_events",
        external_dag_id="copy_deduplicate",
        external_task_id="bq_main_events",
        execution_delta=timedelta(hours=3),
        dag=dag,
    )

    wait_for_copy_deduplicate_events = ExternalTaskSensor(
        task_id="wait_for_copy_deduplicate_events",
        external_dag_id="copy_deduplicate",
        external_task_id="event_events",
        execution_delta=timedelta(hours=3),
        dag=dag,
    )

    # --- Task ordering ------------------------------------------------------
    # Every sensor must succeed before the main run starts ...
    upstream_sensors = [
        wait_for_clients_daily_export,
        wait_for_main_summary_export,
        wait_for_search_clients_daily,
        wait_for_bq_events,
        wait_for_copy_deduplicate_events,
    ]
    jetstream.set_upstream(upstream_sensors)

    # ... and the JSON export only runs once the main run has finished.
    jetstream.set_downstream(jetstream_export_json)