Skip to content

Commit

Permalink
Upgraded engine.io module to improve socket.io connection stability. …
Browse files Browse the repository at this point in the history
…Should help to prevent #1613.
  • Loading branch information
morpheus65535 committed Nov 30, 2021
1 parent a7a6854 commit c60c751
Show file tree
Hide file tree
Showing 16 changed files with 235 additions and 120 deletions.
3 changes: 2 additions & 1 deletion bazarr/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def create_app():
else:
app.config["DEBUG"] = False

socketio.init_app(app, path=base_url.rstrip('/')+'/api/socket.io', cors_allowed_origins='*', async_mode='threading')
socketio.init_app(app, path=base_url.rstrip('/')+'/api/socket.io', cors_allowed_origins='*',
async_mode='threading', allow_upgrades=False, transports='polling')
return app


Expand Down
2 changes: 1 addition & 1 deletion libs/engineio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
get_tornado_handler = None
ASGIApp = None

__version__ = '4.0.2dev'
__version__ = '4.2.1dev'

__all__ = ['__version__', 'Server', 'WSGIApp', 'Middleware', 'Client']
if AsyncServer is not None: # pragma: no cover
Expand Down
37 changes: 22 additions & 15 deletions libs/engineio/async_drivers/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,23 @@ def __init__(self, engineio_server, other_asgi_app=None,
on_startup=None, on_shutdown=None):
self.engineio_server = engineio_server
self.other_asgi_app = other_asgi_app
self.engineio_path = engineio_path.strip('/')
self.engineio_path = engineio_path
if not self.engineio_path.startswith('/'):
self.engineio_path = '/' + self.engineio_path
if not self.engineio_path.endswith('/'):
self.engineio_path += '/'
self.static_files = static_files or {}
self.on_startup = on_startup
self.on_shutdown = on_shutdown

async def __call__(self, scope, receive, send):
if scope['type'] in ['http', 'websocket'] and \
scope['path'].startswith('/{0}/'.format(self.engineio_path)):
scope['path'].startswith(self.engineio_path):
await self.engineio_server.handle_request(scope, receive, send)
else:
static_file = get_static_file(scope['path'], self.static_files) \
if scope['type'] == 'http' and self.static_files else None
if static_file:
if static_file and os.path.exists(static_file['filename']):
await self.serve_static_file(static_file, receive, send)
elif self.other_asgi_app is not None:
await self.other_asgi_app(scope, receive, send)
Expand All @@ -68,17 +72,14 @@ async def serve_static_file(self, static_file, receive,
send): # pragma: no cover
event = await receive()
if event['type'] == 'http.request':
if os.path.exists(static_file['filename']):
with open(static_file['filename'], 'rb') as f:
payload = f.read()
await send({'type': 'http.response.start',
'status': 200,
'headers': [(b'Content-Type', static_file[
'content_type'].encode('utf-8'))]})
await send({'type': 'http.response.body',
'body': payload})
else:
await self.not_found(receive, send)
with open(static_file['filename'], 'rb') as f:
payload = f.read()
await send({'type': 'http.response.start',
'status': 200,
'headers': [(b'Content-Type', static_file[
'content_type'].encode('utf-8'))]})
await send({'type': 'http.response.body',
'body': payload})

async def lifespan(self, receive, send):
while True:
Expand Down Expand Up @@ -195,7 +196,13 @@ async def make_response(status, headers, payload, environ):
await environ['asgi.send']({'type': 'websocket.accept',
'headers': headers})
else:
await environ['asgi.send']({'type': 'websocket.close'})
if payload:
reason = payload.decode('utf-8') \
if isinstance(payload, bytes) else str(payload)
await environ['asgi.send']({'type': 'websocket.close',
'reason': reason})
else:
await environ['asgi.send']({'type': 'websocket.close'})
return

await environ['asgi.send']({'type': 'http.response.start',
Expand Down
22 changes: 10 additions & 12 deletions libs/engineio/async_drivers/gevent_uwsgi.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from __future__ import absolute_import

import gevent
from gevent import queue
from gevent.event import Event
from gevent import selectors
import uwsgi
_websocket_available = hasattr(uwsgi, 'websocket_handshake')

Expand Down Expand Up @@ -40,21 +39,20 @@ def __call__(self, environ, start_response):
self._req_ctx = uwsgi.request_context()
else:
# use event and queue for sending messages
from gevent.event import Event
from gevent.queue import Queue
from gevent.select import select
self._event = Event()
self._send_queue = Queue()
self._send_queue = queue.Queue()

# spawn a select greenlet
def select_greenlet_runner(fd, event):
"""Sets event when data becomes available to read on fd."""
while True:
event.set()
try:
select([fd], [], [])[0]
except ValueError:
break
sel = selectors.DefaultSelector()
sel.register(fd, selectors.EVENT_READ)
try:
while True:
sel.select()
event.set()
except gevent.GreenletExit:
sel.unregister(fd)
self._select_greenlet = gevent.spawn(
select_greenlet_runner,
self._sock,
Expand Down
37 changes: 34 additions & 3 deletions libs/engineio/async_drivers/threading.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,48 @@
from __future__ import absolute_import
import queue
import threading
import time

try:
import queue
from simple_websocket import Server, ConnectionClosed
_websocket_available = True
except ImportError: # pragma: no cover
import Queue as queue
_websocket_available = False


class WebSocketWSGI(object): # pragma: no cover
"""
This wrapper class provides a threading WebSocket interface that is
compatible with eventlet's implementation.
"""
def __init__(self, app):
self.app = app

def __call__(self, environ, start_response):
self.ws = Server(environ)
return self.app(self)

def close(self):
return self.ws.close()

def send(self, message):
try:
return self.ws.send(message)
except ConnectionClosed:
raise IOError()

def wait(self):
try:
return self.ws.receive()
except ConnectionClosed:
raise IOError()


_async = {
'thread': threading.Thread,
'queue': queue.Queue,
'queue_empty': queue.Empty,
'event': threading.Event,
'websocket': None,
'websocket': WebSocketWSGI if _websocket_available else None,
'sleep': time.sleep,
}
44 changes: 26 additions & 18 deletions libs/engineio/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class AsyncClient(client.Client):
skip SSL certificate verification, allowing
connections to servers with self signed certificates.
The default is ``True``.
:param handle_sigint: Set to ``True`` to automatically handle disconnection
when the process is interrupted, or to ``False`` to
leave interrupt handling to the calling application.
Interrupt handling can only be enabled when the
client instance is created in the main thread.
"""
def is_asyncio_based(self):
return True
Expand Down Expand Up @@ -85,9 +90,8 @@ async def connect(self, url, headers=None, transports=None,
await eio.connect('http://localhost:5000')
"""
global async_signal_handler_set
if not async_signal_handler_set and \
if self.handle_sigint and not async_signal_handler_set and \
threading.current_thread() == threading.main_thread():

try:
asyncio.get_event_loop().add_signal_handler(
signal.SIGINT, async_signal_handler)
Expand Down Expand Up @@ -166,11 +170,7 @@ def start_background_task(self, target, *args, **kwargs):
:param args: arguments to pass to the function.
:param kwargs: keyword arguments to pass to the function.
This function returns an object compatible with the `Thread` class in
the Python standard library. The `start()` method on this object is
already called by this function.
Note: this method is a coroutine.
The return value is a ``asyncio.Task`` object.
"""
return asyncio.ensure_future(target(*args, **kwargs))

Expand All @@ -191,10 +191,17 @@ def create_event(self):
"""Create an event object."""
return asyncio.Event()

def _reset(self):
if self.http: # pragma: no cover
asyncio.ensure_future(self.http.close())
super()._reset()
def __del__(self): # pragma: no cover
# try to close the aiohttp session if it is still open
if self.http and not self.http.closed:
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop.ensure_future(self.http.close())
else:
loop.run_until_complete(self.http.close())
except:
pass

async def _connect_polling(self, url, headers, engineio_path):
"""Establish a long-polling connection to the Engine.IO server."""
Expand All @@ -207,10 +214,10 @@ async def _connect_polling(self, url, headers, engineio_path):
r = await self._send_request(
'GET', self.base_url + self._get_url_timestamp(), headers=headers,
timeout=self.request_timeout)
if r is None:
if r is None or isinstance(r, str):
self._reset()
raise exceptions.ConnectionError(
'Connection refused by the server')
r or 'Connection refused by the server')
if r.status < 200 or r.status >= 300:
self._reset()
try:
Expand Down Expand Up @@ -416,6 +423,7 @@ async def _send_request(
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
self.logger.info('HTTP %s request to %s failed with error %s.',
method, url, exc)
return str(exc)

async def _trigger_event(self, event, *args, **kwargs):
"""Invoke an event handler."""
Expand Down Expand Up @@ -462,9 +470,9 @@ async def _read_loop_polling(self):
r = await self._send_request(
'GET', self.base_url + self._get_url_timestamp(),
timeout=max(self.ping_interval, self.ping_timeout) + 5)
if r is None:
if r is None or isinstance(r, str):
self.logger.warning(
'Connection refused by the server, aborting')
r or 'Connection refused by the server, aborting')
await self.queue.put(None)
break
if r.status < 200 or r.status >= 300:
Expand Down Expand Up @@ -578,13 +586,13 @@ async def _write_loop(self):
p = payload.Payload(packets=packets)
r = await self._send_request(
'POST', self.base_url, body=p.encode(),
headers={'Content-Type': 'application/octet-stream'},
headers={'Content-Type': 'text/plain'},
timeout=self.request_timeout)
for pkt in packets:
self.queue.task_done()
if r is None:
if r is None or isinstance(r, str):
self.logger.warning(
'Connection refused by the server, aborting')
r or 'Connection refused by the server, aborting')
break
if r.status < 200 or r.status >= 300:
self.logger.warning('Unexpected status code %s in server '
Expand Down
19 changes: 14 additions & 5 deletions libs/engineio/asyncio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class AsyncServer(server.Server):
is a grace period added by the server.
:param ping_timeout: The time in seconds that the client waits for the
server to respond before disconnecting. The default
is 5 seconds.
is 20 seconds.
:param max_http_buffer_size: The maximum size of a message when using the
polling transport. The default is 1,000,000
bytes.
Expand Down Expand Up @@ -63,6 +63,9 @@ class AsyncServer(server.Server):
:param async_handlers: If set to ``True``, run message event handlers in
non-blocking threads. To run handlers synchronously,
set to ``False``. The default is ``True``.
:param transports: The list of allowed transports. Valid transports
are ``'polling'`` and ``'websocket'``. Defaults to
``['polling', 'websocket']``.
:param kwargs: Reserved for future extensions, any additional parameters
given as keyword arguments will be silently ignored.
"""
Expand Down Expand Up @@ -213,6 +216,13 @@ async def handle_request(self, *args, **kwargs):
jsonp = False
jsonp_index = None

# make sure the client uses an allowed transport
transport = query.get('transport', ['polling'])[0]
if transport not in self.transports:
self._log_error_once('Invalid transport', 'bad-transport')
return await self._make_response(
self._bad_request('Invalid transport'), environ)

# make sure the client speaks a compatible Engine.IO version
sid = query['sid'][0] if 'sid' in query else None
if sid is None and query.get('EIO') != ['4']:
Expand All @@ -239,7 +249,6 @@ async def handle_request(self, *args, **kwargs):
r = self._bad_request('Invalid JSONP index number')
elif method == 'GET':
if sid is None:
transport = query.get('transport', ['polling'])[0]
# transport must be one of 'polling' or 'websocket'.
# if 'websocket', the HTTP_UPGRADE header must match.
upgrade_header = environ.get('HTTP_UPGRADE').lower() \
Expand All @@ -249,9 +258,9 @@ async def handle_request(self, *args, **kwargs):
r = await self._handle_connect(environ, transport,
jsonp_index)
else:
self._log_error_once('Invalid transport ' + transport,
'bad-transport')
r = self._bad_request('Invalid transport ' + transport)
self._log_error_once('Invalid websocket upgrade',
'bad-upgrade')
r = self._bad_request('Invalid websocket upgrade')
else:
if sid not in self.sockets:
self._log_error_once('Invalid session ' + sid, 'bad-sid')
Expand Down
12 changes: 9 additions & 3 deletions libs/engineio/asyncio_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,18 @@ async def _upgrade_websocket(self, environ):

async def _websocket_handler(self, ws):
"""Engine.IO handler for websocket transport."""
async def websocket_wait():
data = await ws.wait()
if data and len(data) > self.server.max_http_buffer_size:
raise ValueError('packet is too large')
return data

if self.connected:
# the socket was already connected, so this is an upgrade
self.upgrading = True # hold packet sends during the upgrade

try:
pkt = await ws.wait()
pkt = await websocket_wait()
except IOError: # pragma: no cover
return
decoded_pkt = packet.Packet(encoded_packet=pkt)
Expand All @@ -162,7 +168,7 @@ async def _websocket_handler(self, ws):
await self.queue.put(packet.Packet(packet.NOOP)) # end poll

try:
pkt = await ws.wait()
pkt = await websocket_wait()
except IOError: # pragma: no cover
self.upgrading = False
return
Expand Down Expand Up @@ -204,7 +210,7 @@ async def writer():

while True:
p = None
wait_task = asyncio.ensure_future(ws.wait())
wait_task = asyncio.ensure_future(websocket_wait())
try:
p = await asyncio.wait_for(
wait_task,
Expand Down

0 comments on commit c60c751

Please sign in to comment.