From c76b119d902d1ad6ead900d1f537c053e27b22bd Mon Sep 17 00:00:00 2001 From: John Wilkie Date: Sat, 1 Feb 2025 14:29:51 +0000 Subject: [PATCH 1/3] Introduce API-wide 429 handling --- darwin/backend_v2.py | 23 ------ darwin/client.py | 47 ++++++++++- darwin/dataset/download_manager.py | 2 +- darwin/future/core/items/uploads.py | 2 +- darwin/importer/importer.py | 8 +- darwin/utils/utils.py | 4 +- e2e_tests/helpers.py | 2 +- tests/darwin/backend_v2_test.py | 28 ------- tests/darwin/client_test.py | 122 ++++++++++++++++++++++++++++ 9 files changed, 175 insertions(+), 63 deletions(-) delete mode 100644 tests/darwin/backend_v2_test.py diff --git a/darwin/backend_v2.py b/darwin/backend_v2.py index 69702efcc..8779fa56b 100644 --- a/darwin/backend_v2.py +++ b/darwin/backend_v2.py @@ -1,10 +1,6 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Union from urllib import parse -from requests.exceptions import HTTPError -from requests.models import Response -from tenacity import RetryCallState, retry, stop_after_attempt, wait_exponential_jitter - from darwin.datatypes import ItemId @@ -21,19 +17,6 @@ def wrapper(self, *args, **kwargs) -> Callable: return wrapper -def log_rate_limit_exceeded(retry_state: RetryCallState): - wait_time = retry_state.next_action.sleep - print(f"Rate limit exceeded. Retrying in {wait_time:.2f} seconds...") - - -def retry_if_status_code_429(retry_state: RetryCallState): - exception = retry_state.outcome.exception() - if isinstance(exception, HTTPError): - response: Response = exception.response - return response.status_code == 429 - return False - - class BackendV2: def __init__(self, client: "Client", default_team): # noqa F821 self._client = client @@ -255,12 +238,6 @@ def import_annotation( f"v2/teams/{team_slug}/items/{item_id}/import", payload=payload ) - @retry( - wait=wait_exponential_jitter(initial=60, max=300), - stop=stop_after_attempt(10), - retry=retry_if_status_code_429, - before_sleep=log_rate_limit_exceeded, - ) @inject_default_team_slug def register_items(self, payload: Dict[str, Any], team_slug: str) -> None: """ diff --git a/darwin/client.py b/darwin/client.py index febdd9520..8b468f0ee 100644 --- a/darwin/client.py +++ b/darwin/client.py @@ -6,11 +6,11 @@ from logging import Logger from pathlib import Path from typing import Dict, Iterator, List, Optional, Union, cast - +from requests.exceptions import HTTPError import requests from requests import Response from requests.adapters import HTTPAdapter - +from tenacity import RetryCallState, retry, stop_after_attempt, wait_exponential_jitter from darwin.backend_v2 import BackendV2 from darwin.config import Config from darwin.dataset.identifier import DatasetIdentifier @@ -52,6 +52,23 @@ ) from darwin.utils.get_item_count import get_item_count +INITIAL_WAIT = 60 +MAX_WAIT = 300 +MAX_RETRIES = 10 + + +def log_rate_limit_exceeded(retry_state: RetryCallState): + wait_time = retry_state.next_action.sleep + print(f"Rate limit exceeded. Retrying in {wait_time:.2f} seconds...") + + +def retry_if_status_code_429(retry_state: RetryCallState): + exception = retry_state.outcome.exception() + if isinstance(exception, HTTPError): + response: Response = exception.response + return response.status_code == 429 + return False + class Client: def __init__( @@ -678,7 +695,7 @@ def from_api_key( "Authorization": f"ApiKey {api_key}", } api_url: str = Client.default_api_url() - response: requests.Response = requests.get( + response: requests.Response = cls._get_raw_from_full_url( urljoin(api_url, "/users/token_info"), headers=headers ) @@ -742,6 +759,12 @@ def _get_headers( headers["User-Agent"] = f"darwin-py/{__version__}" return headers + @retry( + wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), + stop=stop_after_attempt(MAX_RETRIES), + retry=retry_if_status_code_429, + before_sleep=log_rate_limit_exceeded, + ) def _get_raw_from_full_url( self, url: str, @@ -789,6 +812,12 @@ def _get( response = self._get_raw(endpoint, team_slug, retry) return self._decode_response(response) + @retry( + wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), + stop=stop_after_attempt(MAX_RETRIES), + retry=retry_if_status_code_429, + before_sleep=log_rate_limit_exceeded, + ) def _put_raw( self, endpoint: str, @@ -829,6 +858,12 @@ def _put( response: Response = self._put_raw(endpoint, payload, team_slug, retry) return self._decode_response(response) + @retry( + wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), + stop=stop_after_attempt(MAX_RETRIES), + retry=retry_if_status_code_429, + before_sleep=log_rate_limit_exceeded, + ) def _post_raw( self, endpoint: str, @@ -887,6 +922,12 @@ def _post( response: Response = self._post_raw(endpoint, payload, team_slug, retry) return self._decode_response(response) + @retry( + wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), + stop=stop_after_attempt(MAX_RETRIES), + retry=retry_if_status_code_429, + before_sleep=log_rate_limit_exceeded, + ) def _delete( self, endpoint: str, diff --git a/darwin/dataset/download_manager.py b/darwin/dataset/download_manager.py index b36f23f18..92ce6448a 100644 --- a/darwin/dataset/download_manager.py +++ b/darwin/dataset/download_manager.py @@ -756,7 +756,7 @@ def _remove_empty_directories(images_path: Path) -> bool: def _check_for_duplicate_local_filepaths( - download_functions: List[Callable[[], None]] + download_functions: List[Callable[[], None]], ) -> None: """ If pulling a release without folders, check for duplicate filepaths in the download functions. diff --git a/darwin/future/core/items/uploads.py b/darwin/future/core/items/uploads.py index 459759811..79e0532cd 100644 --- a/darwin/future/core/items/uploads.py +++ b/darwin/future/core/items/uploads.py @@ -84,7 +84,7 @@ async def _build_layout(item: UploadItem) -> Dict: async def _build_payload_items( - items_and_paths: List[Tuple[UploadItem, Path]] + items_and_paths: List[Tuple[UploadItem, Path]], ) -> List[Dict]: """ Builds the payload for the items to be registered for upload diff --git a/darwin/importer/importer.py b/darwin/importer/importer.py index 0d4764b84..0ce221f6e 100644 --- a/darwin/importer/importer.py +++ b/darwin/importer/importer.py @@ -81,7 +81,7 @@ def _build_main_annotations_lookup_table( - annotation_classes: List[Dict[str, Unknown]] + annotation_classes: List[Dict[str, Unknown]], ) -> Dict[str, Unknown]: MAIN_ANNOTATION_TYPES = [ "bounding_box", @@ -484,7 +484,7 @@ def _serialize_item_level_properties( def _parse_metadata_file( - metadata_path: Union[Path, bool] + metadata_path: Union[Path, bool], ) -> Tuple[List[PropertyClass], List[Dict[str, str]]]: if isinstance(metadata_path, Path): metadata = parse_metadata(metadata_path) @@ -1001,7 +1001,7 @@ def _import_properties( def _normalize_item_properties( - item_properties: Union[Dict[str, Dict[str, Any]], List[Dict[str, str]]] + item_properties: Union[Dict[str, Dict[str, Any]], List[Dict[str, str]]], ) -> Dict[str, Dict[str, Any]]: """ Normalizes item properties to a common dictionary format. @@ -2046,7 +2046,7 @@ def _overwrite_warning( def _get_annotation_format( - importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]] + importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]], ) -> str: """ Returns the annotation format of the importer used to parse local annotation files diff --git a/darwin/utils/utils.py b/darwin/utils/utils.py index 422e31bd8..c098bfda1 100644 --- a/darwin/utils/utils.py +++ b/darwin/utils/utils.py @@ -1220,7 +1220,7 @@ def _parse_annotators(annotators: List[Dict[str, Any]]) -> List[dt.AnnotationAut def _parse_properties( - properties: List[Dict[str, Any]] + properties: List[Dict[str, Any]], ) -> Optional[List[SelectedProperty]]: selected_properties = [] for property in properties: @@ -1512,7 +1512,7 @@ def _parse_version(data: dict) -> dt.AnnotationFileVersion: def _data_to_annotations( - data: Dict[str, Any] + data: Dict[str, Any], ) -> List[Union[dt.Annotation, dt.VideoAnnotation]]: raw_image_annotations = filter( lambda annotation: ( diff --git a/e2e_tests/helpers.py b/e2e_tests/helpers.py index 7ffdf887d..ea68fef21 100644 --- a/e2e_tests/helpers.py +++ b/e2e_tests/helpers.py @@ -263,7 +263,7 @@ def export_release( def delete_annotation_uuids( - annotations: Sequence[Union[dt.Annotation, dt.VideoAnnotation]] + annotations: Sequence[Union[dt.Annotation, dt.VideoAnnotation]], ): """ Removes all UUIDs present in instances of `dt.Annotation` and `dt.VideoAnnotation` objects. diff --git a/tests/darwin/backend_v2_test.py b/tests/darwin/backend_v2_test.py deleted file mode 100644 index dc5ee6c00..000000000 --- a/tests/darwin/backend_v2_test.py +++ /dev/null @@ -1,28 +0,0 @@ -from unittest.mock import Mock, call, patch - -import pytest -from requests.exceptions import HTTPError -from requests.models import Response -from tenacity import RetryError - -from darwin.backend_v2 import BackendV2 - - -class TestBackendV2: - @patch("time.sleep", return_value=None) - def test_register_items_retries_on_429(self, mock_sleep): - mock_client = Mock() - mock_response = Mock(spec=Response) - mock_response.status_code = 429 - mock_client._post_raw.side_effect = HTTPError(response=mock_response) - - backend = BackendV2(mock_client, "team_slug") - - payload = {"key": "value"} - with pytest.raises(RetryError): - backend.register_items(payload) - - assert mock_client._post_raw.call_count == 10 - - expected_call = call("/v2/teams/team_slug/items/register_existing", payload) - assert mock_client._post_raw.call_args_list == [expected_call] * 10 diff --git a/tests/darwin/client_test.py b/tests/darwin/client_test.py index 5b97fb8db..62524c294 100644 --- a/tests/darwin/client_test.py +++ b/tests/darwin/client_test.py @@ -15,6 +15,12 @@ from tests.fixtures import * # noqa: F401, F403 +from unittest.mock import Mock, patch +from requests import Response, HTTPError +from darwin.client import MAX_RETRIES +from tenacity import RetryError + + @pytest.fixture def darwin_client( darwin_config_path: Path, darwin_datasets_path: Path, team_slug_darwin_json_v2: str @@ -557,3 +563,119 @@ def test_returns_list_of_external_storage_connections( assert actual_storages[1].readonly == expected_storage_2.readonly assert actual_storages[1].provider == expected_storage_2.provider assert actual_storages[1].default == expected_storage_2.default + + +class TestClientRetry: + @pytest.fixture + def mock_config(self): + config = Mock(spec=Config) + + # Set up the mock to return different values based on the key + def get_side_effect(key, default=None): + if key == "global/api_endpoint": + return "https://darwin.v7labs.com/api/" + if key == "global/payload_compression_level": + return "0" + return default + + config.get.side_effect = get_side_effect + config.get_team.return_value = Mock(api_key="test-key", slug="test-team") + return config + + @pytest.fixture + def client(self, mock_config): + return Client(config=mock_config, default_team="test-team") + + @patch("time.sleep", return_value=None) + def test_get_retries_on_429(self, mock_sleep, client): + mock_response = Mock(spec=Response) + mock_response.status_code = 429 + mock_response.headers = {} + + with patch("requests.Session.get") as mock_get: + mock_get.return_value = mock_response + + with pytest.raises(RetryError): + client._get("/test-endpoint") + + assert mock_get.call_count == MAX_RETRIES + + @patch("time.sleep", return_value=None) + def test_post_retries_on_429(self, mock_sleep, client): + mock_response = Mock(spec=Response) + mock_response.status_code = 429 + mock_response.headers = {} + mock_response.raise_for_status.side_effect = HTTPError(response=mock_response) + + with patch("requests.post") as mock_post: + mock_post.return_value = mock_response + + with pytest.raises(RetryError): + client._post("/test-endpoint", {"test": "data"}) + + assert mock_post.call_count == MAX_RETRIES + + @patch("time.sleep", return_value=None) + def test_put_retries_on_429(self, mock_sleep, client): + # Create a mock response that simulates a 429 status + mock_response = Mock(spec=Response) + mock_response.status_code = 429 + mock_response.headers = {} + mock_response.raise_for_status.side_effect = HTTPError(response=mock_response) + + with patch("requests.Session.put") as mock_put: + mock_put.return_value = mock_response + + with pytest.raises(RetryError): + client._put("/test-endpoint", {"test": "data"}) + + assert mock_put.call_count == MAX_RETRIES + + @patch("time.sleep", return_value=None) + def test_request_succeeds_after_retries(self, mock_sleep, client): + mock_429_response = Mock(spec=Response) + mock_429_response.status_code = 429 + mock_429_response.headers = {} + + mock_success_response = Mock(spec=Response) + mock_success_response.status_code = 200 + mock_success_response.json.return_value = {"success": True} + mock_success_response.headers = {} + + with patch("requests.Session.get") as mock_get: + mock_get.side_effect = [ + HTTPError(response=mock_429_response), + HTTPError(response=mock_429_response), + mock_success_response, + ] + + result = client._get("/test-endpoint") + + assert result == {"success": True} + assert mock_get.call_count == 3 + + def test_no_retry_on_other_errors(self, client): + mock_response = Mock(spec=Response) + mock_response.status_code = 404 + + with patch("requests.Session.get") as mock_get: + mock_get.side_effect = HTTPError(response=mock_response) + + with pytest.raises(HTTPError): + client._get("/test-endpoint") + + assert mock_get.call_count == 1 + + @patch("time.sleep", return_value=None) + def test_retry_respects_rate_limit_headers(self, mock_sleep, client): + mock_response = Mock(spec=Response) + mock_response.status_code = 429 + + with patch("requests.Session.get") as mock_get: + mock_get.side_effect = HTTPError(response=mock_response) + + with pytest.raises(RetryError): + client._get("/test-endpoint") + + assert mock_get.call_count == MAX_RETRIES + assert mock_sleep.called From ff66d5e49ab902c804c82ea5f812b8d78710c739 Mon Sep 17 00:00:00 2001 From: John Wilkie Date: Tue, 4 Feb 2025 12:16:23 +0000 Subject: [PATCH 2/3] Allow retry params to be configured via env vars --- README.md | 17 +++++++++++++++++ darwin/client.py | 6 +++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 866ab7018..802af8648 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,23 @@ To run test, first install the `test` extra package pip install darwin-py[test] ``` +### Configuration + +#### Retry Configuration + +The SDK includes a retry mechanism for handling API rate limits and transient failures. You can configure the retry behavior using the following environment variables: + +- `DARWIN_RETRY_INITIAL_WAIT`: Initial wait time in seconds between retries (default: 60) +- `DARWIN_RETRY_MAX_WAIT`: Maximum wait time in seconds between retries (default: 300) +- `DARWIN_RETRY_MAX_ATTEMPTS`: Maximum number of retry attempts (default: 10) + +Example configuration: +```bash +export DARWIN_RETRY_INITIAL_WAIT=30 +export DARWIN_RETRY_MAX_WAIT=120 +export DARWIN_RETRY_MAX_ATTEMPTS=5 +``` + ### Development See our development and QA environment installation recommendations [here](docs/DEV.md) diff --git a/darwin/client.py b/darwin/client.py index e3ef9411c..128d26f18 100644 --- a/darwin/client.py +++ b/darwin/client.py @@ -51,9 +51,9 @@ ) from darwin.utils.get_item_count import get_item_count -INITIAL_WAIT = 60 -MAX_WAIT = 300 -MAX_RETRIES = 10 +INITIAL_WAIT = int(os.getenv("DARWIN_RETRY_INITIAL_WAIT", "60")) +MAX_WAIT = int(os.getenv("DARWIN_RETRY_MAX_WAIT", "300")) +MAX_RETRIES = int(os.getenv("DARWIN_RETRY_MAX_ATTEMPTS", "10")) def log_rate_limit_exceeded(retry_state: RetryCallState): From 2649323333c353b05a60c2d08ec6d185715d6c9b Mon Sep 17 00:00:00 2001 From: John Wilkie Date: Wed, 5 Feb 2025 13:26:10 +0000 Subject: [PATCH 3/3] Retry for server errors --- README.md | 9 ++- darwin/client.py | 68 ++++++++++++++++++--- tests/darwin/client_test.py | 2 +- tests/darwin/dataset/upload_manager_test.py | 15 ++--- 4 files changed, 77 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 802af8648..116f152e3 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ pip install darwin-py[test] #### Retry Configuration -The SDK includes a retry mechanism for handling API rate limits and transient failures. You can configure the retry behavior using the following environment variables: +The SDK includes a retry mechanism for handling API rate limits (429) and server errors (500, 502, 503, 504). You can configure the retry behavior using the following environment variables: - `DARWIN_RETRY_INITIAL_WAIT`: Initial wait time in seconds between retries (default: 60) - `DARWIN_RETRY_MAX_WAIT`: Maximum wait time in seconds between retries (default: 300) @@ -62,11 +62,18 @@ The SDK includes a retry mechanism for handling API rate limits and transient fa Example configuration: ```bash +# Configure shorter retry intervals and fewer attempts export DARWIN_RETRY_INITIAL_WAIT=30 export DARWIN_RETRY_MAX_WAIT=120 export DARWIN_RETRY_MAX_ATTEMPTS=5 ``` +The retry mechanism will automatically handle: +- Rate limiting (HTTP 429) +- Server errors (HTTP 500, 502, 503, 504) + +For each retry attempt, you'll see a message indicating the type of error and the wait time before the next attempt. + ### Development See our development and QA environment installation recommendations [here](docs/DEV.md) diff --git a/darwin/client.py b/darwin/client.py index 128d26f18..0b847d810 100644 --- a/darwin/client.py +++ b/darwin/client.py @@ -69,6 +69,58 @@ def retry_if_status_code_429(retry_state: RetryCallState): return False +def retry_if_status_code_429_or_5xx(retry_state: RetryCallState) -> bool: + """ + Determines if a request should be retried based on the response status code. + + Retries on: + - Rate limit (429) + - Server errors (500, 502, 503, 504) + + Parameters + ---------- + retry_state : RetryCallState + The current state of the retry mechanism + + Returns + ------- + bool + True if the request should be retried, False otherwise + """ + exception = retry_state.outcome.exception() + if isinstance(exception, HTTPError): + response: Response = exception.response + return response.status_code in { + 429, + 500, + 502, + 503, + 504, + } + return False + + +def log_retry_error(retry_state: RetryCallState) -> None: + """ + Logs information about why a request is being retried. + + Parameters + ---------- + retry_state : RetryCallState + The current state of the retry mechanism + """ + wait_time = retry_state.next_action.sleep + exception = retry_state.outcome.exception() + if isinstance(exception, HTTPError): + response: Response = exception.response + if response.status_code == 429: + print(f"Rate limit exceeded. Retrying in {wait_time:.2f} seconds...") + else: + print( + f"Server error {response.status_code}. Retrying in {wait_time:.2f} seconds..." + ) + + class Client: def __init__( self, @@ -766,8 +818,8 @@ def _get_headers( @retry( wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), stop=stop_after_attempt(MAX_RETRIES), - retry=retry_if_status_code_429, - before_sleep=log_rate_limit_exceeded, + retry=retry_if_status_code_429_or_5xx, + before_sleep=log_retry_error, ) def _get_raw_from_full_url( self, @@ -812,8 +864,8 @@ def _get( @retry( wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), stop=stop_after_attempt(MAX_RETRIES), - retry=retry_if_status_code_429, - before_sleep=log_rate_limit_exceeded, + retry=retry_if_status_code_429_or_5xx, + before_sleep=log_retry_error, ) def _put_raw( self, @@ -850,8 +902,8 @@ def _put( @retry( wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), stop=stop_after_attempt(MAX_RETRIES), - retry=retry_if_status_code_429, - before_sleep=log_rate_limit_exceeded, + retry=retry_if_status_code_429_or_5xx, + before_sleep=log_retry_error, ) def _post_raw( self, @@ -906,8 +958,8 @@ def _post( @retry( wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), stop=stop_after_attempt(MAX_RETRIES), - retry=retry_if_status_code_429, - before_sleep=log_rate_limit_exceeded, + retry=retry_if_status_code_429_or_5xx, + before_sleep=log_retry_error, ) def _delete( self, diff --git a/tests/darwin/client_test.py b/tests/darwin/client_test.py index 034febe1b..3bb3cc070 100644 --- a/tests/darwin/client_test.py +++ b/tests/darwin/client_test.py @@ -618,7 +618,7 @@ def test_post_retries_on_429(self, mock_sleep, client): @patch("time.sleep", return_value=None) def test_put_retries_on_429(self, mock_sleep, client): - # Create a mock response that simulates a 429 status + mock_response = Mock(spec=Response) mock_response.status_code = 429 mock_response.headers = {} diff --git a/tests/darwin/dataset/upload_manager_test.py b/tests/darwin/dataset/upload_manager_test.py index 7e01c4251..71a6f5319 100644 --- a/tests/darwin/dataset/upload_manager_test.py +++ b/tests/darwin/dataset/upload_manager_test.py @@ -6,7 +6,7 @@ import inspect -from darwin.client import Client +from darwin.client import Client, MAX_RETRIES from darwin.config import Config from darwin.dataset import RemoteDataset from darwin.dataset.identifier import DatasetIdentifier @@ -199,15 +199,16 @@ def test_error_count_is_correct_on_signature_request( responses.add(responses.GET, sign_upload_endpoint, status=500) local_file = LocalFile(local_path=Path("test.jpg")) - with patch.object(dataset, "fetch_remote_files", return_value=[]): + with patch.object(dataset, "fetch_remote_files", return_value=[]), patch( + "time.sleep", return_value=None + ): upload_handler = UploadHandler.build(dataset, [local_file]) - - upload_handler.upload() - for file_to_upload in upload_handler.progress: - file_to_upload() + upload_handler.upload() + for file_to_upload in upload_handler.progress: + file_to_upload() responses.assert_call_count(request_upload_endpoint, 1) - responses.assert_call_count(sign_upload_endpoint, 1) + responses.assert_call_count(sign_upload_endpoint, MAX_RETRIES) responses.assert_call_count(upload_to_s3_endpoint, 0) responses.assert_call_count(confirm_upload_endpoint, 0)