-
Notifications
You must be signed in to change notification settings - Fork 31
/
conftest.py
309 lines (231 loc) · 9.65 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
import os
from contextlib import contextmanager
from pathlib import Path
from textwrap import indent
from typing import Tuple
import pytest
import sqlalchemy
import structlog
from click.testing import CliRunner
from datacube.drivers.postgres import PostgresDb
from datacube.drivers.postgres._core import METADATA as ODC_SCHEMA_METADATA
from datacube.index import Index
from datacube.index.hl import Doc2Dataset
from datacube.model import Dataset
from datacube.scripts import ingest
from datacube.drivers import storage_writer_by_name
from datacube.utils import read_documents
from digitalearthau.testing import factories
from flask.testing import FlaskClient
from structlog import DropEvent
import cubedash
from cubedash import _model, _utils, generate, logs
from cubedash.summary import SummaryStore
from cubedash.summary._schema import METADATA as CUBEDASH_METADATA
from cubedash.warmup import find_examples_of_all_public_urls
# Use module-scoped databases, as it takes a while to populate with
# our data, and we're treating it as read-only in tests.
# -> Note: Since we're reusing the default config unchanged, we can't use the
# default index/dea_index fixtures, as they'll override data from
# the same db.
from integration_tests.asserts import format_doc_diffs
######################################################
# Prepare DB for integration test
######################################################

# A module-scoped, plain ODC test database (no DEA config loaded yet).
# Built by digitalearthau's fixture factory from the "local_config" fixture.
module_vanilla_db = factories.db_fixture("local_config", scope="module")
@pytest.fixture(scope="module")
def module_db(module_vanilla_db: PostgresDb) -> PostgresDb:
    """
    The module-scoped test database, with its ODC tables switched to
    unlogged so Postgres skips WAL writes (faster, fine for throwaway data).
    """
    engine = module_vanilla_db._engine
    _make_all_tables_unlogged(engine, ODC_SCHEMA_METADATA)
    return module_vanilla_db
# Directory holding all test data used by the integration tests.
TEST_DATA_DIR = Path(__file__).parent / "data"

# NOTE(review): "INTERGRATION" is a long-standing typo of "INTEGRATION"; the
# names are kept unchanged for backwards compatibility with existing callers.
# All folders are now derived from TEST_DATA_DIR so the base path is defined once.
INTERGRATION_METADATA_FOLDER = TEST_DATA_DIR / "metadata"
INTERGRATION_PRODUCTS_FOLDER = TEST_DATA_DIR / "products"
INTEGRATION_INGESTION_FOLDER = TEST_DATA_DIR / "ingestions"
def dea_index_fixture(index_fixture_name, scope='function'):
    """
    Create a pytest fixture for a Datacube instance populated
    with DEA products/config.

    :param index_fixture_name: name of an existing fixture yielding an ODC Index
    :param scope: pytest fixture scope for the created fixture
    """

    @pytest.fixture(scope=scope)
    def dea_index_instance(request):
        """
        An index initialised with DEA config (products)
        """
        index: Index = request.getfixturevalue(index_fixture_name)
        index.init_db(with_default_types=True)
        index.metadata_types.check_field_indexes(
            allow_table_lock=True,
            rebuild_indexes=False,
            rebuild_views=True,
        )
        # Add DEA metadata types first, then products (products reference
        # metadata types). Sorted for a deterministic load order — os.listdir
        # gave arbitrary order, which could make failures hard to reproduce.
        for md_path in sorted(INTERGRATION_METADATA_FOLDER.iterdir()):
            for _, doc in read_documents(md_path):
                index.metadata_types.add(index.metadata_types.from_doc(doc))

        for product_path in sorted(INTERGRATION_PRODUCTS_FOLDER.iterdir()):
            for _, product_def in read_documents(product_path):
                index.products.add_document(product_def)

        # Register the output product of every ingestion configuration.
        for path in INTEGRATION_INGESTION_FOLDER.glob('*.yaml'):
            ingest_config = ingest.load_config_from_file(path)

            driver_name = ingest_config['storage']['driver']
            driver = storage_writer_by_name(driver_name)
            if driver is None:
                raise ValueError(f"No driver found for {driver_name}")
            ingest.ensure_output_type(
                index, ingest_config, driver.format, allow_product_changes=True
            )
        return index

    return dea_index_instance
# A module-scoped ODC index over the module database.
module_index = factories.index_fixture("module_db", scope="module")
# The same index populated with DEA metadata types, products and ingestion
# output types (see dea_index_fixture above).
module_dea_index = dea_index_fixture("module_index", scope="module")
@pytest.fixture()
def summary_store(module_dea_index: Index) -> SummaryStore:
    """An initialised (but unpopulated) cubedash SummaryStore over the module index."""
    summary_store = SummaryStore.create(module_dea_index)
    # Start clean: drop any cubedash schema left over from a previous module.
    summary_store.drop_all()
    module_dea_index.close()
    with disable_logging():
        # Some CRS/storage tests use test data that is 3577
        summary_store.init(grouping_epsg_code=3577)
        # Unlogged tables skip the WAL: faster, acceptable for test data.
        _make_all_tables_unlogged(
            _utils.alchemy_engine(module_dea_index), CUBEDASH_METADATA
        )
    return summary_store
@pytest.fixture()
def summariser(summary_store: SummaryStore):
    """Direct access to the store's internal summariser, for lower-level tests."""
    return summary_store._summariser
@pytest.fixture(autouse=True, scope="session")
def _init_logs(pytestconfig):
    """Configure Explorer's logging once for the whole test session."""
    verbosity = pytestconfig.getoption("verbose")
    logs.init_logging(verbosity=verbosity, cache_logger_on_first_use=False)
@pytest.fixture()
def tmppath(tmpdir):
    """The pytest ``tmpdir`` fixture converted to a standard ``pathlib.Path``."""
    raw_path = str(tmpdir)
    return Path(raw_path)
@pytest.fixture()
def clirunner(global_integration_cli_args):
    """A callable that runs click commands with the standard test-config args."""

    def _run_cli(cli_method, opts, catch_exceptions=False, expect_success=True):
        full_opts = list(global_integration_cli_args)
        full_opts.extend(opts)

        result = CliRunner().invoke(
            cli_method, full_opts, catch_exceptions=catch_exceptions
        )
        if expect_success:
            failure_message = (
                f"Error for {opts}. Out:\n{indent(result.output, ' ' * 4)}"
            )
            assert 0 == result.exit_code, failure_message
        return result

    return _run_cli
@pytest.fixture()
def run_generate(clirunner, summary_store, multi_processed=False):
    # NOTE(review): `multi_processed` is a plain default argument, not a pytest
    # fixture request, so it is always False here — confirm this is intended.
    def do(*args, expect_success=True):
        # Run cubedash's generate CLI, defaulting to "--all" products and
        # forcing a single worker ("-j 1") unless multi-processing was asked for.
        args = args or ("--all",)
        if not multi_processed:
            args = ("-j", "1") + tuple(args)
        res = clirunner(generate.cli, args, expect_success=expect_success)
        return res
    return do
@pytest.fixture(scope="module")
def dataset_loader(module_dea_index: Index):
    """
    A callable that loads a yaml dump of datasets into the module index,
    returning how many datasets were added.
    """

    def _populate_from_dump(expected_type: str, dump_path: Path):
        product = module_dea_index.products.get_by_name(expected_type)
        resolve = Doc2Dataset(module_dea_index)

        dataset_count = 0
        for _, doc in read_documents(dump_path):
            # Scene-level docs carry a human-readable ga_label; fall back to id.
            label = doc["ga_label"] if "ga_label" in doc else doc["id"]
            dataset, err = resolve(
                doc, f"file://example.com/test_dataset/{label}"
            )
            assert dataset is not None, err
            assert dataset.type.name == expected_type

            created = module_dea_index.datasets.add(dataset)
            assert created.uris
            assert created.type.name == product.name
            dataset_count += 1

        print(f"Populated {dataset_count} of {expected_type}")
        return dataset_count

    return _populate_from_dump
@pytest.fixture()
def all_urls(summary_store: SummaryStore):
    """A list of public URLs to try on the current Explorer instance"""
    url_examples = find_examples_of_all_public_urls(summary_store.index)
    return list(url_examples)
@pytest.fixture()
def empty_client(summary_store: SummaryStore) -> FlaskClient:
    """A flask test client for an Explorer with nothing generated yet."""
    app = cubedash.app
    _model.cache.clear()
    _model.STORE = summary_store
    app.config["TESTING"] = True
    return app.test_client()
@pytest.fixture()
def unpopulated_client(
    empty_client: FlaskClient, summary_store: SummaryStore
) -> FlaskClient:
    """A client whose store has product extents refreshed, but no summaries."""
    with disable_logging():
        _model.STORE.refresh_all_product_extents()
    return empty_client
@contextmanager
def disable_logging():
    """
    Suppress all structlog output within the enclosed with-block.

    Used for repetitive environment setup that makes test errors too verbose.
    """
    saved_processors = structlog.get_config()["processors"]

    def _swallow(_logger, _log_method, _event_dict):
        # Raising DropEvent stops the event reaching any renderer.
        raise DropEvent

    structlog.configure(processors=[_swallow])
    try:
        yield
    finally:
        # Always restore the caller's processors, even if the block raised.
        structlog.configure(processors=saved_processors)
@pytest.fixture()
def client(unpopulated_client: FlaskClient) -> FlaskClient:
    """A client whose Explorer store has summaries refreshed for every product."""
    with disable_logging():
        products = _model.STORE.index.products.get_all()
        for product in products:
            _model.STORE.refresh(product.name)
    return unpopulated_client
@pytest.fixture(scope="module")
def populated_index(dataset_loader, module_dea_index):
    """
    Index populated with example datasets. Assumes our tests wont modify the data!

    It's module-scoped as it's expensive to populate.
    """
    # (product name, dump file within TEST_DATA_DIR, expected dataset count)
    #
    # The pq_count_summary datasets have very large footprints, as they were
    # unioned from many almost-identical polygons and not simplified. They will
    # trip up postgis if used naively.
    # (postgis gist index has max record size of 8k per entry)
    expected_loads = (
        ("wofs_albers", "wofs-albers-sample.yaml.gz", 11),
        ("high_tide_comp_20p", "high_tide_comp_20p.yaml.gz", 306),
        ("pq_count_summary", "pq_count_summary.yaml.gz", 20),
    )
    for product_name, dump_file, expected_count in expected_loads:
        loaded = dataset_loader(product_name, TEST_DATA_DIR / dump_file)
        assert loaded == expected_count
    return module_dea_index
def pytest_assertrepr_compare(op, left, right):
    """
    Custom pytest error messages for large documents.

    The default pytest dict==dict error messages are unreadable for
    nested document-like dicts. (Such as our json and yaml docs!)

    We just want to know which fields differ.
    """

    def looks_like_doc(obj: object):
        # A dict too big to print readably on one (79-char) line.
        return isinstance(obj, dict) and len(repr(obj)) > 79

    if op != "==":
        return None
    if looks_like_doc(left) or looks_like_doc(right):
        return format_doc_diffs(left, right)
    return None
def _make_all_tables_unlogged(engine, metadata: sqlalchemy.MetaData):
"""
Set all tables in this alchemy metadata to unlogged.
Make them faster, but data is lost on crashes. Which is a good
trade-off for tests.
"""
for table in reversed(metadata.sorted_tables):
table: sqlalchemy.Table
if table.name.startswith("mv_"):
# Not supported for materialised views.
continue
else:
engine.execute(f"""alter table {table.selectable.fullname} set unlogged;""")