Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,23 @@ History

All release highlights of this project will be documented in this file.


4.4.31 - Feb 27, 2025
_____________________

**Added**

- Guide for Converting CSV and JSONL Formats.
- New SDK Functionality Table.

**Updated**

- ``SAClient.attach_items_from_integrated_storage`` now supports Databricks integration, enabling efficient
data fetching and mapping from Databricks into SuperAnnotate.

4.4.30 - Feb 13, 2025
_____________________

**Added**

- ``SAClient.list_users`` method lists contributors with optional custom field filtering.
Expand Down
99 changes: 58 additions & 41 deletions src/superannotate/lib/infrastructure/stream_data_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
import json
import logging
import os
import threading
import time
import typing
from functools import lru_cache
from typing import Callable

import aiohttp
Expand All @@ -24,6 +27,7 @@
class StreamedAnnotations:
DELIMITER = "\\n;)\\n"
DELIMITER_LEN = len(DELIMITER)
VERIFY_SSL = False

def __init__(
self,
Expand All @@ -50,15 +54,16 @@ def get_json(self, data: bytes):
async def fetch(
self,
method: str,
session: AIOHttpSession,
url: str,
data: dict = None,
params: dict = None,
):
kwargs = {"params": params, "json": data}
if data:
kwargs["json"].update(data)
response = await session.request(method, url, **kwargs, timeout=TIMEOUT) # noqa
response = await self.get_session().request(
method, url, **kwargs, timeout=TIMEOUT
) # noqa
if not response.ok:
logger.error(response.text)
buffer = ""
Expand Down Expand Up @@ -86,6 +91,7 @@ async def fetch(
"Invalid JSON detected in small annotations stream process."
)
else:
self.rest_session()
raise BackendError(
"Invalid JSON detected at the start of the small annotations stream process."
)
Expand All @@ -103,33 +109,47 @@ async def fetch(
)
break

@lru_cache(maxsize=32)
def _get_session(self, thread_id, ttl=None):  # noqa
    """Build — and memoize via ``lru_cache`` — an ``AIOHttpSession``.

    ``thread_id`` and ``ttl`` play no part in constructing the session;
    they exist purely as cache-key components so that each thread gets
    its own session and a fresh one is created whenever the time bucket
    supplied by ``get_session`` changes. Both are deleted immediately to
    make that intent explicit.

    NOTE(review): ``lru_cache`` on an instance method also keys on
    ``self`` and keeps every instance alive for the cache's lifetime
    (ruff B019); evicted or cleared sessions are dropped without being
    closed — confirm this is acceptable here.
    """
    # Parameters are cache keys only — discard them before building.
    del ttl
    del thread_id
    return AIOHttpSession(
        headers=self._headers,
        timeout=TIMEOUT,
        connector=aiohttp.TCPConnector(
            # NOTE(review): VERIFY_SSL is False at class level, so TLS
            # certificate verification is disabled for these requests —
            # confirm this is intentional.
            ssl=self.VERIFY_SSL, keepalive_timeout=2**32
        ),
        raise_for_status=True,
    )

def get_session(self):
    """Return the memoized HTTP session for the current thread.

    Sessions are cached per thread and per 360-second time bucket, so
    roughly every six minutes a thread's next call yields a newly built
    session (subject to the LRU cache in ``_get_session``).
    """
    time_bucket = round(time.time() / 360)
    return self._get_session(thread_id=threading.get_ident(), ttl=time_bucket)

def rest_session(self):
    """Drop all memoized sessions so the next ``get_session`` builds anew.

    NOTE(review): the name looks like a typo for ``reset_session`` —
    renaming would break the callers in ``fetch`` and ``__del__``, so
    confirm and rename all three together if desired. Clearing the
    cache discards the sessions without closing them — presumably
    relying on connector garbage collection; verify.
    """
    self._get_session.cache_clear()

async def list_annotations(
self,
method: str,
url: str,
data: typing.List[int] = None,
params: dict = None,
verify_ssl=False,
):
params = copy.copy(params)
params["limit"] = len(data)
annotations = []
async with AIOHttpSession(
headers=self._headers,
timeout=TIMEOUT,
connector=aiohttp.TCPConnector(ssl=verify_ssl, keepalive_timeout=2**32),
raise_for_status=True,
) as session:
async for annotation in self.fetch(
method,
session,
url,
self._process_data(data),
params=copy.copy(params),
):
annotations.append(
self._callback(annotation) if self._callback else annotation
)

async for annotation in self.fetch(
method,
url,
self._process_data(data),
params=copy.copy(params),
):
annotations.append(
self._callback(annotation) if self._callback else annotation
)

return annotations

Expand All @@ -143,28 +163,22 @@ async def download_annotations(
):
params = copy.copy(params)
params["limit"] = len(data)
async with AIOHttpSession(
headers=self._headers,
timeout=TIMEOUT,
connector=aiohttp.TCPConnector(ssl=False, keepalive_timeout=2**32),
raise_for_status=True,
) as session:
async for annotation in self.fetch(
method,
session,
url,
self._process_data(data),
params=params,
):
self._annotations.append(
self._callback(annotation) if self._callback else annotation
)
self._store_annotation(
download_path,
annotation,
self._callback,
)
self._items_downloaded += 1

async for annotation in self.fetch(
method,
url,
self._process_data(data),
params=params,
):
self._annotations.append(
self._callback(annotation) if self._callback else annotation
)
self._store_annotation(
download_path,
annotation,
self._callback,
)
self._items_downloaded += 1

@staticmethod
def _store_annotation(path, annotation: dict, callback: Callable = None):
Expand All @@ -177,3 +191,6 @@ def _process_data(self, data):
if data and self._map_function:
return self._map_function(data)
return data

def __del__(self):
    """Best-effort finalizer: drop the cached HTTP sessions.

    ``__del__`` may run during interpreter shutdown, when instance
    attributes or module globals can already be torn down; any exception
    raised here is never propagated and only produces an
    "Exception ignored in __del__" warning, so swallow it deliberately.
    """
    try:
        self.rest_session()
    except Exception:  # noqa: BLE001 — never raise from a finalizer
        pass