Skip to content

Commit

Permalink
Merge "Replace loopingcall in notifier with a delayed send"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Apr 24, 2014
2 parents a017a0b + 3be1d7a commit b315169
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 7 deletions.
48 changes: 41 additions & 7 deletions neutron/notifiers/nova.py
Expand Up @@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.

import eventlet
from novaclient import exceptions as nova_exceptions
import novaclient.v1_1.client as nclient
from novaclient.v1_1.contrib import server_external_events
Expand All @@ -23,7 +24,6 @@
from neutron import context
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import uuidutils


Expand Down Expand Up @@ -55,8 +55,44 @@ def __init__(self):
region_name=cfg.CONF.nova_region_name,
extensions=[server_external_events])
self.pending_events = []
event_sender = loopingcall.FixedIntervalLoopingCall(self.send_events)
event_sender.start(interval=cfg.CONF.send_events_interval)
self._waiting_to_send = False

def queue_event(self, event):
"""Called to queue sending an event with the next batch of events.
Sending events individually, as they occur, has been problematic as it
can result in a flood of sends. Previously, there was a loopingcall
thread that would send batched events on a periodic interval. However,
maintaining a persistent thread in the loopingcall was also
problematic.
This replaces the loopingcall with a mechanism that creates a
short-lived thread on demand when the first event is queued. That
thread will sleep once for the same send_events_interval to allow other
events to queue up in pending_events and then will send them when it
wakes.
If a thread is already alive and waiting, this call will simply queue
the event and return leaving it up to the thread to send it.
:param event: the event that occured.
"""
if not event:
return

self.pending_events.append(event)

if self._waiting_to_send:
return

self._waiting_to_send = True

def last_out_sends():
eventlet.sleep(cfg.CONF.send_events_interval)
self._waiting_to_send = False
self.send_events()

eventlet.spawn_n(last_out_sends)

def _is_compute_port(self, port):
try:
Expand Down Expand Up @@ -94,8 +130,7 @@ def send_network_change(self, action, original_obj,

event = self.create_port_changed_event(action, original_obj,
returned_obj)
if event:
self.pending_events.append(event)
self.queue_event(event)

def create_port_changed_event(self, action, original_obj, returned_obj):
port = None
Expand Down Expand Up @@ -172,8 +207,7 @@ def record_port_status_changed(self, port, current_port_status,

def send_port_status(self, mapper, connection, port):
event = getattr(port, "_notify_event", None)
if event:
self.pending_events.append(event)
self.queue_event(event)
port._notify_event = None

def send_events(self):
Expand Down
29 changes: 29 additions & 0 deletions neutron/tests/unit/notifiers/test_notifiers_nova.py
Expand Up @@ -274,3 +274,32 @@ def test_nova_send_events_multiple(self):
self.nova_notifier.pending_events.append(
{'name': 'network-changed', 'server_uuid': device_id})
self.nova_notifier.send_events()

def test_queue_event_no_event(self):
with mock.patch('eventlet.spawn_n') as spawn_n:
self.nova_notifier.queue_event(None)
self.assertEqual(0, len(self.nova_notifier.pending_events))
self.assertEqual(0, spawn_n.call_count)

def test_queue_event_first_event(self):
with mock.patch('eventlet.spawn_n') as spawn_n:
self.nova_notifier.queue_event(mock.Mock())
self.assertEqual(1, len(self.nova_notifier.pending_events))
self.assertEqual(1, spawn_n.call_count)

def test_queue_event_multiple_events(self):
with mock.patch('eventlet.spawn_n') as spawn_n:
events = 6
for i in range(0, events):
self.nova_notifier.queue_event(mock.Mock())
self.assertEqual(events, len(self.nova_notifier.pending_events))
self.assertEqual(1, spawn_n.call_count)

def test_queue_event_call_send_events(self):
with mock.patch.object(self.nova_notifier,
'send_events') as send_events:
with mock.patch('eventlet.spawn_n') as spawn_n:
spawn_n.side_effect = lambda func: func()
self.nova_notifier.queue_event(mock.Mock())
self.assertFalse(self.nova_notifier._waiting_to_send)
send_events.assert_called_once_with()

0 comments on commit b315169

Please sign in to comment.