From 5573aac8076c39c19219c9f544888f5566c90984 Mon Sep 17 00:00:00 2001 From: Vaghinak Basentsyan Date: Tue, 25 Feb 2025 11:11:27 +0400 Subject: [PATCH 1/2] Add session reset --- .../lib/infrastructure/stream_data_handler.py | 98 +++++++++++-------- 1 file changed, 57 insertions(+), 41 deletions(-) diff --git a/src/superannotate/lib/infrastructure/stream_data_handler.py b/src/superannotate/lib/infrastructure/stream_data_handler.py index 3496aa863..cdca1c8da 100644 --- a/src/superannotate/lib/infrastructure/stream_data_handler.py +++ b/src/superannotate/lib/infrastructure/stream_data_handler.py @@ -2,7 +2,10 @@ import json import logging import os +import threading +import time import typing +from functools import lru_cache from typing import Callable import aiohttp @@ -24,6 +27,7 @@ class StreamedAnnotations: DELIMITER = "\\n;)\\n" DELIMITER_LEN = len(DELIMITER) + VERIFY_SSL = False def __init__( self, @@ -50,7 +54,6 @@ def get_json(self, data: bytes): async def fetch( self, method: str, - session: AIOHttpSession, url: str, data: dict = None, params: dict = None, @@ -58,7 +61,9 @@ async def fetch( kwargs = {"params": params, "json": data} if data: kwargs["json"].update(data) - response = await session.request(method, url, **kwargs, timeout=TIMEOUT) # noqa + response = await self.get_session().request( + method, url, **kwargs, timeout=TIMEOUT + ) # noqa if not response.ok: logger.error(response.text) buffer = "" @@ -103,33 +108,47 @@ async def fetch( ) break + @lru_cache(maxsize=32) + def _get_session(self, thread_id, ttl=None): # noqa + del ttl + del thread_id + return AIOHttpSession( + headers=self._headers, + timeout=TIMEOUT, + connector=aiohttp.TCPConnector( + ssl=self.VERIFY_SSL, keepalive_timeout=2**32 + ), + raise_for_status=True, + ) + + def get_session(self): + return self._get_session( + thread_id=threading.get_ident(), ttl=round(time.time() / 360) + ) + + def rest_session(self): + self._get_session.cache_clear() + async def list_annotations( self, method: str, url: str, data: typing.List[int] = None, params: dict = None, - verify_ssl=False, ): params = copy.copy(params) params["limit"] = len(data) annotations = [] - async with AIOHttpSession( - headers=self._headers, - timeout=TIMEOUT, - connector=aiohttp.TCPConnector(ssl=verify_ssl, keepalive_timeout=2**32), - raise_for_status=True, - ) as session: - async for annotation in self.fetch( - method, - session, - url, - self._process_data(data), - params=copy.copy(params), - ): - annotations.append( - self._callback(annotation) if self._callback else annotation - ) + + async for annotation in self.fetch( + method, + url, + self._process_data(data), + params=copy.copy(params), + ): + annotations.append( + self._callback(annotation) if self._callback else annotation + ) return annotations @@ -143,28 +162,22 @@ async def download_annotations( ): params = copy.copy(params) params["limit"] = len(data) - async with AIOHttpSession( - headers=self._headers, - timeout=TIMEOUT, - connector=aiohttp.TCPConnector(ssl=False, keepalive_timeout=2**32), - raise_for_status=True, - ) as session: - async for annotation in self.fetch( - method, - session, - url, - self._process_data(data), - params=params, - ): - self._annotations.append( - self._callback(annotation) if self._callback else annotation - ) - self._store_annotation( - download_path, - annotation, - self._callback, - ) - self._items_downloaded += 1 + + async for annotation in self.fetch( + method, + url, + self._process_data(data), + params=params, + ): + self._annotations.append( + self._callback(annotation) if self._callback else annotation + ) + self._store_annotation( + download_path, + annotation, + self._callback, + ) + self._items_downloaded += 1 @staticmethod def _store_annotation(path, annotation: dict, callback: Callable = None): @@ -177,3 +190,6 @@ def _process_data(self, data): if data and self._map_function: return self._map_function(data) return data + + def __del__(self): + self._get_session.cache_clear() From 2e0ff816fd559c3b3807b2b504be5134c25d61cd Mon Sep 17 00:00:00 2001 From: Narek Mkhitaryan Date: Tue, 25 Feb 2025 12:53:02 +0400 Subject: [PATCH 2/2] added rest_session in get_annotations --- CHANGELOG.rst | 15 +++++++++++++++ .../lib/infrastructure/stream_data_handler.py | 3 ++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d8779fe94..54a18694e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -6,8 +6,23 @@ History All release highlights of this project will be documented in this file. + +4.4.31 - Feb 27, 2025 +_____________________ + +**Added** + + - Guide for Converting CSV and JSONL Formats. + - New SDK Functionality Table. + +**Updated** + + - ``SAClient.attach_items_from_integrated_storage`` now supports Databricks integration, enabling efficient + data fetching and mapping from Databricks into SuperAnnotate. + 4.4.30 - Feb 13, 2025 _____________________ + **Added** - ``SAClient.list_users`` method lists contributors with optional custom field filtering. diff --git a/src/superannotate/lib/infrastructure/stream_data_handler.py b/src/superannotate/lib/infrastructure/stream_data_handler.py index cdca1c8da..0471dd488 100644 --- a/src/superannotate/lib/infrastructure/stream_data_handler.py +++ b/src/superannotate/lib/infrastructure/stream_data_handler.py @@ -91,6 +91,7 @@ async def fetch( "Invalid JSON detected in small annotations stream process." ) else: + self.rest_session() raise BackendError( "Invalid JSON detected at the start of the small annotations stream process." ) @@ -192,4 +193,4 @@ def _process_data(self, data): return data def __del__(self): - self._get_session.cache_clear() + self.rest_session()