Skip to content

Commit

Permalink
http: fix websocket broadcasts in Tornado 5.x (#1796)
Browse files Browse the repository at this point in the history
In Tornado 4.x and earlier, IOLoop.current() would return the global IOLoop
if there was no IOLoop already running in the calling thread. This was the
case when calling our websocket broadcast method from the HttpFrontend thread
and so callbacks were correctly scheduled on the HttpServer thread's IOLoop.
However, in Tornado 5.0+, the idea of a global IOLoop is gone and calling
IOLoop.current() will simply create a new IOLoop if there isn't one already
running in the calling thread.
This incorrectly resulted in callbacks being scheduled on that new IOLoop
which is never actually started and so the broadcasts were never sent.

This is related to #1716.

(cherry picked from commit 59a3935)
  • Loading branch information
kingosticks authored and jodal committed Sep 30, 2019
1 parent 5bcb4ca commit c5a62f9
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 13 deletions.
6 changes: 3 additions & 3 deletions mopidy/http/actor.py
Expand Up @@ -76,14 +76,14 @@ def on_stop(self):
self.server.stop()

def on_event(self, name, **data):
on_event(name, **data)
on_event(name, self.server.io_loop, **data)


def on_event(name, **data):
def on_event(name, io_loop, **data):
event = data
event['event'] = name
message = json.dumps(event, cls=models.ModelJSONEncoder)
handlers.WebSocketHandler.broadcast(message)
handlers.WebSocketHandler.broadcast(message, io_loop)


class HttpServer(threading.Thread):
Expand Down
7 changes: 3 additions & 4 deletions mopidy/http/handlers.py
Expand Up @@ -98,14 +98,13 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
clients = set()

@classmethod
def broadcast(cls, msg):
loop = tornado.ioloop.IOLoop.current()

def broadcast(cls, msg, io_loop):
# This can be called from outside the Tornado ioloop, so we need to
# safely cross the thread boundary by adding a callback to the loop.
for client in cls.clients:
# One callback per client to keep time we hold up the loop short
loop.add_callback(functools.partial(_send_broadcast, client, msg))
io_loop.add_callback(
functools.partial(_send_broadcast, client, msg))

def initialize(self, core, allowed_origins, csrf_protection):
self.jsonrpc = make_jsonrpc_wrapper(core)
Expand Down
7 changes: 5 additions & 2 deletions tests/http/test_events.py
Expand Up @@ -11,8 +11,11 @@
@mock.patch('mopidy.http.handlers.WebSocketHandler.broadcast')
class HttpEventsTest(unittest.TestCase):

def setUp(self):
self.io_loop = mock.Mock()

def test_track_playback_paused_is_broadcasted(self, broadcast):
actor.on_event('track_playback_paused', foo='bar')
actor.on_event('track_playback_paused', self.io_loop, foo='bar')

self.assertDictEqual(
json.loads(str(broadcast.call_args[0][0])), {
Expand All @@ -21,7 +24,7 @@ def test_track_playback_paused_is_broadcasted(self, broadcast):
})

def test_track_playback_resumed_is_broadcasted(self, broadcast):
actor.on_event('track_playback_resumed', foo='bar')
actor.on_event('track_playback_resumed', self.io_loop, foo='bar')

self.assertDictEqual(
json.loads(str(broadcast.call_args[0][0])), {
Expand Down
8 changes: 4 additions & 4 deletions tests/http/test_handlers.py
Expand Up @@ -55,7 +55,7 @@ def get_app(self):

def connection(self):
url = self.get_url('/ws').replace('http', 'ws')
return tornado.websocket.websocket_connect(url, self.io_loop)
return tornado.websocket.websocket_connect(url)

@tornado.testing.gen_test
def test_invalid_json_rpc_request_doesnt_crash_handler(self):
Expand All @@ -69,15 +69,15 @@ def test_invalid_json_rpc_request_doesnt_crash_handler(self):
@tornado.testing.gen_test
def test_broadcast_makes_it_to_client(self):
conn = yield self.connection()
handlers.WebSocketHandler.broadcast('message')
handlers.WebSocketHandler.broadcast('message', self.io_loop)
message = yield conn.read_message()
self.assertEqual(message, 'message')

@tornado.testing.gen_test
def test_broadcast_to_client_that_just_closed_connection(self):
conn = yield self.connection()
conn.stream.close()
handlers.WebSocketHandler.broadcast('message')
handlers.WebSocketHandler.broadcast('message', self.io_loop)

@tornado.testing.gen_test
def test_broadcast_to_client_without_ws_connection_present(self):
Expand All @@ -87,7 +87,7 @@ def test_broadcast_to_client_without_ws_connection_present(self):
# this has happened but we have not yet been removed from clients.
for client in handlers.WebSocketHandler.clients:
client.ws_connection = None
handlers.WebSocketHandler.broadcast('message')
handlers.WebSocketHandler.broadcast('message', self.io_loop)


class CheckOriginTests(unittest.TestCase):
Expand Down

0 comments on commit c5a62f9

Please sign in to comment.