Skip to content

Commit

Permalink
feat: add additional metrics (#4789)
Browse files Browse the repository at this point in the history
* feat: add request size metrics

* feat: add pending requests metrics
  • Loading branch information
samsja committed May 19, 2022
1 parent 3fb3b41 commit a9ed27e
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 15 deletions.
13 changes: 9 additions & 4 deletions docs/fundamentals/flow/monitoring-flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,12 @@ Because not all Pods have the same role, they expose different kinds of metrics:

### Gateway Pods

| Metrics name | Metrics type | Description |
|------------------------------------|----------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `jina_receiving_request_seconds` | [Summary](https://prometheus.io/docs/concepts/metric_types/#summary) | Measures the time elapsed between receiving a request from the client and sending back the response. |
| `jina_sending_request_seconds` | [Summary](https://prometheus.io/docs/concepts/metric_types/#summary) | Measures the time elapsed between sending a downstream request to an Executor/Head and receiving the response back. |
| Metrics name | Metrics type | Description |
|-------------------------------------|----------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------|
| `jina_receiving_request_seconds` | [Summary](https://prometheus.io/docs/concepts/metric_types/#summary) | Measures the time elapsed between receiving a request from the client and sending back the response. |
| `jina_sending_request_seconds` | [Summary](https://prometheus.io/docs/concepts/metric_types/#summary) | Measures the time elapsed between sending a downstream request to an Executor/Head and receiving the response back. |
| `jina_number_of_pending_requests`   | [Gauge](https://prometheus.io/docs/concepts/metric_types/#gauge)     | Counts the number of pending requests.                                                                              |


```{seealso}
You can find more information on the different type of metrics in Prometheus [here](https://prometheus.io/docs/concepts/metric_types/#metric-types)
Expand All @@ -123,6 +125,9 @@ You can find more information on the different type of metrics in Prometheus [he
| `jina_receiving_request_seconds` | [Summary](https://prometheus.io/docs/concepts/metric_types/#summary) | Measures the time elapsed between receiving a request from the gateway (or the head) and sending back the response. |
| `jina_process_request_seconds` | [Summary](https://prometheus.io/docs/concepts/metric_types/#summary) | Measures the time spent calling the requested method |
| `jina_document_processed_total` | [Counter](https://prometheus.io/docs/concepts/metric_types/#counter) | Counts the number of Documents processed by an Executor |
| `jina_request_size_bytes` | [Summary](https://prometheus.io/docs/concepts/metric_types/#summary) | Measures the size of the requests in bytes |



```{seealso}
Beyond monitoring every endpoint of an Executor you can define {ref}`custom metrics <monitoring-executor>` for you
Expand Down
34 changes: 24 additions & 10 deletions jina/serve/runtimes/gateway/request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,35 @@ def __init__(
metrics_registry: Optional['CollectorRegistry'] = None,
runtime_name: Optional[str] = None,
):
self.request_init_time = {} if metrics_registry else None
self._request_init_time = {} if metrics_registry else None
self._executor_endpoint_mapping = None

if metrics_registry:
with ImportExtensions(
required=True,
help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina',
):
from prometheus_client import Summary
from prometheus_client import Gauge, Summary

self._summary = Summary(
self._receiving_request_metrics = Summary(
'receiving_request_seconds',
'Time spent processing request',
registry=metrics_registry,
namespace='jina',
labelnames=('runtime_name',),
).labels(runtime_name)

self._pending_requests_metrics = Gauge(
'number_of_pending_requests',
'Number of pending requests',
registry=metrics_registry,
namespace='jina',
labelnames=('runtime_name',),
).labels(runtime_name)

else:
self._summary = None
self._receiving_request_metrics = None
self._pending_requests_metrics = None

def handle_request(
self, graph: 'TopologyGraph', connection_pool: 'GrpcConnectionPool'
Expand Down Expand Up @@ -100,8 +109,10 @@ def _get_all_nodes(node, accum, accum_names):
self._executor_endpoint_mapping[node.name] = endp.endpoints

def _handle_request(request: 'Request') -> 'asyncio.Future':
if self._summary:
self.request_init_time[request.request_id] = time.time()
if self._receiving_request_metrics:
self._request_init_time[request.request_id] = time.time()
if self._pending_requests_metrics:
self._pending_requests_metrics.inc()
# important that the gateway needs to have an instance of the graph per request
request_graph = copy.deepcopy(graph)

Expand Down Expand Up @@ -194,11 +205,14 @@ def _handle_result(result: 'Request'):
if route.executor == 'gateway':
route.end_time.GetCurrentTime()

if self._summary:
init_time = self.request_init_time.pop(
if self._receiving_request_metrics:
init_time = self._request_init_time.pop(
result.request_id
) # need to pop otherwise it stay in memory for ever
self._summary.observe(time.time() - init_time)
) # need to pop otherwise it stays in memory forever
self._receiving_request_metrics.observe(time.time() - init_time)

if self._pending_requests_metrics:
self._pending_requests_metrics.dec()

return result

Expand Down
20 changes: 19 additions & 1 deletion jina/serve/runtimes/request_handlers/data_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _init_monitoring(self, metrics_registry: Optional['CollectorRegistry'] = Non
required=True,
help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina',
):
from prometheus_client import Counter
from prometheus_client import Counter, Summary

self._counter = Counter(
'document_processed',
Expand All @@ -58,8 +58,17 @@ def _init_monitoring(self, metrics_registry: Optional['CollectorRegistry'] = Non
labelnames=('executor_endpoint', 'executor', 'runtime_name'),
registry=metrics_registry,
)

self._request_size_metrics = Summary(
'request_size_bytes',
'The request size in Bytes',
namespace='jina',
labelnames=('executor_endpoint', 'executor', 'runtime_name'),
registry=metrics_registry,
)
else:
self._counter = None
self._request_size_metrics = None

def _load_executor(self, metrics_registry: Optional['CollectorRegistry'] = None):
"""
Expand Down Expand Up @@ -121,6 +130,15 @@ async def handle(self, requests: List['DataRequest']) -> DataRequest:
)
return requests[0]

if self._request_size_metrics:

for req in requests:
self._request_size_metrics.labels(
requests[0].header.exec_endpoint,
self._executor.__class__.__name__,
self.args.name,
).observe(req.nbytes)

params = self._parse_params(requests[0].parameters, self._executor.metas.name)
docs = DataRequestHandler.get_docs_from_request(
requests,
Expand Down
98 changes: 98 additions & 0 deletions tests/integration/monitoring/test_monitoring.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import multiprocessing
import time

import pytest
Expand Down Expand Up @@ -152,3 +153,100 @@ def test_disable_monitoring_on_gatway_only(port_generator, executor):

resp = req.get(f'http://localhost:{port1}/') # enable on port0
assert resp.status_code == 200


def test_requests_size(port_generator, executor):
    """Check that the Executor exposes the `jina_request_size_bytes` metric and
    that its `_sum` series grows after every processed request.
    """
    port0 = port_generator()  # gateway monitoring port
    port1 = port_generator()  # executor monitoring port

    with Flow(monitoring=True, port_monitoring=port0).add(
        uses=executor, port_monitoring=port1
    ) as f:

        f.post('/foo', inputs=DocumentArray.empty(size=1))

        resp = req.get(f'http://localhost:{port1}/')  # scrape the executor metrics
        assert resp.status_code == 200

        # exactly one request has been observed so far
        assert (
            'jina_request_size_bytes_count{executor="DummyExecutor",executor_endpoint="/foo",runtime_name="executor0/rep-0"} 1.0'
            in str(resp.content)
        )

        def _get_request_bytes_size():
            # Scrape the executor monitoring endpoint and extract the current
            # value of the `jina_request_size_bytes_sum` series.
            resp = req.get(f'http://localhost:{port1}/')

            # resp.content is bytes; str(bytes) renders newlines as the two
            # characters '\n', hence the escaped split pattern.
            resp_lines = str(resp.content).split('\\n')
            byte_line = [
                line
                for line in resp_lines
                if 'jina_request_size_bytes_sum{executor="DummyExecutor"' in line
            ]

            # The exposition format is '<name>{<labels>} <value>': take the last
            # whitespace-separated token. (A fixed-width slice like line[-5:]
            # breaks as soon as the value is not exactly 5 characters wide.)
            return float(byte_line[0].split(' ')[-1])

        measured_request_bytes_sum_init = _get_request_bytes_size()
        f.post('/foo', inputs=DocumentArray.empty(size=1))
        measured_request_bytes_sum = _get_request_bytes_size()

        # the sum must strictly increase after one more request
        assert measured_request_bytes_sum > measured_request_bytes_sum_init


def test_pending_request(port_generator):
    """Check the gateway's `jina_number_of_pending_requests` gauge: it must read
    0.0 before and after a request and 1.0 while one is in flight.

    NOTE(review): the probe processes use nested functions/lambdas as
    `multiprocessing.Process` targets, which only pickle under the `fork`
    start method (Linux default) — this would fail under `spawn`
    (macOS/Windows default). TODO confirm the CI start method.
    """
    port0 = port_generator()  # gateway monitoring port
    port1 = port_generator()  # executor monitoring port

    # An Executor slow enough (5s) that the gauge can be observed at 1.0
    # while the request is still being processed.
    class SlowExecutor(Executor):
        @requests
        def foo(self, docs, **kwargs):
            time.sleep(5)

    with Flow(monitoring=True, port_monitoring=port0).add(
        uses=SlowExecutor, port_monitoring=port1
    ) as f:

        def _send_request():
            # Blocks for ~5s while SlowExecutor handles the Document.
            f.search(inputs=DocumentArray.empty(size=1))

        def _assert_pending_value(val: str):
            # Scrape the gateway monitoring endpoint and assert the exact
            # rendered value of the pending-requests gauge.
            resp = req.get(f'http://localhost:{port0}/')
            assert resp.status_code == 200
            assert (
                f'jina_number_of_pending_requests{{runtime_name="gateway/rep-0/GRPCGatewayRuntime"}} {val}'
                in str(resp.content)
            )

        _assert_while = lambda: _assert_pending_value(
            '1.0'
        )  # while the request is being processed the gauge is at one
        _assert_after = lambda: _assert_pending_value(
            '0.0'
        )  # but before and after it is at 0
        _assert_before = lambda: _assert_pending_value(
            '0.0'
        )  # but before and after it is at 0

        # Interleave the probes around the slow request:
        #   t=0s -> 'before' probe: gauge should be 0.0
        #   t=1s -> request is sent (takes ~5s)
        #   t=2s -> 'while' probe: request still in flight, gauge should be 1.0
        p_send = multiprocessing.Process(target=_send_request)
        p_before = multiprocessing.Process(target=_assert_before)
        p_while = multiprocessing.Process(target=_assert_while)

        p_before.start()
        time.sleep(1)
        p_send.start()
        time.sleep(1)
        p_while.start()

        for p in [p_before, p_send, p_while]:
            p.join()

        exitcodes = []
        for p in [p_before, p_send, p_while]:
            p.terminate()
            exitcodes.append(
                p.exitcode
            )  # collect the exit codes and assert after all of them have been terminated, to avoid timeouts

        # A non-zero exit code means the assertion inside that process failed.
        for code in exitcodes:
            assert not code

        _assert_after()

0 comments on commit a9ed27e

Please sign in to comment.