-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
base.py
1828 lines (1535 loc) · 74.6 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import argparse
import base64
import copy
import itertools
import json
import os
import re
import sys
import threading
import time
import uuid
import warnings
from collections import OrderedDict
from contextlib import ExitStack
from typing import Optional, Union, Tuple, List, Set, Dict, overload, Type
from .builder import allowed_levels, _hanging_pods
from .. import __default_host__
from ..clients import Client
from ..clients.mixin import AsyncPostMixin, PostMixin
from ..enums import (
FlowBuildLevel,
PodRoleType,
FlowInspectType,
GatewayProtocolType,
InfrastructureType,
PollingType,
)
from ..excepts import (
FlowTopologyError,
FlowMissingPodError,
RoutingTableCyclicError,
RuntimeFailToStart,
)
from ..helper import (
colored,
get_public_ip,
get_internal_ip,
typename,
ArgNamespace,
download_mermaid_url,
CatchAllCleanupContextManager,
)
from ..jaml import JAMLCompatible
from ..logging.logger import JinaLogger
from ..parsers import set_gateway_parser, set_pod_parser, set_client_cli_parser
from ..parsers.flow import set_flow_parser
from ..peapods import CompoundPod, Pod
from ..peapods.pods.k8s import K8sPod
from ..peapods.pods.factory import PodFactory
from ..types.routing.table import RoutingTable
from ..peapods.networking import is_remote_local_connection
# Names exported by a star-import of this module: only the ``Flow`` class.
__all__ = ['Flow']
class FlowType(type(ExitStack), type(JAMLCompatible)):
    """Metaclass of :class:`Flow`.

    Combines the metaclasses of ``ExitStack`` and ``JAMLCompatible`` so that it can be
    used as the single metaclass of a class inheriting from both.
    """
# Matches a ``<host>:<port>`` string: group 1 is the host part, group 2 is the
# trailing port, restricted to a decimal number in the range 0-65535.
_regex_port = r'(.*?):([0-9]{1,4}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$'

# The condition is never true, so these imports are not executed at runtime.
# Presumably they are kept so tools can resolve names that only appear inside
# string annotations (e.g. 'BaseExecutor', 'AsyncFlow') - TODO confirm.
if False:
    from ..executors import BaseExecutor
    from ..clients.base import BaseClient
    from .asyncio import AsyncFlow

# Key under which the gateway Pod is stored in a Flow's ``_pod_nodes``.
GATEWAY_NAME = 'gateway'

# Passed as ``fallback_parsers`` to ``ArgNamespace.kwargs2namespace`` when the
# keyword arguments of a Flow are turned into a namespace.
FALLBACK_PARSERS = [
    set_gateway_parser(),
    set_pod_parser(),
    set_client_cli_parser(),
    set_flow_parser(),
]
class Flow(PostMixin, JAMLCompatible, ExitStack, metaclass=FlowType):
"""Flow is how Jina streamlines and distributes Executors. """
class _FlowK8sInfraResourcesManager:
    """Context manager that owns the Kubernetes namespace used by a Flow.

    On enter, the namespace is created when the cluster does not list it yet.
    On exit, the namespace is deleted again, but only if this manager created it.
    """

    def __init__(self, k8s_namespace: str, k8s_custom_resource_dir: Optional[str]):
        """
        :param k8s_namespace: name of the namespace to ensure
        :param k8s_custom_resource_dir: forwarded to ``kubernetes_tools.create`` as ``custom_resource_dir``
        """
        self.k8s_namespace = k8s_namespace
        self.k8s_custom_resource_dir = k8s_custom_resource_dir
        # set to True only when __enter__ had to create the namespace itself
        self.namespace_created = False

    def __enter__(self):
        """Create the namespace when the cluster does not list it yet.

        :return: this manager, so that ``with ... as manager`` binds a usable object
        """
        from ..peapods.pods.k8slib import kubernetes_tools

        client = kubernetes_tools.K8sClients().core_v1
        list_namespaces = [
            item.metadata.name for item in client.list_namespace().items
        ]
        if self.k8s_namespace not in list_namespaces:
            with JinaLogger(f'create_{self.k8s_namespace}') as logger:
                logger.info(f'🏝️\tCreate Namespace "{self.k8s_namespace}"')
                kubernetes_tools.create(
                    'namespace',
                    {'name': self.k8s_namespace},
                    logger=logger,
                    custom_resource_dir=self.k8s_custom_resource_dir,
                )
                self.namespace_created = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Delete the namespace, but only if it was created by this manager.

        :param exc_type: type of the exception raised inside the ``with`` body, if any
        :param exc_val: the exception instance, if any
        :param exc_tb: the traceback, if any
        """
        if not self.namespace_created:
            # nothing was created here, so there is nothing to clean up
            # and no reason to import the k8s toolkit
            return

        from ..peapods.pods.k8slib import kubernetes_tools

        client = kubernetes_tools.K8sClients().core_v1
        client.delete_namespace(name=self.k8s_namespace)
# overload_inject_start_client_flow
@overload
def __init__(
    self,
    *,
    asyncio: Optional[bool] = False,
    host: Optional[str] = '0.0.0.0',
    https: Optional[bool] = False,
    port: Optional[int] = None,
    prefetch: Optional[int] = 50,
    prefetch_on_recv: Optional[int] = 1,
    protocol: Optional[str] = 'GRPC',
    proxy: Optional[bool] = False,
    **kwargs,
):
    """Create a Flow. Flow is how Jina streamlines and scales Executors. This overloaded method provides arguments from `jina client` CLI.

    :param asyncio: If set, then the input and output of this Client work in an asynchronous manner.
    :param host: The host address of the runtime, by default it is 0.0.0.0.
    :param https: If set, connect to gateway using https
    :param port: The port of the Gateway, which the client should connect to.
    :param prefetch: The number of pre-fetched requests from the client
    :param prefetch_on_recv: The number of additional requests to fetch on every receive
    :param protocol: Communication protocol between server and client.
    :param proxy: If set, respect the http_proxy and https_proxy environment variables. Otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy

    .. # noqa: DAR202
    .. # noqa: DAR101
    .. # noqa: DAR003
    """
# overload_inject_end_client_flow
# overload_inject_start_gateway_flow
@overload
def __init__(
    self,
    *,
    compress: Optional[str] = 'NONE',
    compress_min_bytes: Optional[int] = 1024,
    compress_min_ratio: Optional[float] = 1.1,
    cors: Optional[bool] = False,
    ctrl_with_ipc: Optional[bool] = True,
    daemon: Optional[bool] = False,
    default_swagger_ui: Optional[bool] = False,
    description: Optional[str] = None,
    env: Optional[dict] = None,
    expose_endpoints: Optional[str] = None,
    expose_public: Optional[bool] = False,
    host: Optional[str] = '0.0.0.0',
    host_in: Optional[str] = '0.0.0.0',
    host_out: Optional[str] = '0.0.0.0',
    hosts_in_connect: Optional[List[str]] = None,
    log_config: Optional[str] = None,
    memory_hwm: Optional[int] = -1,
    name: Optional[str] = 'gateway',
    native: Optional[bool] = False,
    no_crud_endpoints: Optional[bool] = False,
    no_debug_endpoints: Optional[bool] = False,
    on_error_strategy: Optional[str] = 'IGNORE',
    port_ctrl: Optional[int] = None,
    port_expose: Optional[int] = None,
    port_in: Optional[int] = None,
    port_out: Optional[int] = None,
    prefetch: Optional[int] = 50,
    prefetch_on_recv: Optional[int] = 1,
    protocol: Optional[str] = 'GRPC',
    proxy: Optional[bool] = False,
    py_modules: Optional[List[str]] = None,
    quiet: Optional[bool] = False,
    quiet_error: Optional[bool] = False,
    replicas: Optional[int] = 1,
    runs_in_docker: Optional[bool] = False,
    runtime_backend: Optional[str] = 'PROCESS',
    runtime_cls: Optional[str] = 'GRPCRuntime',
    shards: Optional[int] = 1,
    socket_in: Optional[str] = 'PULL_CONNECT',
    socket_out: Optional[str] = 'PUSH_CONNECT',
    ssh_keyfile: Optional[str] = None,
    ssh_password: Optional[str] = None,
    ssh_server: Optional[str] = None,
    static_routing_table: Optional[bool] = False,
    timeout_ctrl: Optional[int] = 5000,
    timeout_ready: Optional[int] = 600000,
    title: Optional[str] = None,
    uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor',
    uses_metas: Optional[dict] = None,
    uses_requests: Optional[dict] = None,
    uses_with: Optional[dict] = None,
    uvicorn_kwargs: Optional[dict] = None,
    workspace: Optional[str] = None,
    zmq_identity: Optional[str] = None,
    **kwargs,
):
    """Create a Flow. Flow is how Jina streamlines and scales Executors. This overloaded method provides arguments from `jina gateway` CLI.

    :param compress: The compress algorithm used over the entire Flow.

          Note that this is not necessarily effective,
          it depends on the settings of `--compress-min-bytes` and `--compress-min-ratio`
    :param compress_min_bytes: The original message size must be larger than this number to trigger the compress algorithm, -1 means disable compression.
    :param compress_min_ratio: The compression ratio (uncompressed_size/compressed_size) must be higher than this number to trigger the compress algorithm.
    :param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
    :param ctrl_with_ipc: If set, use ipc protocol for control socket
    :param daemon: The Pea attempts to terminate all of its Runtime child processes/threads on exiting. Setting it to true basically tells the Pea not to wait on the Runtime when closing
    :param default_swagger_ui: If set, the default swagger ui is used for `/docs` endpoint.
    :param description: The description of this HTTP server. It will be used in automatic docs such as Swagger UI.
    :param env: The map of environment variables that are available inside runtime
    :param expose_endpoints: A JSON string that represents a map from executor endpoints (`@requests(on=...)`) to HTTP endpoints.
    :param expose_public: If set, expose the public IP address to remote when necessary, by default it exposes private IP address, which only allows accessing under the same network/subnet. Important to set this to true when the Pea will receive input connections from remote Peas
    :param host: The host address of the runtime, by default it is 0.0.0.0.
    :param host_in: The host address for input, by default it is 0.0.0.0
    :param host_out: The host address for output, by default it is 0.0.0.0
    :param hosts_in_connect: The host address for input, by default it is 0.0.0.0
    :param log_config: The YAML config of the logger used in this object.
    :param memory_hwm: The memory high watermark of this pod in Gigabytes, pod will restart when this is reached. -1 means no restriction
    :param name: The name of this object.

          This will be used in the following places:
          - how you refer to this object in Python/YAML/CLI
          - visualization
          - log message header
          - ...

          When not given, then the default naming strategy will apply.
    :param native: If set, only native Executors are allowed, and the Executor is always run inside ZEDRuntime.
    :param no_crud_endpoints: If set, /index, /search, /update, /delete endpoints are removed from HTTP interface.

          Any executor that has `@requests(on=...)` bind with those values will receive data requests.
    :param no_debug_endpoints: If set, /status /post endpoints are removed from HTTP interface.
    :param on_error_strategy: The skip strategy on exceptions.

          - IGNORE: Ignore it, keep running all Executors in the sequel flow
          - SKIP_HANDLE: Skip all Executors in the sequel, only `pre_hook` and `post_hook` are called
          - THROW_EARLY: Immediately throw the exception, the sequel flow will not be running at all

          Note, `IGNORE`, `SKIP_EXECUTOR` and `SKIP_HANDLE` do not guarantee the success execution in the sequel flow. If something
          is wrong in the upstream, it is hard to carry this exception and moving forward without any side-effect.
    :param port_ctrl: The port for controlling the runtime, default a random port between [49152, 65535]
    :param port_expose: The port that the gateway exposes for clients for GRPC connections.
    :param port_in: The port for input data, default a random port between [49152, 65535]
    :param port_out: The port for output data, default a random port between [49152, 65535]
    :param prefetch: The number of pre-fetched requests from the client
    :param prefetch_on_recv: The number of additional requests to fetch on every receive
    :param protocol: Communication protocol between server and client.
    :param proxy: If set, respect the http_proxy and https_proxy environment variables. Otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
    :param py_modules: The customized python modules need to be imported before loading the executor

          Note that the recommended way is to only import a single module - a simple python file, if your
          executor can be defined in a single file, or an ``__init__.py`` file if you have multiple files,
          which should be structured as a python package. For more details, please see the
          `Executor cookbook <https://docs.jina.ai/fundamentals/executor/repository-structure/>`__
    :param quiet: If set, then no log will be emitted from this object.
    :param quiet_error: If set, then exception stack information will not be added to the log
    :param replicas: The number of replicas in the pod, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary
    :param runs_in_docker: Informs a Pea that runs in a container. Important to properly set networking information
    :param runtime_backend: The parallel backend of the runtime inside the Pea
    :param runtime_cls: The runtime class to run inside the Pea
    :param shards: The number of shards in the pod running at the same time, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary
    :param socket_in: The socket type for input port
    :param socket_out: The socket type for output port
    :param ssh_keyfile: This specifies a key to be used in ssh login, default None. Regular default ssh keys will be used without specifying this argument.
    :param ssh_password: The ssh password to the ssh server.
    :param ssh_server: The SSH server through which the tunnel will be created, can actually be a fully specified `user@server:port` ssh url.
    :param static_routing_table: Defines if the routing table should be pre-computed by the Flow. In this case it is statically defined for each Pod and not sent on every data request. Can not be used in combination with external pods
    :param timeout_ctrl: The timeout in milliseconds of the control request, -1 for waiting forever
    :param timeout_ready: The timeout in milliseconds of a Pea waits for the runtime to be ready, -1 for waiting forever
    :param title: The title of this HTTP server. It will be used in automatic docs such as Swagger UI.
    :param uses: The config of the executor, it could be one of the following:

          * an Executor YAML file (.yml, .yaml, .jaml)
          * a Jina Hub Executor (must start with `jinahub://` or `jinahub+docker://`)
          * a docker image (must start with `docker://`)
          * the string literal of a YAML config (must start with `!` or `jtype: `)
          * the string literal of a JSON config

          When used in Python, one can use the following values additionally:
          - a Python dict that represents the config
          - a text file stream that has a `.read()` interface
    :param uses_metas: Dictionary of keyword arguments that will override the `metas` configuration in `uses`
    :param uses_requests: Dictionary of keyword arguments that will override the `requests` configuration in `uses`
    :param uses_with: Dictionary of keyword arguments that will override the `with` configuration in `uses`
    :param uvicorn_kwargs: Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server

          More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/
    :param workspace: The working directory for any IO operations in this object. If not set, then derive from its parent `workspace`.
    :param zmq_identity: The identity of a ZMQRuntime. It is used for unique socket identification towards other ZMQRuntimes.

    .. # noqa: DAR202
    .. # noqa: DAR101
    .. # noqa: DAR003
    """
# overload_inject_end_gateway_flow
# overload_inject_start_flow
@overload
def __init__(
    self,
    *,
    env: Optional[dict] = None,
    inspect: Optional[str] = 'COLLECT',
    log_config: Optional[str] = None,
    name: Optional[str] = None,
    quiet: Optional[bool] = False,
    quiet_error: Optional[bool] = False,
    static_routing_table: Optional[bool] = False,
    uses: Optional[str] = None,
    workspace: Optional[str] = './',
    **kwargs,
):
    """Create a Flow. Flow is how Jina streamlines and scales Executors. This overloaded method provides arguments from `jina flow` CLI.

    :param env: The map of environment variables that are available inside runtime
    :param inspect: The strategy on those inspect pods in the flow.

          If `REMOVE` is given then all inspect pods are removed when building the flow.
    :param log_config: The YAML config of the logger used in this object.
    :param name: The name of this object.

          This will be used in the following places:
          - how you refer to this object in Python/YAML/CLI
          - visualization
          - log message header
          - ...

          When not given, then the default naming strategy will apply.
    :param quiet: If set, then no log will be emitted from this object.
    :param quiet_error: If set, then exception stack information will not be added to the log
    :param static_routing_table: Defines if the routing table should be pre-computed by the Flow. In this case it is statically defined for each Pod and not sent on every data request. Can not be used in combination with external pods
    :param uses: The YAML file that represents a flow
    :param workspace: The working directory for any IO operations in this object. If not set, then derive from its parent `workspace`.

    .. # noqa: DAR202
    .. # noqa: DAR101
    .. # noqa: DAR003
    """
# overload_inject_end_flow
def __init__(
    self,
    args: Optional['argparse.Namespace'] = None,
    **kwargs,
):
    """Set up an empty Flow from a parsed namespace or from keyword arguments.

    :param args: an already parsed namespace; when ``None`` it is built from ``kwargs``
    :param kwargs: keyword arguments as listed in the overloaded signatures
    """
    super().__init__()

    #: YAML version number, this will be later overridden if YAML config says the other way
    self._version = '1'
    self._pod_nodes = OrderedDict()  # type: Dict[str, Pod]
    self._inspect_pods = {}  # type: Dict[str, str]
    self._endpoints_mapping = {}  # type: Dict[str, Dict]
    self._build_level = FlowBuildLevel.EMPTY
    #: default first pod is gateway, will add when build()
    self._last_changed_pod = [GATEWAY_NAME]

    # populates self.args, self._common_kwargs and self._kwargs
    self._update_args(args, **kwargs)

    # a namespace manager is only attached when the Flow targets Kubernetes
    self.k8s_infrastructure_manager = None
    if self.args.infrastructure == InfrastructureType.K8S:
        custom_resource_dir = getattr(self.args, 'k8s_custom_resource_dir', None)
        self.k8s_infrastructure_manager = self._FlowK8sInfraResourcesManager(
            k8s_namespace=self.args.name,
            k8s_custom_resource_dir=custom_resource_dir,
        )

    # the parsed arguments are forwarded to the logger only when they form a real namespace
    namespace_kwargs = (
        vars(self.args) if isinstance(self.args, argparse.Namespace) else {}
    )
    self.logger = JinaLogger(
        self.__class__.__name__, **namespace_kwargs, **self._common_kwargs
    )
def _update_args(self, args, **kwargs):
    """Store the Flow arguments and split off the keyword arguments shared with Pods.

    Sets ``self.args``, ``self._common_kwargs`` (the ``kwargs`` whose names are not
    attributes of the namespace) and ``self._kwargs`` (the non-default arguments).
    When ``asyncio`` is truthy in the common kwargs and this object is not an
    ``AsyncPostMixin`` yet, the class of the instance is switched to ``AsyncFlow``.

    :param args: an already parsed namespace, or ``None`` to build one from ``kwargs``
    :param kwargs: keyword arguments given to the Flow
    """
    from ..helper import ArgNamespace
    from ..parsers.flow import set_flow_parser

    flow_parser = set_flow_parser()
    namespace = args
    if namespace is None:
        namespace = ArgNamespace.kwargs2namespace(
            kwargs, flow_parser, True, fallback_parsers=FALLBACK_PARSERS
        )
    self.args = namespace

    # common args should be the ones that can not be parsed by the flow parser
    parsed_names = vars(namespace)
    self._common_kwargs = {
        key: kwargs[key] for key in kwargs if key not in parsed_names
    }

    #: for yaml dump
    self._kwargs = ArgNamespace.get_non_defaults_args(namespace, flow_parser)

    wants_asyncio = self._common_kwargs.get('asyncio', False)
    if wants_asyncio and not isinstance(self, AsyncPostMixin):
        from .asyncio import AsyncFlow

        self.__class__ = AsyncFlow
@staticmethod
def _parse_endpoints(op_flow, pod_name, endpoint, connect_to_last_pod=False) -> Set:
    """Normalise the endpoints (``needs``) of a Pod into a set of Pod names.

    :param op_flow: the Flow that is being modified
    :param pod_name: the name of the Pod the endpoints belong to
    :param endpoint: a single Pod name, a list/tuple of Pod names, or a falsy value
    :param connect_to_last_pod: if set and ``endpoint`` is falsy, fall back to the last changed Pod of ``op_flow``
    :return: the set of Pod names, where a Pod under inspection is replaced by its entry in ``op_flow._inspect_pods``
    :raises ValueError: if ``endpoint`` is neither falsy, a string, a list nor a tuple
    :raises FlowTopologyError: if ``pod_name`` is listed as its own endpoint
    """
    # parsing needs
    if isinstance(endpoint, str):
        endpoint = [endpoint]
    elif not endpoint:
        if op_flow._last_changed_pod and connect_to_last_pod:
            endpoint = [op_flow.last_pod]
        else:
            endpoint = []

    if not isinstance(endpoint, (list, tuple)):
        raise ValueError(f'endpoint={endpoint} is not parsable')
    if pod_name in endpoint:
        raise FlowTopologyError('the income/output of a pod can not be itself')

    # if an endpoint is being inspected, then replace it with inspected Pod
    return {op_flow._inspect_pods.get(ep, ep) for ep in endpoint}
@property
def last_pod(self):
    """Name of the most recently changed Pod, i.e. the last entry of ``_last_changed_pod``.

    .. # noqa: DAR401


    .. # noqa: DAR201
    """
    history = self._last_changed_pod
    return history[-1]

@last_pod.setter
def last_pod(self, name: str):
    """
    Mark an existing Pod as the last Pod of the Flow, useful when modifying the Flow.

    The name is appended to the change history unless it already is the last
    entry, and the build level of the Flow is reset to ``EMPTY``.

    .. # noqa: DAR401
    :param name: the name of the existing Pod
    """
    if name not in self._pod_nodes:
        raise FlowMissingPodError(f'{name} can not be found in this Flow')

    # only record the name when it is not already the most recent entry
    if not self._last_changed_pod or name != self.last_pod:
        self._last_changed_pod.append(name)

    # graph is now changed so we need to
    # reset the build level to the lowest
    self._build_level = FlowBuildLevel.EMPTY
@allowed_levels([FlowBuildLevel.EMPTY])
def _add_gateway(self, needs, **kwargs):
    """Build the gateway Pod and store it in ``_pod_nodes`` under ``GATEWAY_NAME``.

    :param needs: passed through to ``PodFactory.build_pod``
    :param kwargs: extra gateway arguments; the fixed gateway settings and then the
        common kwargs of the Flow are written on top of them
    """
    kwargs.update(
        name=GATEWAY_NAME,
        ctrl_with_ipc=True,  # otherwise ctrl port would be conflicted
        host=self.host,
        protocol=self.protocol,
        port_expose=self.port_expose,
        pod_role=PodRoleType.GATEWAY,
        expose_endpoints=json.dumps(self._endpoints_mapping),
        k8s_namespace=self.args.name,
    )
    # the common kwargs of the Flow are applied last, so they win on a name clash
    kwargs.update(self._common_kwargs)

    gateway_args = ArgNamespace.kwargs2namespace(kwargs, set_gateway_parser())
    # these three are set on the namespace directly, after parsing
    for attr, value in (
        ('k8s_namespace', self.args.name),
        ('connect_to_predecessor', False),
        ('noblock_on_start', True),
    ):
        setattr(gateway_args, attr, value)

    self._pod_nodes[GATEWAY_NAME] = PodFactory.build_pod(
        gateway_args, needs, self.args.infrastructure
    )
@allowed_levels([FlowBuildLevel.EMPTY])
def needs(
    self, needs: Union[Tuple[str], List[str]], name: str = 'joiner', *args, **kwargs
) -> 'Flow':
    """
    Add a joiner to the Flow that waits for every Pod listed in **needs**.

    The joiner is added through :meth:`add` with the ``JOIN`` pod role.

    .. # noqa: DAR401
    :param needs: list of service names to wait
    :param name: the name of this joiner, by default is ``joiner``
    :param args: additional positional arguments forwarded to the add function
    :param kwargs: additional key value arguments forwarded to the add function
    :return: the modified Flow
    """
    if len(needs) > 1:
        return self.add(
            *args, name=name, needs=needs, pod_role=PodRoleType.JOIN, **kwargs
        )
    # a joiner only makes sense for two or more upstream services
    raise FlowTopologyError(
        'no need to wait for a single service, need len(needs) > 1'
    )
def needs_all(self, name: str = 'joiner', *args, **kwargs) -> 'Flow':
    """
    Collect all hanging Pods so far and add a blocker to the Flow; wait until all hanging peas completed.

    With exactly one hanging Pod the blocker is added through :meth:`add`,
    otherwise through :meth:`needs`.

    :param name: the name of this joiner (default is ``joiner``)
    :param args: additional positional arguments which are forwarded to the add and needs function
    :param kwargs: additional key value arguments which are forwarded to the add and needs function
    :return: the modified Flow
    """
    hanging = _hanging_pods(self)
    attach = self.add if len(hanging) == 1 else self.needs
    return attach(*args, name=name, needs=hanging, **kwargs)
# overload_inject_start_pod
@overload
def add(
    self,
    *,
    connect_to_predecessor: Optional[bool] = False,
    ctrl_with_ipc: Optional[bool] = False,
    daemon: Optional[bool] = False,
    docker_kwargs: Optional[dict] = None,
    entrypoint: Optional[str] = None,
    env: Optional[dict] = None,
    expose_public: Optional[bool] = False,
    external: Optional[bool] = False,
    force: Optional[bool] = False,
    gpus: Optional[str] = None,
    host: Optional[str] = '0.0.0.0',
    host_in: Optional[str] = '0.0.0.0',
    host_out: Optional[str] = '0.0.0.0',
    hosts_in_connect: Optional[List[str]] = None,
    install_requirements: Optional[bool] = False,
    log_config: Optional[str] = None,
    memory_hwm: Optional[int] = -1,
    name: Optional[str] = None,
    native: Optional[bool] = False,
    on_error_strategy: Optional[str] = 'IGNORE',
    peas_hosts: Optional[List[str]] = None,
    polling: Optional[str] = 'ANY',
    port_ctrl: Optional[int] = None,
    port_in: Optional[int] = None,
    port_jinad: Optional[int] = 8000,
    port_out: Optional[int] = None,
    pull_latest: Optional[bool] = False,
    py_modules: Optional[List[str]] = None,
    quiet: Optional[bool] = False,
    quiet_error: Optional[bool] = False,
    quiet_remote_logs: Optional[bool] = False,
    replicas: Optional[int] = 1,
    runs_in_docker: Optional[bool] = False,
    runtime_backend: Optional[str] = 'PROCESS',
    runtime_cls: Optional[str] = 'ZEDRuntime',
    scheduling: Optional[str] = 'LOAD_BALANCE',
    shards: Optional[int] = 1,
    socket_in: Optional[str] = 'PULL_BIND',
    socket_out: Optional[str] = 'PUSH_BIND',
    ssh_keyfile: Optional[str] = None,
    ssh_password: Optional[str] = None,
    ssh_server: Optional[str] = None,
    static_routing_table: Optional[bool] = False,
    timeout_ctrl: Optional[int] = 5000,
    timeout_ready: Optional[int] = 600000,
    upload_files: Optional[List[str]] = None,
    uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor',
    uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None,
    uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None,
    uses_metas: Optional[dict] = None,
    uses_requests: Optional[dict] = None,
    uses_with: Optional[dict] = None,
    volumes: Optional[List[str]] = None,
    workspace: Optional[str] = None,
    zmq_identity: Optional[str] = None,
    **kwargs,
) -> Union['Flow', 'AsyncFlow']:
    """Add an Executor to the current Flow object.

    :param connect_to_predecessor: The head Pea of this Pod will connect to the TailPea of the predecessor Pod.
    :param ctrl_with_ipc: If set, use ipc protocol for control socket
    :param daemon: The Pea attempts to terminate all of its Runtime child processes/threads on exiting. Setting it to true basically tells the Pea not to wait on the Runtime when closing
    :param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker
          container.

          More details can be found in the Docker SDK docs: https://docker-py.readthedocs.io/en/stable/
    :param entrypoint: The entrypoint command overrides the ENTRYPOINT in Docker image. When not set, the Docker image ENTRYPOINT takes effect.
    :param env: The map of environment variables that are available inside runtime
    :param expose_public: If set, expose the public IP address to remote when necessary, by default it exposes private IP address, which only allows accessing under the same network/subnet. Important to set this to true when the Pea will receive input connections from remote Peas
    :param external: The Pod will be considered an external Pod that has been started independently from the Flow. This Pod will not be context managed by the Flow.
    :param force: If set, always pull the latest Hub Executor bundle even if it exists locally
    :param gpus: This argument allows dockerized Jina executor discover local gpu devices.

          Note,
          - To access all gpus, use `--gpus all`.
          - To access multiple gpus, e.g. make use of 2 gpus, use `--gpus 2`.
          - To access specified gpus based on device id, use `--gpus device=[YOUR-GPU-DEVICE-ID]`
          - To access specified gpus based on multiple device id, use `--gpus device=[YOUR-GPU-DEVICE-ID1],device=[YOUR-GPU-DEVICE-ID2]`
          - To specify more parameters, use `--gpus device=[YOUR-GPU-DEVICE-ID],runtime=nvidia,capabilities=display`
    :param host: The host address of the runtime, by default it is 0.0.0.0.
    :param host_in: The host address for input, by default it is 0.0.0.0
    :param host_out: The host address for output, by default it is 0.0.0.0
    :param hosts_in_connect: The host address for input, by default it is 0.0.0.0
    :param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local
    :param log_config: The YAML config of the logger used in this object.
    :param memory_hwm: The memory high watermark of this pod in Gigabytes, pod will restart when this is reached. -1 means no restriction
    :param name: The name of this object.

          This will be used in the following places:
          - how you refer to this object in Python/YAML/CLI
          - visualization
          - log message header
          - ...

          When not given, then the default naming strategy will apply.
    :param native: If set, only native Executors are allowed, and the Executor is always run inside ZEDRuntime.
    :param on_error_strategy: The skip strategy on exceptions.

          - IGNORE: Ignore it, keep running all Executors in the sequel flow
          - SKIP_HANDLE: Skip all Executors in the sequel, only `pre_hook` and `post_hook` are called
          - THROW_EARLY: Immediately throw the exception, the sequel flow will not be running at all

          Note, `IGNORE`, `SKIP_EXECUTOR` and `SKIP_HANDLE` do not guarantee the success execution in the sequel flow. If something
          is wrong in the upstream, it is hard to carry this exception and moving forward without any side-effect.
    :param peas_hosts: The hosts of the peas when shards greater than 1.
          Peas will be evenly distributed among the hosts. By default,
          peas are running on host provided by the argument ``host``
    :param polling: The polling strategy of the Pod (when `shards>1`)
          - ANY: only one (whoever is idle) Pea polls the message
          - ALL: all Peas poll the message (like a broadcast)
    :param port_ctrl: The port for controlling the runtime, default a random port between [49152, 65535]
    :param port_in: The port for input data, default a random port between [49152, 65535]
    :param port_jinad: The port of the remote machine for usage with JinaD.
    :param port_out: The port for output data, default a random port between [49152, 65535]
    :param pull_latest: Pull the latest image before running
    :param py_modules: The customized python modules need to be imported before loading the executor

          Note that the recommended way is to only import a single module - a simple python file, if your
          executor can be defined in a single file, or an ``__init__.py`` file if you have multiple files,
          which should be structured as a python package. For more details, please see the
          `Executor cookbook <https://docs.jina.ai/fundamentals/executor/repository-structure/>`__
    :param quiet: If set, then no log will be emitted from this object.
    :param quiet_error: If set, then exception stack information will not be added to the log
    :param quiet_remote_logs: Do not display the streaming of remote logs on local console
    :param replicas: The number of replicas in the pod, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary
    :param runs_in_docker: Informs a Pea that runs in a container. Important to properly set networking information
    :param runtime_backend: The parallel backend of the runtime inside the Pea
    :param runtime_cls: The runtime class to run inside the Pea
    :param scheduling: The strategy of scheduling workload among Peas
    :param shards: The number of shards in the pod running at the same time, `port_in` and `port_out` will be set to random, and routers will be added automatically when necessary
    :param socket_in: The socket type for input port
    :param socket_out: The socket type for output port
    :param ssh_keyfile: This specifies a key to be used in ssh login, default None. Regular default ssh keys will be used without specifying this argument.
    :param ssh_password: The ssh password to the ssh server.
    :param ssh_server: The SSH server through which the tunnel will be created, can actually be a fully specified `user@server:port` ssh url.
    :param static_routing_table: Defines if the routing table should be pre-computed by the Flow. In this case it is statically defined for each Pod and not sent on every data request. Can not be used in combination with external pods
    :param timeout_ctrl: The timeout in milliseconds of the control request, -1 for waiting forever
    :param timeout_ready: The timeout in milliseconds of a Pea waits for the runtime to be ready, -1 for waiting forever
    :param upload_files: The files on the host to be uploaded to the remote
          workspace. This can be useful when your Pod has more
          file dependencies beyond a single YAML file, e.g.
          Python files, data files.

          Note,
          - currently only flatten structure is supported, which means if you upload `[./foo/a.py, ./foo/b.pp, ./bar/c.yml]`, then they will be put under the _same_ workspace on the remote, losing all hierarchies.
          - by default, `--uses` YAML file is always uploaded.
          - uploaded files are by default isolated across the runs. To ensure files are submitted to the same workspace across different runs, use `--workspace-id` to specify the workspace.
    :param uses: The config of the executor, it could be one of the following:

          * an Executor YAML file (.yml, .yaml, .jaml)
          * a Jina Hub Executor (must start with `jinahub://` or `jinahub+docker://`)
          * a docker image (must start with `docker://`)
          * the string literal of a YAML config (must start with `!` or `jtype: `)
          * the string literal of a JSON config

          When used in Python, one can use the following values additionally:
          - a Python dict that represents the config
          - a text file stream that has a `.read()` interface
    :param uses_after: The executor attached after the Peas described by --uses, typically used for receiving from all shards, accepted type follows `--uses`
    :param uses_before: The executor attached before the Peas described by --uses, typically before sending to all shards, accepted type follows `--uses`
    :param uses_metas: Dictionary of keyword arguments that will override the `metas` configuration in `uses`
    :param uses_requests: Dictionary of keyword arguments that will override the `requests` configuration in `uses`
    :param uses_with: Dictionary of keyword arguments that will override the `with` configuration in `uses`
    :param volumes: The path on the host to be mounted inside the container.

          Note,
          - If separated by `:`, then the first part will be considered as the local host path and the second part is the path in the container system.
          - If no split provided, then the basename of that directory will be mounted into container's root path, e.g. `--volumes="/user/test/my-workspace"` will be mounted into `/my-workspace` inside the container.
          - All volumes are mounted with read-write mode.
    :param workspace: The working directory for any IO operations in this object. If not set, then derive from its parent `workspace`.
    :param zmq_identity: The identity of a ZMQRuntime. It is used for unique socket identification towards other ZMQRuntimes.
    :return: a (new) Flow object with modification

    .. # noqa: DAR202
    .. # noqa: DAR101
    .. # noqa: DAR003
    """
# overload_inject_end_pod
@allowed_levels([FlowBuildLevel.EMPTY])
def add(
    self,
    *,
    needs: Optional[Union[str, Tuple[str], List[str]]] = None,
    copy_flow: bool = True,
    pod_role: 'PodRoleType' = PodRoleType.POD,
    **kwargs,
) -> 'Flow':
    """
    Add a Pod to the current Flow object and return the new modified Flow object.

    The attribute of the Pod can be later changed with :py:meth:`set` or deleted with :py:meth:`remove`

    Naming rules: when ``name`` is omitted an ``executor<N>`` name is generated; when ``name`` is already
    used by another Pod of this Flow a numeric suffix is appended. In both cases the suffix starts at the
    current number of Pods and is increased until the resulting name is unused, so an already registered
    Pod is never silently replaced. The final name must be a valid Python identifier, otherwise a
    ``ValueError`` is raised.

    A ``NotImplementedError`` is raised when the GRPC data runtime is requested together with more than
    one shard on a non-K8s infrastructure.

    .. # noqa: DAR401
    :param needs: the name of the Pod(s) that this Pod receives data from.
                       One can also use 'gateway' to indicate the connection with the gateway.
    :param pod_role: the role of the Pod, used for visualization and route planning
    :param copy_flow: when set to true, then always copy the current Flow and do the modification on top of it then return, otherwise, do in-line modification
    :param kwargs: other keyword-value arguments that the Pod CLI supports
    :return: a (new) Flow object with modification
    """
    op_flow = copy.deepcopy(self) if copy_flow else self

    def _next_free_name(prefix: str) -> str:
        # Start from the historical suffix (the current number of Pods) so the
        # generated names are unchanged in the common, collision-free case, and
        # walk upwards until the candidate is not taken by another Pod.
        idx = len(op_flow._pod_nodes)
        while f'{prefix}{idx}' in op_flow._pod_nodes:
            idx += 1
        return f'{prefix}{idx}'

    # pod naming logic
    pod_name = kwargs.get('name', None)

    if pod_name in op_flow._pod_nodes:
        new_name = _next_free_name(pod_name)
        self.logger.debug(
            f'"{pod_name}" is used in this Flow already! renamed it to "{new_name}"'
        )
        pod_name = new_name

    if not pod_name:
        pod_name = _next_free_name('executor')

    if not pod_name.isidentifier():
        # hyphen - can not be used in the name
        raise ValueError(
            f'name: {pod_name} is invalid, please follow the python variable name conventions'
        )

    # needs logic
    needs = op_flow._parse_endpoints(
        op_flow, pod_name, needs, connect_to_last_pod=True
    )

    # set the kwargs inherit from `Flow(kwargs1=..., kwargs2=)`
    for key, value in op_flow._common_kwargs.items():
        if key not in kwargs:
            kwargs[key] = value

    # check if host is set to remote:port
    if 'host' in kwargs:
        host = kwargs['host']
        m = re.match(_regex_port, host)
        # only split `host:port` for a non-default host, and never override an
        # explicitly given `port_jinad`
        if host != __default_host__ and m and 'port_jinad' not in kwargs:
            kwargs['port_jinad'] = m.group(2)
            kwargs['host'] = m.group(1)

    # update kwargs of this Pod
    kwargs.update(dict(name=pod_name, pod_role=pod_role, num_part=len(needs)))

    parser = set_pod_parser()
    if pod_role == PodRoleType.GATEWAY:
        parser = set_gateway_parser()

    args = ArgNamespace.kwargs2namespace(
        kwargs, parser, True, fallback_parsers=FALLBACK_PARSERS
    )

    # grpc data runtime does not support sharding at the moment
    shards = kwargs.get('shards')
    if (
        args.grpc_data_requests
        and shards is not None
        and shards > 1
        and self.args.infrastructure != InfrastructureType.K8S
    ):
        raise NotImplementedError("GRPC data runtime does not support sharding")

    if args.grpc_data_requests and args.runtime_cls == 'ZEDRuntime':
        args.runtime_cls = 'GRPCDataRuntime'

    # pod workspace if not set then derive from flow workspace
    args.workspace = os.path.abspath(args.workspace or self.workspace)

    args.k8s_namespace = self.args.name
    args.noblock_on_start = True

    # BACKWARDS COMPATIBILITY:
    # We assume that this is used in a search Flow if replicas and shards are used
    # Thus the polling type should be all
    # But dont override any user provided polling
    if args.replicas > 1 and args.shards > 1 and 'polling' not in kwargs:
        args.polling = PollingType.ALL

    op_flow._pod_nodes[pod_name] = PodFactory.build_pod(
        args, needs, self.args.infrastructure
    )

    op_flow.last_pod = pod_name

    return op_flow
@allowed_levels([FlowBuildLevel.EMPTY])
def inspect(self, name: str = 'inspect', *args, **kwargs) -> 'Flow':
    """Attach an inspection branch to the most recently changed Pod of the Flow.

    Two Pods are added internally: the inspect Pod itself (role ``INSPECT``) and an
    auxiliary pass-through Pod named ``_aux_<name>`` (role ``INSPECT_AUX_PASS``). Both
    receive from the Pod that was last changed before this call. The overhead is
    minimized and they can be removed by simply using
    `Flow(inspect=FlowInspectType.REMOVE)` before using the Flow.

    .. highlight:: bash
    .. code-block:: bash

        Flow -- PUB-SUB -- BasePod(_pass) -- Flow
                |
                -- PUB-SUB -- InspectPod (Hanging)

    In this way, :class:`InspectPod` looks like a simple ``_pass`` from outside and
    does not introduce side-effects (e.g. changing the socket type) to the original Flow.
    The original incoming and outgoing socket types are preserved.

    This function is very handy for introducing an Evaluator into the Flow.

    .. seealso::

        :meth:`gather_inspect`

    :param name: name of the Pod
    :param args: args for .add()
    :param kwargs: kwargs for .add()
    :return: the new instance of the Flow
    """
    inspected = self.last_pod

    # first the inspect Pod itself, hanging off the inspected Pod
    flow_with_inspect = self.add(
        *args,
        name=name,
        needs=inspected,
        pod_role=PodRoleType.INSPECT,
        **kwargs,
    )

    # the auxiliary Pod must not inherit `uses` from the inspect Pod
    kwargs.pop('uses', None)
    result = flow_with_inspect.add(
        *args,
        name=f'_aux_{name}',
        needs=inspected,
        pod_role=PodRoleType.INSPECT_AUX_PASS,
        **kwargs,
    )

    # register any future connection to the inspected Pod by the auxiliary Pod
    result._inspect_pods[inspected] = result.last_pod

    return result
@allowed_levels([FlowBuildLevel.EMPTY])
def gather_inspect(
    self,
    name: str = 'gather_inspect',
    include_last_pod: bool = True,
    *args,
    **kwargs,
) -> 'Flow':
    """Join the output of every inspect Pod into a single Pod.

    If the Flow contains no Pod with the ``INSPECT`` role, the Flow itself is
    returned untouched.

    .. note::
        If ``--no-inspect`` is **not** given, then :meth:`gather_inspect` is auto called before :meth:`build`. So
        in general you don't need to manually call :meth:`gather_inspect`.

    :param name: the name of the gather Pod
    :param include_last_pod: if to include the last modified Pod in the Flow
    :param args: args for .add()
    :param kwargs: kwargs for .add()
    :return: the modified Flow or the copy of it

    .. seealso::

        :meth:`inspect`
    """
    inspect_pod_names = [
        pod_id
        for pod_id, pod in self._pod_nodes.items()
        if pod.role == PodRoleType.INSPECT
    ]

    if not inspect_pod_names:
        # no inspect node is in the graph, return the current graph
        return self

    if include_last_pod:
        inspect_pod_names.append(self.last_pod)

    return self.add(
        *args,
        name=name,
        needs=inspect_pod_names,
        pod_role=PodRoleType.JOIN_INSPECT,
        **kwargs,
    )
def _get_gateway_target(self, prefix):
    """Build the target entry that points at the gateway Pod of this Flow.

    :param prefix: string joined with the gateway name by a hyphen to form the target name
    :return: a tuple of the target name and a dict holding the gateway's `host`, `port` and `expected_parts`
    """
    gateway = self._pod_nodes[GATEWAY_NAME]
    target_name = f'{prefix}-{GATEWAY_NAME}'
    target_spec = {
        'host': gateway.head_host,
        'port': gateway.head_port_in,
        'expected_parts': 0,
    }
    return target_name, target_spec
# TODO needs to be refactored - deployment should not be a dictionary. Related Ticket:
# https://github.com/jina-ai/jina/issues/3280
def _get_routing_table(self) -> RoutingTable:
graph = RoutingTable()
for pod_id, pod in self._pod_nodes.items():
if pod_id == GATEWAY_NAME:
deployment = pod.deployments[0]
graph.add_pod(
f'start-{GATEWAY_NAME}',
deployment['head_host'],
deployment['head_port_in'],
deployment['tail_port_out'],
deployment['head_zmq_identity'],
)
graph.add_pod(
f'end-{GATEWAY_NAME}',
deployment['head_host'],
deployment['head_port_in'],
deployment['tail_port_out'],
deployment['head_zmq_identity'],
)
else:
for deployment in pod.deployments:
graph.add_pod(
deployment['name'],
deployment['head_host'],
deployment['head_port_in'],
deployment['tail_port_out'],
deployment['head_zmq_identity'],
)
for end, pod in self._pod_nodes.items():
if end == GATEWAY_NAME:
end = f'end-{GATEWAY_NAME}'
if pod.head_args.hosts_in_connect is None:
pod.head_args.hosts_in_connect = []
if isinstance(pod, K8sPod):
from ..peapods.pods.k8slib import kubernetes_deployment
end = kubernetes_deployment.to_dns_name(end)
if end not in graph.pods:
end = end + '_head'
if isinstance(pod, K8sPod):
from ..peapods.pods.k8slib import kubernetes_deployment
end = kubernetes_deployment.to_dns_name(end)
for start in pod.needs:
start_pod = self._pod_nodes[start]
if start == GATEWAY_NAME:
start = f'start-{GATEWAY_NAME}'
if isinstance(start_pod, K8sPod):
from ..peapods.pods.k8slib import kubernetes_deployment
start = kubernetes_deployment.to_dns_name(start)
if start not in graph.pods:
start = start + '_tail'
if isinstance(start_pod, K8sPod):
from ..peapods.pods.k8slib import kubernetes_deployment
start = kubernetes_deployment.to_dns_name(start)
start_pod = graph._get_target_pod(start)
if pod.connect_to_predecessor or is_remote_local_connection(
start_pod.host, pod.head_host
):
pod.head_args.hosts_in_connect.append(
graph._get_target_pod(start).full_out_address
)
graph.add_edge(start, end, True)
else:
graph.add_edge(start, end)