-
Notifications
You must be signed in to change notification settings - Fork 1
/
example_great_expectations_dag.py
executable file
·186 lines (167 loc) · 6.44 KB
/
example_great_expectations_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# A sample DAG to show some functionality of the GE operator. Steps to run:
#
# 1. You'll first need to install this operator:
# `pip install great_expectations_airflow`
#
# 2. Make sure airflow is installed and your dags_folder is configured to point to this directory.
#
# 3. When running a checkpoint task, change the path to the data directory in great_expectations/checkpoint/*.yml
#
# 4. You can then test-run a single task in this DAG using:
# Airflow v1.x: `airflow test example_great_expectations_dag ge_batch_kwargs_pass 2020-01-01`
# Airflow v2.x: `airflow tasks test example_great_expectations_dag ge_batch_kwargs_pass 2020-01-01`
#
# Note: The tasks that don't set an explicit data_context_root_dir need to be run from within
# this examples directory, otherwise GE won't know where to find the data context.
#
# Note: This DAG is ***not intended to run as a single DAG***, it's just a bunch of examples for
# different types of invoking the Airflow operator!
import os

import airflow
from airflow import DAG

from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
    FilesystemStoreBackendDefaults,
    S3StoreBackendDefaults,
)
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
# Task defaults shared by every example task in this file.
default_args = {
    "owner": "Airflow",
    "start_date": airflow.utils.dates.days_ago(1),
}

# Container DAG for the examples. Each task below keeps `dag=dag` commented
# out on purpose -- see the note at the top of the file: these tasks are
# standalone examples, not a coherent pipeline.
dag = DAG(dag_id="example_great_expectations_dag", default_args=default_args)
# Filesystem locations used by the example tasks below.
# We could/should set these with environment variables instead of hard-coding.
data_dir = "/usr/local/airflow/include/data/"
data_file = "/usr/local/airflow/include/data/yellow_tripdata_sample_2019-01.csv"
ge_root_dir = "/usr/local/airflow/include/great_expectations"

# Same locations under the *_local names used by the in-code data-context
# examples (note: data_dir_local has no trailing slash, data_dir does).
data_dir_local = "/usr/local/airflow/include/data"
data_file_local = "/usr/local/airflow/include/data/yellow_tripdata_sample_2019-01.csv"
ge_root_dir_local = "/usr/local/airflow/include/great_expectations"
# Validates a single batch (the sample taxi CSV) against the 'taxi.demo'
# expectation suite; this run is expected to pass.
ge_batch_kwargs_pass = GreatExpectationsOperator(
    task_id="ge_batch_kwargs_pass",
    expectation_suite_name="taxi.demo",
    batch_kwargs={"path": data_file, "datasource": "data__dir"},
    data_context_root_dir=ge_root_dir,
    # dag=dag
)
# Same validation as above, but expressed through the assets_to_validate
# list form (each entry pairs batch_kwargs with a suite name); it passes.
ge_batch_kwargs_list_pass = GreatExpectationsOperator(
    task_id="ge_batch_kwargs_list_pass",
    assets_to_validate=[
        {
            "batch_kwargs": {"path": data_file, "datasource": "data__dir"},
            "expectation_suite_name": "taxi.demo",
        }
    ],
    data_context_root_dir=ge_root_dir,
    # dag=dag
)
# Runs a checkpoint that is expected to pass. Make sure the checkpoint yml
# file has the correct path to the data file.
ge_checkpoint_pass = GreatExpectationsOperator(
    task_id="ge_checkpoint_pass",
    run_name="ge_airflow_run",
    checkpoint_name="taxi.pass.chk",
    data_context_root_dir=ge_root_dir,
    # dag=dag
)
# Runs a checkpoint whose validation fails, but fail_task_on_validation_failure
# is disabled so the Airflow task still exits successfully. Make sure the
# checkpoint yml file has the correct path to the data file.
ge_checkpoint_fail_but_continue = GreatExpectationsOperator(
    task_id="ge_checkpoint_fail_but_continue",
    run_name="ge_airflow_run",
    checkpoint_name="taxi.fail.chk",
    fail_task_on_validation_failure=False,
    data_context_root_dir=ge_root_dir,
    # dag=dag
)
# Runs the same failing checkpoint, this time letting the validation failure
# fail the Airflow task as well. Make sure the checkpoint yml file has the
# correct path to the data file.
ge_checkpoint_fail = GreatExpectationsOperator(
    task_id="ge_checkpoint_fail",
    run_name="ge_airflow_run",
    checkpoint_name="taxi.fail.chk",
    fail_task_on_validation_failure=True,
    data_context_root_dir=ge_root_dir,
    # dag=dag
)
# Builds a data context entirely in code: a local Pandas datasource plus
# local filesystem store defaults. Running a checkpoint through an in-code
# context requires context_root_dir to be set on the BaseDataContext.
data_context_config_local = DataContextConfig(
    datasources={
        "data__dir": DatasourceConfig(
            class_name="PandasDatasource",
            batch_kwargs_generators={
                "subdir_reader": {
                    "class_name": "SubdirReaderBatchKwargsGenerator",
                    "base_directory": data_dir_local,
                }
            },
        )
    },
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory=ge_root_dir_local
    ),
)
data_context_local = BaseDataContext(
    project_config=data_context_config_local,
    context_root_dir=ge_root_dir_local,
)

# Runs the passing checkpoint through the in-code local context.
ge_in_code_context_local = GreatExpectationsOperator(
    task_id="ge_in_code_context_local",
    checkpoint_name="taxi.pass.chk",
    data_context=data_context_local,
    # dag=dag
)
# This creates a data context with an S3 default backend, but a local datasource
data_context_config_s3 = DataContextConfig(
datasources={
"data__dir": DatasourceConfig(
class_name="PandasDatasource",
batch_kwargs_generators={
"subdir_reader": {
"class_name": "SubdirReaderBatchKwargsGenerator",
"base_directory": data_dir_local,
}
},
)
},
store_backend_defaults=S3StoreBackendDefaults(default_bucket_name="sam-webinar-demo")
)
data_context_s3 = BaseDataContext(project_config=data_context_config_s3)
ge_in_code_context_s3 = GreatExpectationsOperator(
task_id='ge_in_code_context_s3',
expectation_suite_name='taxi.demo',
batch_kwargs={
'path': data_file_local,
'datasource': 'my_datasource'
},
data_context=data_context_s3,
#dag=dag
)
# This creates a data context with a local datasource and local file system store defaults.
data_context_config = DataContextConfig(
    datasources={
        "data__dir": DatasourceConfig(
            class_name="PandasDatasource",
            batch_kwargs_generators={
                "subdir_reader": {
                    "class_name": "SubdirReaderBatchKwargsGenerator",
                    "base_directory": data_dir,
                }
            },
        )
    },
    store_backend_defaults=FilesystemStoreBackendDefaults(root_directory=ge_root_dir),
)
data_context = BaseDataContext(project_config=data_context_config)

# Validates the sample file through the fully in-code context above.
ge_in_code_context = GreatExpectationsOperator(
    task_id="ge_in_code_context",
    expectation_suite_name="taxi.demo",
    batch_kwargs={
        "path": data_file,
        # Must name the datasource configured above; this context only
        # defines "data__dir" (was "my_datasource", which does not exist
        # in data_context_config).
        "datasource": "data__dir",
    },
    data_context=data_context,
    # dag=dag
)