Skip to content

Commit

Permalink
test: added remote local test (#2853)
Browse files Browse the repository at this point in the history
* test: added remote local test

* fix: better error handling
  • Loading branch information
maximilianwerk committed Jul 8, 2021
1 parent 14cbb0d commit a6a2647
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 11 deletions.
6 changes: 5 additions & 1 deletion jina/peapods/zmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def __init__(
self.msg_sent = 0
self.is_closed = False
self.is_polling_paused = False
self.in_sock_type = None
self.out_sock_type = None
self.ctrl_sock_type = None
self.opened_socks = [] # this must be here for `close()`
(
self.ctx,
Expand Down Expand Up @@ -785,7 +788,8 @@ def _parse_from_frames(sock_type, frames: List[bytes]) -> 'Message':
frames = [b' '] + frames
elif sock_type == zmq.ROUTER:
# the router appends dealer id when receive it, we need to remove it
frames.pop(0)
if len(frames) == 4:
frames.pop(0)

return Message(frames[1], frames[2])

Expand Down
66 changes: 59 additions & 7 deletions tests/unit/peapods/zmq/test_dynamic_routing.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import logging
import threading
import time

import pytest
import multiprocessing
from google.protobuf import json_format

from jina.logging.logger import JinaLogger
from jina.helper import random_identity
from jina.parsers import set_pea_parser
from jina.peapods.zmq import Zmqlet, AsyncZmqlet, ZmqStreamlet
Expand All @@ -25,7 +24,7 @@ def get_args():
'--socket-in',
'ROUTER_BIND',
'--socket-out',
'DEALER_CONNECT',
'ROUTER_BIND',
'--timeout-ctrl',
'-1',
'--dynamic-routing-out',
Expand All @@ -41,7 +40,7 @@ def test_simple_dynamic_routing_zmqlet():
args1 = get_args()
args2 = get_args()

logger = logging.getLogger('zmq-test')
logger = JinaLogger('zmq-test')
with Zmqlet(args1, logger) as z1, Zmqlet(args2, logger) as z2:
assert z1.msg_sent == 0
assert z1.msg_recv == 0
Expand Down Expand Up @@ -91,7 +90,7 @@ def test_double_dynamic_routing_zmqlet():
args2 = get_args()
args3 = get_args()

logger = logging.getLogger('zmq-test')
logger = JinaLogger('zmq-test')
with Zmqlet(args1, logger) as z1, Zmqlet(args2, logger) as z2, Zmqlet(
args3, logger
) as z3:
Expand Down Expand Up @@ -157,7 +156,7 @@ async def test_double_dynamic_routing_async_zmqlet():
args2 = get_args()
args3 = get_args()

logger = logging.getLogger('zmq-test')
logger = JinaLogger('zmq-test')
with AsyncZmqlet(args1, logger) as z1, AsyncZmqlet(
args2, logger
) as z2, AsyncZmqlet(args3, logger) as z3:
Expand Down Expand Up @@ -215,7 +214,7 @@ def test_double_dynamic_routing_zmqstreamlet():
args2 = get_args()
args3 = get_args()

logger = logging.getLogger('zmq-test')
logger = JinaLogger('zmq-test')
with ZmqStreamlet(
args=args1, ready_event=multiprocessing.Event(), logger=logger
) as z1, ZmqStreamlet(
Expand Down Expand Up @@ -274,3 +273,56 @@ def test_double_dynamic_routing_zmqstreamlet():
assert z2.msg_recv == number_messages
assert z3.msg_sent == 0
assert z3.msg_recv == number_messages


def test_remote_local_dynamic_routing_zmqlet():
args1 = get_args()

args2 = get_args()
args2.zmq_identity = 'test-identity'
args2.hosts_in_connect = [f'{args1.host}:{args1.port_out}']

logger = JinaLogger('zmq-test')
with Zmqlet(args1, logger) as z1, Zmqlet(args2, logger) as z2:
assert z1.msg_sent == 0
assert z1.msg_recv == 0
assert z2.msg_sent == 0
assert z2.msg_recv == 0
req = jina_pb2.RequestProto()
req.request_id = random_identity()
d = req.data.docs.add()
d.tags['id'] = 2
msg = Message(None, req, 'tmp', '')
routing_pb = jina_pb2.RoutingTableProto()
routing_table = {
'active_pod': 'pod1',
'pods': {
'pod1': {
'host': '0.0.0.0',
'port': args1.port_in,
'expected_parts': 0,
'out_edges': [{'pod': 'pod2', 'send_as_bind': True}],
},
'pod2': {
'host': '0.0.0.0',
'port': args2.port_in,
'expected_parts': 1,
'out_edges': [],
'target_identity': args2.zmq_identity,
},
},
}
json_format.ParseDict(routing_table, routing_pb)
msg.envelope.routing_table.CopyFrom(routing_pb)
z2.recv_message(callback)

assert z2.msg_sent == 0
assert z2.msg_recv == 0

z1.send_message(msg)
z2.recv_message(callback)

assert z1.msg_sent == 1
assert z1.msg_recv == 0
assert z2.msg_sent == 0
assert z2.msg_recv == 1
5 changes: 2 additions & 3 deletions tests/unit/test_echostream.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import logging

import pytest

from jina import Flow
from jina.logging.logger import JinaLogger
from jina.parsers import set_pea_parser
from jina.peapods.peas import BasePea
from jina.peapods.zmq import Zmqlet
Expand Down Expand Up @@ -47,7 +46,7 @@ def test_simple_zmqlet():
]
)

logger = logging.getLogger('zmq-test')
logger = JinaLogger('zmq-test')
with BasePea(args2), Zmqlet(args, logger) as z:
req = jina_pb2.RequestProto()
req.request_id = random_identity()
Expand Down

0 comments on commit a6a2647

Please sign in to comment.