Skip to content

Commit

Permalink
libvirt: Kill rsync/scp processes before deleting instance
Browse files Browse the repository at this point in the history
In the resize operation, during copying files from source to
destination compute node scp/rsync processes are not aborted after
the instance is deleted because linux kernel doesn't delete instance
files physically until all processes using the file handle is closed
completely. Hence rsync/scp process keeps on running until it
transfers 100% of file data.

Added new module instancejobtracker to libvirt driver which will add,
remove or terminate the processes running against particular instances.
Added callback methods to execute call which will store the pid of
scp/rsync process in cache as a key: value pair and to remove the
pid from the cache after process completion. Process id will be used to
kill the process if it is running while deleting the instance. Instance
uuid is used as a key in the cache and pid will be the value.

Conflicts:
        nova/virt/libvirt/driver.py

SecurityImpact

Closes-bug: #1387543
Change-Id: Ie03acc00a7c904aec13c90ae6a53938d08e5e0c9
(cherry picked from commit 7ab75d5)
  • Loading branch information
abhishekkekane committed Aug 6, 2015
1 parent 90e1eac commit b5020a0
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 9 deletions.
38 changes: 38 additions & 0 deletions nova/tests/unit/virt/libvirt/test_driver.py
Expand Up @@ -23,6 +23,7 @@
import random
import re
import shutil
import signal
import threading
import time
import uuid
Expand Down Expand Up @@ -9817,6 +9818,15 @@ def test_shared_storage_detection_easy(self):
self.mox.ReplayAll()
self.assertTrue(drvr._is_storage_shared_with('foo', '/path'))

def test_store_pid_remove_pid(self):
instance = objects.Instance(**self.test_instance)
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
popen = mock.Mock(pid=3)
drvr.job_tracker.add_job(instance, popen.pid)
self.assertIn(3, drvr.job_tracker.jobs[instance.uuid])
drvr.job_tracker.remove_job(instance, popen.pid)
self.assertNotIn(instance.uuid, drvr.job_tracker.jobs)

@mock.patch('nova.virt.libvirt.host.Host.get_domain')
def test_get_domain_info_with_more_return(self, mock_get_domain):
instance = objects.Instance(**self.test_instance)
Expand Down Expand Up @@ -11316,12 +11326,18 @@ def fake_get_host_ip_addr():
def fake_execute(*args, **kwargs):
pass

def fake_copy_image(src, dest, host=None, receive=False,
on_execute=None, on_completion=None):
self.assertIsNotNone(on_execute)
self.assertIsNotNone(on_completion)

self.stubs.Set(self.drvr, 'get_instance_disk_info',
fake_get_instance_disk_info)
self.stubs.Set(self.drvr, '_destroy', fake_destroy)
self.stubs.Set(self.drvr, 'get_host_ip_addr',
fake_get_host_ip_addr)
self.stubs.Set(utils, 'execute', fake_execute)
self.stubs.Set(libvirt_utils, 'copy_image', fake_copy_image)

ins_ref = self._create_instance(params=params_for_instance)

Expand Down Expand Up @@ -12425,6 +12441,28 @@ def test_delete_instance_files(self, get_instance_path, exists, exe,
shutil.assert_called_with('/path_del')
self.assertTrue(result)

@mock.patch('shutil.rmtree')
@mock.patch('nova.utils.execute')
@mock.patch('os.path.exists')
@mock.patch('os.kill')
@mock.patch('nova.virt.libvirt.utils.get_instance_path')
def test_delete_instance_files_kill_running(
self, get_instance_path, kill, exists, exe, shutil):
get_instance_path.return_value = '/path'
instance = objects.Instance(uuid='fake-uuid', id=1)
self.drvr.job_tracker.jobs[instance.uuid] = [3, 4]

exists.side_effect = [False, False, True, False]

result = self.drvr.delete_instance_files(instance)
get_instance_path.assert_called_with(instance)
exe.assert_called_with('mv', '/path', '/path_del')
kill.assert_has_calls([mock.call(3, signal.SIGKILL), mock.call(3, 0),
mock.call(4, signal.SIGKILL), mock.call(4, 0)])
shutil.assert_called_with('/path_del')
self.assertTrue(result)
self.assertNotIn(instance.uuid, self.drvr.job_tracker.jobs)

@mock.patch('shutil.rmtree')
@mock.patch('nova.utils.execute')
@mock.patch('os.path.exists')
Expand Down
9 changes: 6 additions & 3 deletions nova/tests/unit/virt/libvirt/test_utils.py
Expand Up @@ -62,7 +62,8 @@ def test_copy_image_local_cp(self, mock_execute):
mock_execute.assert_called_once_with('cp', 'src', 'dest')

_rsync_call = functools.partial(mock.call,
'rsync', '--sparse', '--compress')
'rsync', '--sparse', '--compress',
on_execute=None, on_completion=None)

@mock.patch('nova.utils.execute')
def test_copy_image_rsync(self, mock_execute):
Expand All @@ -85,7 +86,8 @@ def test_copy_image_scp(self, mock_execute):

mock_execute.assert_has_calls([
self._rsync_call('--dry-run', 'src', 'host:dest'),
mock.call('scp', 'src', 'host:dest'),
mock.call('scp', 'src', 'host:dest',
on_execute=None, on_completion=None),
])
self.assertEqual(2, mock_execute.call_count)

Expand All @@ -110,7 +112,8 @@ def test_copy_image_scp_ipv6(self, mock_execute):

mock_execute.assert_has_calls([
self._rsync_call('--dry-run', 'src', '[2600::]:dest'),
mock.call('scp', 'src', '[2600::]:dest'),
mock.call('scp', 'src', '[2600::]:dest',
on_execute=None, on_completion=None),
])
self.assertEqual(2, mock_execute.call_count)

Expand Down
18 changes: 16 additions & 2 deletions nova/virt/libvirt/driver.py
Expand Up @@ -95,6 +95,7 @@
from nova.virt.libvirt import host
from nova.virt.libvirt import imagebackend
from nova.virt.libvirt import imagecache
from nova.virt.libvirt import instancejobtracker
from nova.virt.libvirt import lvm
from nova.virt.libvirt import rbd_utils
from nova.virt.libvirt import utils as libvirt_utils
Expand Down Expand Up @@ -465,6 +466,8 @@ def __init__(self, virtapi, read_only=False):
'expect': ', '.join("'%s'" % k for k in
sysinfo_serial_funcs.keys())})

self.job_tracker = instancejobtracker.InstanceJobTracker()

def _get_volume_drivers(self):
return libvirt_volume_drivers

Expand Down Expand Up @@ -6301,6 +6304,11 @@ def migrate_disk_and_power_off(self, context, instance, dest,
# finish_migration/_create_image to re-create it for us.
continue

on_execute = lambda process: self.job_tracker.add_job(
instance, process.pid)
on_completion = lambda process: self.job_tracker.remove_job(
instance, process.pid)

if info['type'] == 'qcow2' and info['backing_file']:
tmp_path = from_path + "_rbase"
# merge backing file
Expand All @@ -6310,11 +6318,15 @@ def migrate_disk_and_power_off(self, context, instance, dest,
if shared_storage:
utils.execute('mv', tmp_path, img_path)
else:
libvirt_utils.copy_image(tmp_path, img_path, host=dest)
libvirt_utils.copy_image(tmp_path, img_path, host=dest,
on_execute=on_execute,
on_completion=on_completion)
utils.execute('rm', '-f', tmp_path)

else: # raw or qcow2 with no backing file
libvirt_utils.copy_image(from_path, img_path, host=dest)
libvirt_utils.copy_image(from_path, img_path, host=dest,
on_execute=on_execute,
on_completion=on_completion)
except Exception:
with excutils.save_and_reraise_exception():
self._cleanup_remote_migration(dest, inst_base,
Expand Down Expand Up @@ -6683,6 +6695,8 @@ def delete_instance_files(self, instance):
# invocation failed due to the absence of both target and
# target_resize.
if not remaining_path and os.path.exists(target_del):
self.job_tracker.terminate_jobs(instance)

LOG.info(_LI('Deleting instance files %s'), target_del,
instance=instance)
remaining_path = target_del
Expand Down
98 changes: 98 additions & 0 deletions nova/virt/libvirt/instancejobtracker.py
@@ -0,0 +1,98 @@
# Copyright 2015 NTT corp.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


import collections
import errno
import os
import signal

from oslo_log import log as logging

from nova.i18n import _LE
from nova.i18n import _LW


LOG = logging.getLogger(__name__)


class InstanceJobTracker(object):
def __init__(self):
self.jobs = collections.defaultdict(list)

def add_job(self, instance, pid):
"""Appends process_id of instance to cache.
This method will store the pid of a process in cache as
a key: value pair which will be used to kill the process if it
is running while deleting the instance. Instance uuid is used as
a key in the cache and pid will be the value.
:param instance: Object of instance
:param pid: Id of the process
"""
self.jobs[instance.uuid].append(pid)

def remove_job(self, instance, pid):
"""Removes pid of process from cache.
This method will remove the pid of a process from the cache.
:param instance: Object of instance
:param pid: Id of the process
"""
uuid = instance.uuid
if uuid in self.jobs and pid in self.jobs[uuid]:
self.jobs[uuid].remove(pid)

# remove instance.uuid if no pid's remaining
if not self.jobs[uuid]:
self.jobs.pop(uuid, None)

def terminate_jobs(self, instance):
"""Kills the running processes for given instance.
This method is used to kill all running processes of the instance if
it is deleted in between.
:param instance: Object of instance
"""
pids_to_remove = list(self.jobs.get(instance.uuid, []))
for pid in pids_to_remove:
try:
# Try to kill the process
os.kill(pid, signal.SIGKILL)
except OSError as exc:
if exc.errno != errno.ESRCH:
LOG.error(_LE('Failed to kill process %(pid)s '
'due to %(reason)s, while deleting the '
'instance.'), {'pid': pid, 'reason': exc},
instance=instance)

try:
# Check if the process is still alive.
os.kill(pid, 0)
except OSError as exc:
if exc.errno != errno.ESRCH:
LOG.error(_LE('Unexpected error while checking process '
'%(pid)s.'), {'pid': pid},
instance=instance)
else:
# The process is still around
LOG.warn(_LW("Failed to kill a long running process "
"%(pid)s related to the instance when "
"deleting it."), {'pid': pid},
instance=instance)

self.remove_job(instance, pid)
14 changes: 10 additions & 4 deletions nova/virt/libvirt/utils.py
Expand Up @@ -294,13 +294,16 @@ def get_disk_backing_file(path, basename=True):
return backing_file


def copy_image(src, dest, host=None, receive=False):
def copy_image(src, dest, host=None, receive=False,
on_execute=None, on_completion=None):
"""Copy a disk image to an existing directory
:param src: Source image
:param dest: Destination path
:param host: Remote host
:param receive: Reverse the rsync direction
:param on_execute: Callback method to store pid of process in cache
:param on_completion: Callback method to remove pid of process from cache
"""

if not host:
Expand All @@ -322,11 +325,14 @@ def copy_image(src, dest, host=None, receive=False):
# Do a relatively light weight test first, so that we
# can fall back to scp, without having run out of space
# on the destination for example.
execute('rsync', '--sparse', '--compress', '--dry-run', src, dest)
execute('rsync', '--sparse', '--compress', '--dry-run', src, dest,
on_execute=on_execute, on_completion=on_completion)
except processutils.ProcessExecutionError:
execute('scp', src, dest)
execute('scp', src, dest, on_execute=on_execute,
on_completion=on_completion)
else:
execute('rsync', '--sparse', '--compress', src, dest)
execute('rsync', '--sparse', '--compress', src, dest,
on_execute=on_execute, on_completion=on_completion)


def write_to_file(path, contents, umask=None):
Expand Down

0 comments on commit b5020a0

Please sign in to comment.