/
test_dbt_deps.py
276 lines (231 loc) · 8.34 KB
/
test_dbt_deps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
"""Unit test module for DbtDepsOperator."""
import datetime as dt
import glob
import os
from pathlib import Path
from unittest.mock import patch
import freezegun
import pytest
from airflow_dbt_python.hooks.dbt import DepsTaskConfig
from airflow_dbt_python.operators.dbt import DbtDepsOperator
# Detect at import time whether the optional Amazon extras are installed.
# DbtS3Hook lives behind the amazon extras; if importing it fails we flip
# ``condition`` so S3-dependent tests are skipped instead of erroring at
# collection time.
condition = False
try:
    from airflow_dbt_python.hooks.s3 import DbtS3Hook
except ImportError:
    condition = True

# Marker applied to every test that needs S3 support.
no_s3_hook = pytest.mark.skipif(
    condition, reason="S3Hook not available, consider installing amazon extras"
)
def test_dbt_deps_mocked_all_args():
    """Test mocked dbt deps call with all arguments."""
    operator = DbtDepsOperator(
        task_id="dbt_task",
        project_dir="/path/to/project/",
        profiles_dir="/path/to/profiles/",
        profile="dbt-profile",
        target="dbt-target",
        vars={"target": "override"},
        log_cache_events=True,
    )
    assert operator.command == "deps"

    # Every constructor argument must survive the round-trip into the
    # task configuration produced by the operator.
    task_config = operator.get_dbt_config()
    assert isinstance(task_config, DepsTaskConfig)
    assert task_config.project_dir == "/path/to/project/"
    assert task_config.profiles_dir == "/path/to/profiles/"
    assert task_config.profile == "dbt-profile"
    assert task_config.target == "dbt-target"
    assert task_config.vars == '{"target": "override"}'
    assert task_config.log_cache_events is True
def test_dbt_deps_downloads_dbt_utils(
    profiles_file, dbt_project_file, dbt_packages_dir, packages_file
):
    """Test that a DbtDepsOperator downloads the dbt_utils module."""
    import shutil

    # Start from a clean slate: no dbt_utils package installed.
    dbt_utils_dir = dbt_packages_dir / "dbt_utils"
    shutil.rmtree(dbt_utils_dir, ignore_errors=True)
    assert not dbt_utils_dir.exists()

    # Snapshot mtimes of the project files dbt deps must leave untouched;
    # only dbt_packages_dir is allowed to change.
    snapshots = [
        (path, os.stat(path).st_mtime)
        for path in [dbt_project_file, profiles_file, packages_file]
    ]

    operator = DbtDepsOperator(
        task_id="dbt_task",
        project_dir=dbt_project_file.parent,
        profiles_dir=profiles_file.parent,
    )

    assert len(list(dbt_packages_dir.glob("dbt_utils"))) == 0
    operator.execute({})
    assert len(list(dbt_packages_dir.glob("dbt_utils"))) == 1

    for path, mtime in snapshots:
        assert (
            mtime == os.stat(path).st_mtime
        ), f"DbtDepsOperator changed an unexpected file: {path}"
@no_s3_hook
def test_dbt_deps_push_to_s3(
    s3_bucket,
    profiles_file,
    dbt_project_file,
    packages_file,
):
    """Test execution of DbtDepsOperator with a push to S3 at the end.

    After the operator runs, the installed packages should appear under the
    project's dbt_packages/ prefix in S3, and local project files must be
    left untouched.
    """
    hook = DbtS3Hook()
    bucket = hook.get_bucket(s3_bucket)

    # Upload the minimal dbt project files the operator will pull from S3.
    with open(dbt_project_file) as pf:
        project_content = pf.read()
    bucket.put_object(Key="project/dbt_project.yml", Body=project_content.encode())

    with open(profiles_file) as pf:
        profiles_content = pf.read()
    bucket.put_object(Key="project/profiles.yml", Body=profiles_content.encode())

    with open(packages_file) as pf:
        packages_content = pf.read()
    bucket.put_object(Key="project/packages.yml", Body=packages_content.encode())

    # Ensure we are working with an empty dbt_packages dir in S3.
    keys = hook.list_keys(
        s3_bucket,
        f"s3://{s3_bucket}/project/dbt_packages/",
    )
    if keys is not None and len(keys) > 0:
        hook.delete_objects(
            s3_bucket,
            keys,
        )
    keys = hook.list_keys(
        s3_bucket,
        f"s3://{s3_bucket}/project/dbt_packages/",
    )
    assert keys is None or len(keys) == 0

    # Record last modified times to ensure dbt deps only acts on
    # dbt_packages_dir.
    files_and_times = [
        (_file, os.stat(_file).st_mtime)
        for _file in [dbt_project_file, profiles_file, packages_file]
    ]

    op = DbtDepsOperator(
        task_id="dbt_task",
        project_dir=f"s3://{s3_bucket}/project/",
        profiles_dir=f"s3://{s3_bucket}/project/",
        push_dbt_project=True,
    )
    results = op.execute({})
    assert results is None

    keys = hook.list_keys(
        s3_bucket,
        f"s3://{s3_bucket}/project/dbt_packages/",
    )
    # BUGFIX: the original asserted ``len(keys) >= 0``, which is vacuously
    # true (and raises TypeError when keys is None). dbt deps must have
    # pushed at least one key.
    assert keys is not None and len(keys) > 0
    # dbt_utils files may be anything, let's just check that at least
    # "dbt_utils" exists as part of the key. (Originally ``>= 0``: vacuous.)
    assert len([k for k in keys if "dbt_utils" in k]) > 0

    for _file, last_modified in files_and_times:
        assert (
            last_modified == os.stat(_file).st_mtime
        ), f"DbtDepsOperator changed an unexpected file: {_file}"
def test_dbt_deps_doesnt_affect_non_package_files(
    profiles_file,
    dbt_project_file,
    dbt_packages_dir,
    packages_file,
    model_files,
    seed_files,
):
    """Test that a DbtDepsOperator doesn't alter model, seed, or other project files."""
    import shutil

    # Make sure dbt_utils is not installed before we begin.
    dbt_utils_dir = dbt_packages_dir / "dbt_utils"
    shutil.rmtree(dbt_utils_dir, ignore_errors=True)
    assert not dbt_utils_dir.exists()

    # Snapshot mtimes outside dbt_packages_dir: these must stay unchanged.
    # is_relative_to was added in 3.9 and we support both 3.7 and 3.8
    untouched_snapshot = [
        (path, os.stat(path).st_mtime)
        for path in dbt_project_file.parent.glob("**/*")
        if dbt_packages_dir.name not in str(path)
    ]
    # Snapshot mtimes inside dbt_packages_dir: these are expected to change.
    packages_snapshot = [
        (path, os.stat(path).st_mtime) for path in dbt_packages_dir.glob("**/*")
    ]

    operator = DbtDepsOperator(
        task_id="dbt_task",
        project_dir=dbt_project_file.parent,
        profiles_dir=profiles_file.parent,
    )

    assert len(list(dbt_packages_dir.glob("dbt_utils"))) == 0
    operator.execute({})
    assert len(list(dbt_packages_dir.glob("dbt_utils"))) == 1

    for path, mtime in untouched_snapshot:
        assert (
            mtime == os.stat(path).st_mtime
        ), f"DbtDepsOperator changed an unexpected file: {path}"

    for path, mtime in packages_snapshot:
        assert (
            mtime < os.stat(path).st_mtime
        ), f"DbtDepsOperator did not change a package file: {path}"
@no_s3_hook
def test_dbt_deps_push_to_s3_with_no_replace(
    s3_bucket,
    profiles_file,
    dbt_project_file,
    packages_file,
):
    """Test execution of DbtDepsOperator with a push to S3 at the end but with replace = False.

    We would expect dbt_packages to be pushed (since they don't exist) but the rest of the project
    files should not be replaced.
    """
    hook = DbtS3Hook()
    bucket = hook.get_bucket(s3_bucket)

    project_files = (dbt_project_file, profiles_file, packages_file)
    # Upload project files at a frozen time so that objects (not) replaced by
    # the operator can later be told apart by their last_modified timestamp.
    with freezegun.freeze_time("2022-01-01"):
        for _file in project_files:
            with open(_file) as pf:
                content = pf.read()
            bucket.put_object(Key=f"project/{_file.name}", Body=content.encode())

    # Ensure we are working with an empty dbt_packages dir in S3.
    keys = hook.list_keys(
        s3_bucket,
        f"s3://{s3_bucket}/project/dbt_packages/",
    )
    if keys is not None and len(keys) > 0:
        hook.delete_objects(
            s3_bucket,
            keys,
        )
    keys = hook.list_keys(
        s3_bucket,
        f"s3://{s3_bucket}/project/dbt_packages/",
    )
    assert keys is None or len(keys) == 0

    # Run the operator at a later frozen time: anything it pushes carries this
    # timestamp, anything left alone keeps the 2022-01-01 one.
    with freezegun.freeze_time("2022-02-02"):
        op = DbtDepsOperator(
            task_id="dbt_task",
            project_dir=f"s3://{s3_bucket}/project/",
            profiles_dir=f"s3://{s3_bucket}/project/",
            push_dbt_project=True,
            replace_on_push=False,
        )
        results = op.execute({})
        assert results is None

    keys = hook.list_keys(
        s3_bucket,
        f"s3://{s3_bucket}/project/dbt_packages/",
    )
    # BUGFIX: the original asserted ``len(keys) >= 0``, which is vacuously
    # true. dbt deps must have pushed at least one key.
    assert keys is not None and len(keys) > 0
    # dbt_utils files may be anything, let's just check that at least
    # "dbt_utils" exists as part of the key. (Originally ``>= 0``: vacuous.)
    assert len([k for k in keys if "dbt_utils" in k]) > 0

    # BUGFIX: the original built ``{(f.name for f in project_files)}`` — a set
    # containing a single generator object — so ``Path(key).name in
    # file_names`` could never match. Build a set of the names themselves.
    file_names = {f.name for f in project_files}
    for key in keys:
        obj = hook.get_key(
            key,
            s3_bucket,
        )
        if Path(key).name in file_names:
            # Pre-existing project files must not have been replaced.
            assert obj.last_modified == dt.datetime(2022, 1, 1, tzinfo=dt.timezone.utc)
        else:
            # Freshly pushed package files carry the operator's run time.
            assert obj.last_modified == dt.datetime(2022, 2, 2, tzinfo=dt.timezone.utc)