Skip to content
This repository has been archived by the owner on Sep 22, 2023. It is now read-only.

Commit

Permalink
Revise the app command (#44)
Browse files Browse the repository at this point in the history
* Now "app" command must be used after "start" command to create a
  session and the session ID must be passed.
* Make the protocol explicit.
* Some code clean up.
  • Loading branch information
achimnol committed Dec 12, 2018
1 parent 083a0a9 commit 6275115
Showing 1 changed file with 72 additions and 41 deletions.
113 changes: 72 additions & 41 deletions src/ai/backend/client/cli/app.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
import asyncio
import aiohttp
from ..request import Request
from ..session import AsyncSession
from ..compat import token_hex
from .pretty import print_info

from . import register_command
from .pretty import print_info
from ..request import Request
from ..session import AsyncSession
from ..compat import current_loop


class WSProxy:
__slots__ = ('conn', 'down_conn', 'upstream_buffer', 'upstream_buffer_task', 'reader', 'writer', 'session', 'path')
__slots__ = (
'conn', 'down_conn', 'upstream_buffer', 'upstream_buffer_task',
'reader', 'writer', 'api_session', 'path',
)
BUFFER_SIZE = 8192

def __init__(self, session: AsyncSession, kernel, reader, writer):
self.session = session
self.path = f"/stream/kernel/{kernel.kernel_id}/wsproxy"
def __init__(self, api_session: AsyncSession,
session_id: str,
protocol: str,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
self.api_session = api_session
self.path = f"/stream/kernel/{session_id}/{protocol}proxy"
self.reader = reader
self.writer = writer

async def run(self):
api_rqst = Request(
self.session, "GET", self.path, b'',
self.api_session, "GET", self.path, b'',
content_type="application/json")
async with api_rqst.connect_websocket() as ws:
async def up():
Expand Down Expand Up @@ -66,57 +73,81 @@ async def write_error_and_close(self):


class ProxyRunner:
__slots__ = ('app_name', 'host', 'port', 'session', 'kernel', 'server', 'loop')

def __init__(self, name, host, port, loop=asyncio.get_event_loop()):
__slots__ = (
'session_id', 'app_name',
'protocol', 'host', 'port',
'api_session', 'local_server', 'loop',
)

def __init__(self, session_id, name, protocol, host, port, *, loop=None):
self.session_id = session_id
self.app_name = name
self.protocol = protocol
self.host = host
self.port = port
self.loop = loop
self.api_session = None
self.local_server = None
self.loop = loop if loop else current_loop()

async def handle_connection(self, reader, writer):
p = WSProxy(self.api_session, self.session_id,
self.protocol, reader, writer)
await p.run()

async def ready(self):
self.session = AsyncSession()
kernel_id = token_hex(16)
self.kernel = await self.session.Kernel.get_or_create(
f"app-{self.app_name}", client_token=kernel_id)
print_info(f"Started with session id - {kernel_id}")
self.api_session = AsyncSession()

async def connection_handler(reader, writer):
p = WSProxy(self.session, self.kernel, reader, writer)
await p.run()
self.local_server = await asyncio.start_server(
self.handle_connection, self.host, self.port,
loop=self.loop)

print_info(f"http://{self.host}:{self.port}")
self.server = await asyncio.start_server(connection_handler, self.host, self.port, loop=self.loop)
print_info(
"A local proxy to the application \"{0}\" ".format(self.app_name) +
"provided by the session \"{0}\" ".format(self.session_id) +
"is available at: {0}://{1}:{2}"
.format(self.protocol, self.host, self.port)
)

async def close(self):
self.server.close()
await self.server.wait_closed()
await self.kernel.destroy()
await self.session.close()
self.local_server.close()
await self.local_server.wait_closed()
await self.api_session.close()


@register_command
def app(args):
"""
Run the web app via backend.ai
Run a local proxy to a service provided by Backend.AI
compute sessions.
The type of proxy depends on the app definition: plain TCP or HTTP.
"""

loop = asyncio.get_event_loop()
r = ProxyRunner(args.app, args.bind, args.port, loop)
loop.run_until_complete(r.ready())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# TODO: generalize protocol using service ports metadata
runner = ProxyRunner(args.session_id, args.app,
'http', args.bind, args.port,
loop=loop)
loop.run_until_complete(runner.ready())
try:
loop.run_forever()
except KeyboardInterrupt: # pragma: no cover
pass
print_info("Shutting down....")
loop.run_until_complete(r.close())
print_info("Done")


app.add_argument('app',
help='Run an app via http (BETA)')
app.add_argument('--bind', type=str, default='localhost',
finally:
print_info("Shutting down....")
try:
loop.run_until_complete(runner.close())
finally:
print_info("Done")
loop.close()


app.add_argument('session_id', type=str, metavar='SESSID',
help='The compute session ID.')
app.add_argument('app', type=str,
help='The name of service provided by the given session.')
app.add_argument('--bind', type=str, default='127.0.0.1',
help='The IP/host address to bind this proxy.')
app.add_argument('-p', '--port', type=int, default=8080,
help='The TCP port to accept non-encrypted non-authorized '
'API requests.')
help='The port number to listen user connections.')

0 comments on commit 6275115

Please sign in to comment.