Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

UNS is now working fine with UX #391

Merged
merged 19 commits into from

2 participants

@swarbhanu

No external api change was made to UNS.

The smtp client is created immediately before sending an email to avoid timeout issues when there is no activity.

The code was cleaned up so that unnecessary legacy code was cleaned up.

All tests are working.

UNS can now be used in the UX system and it has been verified that notification emails do get sent after subscriptions are created and the platform agent is in running mode.

Added description to the notification emails.

swarbhanu added some commits
@swarbhanu swarbhanu removing the notification subscriptions. They seem to be unnecessary …
…duplicate work
341087d
@swarbhanu swarbhanu Placed the user_info underneath the UNS service and not the email eve…
…nt processor it launches. Makes the code more direct. Also fixed unit tests. All unit tests passing
20b9bf5
@swarbhanu swarbhanu Fixed test_pub_reload_user_info_event method c5c65e7
@swarbhanu swarbhanu since no subscribers are created in event processor anymore, no need …
…to clean up
117209d
@swarbhanu swarbhanu fixed bug and test_user_info_UNS 006fa84
@swarbhanu swarbhanu Made test_user_info_notification_worker better. It is working 98997c3
@swarbhanu swarbhanu cleaned up. All tests working 2588998
@swarbhanu swarbhanu UNS sending emails using the data_alerts email address using mail.oce…
…anobservatories.org
e7c28cf
@swarbhanu swarbhanu added a log debug statement f4aabd1
@swarbhanu swarbhanu referencing the latest pyon commit in master d6bdf62
@swarbhanu swarbhanu using the latest ion def referenced in master 6b6925e
@swarbhanu swarbhanu Merge branch 'master' into fix_uns_for_ux 6d8baf1
@swarbhanu swarbhanu All tests working. Sending real email using UNS via a script e5996b4
@swarbhanu swarbhanu fixing so that batch email smtp clients get closed after batch emails fcfa05d
@swarbhanu swarbhanu fixed test_process_batch() and cleaned up some log debugs 56db8f7
@swarbhanu swarbhanu fixed test_batch_notifications() af7c75d
@swarbhanu swarbhanu Using the port while creating the smtp connection 9223b44
@swarbhanu swarbhanu setting up the client immediately before sending an email 62eb8be
@swarbhanu swarbhanu Added description in the email sent d13ee72
@daf
daf commented

Looks good, thanks.

One thing to note is that we'd like to remove the self.smtp_client in the future as we don't need to store it as an instance attribute but Swarbhanu tells me this would require a lot of test correction. It's safe now due to the single-threaded style execution of the callback from the endpoint/subscriber/transform code. Just something that would be nice to change to make the code more concise later on.

@daf daf merged commit d44e61c into ooici:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Nov 29, 2012
  1. @swarbhanu
  2. @swarbhanu

    Placed the user_info underneath the UNS service and not the email eve…

    swarbhanu authored
    …nt processor it launches. Makes the code more direct. Also fixed unit tests. All unit tests passing
  3. @swarbhanu
  4. @swarbhanu
  5. @swarbhanu
  6. @swarbhanu
  7. @swarbhanu
Commits on Dec 5, 2012
  1. @swarbhanu
  2. @swarbhanu

    added a log debug statement

    swarbhanu authored
  3. @swarbhanu
  4. @swarbhanu
  5. @swarbhanu
  6. @swarbhanu
  7. @swarbhanu
Commits on Dec 6, 2012
  1. @swarbhanu
  2. @swarbhanu
  3. @swarbhanu
  4. @swarbhanu
  5. @swarbhanu
This page is out of date. Refresh to see the latest.
View
7 ion/processes/data/transforms/notification_worker.py
@@ -38,7 +38,6 @@ def on_start(self):
self.reverse_user_info = None
self.user_info = None
- self.smtp_client = setting_up_smtp_client()
#------------------------------------------------------------------------------------
# Start by loading the user info and reverse user info dictionaries
@@ -99,19 +98,19 @@ def process_event(self, msg, headers):
user_ids = []
if self.reverse_user_info:
+ log.debug("Notification worker checking for users interested in %s" % msg.type_)
user_ids = check_user_notification_interest(event = msg, reverse_user_info = self.reverse_user_info)
- log.debug("Type of event received by notification worker: %s" % msg.type_)
- log.debug("Event received by notification worker: %s" % msg)
log.debug("Notification worker deduced the following users were interested in the event: %s, event_type: %s, origin: %s" % (user_ids, msg.type_, msg.origin ))
-
#------------------------------------------------------------------------------------
# Send email to the users
#------------------------------------------------------------------------------------
for user_id in user_ids:
msg_recipient = self.user_info[user_id]['user_contact'].email
+ self.smtp_client = setting_up_smtp_client()
send_email(message = msg, msg_recipient = msg_recipient, smtp_client = self.smtp_client )
+ self.smtp_client.quit()
def on_stop(self):
# close subscribers safely
View
86 ion/services/dm/presentation/test/user_notification_test.py
@@ -81,7 +81,7 @@ def setUp(self):
self.user_notification.smtp_server = 'smtp_server'
self.user_notification.smtp_client = 'smtp_client'
self.user_notification.event_publisher = EventPublisher()
- self.user_notification.event_processor = EmailEventProcessor('an_smtp_client')
+ self.user_notification.event_processor = EmailEventProcessor()
def test_create_notification(self):
'''
@@ -101,7 +101,7 @@ def test_create_notification(self):
self.user_notification.notifications = {}
self.user_notification.event_processor.add_notification_for_user = mocksignature(self.user_notification.event_processor.add_notification_for_user)
-
+ self.user_notification.update_user_info_dictionary = mocksignature(self.user_notification.update_user_info_dictionary)
self.user_notification.event_publisher.publish_event = mocksignature(self.user_notification.event_publisher.publish_event)
self.user_notification._notification_in_notifications = mocksignature(self.user_notification._notification_in_notifications)
@@ -219,8 +219,8 @@ def test_delete_user_notification(self):
notification_id = 'notification_id_1'
- self.user_notification.event_processor.stop_notification_subscriber = mocksignature(self.user_notification.event_processor.stop_notification_subscriber)
self.user_notification.event_publisher.publish_event = mocksignature(self.user_notification.event_publisher.publish_event)
+ self.user_notification.user_info = {}
#-------------------------------------------------------------------------------------------------------------------
# Create a notification object
@@ -327,8 +327,6 @@ def test_pub_reload_user_info_event(self):
Test that the publishing of reload user info event occurs every time a create, update
or delete notification occurs.
'''
- proc1 = self.container.proc_manager.procs_by_name['user_notification']
-
#--------------------------------------------------------------------------------------
# Create subscribers for reload events
#--------------------------------------------------------------------------------------
@@ -492,23 +490,23 @@ def test_user_info_UNS(self):
notification_request_2 = self.rrc.read(notification_id_2)
# check user_info dictionary
- self.assertEquals(proc1.event_processor.user_info[user_id_1]['user_contact'].email, 'user_1@gmail.com' )
- self.assertEquals(proc1.event_processor.user_info[user_id_1]['notifications'], [notification_request_correct])
+ self.assertEquals(proc1.user_info[user_id_1]['user_contact'].email, 'user_1@gmail.com' )
+ self.assertEquals(proc1.user_info[user_id_1]['notifications'], [notification_request_correct])
- self.assertEquals(proc1.event_processor.user_info[user_id_2]['user_contact'].email, 'user_2@gmail.com' )
- self.assertEquals(proc1.event_processor.user_info[user_id_2]['notifications'], [notification_request_2])
+ self.assertEquals(proc1.user_info[user_id_2]['user_contact'].email, 'user_2@gmail.com' )
+ self.assertEquals(proc1.user_info[user_id_2]['notifications'], [notification_request_2])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin']['instrument_1'], [user_id_1])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin']['instrument_2'], [user_id_2])
+ self.assertEquals(proc1.reverse_user_info['event_origin']['instrument_1'], [user_id_1])
+ self.assertEquals(proc1.reverse_user_info['event_origin']['instrument_2'], [user_id_2])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_type']['ResourceLifecycleEvent'], [user_id_1])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_type']['DetectionEvent'], [user_id_2])
+ self.assertEquals(proc1.reverse_user_info['event_type']['ResourceLifecycleEvent'], [user_id_1])
+ self.assertEquals(proc1.reverse_user_info['event_type']['DetectionEvent'], [user_id_2])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_subtype']['subtype_1'], [user_id_1])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_subtype']['subtype_2'], [user_id_2])
+ self.assertEquals(proc1.reverse_user_info['event_subtype']['subtype_1'], [user_id_1])
+ self.assertEquals(proc1.reverse_user_info['event_subtype']['subtype_2'], [user_id_2])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin_type']['type_1'], [user_id_1])
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin_type']['type_2'], [user_id_2])
+ self.assertEquals(proc1.reverse_user_info['event_origin_type']['type_1'], [user_id_1])
+ self.assertEquals(proc1.reverse_user_info['event_origin_type']['type_2'], [user_id_2])
log.debug("The event processor received the notification topics after a create_notification() for two users")
log.debug("Verified that the event processor correctly updated its user info dictionaries")
@@ -519,21 +517,29 @@ def test_user_info_UNS(self):
self.unsc.create_notification(notification=notification_request_2, user_id=user_id_1)
# Check in UNS ------------>
- self.assertEquals(proc1.event_processor.user_info[user_id_1]['user_contact'].email, 'user_1@gmail.com' )
+ self.assertEquals(proc1.user_info[user_id_1]['user_contact'].email, 'user_1@gmail.com' )
+
+ notifications = proc1.user_info[user_id_1]['notifications']
+ origins = []
+ event_types = []
+ for notific in notifications:
+ origins.append(notific.origin)
+ event_types.append(notific.event_type)
- self.assertEquals(proc1.event_processor.user_info[user_id_1]['notifications'], [notification_request_correct, notification_request_2])
+ self.assertEquals(set(origins), set(['instrument_1', 'instrument_2']))
+ self.assertEquals(set(event_types), set(['ResourceLifecycleEvent', 'DetectionEvent']))
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin']['instrument_1'], [user_id_1])
- self.assertEquals(set(proc1.event_processor.reverse_user_info['event_origin']['instrument_2']), set([user_id_2, user_id_1]))
+ self.assertEquals(proc1.reverse_user_info['event_origin']['instrument_1'], [user_id_1])
+ self.assertEquals(set(proc1.reverse_user_info['event_origin']['instrument_2']), set([user_id_2, user_id_1]))
- self.assertEquals(proc1.event_processor.reverse_user_info['event_type']['ResourceLifecycleEvent'], [user_id_1])
- self.assertEquals(set(proc1.event_processor.reverse_user_info['event_type']['DetectionEvent']), set([user_id_2, user_id_1]))
+ self.assertEquals(proc1.reverse_user_info['event_type']['ResourceLifecycleEvent'], [user_id_1])
+ self.assertEquals(set(proc1.reverse_user_info['event_type']['DetectionEvent']), set([user_id_2, user_id_1]))
- self.assertEquals(proc1.event_processor.reverse_user_info['event_subtype']['subtype_1'], [user_id_1])
- self.assertEquals(set(proc1.event_processor.reverse_user_info['event_subtype']['subtype_2']), set([user_id_2, user_id_1]))
+ self.assertEquals(proc1.reverse_user_info['event_subtype']['subtype_1'], [user_id_1])
+ self.assertEquals(set(proc1.reverse_user_info['event_subtype']['subtype_2']), set([user_id_2, user_id_1]))
- self.assertEquals(proc1.event_processor.reverse_user_info['event_origin_type']['type_1'], [user_id_1])
- self.assertEquals(set(proc1.event_processor.reverse_user_info['event_origin_type']['type_2']), set([user_id_2, user_id_1]))
+ self.assertEquals(proc1.reverse_user_info['event_origin_type']['type_1'], [user_id_1])
+ self.assertEquals(set(proc1.reverse_user_info['event_origin_type']['type_2']), set([user_id_2, user_id_1]))
log.debug("The event processor received the notification topics after another create_notification() for the first user")
log.debug("Verified that the event processor correctly updated its user info dictionaries")
@@ -552,11 +558,11 @@ def test_user_info_UNS(self):
notification_request_correct = self.rrc.read(notification_id_1)
# check that the updated notification is in the user info dictionary
- self.assertTrue(notification_request_correct in proc1.event_processor.user_info[user_id_1]['notifications'] )
+ self.assertTrue(notification_request_correct in proc1.user_info[user_id_1]['notifications'] )
# check that the notifications in the user info dictionary got updated
update_worked = False
- for notification in proc1.event_processor.user_info[user_id_1]['notifications']:
+ for notification in proc1.user_info[user_id_1]['notifications']:
if notification.origin == "newly_changed_instrument":
update_worked = True
break
@@ -564,7 +570,7 @@ def test_user_info_UNS(self):
self.assertTrue(update_worked)
# reverse_user_info
- self.assertTrue(user_id_1 in proc1.event_processor.reverse_user_info['event_origin']["newly_changed_instrument"])
+ self.assertTrue(user_id_1 in proc1.reverse_user_info['event_origin']["newly_changed_instrument"])
log.debug("Verified that the event processor correctly updated its user info dictionaries after an update_notification()")
@@ -681,8 +687,24 @@ def received_reload(msg, headers):
notification_request_2 = self.rrc.read(notification_id_2)
+ #--------------------------------------------------------------------------------------------------------------------------
+ # Check that the two notifications created for the same user got properly reloaded in the user_info dictionaries of the workers
+ #--------------------------------------------------------------------------------------------------------------------------
+ notifications = reloaded_user_info[user_id]['notifications']
+ origins = []
+ event_types = []
+ for notific in notifications:
+ origins.append(notific.origin)
+ event_types.append(notific.event_type)
+
+ shouldbe_origins = []
+ shouldbe_event_types = []
+ for notific in [notification_request_correct, notification_request_2]:
+ shouldbe_origins.append(notific.origin)
+ shouldbe_event_types.append(notific.event_type)
- self.assertEquals(reloaded_user_info[user_id]['notifications'], [notification_request_correct, notification_request_2] )
+ self.assertEquals(set(origins), set(shouldbe_origins))
+ self.assertEquals(set(event_types), set(shouldbe_event_types))
self.assertEquals(reloaded_reverse_user_info['event_origin']['instrument_1'], [user_id] )
self.assertEquals(reloaded_reverse_user_info['event_origin']['instrument_2'], [user_id] )
@@ -1375,7 +1397,7 @@ def cleanup_timer(scheduler, schedule_id):
ar_1 = gevent.event.AsyncResult()
ar_2 = gevent.event.AsyncResult()
- def send_email(events_for_message, user_id):
+ def send_email(events_for_message, user_id, *args, **kwargs):
log.warning("(in asyncresult) events_for_message: %s" % events_for_message)
ar_1.set(events_for_message)
ar_2.set(user_id)
View
263 ion/services/dm/presentation/user_notification_service.py
@@ -12,7 +12,6 @@
from pyon.util.async import spawn
from pyon.util.log import log
from pyon.util.containers import DotDict
-from pyon.datastore.datastore import DatastoreManager
from pyon.event.event import EventPublisher, EventSubscriber
from interface.services.dm.idiscovery_service import DiscoveryServiceClient
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient
@@ -51,92 +50,16 @@
list of event subscribers (only one for LCA) that listen for the events in the notification.
"""
-class NotificationSubscription(object):
- """
- Ties a notification's info to it's event subscriber
- """
-
- def __init__(self, notification_request=None, callback=None):
- self._res_obj = notification_request # The Notification Request Resource Object
- self.subscriber = EventSubscriber( origin=notification_request.origin,
- origin_type = notification_request.origin_type,
- event_type=notification_request.event_type,
- sub_type=notification_request.event_subtype,
- callback=callback)
- self.notification_subscription_id = None
-
- def set_notification_id(self, id_=None):
- """
- Set the notification id of the notification object
- @param notification id
- """
- self.notification_subscription_id = id_
- def activate(self):
- """
- Start subscribing
- """
- self.subscriber.start()
-
- def deactivate(self):
- """
- Stop subscribing
- """
- self.subscriber.stop()
-
-class EventProcessor(object):
+class EmailEventProcessor(object):
"""
- The Event Processor is the object that knows all about the user's subscriptions. There will be one event
- processor in the system.
+ A class that helps to get user subscribed to notifications
"""
def __init__(self):
-
- #---------------------------------------------------------------------------------------------------
- # Dictionaries that maintain information about users and their subscribed notifications
- # The user_info dictionary is loaded from the User Info Base (stored in couchdb)
- # The reverse_user_info is calculated from the user_info dictionary
- #---------------------------------------------------------------------------------------------------
-
- # user_info = {'user_id' : [list of notifications]}
- self.user_info = {}
- self.reverse_user_info = {}
-
# the resource registry
self.rr = ResourceRegistryServiceClient()
- def stop_notification_subscriber(self, notification_request):
- """
- Stops the subscriber of a notification
-
- @param notification_request NotificationRequest
- """
-
- for val in self.user_info.itervalues():
- if notification_request in val['notifications']:
- notification_subscription = val['notification_subscriptions'][notification_request._id]
- notification_subscription.deactivate()
- # once the subscription is deactivated, exit, so as not to try deactivating an already deactivated subscription
- return
-
- def __str__(self):
- return str(self.__dict__)
-
-
-
-class EmailEventProcessor(EventProcessor):
- """
- Contains email related info.
- """
-
- def __init__(self, smtp_client):
- """
- Contain information about the smtp_client being used for that user
- """
- super(EmailEventProcessor, self).__init__()
- self.smtp_client = smtp_client
- self.notification_subscriptions = []
-
def add_notification_for_user(self, notification_request, user_id):
"""
Add a notification to the user's list of subscribed notifications
@@ -144,43 +67,11 @@ def add_notification_for_user(self, notification_request, user_id):
@param user_id str
"""
- #---------------------------------------------------------------------------------------------------
- # Make a callback function that is right for the user
- #---------------------------------------------------------------------------------------------------
-
- def callback(message, headers):
- """
- This callback is given to all the event subscribers that this user wants notifications for.
- If this callback gets called the user in this processor should get an email
- """
-
- # find the email address of the user
- user = self.rr.read(user_id)
- msg_recipient = user.contact.email
-
- # send email to the user
- send_email(message, msg_recipient, self.smtp_client)
-
user = self.rr.read(user_id)
- #---------------------------------------------------------------------------------------------------
# Add the notification into the user info object
- #---------------------------------------------------------------------------------------------------
-
user = self.put_notification_in_user_object(user, notification_request)
- #---------------------------------------------------------------------------------------------------
- # Add a notification to the list of subscribed notifications for the user
- #---------------------------------------------------------------------------------------------------
-
- notification_subscription = self._add_callback_to_notification(notification_request, callback)
-
- #---------------------------------------------------------------------------------------------------
- # Update the user_info dictionary and also calculate the reverse user info dictionary
- #---------------------------------------------------------------------------------------------------
-
- self.update_user_info_dictionary(user, notification_subscription)
-
# return the updated user object
return user
@@ -208,63 +99,6 @@ def put_notification_in_user_object(self, user, notification_request):
return user
-
- def update_user_info_dictionary(self, user, notification_subscription):
- """
- Update the user info and reverse user info dictionaries.
- @param user UserInfo object
- @param notification_subscription NotificationSubscription object
- """
-
- if self.user_info.has_key(user._id):
- # the user is already known by the EventProcessor
- self.user_info[user._id]['notifications'].append(notification_subscription._res_obj)
- self.user_info[user._id]['notification_subscriptions'][notification_subscription._res_obj._id] = notification_subscription
- else:
- # it is a new user
- self.user_info[user._id] = { 'user_contact' : user.contact,
- 'notifications' : [notification_subscription._res_obj],
- 'notification_subscriptions' : { notification_subscription._res_obj._id :
- notification_subscription}}
- self.reverse_user_info = calculate_reverse_user_info(self.user_info)
-
- def _add_callback_to_notification(self, notification_request=None, callback = None):
- """
- Adds a notification that this user then subscribes to.
-
- @param notification_request
- @retval notification object
- """
-
- #---------------------------------------------------------------------------------------------------
- # create and save notification in notifications list
- #---------------------------------------------------------------------------------------------------
-
- notification_subscription = NotificationSubscription(notification_request, callback)
- self.notification_subscriptions.append(notification_subscription)
-
- #---------------------------------------------------------------------------------------------------
- # start the event subscriber listening
- #---------------------------------------------------------------------------------------------------
-
- notification_subscription.activate()
-
- log.debug("EventProcessor.add_notification(): added notification " + str(notification_request))
-
- return notification_subscription
-
- def cleanup(self):
- for notif_sub in self.notification_subscriptions:
- notif_sub.deactivate()
-
- def stop_notification_subscriber(self, notification_request):
- """
- Stops the listener of the notification subscriber object
- @param notification_request NotificationRequest
- """
-
- super(EmailEventProcessor, self).stop_notification_subscriber(notification_request)
-
#----------------------------------------------------------------------------------------------------------------
# Keep this note for the time when we need to also include sms delivery via email to sms providers
# provider_email = sms_providers[provider] # self.notification.delivery_config.delivery['provider']
@@ -289,7 +123,6 @@ def on_start(self):
self.event_repo = self.container.instance.event_repository
- self.smtp_client = setting_up_smtp_client()
self.ION_NOTIFICATION_EMAIL_ADDRESS = 'data_alerts@oceanobservatories.org'
@@ -297,7 +130,7 @@ def on_start(self):
# Create an event processor
#---------------------------------------------------------------------------------------------------
- self.event_processor = EmailEventProcessor(self.smtp_client)
+ self.event_processor = EmailEventProcessor()
#---------------------------------------------------------------------------------------------------
# load event originators, types, and table
@@ -306,6 +139,13 @@ def on_start(self):
self.notifications = {}
#---------------------------------------------------------------------------------------------------
+ # Dictionaries that maintain information about users and their subscribed notifications
+ # The reverse_user_info is calculated from the user_info dictionary
+ #---------------------------------------------------------------------------------------------------
+ self.user_info = {}
+ self.reverse_user_info = {}
+
+ #---------------------------------------------------------------------------------------------------
# Get the clients
#---------------------------------------------------------------------------------------------------
@@ -330,12 +170,6 @@ def on_quit(self):
except IonException as ex:
log.info("Ignoring exception while cancelling schedule id (%s): %s: %s", sid, ex.__class__.__name__, ex)
- # Clean up the notification subscriptions' subscribers created in EmailEventProcessor object
- self.event_processor.cleanup()
-
- # Close the smtp server
- self.smtp_client.quit()
-
super(UserNotificationService, self).on_quit()
def __now(self):
@@ -428,6 +262,9 @@ def create_notification(self, notification=None, user_id=''):
user = self.event_processor.add_notification_for_user(notification_request=notification, user_id=user_id)
+ # Update the user info object with the notification
+ self.update_user_info_dictionary(user_id=user_id, new_notification=notification, old_notification=None)
+
#-------------------------------------------------------------------------------------------------------------------
# Generate an event that can be picked by a notification worker so that it can update its user_info dictionary
#-------------------------------------------------------------------------------------------------------------------
@@ -528,8 +365,6 @@ def delete_notification(self, notification_id=''):
notification_request = self.clients.resource_registry.read(notification_id)
old_notification = notification_request
- self.event_processor.stop_notification_subscriber(notification_request=notification_request)
-
#-------------------------------------------------------------------------------------------------------------------
# Update the resource registry
#-------------------------------------------------------------------------------------------------------------------
@@ -542,7 +377,7 @@ def delete_notification(self, notification_id=''):
# Update the user info dictionaries
#-------------------------------------------------------------------------------------------------------------------
- for user_id in self.event_processor.user_info.iterkeys():
+ for user_id in self.user_info.iterkeys():
self.update_user_info_dictionary(user_id, notification_request, old_notification)
#-------------------------------------------------------------------------------------------------------------------
@@ -569,16 +404,14 @@ def delete_notification_from_user_info(self, notification_id):
for user_id in user_ids:
- value = self.event_processor.user_info[user_id]
+ value = self.user_info[user_id]
for notif in value['notifications']:
if notification_id == notif._id:
# remove the notification
value['notifications'].remove(notif)
- # remove the notification_subscription
- self.event_processor.user_info[user_id]['notification_subscriptions'].pop(notification_id)
- self.event_processor.reverse_user_info = calculate_reverse_user_info(self.event_processor.user_info)
+ self.reverse_user_info = calculate_reverse_user_info(self.user_info)
def find_events(self, origin='', type='', min_datetime=0, max_datetime=0, limit= -1, descending=False):
"""
@@ -741,8 +574,8 @@ def get_user_notifications(self, user_id=''):
@retval notifications list of NotificationRequest objects
"""
- if self.event_processor.user_info.has_key(user_id):
- notifications = self.event_processor.user_info[user_id]['notifications']
+ if self.user_info.has_key(user_id):
+ notifications = self.user_info[user_id]['notifications']
ret = IonObject(OT.ComputedListValue)
if notifications:
@@ -824,11 +657,12 @@ def process_batch(self, start_time = 0, end_time = 0):
@param start_time int
@param end_time int
"""
+ self.smtp_client = setting_up_smtp_client()
if end_time <= start_time:
return
- for user_id, value in self.event_processor.user_info.iteritems():
+ for user_id, value in self.user_info.iteritems():
notifications = value['notifications']
@@ -871,9 +705,14 @@ def process_batch(self, start_time = 0, end_time = 0):
# send a notification email to each user using a _send_email() method
if events_for_message:
- self.format_and_send_email(events_for_message, user_id)
+ self.format_and_send_email(events_for_message = events_for_message,
+ user_id = user_id,
+ smtp_client=self.smtp_client)
+
+ self.smtp_client.quit()
- def format_and_send_email(self, events_for_message, user_id):
+
+ def format_and_send_email(self, events_for_message = None, user_id = None, smtp_client = None):
"""
Format the message for a particular user containing information about the events he is to be notified about
@@ -915,10 +754,11 @@ def format_and_send_email(self, events_for_message, user_id):
self.send_batch_email( msg_body = msg_body,
msg_subject = msg_subject,
- msg_recipient=self.event_processor.user_info[user_id]['user_contact'].email,
- smtp_client=self.smtp_client )
+ msg_recipient=self.user_info[user_id]['user_contact'].email,
+ smtp_client=smtp_client )
+
- def send_batch_email(self, msg_body, msg_subject, msg_recipient, smtp_client):
+ def send_batch_email(self, msg_body = None, msg_subject = None, msg_recipient = None, smtp_client = None):
"""
Send the email
@@ -982,44 +822,31 @@ def update_user_info_object(self, user_id, new_notification, old_notification):
def update_user_info_dictionary(self, user_id, new_notification, old_notification):
+ notifications = []
+ user = self.clients.resource_registry.read(user_id)
+
#------------------------------------------------------------------------------------
- # Remove the old notifications
+ # If there was a previous notification which is being updated, check the dictionaries and update there
#------------------------------------------------------------------------------------
+ if old_notification:
+ # Remove the old notifications
+ if old_notification in self.user_info[user_id]['notifications']:
- if old_notification in self.event_processor.user_info[user_id]['notifications']:
-
- # remove from notifications list
- self.event_processor.user_info[user_id]['notifications'].remove(old_notification)
-
- #------------------------------------------------------------------------------------
- # update the notification subscription object
- #------------------------------------------------------------------------------------
-
- # get the old notification_subscription
- notification_subscription = self.event_processor.user_info[user_id]['notification_subscriptions'].pop(old_notification._id)
-
- # update that old notification subscription
- notification_subscription._res_obj = new_notification
-
- # feed the updated notification subscription back into the user info dictionary
- self.event_processor.user_info[user_id]['notification_subscriptions'][old_notification._id] = notification_subscription
+ # remove from notifications list
+ self.user_info[user_id]['notifications'].remove(old_notification)
- #------------------------------------------------------------------------------------
# find the already existing notifications for the user
- #------------------------------------------------------------------------------------
-
- notifications = self.event_processor.user_info[user_id]['notifications']
- notifications.append(new_notification)
+ if self.user_info.has_key(user_id):
+ notifications = self.user_info[user_id]['notifications']
#------------------------------------------------------------------------------------
# update the user info - contact information, notifications
#------------------------------------------------------------------------------------
- user = self.clients.resource_registry.read(user_id)
+ notifications.append(new_notification)
- self.event_processor.user_info[user_id]['user_contact'] = user.contact
- self.event_processor.user_info[user_id]['notifications'] = notifications
+ self.user_info[user_id] = {'user_contact' : user.contact, 'notifications' : notifications}
- self.event_processor.reverse_user_info = calculate_reverse_user_info(self.event_processor.user_info)
+ self.reverse_user_info = calculate_reverse_user_info(self.user_info)
def get_subscriptions(self, resource_id='', include_nonactive=False):
"""
View
43 ion/services/dm/utility/uns_utility_methods.py
@@ -4,6 +4,7 @@
@description A module containing common utility methods used by UNS and the notification workers.
'''
from pyon.public import get_sys_name, CFG
+from pyon.util.arg_check import validate_is_not_none
from pyon.util.log import log
from pyon.core.exception import NotFound, BadRequest
from pyon.event.event import EventPublisher
@@ -84,23 +85,24 @@ def setting_up_smtp_client():
#------------------------------------------------------------------------------------
# the default smtp server
#------------------------------------------------------------------------------------
-
+ smtp_client = None
smtp_host = CFG.get_safe('server.smtp.host')
-# smtp_port = CFG.get_safe('server.smtp.port', 25)
-# smtp_sender = CFG.get_safe('server.smtp.sender')
-# smtp_password = CFG.get_safe('server.smtp.password')
+ smtp_port = CFG.get_safe('server.smtp.port', 25)
+ smtp_sender = CFG.get_safe('server.smtp.sender')
+ smtp_password = CFG.get_safe('server.smtp.password')
if CFG.get_safe('system.smtp',False): #Default is False - use the fake_smtp
- log.debug('Using the real SMTP library to send email notifications!')
+ log.debug('Using the real SMTP library to send email notifications! host = %s' % smtp_host)
# smtp_client = smtplib.SMTP(smtp_host)
# smtp_client.ehlo()
# smtp_client.starttls()
# smtp_client.login(smtp_sender, smtp_password)
- smtp_client = smtplib.SMTP(smtp_host)
-
-
+ smtp_client = smtplib.SMTP(smtp_host, smtp_port)
+ log.debug("In setting up smtp client using the smtp client: %s" % smtp_client)
+ log.debug("Message received after ehlo exchange: %s" % str(smtp_client.ehlo()))
+# smtp_client.login(smtp_sender, smtp_password)
else:
log.debug('Using a fake SMTP library to simulate email notifications!')
@@ -122,20 +124,23 @@ def send_email(message, msg_recipient, smtp_client):
event = message.type_
origin = message.origin
description = message.description
+ event_obj_as_string = str(message)
#------------------------------------------------------------------------------------
# build the email from the event content
#------------------------------------------------------------------------------------
- msg_body = string.join(("Event: %s," % event,
+ msg_body = string.join(("Event type: %s," % event,
"",
"Originator: %s," % origin,
"",
- "Description: %s," % description ,
+ "Description: %s," % description,
"",
"Time stamp: %s," % time_stamp,
"",
+ "Event object as a dictionary: %s," % event_obj_as_string,
+ "",
"You received this notification from ION because you asked to be "\
"notified about this event from this source. ",
"To modify or remove notifications about this event, "\
@@ -157,11 +162,15 @@ def send_email(message, msg_recipient, smtp_client):
msg['From'] = smtp_sender
msg['To'] = msg_recipient
log.debug("UNS sending email from %s to %s" % ( smtp_sender,msg_recipient))
+ log.debug("UNS using the smtp client: %s" % smtp_client)
- smtp_client.sendmail(smtp_sender, [msg_recipient], msg.as_string())
+ try:
+ smtp_client.sendmail(smtp_sender, [msg_recipient], msg.as_string())
+ except: # Can be due to a broken connection... try to create a connection
+ smtp_client = setting_up_smtp_client()
+ log.debug("Connect again...message received after ehlo exchange: %s" % str(smtp_client.ehlo()))
+ smtp_client.sendmail(smtp_sender, [msg_recipient], msg.as_string())
-# if CFG.get_safe('system.smtp',False):
-# smtp_client.close()
def check_user_notification_interest(event, reverse_user_info):
'''
@@ -199,7 +208,7 @@ def check_user_notification_interest(event, reverse_user_info):
user_list_1 += reverse_user_info['event_origin']['']
users = user_list_1
- log.debug("for event origin %s got interested users here %s" % (event.origin, users))
+ log.debug("For event origin = %s, UNS got interested users here %s" % (event.origin, users))
if reverse_user_info['event_origin_type'].has_key(event.origin_type):
if event.origin_type: # for an incoming event with origin type specified
@@ -208,7 +217,7 @@ def check_user_notification_interest(event, reverse_user_info):
user_list_2 += reverse_user_info['event_origin_type']['']
users = set.intersection(users, user_list_2)
- log.debug("for event_origin_type: %s got interested users here %s" % (event.origin_type, users))
+ log.debug("For event_origin_type = %s too, UNS got interested users here %s" % (event.origin_type, users))
if reverse_user_info['event_type'].has_key(event.type_):
user_list_3 = reverse_user_info['event_type'][event.type_]
@@ -216,7 +225,7 @@ def check_user_notification_interest(event, reverse_user_info):
user_list_3 += reverse_user_info['event_type']['']
users = set.intersection(users, user_list_3)
- log.debug("for event_type: %s got interested users here %s" % (event.type_, users))
+ log.debug("For event_type = %s too, UNS got interested users here %s" % (event.type_, users))
if reverse_user_info['event_subtype'].has_key(event.sub_type):
@@ -226,7 +235,7 @@ def check_user_notification_interest(event, reverse_user_info):
user_list_4 += reverse_user_info['event_subtype']['']
users = set.intersection(users, user_list_4)
- log.debug("for event_subtype: %s got interested users here %s" % (event.sub_type, users))
+ log.debug("For event_subtype = %s too, UNS got interested users here %s" % (event.sub_type, users))
users = list( users)
Something went wrong with that request. Please try again.