Skip to content

Commit

Permalink
fix: fix setting extra host_connect_in (#2973)
Browse files Browse the repository at this point in the history
* feat: add local flow use remote executor test

* test: add docker compose

* test: add endpoint

* test: assert with return results

* feat: add static ip address

* fix: fix setting extra host_connect_in

* test: add tests to have remote pod working

Co-authored-by: bwanglzu <bo.wang@jina.ai>
  • Loading branch information
JoanFM and bwanglzu committed Jul 21, 2021
1 parent 99479dc commit 32dc3dc
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 68 deletions.
2 changes: 1 addition & 1 deletion jina/__init__.py
Expand Up @@ -36,7 +36,7 @@

# do not change this line manually
# this is managed by proto/build-proto.sh and updated on every execution
__proto_version__ = '0.0.84'
__proto_version__ = '0.0.85'

__uptime__ = _datetime.datetime.now().isoformat()

Expand Down
2 changes: 1 addition & 1 deletion jina/flow/base.py
Expand Up @@ -806,7 +806,7 @@ def _get_routing_table(self) -> RoutingTable:
start_pod = graph._get_target_pod(start)
if is_remote_local_connection(start_pod.host, pod.head_host):
pod.head_args.hosts_in_connect.append(
graph._get_target_pod(start).full_address
graph._get_target_pod(start).full_out_address
)

graph.add_edge(start, end, True)
Expand Down
1 change: 1 addition & 0 deletions jina/peapods/networking.py
Expand Up @@ -13,6 +13,7 @@ def is_remote_local_connection(first: str, second: str):
:param second: the ip or host name of the second runtime
:return: True, if first is remote and second is local
"""

try:
first_ip = ipaddress.ip_address(first)
first_global = first_ip.is_global
Expand Down
7 changes: 7 additions & 0 deletions jina/peapods/pods/__init__.py
Expand Up @@ -160,6 +160,13 @@ def head_port_in(self):
"""
return self.head_args.port_in

@property
def tail_port_out(self):
"""Get the port_out of the TailPea of this pod
.. # noqa: DAR201
"""
return self.tail_args.port_out

@property
def head_zmq_identity(self):
"""Get the zmq_identity of the HeadPea of this pod
Expand Down
6 changes: 4 additions & 2 deletions jina/peapods/zmq/__init__.py
Expand Up @@ -216,7 +216,6 @@ def _init_sockets(self) -> Tuple:
for address in self.args.hosts_in_connect:
if in_connect is None:
host, port = address.split(':')

in_connect, _ = _init_socket(
ctx,
host,
Expand Down Expand Up @@ -286,6 +285,7 @@ def print_stats(self):
)

def _init_dynamic_out_socket(self, host_out, port_out):

out_sock, _ = _init_socket(
self.ctx,
host_out,
Expand Down Expand Up @@ -452,7 +452,9 @@ async def recv_message(
:return: Received protobuf message. Or None in case of any error.
"""
try:
msg = await recv_message_async(self.in_sock, **self.send_recv_kwargs)
msg = await recv_message_async(
self.in_connect_sock or self.in_sock, **self.send_recv_kwargs
)
self.msg_recv += 1
if msg is not None:
self.bytes_recv += msg.size
Expand Down
1 change: 1 addition & 0 deletions jina/proto/jina.proto
Expand Up @@ -161,6 +161,7 @@ message RouteProto {
message TargetPodProto{
string host = 1; // the host HeadPea of the BasePod
uint32 port = 2; // the port HeadPea of the BasePod
uint32 port_out = 6; // the port TailPea of the BasePod
uint32 expected_parts = 3; // the number of parts the pod should expect
repeated RoutingEdgeProto out_edges = 4; // pod_name of Pods, the TailPea should send traffic to
string target_identity = 5;
Expand Down
85 changes: 46 additions & 39 deletions jina/proto/jina_pb2.py

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions jina/types/routing/table.py
Expand Up @@ -33,6 +33,14 @@ def port(self) -> int:
"""
return self.proto.port

@property
def port_out(self) -> int:
"""Returns the `port` field of this TargetPod
:return: port
"""
return self.proto.port_out

@property
def host(self) -> str:
"""Returns the `host` field of this TargetPod
Expand All @@ -49,6 +57,14 @@ def full_address(self) -> str:
"""
return f'{self.host}:{self.port}'

@property
def full_out_address(self) -> str:
"""Return the full zmq adress of the tail of this TargetPod
:return: address
"""
return f'{self.host}:{self.port_out}'

@property
def expected_parts(self) -> int:
"""Return the `expected_parts` field of this TargetPod
Expand Down Expand Up @@ -150,6 +166,7 @@ def add_pod(self, pod_name: str, pod: 'BasePod') -> None:

target.host = pod.head_host
target.port = pod.head_port_in
target.port_out = pod.tail_port_out
target.target_identity = pod.head_zmq_identity

def _get_target_pod(self, pod: str) -> TargetPod:
Expand Down
Empty file.
@@ -0,0 +1,25 @@
version: "3.3"
services:
external-executor:
image: jinaai/jina:test-pip
environment:
JINA_LOG_LEVEL: DEBUG
container_name: test_external_executor
ports:
- "8001:8000"
- "45678:45678"
expose:
- 10000-60000
networks:
test:
ipv4_address: 10.1.0.100
volumes:
- /var/run/docker.sock:/var/run/docker.sock
entrypoint: "jina pod --port-in 45678 --port-out 45679 --dynamic-routing-in --dynamic-routing-out --dynamic-routing"
networks:
test:
driver: bridge
ipam:
driver: default
config:
- subnet: 10.1.0.0/24
@@ -0,0 +1,58 @@
import os

import pytest
import numpy as np

from jina import Flow, Document
from jina.parsers import set_pod_parser

cur_dir = os.path.dirname(os.path.abspath(__file__))
compose_yml = os.path.join(cur_dir, 'docker-compose.yml')


@pytest.fixture
def external_pod_args():
args = ['--port-in', str(45678), '--port-out', str(45679)]
args = vars(set_pod_parser().parse_args(args))
del args['external']
del args['pod_role']
del args['host']
return args


@pytest.fixture
def local_flow(external_pod_args):
return Flow().add(**external_pod_args, host='10.1.0.100', external=True)


@pytest.fixture
def documents_to_index():
image = np.random.random((50, 50))
return [Document(content=image) for i in range(200)]


@pytest.fixture
def patched_remote_local_connection(monkeypatch):
def alternative_remote_local_connection(first, second):
if first == '10.1.0.100':
return True
else:
return False

monkeypatch.setattr(
'jina.flow.base.is_remote_local_connection',
lambda x, y: alternative_remote_local_connection(x, y),
)


@pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
def test_local_flow_use_external_executor(
local_flow, documents_to_index, patched_remote_local_connection, docker_compose
):
with local_flow as f:
responses = f.index(
inputs=documents_to_index, return_results=True, request_size=100
)
assert len(responses) == 2
for resp in responses:
assert len(resp.docs) == 100
51 changes: 26 additions & 25 deletions tests/unit/types/test_routing_graph.py
Expand Up @@ -2,15 +2,16 @@


class PodInterface:
def __init__(self, host, port):
def __init__(self, host, port, port_out):
self.head_host = host
self.head_port_in = port
self.tail_port_out = port_out
self.head_zmq_identity = ''


def test_single_routing():
graph = RoutingTable()
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230))
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1233))
graph.active_pod = 'pod0'
next_routes = graph.get_next_targets()

Expand All @@ -19,8 +20,8 @@ def test_single_routing():

def test_simple_routing():
graph = RoutingTable()
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231))
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1232))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1233))
graph.add_edge('pod0', 'pod1')
graph.active_pod = 'pod0'
next_routes = graph.get_next_targets()
Expand All @@ -31,10 +32,10 @@ def test_simple_routing():

def test_double_routing():
graph = RoutingTable()
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231))
graph.add_pod('pod2', PodInterface('0.0.0.0', 1232))
graph.add_pod('pod3', PodInterface('0.0.0.0', 1233))
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1234))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1235))
graph.add_pod('pod2', PodInterface('0.0.0.0', 1232, 1236))
graph.add_pod('pod3', PodInterface('0.0.0.0', 1233, 1237))
graph.add_edge('pod0', 'pod1')
graph.add_edge('pod0', 'pod2')
graph.add_edge('pod1', 'pod3')
Expand All @@ -49,11 +50,11 @@ def test_double_routing():

def test_nested_routing():
graph = RoutingTable()
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231))
graph.add_pod('pod2', PodInterface('0.0.0.0', 1232))
graph.add_pod('pod3', PodInterface('0.0.0.0', 1233))
graph.add_pod('pod4', PodInterface('0.0.0.0', 1233))
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1234))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1235))
graph.add_pod('pod2', PodInterface('0.0.0.0', 1232, 1236))
graph.add_pod('pod3', PodInterface('0.0.0.0', 1233, 1237))
graph.add_pod('pod4', PodInterface('0.0.0.0', 1233, 1238))
graph.add_edge('pod0', 'pod1')
graph.add_edge('pod0', 'pod2')
graph.add_edge('pod1', 'pod3')
Expand Down Expand Up @@ -92,11 +93,11 @@ def test_nested_routing():

def test_topological_sorting():
graph = RoutingTable()
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231))
graph.add_pod('pod2', PodInterface('0.0.0.0', 1232))
graph.add_pod('pod3', PodInterface('0.0.0.0', 1233))
graph.add_pod('pod4', PodInterface('0.0.0.0', 1233))
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1234))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1235))
graph.add_pod('pod2', PodInterface('0.0.0.0', 1232, 1236))
graph.add_pod('pod3', PodInterface('0.0.0.0', 1233, 1237))
graph.add_pod('pod4', PodInterface('0.0.0.0', 1233, 1238))
graph.add_edge('pod0', 'pod1')
graph.add_edge('pod0', 'pod2')
graph.add_edge('pod1', 'pod3')
Expand All @@ -114,8 +115,8 @@ def test_topological_sorting():

def test_cycle():
graph = RoutingTable()
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231))
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1232))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1233))
graph.add_edge('pod0', 'pod1')
graph.add_edge('pod1', 'pod0')
graph.active_pod = 'pod0'
Expand All @@ -124,11 +125,11 @@ def test_cycle():

def test_no_cycle():
graph = RoutingTable()
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231))
graph.add_pod('pod2', PodInterface('0.0.0.0', 1232))
graph.add_pod('pod3', PodInterface('0.0.0.0', 1233))
graph.add_pod('pod4', PodInterface('0.0.0.0', 1233))
graph.add_pod('pod0', PodInterface('0.0.0.0', 1230, 1234))
graph.add_pod('pod1', PodInterface('0.0.0.0', 1231, 1235))
graph.add_pod('pod2', PodInterface('0.0.0.0', 1232, 1236))
graph.add_pod('pod3', PodInterface('0.0.0.0', 1233, 1237))
graph.add_pod('pod4', PodInterface('0.0.0.0', 1233, 1238))
graph.add_edge('pod2', 'pod1')
graph.add_edge('pod1', 'pod0')
graph.add_edge('pod0', 'pod3')
Expand Down

0 comments on commit 32dc3dc

Please sign in to comment.