Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,30 @@ To run test, first install the `test` extra package
pip install darwin-py[test]
```

### Configuration

#### Retry Configuration

The SDK includes a retry mechanism for handling API rate limits (429) and server errors (500, 502, 503, 504). You can configure the retry behavior using the following environment variables. They are read once, when the SDK is imported, so set them before starting your program:

- `DARWIN_RETRY_INITIAL_WAIT`: Initial wait time in seconds before the first retry; subsequent waits grow exponentially, with random jitter (default: 60)
- `DARWIN_RETRY_MAX_WAIT`: Maximum wait time in seconds between retries (default: 300)
- `DARWIN_RETRY_MAX_ATTEMPTS`: Maximum number of attempts in total, counting the initial request (default: 10)

Example configuration:
```bash
# Configure shorter retry intervals and fewer attempts
export DARWIN_RETRY_INITIAL_WAIT=30
export DARWIN_RETRY_MAX_WAIT=120
export DARWIN_RETRY_MAX_ATTEMPTS=5
```

The retry mechanism will automatically handle:
- Rate limiting (HTTP 429)
- Server errors (HTTP 500, 502, 503, 504)

For each retry attempt, you'll see a message indicating the type of error and the wait time before the next attempt.

### Development

See our development and QA environment installation recommendations [here](docs/DEV.md)
Expand Down
23 changes: 0 additions & 23 deletions darwin/backend_v2.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib import parse

from requests.exceptions import HTTPError
from requests.models import Response
from tenacity import RetryCallState, retry, stop_after_attempt, wait_exponential_jitter

from darwin.datatypes import ItemId


Expand All @@ -21,19 +17,6 @@ def wrapper(self, *args, **kwargs) -> Callable:
return wrapper


def log_rate_limit_exceeded(retry_state: RetryCallState):
    """
    Prints how long the retry mechanism will sleep before the next attempt.

    Intended to be used as a ``before_sleep`` callback of a ``retry`` decorator.

    Parameters
    ----------
    retry_state : RetryCallState
        The current state of the retry mechanism; its ``next_action.sleep``
        holds the upcoming delay in seconds.
    """
    delay = retry_state.next_action.sleep
    print(f"Rate limit exceeded. Retrying in {delay:.2f} seconds...")


def retry_if_status_code_429(retry_state: RetryCallState) -> bool:
    """
    Determines if a request should be retried because it was rate limited.

    Parameters
    ----------
    retry_state : RetryCallState
        The current state of the retry mechanism

    Returns
    -------
    bool
        True only if the attempt raised an ``HTTPError`` whose response has
        status code 429, False otherwise
    """
    exception = retry_state.outcome.exception()
    if exception is None:
        # The attempt did not raise, so there is nothing to retry.
        return False
    if not isinstance(exception, HTTPError):
        return False

    response = exception.response
    # An HTTPError may carry no response at all; compare against None
    # explicitly because a Response object is falsy for error status codes.
    if response is None:
        return False
    return response.status_code == 429


class BackendV2:
def __init__(self, client: "Client", default_team): # noqa F821
self._client = client
Expand Down Expand Up @@ -255,12 +238,6 @@ def import_annotation(
f"v2/teams/{team_slug}/items/{item_id}/import", payload=payload
)

@retry(
wait=wait_exponential_jitter(initial=60, max=300),
stop=stop_after_attempt(10),
retry=retry_if_status_code_429,
before_sleep=log_rate_limit_exceeded,
)
@inject_default_team_slug
def register_items(self, payload: Dict[str, Any], team_slug: str) -> None:
"""
Expand Down
153 changes: 110 additions & 43 deletions darwin/client.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import json
import logging
import os
import time
import zlib
from logging import Logger
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union, cast

from requests.exceptions import HTTPError
import requests
from requests import Response
from requests.adapters import HTTPAdapter

from tenacity import RetryCallState, retry, stop_after_attempt, wait_exponential_jitter
from darwin.backend_v2 import BackendV2
from darwin.config import Config
from darwin.dataset.identifier import DatasetIdentifier
Expand Down Expand Up @@ -52,6 +51,75 @@
)
from darwin.utils.get_item_count import get_item_count

# Retry tuning, read from the environment once at import time.
# Each value falls back to its default when the variable is unset.
INITIAL_WAIT = int(os.environ.get("DARWIN_RETRY_INITIAL_WAIT", "60"))
MAX_WAIT = int(os.environ.get("DARWIN_RETRY_MAX_WAIT", "300"))
MAX_RETRIES = int(os.environ.get("DARWIN_RETRY_MAX_ATTEMPTS", "10"))


def log_rate_limit_exceeded(retry_state: RetryCallState):
    """
    Reports on stdout the delay before a rate-limited request is retried.

    Parameters
    ----------
    retry_state : RetryCallState
        The current state of the retry mechanism; the delay in seconds is
        read from ``retry_state.next_action.sleep``.
    """
    seconds_to_wait = retry_state.next_action.sleep
    print(f"Rate limit exceeded. Retrying in {seconds_to_wait:.2f} seconds...")


def retry_if_status_code_429(retry_state: RetryCallState) -> bool:
    """
    Determines if a request should be retried because of rate limiting (429).

    Parameters
    ----------
    retry_state : RetryCallState
        The current state of the retry mechanism

    Returns
    -------
    bool
        True if the attempt raised an ``HTTPError`` carrying a 429 response,
        False otherwise
    """
    exception = retry_state.outcome.exception()
    if exception is None:
        # Successful attempt: no exception, hence no retry.
        return False
    if not isinstance(exception, HTTPError):
        return False

    response = exception.response
    # HTTPError.response is optional. Use an identity check rather than
    # truthiness: a Response evaluates to False for 4xx/5xx status codes.
    if response is None:
        return False
    return response.status_code == 429


def retry_if_status_code_429_or_5xx(retry_state: RetryCallState) -> bool:
    """
    Determines if a request should be retried based on the response status code.

    Retries on:
    - Rate limit (429)
    - Server errors (500, 502, 503, 504)

    Any other outcome is not retried: a successful attempt, an exception that
    is not an ``HTTPError``, or an ``HTTPError`` that carries no response.

    Parameters
    ----------
    retry_state : RetryCallState
        The current state of the retry mechanism

    Returns
    -------
    bool
        True if the request should be retried, False otherwise
    """
    exception = retry_state.outcome.exception()
    if exception is None:
        # The attempt succeeded; nothing to retry.
        return False
    if not isinstance(exception, HTTPError):
        return False

    response = exception.response
    # HTTPError.response may be None. Check identity, not truthiness, because
    # a Response object is falsy exactly for the error codes we care about.
    if response is None:
        return False
    return response.status_code in {
        429,
        500,
        502,
        503,
        504,
    }


def log_retry_error(retry_state: RetryCallState) -> None:
    """
    Prints why a request is about to be retried and how long the wait will be.

    Nothing is printed unless the failed attempt raised an ``HTTPError``.

    Parameters
    ----------
    retry_state : RetryCallState
        The current state of the retry mechanism
    """
    delay = retry_state.next_action.sleep
    error = retry_state.outcome.exception()
    if error is None or not isinstance(error, HTTPError):
        return

    status = error.response.status_code
    if status == 429:
        print(f"Rate limit exceeded. Retrying in {delay:.2f} seconds...")
        return
    print(f"Server error {status}. Retrying in {delay:.2f} seconds...")


class Client:
def __init__(
Expand Down Expand Up @@ -719,9 +787,14 @@ def default_base_url() -> str:
return os.getenv("DARWIN_BASE_URL", "https://darwin.v7labs.com")

def _get_headers(
self, team_slug: Optional[str] = None, compressed: bool = False
self,
team_slug: Optional[str] = None,
compressed: bool = False,
auth_token: Optional[bool] = False,
) -> Dict[str, str]:
headers: Dict[str, str] = {"Content-Type": "application/json"}
if auth_token:
return headers

api_key: Optional[str] = None
team_config: Optional[Team] = self.config.get_team(
Expand All @@ -742,15 +815,23 @@ def _get_headers(
headers["User-Agent"] = f"darwin-py/{__version__}"
return headers

@retry(
wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT),
stop=stop_after_attempt(MAX_RETRIES),
retry=retry_if_status_code_429_or_5xx,
before_sleep=log_retry_error,
)
def _get_raw_from_full_url(
self,
url: str,
team_slug: Optional[str] = None,
retry: bool = False,
stream: bool = False,
auth_token: Optional[bool] = False,
) -> Response:
response: Response = self.session.get(
url, headers=self._get_headers(team_slug), stream=stream
url,
headers=self._get_headers(team_slug, auth_token=auth_token),
stream=stream,
)

self.log.debug(
Expand All @@ -761,40 +842,36 @@ def _get_raw_from_full_url(
)

self._raise_if_known_error(response, url)

if not response.ok and retry:
time.sleep(10)
return self._get_raw_from_full_url(
url=url, team_slug=team_slug, retry=False, stream=stream
)

response.raise_for_status()

return response

def _get_raw(
self,
endpoint: str,
team_slug: Optional[str] = None,
retry: bool = False,
stream: bool = False,
) -> Response:
return self._get_raw_from_full_url(
urljoin(self.url, endpoint), team_slug, retry=retry, stream=stream
urljoin(self.url, endpoint), team_slug, stream=stream
)

def _get(
self, endpoint: str, team_slug: Optional[str] = None, retry: bool = False
self, endpoint: str, team_slug: Optional[str] = None
) -> Union[Dict[str, UnknownType], List[Dict[str, UnknownType]]]:
response = self._get_raw(endpoint, team_slug, retry)
response = self._get_raw(endpoint, team_slug)
return self._decode_response(response)

@retry(
wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT),
stop=stop_after_attempt(MAX_RETRIES),
retry=retry_if_status_code_429_or_5xx,
before_sleep=log_retry_error,
)
def _put_raw(
self,
endpoint: str,
payload: Dict[str, UnknownType],
team_slug: Optional[str] = None,
retry: bool = False,
) -> Response:
response: requests.Response = self.session.put(
urljoin(self.url, endpoint),
Expand All @@ -810,31 +887,29 @@ def _put_raw(
)

self._raise_if_known_error(response, urljoin(self.url, endpoint))

if not response.ok and retry:
time.sleep(10)
return self._put_raw(endpoint, payload=payload, retry=False)

response.raise_for_status()

return response

def _put(
self,
endpoint: str,
payload: Dict[str, UnknownType],
team_slug: Optional[str] = None,
retry: bool = False,
) -> Union[Dict[str, UnknownType], List[Dict[str, UnknownType]]]:
response: Response = self._put_raw(endpoint, payload, team_slug, retry)
response: Response = self._put_raw(endpoint, payload, team_slug)
return self._decode_response(response)

@retry(
wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT),
stop=stop_after_attempt(MAX_RETRIES),
retry=retry_if_status_code_429_or_5xx,
before_sleep=log_retry_error,
)
def _post_raw(
self,
endpoint: str,
payload: Optional[Dict[str, UnknownType]] = None,
team_slug: Optional[str] = None,
retry: bool = False,
) -> Response:
if payload is None:
payload = {}
Expand Down Expand Up @@ -868,31 +943,29 @@ def _post_raw(
)

self._raise_if_known_error(response, urljoin(self.url, endpoint))

if not response.ok and retry:
time.sleep(10)
return self._post_raw(endpoint, payload=payload, retry=False)

response.raise_for_status()

return response

def _post(
self,
endpoint: str,
payload: Optional[Dict[str, UnknownType]] = None,
team_slug: Optional[str] = None,
retry: bool = False,
) -> Union[Dict[str, UnknownType], List[Dict[str, UnknownType]]]:
response: Response = self._post_raw(endpoint, payload, team_slug, retry)
response: Response = self._post_raw(endpoint, payload, team_slug)
return self._decode_response(response)

@retry(
wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT),
stop=stop_after_attempt(MAX_RETRIES),
retry=retry_if_status_code_429_or_5xx,
before_sleep=log_retry_error,
)
def _delete(
self,
endpoint: str,
payload: Optional[Dict[str, UnknownType]] = None,
team_slug: Optional[str] = None,
retry: bool = False,
) -> Union[Dict[str, UnknownType], List[Dict[str, UnknownType]]]:
if payload is None:
payload = {}
Expand All @@ -911,13 +984,7 @@ def _delete(
)

self._raise_if_known_error(response, urljoin(self.url, endpoint))

if not response.ok and retry:
time.sleep(10)
return self._delete(endpoint, payload=payload, retry=False)

response.raise_for_status()

return self._decode_response(response)

def _raise_if_known_error(self, response: Response, url: str) -> None:
Expand Down
Loading