Skip to content

Commit

Permalink
Merge "Replace ThreadPoolExecutor with GreenThreadPoolExecutor"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and openstack-gerrit committed Dec 17, 2018
2 parents 95a5ae3 + 1f2a80c commit eb0efcf
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 20 deletions.
3 changes: 1 addition & 2 deletions lower-constraints.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ fasteners==0.14.1
fixtures==3.0.0
flake8==2.5.5
future==0.16.0
futurist==1.6.0
futures==3.0.0
futurist==1.8.0
gabbi==1.35.0
gitdb2==2.0.3
GitPython==2.1.8
Expand Down
16 changes: 6 additions & 10 deletions nova/compute/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@

import base64
import binascii
# If py2, concurrent.futures comes from the futures library otherwise it
# comes from the py3 standard library.
from concurrent import futures
import contextlib
import functools
import inspect
Expand All @@ -43,6 +40,7 @@
from eventlet import greenthread
import eventlet.semaphore
import eventlet.timeout
import futurist
from keystoneauth1 import exceptions as keystone_exception
from oslo_log import log as logging
import oslo_messaging as messaging
Expand Down Expand Up @@ -523,13 +521,10 @@ def __init__(self, compute_driver=None, *args, **kwargs):
else:
self._build_semaphore = compute_utils.UnlimitedSemaphore()
if max(CONF.max_concurrent_live_migrations, 0) != 0:
self._live_migration_executor = futures.ThreadPoolExecutor(
self._live_migration_executor = futurist.GreenThreadPoolExecutor(
max_workers=CONF.max_concurrent_live_migrations)
else:
# Starting in python 3.5, this is technically bounded, but it's
# ncpu * 5 which is probably much higher than anyone would sanely
# use for concurrently running live migrations.
self._live_migration_executor = futures.ThreadPoolExecutor()
self._live_migration_executor = futurist.GreenThreadPoolExecutor()
# This is a dict, keyed by instance uuid, to a two-item tuple of
# migration object and Future for the queued live migration.
self._waiting_live_migrations = {}
Expand Down Expand Up @@ -6357,8 +6352,9 @@ def live_migration(self, context, dest, instance, block_migration,
block_migration, migration, migrate_data)
self._waiting_live_migrations[instance.uuid] = (migration, future)
except RuntimeError:
# ThreadPoolExecutor.submit will raise RuntimeError if the pool
# is shutdown, which happens in _cleanup_live_migrations_in_pool.
# GreenThreadPoolExecutor.submit will raise RuntimeError if the
# pool is shutdown, which happens in
# _cleanup_live_migrations_in_pool.
LOG.info('Migration %s failed to submit as the compute service '
'is shutting down.', migration.uuid, instance=instance)
self._set_migration_status(migration, 'error')
Expand Down
6 changes: 3 additions & 3 deletions nova/tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ def setUp(self):


class SynchronousThreadPoolExecutorFixture(fixtures.Fixture):
"""Make ThreadPoolExecutor.submit() synchronous.
"""Make GreenThreadPoolExecutor.submit() synchronous.
The function passed to submit() will be executed and a mock.Mock
object will be returned as the Future where Future.result() will
Expand All @@ -1076,11 +1076,11 @@ def setUp(self):

def fake_submit(_self, fn, *args, **kwargs):
result = fn(*args, **kwargs)
future = mock.Mock(spec='concurrent.futures.Future')
future = mock.Mock(spec='futurist.Future')
future.return_value.result.return_value = result
return future
self.useFixture(fixtures.MonkeyPatch(
'concurrent.futures.ThreadPoolExecutor.submit',
'futurist.GreenThreadPoolExecutor.submit',
fake_submit))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.

from concurrent import futures
import datetime

import futurist
import mock

from nova.conductor import manager as conductor_manager
Expand Down Expand Up @@ -241,7 +241,7 @@ def test_live_migrate_abort_migration_queued(self, _live_migrate):
self._do_post('servers/%s/action' % self.uuid, 'live-migrate-server',
{'hostname': self.compute.host})
self.compute._waiting_live_migrations[self.uuid] = (
self.migration, futures.Future())
self.migration, futurist.Future())
uri = 'servers/%s/migrations/%s' % (self.uuid, self.migration.id)
response = self._do_delete(uri)
self.assertEqual(202, response.status_code)
4 changes: 2 additions & 2 deletions nova/tests/unit/compute/test_compute_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7214,7 +7214,7 @@ def test_max_concurrent_live_unlimited(self):
self.flags(max_concurrent_live_migrations=0)
self._test_max_concurrent_live()

@mock.patch('concurrent.futures.ThreadPoolExecutor')
@mock.patch('futurist.GreenThreadPoolExecutor')
def test_max_concurrent_live_semaphore_limited(self, mock_executor):
self.flags(max_concurrent_live_migrations=123)
manager.ComputeManager()
Expand All @@ -7224,7 +7224,7 @@ def test_max_concurrent_live_semaphore_limited(self, mock_executor):
def test_max_concurrent_live_semaphore_unlimited(self, max_concurrent):
self.flags(max_concurrent_live_migrations=max_concurrent)
with mock.patch(
'concurrent.futures.ThreadPoolExecutor') as mock_executor:
'futurist.GreenThreadPoolExecutor') as mock_executor:
manager.ComputeManager()
mock_executor.assert_called_once_with()

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ os-service-types>=1.2.0 # Apache-2.0
taskflow>=2.16.0 # Apache-2.0
python-dateutil>=2.5.3 # BSD
zVMCloudConnector>=1.1.1;sys_platform!='win32' # Apache 2.0 License
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # PSF
futurist>=1.8.0 # Apache-2.0

0 comments on commit eb0efcf

Please sign in to comment.