Skip to content
This repository has been archived by the owner on Sep 22, 2023. It is now read-only.

Commit

Permalink
Stabilize shutdown/error handling of app command (#44)
Browse files Browse the repository at this point in the history
* Also fix for Python 3.5/3.6 compatibility
* Fix hand-written HTTP protocol output to use "\r\n"
  • Loading branch information
achimnol committed Dec 12, 2018
1 parent 6275115 commit faea9f2
Showing 1 changed file with 37 additions and 27 deletions.
64 changes: 37 additions & 27 deletions src/ai/backend/client/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
import aiohttp

from . import register_command
from .pretty import print_info
from .pretty import print_info, print_error
from ..request import Request
from ..session import AsyncSession
from ..compat import current_loop


class WSProxy:
__slots__ = (
'conn', 'down_conn', 'upstream_buffer', 'upstream_buffer_task',
'reader', 'writer', 'api_session', 'path',
'api_session', 'path',
'down_task',
'reader', 'writer',
)
BUFFER_SIZE = 8192

Expand All @@ -24,52 +25,54 @@ def __init__(self, api_session: AsyncSession,
self.path = f"/stream/kernel/{session_id}/{protocol}proxy"
self.reader = reader
self.writer = writer
self.down_task = None

async def run(self):
api_rqst = Request(
self.api_session, "GET", self.path, b'',
content_type="application/json")
async with api_rqst.connect_websocket() as ws:
async def up():

async def downstream():
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.ERROR:
await self.write_error_and_close()
await self.write_error(msg)
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
if msg.data != aiohttp.WSCloseCode.OK:
await self.write_error_and_close()
await self.write_error(msg)
break
elif msg.type == aiohttp.WSMsgType.BINARY:
self.writer.write(msg.data)
await self.writer.drain()
except asyncio.CancelledError:
pass
finally:
self.writer.close()
await ws.close()
task = asyncio.ensure_future(up())
while True:
try:
if hasattr(self.writer, 'wait_closed'): # Python 3.7+
await self.writer.wait_closed()

self.down_task = asyncio.ensure_future(downstream())
try:
while True:
chunk = await self.reader.read(self.BUFFER_SIZE)
if not chunk:
break
await ws.send_bytes(chunk)
except GeneratorExit:
break

await self.close()

async def close(self):
await self.writer.drain()
self.writer.close()
await self.writer.wait_closed()

async def write_error_and_close(self):
rsp = 'HTTP/1.1 503 Service Unavailable\n' \
'Connection: Closed\n' \
'\n' \
'Service Unavailable\n'
except asyncio.CancelledError:
pass
finally:
if not self.down_task.done():
await self.down_task
self.down_task = None

async def write_error(self, msg):
rsp = 'HTTP/1.1 503 Service Unavailable\r\n' \
'Connection: Closed\r\n\r\n' \
'WebSocket reply: {}'.format(msg.data.decode('utf8'))
self.writer.write(rsp.encode())
await self.close()
await self.writer.drain()


class ProxyRunner:
Expand All @@ -92,7 +95,12 @@ def __init__(self, session_id, name, protocol, host, port, *, loop=None):
async def handle_connection(self, reader, writer):
p = WSProxy(self.api_session, self.session_id,
self.protocol, reader, writer)
await p.run()
try:
await p.run()
except asyncio.CancelledError:
pass
except Exception as e:
print_error(e)

async def ready(self):
self.api_session = AsyncSession()
Expand All @@ -112,6 +120,8 @@ async def close(self):
self.local_server.close()
await self.local_server.wait_closed()
await self.api_session.close()
# Let asyncio-internal socket cleanups finish.
await asyncio.sleep(0.1)


@register_command
Expand Down

0 comments on commit faea9f2

Please sign in to comment.