Skip to content

Commit

Permalink
リトライ処理の修正 (#359)
Browse files Browse the repository at this point in the history
* update

* upload

* updat test

* format

* update test

* version up
  • Loading branch information
yuji38kwmt committed Sep 14, 2021
1 parent 72954ed commit a4e44e1
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 78 deletions.
2 changes: 1 addition & 1 deletion annofabapi/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.47.1"
__version__ = "0.47.2"
56 changes: 1 addition & 55 deletions annofabapi/api.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,20 @@
import functools
import json
import logging
from typing import Any, Dict, Optional, Tuple

import backoff
import requests
from requests.auth import AuthBase
from requests.cookies import RequestsCookieJar

from annofabapi.generated_api import AbstractAnnofabApi
from annofabapi.utils import _log_error_response, _raise_for_status
from annofabapi.utils import _log_error_response, _raise_for_status, my_backoff

logger = logging.getLogger(__name__)

DEFAULT_ENDPOINT_URL = "https://annofab.com"
"""AnnoFab WebAPIのデフォルトのエンドポイントURL"""


def my_backoff(function):
"""
HTTP Status Codeが429 or 5XXのときはリトライする. 最大5分間リトライする。
"""

@functools.wraps(function)
def wrapped(*args, **kwargs):
def fatal_code(e):
"""
リトライするかどうか
status codeが5xxのとき、またはToo many Requests(429)のときはリトライする。429以外の4XXはリトライしない
https://requests.kennethreitz.org/en/master/user/quickstart/#errors-and-exceptions
Args:
e: exception
Returns:
True: give up(リトライしない), False: リトライする
"""
if isinstance(e, requests.exceptions.HTTPError):
if e.response is None:
return True
code = e.response.status_code
return 400 <= code < 500 and code != 429

elif isinstance(
e,
(
requests.exceptions.TooManyRedirects,
requests.exceptions.Timeout,
requests.exceptions.ConnectionError,
ConnectionError,
),
):
return False

else:
# リトライする
return False

return backoff.on_exception(
backoff.expo,
requests.exceptions.RequestException,
jitter=backoff.full_jitter,
max_time=300,
giveup=fatal_code,
)(function)(*args, **kwargs)

return wrapped


class AnnofabApi(AbstractAnnofabApi):
"""
Web APIに対応したメソッドが存在するクラス。
Expand Down
53 changes: 53 additions & 0 deletions annofabapi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import backoff
import dateutil
import dateutil.tz
import requests
Expand Down Expand Up @@ -260,6 +261,58 @@ def can_put_annotation(task: Task, my_account_id: str) -> bool:
#########################################


def my_backoff(function):
"""
HTTP Status Codeが429 or 5XXのときはリトライする. 最大5分間リトライする。
"""

@wraps(function)
def wrapped(*args, **kwargs):
def fatal_code(e):
"""
リトライするかどうか
status codeが5xxのとき、またはToo many Requests(429)のときはリトライする。429以外の4XXはリトライしない
https://requests.kennethreitz.org/en/master/user/quickstart/#errors-and-exceptions
Args:
e: exception
Returns:
True: give up(リトライしない), False: リトライする
"""
if isinstance(e, requests.exceptions.HTTPError):
if e.response is None:
return True
code = e.response.status_code
return 400 <= code < 500 and code != 429

elif isinstance(
e,
(
requests.exceptions.TooManyRedirects,
requests.exceptions.Timeout,
requests.exceptions.ConnectionError,
ConnectionError,
),
):
return False

else:
# リトライする
return False

return backoff.on_exception(
backoff.expo,
(requests.exceptions.RequestException, ConnectionError),
jitter=backoff.full_jitter,
max_time=300,
giveup=fatal_code,
)(function)(*args, **kwargs)

return wrapped


def ignore_http_error(status_code_list: List[int]):
"""
HTTPErrorが発生したとき、特定のstatus codeを無視して処理するデコレータ。
Expand Down
37 changes: 31 additions & 6 deletions annofabapi/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import requests

from annofabapi import AnnofabApi
from annofabapi.api import my_backoff
from annofabapi.exceptions import AnnofabApiException
from annofabapi.models import (
AdditionalData,
Expand Down Expand Up @@ -46,7 +45,7 @@
TaskStatus,
)
from annofabapi.parser import SimpleAnnotationDirParser, SimpleAnnotationParser
from annofabapi.utils import _download, _log_error_response, _raise_for_status, allow_404_error, str_now
from annofabapi.utils import _download, _log_error_response, _raise_for_status, allow_404_error, my_backoff, str_now

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -216,6 +215,28 @@ def _get_all_objects(func_get_list: Callable, limit: int, **kwargs_for_func_get_

return all_objects

@my_backoff
def _request_get_wrapper(self, url: str) -> requests.Response:
"""
HTTP GETのリクエスト。
リトライするためにメソッドを切り出した。
"""
return self.api.session.get(url)

@my_backoff
def _request_put_wrapper(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
data: Optional[Any] = None,
headers: Optional[Dict[str, Any]] = None,
) -> requests.Response:
"""
HTTP PUTのリクエスト。
リトライするためにメソッドを切り出した
"""
return self.api.session.put(url, params=params, data=data, headers=headers)

#########################################
# Public Method : Annotation
#########################################
Expand Down Expand Up @@ -336,7 +357,7 @@ def __to_dest_annotation_detail(
dest_detail["account_id"] = account_id
if detail["data_holding_type"] == AnnotationDataHoldingType.OUTER.value:
outer_file_url = detail["url"]
src_response = self.api.session.get(outer_file_url)
src_response = self._request_get_wrapper(outer_file_url)
s3_path = self.upload_data_to_s3(
dest_project_id, data=src_response.content, content_type=src_response.headers["Content-Type"]
)
Expand Down Expand Up @@ -873,7 +894,9 @@ def upload_data_to_s3(self, project_id: str, data: Any, content_type: str) -> st
s3_url = content["url"].split("?")[0]

# アップロード
res_put = self.api.session.put(s3_url, params=query_dict, data=data, headers={"content-type": content_type})
res_put = self._request_put_wrapper(
url=s3_url, params=query_dict, data=data, headers={"content-type": content_type}
)

_log_error_response(logger, res_put)
_raise_for_status(res_put)
Expand Down Expand Up @@ -935,7 +958,7 @@ def _request_location_header_url(self, response: requests.Response) -> Optional[
logger.warning(f"レスポンスヘッダに'Location'がありません。method={response.request.method}, url={response.request.url}")
return None

response = self.api.session.get(url)
response = self._request_get_wrapper(url)
_log_error_response(logger, response)

response.encoding = "utf-8"
Expand Down Expand Up @@ -1720,7 +1743,9 @@ def upload_data_as_instruction_image(self, project_id: str, image_id: str, data:
s3_url = content["url"].split("?")[0]

# アップロード
res_put = self.api.session.put(s3_url, params=query_dict, data=data, headers={"content-type": content_type})
res_put = self._request_put_wrapper(
url=s3_url, params=query_dict, data=data, headers={"content-type": content_type}
)
_log_error_response(logger, res_put)
_raise_for_status(res_put)
return content["path"]
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "annofabapi"
version = "0.47.1"
version = "0.47.2"
description = "Python Clinet Library of AnnoFab WebAPI (https://annofab.com/docs/api/)"
authors = ["yuji38kwmt"]
license = "MIT"
Expand Down
26 changes: 16 additions & 10 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import annofabapi
import annofabapi.utils
from annofabapi.models import GraphType, ProjectJobType
from annofabapi.wrapper import TaskFrameKey
from tests.utils_for_test import WrapperForTest, create_csv_for_task

# プロジェクトトップに移動する
Expand Down Expand Up @@ -77,6 +78,12 @@ def test_post_annotation_archive_update(self):
job = content["job"]
assert job["job_type"] == ProjectJobType.GEN_ANNOTATION.value

def test_wrapper_copy_annotation(self):
src = TaskFrameKey(project_id, task_id, self.input_data_id)
dest = TaskFrameKey(project_id, task_id, self.input_data_id)
result = wrapper.copy_annotation(src, dest)
assert result == True


class TestAnnotationSpecs:
def test_get_annotation_specs(self):
Expand Down Expand Up @@ -107,8 +114,12 @@ def test_get_annotation_specs_relation(self):
class TestComment:
@classmethod
def setup_class(cls):
wrapper.change_task_operator(project_id, task_id, operator_account_id=api.account_id)
cls.task = wrapper.change_task_status_to_working(project_id, task_id)
task, _ = api.get_task(project_id, task_id)
if task["account_id"] != api.account_id:
task = wrapper.change_task_operator(project_id, task_id, operator_account_id=api.account_id)
if task["status"] != "working":
task = wrapper.change_task_status_to_working(project_id, task_id)
cls.task = task

def test_put_get_delete_comment(self):
task = self.task
Expand Down Expand Up @@ -480,14 +491,9 @@ def test_assign_task(self):
request_body = {"request_type": {"phase": "annotation", "_type": "Random"}}
assert type(api.assign_tasks(project_id, request_body=request_body)[0]) == list

def test_operate_task(self):
task, _ = api.get_task(project_id, task_id)
request_body = {
"status": "not_started",
"last_updated_datetime": task["updated_datetime"],
"account_id": api.account_id,
}
assert type(api.operate_task(project_id, task_id, request_body=request_body)[0]) == dict
def test_operate_task_in_change_task_status_to_break(self):
task = wrapper.change_task_status_to_break(project_id, task_id)
assert task["status"] == "break"

def test_get_task_histories(self):
assert len(api.get_task_histories(project_id, task_id)[0]) > 0
Expand Down
92 changes: 87 additions & 5 deletions tests/test_local_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
# timezonが異なる場所だとテストに失敗するので、コメントアウトする
# def test_to_iso8601_extension():
# d = datetime.datetime(2019, 10, 8, 16, 20, 8, 241762)
# tz_jst = datetime.timezone(datetime.timedelta(hours=9))
# assert to_iso8601_extension(d, tz_jst) == "2019-10-08T16:20:08.241+09:00"
import pytest
import requests

from annofabapi.models import TaskPhase
from annofabapi.utils import (
get_number_of_rejections,
get_task_history_index_skipped_acceptance,
get_task_history_index_skipped_inspection,
my_backoff,
)


Expand Down Expand Up @@ -618,3 +616,87 @@ def test_get_task_history_index_skipped_inspection_検査1回_教師付で提出
actual = get_task_history_index_skipped_inspection(task_history_list)
expected = []
assert all([a == b for a, b in zip(actual, expected)])


class TestMyBackoff:
@my_backoff
def requestexception_connectionerror_then_true(self, log):
if len(log) == 2:
return True

if len(log) == 0:
e = requests.exceptions.RequestException()
elif len(log) == 1:
e = ConnectionError()
log.append(e)
raise e

def test_assert_retry(self):
log = []
assert self.requestexception_connectionerror_then_true(log) is True
assert 2 == len(log)
print(log)
assert type(log[0]) == requests.exceptions.RequestException
assert type(log[1]) == ConnectionError

@my_backoff
def chunkedencodingerror_requestsconnectionerror_then_true(self, log):
if len(log) == 2:
return True
if len(log) == 0:
e = requests.exceptions.ChunkedEncodingError()
log.append(e)
raise e
elif len(log) == 1:
e = requests.exceptions.ConnectionError()
log.append(e)
raise e

def test_assert_retry2(self):
log = []
assert self.chunkedencodingerror_requestsconnectionerror_then_true(log) is True
assert 2 == len(log)
print(log)
assert type(log[0]) == requests.exceptions.ChunkedEncodingError
assert type(log[1]) == requests.exceptions.ConnectionError

@my_backoff
def httperror_then_true(self, log):
if len(log) == 2:
return True
response = requests.Response()
if len(log) == 0:
response.status_code = 429
e = requests.exceptions.HTTPError(response=response)
elif len(log) == 1:
response.status_code = 500
e = requests.exceptions.HTTPError(response=response)
log.append(e)
raise e

def test_assert_retry_with_httperror(self):
log = []
assert self.httperror_then_true(log) is True
assert 2 == len(log)
print(log)
assert type(log[0]) == requests.exceptions.HTTPError
assert log[0].response.status_code == 429
assert type(log[1]) == requests.exceptions.HTTPError
assert log[1].response.status_code == 500

@my_backoff
def httperror_with_400(self, log):
if len(log) == 1:
return True
response = requests.Response()
if len(log) == 0:
response.status_code = 400
e = requests.exceptions.HTTPError(response=response)
log.append(e)
raise e

def test_assert_not_retry(self):
log = []
with pytest.raises(requests.exceptions.HTTPError):
self.httperror_with_400(log)
assert 1 == len(log)

0 comments on commit a4e44e1

Please sign in to comment.