Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server-Sent Events implementation #381

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions demos/server_sent_events/message_source.py
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
Simple demo for Server-Sent Event protocol
"""

import time
import logging
import tornado.escape
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.server_sent_events
import os.path

from tornado.options import define, options

define("port", default=8888, help="run on the given port", type=int)


class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/sse-handler", MessageSourceHandler),
]
settings = dict(
cookie_secret="43oETzKXQAGaYdkL5gEmGeJJFuYh7EQnp2XdTP1o/Vo=",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
xsrf_cookies=True,
autoescape=None,
)
tornado.web.Application.__init__(self, handlers, **settings)


class MainHandler(tornado.web.RequestHandler):
def get(self):
self.render("index.html")

class MessageSourceHandler(tornado.server_sent_events.SSEHandler):
_msg_timeout = None
counter = 0

def on_open(self):
print 'connection %s opened'%self.connection_id

self.write_message('connection_id', self.connection_id)

if not MessageSourceHandler._msg_timeout:
self.send_message()

def on_close(self):
print 'connection %s closed'%self.connection_id

def send_message(self):
logging.info("sending new message")
MessageSourceHandler.counter += 1
MessageSourceHandler.write_message_to_all('message', {
'waiters': len(MessageSourceHandler._live_connections),
'counter': MessageSourceHandler.counter,
})

MessageSourceHandler._msg_timeout = tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 5, self.send_message)


def main():
tornado.options.parse_command_line()
app = Application()
app.listen(options.port)
tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
main()
33 changes: 33 additions & 0 deletions demos/server_sent_events/templates/index.html
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,33 @@
<!doctype html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<title>Server-Sent Events Demo</title>
</head>
<body>
<ul id="messages">
</ul>

<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.6.4/jquery.js"></script>
<script>
source = new EventSource('/sse-handler');

source.onopen = function(e){
$('<li>is open.</li>').appendTo('#messages');
}

source.onerror = function(e){
if (e.eventPhase == EventSource.CLOSED) {
$('<li>is closed</li>').appendTo('#messages');
} else {
$('<li>'+e.eventPhase+'</li>').appendTo('#messages');
console.log(e);
}
}

source.onmessage = function(e){
$('<li>'+e.lastEventId+': '+e.data+'</li>').appendTo('#messages');
}
</script>
</body>
</html>
80 changes: 80 additions & 0 deletions tornado/server_sent_events.py
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,80 @@
import time
import tornado.web
import tornado.escape
import tornado.ioloop
import tornado.web
import hashlib

class SSEHandler(tornado.web.RequestHandler):
_closing_timeout = False
_live_connections = [] # Yes, this list is declared here because it is used by the class methods

def __init__(self, application, request, **kwargs):
super(SSEHandler, self).__init__(application, request, **kwargs)
self.stream = request.connection.stream
self._closed = False

def initialize(self):
self.set_header('Content-Type','text/event-stream; charset=utf-8')
self.set_header('Cache-Control','no-cache')
self.set_header('Connection','keep-alive')

def generate_id(self):
return hashlib.md5('%s-%s-%s'%(
self.request.connection.address[0],
self.request.connection.address[1],
time.time(),
)).hexdigest()

@tornado.web.asynchronous
def get(self):
# Sending the standard headers
headers = self._generate_headers()
self.write(headers); self.flush()

# Adding the current client instance to the live handlers pool
self.connection_id = self.generate_id()
SSEHandler._live_connections.append(self)

# Calling the open event
self.on_open()

def on_open(self, *args, **kwargs):
"""Invoked for a new connection opened."""
pass

def on_close(self):
"""Invoked when the connection for this instance is closed."""
pass

def close(self):
"""Closes the connection for this instance"""
if not self._closed and not getattr(self, '_closing_timeout', None):
self._closed = True
self._closing_timeout = tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 5, self._abort)
else:
tornado.ioloop.IOLoop.instance().remove_timeout(self._closing_timeout)
self.on_close() # Calling the closing event
self.stream.close()

def _abort(self):
"""Instantly aborts the connection by closing the socket"""
self._closed = True
self.stream.close()

@classmethod
def write_message_to_all(cls, event_id, data):
"""Sends a message to all live connections"""
for conn in cls._live_connections:
conn.write_message(event_id, data)

@tornado.web.asynchronous
def write_message(self, event_id, data):
message = tornado.escape.utf8(('id: %s\n'%event_id if event_id else '') + 'data: %s\n\n'%data)
self.write(message)
self.flush()

def remove_connection(self):
if self in SSEHandler._live_connections:
SSEHandler._live_connections.remove(self)