From 00ad7686d51db0ed3379eded64539e52603cb712 Mon Sep 17 00:00:00 2001
From: Narek Mkhitaryan
Date: Thu, 13 Feb 2025 15:40:08 +0400
Subject: [PATCH 1/9] fix test_upload_with_integer_names

---
 tests/integration/annotations/test_upload_annotations.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tests/integration/annotations/test_upload_annotations.py b/tests/integration/annotations/test_upload_annotations.py
index 1ef34ec7a..0732b5749 100644
--- a/tests/integration/annotations/test_upload_annotations.py
+++ b/tests/integration/annotations/test_upload_annotations.py
@@ -240,7 +240,7 @@ def test_upload_with_integer_names(self):
             f"{self.PROJECT_NAME}", annotations=data, data_spec="multimodal"
         )
         assert len(res["failed"]) == 3
-        annotations = sa.get_annotations(
-            f"{self.PROJECT_NAME}/test_folder", data_spec="multimodal"
-        )
-        assert len(annotations) == 0
+        with self.assertRaisesRegex(AppException, "Folder not found."):
+            sa.get_annotations(
+                f"{self.PROJECT_NAME}/test_folder", data_spec="multimodal"
+            )

From 64cc82d8b45ca6136f4de068dc94f0700893557c Mon Sep 17 00:00:00 2001
From: Narek Mkhitaryan
Date: Wed, 19 Feb 2025 12:39:10 +0400
Subject: [PATCH 2/9] update in get_annotations logic

---
 .../lib/app/interface/sdk_interface.py        |  8 +--
 .../lib/core/serviceproviders.py              |  1 +
 .../lib/infrastructure/services/annotation.py | 46 +++++++++----
 .../lib/infrastructure/stream_data_handler.py | 19 ++++++
 src/superannotate/lib/infrastructure/utils.py | 68 +++++++++++++++++++
 ...load_annotations_from_folder_to_project.py |  5 --
 6 files changed, 124 insertions(+), 23 deletions(-)

diff --git a/src/superannotate/lib/app/interface/sdk_interface.py b/src/superannotate/lib/app/interface/sdk_interface.py
index 8271dac56..a250b0cac 100644
--- a/src/superannotate/lib/app/interface/sdk_interface.py
+++ b/src/superannotate/lib/app/interface/sdk_interface.py
@@ -501,7 +501,7 @@ def get_component_config(self, project: Union[NotEmptyStr, int], component_id: s
         """

         def retrieve_context(
-                component_data: List[dict], component_pk: str
+            component_data: List[dict], component_pk: str
         ) -> Tuple[bool, typing.Any]:
             try:
                 for component in component_data:
                     if "children" in component:
                         found, val = retrieve_context(
                             component["children"], component_pk
                         )
                         if found:
                             return found, val
                     if (
-                            "id" in component and
-                            component["id"] == component_pk
-                            and component["type"] == "webComponent"
+                        "id" in component
+                        and component["id"] == component_pk
+                        and component["type"] == "webComponent"
                     ):
                         return True, json.loads(component.get("context"))

diff --git a/src/superannotate/lib/core/serviceproviders.py b/src/superannotate/lib/core/serviceproviders.py
index 2e5255743..b1117cd0f 100644
--- a/src/superannotate/lib/core/serviceproviders.py
+++ b/src/superannotate/lib/core/serviceproviders.py
@@ -490,6 +490,7 @@ def get_upload_chunks(
         self,
         project: entities.ProjectEntity,
         item_ids: List[int],
+        chunk_size: int = 1000,
     ) -> Dict[str, List]:
         raise NotImplementedError

diff --git a/src/superannotate/lib/infrastructure/services/annotation.py b/src/superannotate/lib/infrastructure/services/annotation.py
index 73a01794f..27a27dd39 100644
--- a/src/superannotate/lib/infrastructure/services/annotation.py
+++ b/src/superannotate/lib/infrastructure/services/annotation.py
@@ -18,6 +18,8 @@ from lib.core.service_types import UploadAnnotationsResponse
 from lib.core.serviceproviders import BaseAnnotationService
 from lib.infrastructure.stream_data_handler import StreamedAnnotations
+from lib.infrastructure.utils import annotation_is_valid
+from lib.infrastructure.utils import divide_to_chunks

 try:
     from pydantic.v1 import parse_obj_as
@@ -170,21 +172,29 @@ def get_upload_chunks(
         self,
         project: entities.ProjectEntity,
         item_ids: List[int],
+        chunk_size: int = 1000,
     ) -> Dict[str, List]:
-        response_data = {"small": [], "large": []}
-        response = self.client.request(
-            url=urljoin(self.get_assets_provider_url(), self.URL_CLASSIFY_ITEM_SIZE),
-            method="POST",
-            params={"limit": len(item_ids)},
-            data={"project_id": project.id, "item_ids": item_ids},
-        )
-        if not response.ok:
-            raise AppException(response.error)
-        response_data["small"] = [
-            i["data"] for i in response.data.get("small", {}).values()
-        ]
-        response_data["large"] = response.data.get("large", [])
-        return response_data
+        small = []
+        large = []
+
+        chunks = divide_to_chunks(item_ids, chunk_size)
+        for chunk in chunks:
+            response = self.client.request(
+                method="POST",
+                url=urljoin(
+                    self.get_assets_provider_url(), self.URL_CLASSIFY_ITEM_SIZE
+                ),
+                params={"limit": len(chunk)},
+                data={
+                    "project_id": project.id,
+                    "item_ids": chunk,
+                },
+            )
+            if not response.ok:
+                raise AppException(response.error)
+            small.extend([i["data"] for i in response.data.get("small", {}).values()])
+            large.extend(response.data.get("large", []))
+        return {"small": small, "large": large}

     async def download_big_annotation(
         self,
@@ -218,6 +228,14 @@ async def download_big_annotation(
         ) as session:
             start_response = await session.request("post", url, params=query_params)
             res = await start_response.json()
+            if start_response.status > 299 or not annotation_is_valid(res):
+                logger.debug(
+                    f"Failed to download large annotation; item_id [{item_id}];"
+                    f" response: {res}; http_status: {start_response.status}"
+                )
+                raise AppException(
+                    f"Failed to download large annotation, ID: {item_id}"
+                )

         Path(download_path).mkdir(exist_ok=True, parents=True)
         dest_path = Path(download_path) / (item_name + ".json")

diff --git a/src/superannotate/lib/infrastructure/stream_data_handler.py b/src/superannotate/lib/infrastructure/stream_data_handler.py
index fd64db96a..3496aa863 100644
--- a/src/superannotate/lib/infrastructure/stream_data_handler.py
+++ b/src/superannotate/lib/infrastructure/stream_data_handler.py
@@ -6,8 +6,12 @@ from typing import Callable

 import aiohttp
+from lib.core.exceptions import AppException
+from lib.core.exceptions import BackendError
 from lib.core.reporter import Reporter
 from lib.infrastructure.services.http_client import AIOHttpSession
+from lib.infrastructure.utils import annotation_is_valid
+from lib.infrastructure.utils import async_retry_on_generator

 _seconds = 2**10
 TIMEOUT = aiohttp.ClientTimeout(
@@ -42,6 +46,7 @@ def get_json(self, data: bytes):
             self._reporter.log_error(f"Invalud chunk: {str(e)}")
             return None

+    @async_retry_on_generator((BackendError,))
     async def fetch(
         self,
         method: str,
@@ -59,6 +64,7 @@ async def fetch(
         buffer = ""
         line_groups = b""
         decoder = json.JSONDecoder()
+        data_received = False
         async for line in response.content.iter_any():
             line_groups += line
             try:
@@ -71,6 +77,19 @@ async def fetch(
                 if buffer.startswith(self.DELIMITER):
                     buffer = buffer[self.DELIMITER_LEN :]
                 json_obj, index = decoder.raw_decode(buffer)
+                if not annotation_is_valid(json_obj):
+                    logger.warning(
+                        f"Invalid JSON detected in small annotations stream process, json: {json_obj}."
+                    )
+                    if data_received:
+                        raise AppException(
+                            "Invalid JSON detected in small annotations stream process."
+                        )
+                    else:
+                        raise BackendError(
+                            "Invalid JSON detected at the start of the small annotations stream process."
+                        )
+                data_received = True
                 yield json_obj
                 if len(buffer[index:]) >= self.DELIMITER_LEN:
                     buffer = buffer[index + self.DELIMITER_LEN :]

diff --git a/src/superannotate/lib/infrastructure/utils.py b/src/superannotate/lib/infrastructure/utils.py
index f495e4ae4..638c9f9da 100644
--- a/src/superannotate/lib/infrastructure/utils.py
+++ b/src/superannotate/lib/infrastructure/utils.py
@@ -1,12 +1,17 @@
+import asyncio
+import logging
 import time
 from abc import ABC
 from abc import abstractmethod
+from functools import wraps
 from itertools import islice
 from pathlib import Path
 from typing import Any
+from typing import Callable
 from typing import Dict
 from typing import Optional
 from typing import Tuple
+from typing import Type
 from typing import Union

 from lib.core.entities import ProjectEntity
@@ -16,6 +21,9 @@ from lib.infrastructure.services.work_management import WorkManagementService


+logger = logging.getLogger("sa")
+
+
 def divide_to_chunks(it, size):
     it = iter(it)
     return iter(lambda: tuple(islice(it, size)), ())
@@ -44,6 +52,66 @@ def extract_project_folder(user_input: Union[str, dict]) -> Tuple[str, Optional[
     raise PathError("Invalid project path")


+def async_retry_on_generator(
+    exceptions: Tuple[Type[Exception]],
+    retries: int = 3,
+    delay: float = 0.3,
+    backoff: float = 0.3,
+):
+    """
+    An async retry decorator that retries a function only on specific exceptions.
+
+    Parameters:
+        exceptions (tuple): Tuple of exception classes to retry on.
+        retries (int): Number of retry attempts.
+        delay (float): Initial delay between retries in seconds.
+        backoff (float): Amount added to the delay after each failure.
+    """
+
+    def decorator(func: Callable):
+        @wraps(func)
+        async def wrapper(*args, **kwargs):
+            attempt = 0
+            current_delay = delay
+            raised_exception = None
+
+            while attempt < retries:
+                try:
+                    async for v in func(*args, **kwargs):
+                        yield v
+                    return
+                except exceptions as e:
+                    raised_exception = e
+                    logger.debug(
+                        f"Attempt {attempt + 1}/{retries} failed with error: {e}. "
+                        f"Retrying in {current_delay} seconds..."
+                    )
+                    await asyncio.sleep(current_delay)
+                    current_delay += backoff  # Linear backoff: the delay grows by a fixed step
+                finally:
+                    attempt += 1
+            if raised_exception:
+                logger.error(
+                    f"All {retries} attempts failed due to {raised_exception}."
+                )
+                raise raised_exception
+
+        return wrapper
+
+    return decorator
+
+
+def annotation_is_valid(annotation: dict) -> bool:
+    annotation_keys = annotation.keys()
+    if (
+        "errors" in annotation_keys
+        or "error" in annotation_keys
+        or "metadata" not in annotation_keys
+    ):
+        return False
+    return True
+
+
 class BaseCachedWorkManagementRepository(ABC):
     def __init__(self, ttl_seconds: int, work_management: WorkManagementService):
         self.ttl_seconds = ttl_seconds

diff --git a/tests/integration/annotations/test_upload_annotations_from_folder_to_project.py b/tests/integration/annotations/test_upload_annotations_from_folder_to_project.py
index 8553b46e4..a20de7f95 100644
--- a/tests/integration/annotations/test_upload_annotations_from_folder_to_project.py
+++ b/tests/integration/annotations/test_upload_annotations_from_folder_to_project.py
@@ -166,8 +166,3 @@ def test_annotation_folder_upload_download(self):
             contents1 = f1.read()
             contents2 = f2.read()
             assert contents1 == contents2
-
-
-def test():
-    a = sa.get_user_metadata(pk=244700, include=["custom_fields"])
-    print(a)

From ee73f3a069ba3e6e8a43e13cfc2232fcca19f50d Mon Sep 17 00:00:00 2001
From: Narek Mkhitaryan
Date: Fri, 21 Feb 2025 15:37:55 +0400
Subject: [PATCH 3/9] added Databricks support in attach_items_from_integrated_storage

---
 .gitignore                                  |   1 +
 .../lib/app/interface/sdk_interface.py      |  60 +++++++-
 .../lib/core/serviceproviders.py            |   1 +
 .../lib/core/usecases/integrations.py       | 139 +++++++++++++++---
 .../lib/infrastructure/controller.py        |  10 +-
 .../infrastructure/services/integration.py  |   5 +
 6 files changed, 191 insertions(+), 25 deletions(-)

diff --git a/.gitignore b/.gitignore
index afc3a974e..732995c91 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,6 +54,7 @@ coverage.xml
 *.py,cover
 .hypothesis/
 .pytest_cache/
+tests/tmp_test.py

 # Translations
 *.mo

diff --git a/src/superannotate/lib/app/interface/sdk_interface.py b/src/superannotate/lib/app/interface/sdk_interface.py
index a250b0cac..fdb3d2b4e 100644
--- a/src/superannotate/lib/app/interface/sdk_interface.py
+++ b/src/superannotate/lib/app/interface/sdk_interface.py
@@ -2919,32 +2919,80 @@ def attach_items_from_integrated_storage(
         project: NotEmptyStr,
         integration: Union[NotEmptyStr, IntegrationEntity],
         folder_path: Optional[NotEmptyStr] = None,
+        *,
+        query: Optional[NotEmptyStr] = None,
+        item_name_column: Optional[NotEmptyStr] = None,
+        custom_item_name: Optional[NotEmptyStr] = None,
+        component_mapping: Optional[Dict[str, str]] = None,
     ):
-        """Link images from integrated external storage to SuperAnnotate.
+        """Link items from an integrated external storage (AWS, GCP, Azure, or Databricks) to SuperAnnotate.

         :param project: project name or folder path where items should be attached (e.g., “project1/folder1”).
         :type project: str

-        :param integration: existing integration name or metadata dict to pull items from.
-            Mandatory keys in integration metadata’s dict is “name”.
+        :param integration: The existing integration name or metadata dict to pull items from.
+            The mandatory key in the integration metadata dict is “name”.
        :type integration: str or dict

         :param folder_path: Points to an exact folder/directory within given storage.
-        If None, items are fetched from the root directory.
+            If None, items are fetched from the root directory.
         :type folder_path: str

+        :param query: (Only for Databricks). The SQL query to retrieve specific columns from Databricks.
+            If provided, the function will execute the query and use the results for mapping and uploading.
+        :type query: Optional[str]
+
+        :param item_name_column: (Only for Databricks). The column name from the SQL query whose values
+            will be used as item names. If this is provided, custom_item_name cannot be used.
+            The column must exist in the query result.
+        :type item_name_column: Optional[str]
+
+        :param custom_item_name: (Only for Databricks). A manually defined prefix for item names.
+            A random 10-character suffix will be appended to ensure uniqueness.
+            If this is provided, item_name_column cannot be used.
+        :type custom_item_name: Optional[str]
+
+        :param component_mapping: (Only for Databricks). A dictionary mapping Databricks
+            columns to SuperAnnotate component IDs.
+        :type component_mapping: Optional[dict]
+
+
+        Request Example:
+        ::
+
+            client.attach_items_from_integrated_storage(
+                project="project_name",
+                integration="databricks_integration",
+                query="SELECT * FROM integration_data LIMIT 10",
+                item_name_column="prompt",
+                component_mapping={
+                    "category": "_item_category",
+                    "prompt_id": "id",
+                    "prompt": "prompt"
+                }
+            )
+
         """
         project, folder = self.controller.get_project_folder_by_path(project)
         _integration = None
         if isinstance(integration, str):
             integration = IntegrationEntity(name=integration)
         for i in self.controller.integrations.list().data:
-            if integration.name == i.name:
+            if integration.name.lower() == i.name.lower():
                 _integration = i
                 break
         else:
             raise AppException("Integration not found.")
+
         response = self.controller.integrations.attach_items(
-            project, folder, _integration, folder_path
+            project=project,
+            folder=folder,
+            integration=_integration,
+            folder_path=folder_path,
+            query=query,
+            item_name_column=item_name_column,
+            custom_item_name=custom_item_name,
+            component_mapping=component_mapping,
         )
         if response.errors:
             raise AppException(response.errors)

diff --git a/src/superannotate/lib/core/serviceproviders.py b/src/superannotate/lib/core/serviceproviders.py
index b1117cd0f..5fc302ec6 100644
--- a/src/superannotate/lib/core/serviceproviders.py
+++ b/src/superannotate/lib/core/serviceproviders.py
@@ -593,6 +593,7 @@ def attach_items(
         folder: entities.FolderEntity,
         integration: entities.IntegrationEntity,
         folder_name: str = None,
+        options: Dict[str, str] = None,
     ) -> ServiceResponse:
         raise NotImplementedError

diff --git a/src/superannotate/lib/core/usecases/integrations.py b/src/superannotate/lib/core/usecases/integrations.py
index da60e2ff5..5057ef14e 100644
--- a/src/superannotate/lib/core/usecases/integrations.py
+++ b/src/superannotate/lib/core/usecases/integrations.py
@@ -1,8 +1,13 @@
-from typing import List
+from typing import Dict
+from typing import Optional

+from lib.core.conditions import Condition
+from lib.core.conditions import CONDITION_EQ as EQ
 from lib.core.entities import FolderEntity
 from lib.core.entities import IntegrationEntity
 from lib.core.entities import ProjectEntity
+from lib.core.entities.integrations import IntegrationTypeEnum
+from lib.core.enums import ProjectType
 from lib.core.exceptions import AppException
 from lib.core.reporter import Reporter
 from lib.core.response import Response
@@ -25,6 +30,11 @@ def execute(self) -> Response:


 class AttachIntegrations(BaseReportableUseCase):
+    MULTIMODAL_INTEGRATIONS = [
+        IntegrationTypeEnum.DATABRICKS,
+        IntegrationTypeEnum.SNOWFLAKE,
+    ]
+
     def __init__(
         self,
         reporter: Reporter,
@@ -33,46 +43,139 @@ def __init__(
         service_provider: BaseServiceProvider,
         integration: IntegrationEntity,
         folder_path: str = None,
+        query: Optional[str] = None,
+        item_name_column: Optional[str] = None,
+        custom_item_name: Optional[str] = None,
+        component_mapping: Optional[Dict[str, str]] = None,
     ):
-        super().__init__(reporter)
         self._project = project
         self._folder = folder
         self._integration = integration
         self._service_provider = service_provider
         self._folder_path = folder_path
+        self._query = query
+        self._item_name_column = item_name_column
+        self._custom_item_name = custom_item_name
+        self._component_mapping = component_mapping
+        self._options = {}  # used only for Databricks and Snowflake
+        self._item_category_column = None

     @property
     def _upload_path(self):
         return f"{self._project.name}{f'/{self._folder.name}' if self._folder.name != 'root' else ''}"

-    def execute(self) -> Response:
-        integrations: List[
-            IntegrationEntity
-        ] = self._service_provider.integrations.list().data.integrations
-        integration_name_lower = self._integration.name.lower()
-        integration = next(
-            (i for i in integrations if i.name.lower() == integration_name_lower), None
+    def validate_integration(self):
+        # TODO add support in next iterations
+        if self._integration.type == IntegrationTypeEnum.SNOWFLAKE:
+            raise AppException(
+                "Attaching items is not supported with Snowflake integration."
+            )
+
+        if self._integration.type in self.MULTIMODAL_INTEGRATIONS:
+            if self._project.type != ProjectType.MULTIMODAL:
+                raise AppException(
+                    f"{self._integration.name} integration is supported only for Multimodal projects."
+                )
+
+    def validate_options_for_multimodal_integration(self):
+        if self._integration.type in self.MULTIMODAL_INTEGRATIONS:
+            if self._item_name_column and self._custom_item_name:
+                raise AppException(
+                    "item_name_column and custom_item_name cannot be used simultaneously."
+                )
+
+            if not self._item_name_column and not self._custom_item_name:
+                raise AppException(
+                    "Either item_name_column or custom_item_name is required."
+                )
+
+            if not all((self._query, self._component_mapping)):
+                raise AppException(
+                    f"{self._integration.name} integration requires both a query and component_mapping."
+                )
+
+            category_setting: bool = bool(
+                next(
+                    (
+                        setting.value
+                        for setting in self._service_provider.projects.list_settings(
+                            self._project
+                        ).data
+                        if setting.attribute == "CategorizeItems"
+                    ),
+                    None,
+                )
+            )
+            if (
+                not category_setting
+                and "_item_category" in self._component_mapping.values()
+            ):
+                raise AppException(
+                    "Item Category must be enabled for a project to use _item_category"
+                )
+
+            item_category_column = next(
+                (
+                    k
+                    for k, v in self._component_mapping.items()
+                    if v == "_item_category"
+                ),
+                None,
+            )
+            if item_category_column:
+                self._item_category_column = self._component_mapping.pop(
+                    item_category_column
+                )
+
+            sa_components = [
+                c.name.lower()
+                for c in self._service_provider.annotation_classes.list(
+                    condition=Condition("project_id", self._project.id, EQ)
+                ).data
+            ]
+
+            for i in self._component_mapping.values():
+                if i.lower() not in sa_components:
+                    raise AppException(
+                        f"Component mapping contains invalid component ID: `{i}`"
+                    )
+
+    def generate_options_for_multimodal_integration(self):
+        self._options["query"] = self._query
+        self._options["item_name"] = (
+            self._custom_item_name if self._custom_item_name else self._item_name_column
         )
-        if integration:
+        self._options["prefix"] = True if self._custom_item_name else False
+        self._options["column_class_map"] = self._component_mapping
+        if self._item_category_column:
+            self._options["item_category"] = self._item_category_column
+
+    def execute(self) -> Response:
+        if self.is_valid():
+            if self._integration.type in self.MULTIMODAL_INTEGRATIONS:
+                self.generate_options_for_multimodal_integration()
+
             self.reporter.log_info(
                 "Attaching file(s) from "
-                f"{integration.root}{f'/{self._folder_path}' if self._folder_path else ''} "
+                f"{self._integration.root}{f'/{self._folder_path}' if self._folder_path else ''} "
                 f"to {self._upload_path}. This may take some time."
             )
-            attached = self._service_provider.integrations.attach_items(
+
+            attach_response = self._service_provider.integrations.attach_items(
                 project=self._project,
                 folder=self._folder,
-                integration=integration,
-                folder_name=self._folder_path,
+                integration=self._integration,
+                folder_name=self._folder_path
+                if self._integration.type not in self.MULTIMODAL_INTEGRATIONS
+                else None,
+                options=self._options if self._options else None,
             )
-            if not attached:
+            if not attach_response.ok:
                 self._response.errors = AppException(
                     f"An error occurred for {self._integration.name}. Please make sure: "
                     "\n - The bucket exists."
                     "\n - The connection is valid."
                     "\n - The path to a specified directory is correct."
                )
-        else:
-            self._response.errors = AppException("Integration not found.")
-        return self._response
+        return self._response

diff --git a/src/superannotate/lib/infrastructure/controller.py b/src/superannotate/lib/infrastructure/controller.py
index 4a89c069f..57ae432e7 100644
--- a/src/superannotate/lib/infrastructure/controller.py
+++ b/src/superannotate/lib/infrastructure/controller.py
@@ -1138,7 +1138,11 @@ def attach_items(
         project: ProjectEntity,
         folder: FolderEntity,
         integration: IntegrationEntity,
-        folder_path: str,
+        folder_path: str = None,
+        query: Optional[str] = None,
+        item_name_column: Optional[str] = None,
+        custom_item_name: Optional[str] = None,
+        component_mapping: Optional[Dict[str, str]] = None,
     ):
         use_case = usecases.AttachIntegrations(
             reporter=Reporter(),
@@ -1147,6 +1151,10 @@ def attach_items(
             folder=folder,
             integration=integration,
             folder_path=folder_path,
+            query=query,
+            item_name_column=item_name_column,
+            custom_item_name=custom_item_name,
+            component_mapping=component_mapping,
         )
         return use_case.execute()

diff --git a/src/superannotate/lib/infrastructure/services/integration.py b/src/superannotate/lib/infrastructure/services/integration.py
index 0b605cab2..48bf1f39d 100644
--- a/src/superannotate/lib/infrastructure/services/integration.py
+++ b/src/superannotate/lib/infrastructure/services/integration.py
@@ -1,3 +1,5 @@
+from typing import Dict
+
 from lib.core import entities
 from lib.core.service_types import IntegrationListResponse
 from lib.core.serviceproviders import BaseIntegrationService
@@ -23,6 +25,7 @@ def attach_items(
         folder: entities.FolderEntity,
         integration: entities.IntegrationEntity,
         folder_name: str = None,
+        options: Dict[str, str] = None,
     ):
         data = {
             "team_id": project.team_id,
@@ -32,6 +35,8 @@ def attach_items(
         }
         if folder_name:
             data["customer_folder_name"] = folder_name
+        if options:
+            data["options"] = options
         return self.client.request(
             self.URL_ATTACH_INTEGRATIONS.format(project.team_id), "post", data=data
         )

From 15a45cc3ba2d925144cfa6af98fca9c38df85d45 Mon Sep 17 00:00:00 2001
From: Narek Mkhitaryan
Date: Fri, 21 Feb 2025 18:15:59 +0400
Subject: [PATCH 4/9] fix in docs

---
 src/superannotate/lib/app/interface/sdk_interface.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/superannotate/lib/app/interface/sdk_interface.py b/src/superannotate/lib/app/interface/sdk_interface.py
index fdb3d2b4e..8e6fb04a8 100644
--- a/src/superannotate/lib/app/interface/sdk_interface.py
+++ b/src/superannotate/lib/app/interface/sdk_interface.py
@@ -3641,7 +3641,7 @@ def copy_items(
             "skip", "replace", "replace_annotations_only"
         ] = "skip",
     ):
-        """Copy images in bulk between folders in a project
+        """Copy items in bulk between folders in a project

         :param source: project name (root) or folder path to pick items from (e.g., “project1/folder1”).
         :type source: str
@@ -3705,7 +3705,7 @@ def move_items(
             "skip", "replace", "replace_annotations_only"
         ] = "skip",
     ):
-        """Move images in bulk between folders in a project
+        """Move items in bulk between folders in a project

         :param source: project name (root) or folder path to pick items from (e.g., “project1/folder1”).
         :type source: str

From c6e93b1408f9848bc4d48a1ee44009e857ff008c Mon Sep 17 00:00:00 2001
From: Narek Mkhitaryan
Date: Mon, 24 Feb 2025 12:03:07 +0400
Subject: [PATCH 5/9] fix in Databricks integration

---
 docs/source/userguide/SDK_Functions_sheet.csv       | 6 +++---
 src/superannotate/lib/core/usecases/integrations.py | 8 +++-----
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/docs/source/userguide/SDK_Functions_sheet.csv b/docs/source/userguide/SDK_Functions_sheet.csv
index 8b07a1294..fd41d3342 100644
--- a/docs/source/userguide/SDK_Functions_sheet.csv
+++ b/docs/source/userguide/SDK_Functions_sheet.csv
@@ -48,7 +48,7 @@ Annotations,upload_annotations(),Yes,Yes,Yes,Yes,Not Relevant
 ,set_annotation_statuses(),Yes,Not Relevant,Not Relevant,Not Relevant,Not Relevant
 ,delete_annotations(),Yes,Not Relevant,Not Relevant,Not Relevant,Not Relevant
 ,upload_annotations_from_folder_to_project(),No,No,Yes,No,AWS
-"Annotation
+"Annotation
 Classes",create_annotation_class(),Not Relevant,Not Relevant,Not Relevant,Not Relevant,Not Relevant
 ,create_annotation_classes_from_classes_json(),Not Relevant,Not Relevant,Not Relevant,Not Relevant,AWS
 ,search_annotation_classes(),Not Relevant,Not Relevant,Not Relevant,Not Relevant,Not Relevant
@@ -57,7 +57,7 @@ Classes",create_annotation_class(),Not Relevant,Not Relevant,Not Relevant,Not Re
 Exports,prepare_export(),Yes,Yes,Yes,No,Not Relevant
 ,download_export(),Yes,Yes,Yes,Yes,AWS
 ,get_exports(),Yes,Not Relevant,Not Relevant,Not Relevant,Not Relevant
-"Custom
+"Custom
 Metadata
 ",create_custom_fields(),Yes,Not Relevant,Not Relevant,Not Relevant,Not Relevant
 ,get_custom_fields(),Yes,Not Relevant,Not Relevant,Not Relevant,Not Relevant
@@ -82,6 +82,6 @@ Team,get_team_metadata(),Not Relevant,Not Relevant,Not Relevant,Not Relevant,Not
 Annotations",import_annotation(),Not Relevant,Not Relevant,Not Relevant,Not Relevant,Not Relevant
 ,export_annotation(),Not Relevant,Not Relevant,Not Relevant,Not Relevant,Not Relevant
 ,convert_project_type(),Not Relevant,Not Relevant,Not Relevant,Not Relevant,Not Relevant
-"Working w/
+"Working w/
 Annotations",validate_annotations(),Not Relevant,Not Relevant,Not Relevant,Not Relevant,Not Relevant
 ,aggregate_annotations_as_df(),Not Relevant,Not Relevant,Not Relevant,Not Relevant,Not Relevant
\ No newline at end of file

diff --git a/src/superannotate/lib/core/usecases/integrations.py b/src/superannotate/lib/core/usecases/integrations.py
index 5057ef14e..2d7da8dbd 100644
--- a/src/superannotate/lib/core/usecases/integrations.py
+++ b/src/superannotate/lib/core/usecases/integrations.py
@@ -115,7 +115,7 @@ def validate_options_for_multimodal_integration(self):
                     "Item Category must be enabled for a project to use _item_category"
                 )

-            item_category_column = next(
+            self._item_category_column = next(
                 (
                     k
                     for k, v in self._component_mapping.items()
@@ -123,10 +123,8 @@ def validate_options_for_multimodal_integration(self):
                 ),
                 None,
             )
-            if item_category_column:
-                self._item_category_column = self._component_mapping.pop(
-                    item_category_column
-                )
+            if self._item_category_column:
+                del self._component_mapping[self._item_category_column]

From 5573aac8076c39c19219c9f544888f5566c90984 Mon Sep 17 00:00:00 2001
From: Vaghinak Basentsyan
Date: Tue, 25 Feb 2025 11:11:27 +0400
Subject: [PATCH 6/9] Add session reset

---
 .../lib/infrastructure/stream_data_handler.py | 98 +++++++++++--------
 1 file changed, 57 insertions(+), 41 deletions(-)

diff --git a/src/superannotate/lib/infrastructure/stream_data_handler.py b/src/superannotate/lib/infrastructure/stream_data_handler.py
index 3496aa863..cdca1c8da 100644
--- a/src/superannotate/lib/infrastructure/stream_data_handler.py
+++ b/src/superannotate/lib/infrastructure/stream_data_handler.py
@@ -2,7 +2,10 @@
 import json
 import logging
 import os
+import threading
+import time
 import typing
+from functools import lru_cache
 from typing import Callable

 import aiohttp
@@ -24,6 +27,7 @@ class StreamedAnnotations:
     DELIMITER = "\\n;)\\n"
     DELIMITER_LEN = len(DELIMITER)
+    VERIFY_SSL = False

     def __init__(
         self,
@@ -50,7 +54,6 @@ def get_json(self, data: bytes):
     async def fetch(
         self,
         method: str,
-        session: AIOHttpSession,
         url: str,
         data: dict = None,
         params: dict = None,
     ):
         kwargs = {"params": params, "json": data}
         if data:
             kwargs["json"].update(data)
-        response = await session.request(method, url, **kwargs, timeout=TIMEOUT)  # noqa
+        response = await self.get_session().request(
+            method, url, **kwargs, timeout=TIMEOUT
+        )  # noqa
         if not response.ok:
             logger.error(response.text)
         buffer = ""

+    @lru_cache(maxsize=32)
+    def _get_session(self, thread_id, ttl=None):  # noqa
+        del ttl
+        del thread_id
+        return AIOHttpSession(
+            headers=self._headers,
+            timeout=TIMEOUT,
+            connector=aiohttp.TCPConnector(
+                ssl=self.VERIFY_SSL, keepalive_timeout=2**32
+            ),
+            raise_for_status=True,
+        )
+
+    def get_session(self):
+        return self._get_session(
+            thread_id=threading.get_ident(), ttl=round(time.time() / 360)
+        )
+
+    def reset_session(self):
+        self._get_session.cache_clear()
+
     async def list_annotations(
         self,
         method: str,
         url: str,
         data: typing.List[int] = None,
         params: dict = None,
-        verify_ssl=False,
     ):
         params = copy.copy(params)
         params["limit"] = len(data)
         annotations = []
-        async with AIOHttpSession(
-            headers=self._headers,
-            timeout=TIMEOUT,
-            connector=aiohttp.TCPConnector(ssl=verify_ssl, keepalive_timeout=2**32),
-            raise_for_status=True,
-        ) as session:
-            async for annotation in self.fetch(
-                method,
-                session,
-                url,
-                self._process_data(data),
-                params=copy.copy(params),
-            ):
-                annotations.append(
-                    self._callback(annotation) if self._callback else annotation
-                )
+
+        async for annotation in self.fetch(
+            method,
+            url,
+            self._process_data(data),
+            params=copy.copy(params),
+        ):
+            annotations.append(
+                self._callback(annotation) if self._callback else annotation
+            )
         return annotations

     async def download_annotations(
         self,
         params = copy.copy(params)
         params["limit"] = len(data)
-        async with AIOHttpSession(
-            headers=self._headers,
-            timeout=TIMEOUT,
-            connector=aiohttp.TCPConnector(ssl=False, keepalive_timeout=2**32),
-            raise_for_status=True,
-        ) as session:
-            async for annotation in self.fetch(
-                method,
-                session,
-                url,
-                self._process_data(data),
-                params=params,
-            ):
-                self._annotations.append(
-                    self._callback(annotation) if self._callback else annotation
-                )
-                self._store_annotation(
-                    download_path,
-                    annotation,
-                    self._callback,
-                )
-                self._items_downloaded += 1
+
+        async for annotation in self.fetch(
+            method,
+            url,
+            self._process_data(data),
+            params=params,
+        ):
+            self._annotations.append(
+                self._callback(annotation) if self._callback else annotation
+            )
+            self._store_annotation(
+                download_path,
+                annotation,
+                self._callback,
+            )
+            self._items_downloaded += 1

     @staticmethod
     def _store_annotation(path, annotation: dict, callback: Callable = None):

     def _process_data(self, data):
         if data and self._map_function:
             return self._map_function(data)
         return data
+
+    def __del__(self):
+        self._get_session.cache_clear()

From 2e0ff816fd559c3b3807b2b504be5134c25d61cd Mon Sep 17 00:00:00 2001
From: Narek Mkhitaryan
Date: Tue, 25 Feb 2025 12:53:02 +0400
Subject: [PATCH 7/9] added reset_session in get_annotations

---
 CHANGELOG.rst                                 | 15 +++++++++++++++
 .../lib/infrastructure/stream_data_handler.py |  3 ++-
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index d8779fe94..54a18694e 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -6,8 +6,23 @@ History

 All release highlights of this project will be documented in this file.

+
+4.4.31 - Feb 27, 2025
+_____________________
+
+**Added**
+
+    - Guide for Converting CSV and JSONL Formats.
+    - New SDK Functionality Table.
+
+**Updated**
+
+    - ``SAClient.attach_items_from_integrated_storage`` now supports Databricks integration, enabling efficient
+      data fetching and mapping from Databricks into SuperAnnotate.
+
 4.4.30 - Feb 13, 2025
 _____________________
+
 **Added**

     - ``SAClient.list_users`` method lists contributors with optional custom field filtering.

diff --git a/src/superannotate/lib/infrastructure/stream_data_handler.py b/src/superannotate/lib/infrastructure/stream_data_handler.py
index cdca1c8da..0471dd488 100644
--- a/src/superannotate/lib/infrastructure/stream_data_handler.py
+++ b/src/superannotate/lib/infrastructure/stream_data_handler.py
@@ -91,6 +91,7 @@ async def fetch(
                             "Invalid JSON detected in small annotations stream process."
                         )
                     else:
+                        self.reset_session()
                         raise BackendError(
                             "Invalid JSON detected at the start of the small annotations stream process."
                         )
@@ -192,4 +193,4 @@ def _process_data(self, data):
         return data

     def __del__(self):
-        self._get_session.cache_clear()
+        self.reset_session()

From 9c0abb99911247c8838594703dd39260852df8eb Mon Sep 17 00:00:00 2001
From: Vaghinak Basentsyan
Date: Wed, 26 Feb 2025 18:23:40 +0400
Subject: [PATCH 8/9] Fix add_contributors_to_project

---
 src/superannotate/__init__.py                 | 2 +-
 src/superannotate/lib/infrastructure/utils.py | 6 ++++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/superannotate/__init__.py b/src/superannotate/__init__.py
index 191158e2f..c6a8a015d 100644
--- a/src/superannotate/__init__.py
+++ b/src/superannotate/__init__.py
@@ -3,7 +3,7 @@
 import sys


-__version__ = "4.4.31dev1"
+__version__ = "4.4.31dev2"

 os.environ.update({"sa_version": __version__})
 sys.path.append(os.path.split(os.path.realpath(__file__))[0])

diff --git a/src/superannotate/lib/infrastructure/utils.py b/src/superannotate/lib/infrastructure/utils.py
index 638c9f9da..d925c5094 100644
--- a/src/superannotate/lib/infrastructure/utils.py
+++ b/src/superannotate/lib/infrastructure/utils.py
@@ -148,10 +148,12 @@ def sync(self, project: ProjectEntity):
         roles = response.data["data"]
         self._K_V_map[project.id] = {
             "role_name_id_map": {
-                role["role"]["name"]: role["role_id"] for role in roles
+                **{role["role"]["name"]: role["role_id"] for role in roles},
+                "ProjectAdmin": 3
             },
             "role_id_name_map": {
-                role["role_id"]: role["role"]["name"] for role in roles
+                **{role["role_id"]: role["role"]["name"] for role in roles},
+                3: "ProjectAdmin"
             },
         }
         self._update_cache_timestamp(project.id)

From 045d4585bde96d1bc9f8fc215504da8beb4a15b9 Mon Sep 17 00:00:00 2001
From: Vaghinak Basentsyan
Date: Wed, 26 Feb 2025 18:35:51 +0400
Subject: [PATCH 9/9] Update docs

---
 docs/source/api_reference/api_team.rst        |  2 ++
 .../lib/app/interface/sdk_interface.py        | 20 +++++++++++++++
 src/superannotate/lib/infrastructure/utils.py |  4 ++--
 3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/docs/source/api_reference/api_team.rst b/docs/source/api_reference/api_team.rst
index 5f897ba8e..1d9cda508 100644
--- a/docs/source/api_reference/api_team.rst
+++ b/docs/source/api_reference/api_team.rst
@@ -10,3 +10,5 @@ Team
 .. automethod:: superannotate.SAClient.get_user_metadata
 .. automethod:: superannotate.SAClient.set_user_custom_field
 .. automethod:: superannotate.SAClient.list_users
+.. automethod:: superannotate.SAClient.pause_user_activity
+.. automethod:: superannotate.SAClient.resume_user_activity

diff --git a/src/superannotate/lib/app/interface/sdk_interface.py b/src/superannotate/lib/app/interface/sdk_interface.py
index 8e6fb04a8..2dfe694b6 100644
--- a/src/superannotate/lib/app/interface/sdk_interface.py
+++ b/src/superannotate/lib/app/interface/sdk_interface.py
@@ -461,6 +461,16 @@ def list_users(self, *, include: List[Literal["custom_fields"]] = None, **filter
     def pause_user_activity(
         self, pk: Union[int, str], projects: Union[List[int], List[str], Literal["*"]]
     ):
+        """
+        Block the team contributor from requesting items from the projects.
+
+        :param pk: The email address or user ID of the team contributor.
+        :type pk: str or int
+
+        :param projects: A list of project names or IDs from which the user should be blocked.
+            The special value "*" means block access to all projects.
+        :type projects: Union[List[int], List[str], Literal["*"]]
+        """
         user = self.controller.work_management.get_user_metadata(pk=pk)
         if user.role is not WMUserTypeEnum.Contributor:
             raise AppException("User must have a contributor role to pause activity.")
@@ -474,6 +484,16 @@ def resume_user_activity(
     def resume_user_activity(
         self, pk: Union[int, str], projects: Union[List[int], List[str], Literal["*"]]
     ):
+        """
+        Allow the team contributor to resume requesting items from the projects.
+
+        :param pk: The email address or user ID of the team contributor.
+        :type pk: str or int
+
+        :param projects: A list of project names or IDs for which the user's access should be resumed.
+            The special value "*" means resume access to all projects.
+        :type projects: Union[List[int], List[str], Literal["*"]]
+        """
         user = self.controller.work_management.get_user_metadata(pk=pk)
         if user.role is not WMUserTypeEnum.Contributor:
             raise AppException("User must have a contributor role to resume activity.")

diff --git a/src/superannotate/lib/infrastructure/utils.py b/src/superannotate/lib/infrastructure/utils.py
index d925c5094..eaa75e80d 100644
--- a/src/superannotate/lib/infrastructure/utils.py
+++ b/src/superannotate/lib/infrastructure/utils.py
@@ -149,11 +149,11 @@ def sync(self, project: ProjectEntity):
         self._K_V_map[project.id] = {
             "role_name_id_map": {
                 **{role["role"]["name"]: role["role_id"] for role in roles},
-                "ProjectAdmin": 3
+                "ProjectAdmin": 3,
             },
             "role_id_name_map": {
                 **{role["role_id"]: role["role"]["name"] for role in roles},
-                3: "ProjectAdmin"
+                3: "ProjectAdmin",
             },
         }
         self._update_cache_timestamp(project.id)
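
Notes on the retry behavior in PATCH 2/9
________________________________________

The ``async_retry_on_generator`` decorator only retries safely while the stream has not yet produced
data: once ``fetch()`` sets ``data_received``, a retry would re-yield items the caller already consumed,
which is why mid-stream corruption raises ``AppException`` (never retried) while corruption at the very
start raises ``BackendError`` (retried). A minimal usage sketch; the ``flaky_stream`` generator and the
attempt counter below are illustrative only, not part of the SDK:
::

    import asyncio

    from lib.core.exceptions import BackendError
    from lib.infrastructure.utils import async_retry_on_generator

    attempts = {"count": 0}

    @async_retry_on_generator((BackendError,), retries=3, delay=0.1, backoff=0.1)
    async def flaky_stream():
        # Fail before yielding anything on the first two attempts, so the
        # decorator is still allowed to retry without duplicating items.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise BackendError("transient failure before any data was streamed")
        for i in range(3):
            yield i

    async def main():
        print([item async for item in flaky_stream()])  # [0, 1, 2] on the third attempt

    asyncio.run(main())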
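
Notes on the session cache in PATCH 6/9
_______________________________________

``_get_session`` never reads its ``thread_id`` and ``ttl`` arguments; they exist purely as ``lru_cache``
keys, so each thread gets its own ``AIOHttpSession``, every roughly six-minute time bucket
(``round(time.time() / 360)``) transparently allocates a fresh one, and ``reset_session()`` drops them
all at once after a bad stream. A self-contained sketch of that keying trick, using a stand-in class
instead of the real ``AIOHttpSession``:
::

    import threading
    import time
    from functools import lru_cache

    class FakeSession:
        """Stand-in for AIOHttpSession, just to make the caching visible."""

        def __init__(self):
            self.created_at = time.time()

    @lru_cache(maxsize=32)
    def _get_session(thread_id, ttl):
        # Arguments are unused in the body: they only partition the cache
        # into one entry per (thread, time-bucket) pair.
        return FakeSession()

    def get_session():
        return _get_session(threading.get_ident(), round(time.time() / 360))

    a = get_session()
    assert get_session() is a      # same thread and time bucket -> cached session
    _get_session.cache_clear()     # what reset_session() does after a bad stream
    assert get_session() is not a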
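
Notes on the new team methods in PATCH 9/9
__________________________________________

``pause_user_activity`` and ``resume_user_activity`` both resolve the user first and refuse to act
unless the role is Contributor. A hedged usage sketch in the style of the SDK's other request examples;
the email address and project names below are placeholders:
::

    from superannotate import SAClient

    sa = SAClient()

    # Block a contributor from requesting items in two specific projects,
    # then restore access across every project with the special "*" value.
    sa.pause_user_activity(pk="annotator@example.com", projects=["Project A", "Project B"])
    sa.resume_user_activity(pk="annotator@example.com", projects="*")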