-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
__init__.py
172 lines (151 loc) · 6.1 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import argparse
import asyncio
import os
import urllib
import urllib.request
from abc import ABC
from http import HTTPStatus
from typing import TYPE_CHECKING, Optional, Union

from jina import __default_host__
from jina.enums import GatewayProtocolType
from jina.excepts import PortAlreadyUsed
from jina.helper import is_port_free
from jina.parsers.helper import _set_gateway_uses
from jina.serve.gateway import BaseGateway
from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime

if TYPE_CHECKING:
    import multiprocessing
    import threading

class GatewayRuntime(AsyncNewLoopRuntime):
    """
    The Gateway Runtime that starts a gateway pod.

    The GatewayRuntime is associated with a Gateway class that inherits :class:`~BaseGateway`.
    While the Gateway class takes care of server and application logic and serving gRPC/HTTP/Websocket API,
    the GatewayRuntime is responsible for instantiating the right Gateway class, injecting the right
    parameters into it and running/terminating the Gateway object.
    """

    def __init__(
        self,
        args: argparse.Namespace,
        cancel_event: Optional[
            Union['asyncio.Event', 'multiprocessing.Event', 'threading.Event']
        ] = None,
        **kwargs,
    ):
        """
        Store the send timeout, resolve the gateway ``uses`` and initialize the parent runtime.

        :param args: parsed gateway arguments; ``args.timeout_send`` is read in milliseconds
        :param cancel_event: optional event forwarded to the parent runtime
        :param kwargs: extra keyword arguments forwarded to the parent runtime
        """
        # this order is intentional: The timeout is needed in _create_topology_graph(), called by super
        self.timeout_send = args.timeout_send
        if self.timeout_send:
            self.timeout_send /= 1e3  # convert ms to seconds
        _set_gateway_uses(args)
        super().__init__(args, cancel_event, **kwargs)

    async def async_setup(self):
        """
        The async method that sets up the runtime.

        Checks that the configured port is free, loads the gateway object from ``args.uses``,
        attaches the streamer to it and sets up the gateway server.

        :raises PortAlreadyUsed: if ``args.port`` is not free on the default host
        """
        if not (is_port_free(__default_host__, self.args.port)):
            raise PortAlreadyUsed(f'port:{self.args.port}')

        uses_with = self.args.uses_with or {}
        self.gateway = BaseGateway.load_config(
            self.args.uses,
            # NOTE(review): a key in ``uses_with`` that duplicates one of the explicit keywords
            # below makes ``dict()`` raise TypeError - confirm whether overriding should be allowed
            uses_with=dict(
                name=self.name,
                grpc_server_options=self.args.grpc_server_options,
                port=self.args.port,
                title=self.args.title,
                description=self.args.description,
                no_debug_endpoints=self.args.no_debug_endpoints,
                no_crud_endpoints=self.args.no_crud_endpoints,
                expose_endpoints=self.args.expose_endpoints,
                expose_graphql_endpoint=self.args.expose_graphql_endpoint,
                cors=self.args.cors,
                ssl_keyfile=self.args.ssl_keyfile,
                ssl_certfile=self.args.ssl_certfile,
                uvicorn_kwargs=self.args.uvicorn_kwargs,
                proxy=self.args.proxy,
                opentelemetry_tracing=self.opentelemetry_tracing,
                tracer_provider=self.tracer_provider,
                **uses_with,
            ),
            uses_metas={},
            runtime_args={  # these are not parsed to the yaml config file but are passed directly during init
                'name': self.args.name,
            },
            py_modules=self.args.py_modules,
            extra_search_paths=self.args.extra_search_paths,
        )

        self.gateway.set_streamer(
            args=self.args,
            timeout_send=self.timeout_send,
            metrics_registry=self.metrics_registry,
            runtime_name=self.args.name,
            aio_tracing_client_interceptors=self.aio_tracing_client_interceptors(
                self.tracer
            ),
            tracing_client_interceptor=self.tracing_client_interceptor(
                self.tracer_provider
            ),
        )
        await self.gateway.setup_server()

    async def _wait_for_cancel(self):
        """
        Poll until the cancel event is set or the gateway asks to exit, then stop the server.

        Do NOT override this method when inheriting from :class:`GatewayPod`
        """
        # handle terminate signals
        while not self.is_cancel.is_set() and not self.gateway.should_exit:
            await asyncio.sleep(0.1)

        await self.async_cancel()

    async def async_teardown(self):
        """Shutdown the server: tear down the gateway, then stop its server."""
        await self.gateway.teardown()
        await self.async_cancel()

    async def async_cancel(self):
        """Stop the server."""
        await self.gateway.stop_server()

    async def async_run_forever(self):
        """Running method of the server."""
        await self.gateway.run_server()

    @staticmethod
    def is_ready(ctrl_address: str, protocol: Optional[str] = 'grpc', **kwargs) -> bool:
        """
        Check if status is ready.

        For gRPC (or an unspecified protocol) the check is delegated to
        :meth:`AsyncNewLoopRuntime.is_ready`; otherwise an HTTP GET is sent to
        ``http://{ctrl_address}`` and the runtime is ready when it answers with 200 OK.

        :param ctrl_address: the address where the control request needs to be sent
        :param protocol: protocol of the gateway runtime
        :param kwargs: extra keyword arguments
        :return: True if status is ready else False.
        """
        if (
            protocol is None
            or protocol == GatewayProtocolType.GRPC
            or protocol == 'grpc'
        ):
            return AsyncNewLoopRuntime.is_ready(ctrl_address)

        try:
            # the context manager closes the connection once the status has been read,
            # so repeated readiness polls do not leak sockets
            with urllib.request.urlopen(url=f'http://{ctrl_address}') as conn:
                return conn.status == HTTPStatus.OK
        except Exception:
            # best-effort probe: any failure (refused connection, HTTP error status, malformed
            # address, ...) means "not ready". KeyboardInterrupt/SystemExit are not swallowed.
            return False

    @classmethod
    def wait_for_ready_or_shutdown(
        cls,
        timeout: Optional[float],
        ready_or_shutdown_event: Union['multiprocessing.Event', 'threading.Event'],
        ctrl_address: str,
        protocol: Optional[str] = 'grpc',
        **kwargs,
    ):
        """
        Check if the runtime has successfully started

        Delegates to the parent implementation, passing the protocol along. Extra keyword
        arguments are accepted but not forwarded.

        :param timeout: The time to wait before readiness or failure is determined
        :param ctrl_address: the address where the control message needs to be sent
        :param ready_or_shutdown_event: the multiprocessing event to detect if the process failed or is ready
        :param protocol: protocol of the gateway runtime
        :param kwargs: extra keyword arguments
        :return: True if is ready or it needs to be shutdown
        """
        return super().wait_for_ready_or_shutdown(
            timeout=timeout,
            ready_or_shutdown_event=ready_or_shutdown_event,
            ctrl_address=ctrl_address,
            protocol=protocol,
        )