Commit 32263f9

Merge pull request #757 from superannotateai/session_reset
Session reset
2 parents 1676a9e + 2e0ff81 commit 32263f9

File tree

2 files changed: +73 additions, -41 deletions


CHANGELOG.rst

Lines changed: 15 additions & 0 deletions
@@ -6,8 +6,23 @@ History
 
 All release highlights of this project will be documented in this file.
 
+
+4.4.31 - Feb 27, 2025
+_____________________
+
+**Added**
+
+- Guide for Converting CSV and JSONL Formats.
+- New SDK Functionality Table.
+
+**Updated**
+
+- ``SAClient.attach_items_from_integrated_storage`` now supports Databricks integration, enabling efficient
+  data fetching and mapping from Databricks into SuperAnnotate.
+
 4.4.30 - Feb 13, 2025
 _____________________
+
 **Added**
 
 - ``SAClient.list_users`` method lists contributors with optional custom field filtering.
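As context for the ``SAClient.attach_items_from_integrated_storage`` entry above, a minimal usage sketch follows. It assumes the method's existing ``(project, integration, folder_path)`` parameters; the token, project name, integration name, and folder path below are placeholders, not values taken from this release.

from superannotate import SAClient

# Placeholders only; substitute your own token, project, and integration.
sa = SAClient(token="<team-token>")
sa.attach_items_from_integrated_storage(
    project="Example Project",            # hypothetical project name
    integration="databricks-storage",     # hypothetical Databricks integration name
    folder_path="catalog/schema/table",   # hypothetical path inside the integration
)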

src/superannotate/lib/infrastructure/stream_data_handler.py

Lines changed: 58 additions & 41 deletions
@@ -2,7 +2,10 @@
 import json
 import logging
 import os
+import threading
+import time
 import typing
+from functools import lru_cache
 from typing import Callable
 
 import aiohttp
@@ -24,6 +27,7 @@
 class StreamedAnnotations:
     DELIMITER = "\\n;)\\n"
     DELIMITER_LEN = len(DELIMITER)
+    VERIFY_SSL = False
 
     def __init__(
         self,
@@ -50,15 +54,16 @@ def get_json(self, data: bytes):
     async def fetch(
         self,
         method: str,
-        session: AIOHttpSession,
         url: str,
         data: dict = None,
         params: dict = None,
     ):
         kwargs = {"params": params, "json": data}
         if data:
             kwargs["json"].update(data)
-        response = await session.request(method, url, **kwargs, timeout=TIMEOUT)  # noqa
+        response = await self.get_session().request(
+            method, url, **kwargs, timeout=TIMEOUT
+        )  # noqa
         if not response.ok:
             logger.error(response.text)
         buffer = ""
@@ -86,6 +91,7 @@ async def fetch(
                     "Invalid JSON detected in small annotations stream process."
                 )
             else:
+                self.rest_session()
                 raise BackendError(
                     "Invalid JSON detected at the start of the small annotations stream process."
                 )
@@ -103,33 +109,47 @@ async def fetch(
                 )
                 break
 
+    @lru_cache(maxsize=32)
+    def _get_session(self, thread_id, ttl=None):  # noqa
+        del ttl
+        del thread_id
+        return AIOHttpSession(
+            headers=self._headers,
+            timeout=TIMEOUT,
+            connector=aiohttp.TCPConnector(
+                ssl=self.VERIFY_SSL, keepalive_timeout=2**32
+            ),
+            raise_for_status=True,
+        )
+
+    def get_session(self):
+        return self._get_session(
+            thread_id=threading.get_ident(), ttl=round(time.time() / 360)
+        )
+
+    def rest_session(self):
+        self._get_session.cache_clear()
+
     async def list_annotations(
         self,
         method: str,
         url: str,
         data: typing.List[int] = None,
         params: dict = None,
-        verify_ssl=False,
     ):
         params = copy.copy(params)
         params["limit"] = len(data)
         annotations = []
-        async with AIOHttpSession(
-            headers=self._headers,
-            timeout=TIMEOUT,
-            connector=aiohttp.TCPConnector(ssl=verify_ssl, keepalive_timeout=2**32),
-            raise_for_status=True,
-        ) as session:
-            async for annotation in self.fetch(
-                method,
-                session,
-                url,
-                self._process_data(data),
-                params=copy.copy(params),
-            ):
-                annotations.append(
-                    self._callback(annotation) if self._callback else annotation
-                )
+
+        async for annotation in self.fetch(
+            method,
+            url,
+            self._process_data(data),
+            params=copy.copy(params),
+        ):
+            annotations.append(
+                self._callback(annotation) if self._callback else annotation
+            )
 
         return annotations
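The hunk above replaces the per-call ``async with AIOHttpSession(...)`` block with a session cached by ``lru_cache``, keyed on the calling thread id plus a roughly six-minute time bucket (``round(time.time() / 360)``), and cleared by ``rest_session``. A standalone sketch of that caching pattern, using hypothetical names rather than the SDK's classes:

import threading
import time
from functools import lru_cache


class SessionPool:
    """Illustrative stand-in for the session caching shown in the diff above."""

    @lru_cache(maxsize=32)
    def _get(self, thread_id, ttl=None):
        # thread_id and ttl only vary the cache key; their values are unused.
        del thread_id, ttl
        return {"created_at": time.time()}  # stand-in for AIOHttpSession(...)

    def get(self):
        # Same thread + same ~360-second bucket -> same cached object;
        # a new bucket or a different thread produces a fresh one.
        return self._get(thread_id=threading.get_ident(), ttl=round(time.time() / 360))

    def reset(self):
        # Drop all cached objects so the next get() rebuilds them.
        self._get.cache_clear()


pool = SessionPool()
print(pool.get() is pool.get())  # True within one thread and time bucket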

@@ -143,28 +163,22 @@ async def download_annotations(
     ):
         params = copy.copy(params)
         params["limit"] = len(data)
-        async with AIOHttpSession(
-            headers=self._headers,
-            timeout=TIMEOUT,
-            connector=aiohttp.TCPConnector(ssl=False, keepalive_timeout=2**32),
-            raise_for_status=True,
-        ) as session:
-            async for annotation in self.fetch(
-                method,
-                session,
-                url,
-                self._process_data(data),
-                params=params,
-            ):
-                self._annotations.append(
-                    self._callback(annotation) if self._callback else annotation
-                )
-                self._store_annotation(
-                    download_path,
-                    annotation,
-                    self._callback,
-                )
-                self._items_downloaded += 1
+
+        async for annotation in self.fetch(
+            method,
+            url,
+            self._process_data(data),
+            params=params,
+        ):
+            self._annotations.append(
+                self._callback(annotation) if self._callback else annotation
+            )
+            self._store_annotation(
+                download_path,
+                annotation,
+                self._callback,
+            )
+            self._items_downloaded += 1
 
     @staticmethod
     def _store_annotation(path, annotation: dict, callback: Callable = None):
@@ -177,3 +191,6 @@ def _process_data(self, data):
         if data and self._map_function:
             return self._map_function(data)
         return data
+
+    def __del__(self):
+        self.rest_session()
