Skip to content

Commit

Permalink
replace ws with sse
Browse files Browse the repository at this point in the history
  • Loading branch information
wybiral committed Jan 29, 2022
1 parent 8a2fea1 commit dea15b4
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 24 deletions.
38 changes: 20 additions & 18 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from aiohttp import web
import aiohttp_jinja2
from aiohttp_sse import sse_response
import aiojobs
import asyncio
from database import SQLiteDatabase
Expand All @@ -11,7 +12,7 @@

async def pump_firehose(app):
''' Spawn all stream source jobs and pump the app['queue'] stream out to
all app['websockets']
all Server-Sent Event clients.
'''
db = app['db']
queue = app['queue']
Expand All @@ -21,29 +22,30 @@ async def pump_firehose(app):
while True:
x = await queue.get()
j = json.dumps(x)
for ws in app['websockets']:
for client in app['clients']:
try:
await ws.send_str(j)
await client.put(j)
except:
pass

async def handle(req):
print('/')
return aiohttp_jinja2.render_template('index.html', req, {})

async def wshandle(req):
print('/socket')
ws = web.WebSocketResponse(heartbeat=5)
await ws.prepare(req)
req.app['websockets'].add(ws)
try:
async for msg in ws:
if msg.type == web.WSMsgType.close:
break
except:
pass
req.app['websockets'].remove(ws)
return ws
async def streamhandle(req):
app = req.app
print('/stream')
async with sse_response(req) as resp:
q = asyncio.Queue()
app['clients'].add(q)
try:
while not resp.task.done():
data = await q.get()
await resp.send(data)
q.task_done()
finally:
app['clients'].remove(q)
return resp

async def dailyhandle(req):
day = req.match_info.get('day', None)
Expand Down Expand Up @@ -73,7 +75,7 @@ def main():
app = web.Application()
app['queue'] = asyncio.Queue(maxsize=10)
app['sources'] = []
app['websockets'] = set()
app['clients'] = set()
# setup template env
loader = jinja2.FileSystemLoader('templates')
jinja_env = aiohttp_jinja2.setup(app, loader=loader)
Expand All @@ -91,7 +93,7 @@ def main():
web.get('/', handle),
web.get('/daily', todayhandle),
web.get('/daily/{day}', dailyhandle),
web.get('/socket', wshandle),
web.get('/stream', streamhandle),
])
app.router.add_static('/static/', path='static', name='static')
app.on_startup.append(app_startup)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
aiohttp>=3.7.4
aiohttp-jinja2>=1.4.2
aiohttp-sse>=2.1.0
aiojobs>=0.3.0
aiosqlite>=0.17.0
bs4>=0.0.1
Expand Down
9 changes: 3 additions & 6 deletions static/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,11 @@ function createUpdate(data) {
}

function connect(updates) {
let url = 'ws://' + window.location.host + '/socket';
socket = new WebSocket(url);
socket.onmessage = evt => {
const stream = new EventSource('/stream');
stream.addEventListener('message', evt => {
const data = JSON.parse(evt.data);
updates.insertBefore(createUpdate(data), updates.firstChild);
};
// if WebSocket closes keep trying to connect
socket.onclose = evt => setTimeout(() => connect(updates), 5000);
});
}

window.onload = () => {
Expand Down

0 comments on commit dea15b4

Please sign in to comment.