/
test_workflow.py
123 lines (96 loc) · 4.33 KB
/
test_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import pytest
from up42.job import Job
from up42.jobcollection import JobCollection
from up42.workflow import Workflow
from .fixtures.fixtures_globals import API_HOST, JOB_ID, JSON_WORKFLOW_TASKS
def test_workflow_info(workflow_mock):
del workflow_mock._info
assert isinstance(workflow_mock, Workflow)
assert workflow_mock.info["xyz"] == 789
assert workflow_mock._info["xyz"] == 789
def test_get_workflow_tasks(workflow_mock):
tasks = workflow_mock.get_workflow_tasks(basic=False)
assert len(tasks) == 2
assert tasks[0] == JSON_WORKFLOW_TASKS["data"][0]
def test_get_workflow_tasks_basic(workflow_mock):
tasks = workflow_mock.get_workflow_tasks(basic=True)
assert len(tasks) == 2
assert tasks["esa-s2-l2a-gtiff-visual:1"] == "1.0.1"
assert tasks["tiling:1"] == "2.2.3"
def test_construct_full_workflow_tasks_dict_unknown_block_raises(workflow_mock):
input_tasks = ["some_block"]
with pytest.raises(ValueError):
workflow_mock._construct_full_workflow_tasks_dict(input_tasks=input_tasks)
@pytest.mark.parametrize(
"input_tasks",
[
[
"c4cb8913-2ef3-4e82-a426-65ea8faacd9a",
"4ed70368-d4e1-4462-bef6-14e768049471",
],
["esa-s2-l2a-gtiff-visual", "tiling"],
[
"Sentinel-2 L2A Visual (GeoTIFF)",
"Raster Tiling",
],
[
"c4cb8913-2ef3-4e82-a426-65ea8faacd9a",
"tiling",
],
[
"Sentinel-2 L2A Visual (GeoTIFF)",
"4ed70368-d4e1-4462-bef6-14e768049471",
],
],
)
def test_construct_full_workflow_tasks_dict(workflow_mock, input_tasks):
full_workflow_tasks_dict = workflow_mock._construct_full_workflow_tasks_dict(input_tasks=input_tasks)
assert isinstance(full_workflow_tasks_dict, list)
assert full_workflow_tasks_dict[0]["name"] == "esa-s2-l2a-gtiff-visual:1"
assert full_workflow_tasks_dict[0]["parentName"] is None
assert full_workflow_tasks_dict[1]["name"] == "tiling:1"
assert full_workflow_tasks_dict[1]["parentName"] == "esa-s2-l2a-gtiff-visual:1"
assert full_workflow_tasks_dict[1]["blockId"] == "4ed70368-d4e1-4462-bef6-14e768049471"
def test_get_parameter_info(workflow_mock):
parameter_info = workflow_mock.get_parameters_info()
assert isinstance(parameter_info, dict)
assert {"tiling:1", "esa-s2-l2a-gtiff-visual:1"} <= set(parameter_info.keys())
assert {"nodata", "tile_width"} <= set(parameter_info["tiling:1"].keys())
def test_get_default_parameters(workflow_mock):
default_parameters = workflow_mock._get_default_parameters()
assert isinstance(default_parameters, dict)
assert {"tiling:1", "esa-s2-l2a-gtiff-visual:1"} <= set(default_parameters.keys())
assert default_parameters["tiling:1"] == {
"nodata": None,
"tile_width": 768,
"required_but_no_default": None,
}
assert "not_required_no_default" not in default_parameters["tiling:1"]
assert default_parameters["esa-s2-l2a-gtiff-visual:1"] == {
"ids": None,
"bbox": None,
"time": "2018-01-01T00:00:00+00:00/2020-12-31T23:59:59+00:00",
}
assert "intersects" not in default_parameters["esa-s2-l2a-gtiff-visual:1"]
def test_get_jobs(workflow_mock, requests_mock):
url_job_info = f"{API_HOST}/projects/{workflow_mock.project_id}/jobs/{JOB_ID}"
requests_mock.get(
url=url_job_info,
json={"data": {"xyz": 789, "mode": "DEFAULT"}, "error": {}},
)
jobcollection = workflow_mock.get_jobs()
assert isinstance(jobcollection, JobCollection)
assert isinstance(jobcollection.jobs[0], Job)
assert jobcollection.jobs[0].job_id == JOB_ID
assert len(jobcollection.jobs) == 1 # Filters out the job that is not associated with the workflow object
@pytest.mark.skip(reason="too many jobs in test project, triggers too many job info requests.")
@pytest.mark.live
def test_get_jobs_live(workflow_live):
jobcollection = workflow_live.get_jobs()
assert isinstance(jobcollection, list)
assert isinstance(jobcollection.jobs[0], Job)
assert all(j._info["workflowId"] == workflow_live.workflow_id for j in jobcollection.jobs)
def test_delete(workflow_mock, requests_mock):
delete_url = f"{API_HOST}/projects/{workflow_mock.project_id}/workflows/{workflow_mock.workflow_id}"
requests_mock.delete(url=delete_url)
workflow_mock.delete()