diff --git a/src/superannotate/lib/core/usecases/annotations.py b/src/superannotate/lib/core/usecases/annotations.py index 5d889990f..765b72b3b 100644 --- a/src/superannotate/lib/core/usecases/annotations.py +++ b/src/superannotate/lib/core/usecases/annotations.py @@ -329,6 +329,7 @@ async def _upload_small_annotations(self, chunk) -> Report: except Exception: # noqa failed_annotations.extend([i.name for i in chunk]) finally: + # progress must advance even when the chunk failed self.reporter.update_progress(len(chunk)) return Report( failed_annotations, missing_classes, missing_attr_groups, missing_attrs @@ -345,7 +346,7 @@ async def upload(_chunk): self._report.missing_attr_groups.extend(report.missing_attr_groups) self._report.missing_attrs.extend(report.missing_attrs) except Exception as e: - self.reporter.log_debug(str(e)) + self.reporter.log_error(str(e)) self._report.failed_annotations.extend([i.name for i in _chunk]) _size = 0 @@ -386,7 +387,6 @@ async def _upload_big_annotation(self, item) -> Tuple[str, bool]: except Exception as e: self.reporter.log_debug(str(e)) self._report.failed_annotations.append(item.name) - raise finally: self.reporter.update_progress() @@ -397,7 +397,7 @@ async def upload_big_annotations(self): if item: await self._upload_big_annotation(item) else: - await self._big_files_queue.put_nowait(None) + self._big_files_queue.put_nowait(None) break async def distribute_queues(self, items_to_upload: list): @@ -431,26 +431,21 @@ async def distribute_queues(self, items_to_upload: list): self.reporter.update_progress() data[idx][1] = True processed_count += 1 - self.reporter.update_progress() data[idx][1] = True processed_count += 1 self._big_files_queue.put_nowait(None) self._small_files_queue.put_nowait(None) async def run_workers(self, items_to_upload): - try: - self._big_files_queue = asyncio.Queue() - self._small_files_queue = asyncio.Queue() - await asyncio.gather( - self.distribute_queues(items_to_upload), - self.upload_big_annotations(), - 
self.upload_big_annotations(), - self.upload_big_annotations(), - return_exceptions=True, - ) - await asyncio.gather(self.upload_small_annotations()) - except Exception as e: - self.reporter.log_error(f"Error {str(e)}") + self._big_files_queue = asyncio.Queue() + self._small_files_queue = asyncio.Queue() + await asyncio.gather( + self.distribute_queues(items_to_upload), + self.upload_big_annotations(), + self.upload_big_annotations(), + self.upload_big_annotations(), + ) + await asyncio.gather(self.upload_small_annotations()) def execute(self): missing_annotations = [] @@ -472,7 +467,11 @@ def execute(self): items_to_upload.append(self.AnnotationToUpload(item.uuid, name, path)) except KeyError: missing_annotations.append(name) - asyncio.run(self.run_workers(items_to_upload)) + try: + asyncio.run(self.run_workers(items_to_upload)) + except Exception as e: + self.reporter.log_error(str(e)) + self._response.errors = AppException("Can't upload annotations.") self.reporter.finish_progress() self._log_report() uploaded_annotations = list( @@ -675,7 +674,6 @@ def execute(self): ) if not uploaded: self._response.errors = constants.INVALID_JSON_MESSAGE - else: response = asyncio.run( self._backend_service.upload_annotations( @@ -750,6 +748,7 @@ def __init__( self._client = backend_service_provider self._show_process = show_process self._item_names_provided = True + self._big_annotations_queue = None def validate_project_type(self): if self._project.type == constants.ProjectType.PIXEL.value: @@ -775,21 +774,19 @@ def validate_item_names(self): self._item_names = [item.name for item in self._images.get_all(condition)] def _prettify_annotations(self, annotations: List[dict]): - restruct = {} + re_struct = {} if self._item_names_provided: for annotation in annotations: - restruct[annotation["metadata"]["name"]] = annotation + re_struct[annotation["metadata"]["name"]] = annotation try: - return [restruct[x] for x in self._item_names if x in restruct] + return [re_struct[x] for x 
in self._item_names if x in re_struct] except KeyError: raise AppException("Broken data.") return annotations - async def get_big_annotation( - self, - ): + async def get_big_annotation(self): large_annotations = [] while True: @@ -797,21 +794,17 @@ async def get_big_annotation( if not item: await self._big_annotations_queue.put(None) break - large_annotation = await self._client.get_big_annotation( team_id=self._project.team_id, project_id=self._project.id, - folder_id=self._folder.uuid, item=item, reporter=self.reporter, ) - large_annotations.append(large_annotation) - return large_annotations async def get_small_annotations(self, item_names): - small_annotations = await self._client.get_small_annotations( + return await self._client.get_small_annotations( team_id=self._project.team_id, project_id=self._project.id, folder_id=self._folder.uuid, @@ -819,24 +812,19 @@ async def get_small_annotations(self, item_names): reporter=self.reporter, ) - return small_annotations - async def distribute_to_queue(self, big_annotations): - for item in big_annotations: await self._big_annotations_queue.put(item) - await self._big_annotations_queue.put(None) async def run_workers(self, big_annotations, small_annotations): - + self._big_annotations_queue = asyncio.Queue() annotations = await asyncio.gather( self.distribute_to_queue(big_annotations), self.get_small_annotations(small_annotations), self.get_big_annotation(), self.get_big_annotation(), self.get_big_annotation(), - return_exceptions=True, ) annotations = [i for x in annotations[1:] for i in x if x] @@ -844,7 +832,6 @@ async def run_workers(self, big_annotations, small_annotations): def execute(self): if self.is_valid(): - self._big_annotations_queue = asyncio.Queue() items_count = len(self._item_names) self.reporter.log_info( f"Getting {items_count} annotations from " @@ -858,12 +845,15 @@ def execute(self): folder_id=self._folder.uuid, project_id=self._project.id, ) - small_annotations = [x["name"] for x in 
items["small"]] - annotations = asyncio.run( - self.run_workers(items["large"], small_annotations) - ) - + try: + annotations = asyncio.run( + self.run_workers(items["large"], small_annotations) + ) + except Exception as e: + self.reporter.log_error(str(e)) + self._response.errors = AppException("Can't get annotations.") + return self._response received_items_count = len(annotations) self.reporter.finish_progress() if items_count > received_items_count: @@ -1122,31 +1112,27 @@ def coroutine_wrapper(coroutine): loop.close() return count - async def _download_big_annotation(self, item, export_path, folder_id): - postfix = self.get_postfix() - await self._backend_client.download_big_annotation( - item=item, - team_id=self._project.team_id, - project_id=self._project.id, - download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}", - postfix=postfix, - callback=self._callback, - ) - - async def download_big_annotations(self, queue_idx, export_path, folder_id): + async def download_big_annotations(self, queue_idx, export_path): while True: cur_queue = self._big_file_queues[queue_idx] item = await cur_queue.get() cur_queue.task_done() if item: - await self._download_big_annotation(item, export_path, folder_id) + postfix = self.get_postfix() + await self._backend_client.download_big_annotation( + item=item, + team_id=self._project.team_id, + project_id=self._project.id, + download_path=f"{export_path}{'/' + self._folder.name if not self._folder.is_root else ''}", + postfix=postfix, + callback=self._callback, + ) + else: + cur_queue.put_nowait(None) + break async def download_small_annotations(self, queue_idx, export_path, folder_id): cur_queue = self._small_file_queues[queue_idx] - items = [] item = "" postfix = self.get_postfix() @@ -1195,15 +1181,9 @@ async def run_workers(self, item_names, folder_id, export_path): self.distribute_to_queues( item_names, small_file_queue_idx, big_file_queue_idx, folder_id ), - self.download_big_annotations( 
- big_file_queue_idx, export_path, folder_id - ), - self.download_big_annotations( - big_file_queue_idx, export_path, folder_id - ), - self.download_big_annotations( - big_file_queue_idx, export_path, folder_id - ), + self.download_big_annotations(big_file_queue_idx, export_path), + self.download_big_annotations(big_file_queue_idx, export_path), + self.download_big_annotations(big_file_queue_idx, export_path), self.download_small_annotations( small_file_queue_idx, export_path, folder_id ), @@ -1214,11 +1194,7 @@ async def run_workers(self, item_names, folder_id, export_path): except Exception as e: self.reporter.log_error(f"Error {str(e)}") - def per_folder_execute(self, item_names, folder_id, export_path): - asyncio.run(self.run_workers(item_names, folder_id, export_path)) - def execute(self): - if self.is_valid(): export_path = str( self.destination @@ -1255,22 +1231,22 @@ def execute(self): ] else: item_names = self._item_names - new_export_path = export_path - if not folder.is_root and self._folder.is_root: new_export_path += f"/{folder.name}" - - # TODO check if not item_names: continue - future = executor.submit( - self.per_folder_execute, - item_names, - folder.uuid, - new_export_path, + futures.append( + executor.submit( + asyncio.run, + self.run_workers(item_names, folder.uuid, new_export_path), + ) ) - futures.append(future) + + for future in concurrent.futures.as_completed(futures): + exception = future.exception() + if exception: + self._response.errors = exception self.reporter.stop_spinner() count = self.get_items_count(export_path) diff --git a/src/superannotate/lib/infrastructure/services.py b/src/superannotate/lib/infrastructure/services.py index 16ea064f9..ebd895cbe 100644 --- a/src/superannotate/lib/infrastructure/services.py +++ b/src/superannotate/lib/infrastructure/services.py @@ -339,7 +339,7 @@ async def _sync_large_annotation(self, team_id, project_id, item_id): headers=self.default_headers, raise_for_status=True, ) as session: - res = 
await session.post(sync_url, params=sync_params) + await session.post(sync_url, params=sync_params) sync_params.pop("current_source") sync_params.pop("desired_source") @@ -1165,10 +1165,8 @@ async def get_big_annotation( self, project_id: int, team_id: int, - folder_id: int, item: dict, reporter: Reporter, - callback: Callable = None, ): url = urljoin( @@ -1183,13 +1181,14 @@ async def get_big_annotation( "version": "V1.00", } - synced = await self._sync_large_annotation( + await self._sync_large_annotation( team_id=team_id, project_id=project_id, item_id=item["id"] ) async with aiohttp.ClientSession( connector=aiohttp.TCPConnector(ssl=False), headers=self.default_headers, + raise_for_status=True, ) as session: start_response = await session.post(url, params=query_params) large_annotation = await start_response.json() @@ -1253,13 +1252,14 @@ async def download_big_annotation( self.URL_DOWNLOAD_LARGE_ANNOTATION.format(item_id=item_id), ) - synced = await self._sync_large_annotation( + await self._sync_large_annotation( team_id=team_id, project_id=project_id, item_id=item_id ) async with aiohttp.ClientSession( connector=aiohttp.TCPConnector(ssl=False), headers=self.default_headers, + raise_for_status=True, ) as session: start_response = await session.post(url, params=query_params) res = await start_response.json() @@ -1485,7 +1485,9 @@ async def upload_annotations( headers = copy.copy(self.default_headers) del headers["Content-Type"] async with aiohttp.ClientSession( - headers=headers, connector=aiohttp.TCPConnector(ssl=self._verify_ssl) + headers=headers, + connector=aiohttp.TCPConnector(ssl=self._verify_ssl), + raise_for_status=True, ) as session: data = aiohttp.FormData(quote_fields=False) for key, file in items_name_file_map.items(): @@ -1526,7 +1528,9 @@ async def upload_big_annotation( chunk_size: int, ) -> bool: async with aiohttp.ClientSession( - connector=aiohttp.TCPConnector(ssl=False), headers=self.default_headers + 
connector=aiohttp.TCPConnector(ssl=False), + headers=self.default_headers, + raise_for_status=True, ) as session: params = { "team_id": team_id, diff --git a/src/superannotate/lib/infrastructure/stream_data_handler.py b/src/superannotate/lib/infrastructure/stream_data_handler.py index 055abf6b3..3d4569bbc 100644 --- a/src/superannotate/lib/infrastructure/stream_data_handler.py +++ b/src/superannotate/lib/infrastructure/stream_data_handler.py @@ -41,9 +41,7 @@ async def fetch( kwargs = {"params": params, "json": {"folder_id": params.pop("folder_id")}} if data: kwargs["json"].update(data) - response = await session._request( - method, url, **kwargs, raise_for_status=True, timeout=TIMEOUT - ) + response = await session._request(method, url, **kwargs, timeout=TIMEOUT) buffer = b"" async for line in response.content.iter_any(): slices = line.split(self.DELIMITER) @@ -76,6 +74,7 @@ async def process_chunk( headers=self._headers, timeout=TIMEOUT, connector=aiohttp.TCPConnector(ssl=verify_ssl, keepalive_timeout=2**32), + raise_for_status=True, ) as session: async for annotation in self.fetch( method, @@ -98,10 +97,10 @@ async def store_chunk( params: dict = None, ): async with aiohttp.ClientSession( - raise_for_status=True, headers=self._headers, timeout=TIMEOUT, connector=aiohttp.TCPConnector(ssl=False, keepalive_timeout=2**32), + raise_for_status=True, ) as session: async for annotation in self.fetch( method,