From 32dc3dc38fa807a4f2f180ef459e656a79c56f09 Mon Sep 17 00:00:00 2001 From: Joan Fontanals Date: Wed, 21 Jul 2021 03:55:27 +0200 Subject: [PATCH] fix: fix setting extra host_connect_in (#2973) * feat: add local flow use remote executor test * test: add docker compose * test: add endpoint * test: assert with return results * feat: add static ip address * fix: fix setting extra host_connect_in * test: add tests to have remote pod working Co-authored-by: bwanglzu --- jina/__init__.py | 2 +- jina/flow/base.py | 2 +- jina/peapods/networking.py | 1 + jina/peapods/pods/__init__.py | 7 ++ jina/peapods/zmq/__init__.py | 6 +- jina/proto/jina.proto | 1 + jina/proto/jina_pb2.py | 85 ++++++++++--------- jina/types/routing/table.py | 17 ++++ .../__init__.py | 0 .../docker-compose.yml | 25 ++++++ .../test_integration.py | 58 +++++++++++++ tests/unit/types/test_routing_graph.py | 51 +++++------ 12 files changed, 187 insertions(+), 68 deletions(-) create mode 100644 tests/distributed/test_local_flow_use_remote_executor/__init__.py create mode 100644 tests/distributed/test_local_flow_use_remote_executor/docker-compose.yml create mode 100644 tests/distributed/test_local_flow_use_remote_executor/test_integration.py diff --git a/jina/__init__.py b/jina/__init__.py index 519dd40181ea4..89a11fb8f2636 100644 --- a/jina/__init__.py +++ b/jina/__init__.py @@ -36,7 +36,7 @@ # do not change this line manually # this is managed by proto/build-proto.sh and updated on every execution -__proto_version__ = '0.0.84' +__proto_version__ = '0.0.85' __uptime__ = _datetime.datetime.now().isoformat() diff --git a/jina/flow/base.py b/jina/flow/base.py index be5577fc96134..fadde2de867f9 100644 --- a/jina/flow/base.py +++ b/jina/flow/base.py @@ -806,7 +806,7 @@ def _get_routing_table(self) -> RoutingTable: start_pod = graph._get_target_pod(start) if is_remote_local_connection(start_pod.host, pod.head_host): pod.head_args.hosts_in_connect.append( - graph._get_target_pod(start).full_address + graph._get_target_pod(start).full_out_address ) graph.add_edge(start, end, True) diff --git a/jina/peapods/networking.py b/jina/peapods/networking.py index 9e26361e9a012..34f03f18069f3 100644 --- a/jina/peapods/networking.py +++ b/jina/peapods/networking.py @@ -13,6 +13,7 @@ def is_remote_local_connection(first: str, second: str): :param second: the ip or host name of the second runtime :return: True, if first is remote and second is local """ + try: first_ip = ipaddress.ip_address(first) first_global = first_ip.is_global diff --git a/jina/peapods/pods/__init__.py b/jina/peapods/pods/__init__.py index 19f71c9d92cdf..70098a758b693 100644 --- a/jina/peapods/pods/__init__.py +++ b/jina/peapods/pods/__init__.py @@ -160,6 +160,13 @@ def head_port_in(self): """ return self.head_args.port_in + @property + def tail_port_out(self): + """Get the port_out of the TailPea of this pod + .. # noqa: DAR201 + """ + return self.tail_args.port_out + @property def head_zmq_identity(self): """Get the zmq_identity of the HeadPea of this pod diff --git a/jina/peapods/zmq/__init__.py b/jina/peapods/zmq/__init__.py index 5f974e43ec9da..2b9d8ce7130dc 100644 --- a/jina/peapods/zmq/__init__.py +++ b/jina/peapods/zmq/__init__.py @@ -216,7 +216,6 @@ def _init_sockets(self) -> Tuple: for address in self.args.hosts_in_connect: if in_connect is None: host, port = address.split(':') - in_connect, _ = _init_socket( ctx, host, @@ -286,6 +285,7 @@ def print_stats(self): ) def _init_dynamic_out_socket(self, host_out, port_out): + out_sock, _ = _init_socket( self.ctx, host_out, @@ -452,7 +452,9 @@ async def recv_message( :return: Received protobuf message. Or None in case of any error. """ try: - msg = await recv_message_async(self.in_sock, **self.send_recv_kwargs) + msg = await recv_message_async( + self.in_connect_sock or self.in_sock, **self.send_recv_kwargs + ) self.msg_recv += 1 if msg is not None: self.bytes_recv += msg.size diff --git a/jina/proto/jina.proto b/jina/proto/jina.proto index 408d7e4549674..b802ffe8c6be7 100644 --- a/jina/proto/jina.proto +++ b/jina/proto/jina.proto @@ -161,6 +161,7 @@ message RouteProto { message TargetPodProto{ string host = 1; // the host HeadPea of the BasePod uint32 port = 2; // the port HeadPea of the BasePod + uint32 port_out = 6; // the port TailPea of the BasePod uint32 expected_parts = 3; // the number of parts the pod should expect repeated RoutingEdgeProto out_edges = 4; // pod_name of Pods, the TailPea should send traffic to string target_identity = 5; diff --git a/jina/proto/jina_pb2.py b/jina/proto/jina_pb2.py index 70ac304a3c440..b514e307dda1a 100644 --- a/jina/proto/jina_pb2.py +++ b/jina/proto/jina_pb2.py @@ -21,7 +21,7 @@ syntax='proto3', serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\njina.proto\x12\x04jina\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1cgoogle/protobuf/struct.proto\"\xfd\x01\n\x11\x44\x65nseNdArrayProto\x12\x0e\n\x06\x62uffer\x18\x01 \x01(\x0c\x12\r\n\x05shape\x18\x02 \x03(\r\x12\r\n\x05\x64type\x18\x03 \x01(\t\x12>\n\x0cquantization\x18\x04 \x01(\x0e\x32(.jina.DenseNdArrayProto.QuantizationMode\x12\x0f\n\x07max_val\x18\x05 \x01(\x02\x12\x0f\n\x07min_val\x18\x06 \x01(\x02\x12\r\n\x05scale\x18\x07 \x01(\x02\x12\x16\n\x0eoriginal_dtype\x18\x08 \x01(\t\"1\n\x10QuantizationMode\x12\x08\n\x04NONE\x10\x00\x12\x08\n\x04\x46P16\x10\x01\x12\t\n\x05UINT8\x10\x02\"o\n\x0cNdArrayProto\x12(\n\x05\x64\x65nse\x18\x01 \x01(\x0b\x32\x17.jina.DenseNdArrayProtoH\x00\x12*\n\x06sparse\x18\x02 \x01(\x0b\x32\x18.jina.SparseNdArrayProtoH\x00\x42\t\n\x07\x63ontent\"v\n\x12SparseNdArrayProto\x12(\n\x07indices\x18\x01 \x01(\x0b\x32\x17.jina.DenseNdArrayProto\x12\'\n\x06values\x18\x02 \x01(\x0b\x32\x17.jina.DenseNdArrayProto\x12\r\n\x05shape\x18\x03 \x03(\r\"\x7f\n\x0fNamedScoreProto\x12\r\n\x05value\x18\x01 \x01(\x02\x12\x0f\n\x07op_name\x18\x02 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x03 \x01(\t\x12\'\n\x08operands\x18\x04 \x03(\x0b\x32\x15.jina.NamedScoreProto\x12\x0e\n\x06ref_id\x18\x05 \x01(\t\"}\n\nGraphProto\x12+\n\tadjacency\x18\x01 \x01(\x0b\x32\x18.jina.SparseNdArrayProto\x12.\n\redge_features\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\x12\n\nundirected\x18\x03 \x01(\x08\"\xdc\x05\n\rDocumentProto\x12\n\n\x02id\x18\x01 \x01(\t\x12\x14\n\x0c\x63ontent_hash\x18\x18 \x01(\t\x12\x13\n\x0bgranularity\x18\x0e \x01(\r\x12\x11\n\tadjacency\x18\x16 \x01(\r\x12\x11\n\tparent_id\x18\x10 \x01(\t\x12\x10\n\x06\x62uffer\x18\x03 \x01(\x0cH\x00\x12\"\n\x04\x62lob\x18\x0c \x01(\x0b\x32\x12.jina.NdArrayProtoH\x00\x12\x0e\n\x04text\x18\r \x01(\tH\x00\x12\r\n\x03uri\x18\t \x01(\tH\x00\x12!\n\x05graph\x18\x1b \x01(\x0b\x32\x10.jina.GraphProtoH\x00\x12#\n\x06\x63hunks\x18\x04 \x03(\x0b\x32\x13.jina.DocumentProto\x12\x0e\n\x06weight\x18\x05 \x01(\x02\x12$\n\x07matches\x18\x08 \x03(\x0b\x32\x13.jina.DocumentProto\x12\x11\n\tmime_type\x18\n \x01(\t\x12%\n\x04tags\x18\x0b \x01(\x0b\x32\x17.google.protobuf.Struct\x12\x10\n\x08location\x18\x11 \x03(\r\x12\x0e\n\x06offset\x18\x12 \x01(\r\x12%\n\tembedding\x18\x13 \x01(\x0b\x32\x12.jina.NdArrayProto\x12/\n\x06scores\x18\x1c \x03(\x0b\x32\x1f.jina.DocumentProto.ScoresEntry\x12\x10\n\x08modality\x18\x15 \x01(\t\x12\x39\n\x0b\x65valuations\x18\x1d \x03(\x0b\x32$.jina.DocumentProto.EvaluationsEntry\x1a\x44\n\x0bScoresEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.jina.NamedScoreProto:\x02\x38\x01\x1aI\n\x10\x45valuationsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.jina.NamedScoreProto:\x02\x38\x01\x42\t\n\x07\x63ontent\"\xaa\x01\n\nRouteProto\x12\x0b\n\x03pod\x18\x01 \x01(\t\x12\x0e\n\x06pod_id\x18\x02 \x01(\t\x12.\n\nstart_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12,\n\x08\x65nd_time\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12!\n\x06status\x18\x05 \x01(\x0b\x32\x11.jina.StatusProto\"\x88\x01\n\x0eTargetPodProto\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x0c\n\x04port\x18\x02 \x01(\r\x12\x16\n\x0e\x65xpected_parts\x18\x03 \x01(\r\x12)\n\tout_edges\x18\x04 \x03(\x0b\x32\x16.jina.RoutingEdgeProto\x12\x17\n\x0ftarget_identity\x18\x05 \x01(\t\"5\n\x10RoutingEdgeProto\x12\x0b\n\x03pod\x18\x01 \x01(\t\x12\x14\n\x0csend_as_bind\x18\x02 \x01(\x08\"\x9b\x01\n\x11RoutingTableProto\x12/\n\x04pods\x18\x01 \x03(\x0b\x32!.jina.RoutingTableProto.PodsEntry\x12\x12\n\nactive_pod\x18\x02 \x01(\t\x1a\x41\n\tPodsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12#\n\x05value\x18\x02 \x01(\x0b\x32\x14.jina.TargetPodProto:\x02\x38\x01\"\xc9\x04\n\rEnvelopeProto\x12\x11\n\tsender_id\x18\x01 \x01(\t\x12\x13\n\x0breceiver_id\x18\x02 \x01(\t\x12\x12\n\nrequest_id\x18\x03 \x01(\t\x12\x0f\n\x07timeout\x18\x04 \x01(\r\x12\x31\n\x07version\x18\x06 \x01(\x0b\x32 .jina.EnvelopeProto.VersionProto\x12\x14\n\x0crequest_type\x18\x07 \x01(\t\x12\x15\n\rcheck_version\x18\x08 \x01(\x08\x12<\n\x0b\x63ompression\x18\t \x01(\x0b\x32\'.jina.EnvelopeProto.CompressConfigProto\x12 \n\x06routes\x18\n \x03(\x0b\x32\x10.jina.RouteProto\x12.\n\rrouting_table\x18\r \x01(\x0b\x32\x17.jina.RoutingTableProto\x12!\n\x06status\x18\x0b \x01(\x0b\x32\x11.jina.StatusProto\x12!\n\x06header\x18\x0c \x01(\x0b\x32\x11.jina.HeaderProto\x1a\x38\n\x0cVersionProto\x12\x0c\n\x04jina\x18\x01 \x01(\t\x12\r\n\x05proto\x18\x02 \x01(\t\x12\x0b\n\x03vcs\x18\x03 \x01(\t\x1a{\n\x13\x43ompressConfigProto\x12\x11\n\talgorithm\x18\x01 \x01(\t\x12\x11\n\tmin_bytes\x18\x02 \x01(\x04\x12\x11\n\tmin_ratio\x18\x03 \x01(\x02\x12+\n\nparameters\x18\x04 \x01(\x0b\x32\x17.google.protobuf.Struct\"Q\n\x0bHeaderProto\x12\x15\n\rexec_endpoint\x18\x01 \x01(\t\x12\x15\n\rtarget_peapod\x18\x02 \x01(\t\x12\x14\n\x0cno_propagate\x18\x03 \x01(\x08\"\xcf\x02\n\x0bStatusProto\x12*\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1c.jina.StatusProto.StatusCode\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x33\n\texception\x18\x03 \x01(\x0b\x32 .jina.StatusProto.ExceptionProto\x1aN\n\x0e\x45xceptionProto\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04\x61rgs\x18\x02 \x03(\t\x12\x0e\n\x06stacks\x18\x03 \x03(\t\x12\x10\n\x08\x65xecutor\x18\x04 \x01(\t\"z\n\nStatusCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07PENDING\x10\x01\x12\t\n\x05READY\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\x13\n\x0f\x45RROR_DUPLICATE\x10\x04\x12\x14\n\x10\x45RROR_NOTALLOWED\x10\x05\x12\x11\n\rERROR_CHAINED\x10\x06\"Z\n\x0cMessageProto\x12%\n\x08\x65nvelope\x18\x01 \x01(\x0b\x32\x13.jina.EnvelopeProto\x12#\n\x07request\x18\x02 \x01(\x0b\x32\x12.jina.RequestProto\"7\n\x12\x44ocumentArrayProto\x12!\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x13.jina.DocumentProto\"\xcf\x04\n\x0cRequestProto\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x39\n\x07\x63ontrol\x18\x02 \x01(\x0b\x32&.jina.RequestProto.ControlRequestProtoH\x00\x12\x33\n\x04\x64\x61ta\x18\x03 \x01(\x0b\x32#.jina.RequestProto.DataRequestProtoH\x00\x12!\n\x06header\x18\x04 \x01(\x0b\x32\x11.jina.HeaderProto\x12+\n\nparameters\x18\x05 \x01(\x0b\x32\x17.google.protobuf.Struct\x12 \n\x06routes\x18\x06 \x03(\x0b\x32\x10.jina.RouteProto\x12!\n\x06status\x18\x07 \x01(\x0b\x32\x11.jina.StatusProto\x1a`\n\x10\x44\x61taRequestProto\x12!\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x13.jina.DocumentProto\x12)\n\x0cgroundtruths\x18\x02 \x03(\x0b\x32\x13.jina.DocumentProto\x1a\xbb\x01\n\x13\x43ontrolRequestProto\x12?\n\x07\x63ommand\x18\x01 \x01(\x0e\x32..jina.RequestProto.ControlRequestProto.Command\"c\n\x07\x43ommand\x12\r\n\tTERMINATE\x10\x00\x12\n\n\x06STATUS\x10\x01\x12\x08\n\x04IDLE\x10\x02\x12\n\n\x06\x43\x41NCEL\x10\x03\x12\t\n\x05SCALE\x10\x04\x12\x0c\n\x08\x41\x43TIVATE\x10\x05\x12\x0e\n\nDEACTIVATE\x10\x06\x42\x06\n\x04\x62ody2?\n\x07JinaRPC\x12\x34\n\x04\x43\x61ll\x12\x12.jina.RequestProto\x1a\x12.jina.RequestProto\"\x00(\x01\x30\x01\x62\x06proto3' + serialized_pb=b'\n\njina.proto\x12\x04jina\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1cgoogle/protobuf/struct.proto\"\xfd\x01\n\x11\x44\x65nseNdArrayProto\x12\x0e\n\x06\x62uffer\x18\x01 \x01(\x0c\x12\r\n\x05shape\x18\x02 \x03(\r\x12\r\n\x05\x64type\x18\x03 \x01(\t\x12>\n\x0cquantization\x18\x04 \x01(\x0e\x32(.jina.DenseNdArrayProto.QuantizationMode\x12\x0f\n\x07max_val\x18\x05 \x01(\x02\x12\x0f\n\x07min_val\x18\x06 \x01(\x02\x12\r\n\x05scale\x18\x07 \x01(\x02\x12\x16\n\x0eoriginal_dtype\x18\x08 \x01(\t\"1\n\x10QuantizationMode\x12\x08\n\x04NONE\x10\x00\x12\x08\n\x04\x46P16\x10\x01\x12\t\n\x05UINT8\x10\x02\"o\n\x0cNdArrayProto\x12(\n\x05\x64\x65nse\x18\x01 \x01(\x0b\x32\x17.jina.DenseNdArrayProtoH\x00\x12*\n\x06sparse\x18\x02 \x01(\x0b\x32\x18.jina.SparseNdArrayProtoH\x00\x42\t\n\x07\x63ontent\"v\n\x12SparseNdArrayProto\x12(\n\x07indices\x18\x01 \x01(\x0b\x32\x17.jina.DenseNdArrayProto\x12\'\n\x06values\x18\x02 \x01(\x0b\x32\x17.jina.DenseNdArrayProto\x12\r\n\x05shape\x18\x03 \x03(\r\"\x7f\n\x0fNamedScoreProto\x12\r\n\x05value\x18\x01 \x01(\x02\x12\x0f\n\x07op_name\x18\x02 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x03 \x01(\t\x12\'\n\x08operands\x18\x04 \x03(\x0b\x32\x15.jina.NamedScoreProto\x12\x0e\n\x06ref_id\x18\x05 \x01(\t\"}\n\nGraphProto\x12+\n\tadjacency\x18\x01 \x01(\x0b\x32\x18.jina.SparseNdArrayProto\x12.\n\redge_features\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\x12\n\nundirected\x18\x03 \x01(\x08\"\xdc\x05\n\rDocumentProto\x12\n\n\x02id\x18\x01 \x01(\t\x12\x14\n\x0c\x63ontent_hash\x18\x18 \x01(\t\x12\x13\n\x0bgranularity\x18\x0e \x01(\r\x12\x11\n\tadjacency\x18\x16 \x01(\r\x12\x11\n\tparent_id\x18\x10 \x01(\t\x12\x10\n\x06\x62uffer\x18\x03 \x01(\x0cH\x00\x12\"\n\x04\x62lob\x18\x0c \x01(\x0b\x32\x12.jina.NdArrayProtoH\x00\x12\x0e\n\x04text\x18\r \x01(\tH\x00\x12\r\n\x03uri\x18\t \x01(\tH\x00\x12!\n\x05graph\x18\x1b \x01(\x0b\x32\x10.jina.GraphProtoH\x00\x12#\n\x06\x63hunks\x18\x04 \x03(\x0b\x32\x13.jina.DocumentProto\x12\x0e\n\x06weight\x18\x05 \x01(\x02\x12$\n\x07matches\x18\x08 \x03(\x0b\x32\x13.jina.DocumentProto\x12\x11\n\tmime_type\x18\n \x01(\t\x12%\n\x04tags\x18\x0b \x01(\x0b\x32\x17.google.protobuf.Struct\x12\x10\n\x08location\x18\x11 \x03(\r\x12\x0e\n\x06offset\x18\x12 \x01(\r\x12%\n\tembedding\x18\x13 \x01(\x0b\x32\x12.jina.NdArrayProto\x12/\n\x06scores\x18\x1c \x03(\x0b\x32\x1f.jina.DocumentProto.ScoresEntry\x12\x10\n\x08modality\x18\x15 \x01(\t\x12\x39\n\x0b\x65valuations\x18\x1d \x03(\x0b\x32$.jina.DocumentProto.EvaluationsEntry\x1a\x44\n\x0bScoresEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.jina.NamedScoreProto:\x02\x38\x01\x1aI\n\x10\x45valuationsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.jina.NamedScoreProto:\x02\x38\x01\x42\t\n\x07\x63ontent\"\xaa\x01\n\nRouteProto\x12\x0b\n\x03pod\x18\x01 \x01(\t\x12\x0e\n\x06pod_id\x18\x02 \x01(\t\x12.\n\nstart_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12,\n\x08\x65nd_time\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12!\n\x06status\x18\x05 \x01(\x0b\x32\x11.jina.StatusProto\"\x9a\x01\n\x0eTargetPodProto\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x0c\n\x04port\x18\x02 \x01(\r\x12\x10\n\x08port_out\x18\x06 \x01(\r\x12\x16\n\x0e\x65xpected_parts\x18\x03 \x01(\r\x12)\n\tout_edges\x18\x04 \x03(\x0b\x32\x16.jina.RoutingEdgeProto\x12\x17\n\x0ftarget_identity\x18\x05 \x01(\t\"5\n\x10RoutingEdgeProto\x12\x0b\n\x03pod\x18\x01 \x01(\t\x12\x14\n\x0csend_as_bind\x18\x02 \x01(\x08\"\x9b\x01\n\x11RoutingTableProto\x12/\n\x04pods\x18\x01 \x03(\x0b\x32!.jina.RoutingTableProto.PodsEntry\x12\x12\n\nactive_pod\x18\x02 \x01(\t\x1a\x41\n\tPodsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12#\n\x05value\x18\x02 \x01(\x0b\x32\x14.jina.TargetPodProto:\x02\x38\x01\"\xc9\x04\n\rEnvelopeProto\x12\x11\n\tsender_id\x18\x01 \x01(\t\x12\x13\n\x0breceiver_id\x18\x02 \x01(\t\x12\x12\n\nrequest_id\x18\x03 \x01(\t\x12\x0f\n\x07timeout\x18\x04 \x01(\r\x12\x31\n\x07version\x18\x06 \x01(\x0b\x32 .jina.EnvelopeProto.VersionProto\x12\x14\n\x0crequest_type\x18\x07 \x01(\t\x12\x15\n\rcheck_version\x18\x08 \x01(\x08\x12<\n\x0b\x63ompression\x18\t \x01(\x0b\x32\'.jina.EnvelopeProto.CompressConfigProto\x12 \n\x06routes\x18\n \x03(\x0b\x32\x10.jina.RouteProto\x12.\n\rrouting_table\x18\r \x01(\x0b\x32\x17.jina.RoutingTableProto\x12!\n\x06status\x18\x0b \x01(\x0b\x32\x11.jina.StatusProto\x12!\n\x06header\x18\x0c \x01(\x0b\x32\x11.jina.HeaderProto\x1a\x38\n\x0cVersionProto\x12\x0c\n\x04jina\x18\x01 \x01(\t\x12\r\n\x05proto\x18\x02 \x01(\t\x12\x0b\n\x03vcs\x18\x03 \x01(\t\x1a{\n\x13\x43ompressConfigProto\x12\x11\n\talgorithm\x18\x01 \x01(\t\x12\x11\n\tmin_bytes\x18\x02 \x01(\x04\x12\x11\n\tmin_ratio\x18\x03 \x01(\x02\x12+\n\nparameters\x18\x04 \x01(\x0b\x32\x17.google.protobuf.Struct\"Q\n\x0bHeaderProto\x12\x15\n\rexec_endpoint\x18\x01 \x01(\t\x12\x15\n\rtarget_peapod\x18\x02 \x01(\t\x12\x14\n\x0cno_propagate\x18\x03 \x01(\x08\"\xcf\x02\n\x0bStatusProto\x12*\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x1c.jina.StatusProto.StatusCode\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x33\n\texception\x18\x03 \x01(\x0b\x32 .jina.StatusProto.ExceptionProto\x1aN\n\x0e\x45xceptionProto\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04\x61rgs\x18\x02 \x03(\t\x12\x0e\n\x06stacks\x18\x03 \x03(\t\x12\x10\n\x08\x65xecutor\x18\x04 \x01(\t\"z\n\nStatusCode\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07PENDING\x10\x01\x12\t\n\x05READY\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\x13\n\x0f\x45RROR_DUPLICATE\x10\x04\x12\x14\n\x10\x45RROR_NOTALLOWED\x10\x05\x12\x11\n\rERROR_CHAINED\x10\x06\"Z\n\x0cMessageProto\x12%\n\x08\x65nvelope\x18\x01 \x01(\x0b\x32\x13.jina.EnvelopeProto\x12#\n\x07request\x18\x02 \x01(\x0b\x32\x12.jina.RequestProto\"7\n\x12\x44ocumentArrayProto\x12!\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x13.jina.DocumentProto\"\xcf\x04\n\x0cRequestProto\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x39\n\x07\x63ontrol\x18\x02 \x01(\x0b\x32&.jina.RequestProto.ControlRequestProtoH\x00\x12\x33\n\x04\x64\x61ta\x18\x03 \x01(\x0b\x32#.jina.RequestProto.DataRequestProtoH\x00\x12!\n\x06header\x18\x04 \x01(\x0b\x32\x11.jina.HeaderProto\x12+\n\nparameters\x18\x05 \x01(\x0b\x32\x17.google.protobuf.Struct\x12 \n\x06routes\x18\x06 \x03(\x0b\x32\x10.jina.RouteProto\x12!\n\x06status\x18\x07 \x01(\x0b\x32\x11.jina.StatusProto\x1a`\n\x10\x44\x61taRequestProto\x12!\n\x04\x64ocs\x18\x01 \x03(\x0b\x32\x13.jina.DocumentProto\x12)\n\x0cgroundtruths\x18\x02 \x03(\x0b\x32\x13.jina.DocumentProto\x1a\xbb\x01\n\x13\x43ontrolRequestProto\x12?\n\x07\x63ommand\x18\x01 \x01(\x0e\x32..jina.RequestProto.ControlRequestProto.Command\"c\n\x07\x43ommand\x12\r\n\tTERMINATE\x10\x00\x12\n\n\x06STATUS\x10\x01\x12\x08\n\x04IDLE\x10\x02\x12\n\n\x06\x43\x41NCEL\x10\x03\x12\t\n\x05SCALE\x10\x04\x12\x0c\n\x08\x41\x43TIVATE\x10\x05\x12\x0e\n\nDEACTIVATE\x10\x06\x42\x06\n\x04\x62ody2?\n\x07JinaRPC\x12\x34\n\x04\x43\x61ll\x12\x12.jina.RequestProto\x1a\x12.jina.RequestProto\"\x00(\x01\x30\x01\x62\x06proto3' , dependencies=[google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,]) @@ -102,8 +102,8 @@ ], containing_type=None, serialized_options=None, - serialized_start=2973, - serialized_end=3095, + serialized_start=2991, + serialized_end=3113, ) _sym_db.RegisterEnumDescriptor(_STATUSPROTO_STATUSCODE) @@ -152,8 +152,8 @@ ], containing_type=None, serialized_options=None, - serialized_start=3731, - serialized_end=3830, + serialized_start=3749, + serialized_end=3848, ) _sym_db.RegisterEnumDescriptor(_REQUESTPROTO_CONTROLREQUESTPROTO_COMMAND) @@ -772,21 +772,28 @@ is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( - name='expected_parts', full_name='jina.TargetPodProto.expected_parts', index=2, + name='port_out', full_name='jina.TargetPodProto.port_out', index=2, + number=6, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='expected_parts', full_name='jina.TargetPodProto.expected_parts', index=3, number=3, type=13, cpp_type=3, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( - name='out_edges', full_name='jina.TargetPodProto.out_edges', index=3, + name='out_edges', full_name='jina.TargetPodProto.out_edges', index=4, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( - name='target_identity', full_name='jina.TargetPodProto.target_identity', index=4, + name='target_identity', full_name='jina.TargetPodProto.target_identity', index=5, number=5, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -805,7 +812,7 @@ oneofs=[ ], serialized_start=1737, - serialized_end=1873, + serialized_end=1891, ) @@ -843,8 +850,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1875, - serialized_end=1928, + serialized_start=1893, + serialized_end=1946, ) @@ -882,8 +889,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2021, - serialized_end=2086, + serialized_start=2039, + serialized_end=2104, ) _ROUTINGTABLEPROTO = _descriptor.Descriptor( @@ -920,8 +927,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=1931, - serialized_end=2086, + serialized_start=1949, + serialized_end=2104, ) @@ -966,8 +973,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2493, - serialized_end=2549, + serialized_start=2511, + serialized_end=2567, ) _ENVELOPEPROTO_COMPRESSCONFIGPROTO = _descriptor.Descriptor( @@ -1018,8 +1025,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2551, - serialized_end=2674, + serialized_start=2569, + serialized_end=2692, ) _ENVELOPEPROTO = _descriptor.Descriptor( @@ -1126,8 +1133,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2089, - serialized_end=2674, + serialized_start=2107, + serialized_end=2692, ) @@ -1172,8 +1179,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2676, - serialized_end=2757, + serialized_start=2694, + serialized_end=2775, ) @@ -1225,8 +1232,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2893, - serialized_end=2971, + serialized_start=2911, + serialized_end=2989, ) _STATUSPROTO = _descriptor.Descriptor( @@ -1271,8 +1278,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=2760, - serialized_end=3095, + serialized_start=2778, + serialized_end=3113, ) @@ -1310,8 +1317,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=3097, - serialized_end=3187, + serialized_start=3115, + serialized_end=3205, ) @@ -1342,8 +1349,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=3189, - serialized_end=3244, + serialized_start=3207, + serialized_end=3262, ) @@ -1381,8 +1388,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=3544, - serialized_end=3640, + serialized_start=3562, + serialized_end=3658, ) _REQUESTPROTO_CONTROLREQUESTPROTO = _descriptor.Descriptor( @@ -1413,8 +1420,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=3643, - serialized_end=3830, + serialized_start=3661, + serialized_end=3848, ) _REQUESTPROTO = _descriptor.Descriptor( @@ -1491,8 +1498,8 @@ create_key=_descriptor._internal_create_key, fields=[]), ], - serialized_start=3247, - serialized_end=3838, + serialized_start=3265, + serialized_end=3856, ) _DENSENDARRAYPROTO.fields_by_name['quantization'].enum_type = _DENSENDARRAYPROTO_QUANTIZATIONMODE @@ -1784,8 +1791,8 @@ index=0, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=3840, - serialized_end=3903, + serialized_start=3858, + serialized_end=3921, methods=[ _descriptor.MethodDescriptor( name='Call', diff --git a/jina/types/routing/table.py b/jina/types/routing/table.py index 612b35c80bfd4..26f5c0e33a8fa 100644 --- a/jina/types/routing/table.py +++ b/jina/types/routing/table.py @@ -33,6 +33,14 @@ def port(self) -> int: """ return self.proto.port + @property + def port_out(self) -> int: + """Returns the `port` field of this TargetPod + + :return: port + """ + return self.proto.port_out + @property def host(self) -> str: """Returns the `host` field of this TargetPod @@ -49,6 +57,14 @@ def full_address(self) -> str: """ return f'{self.host}:{self.port}' + @property + def full_out_address(self) -> str: + """Return the full zmq adress of the tail of this TargetPod + + :return: address + """ + return f'{self.host}:{self.port_out}' + @property def expected_parts(self) -> int: """Return the `expected_parts` field of this TargetPod @@ -150,6 +166,7 @@ def add_pod(self, pod_name: str, pod: 'BasePod') -> None: target.host = pod.head_host target.port = pod.head_port_in + target.port_out = pod.tail_port_out target.target_identity = pod.head_zmq_identity def _get_target_pod(self, pod: str) -> TargetPod: diff --git a/tests/distributed/test_local_flow_use_remote_executor/__init__.py b/tests/distributed/test_local_flow_use_remote_executor/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/tests/distributed/test_local_flow_use_remote_executor/docker-compose.yml b/tests/distributed/test_local_flow_use_remote_executor/docker-compose.yml new file mode 100644 index 0000000000000..914712eb230d6 --- /dev/null +++ b/tests/distributed/test_local_flow_use_remote_executor/docker-compose.yml @@ -0,0 +1,25 @@ +version: "3.3" +services: + external-executor: + image: jinaai/jina:test-pip + environment: + JINA_LOG_LEVEL: DEBUG + container_name: test_external_executor + ports: + - "8001:8000" + - "45678:45678" + expose: + - 10000-60000 + networks: + test: + ipv4_address: 10.1.0.100 + volumes: + - /var/run/docker.sock:/var/run/docker.sock + entrypoint: "jina pod --port-in 45678 --port-out 45679 --dynamic-routing-in --dynamic-routing-out --dynamic-routing" +networks: + test: + driver: bridge + ipam: + driver: default + config: + - subnet: 10.1.0.0/24 diff --git a/tests/distributed/test_local_flow_use_remote_executor/test_integration.py b/tests/distributed/test_local_flow_use_remote_executor/test_integration.py new file mode 100644 index 0000000000000..167022be9096f --- /dev/null +++ b/tests/distributed/test_local_flow_use_remote_executor/test_integration.py @@ -0,0 +1,58 @@ +import os + +import pytest +import numpy as np + +from jina import Flow, Document +from jina.parsers import set_pod_parser + +cur_dir = os.path.dirname(os.path.abspath(__file__)) +compose_yml = os.path.join(cur_dir, 'docker-compose.yml') + + +@pytest.fixture +def external_pod_args(): + args = ['--port-in', str(45678), '--port-out', str(45679)] + args = vars(set_pod_parser().parse_args(args)) + del args['external'] + del args['pod_role'] + del args['host'] + return args + + +@pytest.fixture +def local_flow(external_pod_args): + return Flow().add(**external_pod_args, host='10.1.0.100', external=True) + + +@pytest.fixture +def documents_to_index(): + image = np.random.random((50, 50)) + return [Document(content=image) for i in range(200)] + + +@pytest.fixture +def patched_remote_local_connection(monkeypatch): + def alternative_remote_local_connection(first, second): + if first == '10.1.0.100': + return True + else: + return False + + monkeypatch.setattr( + 'jina.flow.base.is_remote_local_connection', + lambda x, y: alternative_remote_local_connection(x, y), + ) + + +@pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose']) +def test_local_flow_use_external_executor( + local_flow, documents_to_index, patched_remote_local_connection, docker_compose +): + with local_flow as f: + responses = f.index( + inputs=documents_to_index, return_results=True, request_size=100 + ) + assert len(responses) == 2 + for resp in responses: + assert len(resp.docs) == 100 diff --git a/tests/unit/types/test_routing_graph.py b/tests/unit/types/test_routing_graph.py index f540eb282863a..14900feb4ca06 100644 --- a/tests/unit/types/test_routing_graph.py +++ b/tests/unit/types/test_routing_graph.py @@ -2,15 +2,16 @@ class PodInterface: - def __init__(self, host, port): + def __init__(self, host, port, port_out): self.head_host = host self.head_port_in = port + self.tail_port_out = port_out self.head_zmq_identity = '' def test_single_routing(): graph = RoutingTable() - graph.add_pod('pod0', PodInterface('0.0.0.0', 1230)) + graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1233)) graph.active_pod = 'pod0' next_routes = graph.get_next_targets() @@ -19,8 +20,8 @@ def test_single_routing(): def test_simple_routing(): graph = RoutingTable() - graph.add_pod('pod0', PodInterface('0.0.0.0', 1230)) - graph.add_pod('pod1', PodInterface('0.0.0.0', 1231)) + graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1232)) + graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1233)) graph.add_edge('pod0', 'pod1') graph.active_pod = 'pod0' next_routes = graph.get_next_targets() @@ -31,10 +32,10 @@ def test_simple_routing(): def test_double_routing(): graph = RoutingTable() - graph.add_pod('pod0', PodInterface('0.0.0.0', 1230)) - graph.add_pod('pod1', PodInterface('0.0.0.0', 1231)) - graph.add_pod('pod2', PodInterface('0.0.0.0', 1232)) - graph.add_pod('pod3', PodInterface('0.0.0.0', 1233)) + graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1234)) + graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1235)) + graph.add_pod('pod2', PodInterface('0.0.0.0', 1232, 1236)) + graph.add_pod('pod3', PodInterface('0.0.0.0', 1233, 1237)) graph.add_edge('pod0', 'pod1') graph.add_edge('pod0', 'pod2') graph.add_edge('pod1', 'pod3') @@ -49,11 +50,11 @@ def test_double_routing(): def test_nested_routing(): graph = RoutingTable() - graph.add_pod('pod0', PodInterface('0.0.0.0', 1230)) - graph.add_pod('pod1', PodInterface('0.0.0.0', 1231)) - graph.add_pod('pod2', PodInterface('0.0.0.0', 1232)) - graph.add_pod('pod3', PodInterface('0.0.0.0', 1233)) - graph.add_pod('pod4', PodInterface('0.0.0.0', 1233)) + graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1234)) + graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1235)) + graph.add_pod('pod2', PodInterface('0.0.0.0', 1232, 1236)) + graph.add_pod('pod3', PodInterface('0.0.0.0', 1233, 1237)) + graph.add_pod('pod4', PodInterface('0.0.0.0', 1233, 1238)) graph.add_edge('pod0', 'pod1') graph.add_edge('pod0', 'pod2') graph.add_edge('pod1', 'pod3') @@ -92,11 +93,11 @@ def test_nested_routing(): def test_topological_sorting(): graph = RoutingTable() - graph.add_pod('pod0', PodInterface('0.0.0.0', 1230)) - graph.add_pod('pod1', PodInterface('0.0.0.0', 1231)) - graph.add_pod('pod2', PodInterface('0.0.0.0', 1232)) - graph.add_pod('pod3', PodInterface('0.0.0.0', 1233)) - graph.add_pod('pod4', PodInterface('0.0.0.0', 1233)) + graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1234)) + graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1235)) + graph.add_pod('pod2', PodInterface('0.0.0.0', 1232, 1236)) + graph.add_pod('pod3', PodInterface('0.0.0.0', 1233, 1237)) + graph.add_pod('pod4', PodInterface('0.0.0.0', 1233, 1238)) graph.add_edge('pod0', 'pod1') graph.add_edge('pod0', 'pod2') graph.add_edge('pod1', 'pod3') @@ -114,8 +115,8 @@ def test_topological_sorting(): def test_cycle(): graph = RoutingTable() - graph.add_pod('pod0', PodInterface('0.0.0.0', 1230)) - graph.add_pod('pod1', PodInterface('0.0.0.0', 1231)) + graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1232)) + graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1233)) graph.add_edge('pod0', 'pod1') graph.add_edge('pod1', 'pod0') graph.active_pod = 'pod0' @@ -124,11 +125,11 @@ def test_cycle(): def test_no_cycle(): graph = RoutingTable() - graph.add_pod('pod0', PodInterface('0.0.0.0', 1230)) - graph.add_pod('pod1', PodInterface('0.0.0.0', 1231)) - graph.add_pod('pod2', PodInterface('0.0.0.0', 1232)) - graph.add_pod('pod3', PodInterface('0.0.0.0', 1233)) - graph.add_pod('pod4', PodInterface('0.0.0.0', 1233)) + graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1234)) + graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1235)) + graph.add_pod('pod2', PodInterface('0.0.0.0', 1232, 1236)) + graph.add_pod('pod3', PodInterface('0.0.0.0', 1233, 1237)) + graph.add_pod('pod4', PodInterface('0.0.0.0', 1233, 1238)) graph.add_edge('pod2', 'pod1') graph.add_edge('pod1', 'pod0') graph.add_edge('pod0', 'pod3')