Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jadbin committed Nov 10, 2016
1 parent e7e374c commit 381da0e
Show file tree
Hide file tree
Showing 17 changed files with 492 additions and 553 deletions.
2 changes: 1 addition & 1 deletion conf/agent.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
agent_server_listen: 0.0.0.0:7340

proxy_checker:
proxy_checkers:
4 changes: 2 additions & 2 deletions conf/master.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
master_rpc_listen: 0.0.0.0:7310
mqcache_rpc_listen: 0.0.0.0:7311
unikafka_listen: 0.0.0.0:7311

kafka_addr: {KAFKA_IP}:9092
zookeeper_addr: {ZOOKEEPER_IP}:2181
mongo_addr: mongodb://{MONGO_USER}:{USER_PWD}@{MONGO_IP}:27017
mqcache_rpc_addr: {MASTER_IP}:7311
unikafka_addr: {MASTER_IP}:7311
2 changes: 1 addition & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def stop_loop():
server.register_function(get_task_info)
server.register_function(get_task_progress)
server.register_function(get_running_tasks)
server.serve_forever()
server.start()
t = threading.Thread(target=run)
t.start()
wait_server_start(d.rpc_addr)
Expand Down
187 changes: 0 additions & 187 deletions tests/test_mqcache.py

This file was deleted.

68 changes: 11 additions & 57 deletions tests/test_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,6 @@
from .helpers import wait_server_start


@pytest.fixture(scope="function")
def server_without_loop(request):
def handle(*args, **kw):
pass

def run():
asyncio.set_event_loop(loop)
server = RpcServer("0.0.0.0:7350")
server.register_function(handle)
server.serve_forever()
try:
loop.run_forever()
except Exception:
pass
finally:
loop.close()

def stop_loop():
loop.call_soon_threadsafe(loop.stop)

loop = asyncio.new_event_loop()
t = threading.Thread(target=run)
t.start()
wait_server_start("127.0.0.1:7350")
request.addfinalizer(stop_loop)


def test_start_server_without_loop(server_without_loop):
client = RpcClient("127.0.0.1:7350")
assert client.handle() is None


@pytest.fixture(scope="class")
def rpc_server(request):
def handle(_host, *args, **kw):
Expand Down Expand Up @@ -80,71 +48,57 @@ def stop_loop():
server.register_function(async_handle, "rpc.async_handle")
server.register_function(divide_zero)
server.register_function(return_none)
server.serve_forever()
server.start()
t = threading.Thread(target=run)
t.start()
wait_server_start("127.0.0.1:7351")
request.addfinalizer(stop_loop)


@pytest.fixture(scope="class")
def rpc_client():
return RpcClient("127.0.0.1:7351")


class TestRpc:
rpc_addr = "127.0.0.1:7351"

def test_call_function(self, rpc_server, rpc_client):
def test_call_function(self, rpc_server):
async def _test():
async_rpc_client = RpcClient(self.rpc_addr, loop=loop)
res = await async_rpc_client.rpc.handle(0, "str", b"bytes", pi=3.141592657, dict={"list": [0, 1.0, "2", b"3"]})
assert res == ((0, "str", b"bytes"), {"pi": 3.141592657, "dict": {"list": [0, 1.0, "2", b"3"]}})
res = await async_rpc_client.rpc.handle(0, "str", pi=3.141592657, dict={"list": [0, 1.0, "2"]})
assert res == ((0, "str"), {"pi": 3.141592657, "dict": {"list": [0, 1.0, "2"]}})

assert rpc_client.rpc.handle(0, "str", b"bytes", pi=3.141592657, dict={"list": [0, 1.0, "2", b"3"]}) == \
((0, "str", b"bytes"), {"pi": 3.141592657, "dict": {"list": [0, 1.0, "2", b"3"]}})
loop = asyncio.new_event_loop()
loop.run_until_complete(_test())

def test_call_async_function(self, rpc_server, rpc_client):
def test_call_async_function(self, rpc_server):
async def _test():
async_rpc_client = RpcClient(self.rpc_addr, loop=loop)
res = await async_rpc_client.rpc.async_handle(0, "str", b"bytes", pi=3.141592657, dict={"list": [0, 1.0, "2", b"3"]})
assert res == ((0, "str", b"bytes"), {"pi": 3.141592657, "dict": {"list": [0, 1.0, "2", b"3"]}})
res = await async_rpc_client.rpc.async_handle(0, "str", pi=3.141592657, dict={"list": [0, 1.0, "2"]})
assert res == ((0, "str"), {"pi": 3.141592657, "dict": {"list": [0, 1.0, "2"]}})

assert rpc_client.rpc.async_handle(0, "str", b"bytes", pi=3.141592657, dict={"list": [0, 1.0, "2", b"3"]}) == \
((0, "str", b"bytes"), {"pi": 3.141592657, "dict": {"list": [0, 1.0, "2", b"3"]}})
loop = asyncio.new_event_loop()
loop.run_until_complete(_test())

def test_raise_error(self, rpc_server, rpc_client):
def test_raise_error(self, rpc_server):
async def _test():
async_rpc_client = RpcClient(self.rpc_addr, loop=loop)
with pytest.raises(ZeroDivisionError):
with pytest.raises(RpcError):
await async_rpc_client.divide_zero(1)

with pytest.raises(ZeroDivisionError):
rpc_client.divide_zero(1)
loop = asyncio.new_event_loop()
loop.run_until_complete(_test())

def test_method_not_found(self, rpc_server, rpc_client):
def test_method_not_found(self, rpc_server):
async def _test():
async_rpc_client = RpcClient(self.rpc_addr, loop=loop)
with pytest.raises(RpcError):
await async_rpc_client.handle()

with pytest.raises(RpcError):
rpc_client.handle()
loop = asyncio.new_event_loop()
loop.run_until_complete(_test())

def test_return_none(self, rpc_server, rpc_client):
def test_return_none(self, rpc_server):
async def _test():
async_rpc_client = RpcClient(self.rpc_addr, loop=loop)
res = async_rpc_client.return_none()
await res is None

assert rpc_client.return_none() is None
loop = asyncio.new_event_loop()
loop.run_until_complete(_test())
Loading

0 comments on commit 381da0e

Please sign in to comment.