Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 55 additions & 79 deletions src/superannotate/lib/core/usecases/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ async def _upload_small_annotations(self, chunk) -> Report:
except Exception: # noqa
failed_annotations.extend([i.name for i in chunk])
finally:
print(1111, len(chunk))
self.reporter.update_progress(len(chunk))
return Report(
failed_annotations, missing_classes, missing_attr_groups, missing_attrs
Expand All @@ -345,7 +346,7 @@ async def upload(_chunk):
self._report.missing_attr_groups.extend(report.missing_attr_groups)
self._report.missing_attrs.extend(report.missing_attrs)
except Exception as e:
self.reporter.log_debug(str(e))
self.reporter.log_error(str(e))
self._report.failed_annotations.extend([i.name for i in _chunk])

_size = 0
Expand Down Expand Up @@ -386,7 +387,6 @@ async def _upload_big_annotation(self, item) -> Tuple[str, bool]:
except Exception as e:
self.reporter.log_debug(str(e))
self._report.failed_annotations.append(item.name)
raise
finally:
self.reporter.update_progress()

Expand All @@ -397,7 +397,7 @@ async def upload_big_annotations(self):
if item:
await self._upload_big_annotation(item)
else:
await self._big_files_queue.put_nowait(None)
self._big_files_queue.put_nowait(None)
break

async def distribute_queues(self, items_to_upload: list):
Expand Down Expand Up @@ -431,26 +431,21 @@ async def distribute_queues(self, items_to_upload: list):
self.reporter.update_progress()
data[idx][1] = True
processed_count += 1
self.reporter.update_progress()
data[idx][1] = True
processed_count += 1
self._big_files_queue.put_nowait(None)
self._small_files_queue.put_nowait(None)

async def run_workers(self, items_to_upload):
try:
self._big_files_queue = asyncio.Queue()
self._small_files_queue = asyncio.Queue()
await asyncio.gather(
self.distribute_queues(items_to_upload),
self.upload_big_annotations(),
self.upload_big_annotations(),
self.upload_big_annotations(),
return_exceptions=True,
)
await asyncio.gather(self.upload_small_annotations())
except Exception as e:
self.reporter.log_error(f"Error {str(e)}")
self._big_files_queue = asyncio.Queue()
self._small_files_queue = asyncio.Queue()
await asyncio.gather(
self.distribute_queues(items_to_upload),
self.upload_big_annotations(),
self.upload_big_annotations(),
self.upload_big_annotations(),
)
await asyncio.gather(self.upload_small_annotations())

def execute(self):
missing_annotations = []
Expand All @@ -472,7 +467,11 @@ def execute(self):
items_to_upload.append(self.AnnotationToUpload(item.uuid, name, path))
except KeyError:
missing_annotations.append(name)
asyncio.run(self.run_workers(items_to_upload))
try:
asyncio.run(self.run_workers(items_to_upload))
except Exception as e:
self.reporter.log_error(str(e))
self._response.errors = AppException("Can't upload annotations.")
self.reporter.finish_progress()
self._log_report()
uploaded_annotations = list(
Expand Down Expand Up @@ -675,7 +674,6 @@ def execute(self):
)
if not uploaded:
self._response.errors = constants.INVALID_JSON_MESSAGE

else:
response = asyncio.run(
self._backend_service.upload_annotations(
Expand Down Expand Up @@ -750,6 +748,7 @@ def __init__(
self._client = backend_service_provider
self._show_process = show_process
self._item_names_provided = True
self._big_annotations_queue = None

def validate_project_type(self):
if self._project.type == constants.ProjectType.PIXEL.value:
Expand All @@ -775,76 +774,64 @@ def validate_item_names(self):
self._item_names = [item.name for item in self._images.get_all(condition)]

def _prettify_annotations(self, annotations: List[dict]):
restruct = {}
re_struct = {}

if self._item_names_provided:
for annotation in annotations:
restruct[annotation["metadata"]["name"]] = annotation
re_struct[annotation["metadata"]["name"]] = annotation
try:
return [restruct[x] for x in self._item_names if x in restruct]
return [re_struct[x] for x in self._item_names if x in re_struct]
except KeyError:
raise AppException("Broken data.")

return annotations

async def get_big_annotation(
self,
):
async def get_big_annotation(self):

large_annotations = []
while True:
item = await self._big_annotations_queue.get()
if not item:
await self._big_annotations_queue.put(None)
break

large_annotation = await self._client.get_big_annotation(
team_id=self._project.team_id,
project_id=self._project.id,
folder_id=self._folder.uuid,
item=item,
reporter=self.reporter,
)

large_annotations.append(large_annotation)

return large_annotations

async def get_small_annotations(self, item_names):
small_annotations = await self._client.get_small_annotations(
return await self._client.get_small_annotations(
team_id=self._project.team_id,
project_id=self._project.id,
folder_id=self._folder.uuid,
items=item_names,
reporter=self.reporter,
)

return small_annotations

async def distribute_to_queue(self, big_annotations):

for item in big_annotations:
await self._big_annotations_queue.put(item)

await self._big_annotations_queue.put(None)

async def run_workers(self, big_annotations, small_annotations):

self._big_annotations_queue = asyncio.Queue()
annotations = await asyncio.gather(
self.distribute_to_queue(big_annotations),
self.get_small_annotations(small_annotations),
self.get_big_annotation(),
self.get_big_annotation(),
self.get_big_annotation(),
return_exceptions=True,
)

annotations = [i for x in annotations[1:] for i in x if x]
return annotations

def execute(self):
if self.is_valid():
self._big_annotations_queue = asyncio.Queue()
items_count = len(self._item_names)
self.reporter.log_info(
f"Getting {items_count} annotations from "
Expand All @@ -858,12 +845,15 @@ def execute(self):
folder_id=self._folder.uuid,
project_id=self._project.id,
)

small_annotations = [x["name"] for x in items["small"]]
annotations = asyncio.run(
self.run_workers(items["large"], small_annotations)
)

try:
annotations = asyncio.run(
self.run_workers(items["large"], small_annotations)
)
except Exception as e:
self.reporter.log_error(str(e))
self._response.errors = AppException("Can't upload annotations.")
return self._response
received_items_count = len(annotations)
self.reporter.finish_progress()
if items_count > received_items_count:
Expand Down Expand Up @@ -1122,31 +1112,27 @@ def coroutine_wrapper(coroutine):
loop.close()
return count

async def _download_big_annotation(self, item, export_path, folder_id):
postfix = self.get_postfix()
await self._backend_client.download_big_annotation(
item=item,
team_id=self._project.team_id,
project_id=self._project.id,
download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}",
postfix=postfix,
callback=self._callback,
)

async def download_big_annotations(self, queue_idx, export_path, folder_id):
async def download_big_annotations(self, queue_idx, export_path):
while True:
cur_queue = self._big_file_queues[queue_idx]
item = await cur_queue.get()
cur_queue.task_done()
if item:
await self._download_big_annotation(item, export_path, folder_id)
postfix = self.get_postfix()
await self._backend_client.download_big_annotation(
item=item,
team_id=self._project.team_id,
project_id=self._project.id,
download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}",
postfix=postfix,
callback=self._callback,
)
else:
cur_queue.put_nowait(None)
break

async def download_small_annotations(self, queue_idx, export_path, folder_id):
cur_queue = self._small_file_queues[queue_idx]

items = []
item = ""
postfix = self.get_postfix()
Expand Down Expand Up @@ -1195,15 +1181,9 @@ async def run_workers(self, item_names, folder_id, export_path):
self.distribute_to_queues(
item_names, small_file_queue_idx, big_file_queue_idx, folder_id
),
self.download_big_annotations(
big_file_queue_idx, export_path, folder_id
),
self.download_big_annotations(
big_file_queue_idx, export_path, folder_id
),
self.download_big_annotations(
big_file_queue_idx, export_path, folder_id
),
self.download_big_annotations(big_file_queue_idx, export_path),
self.download_big_annotations(big_file_queue_idx, export_path),
self.download_big_annotations(big_file_queue_idx, export_path),
self.download_small_annotations(
small_file_queue_idx, export_path, folder_id
),
Expand All @@ -1214,11 +1194,7 @@ async def run_workers(self, item_names, folder_id, export_path):
except Exception as e:
self.reporter.log_error(f"Error {str(e)}")

def per_folder_execute(self, item_names, folder_id, export_path):
asyncio.run(self.run_workers(item_names, folder_id, export_path))

def execute(self):

if self.is_valid():
export_path = str(
self.destination
Expand Down Expand Up @@ -1255,22 +1231,22 @@ def execute(self):
]
else:
item_names = self._item_names

new_export_path = export_path

if not folder.is_root and self._folder.is_root:
new_export_path += f"/{folder.name}"

# TODO check
if not item_names:
continue
future = executor.submit(
self.per_folder_execute,
item_names,
folder.uuid,
new_export_path,
futures.append(
executor.submit(
asyncio.run,
self.run_workers(item_names, folder.uuid, new_export_path),
)
)
futures.append(future)

for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
self._response.errors = exception

self.reporter.stop_spinner()
count = self.get_items_count(export_path)
Expand Down
18 changes: 11 additions & 7 deletions src/superannotate/lib/infrastructure/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ async def _sync_large_annotation(self, team_id, project_id, item_id):
headers=self.default_headers,
raise_for_status=True,
) as session:
res = await session.post(sync_url, params=sync_params)
await session.post(sync_url, params=sync_params)

sync_params.pop("current_source")
sync_params.pop("desired_source")
Expand Down Expand Up @@ -1165,10 +1165,8 @@ async def get_big_annotation(
self,
project_id: int,
team_id: int,
folder_id: int,
item: dict,
reporter: Reporter,
callback: Callable = None,
):

url = urljoin(
Expand All @@ -1183,13 +1181,14 @@ async def get_big_annotation(
"version": "V1.00",
}

synced = await self._sync_large_annotation(
await self._sync_large_annotation(
team_id=team_id, project_id=project_id, item_id=item["id"]
)

async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=False),
headers=self.default_headers,
raise_for_status=True,
) as session:
start_response = await session.post(url, params=query_params)
large_annotation = await start_response.json()
Expand Down Expand Up @@ -1253,13 +1252,14 @@ async def download_big_annotation(
self.URL_DOWNLOAD_LARGE_ANNOTATION.format(item_id=item_id),
)

synced = await self._sync_large_annotation(
await self._sync_large_annotation(
team_id=team_id, project_id=project_id, item_id=item_id
)

async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=False),
headers=self.default_headers,
raise_for_status=True,
) as session:
start_response = await session.post(url, params=query_params)
res = await start_response.json()
Expand Down Expand Up @@ -1485,7 +1485,9 @@ async def upload_annotations(
headers = copy.copy(self.default_headers)
del headers["Content-Type"]
async with aiohttp.ClientSession(
headers=headers, connector=aiohttp.TCPConnector(ssl=self._verify_ssl)
headers=headers,
connector=aiohttp.TCPConnector(ssl=self._verify_ssl),
raise_for_status=True,
) as session:
data = aiohttp.FormData(quote_fields=False)
for key, file in items_name_file_map.items():
Expand Down Expand Up @@ -1526,7 +1528,9 @@ async def upload_big_annotation(
chunk_size: int,
) -> bool:
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=False), headers=self.default_headers
connector=aiohttp.TCPConnector(ssl=False),
headers=self.default_headers,
raise_for_status=True,
) as session:
params = {
"team_id": team_id,
Expand Down
Loading