-
Notifications
You must be signed in to change notification settings - Fork 47
/
fixtures_job.py
116 lines (91 loc) · 3.35 KB
/
fixtures_job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import os
import pytest
from up42.job import Job
from .fixtures_globals import (
API_HOST,
DOWNLOAD_URL,
JOB_ID,
JOB_ID_2,
JOB_NAME,
JOBTASK_ID,
MOCK_CREDITS,
PROJECT_ID,
WORKFLOW_NAME,
)
@pytest.fixture()
def job_mock(auth_mock, requests_mock):
    """Return a ``Job`` for ``JOB_ID`` whose HTTP endpoints are mocked.

    GET responses are registered on ``requests_mock`` for the job info, the
    job tasks, the task logs, the task quicklooks, the job-level and
    task-level data-json results, the results download URL and the job
    credits.

    Args:
        auth_mock: Auth fixture passed to ``Job`` as ``auth``.
        requests_mock: Mocker fixture on which the GET responses are
            registered.

    Returns:
        The ``Job`` built from ``PROJECT_ID`` and ``JOB_ID``.
    """
    # The info endpoint is registered before the Job is constructed.
    # NOTE(review): presumably the Job constructor fetches it -- confirm.
    info_payload = {
        "data": {
            "xyz": 789,
            "mode": "DEFAULT",
            "description": "some_description",
            "startedAt": "some_date",
            "workflowName": WORKFLOW_NAME,
            "name": JOB_NAME,
            "finishedAt": "some_date",
            "status": "SUCCESSFULL",
            "inputs": "some_inputs",
        },
        "error": {},
    }
    requests_mock.get(url=f"{API_HOST}/projects/{PROJECT_ID}/jobs/{JOB_ID}", json=info_payload)
    job = Job(auth=auth_mock, project_id=PROJECT_ID, job_id=JOB_ID)

    # All remaining endpoints share the job path; task endpoints share the task path.
    job_root = f"{API_HOST}/projects/{job.project_id}/jobs/{job.job_id}"
    task_root = f"{job_root}/tasks/{JOBTASK_ID}"

    # get_jobtasks
    requests_mock.get(url=f"{job_root}/tasks/", json={"data": [{"id": JOBTASK_ID}]})

    # get_logs
    requests_mock.get(f"{task_root}/logs", json="")

    # quicklooks
    requests_mock.get(f"{task_root}/outputs/quicklooks/", json={"data": ["a_quicklook.png"]})

    # get_results_json
    requests_mock.get(f"{job_root}/outputs/data-json/", json={"type": "FeatureCollection", "features": []})

    # get_jobtasks_results_json
    requests_mock.get(f"{task_root}/outputs/data-json", json={"type": "FeatureCollection", "features": []})

    # get_download_url
    requests_mock.get(f"{job_root}/downloads/results/", json={"data": {"url": DOWNLOAD_URL}, "error": {}})

    # get_job_credits
    requests_mock.get(f"{job_root}/credits", json=MOCK_CREDITS)

    return job
@pytest.fixture()
def job_live(auth_live, project_id_live):
    """Return a ``Job`` for the id stored in ``TEST_UP42_JOB_ID``.

    Args:
        auth_live: Auth fixture passed to ``Job`` as ``auth``.
        project_id_live: Project id fixture passed to ``Job`` as ``project_id``.

    Returns:
        The ``Job`` built from the fixtures and the environment variable.
    """
    # os.getenv yields None when the variable is unset; it is passed through as is.
    live_job_id = os.getenv("TEST_UP42_JOB_ID")
    return Job(auth=auth_live, project_id=project_id_live, job_id=live_job_id)
@pytest.fixture()
def jobs_mock(auth_mock, requests_mock):
    """Return two ``Job`` objects (``JOB_ID`` and ``JOB_ID_2``) with mocked info.

    Args:
        auth_mock: Auth fixture passed to each ``Job`` as ``auth``.
        requests_mock: Mocker fixture on which the job-info GET responses
            are registered.

    Returns:
        A list ``[job1, job2]`` in that order.
    """
    # Each entry pairs a job id with the "data" section of its info response.
    info_data_per_job = (
        (JOB_ID, {"xyz": 789, "mode": "DEFAULT"}),
        (JOB_ID_2, {"xyz": 789}),
    )

    jobs = []
    for job_id, info_data in info_data_per_job:
        # Register the info endpoint first, then build the Job for that id.
        requests_mock.get(
            url=f"{API_HOST}/projects/{PROJECT_ID}/jobs/{job_id}",
            json={"data": info_data, "error": {}},
        )
        jobs.append(Job(auth=auth_mock, project_id=PROJECT_ID, job_id=job_id))
    return jobs
@pytest.fixture
def jobs_live(auth_live, project_id_live):
    """Return two ``Job`` objects for the ids stored in the environment.

    The job ids are read from ``TEST_UP42_JOB_ID`` and ``TEST_UP42_JOB_ID_2``.

    Args:
        auth_live: Auth fixture passed to each ``Job`` as ``auth``.
        project_id_live: Project id fixture passed to each ``Job`` as
            ``project_id``.

    Returns:
        A list with the two ``Job`` objects, in the order of the variables above.
    """
    job_id_env_vars = ("TEST_UP42_JOB_ID", "TEST_UP42_JOB_ID_2")
    return [
        Job(
            auth=auth_live,
            project_id=project_id_live,
            job_id=os.getenv(env_var),
        )
        for env_var in job_id_env_vars
    ]