Skip to content

Commit

Permalink
Add initial implementation of sse protocol.
Browse files Browse the repository at this point in the history
Improved version of django-sse implementation.
  • Loading branch information
Andrei Antoukh committed Jul 14, 2012
1 parent 571dfde commit fbba10a
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 2 deletions.
3 changes: 1 addition & 2 deletions .gitignore
@@ -1,4 +1,3 @@
*.log
*.pot
*.pyc
local_settings.py
*.swp
File renamed without changes.
109 changes: 109 additions & 0 deletions sse.py
@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-

from __future__ import print_function
from __future__ import unicode_literals

import functools
import re
import sys


class Sse(object):
_retry = None
_buffer = None

_rx_event = re.compile(r'^add_event_([\w\d\_]+)$', flags=re.U)

def __init__(self, default_retry=2000):
self._buffer = []
self.set_retry(default_retry)

def set_retry(self, retrynum):
"""
Set distinct retry timeout instead the default
value.
"""
self._retry = retrynum
self._buffer.append("retry: {0}\n\n".format(self._retry))

def set_event_id(self, event_id):
if event_id:
self._buffer.append("id: {0}\n\n".format(event_id))
else:
# Reset event id
self._buffer.append("id\n\n")

def reset_event_id(self):
"""
Send a reset event id.
"""
self.set_event_id(None)

def _parse_text(self, text):
# parse text if is list, tuple or set instance
if isinstance(text, (list, tuple, set)):
for item in text:
if isinstance(item, bytes):
item = item.decode(encoding)

for subitem in item.splitlines():
yield subitem

else:
if isinstance(text, bytes):
text = text.decode(encoding)

for item in text.splitlines():
yield item

def add_message(self, event, text, encoding='utf-8'):
"""
Add messaget with eventname to the buffer.
:param str event: event name
:param str/list text: event content. Must be a str or list of str
:param bool split: splits str content by lines. default(true)
"""

self._buffer.append("event: {0}\n".format(event))

for text_item in self._parse_text(text):
self._buffer.append("data: {0}\n".format(text_item))

self._buffer.append("\n")

def __getattr__(self, attr):
"""
Make a dinamic method for add messages to specific events
like event_<eventname>(text="Hello")
Examples:
response.add_foo(text="bar")
This sets event to "foo" and put "bar" as content.
"""

res = self._rx_event.search(attr)
if not res:
return super(Sse, self).__getattr__(attr)
return functools.partial(self.add_message, event=res.group(1))

def __str__(self):
if sys.version_info[0] >= 3: # Python 3
return self.__unicode__()
return self.__unicode__().encode('utf8')

def __unicode__(self):
return "".join(self._buffer)

def flush(self):
"""
Reset the internal buffer to initial state.
"""
self._buffer = []

def __iter__(self):
for item in self._buffer:
yield item

self.flush()
114 changes: 114 additions & 0 deletions tests.py
@@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-

from __future__ import print_function
from __future__ import unicode_literals

import unittest
import sys

def to_unicode(data):
if sys.version_info[0] >= 3: # Python 3
return str(data)
return unicode(data)

from sse import Sse


class ServerSentEventsProtocolTests(unittest.TestCase):
def test_constructor(self):
self.assertEqual(list(Sse()), ['retry: 2000\n\n'])
self.assertEqual(list(Sse(default_retry=1000)), ['retry: 1000\n\n'])

def test_add_message__simple_text(self):
sse = Sse()

sse.add_message("foo", "foo-message")
sse.add_message("bar", "bar-message")

self.assertEqual(to_unicode(sse), "retry: 2000\n\nevent: foo\ndata: "
"foo-message\n\nevent: bar\ndata: "
"bar-message\n\n")

self.assertEqual(list(sse), [
'retry: 2000\n\n',
'event: foo\n',
'data: foo-message\n',
'\n',
'event: bar\n',
'data: bar-message\n',
'\n'
])

def test_add_message__simple_text_split(self):
sse = Sse()
sse.add_message("foo", "foo\nmessage")
sse.add_message("bar", "bar\nmessage")

self.assertEqual(list(sse), [
'retry: 2000\n\n',
'event: foo\n',
'data: foo\n',
'data: message\n',
'\n',
'event: bar\n',
'data: bar\n',
'data: message\n',
'\n'
])

def test_add_message__list(self):
sse = Sse()

sse.add_message("foo", ["foo-message"])
sse.add_message("bar", ["bar-message"])

self.assertEqual(list(sse), [
'retry: 2000\n\n',
'event: foo\n',
'data: foo-message\n',
'\n',
'event: bar\n',
'data: bar-message\n',
'\n'
])

def test_add_message__list_split(self):
sse = Sse()
sse.add_message("foo", ["foo\nmessage"])
sse.add_message("bar", ["bar\nmessage"])

self.assertEqual(list(sse), [
'retry: 2000\n\n',
'event: foo\n',
'data: foo\n',
'data: message\n',
'\n',
'event: bar\n',
'data: bar\n',
'data: message\n',
'\n'
])

def test_dinamic_methods(self):
sse = Sse()
sse.add_event_foo(text="bar")

self.assertEqual(list(sse), ['retry: 2000\n\n', 'event: foo\n', 'data: bar\n', '\n'])

def test_flush(self):
sse = Sse()
sse.add_message("foo", "bar")

sse.flush()
self.assertEqual(len(sse._buffer), 0)

def test_flush_on_iter(self):
sse = Sse()
sse.add_message("foo", "bar")

self.assertEqual(list(sse), ['retry: 2000\n\n', 'event: foo\n', 'data: bar\n', '\n'])
self.assertEqual(list(sse), [])


if __name__ == "__main__":
unittest.main()

0 comments on commit fbba10a

Please sign in to comment.