|
33 | 33 | from lib.core.usecases.folders import SearchFoldersUseCase |
34 | 34 | from lib.infrastructure.utils import divide_to_chunks |
35 | 35 | from lib.infrastructure.utils import extract_project_folder |
| 36 | +from typing_extensions import Literal |
36 | 37 |
|
37 | 38 | logger = logging.getLogger("sa") |
38 | 39 |
|
@@ -537,6 +538,154 @@ def execute(self): |
537 | 538 | return self._response |
538 | 539 |
|
539 | 540 |
|
| 541 | +class CopyMoveItems(BaseReportableUseCase): |
| 542 | + """ |
| 543 | + Copy/Move items in bulk between folders in a project. |
| 544 | + Return skipped item names. |
| 545 | + """ |
| 546 | + |
| 547 | + def __init__( |
| 548 | + self, |
| 549 | + reporter: Reporter, |
| 550 | + project: ProjectEntity, |
| 551 | + from_folder: FolderEntity, |
| 552 | + to_folder: FolderEntity, |
| 553 | + item_names: List[str], |
| 554 | + service_provider: BaseServiceProvider, |
| 555 | + include_annotations: bool, |
| 556 | + duplicate_strategy: Literal["skip", "replace", "replace_annotations_only"], |
| 557 | + operation: Literal["copy", "move"], |
| 558 | + chunk_size: int = 1000, |
| 559 | + ): |
| 560 | + super().__init__(reporter) |
| 561 | + self._project = project |
| 562 | + self._from_folder = from_folder |
| 563 | + self._to_folder = to_folder |
| 564 | + self._item_names = item_names |
| 565 | + self._service_provider = service_provider |
| 566 | + self._include_annotations = include_annotations |
| 567 | + self._duplicate_strategy = duplicate_strategy |
| 568 | + self._operation = operation |
| 569 | + self._chunk_size = chunk_size |
| 570 | + |
| 571 | + def _validate_limitations(self, items_count): |
| 572 | + response = self._service_provider.get_limitations( |
| 573 | + project=self._project, |
| 574 | + folder=self._to_folder, |
| 575 | + ) |
| 576 | + if not response.ok: |
| 577 | + raise AppValidationException(response.error) |
| 578 | + if self._operation == "copy": |
| 579 | + folder_limit_err_msg = constants.COPY_FOLDER_LIMIT_ERROR_MESSAGE |
| 580 | + project_limit_err_msg = constants.COPY_PROJECT_LIMIT_ERROR_MESSAGE |
| 581 | + else: |
| 582 | + folder_limit_err_msg = constants.MOVE_FOLDER_LIMIT_ERROR_MESSAGE |
| 583 | + project_limit_err_msg = constants.MOVE_PROJECT_LIMIT_ERROR_MESSAGE |
| 584 | + if items_count > response.data.folder_limit.remaining_image_count: |
| 585 | + raise AppValidationException(folder_limit_err_msg) |
| 586 | + if items_count > response.data.project_limit.remaining_image_count: |
| 587 | + raise AppValidationException(project_limit_err_msg) |
| 588 | + |
| 589 | + def validate_item_names(self): |
| 590 | + if self._item_names: |
| 591 | + self._item_names = list(set(self._item_names)) |
| 592 | + |
| 593 | + def execute(self): |
| 594 | + if self.is_valid(): |
| 595 | + if self._item_names: |
| 596 | + items = self._item_names |
| 597 | + else: |
| 598 | + res = self._service_provider.item_service.list( |
| 599 | + self._project.id, self._from_folder.id, EmptyQuery() |
| 600 | + ) |
| 601 | + if res.error: |
| 602 | + raise AppException(res.error) |
| 603 | + items = [i.name for i in res.data] |
| 604 | + try: |
| 605 | + self._validate_limitations(len(items)) |
| 606 | + except AppValidationException as e: |
| 607 | + self._response.errors = e |
| 608 | + return self._response |
| 609 | + skipped_items = [] |
| 610 | + if self._duplicate_strategy == "skip": |
| 611 | + existing_items = [] |
| 612 | + for i in range(0, len(items), self._chunk_size): |
| 613 | + query = Filter( |
| 614 | + "name", items[i : i + self._chunk_size], OperatorEnum.IN |
| 615 | + ) # noqa |
| 616 | + res = self._service_provider.item_service.list( |
| 617 | + self._project.id, self._to_folder.id, query |
| 618 | + ) |
| 619 | + if res.error: |
| 620 | + raise AppException(res.error) |
| 621 | + if not res.data: |
| 622 | + continue |
| 623 | + existing_items += res.data |
| 624 | + duplications = [item.name for item in existing_items] |
| 625 | + items_to_processing = list(set(items) - set(duplications)) |
| 626 | + skipped_items.extend(duplications) |
| 627 | + else: |
| 628 | + items_to_processing = items |
| 629 | + if items_to_processing: |
| 630 | + for i in range(0, len(items_to_processing), self._chunk_size): |
| 631 | + chunk_to_process = items_to_processing[ |
| 632 | + i : i + self._chunk_size |
| 633 | + ] # noqa: E203 |
| 634 | + response = self._service_provider.items.copy_move_multiple( |
| 635 | + project=self._project, |
| 636 | + from_folder=self._from_folder, |
| 637 | + to_folder=self._to_folder, |
| 638 | + item_names=chunk_to_process, |
| 639 | + include_annotations=self._include_annotations, |
| 640 | + duplicate_strategy=self._duplicate_strategy, |
| 641 | + operation=self._operation, |
| 642 | + ) |
| 643 | + if not response.ok or not response.data.get("poll_id"): |
| 644 | + skipped_items.extend(chunk_to_process) |
| 645 | + continue |
| 646 | + try: |
| 647 | + self._service_provider.items.await_copy_move( |
| 648 | + project=self._project, |
| 649 | + poll_id=response.data["poll_id"], |
| 650 | + items_count=len(chunk_to_process), |
| 651 | + ) |
| 652 | + except BackendError as e: |
| 653 | + self._response.errors = AppException(e) |
| 654 | + return self._response |
| 655 | + existing_items = [] |
| 656 | + for i in range(0, len(items_to_processing), self._chunk_size): |
| 657 | + res = self._service_provider.item_service.list( |
| 658 | + self._project.id, |
| 659 | + self._to_folder.id, |
| 660 | + Filter( |
| 661 | + "name", |
| 662 | + items_to_processing[i : i + self._chunk_size], |
| 663 | + OperatorEnum.IN, |
| 664 | + ), # noqa |
| 665 | + ) |
| 666 | + if res.error: |
| 667 | + raise AppException(res.error) |
| 668 | + |
| 669 | + existing_items += res.data |
| 670 | + |
| 671 | + existing_item_names_set = {item.name for item in existing_items} |
| 672 | + items_to_processing_names_set = set(items_to_processing) |
| 673 | + processed_items = existing_item_names_set.intersection( |
| 674 | + items_to_processing_names_set |
| 675 | + ) |
| 676 | + skipped_items.extend( |
| 677 | + list(items_to_processing_names_set - processed_items) |
| 678 | + ) |
| 679 | + operation_processing_map = {"copy": "Copied", "move": "Moved"} |
| 680 | + self.reporter.log_info( |
| 681 | + f"{operation_processing_map[self._operation]} {len(processed_items)}/{len(items_to_processing)} item(s) from " |
| 682 | + f"{self._project.name}{'' if self._from_folder.is_root else f'/{self._from_folder.name}'} to " |
| 683 | + f"{self._project.name}{'' if self._to_folder.is_root else f'/{self._to_folder.name}'}" |
| 684 | + ) |
| 685 | + self._response.data = list(set(skipped_items)) |
| 686 | + return self._response |
| 687 | + |
| 688 | + |
540 | 689 | class SetAnnotationStatues(BaseReportableUseCase): |
541 | 690 | CHUNK_SIZE = 500 |
542 | 691 | ERROR_MESSAGE = "Failed to change status" |
|
0 commit comments