Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
test: Update pytest and refactor test codes (#64)
Browse files Browse the repository at this point in the history
* Let it spawn the agent subprocess only for integration tests

* Upgrade pytest and plugins with explicit version ranges for
  compatibility.
  • Loading branch information
achimnol committed Feb 13, 2018
1 parent 91d8d08 commit 6fc0929
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 95 deletions.
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ def read_src_version():
]
test_requires = [
'aiodocker',
'pytest>=3.1',
'pytest-asyncio',
'pytest>=3.4.0',
'pytest-asyncio>=0.8.0',
'pytest-aiohttp',
'pytest-cov',
'pytest-mock',
'codecov',
'flake8',
]
dev_requires = build_requires + test_requires + [
'pytest-sugar',
'pytest-sugar>=0.9.1',
]
ci_requires = []
monitor_requires = [
Expand Down
133 changes: 80 additions & 53 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pathlib
import re
import secrets
import shutil
import signal
import subprocess
import tempfile
Expand Down Expand Up @@ -134,23 +135,6 @@ def finalize_db():
conn.close()
engine.dispose()

# Launch an agent daemon
agent_proc = subprocess.Popen([
'python', '-m', 'ai.backend.agent.server',
'--etcd-addr', str(etcd_addr),
'--namespace', test_ns,
'--scratch-root', '/tmp/scratches',
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

def finalize_agent():
agent_proc.terminate()
try:
agent_proc.wait(timeout=5.0)
except subprocess.TimeoutExpired:
agent_proc.kill()

request.addfinalizer(finalize_agent)


class Client:
def __init__(self, session, url):
Expand Down Expand Up @@ -200,7 +184,7 @@ def ws_connect(self, path, **kwargs):


@pytest.fixture
async def pre_app(event_loop, test_ns, test_db, unused_tcp_port):
async def app(event_loop, test_ns, test_db, unused_tcp_port):
""" For tests that do not require actual server running.
"""
app = web.Application(loop=event_loop)
Expand Down Expand Up @@ -231,9 +215,9 @@ async def pre_app(event_loop, test_ns, test_db, unused_tcp_port):


@pytest.fixture
async def default_keypair(event_loop, pre_app):
async def default_keypair(event_loop, app):
access_key = 'AKIAIOSFODNN7EXAMPLE'
config = pre_app['config']
config = app['config']
pool = await create_engine(
host=config.db_addr[0], port=config.db_addr[1],
user=config.db_user, password=config.db_password,
Expand All @@ -255,9 +239,9 @@ async def default_keypair(event_loop, pre_app):


@pytest.fixture
async def user_keypair(event_loop, pre_app):
async def user_keypair(event_loop, app):
access_key = 'AKIANOTADMIN7EXAMPLE'
config = pre_app['config']
config = app['config']
pool = await create_engine(
host=config.db_addr[0], port=config.db_addr[1],
user=config.db_user, password=config.db_password,
Expand All @@ -279,12 +263,12 @@ async def user_keypair(event_loop, pre_app):


@pytest.fixture
def get_headers(pre_app, default_keypair, prepare_docker_images):
def get_headers(app, default_keypair, prepare_docker_images):
def create_header(method, url, req_bytes, ctype='application/json',
hash_type='sha256', api_version='v3.20170615',
keypair=default_keypair):
now = datetime.now(tzutc())
hostname = f"localhost:{pre_app['config'].service_port}"
hostname = f"localhost:{app['config'].service_port}"
headers = {
'Date': now.isoformat(),
'Content-Type': ctype,
Expand Down Expand Up @@ -314,31 +298,35 @@ def create_header(method, url, req_bytes, ctype='application/json',
return create_header


async def _create_server(loop, pre_app, extra_inits=None, debug=False):
await gw_init(pre_app)
async def _create_server(loop, app, extra_inits=None, debug=False):
await gw_init(app)
if extra_inits:
for init in extra_inits:
await init(pre_app)

handler = pre_app.make_handler(debug=debug, keep_alive_on=False, loop=loop)
server = await loop.create_server(
handler,
pre_app['config'].service_ip,
pre_app['config'].service_port,
ssl=pre_app.get('sslctx'),
await init(app)

runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(
runner,
app['config'].service_ip,
app['config'].service_port,
ssl_context=app.get('sslctx'),
)
return pre_app, pre_app['config'].service_port, handler, server
await site.start()
return runner


@pytest.fixture
async def create_app_and_client(event_loop, pre_app, default_keypair, user_keypair):
async def create_app_and_client(request, test_id, test_ns,
event_loop, app,
default_keypair, user_keypair):
client = None
app = handler = server = None
runner = None
extra_proc = None
extra_shutdowns = []

async def maker(extras=None, ev_router=False):
nonlocal client, app, handler, server, extra_proc, extra_shutdowns
async def maker(extras=None, ev_router=False, spawn_agent=False):
nonlocal client, runner, extra_proc, extra_shutdowns

# Store extra inits and shutowns
extra_inits = []
Expand All @@ -357,8 +345,11 @@ async def maker(extras=None, ev_router=False):

server_params = {}
client_params = {}
app, port, handler, server = await _create_server(
event_loop, pre_app, extra_inits=extra_inits, **server_params)
runner = await _create_server(
event_loop, app,
extra_inits=extra_inits,
**server_params)

if ev_router:
# Run event_router proc. Is it enough? No way to get return values
# (app, client, etc) by using aiotools.start_server.
Expand All @@ -367,6 +358,39 @@ async def maker(extras=None, ev_router=False):
args=('', 0, args,),
daemon=True)
extra_proc.start()

# Launch an agent daemon
if spawn_agent:
etcd_addr = host_port_pair(os.environ['BACKEND_ETCD_ADDR'])
os.makedirs(f'/tmp/backend.ai/scratches-{test_id}', exist_ok=True)
agent_proc = subprocess.Popen([
'python', '-m', 'ai.backend.agent.server',
'--etcd-addr', str(etcd_addr),
'--namespace', test_ns,
'--scratch-root', f'/tmp/backend.ai/scratches-{test_id}',
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

def finalize_agent():
agent_proc.terminate()
try:
agent_proc.wait(timeout=5.0)
except subprocess.TimeoutExpired:
agent_proc.kill()
shutil.rmtree(f'/tmp/backend.ai/scratches-{test_id}')

request.addfinalizer(finalize_agent)

async def wait_for_agent():
while True:
all_ids = [inst_id async for inst_id in
app['registry'].enumerate_instances()]
if len(all_ids) > 0:
break
await asyncio.sleep(0.2)
task = event_loop.create_task(wait_for_agent())
await task

port = app['config'].service_port
if app.get('sslctx'):
url = f'https://localhost:{port}'
client_params['connector'] = aiohttp.TCPConnector(verify_ssl=False)
Expand All @@ -377,10 +401,10 @@ async def maker(extras=None, ev_router=False):

yield maker

# Clean DB tables.
# (the database itself is dropped when all tests finishes.)
if app and 'dbpool' in app:
async with app['dbpool'].acquire() as conn, conn.begin():
dbpool = app.get('dbpool')
if dbpool is not None:
# Clean DB tables for subsequent tests.
async with dbpool.acquire() as conn, conn.begin():
await conn.execute((vfolders.delete()))
await conn.execute((kernels.delete()))
await conn.execute((agents.delete()))
Expand All @@ -404,15 +428,17 @@ async def maker(extras=None, ev_router=False):
# Terminate client and servers
if client:
await client.close()
await gw_shutdown(pre_app)
await gw_shutdown(app)
for shutdown in extra_shutdowns:
await shutdown(pre_app)
if server:
server.close()
await server.wait_closed()
if app:
await app.shutdown()
await app.cleanup()
await shutdown(app)
if runner:
await runner.cleanup()

if dbpool is not None:
# Close the DB connection pool.
dbpool.close()
await dbpool.wait_closed()

if extra_proc:
os.kill(extra_proc.pid, signal.SIGINT)
extra_proc.join()
Expand All @@ -435,4 +461,5 @@ async def pull():
print(f'Pulling image "{img}" for testing...')
await docker.pull(img)
await docker.close()

event_loop.run_until_complete(pull())
12 changes: 6 additions & 6 deletions tests/gateway/test_etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@


@pytest.fixture
async def config_server(pre_app):
server = ConfigServer(pre_app['config'].etcd_addr, pre_app['config'].namespace)
async def config_server(app):
server = ConfigServer(app['config'].etcd_addr, app['config'].namespace)
yield server
await server.etcd.delete_prefix('nodes/manager')

Expand Down Expand Up @@ -72,16 +72,16 @@ async def volumes(config_server, tmpdir):
class TestConfigServer:

@pytest.mark.asyncio
async def test_register_myself(self, pre_app, config_server):
await config_server.register_myself(pre_app['config'])
async def test_register_myself(self, app, config_server):
await config_server.register_myself(app['config'])

assert await config_server.etcd.get('nodes/manager')
assert await config_server.etcd.get('nodes/redis')
assert await config_server.etcd.get('nodes/manager/event_addr')

@pytest.mark.asyncio
async def test_deregister_myself(self, pre_app, config_server):
await config_server.register_myself(pre_app['config'])
async def test_deregister_myself(self, app, config_server):
await config_server.register_myself(app['config'])
await config_server.deregister_myself()

data = list(await config_server.etcd.get_prefix('nodes/manager'))
Expand Down
24 changes: 12 additions & 12 deletions tests/gateway/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@


@pytest.fixture
def dispatcher(pre_app, event_loop):
return EventDispatcher(pre_app, loop=event_loop)
def dispatcher(app, event_loop):
return EventDispatcher(app, loop=event_loop)


class TestEventDispatcher:

def test_add_and_dispatch_handler(self, pre_app, dispatcher, event_loop):
def test_add_and_dispatch_handler(self, app, dispatcher, event_loop):
event_name = 'test-event'
assert len(dispatcher.handlers) == 0

Expand All @@ -33,23 +33,23 @@ def cb(app, agent_id):
assert dispatcher.handlers[event_name][0] == cb

# Dispatch the event
pre_app['test-flag'] = False
app['test-flag'] = False
dispatcher.dispatch(event_name, agent_id='')

# Run loop only once (https://stackoverflow.com/a/29797709/7397571)
event_loop.call_soon(event_loop.stop)
event_loop.run_forever()

assert pre_app['test-flag']
assert app['test-flag']


@pytest.mark.asyncio
async def test_event_router(pre_app, event_loop, unused_tcp_port, mocker):
async def test_event_router(app, event_loop, unused_tcp_port, mocker):
TEST_EVENT_IPC_ADDR = EVENT_IPC_ADDR + '-test-router'
mocker.patch('ai.backend.gateway.events.EVENT_IPC_ADDR', TEST_EVENT_IPC_ADDR)

pre_app['config'].events_port = unused_tcp_port
args = (pre_app['config'],)
app['config'].events_port = unused_tcp_port
args = (app['config'],)

try:
# Router process
Expand Down Expand Up @@ -80,12 +80,12 @@ async def _router_test():


@pytest.mark.asyncio
async def test_event_subscriber(pre_app, dispatcher, event_loop, mocker):
async def test_event_subscriber(app, dispatcher, event_loop, mocker):
TEST_EVENT_IPC_ADDR = EVENT_IPC_ADDR + '-test-event-subscriber'
mocker.patch('ai.backend.gateway.events.EVENT_IPC_ADDR', TEST_EVENT_IPC_ADDR)

event_name = 'test-event'
pre_app['test-flag'] = False
app['test-flag'] = False

# Add test handler to dispatcher
def cb(app, agent_id, *args):
Expand All @@ -106,7 +106,7 @@ async def _send_msg():

# Checking flas is set by the subscriber
while True:
if pre_app['test-flag']:
if app['test-flag']:
break
await asyncio.sleep(0.5)
send_task = event_loop.create_task(_send_msg())
Expand All @@ -116,4 +116,4 @@ async def _send_msg():
sub_task.cancel()
await sub_task

assert pre_app['test-flag']
assert app['test-flag']
Loading

0 comments on commit 6fc0929

Please sign in to comment.