Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
First adaptation to aiohttp v3 (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
achimnol committed Feb 12, 2018
1 parent 1d908ec commit 27fbe50
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 60 deletions.
5 changes: 3 additions & 2 deletions ai/backend/gateway/etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ async def resolve_image_name(self, name_or_alias):


async def init(app):
app['config_server'] = ConfigServer(app.config.etcd_addr, app.config.namespace)
app['config_server'] = ConfigServer(
app['config'].etcd_addr, app['config'].namespace)
if app['pidx'] == 0:
await app['config_server'].register_myself(app.config)
await app['config_server'].register_myself(app['config'])


async def shutdown(app):
Expand Down
2 changes: 1 addition & 1 deletion ai/backend/gateway/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ async def instance_heartbeat(app, agent_id, agent_info):
async def check_agent_lost(app, interval):
try:
now = datetime.now(tzutc())
timeout = timedelta(seconds=app.config.heartbeat_timeout)
timeout = timedelta(seconds=app['config'].heartbeat_timeout)
async for agent_id, prev in app['redis_live'].ihscan('last_seen'):
prev = datetime.fromtimestamp(float(prev), tzutc())
if now - prev > timeout:
Expand Down
2 changes: 1 addition & 1 deletion ai/backend/gateway/ratelimit.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def rlim_middleware_handler(request):

async def init(app):
rr = await aioredis.create_redis_pool(
app.config.redis_addr.as_sockaddr(),
app['config'].redis_addr.as_sockaddr(),
timeout=3.0,
encoding='utf8',
db=REDIS_RLIM_DB)
Expand Down
46 changes: 23 additions & 23 deletions ai/backend/gateway/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,46 +127,46 @@ async def gw_init(app):
app['datadog.enabled'] = False
app['sentry.enabled'] = False
if datadog_available:
if app.config.datadog_api_key is None:
if app['config'].datadog_api_key is None:
log.warning('Datadog logging is disabled (missing API key).')
else:
datadog.initialize(
api_key=app.config.datadog_api_key,
app_key=app.config.datadog_app_key)
api_key=app['config'].datadog_api_key,
app_key=app['config'].datadog_app_key)
app['datadog'] = datadog
app['datadog.enabled'] = True
log.info('Datadog logging is enabled.')
if raven_available:
if app.config.raven_uri is None:
if app['config'].raven_uri is None:
log.warning('Sentry error reporting is disabled (missing DSN URI).')
else:
app['sentry'] = raven.Client(
app.config.raven_uri,
app['config'].raven_uri,
release=raven.fetch_package_version('backend.ai-manager'))
app['sentry.enabled'] = True
log.info('Sentry error reporting is enabled.')

app['dbpool'] = await create_engine(
host=app.config.db_addr[0], port=app.config.db_addr[1],
user=app.config.db_user, password=app.config.db_password,
dbname=app.config.db_name,
echo=bool(app.config.verbose),
host=app['config'].db_addr[0], port=app['config'].db_addr[1],
user=app['config'].db_user, password=app['config'].db_password,
dbname=app['config'].db_name,
echo=bool(app['config'].verbose),
# TODO: check the throughput impacts of DB/redis pool sizes
minsize=4, maxsize=16,
timeout=30, pool_recycle=30,
)
app['redis_live'] = await aioredis.create_redis(
app.config.redis_addr.as_sockaddr(),
app['config'].redis_addr.as_sockaddr(),
timeout=3.0,
encoding='utf8',
db=REDIS_LIVE_DB)
app['redis_stat'] = await aioredis.create_redis(
app.config.redis_addr.as_sockaddr(),
app['config'].redis_addr.as_sockaddr(),
timeout=3.0,
encoding='utf8',
db=REDIS_STAT_DB)
app['redis_image'] = await aioredis.create_redis(
app.config.redis_addr.as_sockaddr(),
app['config'].redis_addr.as_sockaddr(),
timeout=3.0,
encoding='utf8',
db=REDIS_IMAGE_DB)
Expand All @@ -189,14 +189,14 @@ async def gw_shutdown(app):
async def server_main(loop, pidx, _args):

app = web.Application()
app.config = _args[0]
app.sslctx = None
if app.config.ssl_cert and app.config.ssl_key:
app.sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
app.sslctx.load_cert_chain(str(app.config.ssl_cert),
str(app.config.ssl_key))
if app.config.service_port == 0:
app.config.service_port = 8443 if app.sslctx else 8080
app['config'] = _args[0]
app['sslctx'] = None
if app['config'].ssl_cert and app['config'].ssl_key:
app['sslctx'] = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
app['sslctx'].load_cert_chain(str(app['config'].ssl_cert),
str(app['config'].ssl_key))
if app['config'].service_port == 0:
app['config'].service_port = 8443 if app['sslctx'] else 8080
app['pidx'] = pidx

await etcd_init(app)
Expand All @@ -211,11 +211,11 @@ async def server_main(loop, pidx, _args):
web_handler = app.make_handler()
server = await loop.create_server(
web_handler,
host=str(app.config.service_ip),
port=app.config.service_port,
host=str(app['config'].service_ip),
port=app['config'].service_port,
backlog=1024,
reuse_port=True,
ssl=app.sslctx,
ssl=app['sslctx'],
)
log.info('started.')

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def read_src_version():
requires = [
'aioconsole>=0.1.3',
'aiodataloader',
'aiohttp~=2.3.0',
'aiohttp~=3.0.0',
'aiopg~=0.13.0',
'aioredis~=1.0.0',
'aiotools>=0.5.4',
Expand Down
53 changes: 26 additions & 27 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ def __init__(self, session, url):
url += '/'
self._url = url

def close(self):
self._session.close()
async def close(self):
await self._session.close()

def request(self, method, path, **kwargs):
while path.startswith('/'):
Expand Down Expand Up @@ -204,26 +204,25 @@ async def pre_app(event_loop, test_ns, test_db, unused_tcp_port):
""" For tests that do not require actual server running.
"""
app = web.Application(loop=event_loop)
app.config = load_config(argv=[], extra_args_funcs=(gw_args,))
app['config'] = load_config(argv=[], extra_args_funcs=(gw_args,))

# Override basic settings.
# Change these configs if local servers have different port numbers.
app.config.redis_addr = host_port_pair(os.environ['BACKEND_REDIS_ADDR'])
app.config.db_addr = host_port_pair(os.environ['BACKEND_DB_ADDR'])
app.config.db_name = test_db
app['config'].redis_addr = host_port_pair(os.environ['BACKEND_REDIS_ADDR'])
app['config'].db_addr = host_port_pair(os.environ['BACKEND_DB_ADDR'])
app['config'].db_name = test_db

# Override extra settings
app.config.namespace = test_ns
app.config.heartbeat_timeout = 10.0
app.config.service_ip = '127.0.0.1'
app.config.service_port = unused_tcp_port
app.config.verbose = False
# app.config.ssl_cert = here / 'sample-ssl-cert' / 'sample.crt'
# app.config.ssl_key = here / 'sample-ssl-cert' / 'sample.key'
app.sslctx = None
# app.sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
# app.sslctx.load_cert_chain(str(app.config.ssl_cert),
# str(app.config.ssl_key))
app['config'].namespace = test_ns
app['config'].heartbeat_timeout = 10.0
app['config'].service_ip = '127.0.0.1'
app['config'].service_port = unused_tcp_port
app['config'].verbose = False
# app['config'].ssl_cert = here / 'sample-ssl-cert' / 'sample.crt'
# app['config'].ssl_key = here / 'sample-ssl-cert' / 'sample.key'
# app['sslctx'] = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
# app['sslctx'].load_cert_chain(str(app['config'].ssl_cert),
# str(app['config'].ssl_key))

# num_workers = 1
app['pidx'] = 0
Expand All @@ -234,7 +233,7 @@ async def pre_app(event_loop, test_ns, test_db, unused_tcp_port):
@pytest.fixture
async def default_keypair(event_loop, pre_app):
access_key = 'AKIAIOSFODNN7EXAMPLE'
config = pre_app.config
config = pre_app['config']
pool = await create_engine(
host=config.db_addr[0], port=config.db_addr[1],
user=config.db_user, password=config.db_password,
Expand All @@ -258,7 +257,7 @@ async def default_keypair(event_loop, pre_app):
@pytest.fixture
async def user_keypair(event_loop, pre_app):
access_key = 'AKIANOTADMIN7EXAMPLE'
config = pre_app.config
config = pre_app['config']
pool = await create_engine(
host=config.db_addr[0], port=config.db_addr[1],
user=config.db_user, password=config.db_password,
Expand All @@ -285,7 +284,7 @@ def create_header(method, url, req_bytes, ctype='application/json',
hash_type='sha256', api_version='v3.20170615',
keypair=default_keypair):
now = datetime.now(tzutc())
hostname = f'localhost:{pre_app.config.service_port}'
hostname = f"localhost:{pre_app['config'].service_port}"
headers = {
'Date': now.isoformat(),
'Content-Type': ctype,
Expand Down Expand Up @@ -324,11 +323,11 @@ async def _create_server(loop, pre_app, extra_inits=None, debug=False):
handler = pre_app.make_handler(debug=debug, keep_alive_on=False, loop=loop)
server = await loop.create_server(
handler,
pre_app.config.service_ip,
pre_app.config.service_port,
# ssl=pre_app.sslctx)
pre_app['config'].service_ip,
pre_app['config'].service_port,
ssl=pre_app.get('sslctx'),
)
return pre_app, pre_app.config.service_port, handler, server
return pre_app, pre_app['config'].service_port, handler, server


@pytest.fixture
Expand Down Expand Up @@ -363,12 +362,12 @@ async def maker(extras=None, ev_router=False):
if ev_router:
# Run event_router proc. Is it enough? No way to get return values
# (app, client, etc) by using aiotools.start_server.
args = (app.config,)
args = (app['config'],)
extra_proc = mp.Process(target=event_router,
args=('', 0, args,),
daemon=True)
extra_proc.start()
if app.sslctx:
if app.get('sslctx'):
url = f'https://localhost:{port}'
client_params['connector'] = aiohttp.TCPConnector(verify_ssl=False)
else:
Expand Down Expand Up @@ -404,7 +403,7 @@ async def maker(extras=None, ev_router=False):

# Terminate client and servers
if client:
client.close()
await client.close()
await gw_shutdown(pre_app)
for shutdown in extra_shutdowns:
await shutdown(pre_app)
Expand Down
6 changes: 3 additions & 3 deletions tests/gateway/test_etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

@pytest.fixture
async def config_server(pre_app):
server = ConfigServer(pre_app.config.etcd_addr, pre_app.config.namespace)
server = ConfigServer(pre_app['config'].etcd_addr, pre_app['config'].namespace)
yield server
await server.etcd.delete_prefix('nodes/manager')

Expand Down Expand Up @@ -73,15 +73,15 @@ class TestConfigServer:

@pytest.mark.asyncio
async def test_register_myself(self, pre_app, config_server):
await config_server.register_myself(pre_app.config)
await config_server.register_myself(pre_app['config'])

assert await config_server.etcd.get('nodes/manager')
assert await config_server.etcd.get('nodes/redis')
assert await config_server.etcd.get('nodes/manager/event_addr')

@pytest.mark.asyncio
async def test_deregister_myself(self, pre_app, config_server):
await config_server.register_myself(pre_app.config)
await config_server.register_myself(pre_app['config'])
await config_server.deregister_myself()

data = list(await config_server.etcd.get_prefix('nodes/manager'))
Expand Down
4 changes: 2 additions & 2 deletions tests/gateway/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ async def test_event_router(pre_app, event_loop, unused_tcp_port, mocker):
TEST_EVENT_IPC_ADDR = EVENT_IPC_ADDR + '-test-router'
mocker.patch('ai.backend.gateway.events.EVENT_IPC_ADDR', TEST_EVENT_IPC_ADDR)

pre_app.config.events_port = unused_tcp_port
args = (pre_app.config,)
pre_app['config'].events_port = unused_tcp_port
args = (pre_app['config'],)

try:
# Router process
Expand Down

0 comments on commit 27fbe50

Please sign in to comment.