-
Notifications
You must be signed in to change notification settings - Fork 47
/
fixtures_catalog.py
113 lines (88 loc) · 4.16 KB
/
fixtures_catalog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import copy
import json
from pathlib import Path
import pytest
from up42.catalog import Catalog
from .fixtures_globals import API_HOST, DATA_PRODUCT_ID
@pytest.fixture()
def catalog_mock(auth_mock, requests_mock):
    """Provide a ``Catalog`` whose HTTP endpoints are answered by ``requests_mock``.

    Canned responses are registered for four URLs under ``API_HOST``: the
    collections listing, the data-products listing, the STAC search of the
    ``oneatlas`` host, and the order schema of ``DATA_PRODUCT_ID``.

    Returns:
        ``Catalog(auth=auth_mock)``.
    """
    # JSON payloads are resolved relative to the parent of this file's directory.
    base_dir = Path(__file__).resolve().parents[1]

    def read_json(relative_path):
        # Parse one JSON payload file into Python objects.
        return json.loads((base_dir / relative_path).read_text())

    # Collections: a single archive collection served by the "oneatlas" host.
    requests_mock.get(
        url=f"{API_HOST}/collections",
        json={"data": [{"name": "phr", "hostName": "oneatlas", "type": "ARCHIVE"}]},
    )

    # Data products: the file content is wrapped in a "data" envelope.
    data_products = read_json("mock_data/data_products.json")
    requests_mock.get(url=f"{API_HOST}/data-products", json={"data": data_products})

    # STAC search: every POST receives the same stored search response.
    search_payload = read_json("mock_data/search_response.json")
    requests_mock.post(
        url=f"{API_HOST}/catalog/hosts/oneatlas/stac/search",
        json=search_payload,
    )

    # Order schema for the data product referenced by DATA_PRODUCT_ID.
    schema_payload = read_json("mock_data/data_product_spot_schema.json")
    requests_mock.get(url=f"{API_HOST}/orders/schema/{DATA_PRODUCT_ID}", json=schema_payload)

    return Catalog(auth=auth_mock)
@pytest.fixture()
def catalog_live(auth_live):
    """Provide a ``Catalog`` built from the ``auth_live`` fixture.

    No requests are mocked in this fixture.
    NOTE(review): ``auth_live`` is defined outside this file; presumably it
    authenticates against the real service - confirm before relying on it.
    """
    live_catalog = Catalog(auth=auth_live)
    return live_catalog
@pytest.fixture()
def catalog_pagination_mock(auth_mock, requests_mock):
    """Provide a ``Catalog`` whose search endpoint returns two successive pages.

    The first POST to the ``oneatlas`` STAC search URL is answered with the
    stored search response whose ``features`` list is repeated 500 times.
    The second POST gets a copy holding only the first 50 of those features
    and with the entry at index 1 of ``links`` removed. The collections and
    data-products endpoints are mocked as well.

    Returns:
        ``Catalog(auth=auth_mock)``.
    """
    # JSON payloads are resolved relative to the parent of this file's directory.
    base_dir = Path(__file__).resolve().parents[1]

    requests_mock.get(
        url=f"{API_HOST}/collections",
        json={"data": [{"name": "phr", "hostName": "oneatlas", "type": "ARCHIVE"}]},
    )

    data_products = json.loads((base_dir / "mock_data/data_products.json").read_text())
    requests_mock.get(url=f"{API_HOST}/data-products", json={"data": data_products})

    # First page: blow the stored feature list up to 500x its original length.
    first_page = json.loads((base_dir / "mock_data/search_response.json").read_text())
    first_page["features"] *= 500

    # Second page: an independent copy truncated to 50 features.
    last_page = copy.deepcopy(first_page)
    last_page["features"] = last_page["features"][:50]
    # Dropping links[1] signals that pagination is exhausted after the first
    # page (presumably this entry is the "next" link - verify against the file).
    last_page["links"].pop(1)

    # A response list makes requests_mock hand out the pages in order.
    requests_mock.post(
        f"{API_HOST}/catalog/hosts/oneatlas/stac/search",
        [{"json": first_page}, {"json": last_page}],
    )

    return Catalog(auth=auth_mock)
@pytest.fixture()
def catalog_usagetype_mock(auth_mock, requests_mock):
    """Provide a ``Catalog`` whose search endpoint cycles through usage types.

    Successive POSTs to the ``oneatlas`` STAC search URL are answered, in
    order, with: the stored search response unchanged, a copy whose first
    feature has ``up42:usageType`` set to ``["ANALYTICS"]``, and a copy where
    it is ``["DATA", "ANALYTICS"]``. The collections and data-products
    endpoints are mocked as well.

    Returns:
        ``Catalog(auth=auth_mock)``.
    """
    # JSON payloads are resolved relative to the parent of this file's directory.
    base_dir = Path(__file__).resolve().parents[1]

    requests_mock.get(
        url=f"{API_HOST}/collections",
        json={"data": [{"name": "phr", "hostName": "oneatlas", "type": "ARCHIVE"}]},
    )

    data_products = json.loads((base_dir / "mock_data/data_products.json").read_text())
    requests_mock.get(url=f"{API_HOST}/data-products", json={"data": data_products})

    # Per the original author's note, the stored response itself is the
    # "DATA" usage-type case, so it is served unmodified as the first answer.
    base_response = json.loads((base_dir / "mock_data/search_response.json").read_text())

    def with_usage_type(usage_types):
        # Independent copy of the stored response with the first feature's usage type replaced.
        variant = copy.deepcopy(base_response)
        variant["features"][0]["properties"]["up42:usageType"] = usage_types
        return variant

    analytics_response = with_usage_type(["ANALYTICS"])
    data_and_analytics_response = with_usage_type(["DATA", "ANALYTICS"])

    # A response list makes requests_mock return one entry per successive call.
    requests_mock.post(
        f"{API_HOST}/catalog/hosts/oneatlas/stac/search",
        [
            {"json": base_response},
            {"json": analytics_response},
            {"json": data_and_analytics_response},
        ],
    )

    return Catalog(auth=auth_mock)