Skip to content

Commit

Permalink
uniquely identify events
Browse files Browse the repository at this point in the history
  • Loading branch information
zbyte64 committed Apr 4, 2013
1 parent 0550646 commit 1efbf81
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 33 deletions.
14 changes: 10 additions & 4 deletions eventsocket/listeners.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import uuid

from hyperadmin.signals import endpoint_event, resource_event

from eventsocket.loading import get_subscribers
Expand All @@ -9,14 +11,18 @@ def publish_endpoint_event(endpoint, event, item_list, **kwargs):
Pushes the endpoint event to the matching registered subscribers
'''
subscribers = get_subscribers(endpoint, event)
for subscriber in subscribers:
subscriber.notify(endpoint, event, item_list)
if subscribers:
event_id = uuid.uuid1()
for subscriber in subscribers:
subscriber.notify(endpoint, event, item_list, event_id=event_id)

@resource_event.connect
def publish_resource_event(resource, event, item_list, **kwargs):
'''
Pushes the resource event to the matching registered subscribers
'''
subscribers = get_subscribers(resource, event)
for subscriber in subscribers:
subscriber.notify(resource, event, item_list)
if subscribers:
event_id = uuid.uuid1()
for subscriber in subscribers:
subscriber.notify(resource, event, item_list, event_id=event_id)
2 changes: 1 addition & 1 deletion eventsocket/publishers/adminlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, site, endpoint, method='POST', url_args=[], url_kwargs={}, **
self.url_kwargs = url_kwargs
super(HyperadminLinkPublisher, self).__init__(**kwargs)

def publish(self, event, message):
def publish(self, event, message, event_id):
endpoint = self.get_endpoint()

data = MultiValueDict(parse_qsl(message))
Expand Down
12 changes: 6 additions & 6 deletions eventsocket/publishers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ def get_id(self):
def get_logger(self):
return logging.getLogger(__name__)

def push(self, transformer, event, message):
def push(self, transformer, event, message, event_id):
'''
Schedules the message to be sent to the publisher
'''
if self.schedule:
return schedule_push(transformer, self, event, message)
return schedule_push(transformer, self, event, message, event_id)
else:
return execute_push(transformer.get_id(), self.get_id(), event, message)
return execute_push(transformer.get_id(), self.get_id(), event, message, event_id)

def _push(self, event, message):
return self.publish(event, message)
def _push(self, event, message, event_id):
return self.publish(event, message, event_id)

def publish(self, event, message):
def publish(self, event, message, event_id):
'''
Send the mesage to the publisher
'''
Expand Down
2 changes: 1 addition & 1 deletion eventsocket/publishers/djangocache.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ def __init__(self, cache_name, cache_key, **kwargs):
self.cache_key = cache_key
super(DjangoCachePublisher, self).__init__(**kwargs)

def publish(self, event, message):
def publish(self, event, message, event_id):
self.cache.set(self.cache_key, message)
5 changes: 2 additions & 3 deletions eventsocket/publishers/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ def __init__(self, elasticsearch_url, index_name, **kwargs):
self.index_name = index_name
self.connection = ElasticSearch(self.elasticsearch_url)

def publish(self, event, message):
#id=id #CONSIDER: shouldn't messages have uuids?
response = self.connection.index(self.index_name, event, message)
def publish(self, event, message, event_id):
response = self.connection.index(self.index_name, event, message, id=event_id)
return response
2 changes: 1 addition & 1 deletion eventsocket/publishers/redispub.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ def __init__(self, channel, host='localhost', port=6379, db=0, **kwargs):
def get_connection(self):
return redis.Redis(connection_pool=self.pool)

def publish(self, event, message):
def publish(self, event, message, event_id):
connection = self.get_connection()
connection.publish(self.channel, message)
2 changes: 1 addition & 1 deletion eventsocket/publishers/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def __init__(self, webhook_url, method='POST', **kwargs):
self.webhook_url = webhook_url
self.method = method

def publish(self, event, message):
def publish(self, event, message, event_id):
action = getattr(requests, self.method.lower())
response = action(self.webhook_url, data=message, allow_redirects=False)
return response
Expand Down
12 changes: 10 additions & 2 deletions eventsocket/subscribers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,27 @@ def matches_event(self, event):
'''
return bool(self.event.match(event))

def notify(self, endpoint, event, item_list):
def notify(self, endpoint, event, item_list, event_id):
'''
Receives event, serializes and schedules for publishing
:param endpoint: A hyperadmin endpoint bound to an api request
:param event: A string representing the event
:param item_list: A list of hypermedia items
:param event_id: A unique identifier representing the originating event
'''
message = self.serialize(endpoint, event, item_list)
publisher = self.get_publisher()
transformer = self.get_transformer()
return publisher.push(transformer, event, message)
return publisher.push(transformer, event, message, event_id)

def serialize(self, endpoint, event, item_list):
'''
Returns the serialized message
'''
#TODO:
#datatap = endpoint.get_datatap(instream=item_list)
#return JSONDataTap(instream=datatap) #dt.store(stream)
serializable_items = self.serialize_items(item_list)
message = serializable_items
return json.dumps(message, cls=HyperadminJSONEncoder)
Expand Down
20 changes: 10 additions & 10 deletions eventsocket/tasks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from eventsocket.loading import get_publisher, get_transformer


def schedule_push(transformer, publisher, event, message):
def schedule_push(transformer, publisher, event, message, event_id):
from eventsocket.app_settings import TASK_WORKER
return TASK_WORKER.schedule_push(transformer.get_ident(), publisher.get_ident(), event, message)
return TASK_WORKER.schedule_push(transformer.get_ident(), publisher.get_ident(), event, message, event_id)

def execute_push(transformer_ident, publisher_ident, event, message):
def execute_push(transformer_ident, publisher_ident, event, message, event_id):
'''
The actual function to be executed by the task to publish the message
Expand All @@ -14,27 +14,27 @@ def execute_push(transformer_ident, publisher_ident, event, message):
transformer = get_transformer(transformer_ident)
publisher = get_publisher(publisher_ident)
event, message = transformer.transform(event, message)
return publisher._push(event, message)
return publisher._push(event, message, event_id)

class PublishTasks(object):
def schedule_push(self, transformer_ident, publisher_ident, event, message):
return execute_push(transformer_ident, publisher_ident, event, message)
def schedule_push(self, transformer_ident, publisher_ident, event, message, event_id):
return execute_push(transformer_ident, publisher_ident, event, message, event_id)

class ZTaskPublishTasks(PublishTasks):
def __init__(self):
super(ZTaskPublishTasks, self).__init__()
from django_ztask.decorators import task
self._execute_push = task()(execute_push)

def schedule_push(self, transformer_ident, publisher_ident, event, message):
return self._execute_push.async(transformer_ident=transformer_ident, publisher_ident=publisher_ident, event=event, message=message)
def schedule_push(self, transformer_ident, publisher_ident, event, message, event_id):
return self._execute_push.async(transformer_ident=transformer_ident, publisher_ident=publisher_ident, event=event, message=message, event_id=event_id)

class CeleryPublishTasks(PublishTasks):
def __init__(self):
super(CeleryPublishTasks, self).__init__()
from celery.task import task
self._execute_push = task(execute_push, ignore_result=True)

def schedule_push(self, transformer_ident, publisher_ident, event, message):
return self._execute_push.delay(transformer_ident=transformer_ident, publisher_ident=publisher_ident, event=event, message=message)
def schedule_push(self, transformer_ident, publisher_ident, event, message, event_id):
return self._execute_push.delay(transformer_ident=transformer_ident, publisher_ident=publisher_ident, event=event, message=message, event_id=event_id)

4 changes: 2 additions & 2 deletions eventsocket/tests/test_publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def setUp(self):
self.publisher = self.make_publisher()

def test_publish(self):
message = self.publisher.publish('event', 'message')
message = self.publisher.publish('event', 'message', 'uniqueid')
self.assertEqual(message, 'message')

class TestHyperadminLinkPublisher(TestCase):
Expand All @@ -22,7 +22,7 @@ def setUp(self):
self.publisher = self.make_publisher(cls=HyperadminLinkPublisher, site='hyperadmin', endpoint='admin_auth_group_add')

def test_publish(self):
response = self.publisher.publish('event', 'name=testgroup')
response = self.publisher.publish('event', 'name=testgroup', 'uniqueid')
self.assertEqual(response.status_code, 303)
self.assertTrue(response.has_header('Location'))

4 changes: 2 additions & 2 deletions eventsocket/tests/test_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def test_simple_event(self):
endpoint = None
event = 'someevent'
item_list = []
response = self.subscriber.notify(endpoint, event, item_list)
response = self.subscriber.notify(endpoint, event, item_list, 'uniqueid')
#the default behavior is to pass back a receipt representing the message sent
self.assertEqual(response, [])

Expand All @@ -40,7 +40,7 @@ def test_simple_event(self):
endpoint = None
event = 'someevent'
item_list = [MockedItem(name='testgroup2')]
response = self.subscriber.notify(endpoint, event, item_list)
response = self.subscriber.notify(endpoint, event, item_list, 'uniqueid')
self.assertEqual(response.status_code, 303)
self.assertTrue(response.has_header('Location'))

0 comments on commit 1efbf81

Please sign in to comment.