Skip to content

Commit

Permalink
[Serving] Handle timeouts and retries in remote step (#1476)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaronha committed Nov 11, 2021
1 parent 1a9f34a commit c946ca2
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 35 deletions.
7 changes: 5 additions & 2 deletions mlrun/datastore/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def _get_store(self):
def to_step(self, key_field=None, time_field=None, context=None):
import storey

return storey.SyncEmitSource()
return storey.SyncEmitSource(context=context)

def get_table_object(self):
"""get storey Table object"""
Expand Down Expand Up @@ -138,6 +138,7 @@ def to_step(self, key_field=None, time_field=None, context=None):
if context:
attributes["context"] = context
return storey.CSVSource(
context=context,
paths=self.path,
header=True,
build_dict=True,
Expand Down Expand Up @@ -228,6 +229,7 @@ def to_step(
if context:
attributes["context"] = context
return storey.ParquetSource(
context=context,
paths=self.path,
key_field=self.key_field or key_field,
time_field=self.time_field or time_field,
Expand Down Expand Up @@ -412,7 +414,7 @@ def to_step(self, key_field=None, time_field=None, context=None):
attributes = copy(self.attributes)
class_name = attributes.pop("class_name")
class_object = get_class(class_name)
return class_object(**attributes,)
return class_object(context=context, **attributes)


class DataFrameSource:
Expand Down Expand Up @@ -492,6 +494,7 @@ def to_step(self, key_field=None, time_field=None, context=None):
else storey.SyncEmitSource
)
return source_class(
context=context,
key_field=self.key_field or key_field,
time_field=self.time_field or time_field,
full_event=True,
Expand Down
2 changes: 0 additions & 2 deletions mlrun/feature_store/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

from mlrun.serving.utils import StepToDict

this_path = "mlrun.feature_store.steps"


class FeaturesetValidator(StepToDict, MapClass):
"""Validate feature values according to the feature set validation policy"""
Expand Down
148 changes: 124 additions & 24 deletions mlrun/serving/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,35 @@
from urllib3.util.retry import Retry

import mlrun
from mlrun.utils import logger

from .utils import (
StepToDict,
_extract_input_data,
_update_result_body,
event_id_key,
event_path_key,
)

http_adapter = HTTPAdapter(
max_retries=Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
)
default_retries = 6
default_backoff_factor = 1


def get_http_adapter(retries, backoff_factor):
    """Build a requests ``HTTPAdapter`` carrying the retry policy for sync http calls.

    :param retries:        max number of retries; ``None`` falls back to ``default_retries``,
                           ``0`` disables retries
    :param backoff_factor: backoff factor handed to urllib3 ``Retry``; ``None`` falls back to
                           ``default_backoff_factor``
    :return: an ``HTTPAdapter`` whose ``max_retries`` is the resulting ``Retry`` object
    """
    if retries == 0:
        # explicit opt-out: zero total retries and no retry on read errors
        return HTTPAdapter(max_retries=Retry(0, read=False))

    retry_args = dict(
        total=retries or default_retries,
        backoff_factor=default_backoff_factor
        if backoff_factor is None
        else backoff_factor,
        status_forcelist=[500, 502, 503, 504],
    )
    try:
        # a falsy value means "retry on any HTTP verb" (not only idempotent ones);
        # `allowed_methods` is the current urllib3 name for this option
        retry = Retry(allowed_methods=False, **retry_args)
    except TypeError:
        # older urllib3 releases only accept the previous keyword name
        retry = Retry(method_whitelist=False, **retry_args)
    return HTTPAdapter(max_retries=retry)

class RemoteStep(StepToDict, storey.SendToHttp):

class RemoteStep(storey.SendToHttp):
"""class for calling remote endpoints
"""

Expand All @@ -38,6 +52,9 @@ def __init__(
return_json: bool = True,
input_path: str = None,
result_path: str = None,
retries=None,
backoff_factor=None,
timeout=None,
**kwargs,
):
"""class for calling remote endpoints
Expand Down Expand Up @@ -68,24 +85,36 @@ def __init__(
this requires that the event body will behave like a dict, example:
event: {"x": 5} , result_path="resp" means the returned response will be written
to event["resp"] resulting in {"x": 5, "resp": <result>}
:param retries: number of retries (in exponential backoff)
:param backoff_factor: A backoff factor in seconds to apply between attempts after the second try
:param timeout: How long to wait for the server to send data before giving up, float in seconds
"""
# init retry args for storey
retries = default_retries if retries is None else retries
super().__init__(
None,
None,
input_path=input_path,
result_path=result_path,
retries=retries,
backoff_factor=backoff_factor,
**kwargs,
)
self.url = url
self.url_expression = url_expression
self.body_expression = body_expression
self.headers = headers
self.method = method
self.return_json = return_json
self.subpath = subpath
super().__init__(
None, None, input_path=input_path, result_path=result_path, **kwargs
)

self.timeout = timeout

self._append_event_path = False
self._endpoint = ""
self._session = None
self._url_function_handler = None
self._body_function_handler = None
self._full_event = False

def post_init(self, mode="sync"):
self._endpoint = self.url
Expand All @@ -110,9 +139,20 @@ async def _process_event(self, event):
# async implementation (with storey)
body = self._get_event_or_body(event)
method, url, headers, body = self._generate_request(event, body)
return await self._client_session.request(
method, url, headers=headers, data=body, ssl=False
)
kwargs = {}
if self.timeout:
kwargs["timeout"] = aiohttp.ClientTimeout(total=self.timeout)
try:
resp = await self._client_session.request(
method, url, headers=headers, data=body, ssl=False, **kwargs
)
if resp.status >= 500:
text = await resp.text()
raise RuntimeError(f"bad http response {resp.status}: {text}")
return resp
except asyncio.TimeoutError as exc:
logger.error(f"http request to {url} timed out in RemoteStep {self.name}")
raise exc

async def _handle_completed(self, event, response):
response_body = await response.read()
Expand All @@ -124,25 +164,34 @@ async def _handle_completed(self, event, response):

body = self._get_data(response_body, response.headers)

if body is not None:
new_event = self._user_fn_output_to_event(event, body)
await self._do_downstream(new_event)
new_event = self._user_fn_output_to_event(event, body)
await self._do_downstream(new_event)

def do_event(self, event):
# sync implementation (without storey)
if not self._session:
self._session = requests.Session()
http_adapter = get_http_adapter(self.retries, self.backoff_factor)
self._session.mount("http://", http_adapter)
self._session.mount("https://", http_adapter)

body = _extract_input_data(self._input_path, event.body)
method, url, headers, body = self._generate_request(event, body)
try:
resp = self._session.request(
method, url, verify=False, headers=headers, data=body
method,
url,
verify=False,
headers=headers,
data=body,
timeout=self.timeout,
)
except requests.exceptions.ReadTimeout as err:
raise requests.exceptions.ReadTimeout(
f"http request to {url} timed out in RemoteStep {self.name}, {err}"
)
except OSError as err:
raise OSError(f"error: cannot invoke url: {url}, {err}")
raise OSError(f"cannot invoke url: {url}, {err}")
if not resp.ok:
raise RuntimeError(f"bad http response {resp.status_code}: {resp.text}")

Expand Down Expand Up @@ -184,7 +233,7 @@ def _get_data(self, data, headers):
return data


class BatchHttpRequests(StepToDict, _ConcurrentJobExecution):
class BatchHttpRequests(_ConcurrentJobExecution):
"""class for calling remote endpoints in parallel
"""

Expand All @@ -199,6 +248,9 @@ def __init__(
return_json: bool = True,
input_path: str = None,
result_path: str = None,
retries=None,
backoff_factor=None,
timeout=None,
**kwargs,
):
"""class for calling remote endpoints in parallel
Expand All @@ -209,16 +261,22 @@ def __init__(
example pipeline::
function = mlrun.new_function("myfunc", kind="serving")
flow = function.set_topology("flow", engine="async")
flow.to("BatchRemoteStep",
flow.to(
BatchHttpRequests(
url_expression="event['url']",
body_expression="event['data']",
method="POST",
input_path="req",
result_path="resp",
).respond()
)
).respond()
server = function.to_mock_server()
resp = server.test(body={"req": [{"url": url, "data": data}]})
# request contains a list of elements, each with url and data
request = [{"url": f"{base_url}/{i}", "data": i} for i in range(2)]
resp = server.test(body={"req": request})
:param url: http(s) url or function [project/]name to call
Expand All @@ -235,6 +293,9 @@ def __init__(
this requires that the event body will behave like a dict, example:
event: {"x": 5} , result_path="resp" means the returned response will be written
to event["resp"] resulting in {"x": 5, "resp": <result>}
:param retries: number of retries (in exponential backoff)
:param backoff_factor: A backoff factor in seconds to apply between attempts after the second try
:param timeout: How long to wait for the server to send data before giving up, float in seconds
"""
if url and url_expression:
raise mlrun.errors.MLRunInvalidArgumentError(
Expand All @@ -246,14 +307,18 @@ def __init__(
self.headers = headers
self.method = method
self.return_json = return_json
self.subpath = subpath or ""
self.subpath = subpath
super().__init__(input_path=input_path, result_path=result_path, **kwargs)

self.timeout = timeout
self.retries = retries
self.backoff_factor = backoff_factor
self._append_event_path = False
self._endpoint = ""
self._session = None
self._url_function_handler = None
self._body_function_handler = None
self._request_args = {}

def _init(self):
super()._init()
Expand Down Expand Up @@ -282,6 +347,8 @@ def post_init(self, mode="sync"):
)
elif self.subpath:
self._endpoint = self._endpoint + "/" + self.subpath.lstrip("/")
if self.timeout:
self._request_args["timeout"] = aiohttp.ClientTimeout(total=self.timeout)

async def _process_event(self, event):
# async implementation (with storey)
Expand Down Expand Up @@ -314,16 +381,49 @@ async def _process_event(self, event):
responses = []
for url, body in zip(url_list, body_list):
responses.append(
asyncio.ensure_future(self._submit(method, url, headers, body))
asyncio.ensure_future(
self._submit_with_retries(method, url, headers, body)
)
)
return await asyncio.gather(*responses)

async def _process_event_with_retries(self, event):
    """Process the event through ``_process_event`` once, with no retry loop at this level."""
    # NOTE(review): presumably this overrides a base-class retry wrapper because each
    # request is already retried individually in _submit_with_retries - confirm against storey
    responses = await self._process_event(event)
    return responses

async def _submit(self, method, url, headers, body):
async with self._client_session.request(
method, url, headers=headers, data=body, ssl=False
method, url, headers=headers, data=body, ssl=False, **self._request_args
) as future:
if future.status >= 500:
text = await future.text()
raise RuntimeError(f"bad http response {future.status}: {text}")
return await future.read(), future.headers

async def _submit_with_retries(self, method, url, headers, body):
    """Send a single request via ``_submit``, retrying failures with exponential backoff.

    ``self.retries`` is the number of retries after the first attempt; ``None`` falls back
    to ``default_retries`` and ``0`` means a single attempt with no retry. The delay before
    retry number k is ``backoff_factor * 2 ** (k - 1)`` seconds, capped at
    ``self._BACKOFF_MAX``.

    :param method:  http method passed to ``_submit``
    :param url:     target url passed to ``_submit``
    :param headers: request headers passed to ``_submit``
    :param body:    request body passed to ``_submit``
    :return: whatever ``_submit`` returns on the first successful attempt
    :raises Exception: the last exception raised by ``_submit`` once attempts are exhausted
    """
    # `is None` (not `or`) so that an explicit retries=0 disables retries instead of
    # silently turning into the default retry count
    max_retries = default_retries if self.retries is None else self.retries
    max_attempts = max_retries + 1
    backoff_factor = (
        default_backoff_factor if self.backoff_factor is None else self.backoff_factor
    )
    times_attempted = 0
    while True:
        try:
            return await self._submit(method, url, headers, body)
        except Exception as ex:
            times_attempted += 1
            attempts_left = max_attempts - times_attempted
            if self.logger:
                self.logger.warn(
                    f"{self.name} failed to process event ({attempts_left} retries left): {ex}"
                )
            if attempts_left <= 0:
                # bare raise keeps the original traceback intact
                raise
            backoff_value = backoff_factor * (2 ** (times_attempted - 1))
            backoff_value = min(self._BACKOFF_MAX, backoff_value)
            if backoff_value >= 0:
                await asyncio.sleep(backoff_value)

async def _handle_completed(self, event, response):
data = []
for body, headers in response:
Expand Down
6 changes: 3 additions & 3 deletions mlrun/serving/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def run(self, event, context=None, get_body=False, extra_args=None):
try:
response = self.graph.run(event, **(extra_args or {}))
except Exception as exc:
message = str(exc)
message = f"{exc.__class__.__name__}: {exc}"
if server_context.verbose:
message += "\n" + str(traceback.format_exc())
context.logger.error(f"run error, {traceback.format_exc()}")
Expand Down Expand Up @@ -434,11 +434,11 @@ def get_secret(self, key: str):
return self._server._secrets.get(key)
return None

def get_remote_endpoint(self, name, external=False):
def get_remote_endpoint(self, name, external=True):
"""return the remote nuclio/serving function http(s) endpoint given its name
:param name: the function name/uri in the form [project/]function-name[:tag]
:param external: return the external url (returns the in-cluster url by default)
:param external: return the external url (returns the external url by default)
"""
if "://" in name:
return name
Expand Down
3 changes: 2 additions & 1 deletion mlrun/serving/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -1401,5 +1401,6 @@ def _init_async_objects(context, steps):
step.async_object.to(storey.Complete(full_event=True))
wait_for_result = True

default_source = storey.SyncEmitSource()
source_args = context.get_param("source_args", {})
default_source = storey.SyncEmitSource(context=context, **source_args)
return default_source, wait_for_result
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ fsspec~=2021.8.1
v3iofs~=0.1.7
# 3.4 and above failed building in some images - see https://github.com/pyca/cryptography/issues/5771
cryptography~=3.0, <3.4
storey~=0.8.7; python_version >= '3.7'
storey~=0.8.8; python_version >= '3.7'
deepdiff~=5.0
pymysql~=1.0
inflection~=0.5.0

0 comments on commit c946ca2

Please sign in to comment.