/
project.py
225 lines (195 loc) · 8.38 KB
/
project.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import logging
from typing import Dict, List, Union, Optional
from up42.auth import Auth
from up42.job import Job
from up42.jobcollection import JobCollection
from up42.utils import get_logger, filter_jobs_on_mode
from up42.workflow import Workflow
# Module-level logger, created through the package's own `get_logger` helper.
logger = get_logger(__name__)
class Project:
    """
    The Project is the top-level class of the UP42 hierarchy. With it you can create
    new workflows, query already existing workflows & jobs in the project and manage
    the project settings.

    Create a new project on the
    [**UP42 Console website**](https://sdk.up42.com/authentication/#get-your-project-credentials).

    Use an existing project:
    ```python
    up42.authenticate(project_id="uz92-8uo0-4dc9-ab1d-06d7ec1a5321",
                      project_api_key="9i7uec8a-45be-41ad-a50f-98bewb528b10")
    project = up42.initialize_project()
    ```
    """

    def __init__(self, auth: "Auth", project_id: str):
        """
        Stores the auth object and project id and fetches the project metadata.

        Args:
            auth: Object used for every API call; this class uses its
                `_endpoint()`, `_request()` and `env` members.
            project_id: Id of the project, used to build the request urls.
        """
        self.auth = auth
        self.project_id = project_id
        # Reading the `info` property performs a GET request and caches the
        # result in `self._info`, which `__repr__` reads later.
        self._info = self.info

    def __repr__(self):
        """
        Returns a summary built from the cached project metadata (no request).
        """
        # The environment is only mentioned when it is the "dev" environment.
        env = ", env: dev" if self.auth.env == "dev" else ""
        return (
            f"Project(name: {self._info['name']}, project_id: {self.project_id}, "
            f"description: {self._info['description']}, createdAt: {self._info['createdAt']}"
            f"{env})"
        )

    @property
    def info(self) -> dict:
        """
        Gets and updates the project metadata information.

        Every access sends a GET request and refreshes the cached `_info`.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}"
        response_json = self.auth._request(request_type="GET", url=url)
        self._info = response_json["data"]
        return self._info

    def create_workflow(
        self, name: str, description: str = "", use_existing: bool = False
    ) -> "Workflow":
        """
        Creates a new workflow and returns a workflow object.

        Args:
            name: Name of the new workflow.
            description: Description of the new workflow.
            use_existing: If True, instead of creating a new workflow, uses an
                existing workflow with the same name & description. The first
                match in the order returned by the server is taken (presumably
                the most recent one - depends on the server's ordering).

        Returns:
            The workflow object.
        """
        if use_existing:
            logger.info("Getting existing workflows in project ...")
            # Silence the workflow module's logger while listing the workflows.
            # The previous level is restored afterwards, also when the request
            # fails, so a level configured by the user is never overwritten
            # and the logger is never left muted.
            workflow_logger = logging.getLogger("up42.workflow")
            previous_level = workflow_logger.level
            workflow_logger.setLevel(logging.CRITICAL)
            try:
                existing_workflows: list = self.get_workflows(return_json=True)
            finally:
                workflow_logger.setLevel(previous_level)

            match = next(
                (
                    candidate
                    for candidate in existing_workflows
                    if candidate["name"] == name
                    and candidate["description"] == description
                ),
                None,
            )
            if match is not None:
                existing_workflow = Workflow(
                    self.auth,
                    project_id=self.project_id,
                    workflow_id=match["id"],
                )
                logger.info(
                    f"Using existing workflow: {name} - {existing_workflow.workflow_id}"
                )
                return existing_workflow
            # No match: fall through and create a new workflow.

        url = f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        payload = {"name": name, "description": description}
        response_json = self.auth._request(request_type="POST", url=url, data=payload)
        workflow_id = response_json["data"]["id"]
        logger.info(f"Created new workflow: {workflow_id}")
        return Workflow(self.auth, project_id=self.project_id, workflow_id=workflow_id)

    def get_workflows(
        self, return_json: bool = False
    ) -> Union[List["Workflow"], List[dict]]:
        """
        Gets all workflows in a project as workflow objects or json.

        Args:
            return_json: True returns infos of workflows as json instead of workflow objects.

        Returns:
            List of Workflow objects in the project or alternatively json info of the workflows.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/workflows"
        response_json = self.auth._request(request_type="GET", url=url)
        workflows_json = response_json["data"]
        logger.info(
            f"Got {len(workflows_json)} workflows for project {self.project_id}."
        )
        if return_json:
            return workflows_json
        # The already fetched info is handed over as `workflow_info`.
        return [
            Workflow(
                self.auth,
                project_id=self.project_id,
                workflow_id=workflow_json["id"],
                workflow_info=workflow_json,
            )
            for workflow_json in workflows_json
        ]

    def get_jobs(
        self, return_json: bool = False, test_jobs: bool = True, real_jobs: bool = True
    ) -> Union["JobCollection", List[dict]]:
        """
        Get all jobs in the project as a JobCollection or json.

        To get only the jobs associated with a specific workflow, query them
        via the Workflow object instead.

        Args:
            return_json: If true, returns the job info jsons instead of JobCollection.
            test_jobs: Return test jobs or test queries.
            real_jobs: Return real jobs.

        Returns:
            All job objects in a JobCollection, or alternatively the jobs info as json.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs"
        response_json = self.auth._request(request_type="GET", url=url)
        # Filtering by job mode is delegated to the shared utility function.
        jobs_json = filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)
        logger.info(f"Got {len(jobs_json)} jobs in project {self.project_id}.")
        if return_json:
            return jobs_json
        jobs = [
            Job(
                self.auth,
                job_id=job_json["id"],
                project_id=self.project_id,
                job_info=job_json,
            )
            for job_json in jobs_json
        ]
        return JobCollection(auth=self.auth, project_id=self.project_id, jobs=jobs)

    def get_project_settings(self) -> List[Dict[str, str]]:
        """
        Gets the project settings.

        Returns:
            The project settings, i.e. the "data" entry of the server response.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/settings"
        response_json = self.auth._request(request_type="GET", url=url)
        return response_json["data"]

    @property
    def max_concurrent_jobs(self) -> int:
        """
        Gets the maximum number of concurrent jobs allowed by the project settings.

        Raises:
            KeyError: If the settings contain no "MAX_CONCURRENT_JOBS" entry.
        """
        values_by_name = {
            setting["name"]: setting["value"]
            for setting in self.get_project_settings()
        }
        # Only the requested value is converted, so a non-numeric value of an
        # unrelated setting can not make this lookup fail.
        return int(values_by_name["MAX_CONCURRENT_JOBS"])

    def update_project_settings(
        self,
        max_aoi_size: Optional[int] = None,
        max_concurrent_jobs: Optional[int] = None,
        number_of_images: Optional[int] = None,
    ) -> None:
        """
        Updates a project's settings.

        Settings that are not passed (None) are sent with their current value.

        Args:
            max_aoi_size: The maximum area of interest geometry size, from 1-1000 sqkm, default 10 sqkm.
            max_concurrent_jobs: The maximum number of concurrent jobs, from 1-10, default 1.
            number_of_images: The maximum number of images returned with each job, from 1-20, default 10.

        Raises:
            KeyError: If a setting that was not passed is also missing from the
                current settings returned by the server.
        """
        # The ranges and default values of the project settings can potentially get
        # increased, so need to be dynamically queried from the server.
        current_settings = {d["name"]: d for d in self.get_project_settings()}

        url = f"{self.auth._endpoint()}/projects/{self.project_id}/settings"
        desired_settings = {
            "JOB_QUERY_MAX_AOI_SIZE": max_aoi_size,
            "MAX_CONCURRENT_JOBS": max_concurrent_jobs,
            "JOB_QUERY_LIMIT_PARAMETER_MAX_VALUE": number_of_images,
        }
        # All values are sent as strings; unspecified ones keep the current value.
        payload: Dict = {
            "settings": {
                name: str(
                    current_settings[name]["value"]
                    if desired_setting is None
                    else desired_setting
                )
                for name, desired_setting in desired_settings.items()
            }
        }
        self.auth._request(request_type="POST", url=url, data=payload)
        logger.info(f"Updated project settings: {payload}")