Skip to content

Commit

Permalink
refactor: put metric collection in helper methods (#5337)
Browse files Browse the repository at this point in the history
* refactor: put metric collection in helper methods

* refactor: rename methods

* refactor: rename methods

* fix: properly set result docs
  • Loading branch information
JohannesMessner committed Nov 1, 2022
1 parent bd80035 commit a9ef7bc
Showing 1 changed file with 73 additions and 61 deletions.
134 changes: 73 additions & 61 deletions jina/serve/runtimes/request_handlers/worker_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,25 +183,7 @@ def _metric_attributes(executor_endpoint, executor, runtime_name):
'runtime_name': runtime_name,
}

async def handle(
self, requests: List['DataRequest'], tracing_context: Optional['Context'] = None
) -> DataRequest:
"""Initialize private parameters and execute private loading functions.
:param requests: The messages to handle containing a DataRequest
:param tracing_context: Optional OpenTelemetry tracing context from the originating request.
:returns: the processed message
"""
# skip executor if endpoints mismatch
if (
requests[0].header.exec_endpoint not in self._executor.requests
and __default_endpoint__ not in self._executor.requests
):
self.logger.debug(
f'skip executor: mismatch request, exec_endpoint: {requests[0].header.exec_endpoint}, requests: {self._executor.requests}'
)
return requests[0]

def _record_request_size_monitoring(self, requests):
for req in requests:
if self._request_size_metrics:
self._request_size_metrics.labels(
Expand All @@ -217,44 +199,7 @@ async def handle(
)
self._request_size_histogram.record(req.nbytes, attributes=attributes)

params = self._parse_params(requests[0].parameters, self._executor.metas.name)

docs = WorkerRequestHandler.get_docs_from_request(
requests,
field='docs',
)

# executor logic
return_data = await self._executor.__acall__(
req_endpoint=requests[0].header.exec_endpoint,
docs=docs,
parameters=params,
docs_matrix=WorkerRequestHandler.get_docs_matrix_from_request(
requests,
field='docs',
),
tracing_context=tracing_context,
)
# assigning result back to request
if return_data is not None:
if isinstance(return_data, DocumentArray):
docs = return_data
elif isinstance(return_data, dict):
params = requests[0].parameters
results_key = self._KEY_RESULT

if not results_key in params.keys():
params[results_key] = dict()

params[results_key].update({self.args.name: return_data})
requests[0].parameters = params

else:
raise TypeError(
f'The return type must be DocumentArray / Dict / `None`, '
f'but getting {return_data!r}'
)

def _record_docs_processed_monitoring(self, requests, docs):
if self._document_processed_metrics:
self._document_processed_metrics.labels(
requests[0].header.exec_endpoint,
Expand All @@ -269,10 +214,7 @@ async def handle(
)
self._document_processed_counter.add(len(docs), attributes=attributes)

WorkerRequestHandler.replace_docs(
requests[0], docs, self.args.output_array_type
)

def _record_response_size_monitoring(self, requests):
if self._sent_response_size_metrics:
self._sent_response_size_metrics.labels(
requests[0].header.exec_endpoint,
Expand All @@ -289,6 +231,76 @@ async def handle(
requests[0].nbytes, attributes=attributes
)

def _set_result(self, requests, return_data, docs):
# assigning result back to request
if return_data is not None:
if isinstance(return_data, DocumentArray):
docs = return_data
elif isinstance(return_data, dict):
params = requests[0].parameters
results_key = self._KEY_RESULT

if not results_key in params.keys():
params[results_key] = dict()

params[results_key].update({self.args.name: return_data})
requests[0].parameters = params

else:
raise TypeError(
f'The return type must be DocumentArray / Dict / `None`, '
f'but getting {return_data!r}'
)

WorkerRequestHandler.replace_docs(
requests[0], docs, self.args.output_array_type
)
return docs

async def handle(
self, requests: List['DataRequest'], tracing_context: Optional['Context'] = None
) -> DataRequest:
"""Initialize private parameters and execute private loading functions.
:param requests: The messages to handle containing a DataRequest
:param tracing_context: Optional OpenTelemetry tracing context from the originating request.
:returns: the processed message
"""
# skip executor if endpoints mismatch
if (
requests[0].header.exec_endpoint not in self._executor.requests
and __default_endpoint__ not in self._executor.requests
):
self.logger.debug(
f'skip executor: mismatch request, exec_endpoint: {requests[0].header.exec_endpoint}, requests: {self._executor.requests}'
)
return requests[0]

self._record_request_size_monitoring(requests)

params = self._parse_params(requests[0].parameters, self._executor.metas.name)
docs = WorkerRequestHandler.get_docs_from_request(
requests,
field='docs',
)

# executor logic
return_data = await self._executor.__acall__(
req_endpoint=requests[0].header.exec_endpoint,
docs=docs,
parameters=params,
docs_matrix=WorkerRequestHandler.get_docs_matrix_from_request(
requests,
field='docs',
),
tracing_context=tracing_context,
)

docs = self._set_result(requests, return_data, docs)

self._record_docs_processed_monitoring(requests, docs)
self._record_response_size_monitoring(requests)

return requests[0]

@staticmethod
Expand Down

0 comments on commit a9ef7bc

Please sign in to comment.