Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unblock event loop to allow health service #5433

Merged
merged 6 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ jobs:
export LINKERD2_VERSION=stable-2.11.4
curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s.py ./tests/k8s/test_graceful_request_handling.py
timeout-minutes: 30
timeout-minutes: 45
env:
JINA_K8S_USE_TEST_PIP: 1
- name: Check codecov file
Expand Down Expand Up @@ -307,13 +307,13 @@ jobs:
export JINA_LOG_LEVEL="ERROR"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Test k8s
- name: Test k8s failures
run: |
export LINKERD2_VERSION=stable-2.11.4
curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh
curl --proto '=https' --tlsv1.2 -sSfL https://linkerd.github.io/linkerd-smi/install | sh
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s_failures.py
timeout-minutes: 30
timeout-minutes: 45
env:
JINA_K8S_USE_TEST_PIP: 1
- name: Check codecov file
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ jobs:
export LINKERD2_VERSION=stable-2.11.4
curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s.py ./tests/k8s/test_graceful_request_handling.py
timeout-minutes: 30
timeout-minutes: 45
env:
JINA_K8S_USE_TEST_PIP: 1
- name: Check codecov file
Expand Down Expand Up @@ -230,13 +230,13 @@ jobs:
export JINA_LOG_LEVEL="ERROR"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Test k8s
- name: Test k8s failures
run: |
export LINKERD2_VERSION=stable-2.11.4
curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh
curl --proto '=https' --tlsv1.2 -sSfL https://linkerd.github.io/linkerd-smi/install | sh
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s_failures.py
timeout-minutes: 30
timeout-minutes: 45
env:
JINA_K8S_USE_TEST_PIP: 1
- name: Check codecov file
Expand Down
6 changes: 3 additions & 3 deletions extra-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@

numpy: core
protobuf>=3.20.0: core
grpcio>=1.46.0,<1.48.1: core
grpcio-reflection>=1.46.0,<1.48.1: core
grpcio-health-checking>=1.46.0,<1.48.1: core
grpcio>=1.46.0,<=1.47.0: core
grpcio-reflection>=1.46.0,<=1.47.0: core
grpcio-health-checking>=1.46.0,<=1.47.0: core
pyyaml>=5.3.1: core
packaging>=20.0: core
docarray>=0.16.4: core
Expand Down
6 changes: 3 additions & 3 deletions jina/resources/extra-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@

numpy: core
protobuf>=3.20.0: core
grpcio>=1.46.0,<1.48.1: core
grpcio-reflection>=1.46.0,<1.48.1: core
grpcio-health-checking>=1.46.0,<1.48.1: core
grpcio>=1.46.0,<=1.47.0: core
grpcio-reflection>=1.46.0,<=1.47.0: core
grpcio-health-checking>=1.46.0,<=1.47.0: core
pyyaml>=5.3.1: core
packaging>=20.0: core
docarray>=0.16.4: core
Expand Down
128 changes: 70 additions & 58 deletions jina/serve/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import copy
import inspect
import multiprocessing
import functools
import os
import asyncio
import threading
import warnings
import contextlib
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any, Dict, Optional, Type, Union

Expand All @@ -21,6 +24,7 @@
from jina.serve.executors.metas import get_executor_taboo
from jina.serve.helper import store_init_kwargs, wrap_func
from jina.serve.instrumentation import MetricsTimer
from jina.helper import get_or_reuse_loop

if TYPE_CHECKING: # pragma: no cover
from opentelemetry.context.context import Context
Expand Down Expand Up @@ -60,7 +64,7 @@ def register_class(cls):
arg_spec = inspect.getfullargspec(cls.__init__)

if not arg_spec.varkw and not __args_executor_init__.issubset(
arg_spec.args
arg_spec.args
):
raise TypeError(
f'{cls.__init__} does not follow the full signature of `Executor.__init__`, '
Expand Down Expand Up @@ -120,12 +124,12 @@ def __init__(awesomeness=5):
"""

def __init__(
self,
metas: Optional[Dict] = None,
requests: Optional[Dict] = None,
runtime_args: Optional[Dict] = None,
workspace: Optional[str] = None,
**kwargs,
self,
metas: Optional[Dict] = None,
requests: Optional[Dict] = None,
runtime_args: Optional[Dict] = None,
workspace: Optional[str] = None,
**kwargs,
):
"""`metas` and `requests` are always auto-filled with values from YAML config.

Expand All @@ -152,17 +156,22 @@ def __init__(
if type(self) == BaseExecutor:
self.requests[__default_endpoint__] = self._dry_run_func

try:
self._lock = asyncio.Lock() # Lock to run in Executor non async methods in a way that does not block the event loop to do health checks without the fear of having race conditions or multithreading issues.
except RuntimeError:
self._lock = contextlib.AsyncExitStack()

def _dry_run_func(self, *args, **kwargs):
    """No-op request handler.

    Registered in ``__init__`` as the default-endpoint handler when the
    instance is a bare ``BaseExecutor`` (no subclass), so that dry-run
    requests are accepted and complete without doing any work.
    All positional and keyword arguments are ignored; returns ``None``.
    """
    pass

def _init_monitoring(self):
if (
hasattr(self.runtime_args, 'metrics_registry')
and self.runtime_args.metrics_registry
hasattr(self.runtime_args, 'metrics_registry')
and self.runtime_args.metrics_registry
):
with ImportExtensions(
required=True,
help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina',
required=True,
help_text='You need to install the `prometheus_client` to use the montitoring functionality of jina',
):
from prometheus_client import Summary

Expand Down Expand Up @@ -293,7 +302,7 @@ def _add_metas(self, _metas: Optional[Dict]):
if not hasattr(target, k):
if isinstance(v, str):
if not (
env_var_regex.findall(v) or internal_var_regex.findall(v)
env_var_regex.findall(v) or internal_var_regex.findall(v)
):
setattr(target, k, v)
else:
Expand Down Expand Up @@ -345,18 +354,21 @@ async def __acall__(self, req_endpoint: str, **kwargs):
return await self.__acall_endpoint__(__default_endpoint__, **kwargs)

async def __acall_endpoint__(
self, req_endpoint, tracing_context: Optional['Context'], **kwargs
self, req_endpoint, tracing_context: Optional['Context'], **kwargs
):
func = self.requests[req_endpoint]

async def exec_func(
summary, histogram, histogram_metric_labels, tracing_context
summary, histogram, histogram_metric_labels, tracing_context
):
with MetricsTimer(summary, histogram, histogram_metric_labels):
if iscoroutinefunction(func):
return await func(self, tracing_context=tracing_context, **kwargs)
else:
return func(self, tracing_context=tracing_context, **kwargs)

func = self.requests[req_endpoint]
async with self._lock:
return await get_or_reuse_loop().run_in_executor(None, functools.partial(func, self,
tracing_context=tracing_context,
**kwargs))

runtime_name = (
self.runtime_args.name if hasattr(self.runtime_args, 'name') else None
Expand All @@ -377,7 +389,7 @@ async def exec_func(

if self.tracer:
with self.tracer.start_as_current_span(
req_endpoint, context=tracing_context
req_endpoint, context=tracing_context
):
from opentelemetry.propagate import extract
from opentelemetry.trace.propagation.tracecontext import (
Expand Down Expand Up @@ -408,10 +420,10 @@ def workspace(self) -> Optional[str]:
:return: returns the workspace of the current shard of this Executor.
"""
workspace = (
getattr(self.runtime_args, 'workspace', None)
or getattr(self.metas, 'workspace')
or self._init_workspace
or __cache_path__
getattr(self.runtime_args, 'workspace', None)
or getattr(self.metas, 'workspace')
or self._init_workspace
or __cache_path__
)
if workspace:
complete_workspace = os.path.join(workspace, self.metas.name)
Expand All @@ -434,13 +446,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):

@classmethod
def from_hub(
cls: Type[T],
uri: str,
context: Optional[Dict[str, Any]] = None,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
cls: Type[T],
uri: str,
context: Optional[Dict[str, Any]] = None,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
) -> T:
"""Construct an Executor from Hub.

Expand Down Expand Up @@ -492,12 +504,12 @@ def from_hub(

@classmethod
def serve(
cls,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None,
**kwargs,
cls,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None,
**kwargs,
):
"""Serve this Executor in a temporary Flow. Useful in testing an Executor in remote settings.

Expand Down Expand Up @@ -530,16 +542,16 @@ class StandaloneExecutorType(BetterEnum):

@staticmethod
def to_kubernetes_yaml(
uses: str,
output_base_path: str,
k8s_namespace: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
uses: str,
output_base_path: str,
k8s_namespace: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
):
"""
Converts the Executor into a set of yaml deployments to deploy in Kubernetes.
Expand Down Expand Up @@ -567,23 +579,23 @@ def to_kubernetes_yaml(
output_base_path=output_base_path,
k8s_namespace=k8s_namespace,
include_gateway=executor_type
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
)

to_k8s_yaml = to_kubernetes_yaml

@staticmethod
def to_docker_compose_yaml(
uses: str,
output_path: Optional[str] = None,
network_name: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
uses: str,
output_path: Optional[str] = None,
network_name: Optional[str] = None,
executor_type: Optional[
StandaloneExecutorType
] = StandaloneExecutorType.EXTERNAL,
uses_with: Optional[Dict] = None,
uses_metas: Optional[Dict] = None,
uses_requests: Optional[Dict] = None,
**kwargs,
):
"""
Converts the Executor into a yaml file to run with `docker-compose up`
Expand All @@ -608,11 +620,11 @@ def to_docker_compose_yaml(
output_path=output_path,
network_name=network_name,
include_gateway=executor_type
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
)

def monitor(
self, name: Optional[str] = None, documentation: Optional[str] = None
self, name: Optional[str] = None, documentation: Optional[str] = None
) -> Optional[MetricsTimer]:
"""
Get a given prometheus metric, if it does not exist yet, it will create it and store it in a buffer.
Expand Down
22 changes: 9 additions & 13 deletions jina/serve/runtimes/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,10 @@ async def _async_setup_grpc_server(self):
reflection.SERVICE_NAME,
)
# Mark all services as healthy.
health_pb2_grpc.add_HealthServicer_to_server(
self._health_servicer, self._grpc_server
)
health_pb2_grpc.add_HealthServicer_to_server(self._health_servicer, self._grpc_server)

for service in service_names:
await self._health_servicer.set(
service, health_pb2.HealthCheckResponse.SERVING
)
await self._health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
reflection.enable_server_reflection(service_names, self._grpc_server)
bind_addr = f'{self.args.host}:{self.args.port}'
self.logger.debug(f'start listening on {bind_addr}')
Expand Down Expand Up @@ -187,11 +183,11 @@ async def endpoint_discovery(self, empty, context) -> jina_pb2.EndpointsProto:
:returns: the response request
"""
self.logger.debug('got an endpoint discovery request')
endpointsProto = jina_pb2.EndpointsProto()
endpointsProto.endpoints.extend(
endpoints_proto = jina_pb2.EndpointsProto()
endpoints_proto.endpoints.extend(
list(self._request_handler._executor.requests.keys())
)
return endpointsProto
return endpoints_proto

def _extract_tracing_context(
self, metadata: grpc.aio.Metadata
Expand Down Expand Up @@ -268,10 +264,10 @@ async def _status(self, empty, context) -> jina_pb2.JinaInfoProto:
:param context: grpc context
:returns: the response request
"""
infoProto = jina_pb2.JinaInfoProto()
info_proto = jina_pb2.JinaInfoProto()
version, env_info = get_full_version()
for k, v in version.items():
infoProto.jina[k] = str(v)
info_proto.jina[k] = str(v)
for k, v in env_info.items():
infoProto.envs[k] = str(v)
return infoProto
info_proto.envs[k] = str(v)
return info_proto
Loading