Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

All tests working. Sending real email using UNS via a script

  • Loading branch information...
commit e5996b4970690f72943d9cc4e1daf51c5c73087a 2 parents 6d8baf1 + 2588998
swarbhanu swarbhanu authored
5 ion/processes/data/transforms/notification_worker.py
View
@@ -99,13 +99,10 @@ def process_event(self, msg, headers):
user_ids = []
if self.reverse_user_info:
+ log.debug("came here to check the interested users in %s" % msg.type_)
user_ids = check_user_notification_interest(event = msg, reverse_user_info = self.reverse_user_info)
- log.debug("Type of event received by notification worker: %s" % msg.type_)
- log.debug("Event received by notification worker: %s" % msg)
log.debug("Notification worker deduced the following users were interested in the event: %s, event_type: %s, origin: %s" % (user_ids, msg.type_, msg.origin ))
- log.debug("Using the smtp client server: %s" % self.smtp_client)
-
#------------------------------------------------------------------------------------
# Send email to the users
#------------------------------------------------------------------------------------
86 ion/services/dm/presentation/test/user_notification_test.py
View
@@ -81,7 +81,7 @@ def setUp(self):
self.user_notification.smtp_server = 'smtp_server'
self.user_notification.smtp_client = 'smtp_client'
self.user_notification.event_publisher = EventPublisher()
- self.user_notification.event_processor = EmailEventProcessor('an_smtp_client')
+ self.user_notification.event_processor = EmailEventProcessor()
def test_create_notification(self):
'''
@@ -101,7 +101,7 @@ def test_create_notification(self):
self.user_notification.notifications = {}
self.user_notification.event_processor.add_notification_for_user = mocksignature(self.user_notification.event_processor.add_notification_for_user)
-
+ self.user_notification.update_user_info_dictionary = mocksignature(self.user_notification.update_user_info_dictionary)
self.user_notification.event_publisher.publish_event = mocksignature(self.user_notification.event_publisher.publish_event)
self.user_notification._notification_in_notifications = mocksignature(self.user_notification._notification_in_notifications)
@@ -219,8 +219,8 @@ def test_delete_user_notification(self):
notification_id = 'notification_id_1'
- self.user_notification.event_processor.stop_notification_subscriber = mocksignature(self.user_notification.event_processor.stop_notification_subscriber)
self.user_notification.event_publisher.publish_event = mocksignature(self.user_notification.event_publisher.publish_event)
+ self.user_notification.user_info = {}
#-------------------------------------------------------------------------------------------------------------------
# Create a notification object
@@ -327,8 +327,6 @@ def test_pub_reload_user_info_event(self):
Test that the publishing of reload user info event occurs every time a create, update
or delete notification occurs.
'''
- proc1 = self.container.proc_manager.procs_by_name['user_notification']
-
#--------------------------------------------------------------------------------------
# Create subscribers for reload events
#--------------------------------------------------------------------------------------
@@ -492,23 +490,23 @@ def test_user_info_UNS(self):
notification_request_2 = self.rrc.read(notification_id_2)
# check user_info dictionary
- self.assertEquals(proc1.event_processor.user_info[user_id_1]['user_contact'].email, 'user_1@gmail.com' )
- self.assertEquals(proc1.event_processor.user_info[user_id_1]['notifications'], [notification_request_correct])
+ self.assertEquals(proc1.user_info[user_id_1]['user_contact'].email, 'user_1@gmail.com' )
+ self.assertEquals(proc1.user_info[user_id_1]['notifications'], [notification_request_correct])
- self.assertEquals(proc1.event_processor.user_info[user_id_2]['user_contact'].email, 'user_2@gmail.com' )
- self.assertEquals(proc1.event_processor.user_info[user_id_2]['notifications'], [notification_request_2])
+ self.assertEquals(proc1.user_info[user_id_2]['user_contact'].email, 'user_2@gmail.com' )
+ self.assertEquals(proc1.user_info[user_id_2]['notifications'], [notification_request_2])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin']['instrument_1'], [user_id_1])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin']['instrument_2'], [user_id_2])
+ self.assertEquals(proc1.reverse_user_info['event_origin']['instrument_1'], [user_id_1])
+ self.assertEquals(proc1.reverse_user_info['event_origin']['instrument_2'], [user_id_2])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_type']['ResourceLifecycleEvent'], [user_id_1])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_type']['DetectionEvent'], [user_id_2])
+ self.assertEquals(proc1.reverse_user_info['event_type']['ResourceLifecycleEvent'], [user_id_1])
+ self.assertEquals(proc1.reverse_user_info['event_type']['DetectionEvent'], [user_id_2])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_subtype']['subtype_1'], [user_id_1])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_subtype']['subtype_2'], [user_id_2])
+ self.assertEquals(proc1.reverse_user_info['event_subtype']['subtype_1'], [user_id_1])
+ self.assertEquals(proc1.reverse_user_info['event_subtype']['subtype_2'], [user_id_2])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin_type']['type_1'], [user_id_1])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin_type']['type_2'], [user_id_2])
+ self.assertEquals(proc1.reverse_user_info['event_origin_type']['type_1'], [user_id_1])
+ self.assertEquals(proc1.reverse_user_info['event_origin_type']['type_2'], [user_id_2])
log.debug("The event processor received the notification topics after a create_notification() for two users")
log.debug("Verified that the event processor correctly updated its user info dictionaries")
@@ -519,21 +517,29 @@ def test_user_info_UNS(self):
self.unsc.create_notification(notification=notification_request_2, user_id=user_id_1)
# Check in UNS ------------>
- self.assertEquals(proc1.event_processor.user_info[user_id_1]['user_contact'].email, 'user_1@gmail.com' )
+ self.assertEquals(proc1.user_info[user_id_1]['user_contact'].email, 'user_1@gmail.com' )
+
+ notifications = proc1.user_info[user_id_1]['notifications']
+ origins = []
+ event_types = []
+ for notific in notifications:
+ origins.append(notific.origin)
+ event_types.append(notific.event_type)
- self.assertEquals(proc1.event_processor.user_info[user_id_1]['notifications'], [notification_request_correct, notification_request_2])
+ self.assertEquals(set(origins), set(['instrument_1', 'instrument_2']))
+ self.assertEquals(set(event_types), set(['ResourceLifecycleEvent', 'DetectionEvent']))
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin']['instrument_1'], [user_id_1])
- self.assertEquals(set(proc1.event_processor.reverse_user_info['event_origin']['instrument_2']), set([user_id_2, user_id_1]))
+ self.assertEquals(proc1.reverse_user_info['event_origin']['instrument_1'], [user_id_1])
+ self.assertEquals(set(proc1.reverse_user_info['event_origin']['instrument_2']), set([user_id_2, user_id_1]))
- self.assertEquals(proc1.event_processor.reverse_user_info['event_type']['ResourceLifecycleEvent'], [user_id_1])
- self.assertEquals(set(proc1.event_processor.reverse_user_info['event_type']['DetectionEvent']), set([user_id_2, user_id_1]))
+ self.assertEquals(proc1.reverse_user_info['event_type']['ResourceLifecycleEvent'], [user_id_1])
+ self.assertEquals(set(proc1.reverse_user_info['event_type']['DetectionEvent']), set([user_id_2, user_id_1]))
- self.assertEquals(proc1.event_processor.reverse_user_info['event_subtype']['subtype_1'], [user_id_1])
- self.assertEquals(set(proc1.event_processor.reverse_user_info['event_subtype']['subtype_2']), set([user_id_2, user_id_1]))
+ self.assertEquals(proc1.reverse_user_info['event_subtype']['subtype_1'], [user_id_1])
+ self.assertEquals(set(proc1.reverse_user_info['event_subtype']['subtype_2']), set([user_id_2, user_id_1]))
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin_type']['type_1'], [user_id_1])
- self.assertEquals(set(proc1.event_processor.reverse_user_info['event_origin_type']['type_2']), set([user_id_2, user_id_1]))
+ self.assertEquals(proc1.reverse_user_info['event_origin_type']['type_1'], [user_id_1])
+ self.assertEquals(set(proc1.reverse_user_info['event_origin_type']['type_2']), set([user_id_2, user_id_1]))
log.debug("The event processor received the notification topics after another create_notification() for the first user")
log.debug("Verified that the event processor correctly updated its user info dictionaries")
@@ -552,11 +558,11 @@ def test_user_info_UNS(self):
notification_request_correct = self.rrc.read(notification_id_1)
# check that the updated notification is in the user info dictionary
- self.assertTrue(notification_request_correct in proc1.event_processor.user_info[user_id_1]['notifications'] )
+ self.assertTrue(notification_request_correct in proc1.user_info[user_id_1]['notifications'] )
# check that the notifications in the user info dictionary got updated
update_worked = False
- for notification in proc1.event_processor.user_info[user_id_1]['notifications']:
+ for notification in proc1.user_info[user_id_1]['notifications']:
if notification.origin == "newly_changed_instrument":
update_worked = True
break
@@ -564,7 +570,7 @@ def test_user_info_UNS(self):
self.assertTrue(update_worked)
# reverse_user_info
- self.assertTrue(user_id_1 in proc1.event_processor.reverse_user_info['event_origin']["newly_changed_instrument"])
+ self.assertTrue(user_id_1 in proc1.reverse_user_info['event_origin']["newly_changed_instrument"])
log.debug("Verified that the event processor correctly updated its user info dictionaries after an update_notification()")
@@ -681,8 +687,24 @@ def received_reload(msg, headers):
notification_request_2 = self.rrc.read(notification_id_2)
-
- self.assertEquals(reloaded_user_info[user_id]['notifications'], [notification_request_correct, notification_request_2] )
+ #--------------------------------------------------------------------------------------------------------------------------
+ # Check that the two notifications created for the same user got properly reloaded in the user_info dictionaries of the workers
+ #--------------------------------------------------------------------------------------------------------------------------
+ notifications = reloaded_user_info[user_id]['notifications']
+ origins = []
+ event_types = []
+ for notific in notifications:
+ origins.append(notific.origin)
+ event_types.append(notific.event_type)
+
+ shouldbe_origins = []
+ shouldbe_event_types = []
+ for notific in [notification_request_correct, notification_request_2]:
+ shouldbe_origins.append(notific.origin)
+ shouldbe_event_types.append(notific.event_type)
+
+ self.assertEquals(set(origins), set(shouldbe_origins))
+ self.assertEquals(set(event_types), set(shouldbe_event_types))
self.assertEquals(reloaded_reverse_user_info['event_origin']['instrument_1'], [user_id] )
self.assertEquals(reloaded_reverse_user_info['event_origin']['instrument_2'], [user_id] )
247 ion/services/dm/presentation/user_notification_service.py
View
@@ -12,7 +12,6 @@
from pyon.util.async import spawn
from pyon.util.log import log
from pyon.util.containers import DotDict
-from pyon.datastore.datastore import DatastoreManager
from pyon.event.event import EventPublisher, EventSubscriber
from interface.services.dm.idiscovery_service import DiscoveryServiceClient
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient
@@ -51,92 +50,16 @@
list of event subscribers (only one for LCA) that listen for the events in the notification.
"""
-class NotificationSubscription(object):
- """
- Ties a notification's info to it's event subscriber
- """
-
- def __init__(self, notification_request=None, callback=None):
- self._res_obj = notification_request # The Notification Request Resource Object
- self.subscriber = EventSubscriber( origin=notification_request.origin,
- origin_type = notification_request.origin_type,
- event_type=notification_request.event_type,
- sub_type=notification_request.event_subtype,
- callback=callback)
- self.notification_subscription_id = None
-
- def set_notification_id(self, id_=None):
- """
- Set the notification id of the notification object
- @param notification id
- """
- self.notification_subscription_id = id_
-
- def activate(self):
- """
- Start subscribing
- """
- self.subscriber.start()
- def deactivate(self):
- """
- Stop subscribing
- """
- self.subscriber.stop()
-
-class EventProcessor(object):
+class EmailEventProcessor(object):
"""
- The Event Processor is the object that knows all about the user's subscriptions. There will be one event
- processor in the system.
+ A class that helps to get user subscribed to notifications
"""
def __init__(self):
-
- #---------------------------------------------------------------------------------------------------
- # Dictionaries that maintain information about users and their subscribed notifications
- # The user_info dictionary is loaded from the User Info Base (stored in couchdb)
- # The reverse_user_info is calculated from the user_info dictionary
- #---------------------------------------------------------------------------------------------------
-
- # user_info = {'user_id' : [list of notifications]}
- self.user_info = {}
- self.reverse_user_info = {}
-
# the resource registry
self.rr = ResourceRegistryServiceClient()
- def stop_notification_subscriber(self, notification_request):
- """
- Stops the subscriber of a notification
-
- @param notification_request NotificationRequest
- """
-
- for val in self.user_info.itervalues():
- if notification_request in val['notifications']:
- notification_subscription = val['notification_subscriptions'][notification_request._id]
- notification_subscription.deactivate()
- # once the subscription is deactivated, exit, so as not to try deactivating an already deactivated subscription
- return
-
- def __str__(self):
- return str(self.__dict__)
-
-
-
-class EmailEventProcessor(EventProcessor):
- """
- Contains email related info.
- """
-
- def __init__(self, smtp_client):
- """
- Contain information about the smtp_client being used for that user
- """
- super(EmailEventProcessor, self).__init__()
- self.smtp_client = smtp_client
- self.notification_subscriptions = []
-
def add_notification_for_user(self, notification_request, user_id):
"""
Add a notification to the user's list of subscribed notifications
@@ -144,43 +67,11 @@ def add_notification_for_user(self, notification_request, user_id):
@param user_id str
"""
- #---------------------------------------------------------------------------------------------------
- # Make a callback function that is right for the user
- #---------------------------------------------------------------------------------------------------
-
- def callback(message, headers):
- """
- This callback is given to all the event subscribers that this user wants notifications for.
- If this callback gets called the user in this processor should get an email
- """
-
- # find the email address of the user
- user = self.rr.read(user_id)
- msg_recipient = user.contact.email
-
- # send email to the user
- send_email(message, msg_recipient, self.smtp_client)
-
user = self.rr.read(user_id)
- #---------------------------------------------------------------------------------------------------
# Add the notification into the user info object
- #---------------------------------------------------------------------------------------------------
-
user = self.put_notification_in_user_object(user, notification_request)
- #---------------------------------------------------------------------------------------------------
- # Add a notification to the list of subscribed notifications for the user
- #---------------------------------------------------------------------------------------------------
-
- notification_subscription = self._add_callback_to_notification(notification_request, callback)
-
- #---------------------------------------------------------------------------------------------------
- # Update the user_info dictionary and also calculate the reverse user info dictionary
- #---------------------------------------------------------------------------------------------------
-
- self.update_user_info_dictionary(user, notification_subscription)
-
# return the updated user object
return user
@@ -208,63 +99,6 @@ def put_notification_in_user_object(self, user, notification_request):
return user
-
- def update_user_info_dictionary(self, user, notification_subscription):
- """
- Update the user info and reverse user info dictionaries.
- @param user UserInfo object
- @param notification_subscription NotificationSubscription object
- """
-
- if self.user_info.has_key(user._id):
- # the user is already known by the EventProcessor
- self.user_info[user._id]['notifications'].append(notification_subscription._res_obj)
- self.user_info[user._id]['notification_subscriptions'][notification_subscription._res_obj._id] = notification_subscription
- else:
- # it is a new user
- self.user_info[user._id] = { 'user_contact' : user.contact,
- 'notifications' : [notification_subscription._res_obj],
- 'notification_subscriptions' : { notification_subscription._res_obj._id :
- notification_subscription}}
- self.reverse_user_info = calculate_reverse_user_info(self.user_info)
-
- def _add_callback_to_notification(self, notification_request=None, callback = None):
- """
- Adds a notification that this user then subscribes to.
-
- @param notification_request
- @retval notification object
- """
-
- #---------------------------------------------------------------------------------------------------
- # create and save notification in notifications list
- #---------------------------------------------------------------------------------------------------
-
- notification_subscription = NotificationSubscription(notification_request, callback)
- self.notification_subscriptions.append(notification_subscription)
-
- #---------------------------------------------------------------------------------------------------
- # start the event subscriber listening
- #---------------------------------------------------------------------------------------------------
-
- notification_subscription.activate()
-
- log.debug("EventProcessor.add_notification(): added notification " + str(notification_request))
-
- return notification_subscription
-
- def cleanup(self):
- for notif_sub in self.notification_subscriptions:
- notif_sub.deactivate()
-
- def stop_notification_subscriber(self, notification_request):
- """
- Stops the listener of the notification subscriber object
- @param notification_request NotificationRequest
- """
-
- super(EmailEventProcessor, self).stop_notification_subscriber(notification_request)
-
#----------------------------------------------------------------------------------------------------------------
# Keep this note for the time when we need to also include sms delivery via email to sms providers
# provider_email = sms_providers[provider] # self.notification.delivery_config.delivery['provider']
@@ -297,7 +131,7 @@ def on_start(self):
# Create an event processor
#---------------------------------------------------------------------------------------------------
- self.event_processor = EmailEventProcessor(self.smtp_client)
+ self.event_processor = EmailEventProcessor()
#---------------------------------------------------------------------------------------------------
# load event originators, types, and table
@@ -306,6 +140,13 @@ def on_start(self):
self.notifications = {}
#---------------------------------------------------------------------------------------------------
+ # Dictionaries that maintain information about users and their subscribed notifications
+ # The reverse_user_info is calculated from the user_info dictionary
+ #---------------------------------------------------------------------------------------------------
+ self.user_info = {}
+ self.reverse_user_info = {}
+
+ #---------------------------------------------------------------------------------------------------
# Get the clients
#---------------------------------------------------------------------------------------------------
@@ -330,12 +171,6 @@ def on_quit(self):
except IonException as ex:
log.info("Ignoring exception while cancelling schedule id (%s): %s: %s", sid, ex.__class__.__name__, ex)
- # Clean up the notification subscriptions' subscribers created in EmailEventProcessor object
- self.event_processor.cleanup()
-
-# # Close the smtp server
-# self.smtp_client.quit()
-
super(UserNotificationService, self).on_quit()
def __now(self):
@@ -428,6 +263,9 @@ def create_notification(self, notification=None, user_id=''):
user = self.event_processor.add_notification_for_user(notification_request=notification, user_id=user_id)
+ # Update the user info object with the notification
+ self.update_user_info_dictionary(user_id=user_id, new_notification=notification, old_notification=None)
+
#-------------------------------------------------------------------------------------------------------------------
# Generate an event that can be picked by a notification worker so that it can update its user_info dictionary
#-------------------------------------------------------------------------------------------------------------------
@@ -528,8 +366,6 @@ def delete_notification(self, notification_id=''):
notification_request = self.clients.resource_registry.read(notification_id)
old_notification = notification_request
- self.event_processor.stop_notification_subscriber(notification_request=notification_request)
-
#-------------------------------------------------------------------------------------------------------------------
# Update the resource registry
#-------------------------------------------------------------------------------------------------------------------
@@ -542,7 +378,7 @@ def delete_notification(self, notification_id=''):
# Update the user info dictionaries
#-------------------------------------------------------------------------------------------------------------------
- for user_id in self.event_processor.user_info.iterkeys():
+ for user_id in self.user_info.iterkeys():
self.update_user_info_dictionary(user_id, notification_request, old_notification)
#-------------------------------------------------------------------------------------------------------------------
@@ -569,16 +405,14 @@ def delete_notification_from_user_info(self, notification_id):
for user_id in user_ids:
- value = self.event_processor.user_info[user_id]
+ value = self.user_info[user_id]
for notif in value['notifications']:
if notification_id == notif._id:
# remove the notification
value['notifications'].remove(notif)
- # remove the notification_subscription
- self.event_processor.user_info[user_id]['notification_subscriptions'].pop(notification_id)
- self.event_processor.reverse_user_info = calculate_reverse_user_info(self.event_processor.user_info)
+ self.reverse_user_info = calculate_reverse_user_info(self.user_info)
def find_events(self, origin='', type='', min_datetime=0, max_datetime=0, limit= -1, descending=False):
"""
@@ -741,8 +575,8 @@ def get_user_notifications(self, user_id=''):
@retval notifications list of NotificationRequest objects
"""
- if self.event_processor.user_info.has_key(user_id):
- notifications = self.event_processor.user_info[user_id]['notifications']
+ if self.user_info.has_key(user_id):
+ notifications = self.user_info[user_id]['notifications']
ret = IonObject(OT.ComputedListValue)
if notifications:
@@ -828,7 +662,7 @@ def process_batch(self, start_time = 0, end_time = 0):
if end_time <= start_time:
return
- for user_id, value in self.event_processor.user_info.iteritems():
+ for user_id, value in self.user_info.iteritems():
notifications = value['notifications']
@@ -915,7 +749,7 @@ def format_and_send_email(self, events_for_message, user_id):
self.send_batch_email( msg_body = msg_body,
msg_subject = msg_subject,
- msg_recipient=self.event_processor.user_info[user_id]['user_contact'].email,
+ msg_recipient=self.user_info[user_id]['user_contact'].email,
smtp_client=self.smtp_client )
def send_batch_email(self, msg_body, msg_subject, msg_recipient, smtp_client):
@@ -982,44 +816,31 @@ def update_user_info_object(self, user_id, new_notification, old_notification):
def update_user_info_dictionary(self, user_id, new_notification, old_notification):
+ notifications = []
+ user = self.clients.resource_registry.read(user_id)
+
#------------------------------------------------------------------------------------
- # Remove the old notifications
+ # If there was a previous notification which is being updated, check the dictionaries and update there
#------------------------------------------------------------------------------------
+ if old_notification:
+ # Remove the old notifications
+ if old_notification in self.user_info[user_id]['notifications']:
- if old_notification in self.event_processor.user_info[user_id]['notifications']:
-
- # remove from notifications list
- self.event_processor.user_info[user_id]['notifications'].remove(old_notification)
-
- #------------------------------------------------------------------------------------
- # update the notification subscription object
- #------------------------------------------------------------------------------------
-
- # get the old notification_subscription
- notification_subscription = self.event_processor.user_info[user_id]['notification_subscriptions'].pop(old_notification._id)
+ # remove from notifications list
+ self.user_info[user_id]['notifications'].remove(old_notification)
- # update that old notification subscription
- notification_subscription._res_obj = new_notification
-
- # feed the updated notification subscription back into the user info dictionary
- self.event_processor.user_info[user_id]['notification_subscriptions'][old_notification._id] = notification_subscription
-
- #------------------------------------------------------------------------------------
# find the already existing notifications for the user
- #------------------------------------------------------------------------------------
-
- notifications = self.event_processor.user_info[user_id]['notifications']
- notifications.append(new_notification)
+ if self.user_info.has_key(user_id):
+ notifications = self.user_info[user_id]['notifications']
#------------------------------------------------------------------------------------
# update the user info - contact information, notifications
#------------------------------------------------------------------------------------
- user = self.clients.resource_registry.read(user_id)
+ notifications.append(new_notification)
- self.event_processor.user_info[user_id]['user_contact'] = user.contact
- self.event_processor.user_info[user_id]['notifications'] = notifications
+ self.user_info[user_id] = {'user_contact' : user.contact, 'notifications' : notifications}
- self.event_processor.reverse_user_info = calculate_reverse_user_info(self.event_processor.user_info)
+ self.reverse_user_info = calculate_reverse_user_info(self.user_info)
def get_subscriptions(self, resource_id='', include_nonactive=False):
"""
Please sign in to comment.
Something went wrong with that request. Please try again.