Skip to content

Commit

Permalink
Merge pull request #17 from saturncloud/bhperry/error-handling
Browse files Browse the repository at this point in the history
error handling
  • Loading branch information
bhperry committed Feb 26, 2024
2 parents 060b1bc + daeee60 commit 70f7468
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 26 deletions.
3 changes: 2 additions & 1 deletion saturnfs/client/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from requests import Response, Session
from saturnfs.errors import ExpiredSignature, SaturnError
from saturnfs.utils import requests_session


class AWSPresignedClient:
Expand All @@ -12,7 +13,7 @@ class AWSPresignedClient:
"""

def __init__(self) -> None:
self.session = Session()
self.session = requests_session()

def get(
self,
Expand Down
32 changes: 14 additions & 18 deletions saturnfs/client/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from typing import Any, BinaryIO, Dict, Iterable, List, Optional, Tuple, Union

from fsspec import Callback
from requests import Session
from saturnfs import settings
from saturnfs.client.aws import AWSPresignedClient
from saturnfs.errors import ExpiredSignature
Expand All @@ -23,7 +22,7 @@
ObjectStoragePresignedUpload,
)
from saturnfs.schemas.upload import ObjectStoragePresignedPart
from saturnfs.utils import byte_range_header
from saturnfs.utils import byte_range_header, requests_session


class FileTransferClient:
Expand Down Expand Up @@ -368,7 +367,7 @@ def _worker(self):
Pull chunks from the download queue, and write the associated byte range to a temp file.
Push completed chunk onto the completed queue to be reconstructed.
"""
with Session() as session:
with requests_session() as session:
while True:
part = self.download_queue.get()
if part is None:
Expand All @@ -380,24 +379,23 @@ def _worker(self):
pass
break

if self.disk_buffer:
buffer = tempfile.TemporaryFile(mode="w+b")
else:
buffer = BytesIO()

try:
if self.disk_buffer:
buffer = tempfile.TemporaryFile(mode="w+b")
else:
buffer = BytesIO()

self.file_transfer._download_to_writer(
part.url, buffer, headers=part.headers, session=session
)
except ExpiredSignature:
except Exception as e:
# Signal that an error has occurred
self.stop.set()
self.download_queue.task_done()
buffer.close()
if self.exit_on_timeout:
return
else:
if isinstance(e, ExpiredSignature) and not self.exit_on_timeout:
continue
return

buffer.seek(0)
self.completed_queue.put(DownloadChunk(part.part_number, part.size, buffer))
Expand Down Expand Up @@ -500,7 +498,7 @@ def _put_chunk(self, chunk: UploadChunk, poll_interval: int = 5) -> bool:
return False

def _worker(self):
with Session() as session:
with requests_session() as session:
while True:
chunk = self.upload_queue.get()
if chunk is None:
Expand All @@ -516,15 +514,13 @@ def _worker(self):
completed_part = self.file_transfer.upload_part(
chunk.data, chunk.part, session=session
)
except ExpiredSignature:
except Exception as e:
# Signal that an error has occurred
self.stop.set()
self.upload_queue.task_done()
if self.exit_on_timeout:
return
else:
if isinstance(e, ExpiredSignature) and not self.exit_on_timeout:
continue

return
self.upload_queue.task_done()
self.completed_queue.put(completed_part)

Expand Down
13 changes: 7 additions & 6 deletions saturnfs/client/object_storage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from typing import Collection, Iterable, List, Optional

from requests import Session
from requests.adapters import HTTPAdapter, Retry
from saturnfs import settings
from saturnfs.api.delete import BulkDeleteAPI, DeleteAPI
from saturnfs.api.download import BulkDownloadAPI, DownloadAPI
Expand Down Expand Up @@ -32,6 +30,7 @@
ObjectStorageUploadList,
)
from saturnfs.schemas.usage import ObjectStorageUsageResults
from saturnfs.utils import requests_session


class ObjectStorageClient:
Expand All @@ -45,10 +44,12 @@ def __init__(
backoff_factor: float = 0.1,
retry_statuses: Collection[int] = frozenset([409, 423]),
):
retry = Retry(retries, backoff_factor=backoff_factor, status_forcelist=retry_statuses)
self.session = Session()
self.session.headers["Authorization"] = f"token {settings.SATURN_TOKEN}"
self.session.mount("http", HTTPAdapter(max_retries=retry))
self.session = requests_session(
retries=retries,
backoff_factor=backoff_factor,
status_forcelist=retry_statuses,
headers={"Authorization": f"token {settings.SATURN_TOKEN}"},
)

def start_upload(
self,
Expand Down
19 changes: 18 additions & 1 deletion saturnfs/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from enum import Enum
from typing import Dict
from typing import Dict, Optional

from requests import Session
from requests.adapters import HTTPAdapter, Retry


class Units(int, Enum):
Expand All @@ -24,3 +27,17 @@ def byte_range_header(start: int, end: int) -> Dict[str, str]:
HTTP byte range header with non-inclusive end
"""
return {"Range": f"bytes={start}-{end - 1}"}


def requests_session(
retries: int = 10,
backoff_factor: float = 0.1,
headers: Optional[Dict[str, str]] = None,
**kwargs,
) -> Session:
retry = Retry(total=retries, backoff_factor=backoff_factor, **kwargs)
session = Session()
session.mount("http", HTTPAdapter(max_retries=retry))
if headers:
session.headers.update(headers)
return session

0 comments on commit 70f7468

Please sign in to comment.