Skip to content

Commit

Permalink
refactor: refactor data request handler monitoring (#5088)
Browse files Browse the repository at this point in the history
* refactor: improve request handler monitoring code

* refactor: precise receiving request metrics scope

* refactor: add mixin for monitoring

* fix: apply johannes suggestion on init and super
  • Loading branch information
samsja committed Aug 26, 2022
1 parent c0c3bf9 commit 6e51e81
Showing 1 changed file with 43 additions and 16 deletions.
59 changes: 43 additions & 16 deletions jina/serve/runtimes/gateway/request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from jina.types.request import Request


class RequestHandler:
class MonitoringRequestMixin:
"""
Class that handles the requests arriving to the gateway and the result extracted from the requests future.
Mixin for the request handling monitoring
:param metrics_registry: optional metrics registry for prometheus used if we need to expose metrics from the executor or from the data request handler
:param runtime_name: optional runtime_name that will be registered during monitoring
Expand All @@ -35,9 +35,8 @@ def __init__(
metrics_registry: Optional['CollectorRegistry'] = None,
runtime_name: Optional[str] = None,
):

self._request_init_time = {} if metrics_registry else None
self._executor_endpoint_mapping = None
self._gathering_endpoints = False

if metrics_registry:
with ImportExtensions(
Expand All @@ -48,7 +47,7 @@ def __init__(

self._receiving_request_metrics = Summary(
'receiving_request_seconds',
'Time spent processing request',
'Time spent processing successful request',
registry=metrics_registry,
namespace='jina',
labelnames=('runtime_name',),
Expand Down Expand Up @@ -90,25 +89,53 @@ def _update_start_request_metrics(self, request: 'Request'):
if self._pending_requests_metrics:
self._pending_requests_metrics.inc()

def _update_end_request_metrics(self, result: 'Request', exc: Exception = None):
if self._receiving_request_metrics:
def _update_end_successful_requests_metrics(self, result: 'Request'):
if (
self._receiving_request_metrics
): # this one should only be observed when the request is successful
init_time = self._request_init_time.pop(
result.request_id
) # need to pop otherwise it stays in memory forever
self._receiving_request_metrics.observe(time.time() - init_time)

if self._pending_requests_metrics:
self._pending_requests_metrics.dec()
if (
exc or result.status.code == jina_pb2.StatusProto.ERROR
) and self._failed_requests_metrics:
self._failed_requests_metrics.inc()
if (
not (exc or result.status.code == jina_pb2.StatusProto.ERROR)
and self._successful_requests_metrics
):

if self._successful_requests_metrics:
self._successful_requests_metrics.inc()

def _update_end_failed_requests_metrics(self, result: 'Request'):
if self._pending_requests_metrics:
self._pending_requests_metrics.dec()

if self._failed_requests_metrics:
self._failed_requests_metrics.inc()

def _update_end_request_metrics(self, result: 'Request'):
    """
    Route a finished request to the matching end-of-request metrics update.

    :param result: the finished request; a status code equal to ``StatusProto.ERROR`` sends it to the failed update, any other code to the successful one
    """
    request_failed = result.status.code == jina_pb2.StatusProto.ERROR
    if request_failed:
        self._update_end_failed_requests_metrics(result)
        return

    self._update_end_successful_requests_metrics(result)


class RequestHandler(MonitoringRequestMixin):
"""
Class that handles the requests arriving to the gateway and the result extracted from the requests future.
:param metrics_registry: optional metrics registry for prometheus used if we need to expose metrics from the executor or from the data request handler
:param runtime_name: optional runtime_name that will be registered during monitoring
"""

def __init__(
    self,
    metrics_registry: Optional['CollectorRegistry'] = None,
    runtime_name: Optional[str] = None,
):
    """
    Initialise the parent class with the monitoring arguments, then reset the endpoint attributes.

    :param metrics_registry: optional metrics registry for prometheus, forwarded unchanged to the parent initialiser
    :param runtime_name: optional runtime_name that will be registered during monitoring, forwarded unchanged to the parent initialiser
    """
    super().__init__(metrics_registry=metrics_registry, runtime_name=runtime_name)

    # Both attributes start in their "unset" state.
    self._gathering_endpoints = False
    self._executor_endpoint_mapping = None

def handle_request(
self, graph: 'TopologyGraph', connection_pool: 'GrpcConnectionPool'
) -> Callable[['Request'], 'Tuple[Future, Optional[Future]]']:
Expand Down Expand Up @@ -215,7 +242,7 @@ async def _process_results_at_end_gateway(
partial_responses = await asyncio.gather(*tasks)
except Exception as e:
# record the failed request in the metrics before re-raising
self._update_end_request_metrics(request, exc=e)
self._update_end_failed_requests_metrics(request)
raise
partial_responses, metadatas = zip(*partial_responses)
filtered_partial_responses = list(
Expand Down

0 comments on commit 6e51e81

Please sign in to comment.