Skip to content

Commit

Permalink
init websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Apr 7, 2018
1 parent 503141e commit 0956986
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 8 deletions.
20 changes: 20 additions & 0 deletions examples/test_websocket.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<!DOCTYPE html>
<html>
<head>
<title>WSConnection demo</title>
</head>
<body>
<script>
var ws = new WebSocket("ws://127.0.0.1:9999/"),
messages = document.createElement('ul');
ws.onmessage = function (event) {
var messages = document.getElementsByTagName('ul')[0],
message = document.createElement('li'),
content = document.createTextNode(event.data);
message.appendChild(content);
messages.appendChild(message);
};
document.body.appendChild(messages);
</script>
</body>
</html>
25 changes: 25 additions & 0 deletions examples/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import asyncio

from lemon.app import Lemon
from lemon.wsconnection import WSMessage, WSConnection

cache = []


async def pull(conn: WSConnection, msg: WSMessage):
print(msg.text)
await conn.send(msg.text)


async def push(conns):
while True:
for conn in conns:
await conn.send('xxx')
await asyncio.sleep(0.1)


ins = Lemon(debug=True)

ins.ws(ws_pull=pull, ws_push=push)

ins.listen()
59 changes: 51 additions & 8 deletions lemon/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import json
import logging.config
import time
import typing
from asyncio import get_event_loop
from functools import partial
Expand All @@ -13,6 +15,7 @@
from lemon.middleware import exception_middleware, cors_middleware
from lemon.request import Request
from lemon.server import serve
from lemon.wsconnection import WSConnection, WSMessage

LEMON_PRE_PROCESS_MIDDLEWARE: list = [
exception_middleware,
Expand Down Expand Up @@ -58,6 +61,11 @@ async def exec_middleware(ctx: Context, middleware_list: list, pos: int = 0) ->
)


# websocket: empty handler
async def empty_handler(*args, **kwargs):
pass


class Lemon:
def __init__(self, config: dict = None, debug=False) -> None:
"""Init app instance
Expand All @@ -68,18 +76,32 @@ def __init__(self, config: dict = None, debug=False) -> None:
settings.set_config(config=config)

self.middleware_list: list = []

self.pre_process_middleware_list = LEMON_PRE_PROCESS_MIDDLEWARE
self.post_process_middleware_list = LEMON_POST_PROCESS_MIDDLEWARE
if settings.LEMON_CORS_ENABLE:
self.pre_process_middleware_list.append(
cors_middleware,
)

# websocket
self.ws_enable = False
self.ws_pull = None
self.ws_push = None
self.ws_conns = set()

# logging
logging.config.dictConfig(LOGGING_CONFIG_DEFAULTS)
logger.setLevel(logging.DEBUG if debug else logging.INFO)

def ws(self, ws_pull: typing.Callable, ws_push: typing.Callable):
"""Register pull and push handlers for websocket connections
:param ws_pull: handler function to pull from server
:param ws_push: handler function to push to client
"""
self.ws_enable = True
self.ws_pull = ws_pull
self.ws_push = ws_push

def use(self, *middleware) -> None:
"""Register middleware into app
:param middleware: the chain of the middleware
Expand All @@ -88,6 +110,14 @@ def use(self, *middleware) -> None:

@property
def application(self) -> typing.Callable:
# websocket enable : push message to client
# if self.ws_enable:
# # TODO: auto timeout connection
# pass
#
# # push daemon
# asyncio.ensure_future(self.ws_push(self.ws_conns))

async def _wrapper(message: dict, channels: dict) -> typing.Any:
"""
:param message: is an ASGI message.
Expand All @@ -96,7 +126,7 @@ async def _wrapper(message: dict, channels: dict) -> typing.Any:
if message['channel'] == 'http.request':
# init context
ctx = Context()
# prepare request
# prepare HTTP request
ctx.req = await Request.from_asgi_interface(
message=message, channels=channels
)
Expand All @@ -120,13 +150,26 @@ async def _wrapper(message: dict, channels: dict) -> typing.Any:
})
else:
return await channels['reply'].send(ctx.res.message)
# TODO: websocket support
elif message['channel'] == 'websocket.connect':
return None
elif message['channel'] == 'websocket.receive':
return None

# websocket connection
ws_conn = WSConnection(channels['reply']._websocket)
if message['channel'] == 'websocket.connect':
await ws_conn.establish()
self.ws_conns.add(ws_conn)
logger.info(f'Websocket connected')
# disconnect
elif message['channel'] == 'websocket.disconnect':
return None
try:
self.ws_conns.remove(ws_conn)
except:
pass
await ws_conn.destroy()
logger.info(f'Websocket disconnected')
# receive msg from client
elif message['channel'] == 'websocket.receive':
ws_msg = WSMessage(message)
await self.ws_pull(ws_conn, ws_msg)
logger.info(f'Websocket receive message')

return _wrapper

Expand Down
42 changes: 42 additions & 0 deletions lemon/wsconnection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
class WSHeaders(dict):
def __init__(self, raw_headers=None, *args, **kwargs):
super(WSHeaders, self).__init__(*args, **kwargs)
if raw_headers:
for h in raw_headers:
self.__setitem__(h[0].decode(), h[1].decode())

def __setitem__(self, key: str, value):
return super(WSHeaders, self).__setitem__(key.lower(), str(value))

def __getitem__(self, key: str):
return super(WSHeaders, self).__getitem__(key.lower())

def set(self, key: str, value):
return self.__setitem__(key, value)


class WSConnection:
def __init__(self, conn):
self.conn = conn

async def establish(self):
self.conn.accept()
self.conn.listen()
self.conn.connection_open()

async def destroy(self):
self.conn.reject()
await self.conn.close()

async def send(self, message_text: str):
return await self.conn.send(message_text)


class WSMessage(dict):
def __init__(self, raw_message, **kwargs):
self.path = raw_message['path']
self.order = raw_message['order']
self.text = raw_message['text']
self.bytes = raw_message['bytes']

super().__init__(**kwargs)
Empty file added tests/test_websocket.py
Empty file.

0 comments on commit 0956986

Please sign in to comment.