Skip to content

Commit

Permalink
fix: unblock event loop to allow health service (#5433)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Nov 24, 2022
1 parent 8344d16 commit 8c110d6
Show file tree
Hide file tree
Showing 10 changed files with 242 additions and 134 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ jobs:
export LINKERD2_VERSION=stable-2.11.4
curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s.py ./tests/k8s/test_graceful_request_handling.py
timeout-minutes: 30
timeout-minutes: 45
env:
JINA_K8S_USE_TEST_PIP: 1
- name: Check codecov file
Expand Down Expand Up @@ -307,13 +307,13 @@ jobs:
export JINA_LOG_LEVEL="ERROR"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Test k8s
- name: Test k8s failures
run: |
export LINKERD2_VERSION=stable-2.11.4
curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh
curl --proto '=https' --tlsv1.2 -sSfL https://linkerd.github.io/linkerd-smi/install | sh
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s_failures.py
timeout-minutes: 30
timeout-minutes: 45
env:
JINA_K8S_USE_TEST_PIP: 1
- name: Check codecov file
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ jobs:
export LINKERD2_VERSION=stable-2.11.4
curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s.py ./tests/k8s/test_graceful_request_handling.py
timeout-minutes: 30
timeout-minutes: 45
env:
JINA_K8S_USE_TEST_PIP: 1
- name: Check codecov file
Expand Down Expand Up @@ -230,13 +230,13 @@ jobs:
export JINA_LOG_LEVEL="ERROR"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Test k8s
- name: Test k8s failures
run: |
export LINKERD2_VERSION=stable-2.11.4
curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh
curl --proto '=https' --tlsv1.2 -sSfL https://linkerd.github.io/linkerd-smi/install | sh
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s_failures.py
timeout-minutes: 30
timeout-minutes: 45
env:
JINA_K8S_USE_TEST_PIP: 1
- name: Check codecov file
Expand Down
6 changes: 3 additions & 3 deletions extra-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@

numpy: core
protobuf>=3.20.0: core
grpcio>=1.46.0,<1.48.1: core
grpcio-reflection>=1.46.0,<1.48.1: core
grpcio-health-checking>=1.46.0,<1.48.1: core
grpcio>=1.46.0,<=1.47.0: core
grpcio-reflection>=1.46.0,<=1.47.0: core
grpcio-health-checking>=1.46.0,<=1.47.0: core
pyyaml>=5.3.1: core
packaging>=20.0: core
docarray>=0.16.4: core
Expand Down
6 changes: 3 additions & 3 deletions jina/resources/extra-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@

numpy: core
protobuf>=3.20.0: core
grpcio>=1.46.0,<1.48.1: core
grpcio-reflection>=1.46.0,<1.48.1: core
grpcio-health-checking>=1.46.0,<1.48.1: core
grpcio>=1.46.0,<=1.47.0: core
grpcio-reflection>=1.46.0,<=1.47.0: core
grpcio-health-checking>=1.46.0,<=1.47.0: core
pyyaml>=5.3.1: core
packaging>=20.0: core
docarray>=0.16.4: core
Expand Down
128 changes: 70 additions & 58 deletions jina/serve/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import copy
import inspect
import multiprocessing
import functools
import os
import asyncio
import threading
import warnings
import contextlib
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any, Dict, Optional, Type, Union

Expand All @@ -21,6 +24,7 @@
from jina.serve.executors.metas import get_executor_taboo
from jina.serve.helper import store_init_kwargs, wrap_func
from jina.serve.instrumentation import MetricsTimer
from jina.helper import get_or_reuse_loop

if TYPE_CHECKING: # pragma: no cover
from opentelemetry.context.context import Context
Expand Down Expand Up @@ -60,7 +64,7 @@ def register_class(cls):
arg_spec = inspect.getfullargspec(cls.__init__)

if not arg_spec.varkw and not __args_executor_init__.issubset(
arg_spec.args
arg_spec.args
):
raise TypeError(
f'{cls.__init__} does not follow the full signature of `Executor.__init__`, '
Expand Down Expand Up @@ -120,12 +124,12 @@ def __init__(awesomeness=5):
"""

def __init__(
self,
metas: Optional[Dict] = None,
requests: Optional[Dict] = None,
runtime_args: Optional[Dict] = None,
workspace: Optional[str] = None,
**kwargs,
self,
metas: Optional[Dict] = None,
requests: Optional[Dict] = None,
runtime_args: Optional[Dict] = None,
workspace: Optional[str] = None,
**kwargs,
):
"""`metas` and `requests` are always auto-filled with values from YAML config.
Expand All @@ -152,17 +156,22 @@ def __init__(
if type(self) == BaseExecutor:
self.requests[__default_endpoint__] = self._dry_run_func

try:
self._lock = asyncio.Lock() # Lock to run in Executor non async methods in a way that does not block the event loop to do health checks without the fear of having race conditions or multithreading issues.
except RuntimeError:
self._lock = contextlib.AsyncExitStack()

def _dry_run_func(self, *args, **kwargs):
pass

def _init_monitoring(self):
if (
hasattr(self.runtime_args, 'metrics_registry')
and self.runtime_args.metrics_registry
hasattr(self.runtime_args, 'metrics_registry')
and self.runtime_args.metrics_registry
):
with ImportExtensions(
required=True,
help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina',
required=True,
help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina',
):
from prometheus_client import Summary

Expand Down Expand Up @@ -293,7 +302,7 @@ def _add_metas(self, _metas: Optional[Dict]):
if not hasattr(target, k):
if isinstance(v, str):
if not (
env_var_regex.findall(v) or internal_var_regex.findall(v)
env_var_regex.findall(v) or internal_var_regex.findall(v)
):
setattr(target, k, v)
else:
Expand Down Expand Up @@ -345,18 +354,21 @@ async def __acall__(self, req_endpoint: str, **kwargs):
return await self.__acall_endpoint__(__default_endpoint__, **kwargs)

async def __acall_endpoint__(
self, req_endpoint, tracing_context: Optional['Context'], **kwargs
self, req_endpoint, tracing_context: Optional['Context'], **kwargs
):
func = self.requests[req_endpoint]

async def exec_func(
summary, histogram, histogram_metric_labels, tracing_context
summary, histogram, histogram_metric_labels, tracing_context
):
with MetricsTimer(summary, histogram, histogram_metric_labels):
if iscoroutinefunction(func):
return await func(self, tracing_context=tracing_context, **kwargs)
else:
return func(self, tracing_context=tracing_context, **kwargs)

func = self.requests[req_endpoint]
async with self._lock:
return await get_or_reuse_loop().run_in_executor(None, functools.partial(func, self,
tracing_context=tracing_context,
**kwargs))

runtime_name = (
self.runtime_args.name if hasattr(self.runtime_args, 'name') else None
Expand All @@ -377,7 +389,7 @@ async def exec_func(

if self.tracer:
with self.tracer.start_as_current_span(
req_endpoint, context=tracing_context
req_endpoint, context=tracing_context
):
from opentelemetry.propagate import extract
from opentelemetry.trace.propagation.tracecontext import (
Expand Down Expand Up @@ -408,10 +420,10 @@ def workspace(self) -> Optional[str]:
:return: returns the workspace of the current shard of this Executor.
"""
workspace = (
getattr(self.runtime_args, 'workspace', None)
or getattr(self.metas, 'workspace')
or self._init_workspace
or __cache_path__
getattr(self.runtime_args, 'workspace', None)
or getattr(self.metas, 'workspace')
or self._init_workspace
or __cache_path__
)
if workspace:
complete_workspace = os.path.join(workspace, self.metas.name)
Expand All @@ -434,13 +446,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):

@classmethod
def from_hub(
cls: Type[T],
uri: str,
context: Optional[Dict[str, Any]] = None,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
cls: Type[T],
uri: str,
context: Optional[Dict[str, Any]] = None,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
) -> T:
"""Construct an Executor from Hub.
Expand Down Expand Up @@ -492,12 +504,12 @@ def from_hub(

@classmethod
def serve(
cls,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None,
**kwargs,
cls,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None,
**kwargs,
):
"""Serve this Executor in a temporary Flow. Useful in testing an Executor in remote settings.
Expand Down Expand Up @@ -530,16 +542,16 @@ class StandaloneExecutorType(BetterEnum):

@staticmethod
def to_kubernetes_yaml(
uses: str,
output_base_path: str,
k8s_namespace: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
uses: str,
output_base_path: str,
k8s_namespace: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
):
"""
Converts the Executor into a set of yaml deployments to deploy in Kubernetes.
Expand Down Expand Up @@ -567,23 +579,23 @@ def to_kubernetes_yaml(
output_base_path=output_base_path,
k8s_namespace=k8s_namespace,
include_gateway=executor_type
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
)

to_k8s_yaml = to_kubernetes_yaml

@staticmethod
def to_docker_compose_yaml(
uses: str,
output_path: Optional[str] = None,
network_name: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
uses: str,
output_path: Optional[str] = None,
network_name: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
):
"""
Converts the Executor into a yaml file to run with `docker-compose up`
Expand All @@ -608,11 +620,11 @@ def to_docker_compose_yaml(
output_path=output_path,
network_name=network_name,
include_gateway=executor_type
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
)

def monitor(
self, name: Optional[str] = None, documentation: Optional[str] = None
self, name: Optional[str] = None, documentation: Optional[str] = None
) -> Optional[MetricsTimer]:
"""
Get a given prometheus metric, if it does not exist yet, it will create it and store it in a buffer.
Expand Down
22 changes: 9 additions & 13 deletions jina/serve/runtimes/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,10 @@ async def _async_setup_grpc_server(self):
reflection.SERVICE_NAME,
)
# Mark all services as healthy.
health_pb2_grpc.add_HealthServicer_to_server(
self._health_servicer, self._grpc_server
)
health_pb2_grpc.add_HealthServicer_to_server(self._health_servicer, self._grpc_server)

for service in service_names:
await self._health_servicer.set(
service, health_pb2.HealthCheckResponse.SERVING
)
await self._health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
reflection.enable_server_reflection(service_names, self._grpc_server)
bind_addr = f'{self.args.host}:{self.args.port}'
self.logger.debug(f'start listening on {bind_addr}')
Expand Down Expand Up @@ -187,11 +183,11 @@ async def endpoint_discovery(self, empty, context) -> jina_pb2.EndpointsProto:
:returns: the response request
"""
self.logger.debug('got an endpoint discovery request')
endpointsProto = jina_pb2.EndpointsProto()
endpointsProto.endpoints.extend(
endpoints_proto = jina_pb2.EndpointsProto()
endpoints_proto.endpoints.extend(
list(self._request_handler._executor.requests.keys())
)
return endpointsProto
return endpoints_proto

def _extract_tracing_context(
self, metadata: grpc.aio.Metadata
Expand Down Expand Up @@ -268,10 +264,10 @@ async def _status(self, empty, context) -> jina_pb2.JinaInfoProto:
:param context: grpc context
:returns: the response request
"""
infoProto = jina_pb2.JinaInfoProto()
info_proto = jina_pb2.JinaInfoProto()
version, env_info = get_full_version()
for k, v in version.items():
infoProto.jina[k] = str(v)
info_proto.jina[k] = str(v)
for k, v in env_info.items():
infoProto.envs[k] = str(v)
return infoProto
info_proto.envs[k] = str(v)
return info_proto
Loading

0 comments on commit 8c110d6

Please sign in to comment.