Skip to content

Commit

Permalink
Ensure to store context in thread local after spawn/spawn_n
Browse files Browse the repository at this point in the history
https://review.openstack.org/#/c/171299/ introduces a wrapper for utils.spawn_n
to update context in thread local store. However, there are other routines in
driver code which calls greenthread.spawn or greenthread.spawn_n, so that they
will not update context in thread local store. The commit adds utils.spawn as a
new wrapper, and also make those codes call spawn/spawn_n of utils, in order to
ensure the context of logging is correct.

Change-Id: I3623e60c49e442e2431cf017540422aa59bc285a
Related-Bug: 1404268
(cherry picked from commit 7f293ec)
  • Loading branch information
zhaoqin-github committed Jun 19, 2015
1 parent fc7f1ab commit 48a6217
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 13 deletions.
19 changes: 13 additions & 6 deletions nova/tests/unit/test_utils.py
Expand Up @@ -1024,6 +1024,7 @@ class SpawnNTestCase(test.NoDBTestCase):
def setUp(self):
super(SpawnNTestCase, self).setUp()
self.useFixture(context_fixture.ClearRequestContext())
self.spawn_name = 'spawn_n'

def test_spawn_n_no_context(self):
self.assertIsNone(common_context.get_current())
Expand All @@ -1036,8 +1037,8 @@ def _fake_spawn(func, *args, **kwargs):
def fake(arg):
pass

with mock.patch.object(eventlet, 'spawn_n', _fake_spawn):
utils.spawn_n(fake, 'test')
with mock.patch.object(eventlet, self.spawn_name, _fake_spawn):
getattr(utils, self.spawn_name)(fake, 'test')
self.assertIsNone(common_context.get_current())

def test_spawn_n_context(self):
Expand All @@ -1053,8 +1054,8 @@ def _fake_spawn(func, *args, **kwargs):
def fake(context, kwarg1=None):
pass

with mock.patch.object(eventlet, 'spawn_n', _fake_spawn):
utils.spawn_n(fake, ctxt, kwarg1='test')
with mock.patch.object(eventlet, self.spawn_name, _fake_spawn):
getattr(utils, self.spawn_name)(fake, ctxt, kwarg1='test')
self.assertEqual(ctxt, common_context.get_current())

def test_spawn_n_context_different_from_passed(self):
Expand All @@ -1073,6 +1074,12 @@ def _fake_spawn(func, *args, **kwargs):
def fake(context, kwarg1=None):
pass

with mock.patch.object(eventlet, 'spawn_n', _fake_spawn):
utils.spawn_n(fake, ctxt_passed, kwarg1='test')
with mock.patch.object(eventlet, self.spawn_name, _fake_spawn):
getattr(utils, self.spawn_name)(fake, ctxt_passed, kwarg1='test')
self.assertEqual(ctxt, common_context.get_current())


class SpawnTestCase(SpawnNTestCase):
def setUp(self):
super(SpawnTestCase, self).setUp()
self.spawn_name = 'spawn'
6 changes: 3 additions & 3 deletions nova/tests/unit/virt/libvirt/test_driver.py
Expand Up @@ -6141,7 +6141,7 @@ def test_live_migration_monitor_cancelled(self):

self._test_live_migration_monitoring(domain_info_records, False)

@mock.patch.object(greenthread, "spawn")
@mock.patch.object(utils, "spawn")
@mock.patch.object(libvirt_driver.LibvirtDriver, "_live_migration_monitor")
@mock.patch.object(host.Host, "get_domain")
@mock.patch.object(fakelibvirt.Connection, "_mark_running")
Expand Down Expand Up @@ -10908,7 +10908,7 @@ def test_live_snapshot(self):
dstfile, "qcow2")
mock_define.assert_called_once_with(xmldoc)

@mock.patch.object(greenthread, "spawn")
@mock.patch.object(utils, "spawn")
def test_live_migration_hostname_valid(self, mock_spawn):
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
drvr.live_migration(self.context, self.test_instance,
Expand All @@ -10917,7 +10917,7 @@ def test_live_migration_hostname_valid(self, mock_spawn):
lambda x: x)
self.assertEqual(1, mock_spawn.call_count)

@mock.patch.object(greenthread, "spawn")
@mock.patch.object(utils, "spawn")
@mock.patch.object(fake_libvirt_utils, "is_valid_hostname")
def test_live_migration_hostname_invalid(self, mock_hostname, mock_spawn):
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
Expand Down
23 changes: 23 additions & 0 deletions nova/utils.py
Expand Up @@ -945,6 +945,29 @@ def validate_integer(value, name, min_value=None, max_value=None):
return value


def spawn(func, *args, **kwargs):
"""Passthrough method for eventlet.spawn.
This utility exists so that it can be stubbed for testing without
interfering with the service spawns.
It will also grab the context from the threadlocal store and add it to
the store on the new thread. This allows for continuity in logging the
context when using this method to spawn a new thread.
"""
_context = common_context.get_current()

@functools.wraps(func)
def context_wrapper(*args, **kwargs):
# NOTE: If update_store is not called after spawn it won't be
# available for the logger to pull from threadlocal storage.
if _context is not None:
_context.update_store()
return func(*args, **kwargs)

return eventlet.spawn(context_wrapper, *args, **kwargs)


def spawn_n(func, *args, **kwargs):
"""Passthrough method for eventlet.spawn_n.
Expand Down
4 changes: 2 additions & 2 deletions nova/virt/libvirt/driver.py
Expand Up @@ -5281,7 +5281,7 @@ def live_migration(self, context, instance, dest,
if not libvirt_utils.is_valid_hostname(dest):
raise exception.InvalidHostname(hostname=dest)

greenthread.spawn(self._live_migration, context, instance, dest,
utils.spawn(self._live_migration, context, instance, dest,
post_method, recover_method, block_migration,
migrate_data)

Expand Down Expand Up @@ -5672,7 +5672,7 @@ def _live_migration(self, context, instance, dest, post_method,

dom = self._host.get_domain(instance)

opthread = greenthread.spawn(self._live_migration_operation,
opthread = utils.spawn(self._live_migration_operation,
context, instance, dest,
block_migration,
migrate_data, dom)
Expand Down
5 changes: 3 additions & 2 deletions nova/virt/vmwareapi/io_util.py
Expand Up @@ -27,6 +27,7 @@
from nova import exception
from nova.i18n import _, _LE
from nova import image
from nova import utils

LOG = logging.getLogger(__name__)
IMAGE_API = image.API()
Expand Down Expand Up @@ -138,7 +139,7 @@ def _inner():
self.stop()
self.done.send_exception(exc)

greenthread.spawn(_inner)
utils.spawn(_inner)
return self.done

def stop(self):
Expand Down Expand Up @@ -183,7 +184,7 @@ def _inner():
LOG.exception(_LE('Read/Write data failed'))
self.done.send_exception(exc)

greenthread.spawn(_inner)
utils.spawn(_inner)
return self.done

def stop(self):
Expand Down

0 comments on commit 48a6217

Please sign in to comment.