diff --git a/src/superannotate/lib/app/interface/sdk_interface.py b/src/superannotate/lib/app/interface/sdk_interface.py index d904249b3..f0f6c716d 100644 --- a/src/superannotate/lib/app/interface/sdk_interface.py +++ b/src/superannotate/lib/app/interface/sdk_interface.py @@ -1,3 +1,4 @@ +import concurrent.futures import io import json import logging @@ -564,34 +565,16 @@ def copy_image( LIMITED_FUNCTIONS[source_project_metadata["project"].project_type] ) - img_bytes = get_image_bytes(project=source_project, image_name=image_name) - - s3_response = controller.upload_image_to_s3( - project_name=destination_project, image_path=image_name, image_bytes=img_bytes - ) - if s3_response.errors: - raise AppException(s3_response.errors) - image_entity = s3_response.data - del img_bytes - - annotation_status = None - if copy_annotation_status: - res = controller.get_image( - project_name=source_project_name, - image_name=image_name, - folder_path=source_folder_name, - ) - annotation_status = constances.AnnotationStatus.get_name( - res.annotation_status_code - ) - - controller.attach_urls( - project_name=destination_project, - files=[image_entity], - folder_name=destination_folder, - annotation_status=annotation_status, - upload_state_code=constances.UploadState.BASIC.value, + response = controller.copy_image( + from_project_name=source_project_name, + from_folder_name=source_folder_name, + to_project_name=destination_project, + to_folder_name=destination_folder, + image_name=image_name, + copy_annotation_status=copy_annotation_status ) + if response.errors: + raise AppException(response.errors) if include_annotations: controller.copy_image_annotation_classes( @@ -729,8 +712,8 @@ def move_images( source_project: Union[NotEmptyStr, dict], image_names: Optional[List[NotEmptyStr]], destination_project: Union[NotEmptyStr, dict], - *_, - **__, + *args, + **kwargs, ): """Move images in bulk between folders in a project @@ -1011,7 +994,7 @@ def delete_image(project: Union[NotEmptyStr, dict], image_name: str): @Trackable @validate_arguments -def get_image_metadata(project: Union[NotEmptyStr, dict], image_name: str, *_, **__): +def get_image_metadata(project: Union[NotEmptyStr, dict], image_name: str, *args, **kwargs): """Returns image metadata :param project: project name or folder path (e.g., "project1/folder1") @@ -2009,16 +1992,15 @@ def move_image( :type copy_pin: bool """ source_project_name, source_folder_name = extract_project_folder(source_project) - destination_project_name, destination_folder = extract_project_folder( - destination_project - ) - response = controller.move_image( + destination_project_name, destination_folder = extract_project_folder(destination_project) + response = controller.copy_image( from_project_name=source_project_name, from_folder_name=source_folder_name, to_project_name=destination_project_name, to_folder_name=destination_folder, image_name=image_name, copy_annotation_status=copy_annotation_status, + move=True ) if response.errors: raise AppException(response.errors) @@ -2271,13 +2253,11 @@ def attach_image_urls_to_project( annotation_status=annotation_status, ) if use_case.is_valid(): - with tqdm( - total=use_case.attachments_count, desc="Attaching urls" - ) as progress_bar: + with tqdm(total=use_case.attachments_count, desc="Attaching urls") as progress_bar: for _ in use_case.execute(): progress_bar.update(1) uploaded, duplications = use_case.data - uploaded = [i["name"] for i in uploaded] + uploaded = [i['name'] for i in uploaded] duplications.extend(duplicate_images) failed_images = [ image["name"] @@ -2324,13 +2304,11 @@ def attach_video_urls_to_project( annotation_status=annotation_status, ) if use_case.is_valid(): - with tqdm( - total=use_case.attachments_count, desc="Attaching urls" - ) as progress_bar: + with tqdm(total=use_case.attachments_count, desc="Attaching urls") as progress_bar: for _ in use_case.execute(): progress_bar.update(1) uploaded, duplications = use_case.data - uploaded = [i["name"] for i in uploaded] + uploaded = [i['name'] for i in uploaded] duplications.extend(duplicate_images) failed_images = [ image["name"] @@ -3324,7 +3302,7 @@ def upload_image_to_project( image=img, annotation_status=annotation_status, from_s3_bucket=from_s3_bucket, - image_quality_in_editor=image_quality_in_editor, + image_quality_in_editor=image_quality_in_editor ) if response.errors: raise AppException(response.errors) @@ -3535,13 +3513,11 @@ def attach_document_urls_to_project( annotation_status=annotation_status, ) if use_case.is_valid(): - with tqdm( - total=use_case.attachments_count, desc="Attaching urls" - ) as progress_bar: + with tqdm(total=use_case.attachments_count, desc="Attaching urls") as progress_bar: for _ in use_case.execute(): progress_bar.update(1) uploaded, duplications = use_case.data - uploaded = [i["name"] for i in uploaded] + uploaded = [i['name'] for i in uploaded] duplications.extend(duplicate_images) failed_images = [ image["name"] diff --git a/src/superannotate/lib/core/usecases/images.py b/src/superannotate/lib/core/usecases/images.py index ccb934488..7f5b135d1 100644 --- a/src/superannotate/lib/core/usecases/images.py +++ b/src/superannotate/lib/core/usecases/images.py @@ -125,8 +125,8 @@ def execute(self): folder_id=self._folder.uuid, images=[self._image_name], ) - .execute() - .data + .execute() + .data ) if images: self._response.data = images[0] @@ -171,12 +171,12 @@ def execute(self): class GetBulkImages(BaseUseCase): def __init__( - self, - service: SuerannotateServiceProvider, - project_id: int, - team_id: int, - folder_id: int, - images: List[str], + self, + service: SuerannotateServiceProvider, + project_id: int, + team_id: int, + folder_id: int, + images: List[str], ): super().__init__() self._service = service @@ -193,7 +193,7 @@ def execute(self): project_id=self._project_id, team_id=self._team_id, folder_id=self._folder_id, - images=self._images[i : i + self._chunk_size], # noqa: E203 + images=self._images[i: i + self._chunk_size], # noqa: E203 ) if "error" in response: raise AppException(response["error"]) @@ -204,13 +204,13 @@ def execute(self): class AttachFileUrlsUseCase(BaseUseCase): def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - attachments: List[ImageEntity], - backend_service_provider: SuerannotateServiceProvider, - annotation_status: str = None, - upload_state_code: int = constances.UploadState.EXTERNAL.value, + self, + project: ProjectEntity, + folder: FolderEntity, + attachments: List[ImageEntity], + backend_service_provider: SuerannotateServiceProvider, + annotation_status: str = None, + upload_state_code: int = constances.UploadState.EXTERNAL.value, ): super().__init__() self._attachments = attachments @@ -234,8 +234,8 @@ def _validate_limitations(self, to_upload_count): elif to_upload_count > response.data.project_limit.remaining_image_count: errors.append(constances.ATTACH_PROJECT_LIMIT_ERROR_MESSAGE) elif ( - response.data.super_user_limit - and to_upload_count > response.data.super_user_limit.remaining_image_count + response.data.super_user_limit + and to_upload_count > response.data.super_user_limit.remaining_image_count ): errors.append(constances.ATTACH_USER_LIMIT_ERROR_MESSAGE) if errors: @@ -300,10 +300,10 @@ def execute(self): class GetImageBytesUseCase(BaseUseCase): def __init__( - self, - image: ImageEntity, - backend_service_provider: SuerannotateServiceProvider, - image_variant: str = "original", + self, + image: ImageEntity, + backend_service_provider: SuerannotateServiceProvider, + image_variant: str = "original", ): super().__init__() self._image = image @@ -327,19 +327,19 @@ def execute(self): class CopyImageAnnotationClasses(BaseUseCase): def __init__( - self, - from_project: ProjectEntity, - to_project: ProjectEntity, - from_image: ImageEntity, - to_image: ImageEntity, - from_project_s3_repo: BaseManageableRepository, - to_project_s3_repo: BaseManageableRepository, - to_project_annotation_classes: BaseReadOnlyRepository, - from_project_annotation_classes: BaseReadOnlyRepository, - backend_service_provider: SuerannotateServiceProvider, - from_folder: FolderEntity = None, - to_folder: FolderEntity = None, - annotation_type: str = "MAIN", + self, + from_project: ProjectEntity, + to_project: ProjectEntity, + from_image: ImageEntity, + to_image: ImageEntity, + from_project_s3_repo: BaseManageableRepository, + to_project_s3_repo: BaseManageableRepository, + to_project_annotation_classes: BaseReadOnlyRepository, + from_project_annotation_classes: BaseReadOnlyRepository, + backend_service_provider: SuerannotateServiceProvider, + from_folder: FolderEntity = None, + to_folder: FolderEntity = None, + annotation_type: str = "MAIN", ): super().__init__() self._from_project = from_project @@ -428,7 +428,7 @@ def execute(self): for instance in image_annotations["instances"]: if instance["classId"] < 0 or not annotations_classes_from_copy.get( - instance["classId"] + instance["classId"] ): continue project_annotation_class = annotations_classes_from_copy[ @@ -450,8 +450,8 @@ def execute(self): for instance in image_annotations["instances"]: if ( - "className" not in instance - and instance["className"] not in annotations_classes_to_copy + "className" not in instance + and instance["className"] not in annotations_classes_to_copy ): continue annotation_class = annotations_classes_to_copy.get(instance["className"]) @@ -486,9 +486,9 @@ def execute(self): self.to_project_s3_repo.insert(file) if ( - self._to_project.project_type == constances.ProjectType.PIXEL.value - and annotations.get("annotation_bluemap_path") - and annotations["annotation_bluemap_path"]["exist"] + self._to_project.project_type == constances.ProjectType.PIXEL.value + and annotations.get("annotation_bluemap_path") + and annotations["annotation_bluemap_path"]["exist"] ): response = requests.get( url=annotations["annotation_bluemap_path"]["url"], @@ -516,7 +516,7 @@ def execute(self): class DownloadImageFromPublicUrlUseCase(BaseUseCase): def __init__( - self, project: ProjectEntity, image_url: str, image_name: str = None, + self, project: ProjectEntity, image_url: str, image_name: str = None, ): super().__init__() self._project = project @@ -568,14 +568,14 @@ class ImagesBulkCopyUseCase(BaseUseCase): CHUNK_SIZE = 1000 def __init__( - self, - project: ProjectEntity, - from_folder: FolderEntity, - to_folder: FolderEntity, - image_names: List[str], - backend_service_provider: SuerannotateServiceProvider, - include_annotations: bool, - include_pin: bool, + self, + project: ProjectEntity, + from_folder: FolderEntity, + to_folder: FolderEntity, + image_names: List[str], + backend_service_provider: SuerannotateServiceProvider, + include_annotations: bool, + include_pin: bool, ): super().__init__() self._project = project @@ -631,12 +631,12 @@ def execute(self): project_id=self._project.uuid, from_folder_id=self._from_folder.uuid, to_folder_id=self._to_folder.uuid, - images=self._image_names[i : i + self.CHUNK_SIZE], + images=self._image_names[i: i + self.CHUNK_SIZE], include_annotations=self._include_annotations, include_pin=self._include_pin, ) if not poll_id: - skipped_images.append(self._image_names[i : i + self.CHUNK_SIZE]) + skipped_images.append(self._image_names[i: i + self.CHUNK_SIZE]) continue await_time = len(images_to_copy) * 0.3 @@ -655,11 +655,11 @@ def execute(self): class DeleteImageUseCase(BaseUseCase): def __init__( - self, - images: BaseManageableRepository, - image: ImageEntity, - team_id: int, - project_id: int, + self, + images: BaseManageableRepository, + image: ImageEntity, + team_id: int, + project_id: int, ): super().__init__() self._images = images @@ -674,11 +674,11 @@ def execute(self): class GetImageMetadataUseCase(BaseUseCase): def __init__( - self, - image_name: str, - project: ProjectEntity, - folder: FolderEntity, - service: SuerannotateServiceProvider, + self, + image_name: str, + project: ProjectEntity, + folder: FolderEntity, + service: SuerannotateServiceProvider, ): super().__init__() self._image_name = image_name @@ -716,12 +716,12 @@ class ImagesBulkMoveUseCase(BaseUseCase): CHUNK_SIZE = 1000 def __init__( - self, - project: ProjectEntity, - from_folder: FolderEntity, - to_folder: FolderEntity, - image_names: List[str], - backend_service_provider: SuerannotateServiceProvider, + self, + project: ProjectEntity, + from_folder: FolderEntity, + to_folder: FolderEntity, + image_names: List[str], + backend_service_provider: SuerannotateServiceProvider, ): super().__init__() self._project = project @@ -752,7 +752,7 @@ def execute(self): project_id=self._project.uuid, from_folder_id=self._from_folder.uuid, to_folder_id=self._to_folder.uuid, - images=self._image_names[i : i + self.CHUNK_SIZE], # noqa: E203 + images=self._image_names[i: i + self.CHUNK_SIZE], # noqa: E203 ) ) self._response.data = moved_images @@ -763,12 +763,12 @@ class CreateFuseImageUseCase(BaseUseCase): TRANSPARENCY = 128 def __init__( - self, - project_type: str, - image_path: str, - classes: list = None, - in_memory: bool = False, - generate_overlay: bool = False, + self, + project_type: str, + image_path: str, + classes: list = None, + in_memory: bool = False, + generate_overlay: bool = False, ): super().__init__() self._project_type = project_type @@ -787,7 +787,7 @@ def generate_color(value: str = None): random.randint(1, 255), random.randint(1, 255), ) - return tuple(int(value.lstrip("#")[i : i + 2], 16) for i in (0, 2, 4)) + return tuple(int(value.lstrip("#")[i: i + 2], 16) for i in (0, 2, 4)) @property def annotations(self): @@ -825,7 +825,7 @@ def execute(self): image = ImagePlugin(io.BytesIO(file.read())) images = [ - Image("fuse", f"{self._image_path}___fuse.png", image.get_empty(),) + Image("fuse", f"{self._image_path}___fuse.png", image.get_empty(), ) ] if self._generate_overlay: images.append( @@ -834,16 +834,14 @@ def execute(self): outline_color = 4 * (255,) for instance in self.annotations["instances"]: - if not instance.get("className"): - continue color = class_color_map.get(instance["className"]) if not color: class_color_map[instance["className"]] = self.generate_color() + fill_color = ( + *class_color_map[instance["className"]], + self.TRANSPARENCY, + ) for image in images: - fill_color = ( - *class_color_map[instance["className"]], - 255 if image.type == "fuse" else self.TRANSPARENCY - ) if instance["type"] == "bbox": image.content.draw_bbox( **instance["points"], @@ -912,7 +910,7 @@ def execute(self): empty_image_arr = np.full((height, weight, 4), [0, 0, 0, 255], np.uint8) for annotation in self.annotations["instances"]: if annotation.get("className") and not class_color_map.get( - annotation["className"] + annotation["className"] ): continue fill_color = *class_color_map[annotation["className"]], 255 @@ -953,7 +951,7 @@ def execute(self): class GetS3ImageUseCase(BaseUseCase): def __init__( - self, s3_bucket, image_path: str, + self, s3_bucket, image_path: str, ): super().__init__() self._s3_bucket = s3_bucket @@ -978,19 +976,19 @@ def execute(self): class DownloadImageUseCase(BaseUseCase): def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - image: ImageEntity, - images: BaseManageableRepository, - classes: BaseManageableRepository, - backend_service_provider: SuerannotateServiceProvider, - annotation_classes: BaseReadOnlyRepository, - download_path: str, - image_variant: str = "original", - include_annotations: bool = False, - include_fuse: bool = False, - include_overlay: bool = False, + self, + project: ProjectEntity, + folder: FolderEntity, + image: ImageEntity, + images: BaseManageableRepository, + classes: BaseManageableRepository, + backend_service_provider: SuerannotateServiceProvider, + annotation_classes: BaseReadOnlyRepository, + download_path: str, + image_variant: str = "original", + include_annotations: bool = False, + include_fuse: bool = False, + include_overlay: bool = False, ): super().__init__() self._project = project @@ -1038,7 +1036,7 @@ def validate_download_path(self): def validate_include_annotations(self): if ( - self._include_fuse or self._include_overlay + self._include_fuse or self._include_overlay ) and not self._include_annotations: raise AppValidationException( "To download fuse or overlay image need to set include_annotations=True in download_image" @@ -1059,9 +1057,7 @@ def execute(self): if self._include_annotations: annotations = self.download_annotation_use_case.execute().data - if self._include_annotations and ( - self._include_fuse or self._include_overlay - ): + if self._include_annotations and (self._include_fuse or self._include_overlay): classes = self.get_annotation_classes_ues_case.execute().data fuse_image = ( CreateFuseImageUseCase( @@ -1074,8 +1070,8 @@ def execute(self): ], generate_overlay=self._include_overlay, ) - .execute() - .data + .execute() + .data ) self._response.data = ( @@ -1089,18 +1085,18 @@ def execute(self): class UploadImageToProject(BaseUseCase): def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - s3_repo: BaseManageableRepository, - settings: BaseManageableRepository, - backend_client: SuerannotateServiceProvider, - annotation_status: str, - image_bytes: io.BytesIO = None, - image_path: str = None, - image_name: str = None, - from_s3_bucket: str = None, - image_quality_in_editor: str = None, + self, + project: ProjectEntity, + folder: FolderEntity, + s3_repo: BaseManageableRepository, + settings: BaseManageableRepository, + backend_client: SuerannotateServiceProvider, + annotation_status: str, + image_bytes: io.BytesIO = None, + image_path: str = None, + image_name: str = None, + from_s3_bucket: str = None, + image_quality_in_editor: str = None, ): super().__init__() self._project = project @@ -1145,8 +1141,8 @@ def validate_limitations(self): elif response.data.project_limit.remaining_image_count < 1: errors.append(constances.UPLOAD_PROJECT_LIMIT_ERROR_MESSAGE) elif ( - response.data.super_user_limit - and response.data.super_user_limit.remaining_image_count < 1 + response.data.super_user_limit + and response.data.super_user_limit.remaining_image_count < 1 ): errors.append(constances.UPLOAD_USER_LIMIT_ERROR_MESSAGE) if errors: @@ -1173,8 +1169,8 @@ def validate_image_name_uniqueness(self): else Path(self._image_path).name ], ) - .execute() - .data + .execute() + .data ) if image_entities: raise AppValidationException("Image with this name already exists.") @@ -1186,8 +1182,8 @@ def execute(self) -> Response: GetS3ImageUseCase( s3_bucket=self._from_s3_bucket, image_path=self._image_path ) - .execute() - .data + .execute() + .data ) elif self._image_path: image_bytes = io.BytesIO(open(self._image_path, "rb").read()) @@ -1223,19 +1219,19 @@ class UploadImagesToProject(BaseInteractiveUseCase): MAX_WORKERS = 10 def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - settings: BaseManageableRepository, - s3_repo, - backend_client: SuerannotateServiceProvider, - paths: List[str], - extensions=constances.DEFAULT_IMAGE_EXTENSIONS, - annotation_status="NotStarted", - from_s3_bucket=None, - exclude_file_patterns: List[str] = constances.DEFAULT_FILE_EXCLUDE_PATTERNS, - recursive_sub_folders: bool = False, - image_quality_in_editor=None, + self, + project: ProjectEntity, + folder: FolderEntity, + settings: BaseManageableRepository, + s3_repo, + backend_client: SuerannotateServiceProvider, + paths: List[str], + extensions=constances.DEFAULT_IMAGE_EXTENSIONS, + annotation_status="NotStarted", + from_s3_bucket=None, + exclude_file_patterns: List[str] = constances.DEFAULT_FILE_EXCLUDE_PATTERNS, + recursive_sub_folders: bool = False, + image_quality_in_editor=None, ): super().__init__() @@ -1286,8 +1282,8 @@ def validate_limitations(self): elif to_upload_count > response.data.project_limit.remaining_image_count: errors.append(constances.UPLOAD_PROJECT_LIMIT_ERROR_MESSAGE) elif ( - response.data.super_user_limit - and to_upload_count > response.data.super_user_limit.remaining_image_count + response.data.super_user_limit + and to_upload_count > response.data.super_user_limit.remaining_image_count ): errors.append(constances.UPLOAD_USER_LIMIT_ERROR_MESSAGE) if errors: @@ -1295,18 +1291,18 @@ def validate_limitations(self): def validate_annotation_status(self): if ( - self._annotation_status - and self._annotation_status.lower() - not in constances.AnnotationStatus.values() + self._annotation_status + and self._annotation_status.lower() + not in constances.AnnotationStatus.values() ): raise AppValidationException("Invalid annotations status") def validate_extensions(self): if self._extensions and not all( - [ - extension in constances.DEFAULT_IMAGE_EXTENSIONS - for extension in self._extensions - ] + [ + extension in constances.DEFAULT_IMAGE_EXTENSIONS + for extension in self._extensions + ] ): raise AppValidationException("") @@ -1397,7 +1393,9 @@ def filter_paths(self, paths: List[str]): paths = [ path for path in paths - if not any([extension in path for extension in self.exclude_file_patterns]) + if not any( + [extension in path for extension in self.exclude_file_patterns] + ) ] name_path_map = defaultdict(list) for path in paths: @@ -1417,9 +1415,7 @@ def filter_paths(self, paths: List[str]): team_id=self._project.team_id, folder_id=self._folder.uuid, images=[image.split("/")[-1] for image in filtered_paths], - ) - .execute() - .data + ).execute().data ) images_to_upload = [] image_list = [image.name for image in image_entities] @@ -1447,7 +1443,7 @@ def execute(self): uploaded_images = [] failed_images = [] with concurrent.futures.ThreadPoolExecutor( - max_workers=self.MAX_WORKERS + max_workers=self.MAX_WORKERS ) as executor: results = [ executor.submit(self._upload_image, image_path) @@ -1468,7 +1464,7 @@ def execute(self): folder=self._folder, backend_service_provider=self._backend_client, attachments=[ - image.entity for image in uploaded_images[i : i + 100] + image.entity for image in uploaded_images[i: i + 100] ], annotation_status=self._annotation_status, upload_state_code=constances.UploadState.BASIC.value, @@ -1489,19 +1485,19 @@ class UploadImagesFromFolderToProject(UploadImagesToProject): MAX_WORKERS = 10 def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - settings: BaseManageableRepository, - s3_repo, - backend_client: SuerannotateServiceProvider, - folder_path: str, - extensions=constances.DEFAULT_IMAGE_EXTENSIONS, - annotation_status="NotStarted", - from_s3_bucket=None, - exclude_file_patterns: List[str] = constances.DEFAULT_FILE_EXCLUDE_PATTERNS, - recursive_sub_folders: bool = False, - image_quality_in_editor=None, + self, + project: ProjectEntity, + folder: FolderEntity, + settings: BaseManageableRepository, + s3_repo, + backend_client: SuerannotateServiceProvider, + folder_path: str, + extensions=constances.DEFAULT_IMAGE_EXTENSIONS, + annotation_status="NotStarted", + from_s3_bucket=None, + exclude_file_patterns: List[str] = constances.DEFAULT_FILE_EXCLUDE_PATTERNS, + recursive_sub_folders: bool = False, + image_quality_in_editor=None, ): paths = UploadImagesFromFolderToProject.extract_paths( folder_path=folder_path, @@ -1526,7 +1522,7 @@ def __init__( @classmethod def extract_paths( - cls, folder_path, extensions, from_s3_bucket=None, recursive_sub_folders=False + cls, folder_path, extensions, from_s3_bucket=None, recursive_sub_folders=False ): if not extensions: extensions = constances.DEFAULT_IMAGE_EXTENSIONS @@ -1552,11 +1548,11 @@ def extract_paths( contents = response.get("Contents", []) for object_data in contents: key = object_data["Key"] - if not recursive_sub_folders and "/" in key[len(folder_path) + 1 :]: + if not recursive_sub_folders and "/" in key[len(folder_path) + 1:]: continue for extension in extensions: if key.endswith(f".{extension.lower()}") or key.endswith( - f".{extension.upper()}" + f".{extension.upper()}" ): paths.append(key) break @@ -1569,16 +1565,16 @@ class UploadImagesFromPublicUrls(BaseInteractiveUseCase): ProcessedImage = namedtuple("ProcessedImage", ["url", "uploaded", "path", "entity"]) def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - backend_service: SuerannotateServiceProvider, - settings: BaseManageableRepository, - s3_repo, - image_urls: List[str], - image_names: List[str] = None, - annotation_status: str = None, - image_quality_in_editor: str = None, + self, + project: ProjectEntity, + folder: FolderEntity, + backend_service: SuerannotateServiceProvider, + settings: BaseManageableRepository, + s3_repo, + image_urls: List[str], + image_names: List[str] = None, + annotation_status: str = None, + image_quality_in_editor: str = None, ): super().__init__() self._project = project @@ -1606,8 +1602,8 @@ def validate_limitations(self): elif to_upload_count > response.data.project_limit.remaining_image_count: errors.append(constances.UPLOAD_PROJECT_LIMIT_ERROR_MESSAGE) elif ( - response.data.super_user_limit - and to_upload_count > response.data.super_user_limit.remaining_image_count + response.data.super_user_limit + and to_upload_count > response.data.super_user_limit.remaining_image_count ): errors.append(constances.UPLOAD_USER_LIMIT_ERROR_MESSAGE) if errors: @@ -1619,8 +1615,8 @@ def validate_image_names(self): def validate_project_type(self): if self._project.project_type in ( - constances.ProjectType.VIDEO.value, - constances.ProjectType.DOCUMENT.value, + constances.ProjectType.VIDEO.value, + constances.ProjectType.DOCUMENT.value, ): raise AppValidationException( "The function does not support projects containing " @@ -1630,8 +1626,8 @@ def validate_project_type(self): def validate_annotation_status(self): if self._annotation_status: if ( - self._annotation_status.lower() - not in constances.AnnotationStatus.values() + self._annotation_status.lower() + not in constances.AnnotationStatus.values() ): raise AppValidationException("Invalid annotations status.") else: @@ -1665,8 +1661,8 @@ def upload_image(self, image_url, image_name=None): folder_id=self._folder.uuid, images=[image_name], ) - .execute() - .data + .execute() + .data ] if image_name not in duplicated_images: upload_response = UploadImageS3UseCase( @@ -1701,7 +1697,7 @@ def execute(self): logger.info("Downloading %s images", len(self._image_urls)) with concurrent.futures.ThreadPoolExecutor( - max_workers=self.MAX_WORKERS + max_workers=self.MAX_WORKERS ) as executor: failed_images = [] if self._image_names: @@ -1730,7 +1726,7 @@ def execute(self): folder=self._folder, backend_service_provider=self._backend_service, attachments=[ - image.entity for image in images_to_upload[i : i + 100] + image.entity for image in images_to_upload[i: i + 100] ], annotation_status=self._annotation_status, ).execute() @@ -1758,14 +1754,14 @@ def execute(self): class UploadImageS3UseCase(BaseUseCase): def __init__( - self, - project: ProjectEntity, - project_settings: List[ProjectSettingEntity], - image_path: str, - image: io.BytesIO, - s3_repo: BaseManageableRepository, - upload_path: str, - image_quality_in_editor: str = None, + self, + project: ProjectEntity, + project_settings: List[ProjectSettingEntity], + image_path: str, + image: io.BytesIO, + s3_repo: BaseManageableRepository, + upload_path: str, + image_quality_in_editor: str = None, ): super().__init__() self._project = project @@ -1824,7 +1820,7 @@ def execute(self): quality=quality, subsampling=-1 ) image_key = ( - self._upload_path + str(uuid.uuid4()) + Path(self._image_path).suffix + self._upload_path + str(uuid.uuid4()) + Path(self._image_path).suffix ) file_entity = S3FileEntity(uuid=image_key, data=self._image) @@ -1862,13 +1858,13 @@ class InteractiveAttachFileUrlsUseCase(BaseInteractiveUseCase): CHUNK_SIZE = 500 def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - attachments: List[ImageEntity], - backend_service_provider: SuerannotateServiceProvider, - annotation_status: str = None, - upload_state_code: int = constances.UploadState.EXTERNAL.value, + self, + project: ProjectEntity, + folder: FolderEntity, + attachments: List[ImageEntity], + backend_service_provider: SuerannotateServiceProvider, + annotation_status: str = None, + upload_state_code: int = constances.UploadState.EXTERNAL.value, ): super().__init__() self._attachments = attachments @@ -1897,8 +1893,8 @@ def validate_limitations(self): elif attachments_count > response.data.project_limit.remaining_image_count: errors.append(constances.ATTACH_PROJECT_LIMIT_ERROR_MESSAGE) elif ( - response.data.super_user_limit - and attachments_count > response.data.super_user_limit.remaining_image_count + response.data.super_user_limit + and attachments_count > response.data.super_user_limit.remaining_image_count ): errors.append(constances.ATTACH_USER_LIMIT_ERROR_MESSAGE) if errors: @@ -1923,10 +1919,10 @@ def execute(self): response = AttachFileUrlsUseCase( project=self._project, folder=self._folder, - attachments=self._attachments[i : i + 500], # noqa: E203 + attachments=self._attachments[i: i + 500], # noqa: E203 backend_service_provider=self._backend_service, annotation_status=self._annotation_status, - upload_state_code=self._upload_state_code, + upload_state_code=self._upload_state_code ).execute() if response.errors: self._response.errors = response.errors @@ -1939,21 +1935,22 @@ def execute(self): return self._response -class MoveImageUseCase(BaseUseCase): +class CopyImageUseCase(BaseUseCase): def __init__( - self, - from_project: ProjectEntity, - from_folder: FolderEntity, - image_name: str, - to_project: ProjectEntity, - to_folder: FolderEntity, - backend_service: SuerannotateServiceProvider, - images: BaseManageableRepository, - to_upload_s3_repo: BaseManageableRepository, - project_settings: List[ProjectSettingEntity], - include_annotations: Optional[bool] = True, - copy_annotation_status: Optional[bool] = True, - copy_pin: Optional[bool] = True, + self, + from_project: ProjectEntity, + from_folder: FolderEntity, + image_name: str, + to_project: ProjectEntity, + to_folder: FolderEntity, + backend_service: SuerannotateServiceProvider, + images: BaseManageableRepository, + to_upload_s3_repo: BaseManageableRepository, + project_settings: List[ProjectSettingEntity], + include_annotations: Optional[bool] = True, + copy_annotation_status: Optional[bool] = True, + copy_pin: Optional[bool] = True, + move=False ): super().__init__() self._from_project = from_project @@ -1968,24 +1965,20 @@ def __init__( self._copy_pin = copy_pin self._backend_service = backend_service self._images = images + self._move = move def validate_copy_path(self): if ( - self._from_project.name == self._to_project.name - and self._from_folder.name == self._to_folder.name + self._from_project.name == self._to_project.name + and self._from_folder.name == self._to_folder.name + ): - raise AppValidationException( - "Cannot move image if source_project == destination_project." - ) + raise AppValidationException("Cannot move image if source_project == destination_project.") def validate_project_type(self): if self._from_project.project_type in ( - constances.ProjectType.VIDEO.value, - constances.ProjectType.DOCUMENT.value, - ): - raise AppValidationException( - constances.LIMITED_FUNCTIONS[self._from_project.project_type] - ) + constances.ProjectType.VIDEO.value, constances.ProjectType.DOCUMENT.value): + raise AppValidationException(constances.LIMITED_FUNCTIONS[self._from_project.project_type]) def validate_limitations(self): response = self._backend_service.get_limitations( @@ -1998,35 +1991,25 @@ def validate_limitations(self): errors = [] if response.data.folder_limit.remaining_image_count < 1: errors.append(constances.ATTACH_FOLDER_LIMIT_ERROR_MESSAGE) - elif ( - self._to_project.uuid != self._from_project.uuid - and response.data.project_limit.remaining_image_count < 1 - ): + elif self._to_project.uuid != self._from_project.uuid and response.data.project_limit.remaining_image_count < 1: errors.append(constances.ATTACH_PROJECT_LIMIT_ERROR_MESSAGE) if errors: raise AppValidationException("\n".join(errors)) def execute(self) -> Response: if self.is_valid(): - image = ( - GetImageUseCase( - project=self._from_project, - folder=self._from_folder, - image_name=self._image_name, - images=self._images, - service=self._backend_service, - ) - .execute() - .data - ) - - image_bytes = ( - GetImageBytesUseCase( - image=image, backend_service_provider=self._backend_service, - ) - .execute() - .data - ) + image = GetImageUseCase( + project=self._from_project, + folder=self._from_folder, + image_name=self._image_name, + images=self._images, + service=self._backend_service + ).execute().data + + image_bytes = GetImageBytesUseCase( + image=image, + backend_service_provider=self._backend_service, + ).execute().data image_path = f"{self._to_folder}/{self._image_name}" auth_data = self._backend_service.get_s3_upload_auth_token( @@ -2068,11 +2051,11 @@ class DeleteAnnotations(BaseUseCase): CHUNK_SIZE = 2000 def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - backend_service: SuerannotateServiceProvider, - image_names: Optional[List[str]] = None, + self, + project: ProjectEntity, + folder: FolderEntity, + backend_service: SuerannotateServiceProvider, + image_names: Optional[List[str]] = None, ): super().__init__() self._project = project @@ -2089,8 +2072,8 @@ def execute(self) -> Response: team_id=self._project.team_id, folder_id=self._folder.uuid, image_names=self._image_names[ - idx : idx + self.CHUNK_SIZE # noqa: E203 - ], + idx: idx + self.CHUNK_SIZE # noqa: E203 + ], ) if response: polling_states[response.get("poll_id")] = False @@ -2127,9 +2110,9 @@ def execute(self) -> Response: continue project_folder_name = ( - self._project.name - + (f"/{self._folder.name}" if self._folder.name != "root" else "") - + "." + self._project.name + + (f"/{self._folder.name}" if self._folder.name != "root" else "") + + "." ) if all(polling_states.values()): @@ -2144,16 +2127,16 @@ def execute(self) -> Response: class UploadImageAnnotationsUseCase(BaseUseCase): def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - annotation_classes: BaseReadOnlyRepository, - image_name: str, - annotations: dict, - backend_service_provider: SuerannotateServiceProvider, - mask=None, - verbose: bool = True, - annotation_path: str = True, + self, + project: ProjectEntity, + folder: FolderEntity, + annotation_classes: BaseReadOnlyRepository, + image_name: str, + annotations: dict, + backend_service_provider: SuerannotateServiceProvider, + mask=None, + verbose: bool = True, + annotation_path: str = True, ): super().__init__() self._project = project @@ -2226,23 +2209,22 @@ def get_annotation_classes_name_to_id(self): if attribute["name"] in attribute_group_info: logger.warning( "Duplicate annotation class attribute name %s in attribute group %s. Only one of the annotation classe attributes will be used. This will result in errors in annotation upload.", - attribute["name"], - attribute_group["name"], + attribute["name"], attribute_group["name"] ) attribute_group_info[attribute["name"]] = attribute["id"] if attribute_group["name"] in class_info["attribute_groups"]: logger.warning( "Duplicate annotation class attribute group name %s. Only one of the annotation classe attribute groups will be used. This will result in errors in annotation upload.", - attribute_group["name"], + attribute_group["name"] ) class_info["attribute_groups"][attribute_group["name"]] = { "id": attribute_group["id"], - "attributes": attribute_group_info, + "attributes": attribute_group_info } if class_name in annotation_classes_dict: logger.warning( "Duplicate annotation class name %s. Only one of the annotation classes will be used. This will result in errors in annotation upload.", - class_name, + class_name ) annotation_classes_dict[class_name] = class_info return annotation_classes_dict @@ -2276,7 +2258,7 @@ def fill_classes_data(self, annotations: dict): annotation_classes.update(unknown_classes) templates = self.get_templates_mapping() for annotation in ( - i for i in annotations["instances"] if i.get("type", None) == "template" + i for i in annotations["instances"] if i.get("type", None) == "template" ): annotation["templateId"] = templates.get( annotation.get("templateName", ""), -1 @@ -2285,28 +2267,24 @@ def fill_classes_data(self, annotations: dict): for annotation in [i for i in annotations["instances"] if "className" in i]: annotation_class_name = annotation["className"] if annotation_class_name not in annotation_classes.keys(): - logger.warning( - f"Couldn't find annotation class {annotation_class_name}" - ) + logger.warning(f"Couldn't find annotation class {annotation_class_name}") continue annotation["classId"] = annotation_classes[annotation_class_name]["id"] for attribute in annotation["attributes"]: if ( - attribute["groupName"] - not in annotation_classes[annotation_class_name]["attribute_groups"] + attribute["groupName"] + not in annotation_classes[annotation_class_name]["attribute_groups"] ): - logger.warning( - f"Couldn't find annotation group {attribute['groupName']}." - ) + logger.warning(f"Couldn't find annotation group {attribute['groupName']}.") continue attribute["groupId"] = annotation_classes[annotation_class_name][ "attribute_groups" ][attribute["groupName"]]["id"] if ( - attribute["name"] - not in annotation_classes[annotation_class_name][ - "attribute_groups" - ][attribute["groupName"]]["attributes"] + attribute["name"] + not in annotation_classes[annotation_class_name][ + "attribute_groups" + ][attribute["groupName"]]["attributes"] ): del attribute["groupId"] logger.warning( @@ -2380,12 +2358,12 @@ class DeleteImagesUseCase(BaseUseCase): CHUNK_SIZE = 1000 def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - backend_service_provider: SuerannotateServiceProvider, - images: BaseReadOnlyRepository, - image_names: List[str] = None, + self, + project: ProjectEntity, + folder: FolderEntity, + backend_service_provider: SuerannotateServiceProvider, + images: BaseReadOnlyRepository, + image_names: List[str] = None, ): super().__init__() self._project = project @@ -2412,14 +2390,14 @@ def execute(self): folder_id=self._folder.uuid, images=self._image_names, ) - .execute() - .data + .execute() + .data ] else: condition = ( - Condition("team_id", self._project.team_id, EQ) - & Condition("project_id", self._project.uuid, EQ) - & Condition("folder_id", self._folder.uuid, EQ) + Condition("team_id", self._project.team_id, EQ) + & Condition("project_id", self._project.uuid, EQ) + & Condition("folder_id", self._folder.uuid, EQ) ) image_ids = [image.uuid for image in self._images.get_all(condition)] @@ -2427,7 +2405,7 @@ def execute(self): self._backend_service.delete_images( project_id=self._project.uuid, team_id=self._project.team_id, - image_ids=image_ids[i : i + self.CHUNK_SIZE], # noqa: E203 + image_ids=image_ids[i: i + self.CHUNK_SIZE], # noqa: E203 ) return self._response @@ -2438,16 +2416,16 @@ class UploadAnnotationsUseCase(BaseInteractiveUseCase): AUTH_DATA_CHUNK_SIZE = 500 def __init__( - self, - project: ProjectEntity, - folder: FolderEntity, - annotation_classes: List[AnnotationClassEntity], - folder_path: str, - annotation_paths: List[str], - backend_service_provider: SuerannotateServiceProvider, - templates: List[dict], - pre_annotation: bool = False, - client_s3_bucket=None, + self, + project: ProjectEntity, + folder: FolderEntity, + annotation_classes: List[AnnotationClassEntity], + folder_path: str, + annotation_paths: List[str], + backend_service_provider: SuerannotateServiceProvider, + templates: List[dict], + pre_annotation: bool = False, + client_s3_bucket=None, ): super().__init__() self._project = project @@ -2522,7 +2500,7 @@ def fill_classes_data(self, annotations: dict): annotation_classes.update(unknown_classes) templates = self.get_templates_mapping() for annotation in ( - i for i in annotations["instances"] if i.get("type", None) == "template" + i for i in annotations["instances"] if i.get("type", None) == "template" ): annotation["templateId"] = templates.get( annotation.get("templateName", ""), -1 @@ -2536,10 +2514,10 @@ def fill_classes_data(self, annotations: dict): for attribute in annotation["attributes"]: if annotation_classes[annotation_class_name].get("attribute_groups"): if ( - attribute["groupName"] - not in annotation_classes[annotation_class_name][ - "attribute_groups" - ] + attribute["groupName"] + not in annotation_classes[annotation_class_name][ + "attribute_groups" + ] ): continue else: @@ -2551,10 +2529,10 @@ def fill_classes_data(self, annotations: dict): ][attribute["groupName"]]["id"] if ( - attribute["name"] - not in annotation_classes[annotation_class_name][ - "attribute_groups" - ][attribute["groupName"]]["attributes"] + attribute["name"] + not in annotation_classes[annotation_class_name][ + "attribute_groups" + ][attribute["groupName"]]["attributes"] ): del attribute["groupId"] self.missing_attributes.add(attribute["name"]) @@ -2589,8 +2567,8 @@ def annotations_to_upload(self): folder_id=self._folder.uuid, images=[image.name for image in images_detail], ) - .execute() - .data + .execute() + .data ) for image_data in images_data: @@ -2667,7 +2645,7 @@ def execute(self): from_s3 = None with concurrent.futures.ThreadPoolExecutor( - max_workers=self.MAX_WORKERS + max_workers=self.MAX_WORKERS ) as executor: results = [ executor.submit( @@ -2761,14 +2739,14 @@ def report_missing_data(self): class DownloadImageAnnotationsUseCase(BaseUseCase): def __init__( - self, - service: SuerannotateServiceProvider, - project: ProjectEntity, - folder: FolderEntity, - image_name: str, - images: BaseManageableRepository, - destination: str, - annotation_classes: BaseManageableRepository, + self, + service: SuerannotateServiceProvider, + project: ProjectEntity, + folder: FolderEntity, + image_name: str, + images: BaseManageableRepository, + destination: str, + annotation_classes: BaseManageableRepository, ): super().__init__() self._service = service @@ -2830,7 +2808,7 @@ def fill_classes_data(self, annotations: dict): return templates = self.get_templates_mapping() for annotation in ( - i for i in annotations["instances"] if i.get("type", None) == "template" + i for i in annotations["instances"] if i.get("type", None) == "template" ): template_name = templates.get(annotation.get("templateId"), None) if template_name: @@ -2846,15 +2824,16 @@ def fill_classes_data(self, annotations: dict): i for i in annotation["attributes"] if "groupId" in i - and i["groupId"] in annotation_class["attribute_groups"].keys() + and i["groupId"] in annotation_class["attribute_groups"].keys() ]: attribute["groupName"] = annotation_class["attribute_groups"][ attribute["groupId"] ]["name"] - if attribute["id"] not in list( - annotation_class["attribute_groups"][attribute["groupId"]][ - "attributes" - ].keys() + if ( + attribute["id"] + not in list(annotation_class["attribute_groups"][attribute["groupId"]][ + "attributes" + ].keys()) ): continue attribute["name"] = annotation_class["attribute_groups"][ @@ -2905,7 +2884,7 @@ def execute(self): if response.ok: data["annotation_mask"] = io.BytesIO(response.content).getbuffer() mask_path = ( - Path(self._destination) / data["annotation_mask_filename"] + Path(self._destination) / data["annotation_mask_filename"] ) with open(mask_path, "wb") as f: f.write(data["annotation_mask"]) @@ -2923,13 +2902,13 @@ def execute(self): class DownloadImagePreAnnotationsUseCase(BaseUseCase): def __init__( - self, - service: SuerannotateServiceProvider, - project: ProjectEntity, - folder: FolderEntity, - image_name: str, - images: BaseManageableRepository, - destination: str, + self, + service: SuerannotateServiceProvider, + project: ProjectEntity, + folder: FolderEntity, + image_name: str, + images: BaseManageableRepository, + destination: str, ): super().__init__() self._service = service @@ -3001,12 +2980,12 @@ def execute(self): class GetImageAnnotationsUseCase(BaseUseCase): def __init__( - self, - service: SuerannotateServiceProvider, - project: ProjectEntity, - folder: FolderEntity, - image_name: str, - images: BaseManageableRepository, + self, + service: SuerannotateServiceProvider, + project: ProjectEntity, + folder: FolderEntity, + image_name: str, + images: BaseManageableRepository, ): super().__init__() self._service = service @@ -3080,12 +3059,12 @@ def execute(self): class GetImagePreAnnotationsUseCase(BaseUseCase): def __init__( - self, - service: SuerannotateServiceProvider, - project: ProjectEntity, - folder: FolderEntity, - image_name: str, - images: BaseManageableRepository, + self, + service: SuerannotateServiceProvider, + project: ProjectEntity, + folder: FolderEntity, + image_name: str, + images: BaseManageableRepository, ): super().__init__() self._service = service @@ -3155,12 +3134,12 @@ class AssignImagesUseCase(BaseUseCase): CHUNK_SIZE = 500 def __init__( - self, - service: SuerannotateServiceProvider, - project: ProjectEntity, - folder: FolderEntity, - image_names: list, - user: str, + self, + service: SuerannotateServiceProvider, + project: ProjectEntity, + folder: FolderEntity, + image_names: list, + user: str, ): super().__init__() self._project = project @@ -3184,8 +3163,8 @@ def execute(self): folder_name=self._folder.name, user=self._user, image_names=self._image_names[ - i : i + self.CHUNK_SIZE # noqa: E203 - ], + i: i + self.CHUNK_SIZE # noqa: E203 + ], ) if not is_assigned: self._response.errors = AppException( @@ -3199,11 +3178,11 @@ class UnAssignImagesUseCase(BaseUseCase): CHUNK_SIZE = 500 def __init__( - self, - service: SuerannotateServiceProvider, - project_entity: ProjectEntity, - folder: FolderEntity, - image_names: list, + self, + service: SuerannotateServiceProvider, + project_entity: ProjectEntity, + folder: FolderEntity, + image_names: list, ): super().__init__() self._project_entity = project_entity @@ -3218,7 +3197,7 @@ def execute(self): team_id=self._project_entity.team_id, project_id=self._project_entity.uuid, folder_name=self._folder.name, - image_names=self._image_names[i : i + self.CHUNK_SIZE], # noqa: E203 + image_names=self._image_names[i: i + self.CHUNK_SIZE], # noqa: E203 ) if not is_un_assigned: self._response.errors = AppException( @@ -3230,10 +3209,10 @@ def execute(self): class UnAssignFolderUseCase(BaseUseCase): def __init__( - self, - service: SuerannotateServiceProvider, - project_entity: ProjectEntity, - folder: FolderEntity, + self, + service: SuerannotateServiceProvider, + project_entity: ProjectEntity, + folder: FolderEntity, ): super().__init__() self._service = service @@ -3255,15 +3234,15 @@ class SetImageAnnotationStatuses(BaseUseCase): CHUNK_SIZE = 500 def __init__( - self, - service: SuerannotateServiceProvider, - projects: BaseReadOnlyRepository, - image_names: list, - team_id: int, - project_id: int, - folder_id: int, - images_repo: BaseManageableRepository, - annotation_status: int, + self, + service: SuerannotateServiceProvider, + projects: BaseReadOnlyRepository, + image_names: list, + team_id: int, + project_id: int, + folder_id: int, + images_repo: BaseManageableRepository, + annotation_status: int, ): super().__init__() self._service = service @@ -3286,9 +3265,9 @@ def execute(self): if self.is_valid(): if self._image_names is None: condition = ( - Condition("team_id", self._team_id, EQ) - & Condition("project_id", self._project_id, EQ) - & Condition("folder_id", self._folder_id, EQ) + Condition("team_id", self._team_id, EQ) + & Condition("project_id", self._project_id, EQ) + & Condition("folder_id", self._folder_id, EQ) ) self._image_names = [ image.name for image in self._images_repo.get_all(condition) @@ -3296,8 +3275,8 @@ def execute(self): for i in range(0, len(self._image_names), self.CHUNK_SIZE): status_changed = self._service.set_images_statuses_bulk( image_names=self._image_names[ - i : i + self.CHUNK_SIZE # noqa: E203 - ], + i: i + self.CHUNK_SIZE # noqa: E203 + ], team_id=self._team_id, project_id=self._project_id, folder_id=self._folder_id, @@ -3310,10 +3289,10 @@ def execute(self): class CreateAnnotationClassUseCase(BaseUseCase): def __init__( - self, - annotation_classes: BaseManageableRepository, - annotation_class: AnnotationClassEntity, - project_name: str, + self, + annotation_classes: BaseManageableRepository, + annotation_class: AnnotationClassEntity, + project_name: str, ): super().__init__() self._annotation_classes = annotation_classes @@ -3325,11 +3304,11 @@ def validate_uniqueness(self): Condition("name", self._annotation_class.name, EQ) ) if any( - [ - True - for annotation_class in annotation_classes - if annotation_class.name == self._annotation_class.name - ] + [ + True + for annotation_class in annotation_classes + if annotation_class.name == self._annotation_class.name + ] ): raise AppValidationException("Annotation class already exits.") @@ -3349,10 +3328,10 @@ def execute(self): class DeleteAnnotationClassUseCase(BaseUseCase): def __init__( - self, - annotation_classes_repo: BaseManageableRepository, - annotation_class_name: str, - project_name: str, + self, + annotation_classes_repo: BaseManageableRepository, + annotation_class_name: str, + project_name: str, ): super().__init__() self._annotation_classes_repo = annotation_classes_repo @@ -3368,7 +3347,7 @@ def uuid(self): def execute(self): annotation_classes = self._annotation_classes_repo.get_all( condition=Condition("name", self._annotation_class_name, EQ) - & Condition("pattern", True, EQ) + & Condition("pattern", True, EQ) ) self._annotation_class = annotation_classes[0] logger.info( @@ -3381,9 +3360,9 @@ def execute(self): class GetAnnotationClassUseCase(BaseUseCase): def __init__( - self, - annotation_classes_repo: BaseManageableRepository, - annotation_class_name: str, + self, + annotation_classes_repo: BaseManageableRepository, + annotation_class_name: str, ): super().__init__() self._annotation_classes_repo = annotation_classes_repo @@ -3399,10 +3378,10 @@ def execute(self): class DownloadAnnotationClassesUseCase(BaseUseCase): def __init__( - self, - annotation_classes_repo: BaseManageableRepository, - download_path: str, - project_name: str, + self, + annotation_classes_repo: BaseManageableRepository, + download_path: str, + project_name: str, ): super().__init__() self._annotation_classes_repo = annotation_classes_repo @@ -3427,11 +3406,11 @@ class CreateAnnotationClassesUseCase(BaseUseCase): CHUNK_SIZE = 500 def __init__( - self, - service: SuerannotateServiceProvider, - annotation_classes_repo: BaseManageableRepository, - annotation_classes: list, - project: ProjectEntity, + self, + service: SuerannotateServiceProvider, + annotation_classes_repo: BaseManageableRepository, + annotation_classes: list, + project: ProjectEntity, ): super().__init__() self._service = service @@ -3463,7 +3442,7 @@ def execute(self): created += self._service.set_annotation_classes( project_id=self._project.uuid, team_id=self._project.team_id, - data=unique_annotation_classes[i : i + self.CHUNK_SIZE], + data=unique_annotation_classes[i: i + self.CHUNK_SIZE], ) self._response.data = created return self._response @@ -3482,18 +3461,18 @@ def execute(self): class ExtractFramesUseCase(BaseUseCase): def __init__( - self, - backend_service_provider: SuerannotateServiceProvider, - project: ProjectEntity, - folder: FolderEntity, - video_path: str, - extract_path: str, - start_time: float, - end_time: float = None, - target_fps: float = None, - annotation_status_code: int = constances.AnnotationStatus.NOT_STARTED.value, - image_quality_in_editor: str = None, - limit: int = None, + self, + backend_service_provider: SuerannotateServiceProvider, + project: ProjectEntity, + folder: FolderEntity, + video_path: str, + extract_path: str, + start_time: float, + end_time: float = None, + target_fps: float = None, + annotation_status_code: int = constances.AnnotationStatus.NOT_STARTED.value, + image_quality_in_editor: str = None, + limit: int = None, ): super().__init__() self._backend_service = backend_service_provider @@ -3551,16 +3530,16 @@ def execute(self): class UploadS3ImagesBackendUseCase(BaseUseCase): def __init__( - self, - backend_service_provider: SuerannotateServiceProvider, - settings: BaseReadOnlyRepository, - project: ProjectEntity, - folder: FolderEntity, - access_key: str, - secret_key: str, - bucket_name: str, - folder_path: str, - image_quality: str, + self, + backend_service_provider: SuerannotateServiceProvider, + settings: BaseReadOnlyRepository, + project: ProjectEntity, + folder: FolderEntity, + access_key: str, + secret_key: str, + bucket_name: str, + folder_path: str, + image_quality: str, ): super().__init__() self._backend_service = backend_service_provider @@ -3575,8 +3554,8 @@ def __init__( def validate_image_quality(self): if self._image_quality and self._image_quality not in ( - "compressed", - "original", + "compressed", + "original", ): raise AppValidationException("Invalid value for image_quality") diff --git a/src/superannotate/lib/infrastructure/controller.py b/src/superannotate/lib/infrastructure/controller.py index afd1a890b..d2597704e 100644 --- a/src/superannotate/lib/infrastructure/controller.py +++ b/src/superannotate/lib/infrastructure/controller.py @@ -699,19 +699,20 @@ def get_image_bytes( ) return use_case.execute() - def move_image( - self, - from_project_name: str, - from_folder_name: str, - to_project_name: str, - to_folder_name: str, - image_name: str, - copy_annotation_status: bool = False, + def copy_image( + self, + from_project_name: str, + from_folder_name: str, + to_project_name: str, + to_folder_name: str, + image_name: str, + copy_annotation_status: bool = False, + move: bool = False ): from_project = self._get_project(from_project_name) to_project = self._get_project(to_project_name) to_folder = self._get_folder(to_project, to_folder_name) - use_case = usecases.MoveImageUseCase( + use_case = usecases.CopyImageUseCase( from_project=from_project, from_folder=self._get_folder(from_project, from_folder_name), to_project=to_project, @@ -719,13 +720,10 @@ def move_image( backend_service=self._backend_client, image_name=image_name, images=self.images, - project_settings=ProjectSettingsRepository( - self._backend_client, to_project - ).get_all(), - to_upload_s3_repo=self.get_s3_repository( - self.team_id, to_project.uuid, to_folder.uuid - ), + project_settings=ProjectSettingsRepository(self._backend_client, to_project).get_all(), + to_upload_s3_repo=self.get_s3_repository(self.team_id, to_project.uuid, to_folder.uuid), copy_annotation_status=copy_annotation_status, + move=move ) return use_case.execute() diff --git a/tests/integration/test_depricated_functions_document.py b/tests/integration/test_depricated_functions_document.py index 742df03e3..f8444df61 100644 --- a/tests/integration/test_depricated_functions_document.py +++ b/tests/integration/test_depricated_functions_document.py @@ -1,14 +1,15 @@ import os from os.path import dirname +from unittest import TestCase + import src.superannotate as sa from src.superannotate import AppException -from tests.integration.base import BaseTestCase from src.superannotate.lib.core import LIMITED_FUNCTIONS from src.superannotate.lib.core import ProjectType from src.superannotate.lib.core import DEPRICATED_DOCUMENT_VIDEO_MESSAGE -class TestDepricatedFunctionsDocument(BaseTestCase): +class TestDeprecatedFunctionsDocument(TestCase): PROJECT_NAME = "document proj 11" PROJECT_DESCRIPTION = "desc" PROJECT_TYPE = "Document"