thinp: Support immediate extend on events
If the monitor is configured with an executor dispatch function, it
dispatches a call to extend the drive when receiving a block threshold
or enospc event. The drive is then extended on the first available
executor worker.

Using a dispatch function keeps the thinp module decoupled from the
periodic module, makes testing easy, and prepares for removing the
periodic monitor and using the thinp executor.

The periodic monitor checks if the VM is ready for commands before
monitoring. Testing extension of 4 drives at the same time shows that
this check randomly returns False for no apparent reason. Since this
causes a delay before extending the drive, we skip this check when
receiving an event.

We also don't check migration status or use it when handling libvirt
call failures, since this should be needed only for volume monitoring,
which is disabled on the migration destination VM.

When extending a drive we hold the drive monitor lock to avoid
conflicts with the periodic monitor and the extend-completion threads.
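
Based on the tests below, the event flow looks roughly like this
(monitor and drive are an existing VolumeMonitor and Drive; the path
and sizes are illustrative):

    from vdsm.common.units import MiB

    # Block threshold event from libvirt: the indexed target name
    # identifies the monitored volume, so the drive is marked EXCEEDED
    # and an extend call is dispatched immediately instead of waiting
    # for the next periodic cycle.
    monitor.on_block_threshold("vdb[1]", "/virtio/1", 512 * MiB, 10 * MiB)

    # ENOSPC event: the VM paused while writing to the drive; it takes
    # the same immediate extension path.
    monitor.on_enospc(drive)

    # In both cases the dispatched _extend_drive() acquires
    # drive.monitor_lock() before extending, so it cannot race with
    # the periodic monitor or with extend-completion threads.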

Signed-off-by: Nir Soffer <nsoffer@redhat.com>
nirs committed May 9, 2022
1 parent f24f3b2 commit 78a65ff
Showing 3 changed files with 104 additions and 35 deletions.
57 changes: 54 additions & 3 deletions lib/vdsm/virt/thinp.py
@@ -23,6 +23,7 @@
import time

from collections import namedtuple
from functools import partial

import libvirt

@@ -57,9 +58,22 @@ class VolumeMonitor(object):
triggering the extension flow when needed.
"""

def __init__(self, vm, log, enabled=True):
def __init__(self, vm, log, dispatch=None, enabled=True):
"""
Arguments:
vm (vdsm.virt.VM): Owner VM
log (logging.Logger): logger to use
dispatch (vdsm.executor.Executor.dispatch): An executor dispatch
function for handling block threshold or enospc events. If set,
the drive will be extended immediately when receiving events.
Otherwise the drive is marked for extension and will be extended
when the periodic monitor wakes up.
enabled (bool): True to enable the monitor when created. If False,
the monitor needs to be enabled later.
"""
self._vm = vm
self._log = log
self._dispatch = dispatch
self._enabled = enabled

# Enabling and disabling the monitor.
@@ -182,6 +196,8 @@ def clear_threshold(self, drive, index):
# TODO: file a libvirt documentation bug
self._vm._dom.setBlockThreshold(target, 0)

# Handling libvirt events.

def on_block_threshold(self, target, path, threshold, excess):
"""
Callback to be executed in the libvirt event handler when
@@ -216,13 +232,48 @@ def on_block_threshold(self, target, path, threshold, excess):
'Unknown drive %r for vm %s - ignored block threshold event',
drive_name, self._vm.id)
else:
drive.on_block_threshold(path)
if drive.on_block_threshold(path):
self._extend_drive_soon(drive)

def on_enospc(self, drive):
"""
Called when a VM pauses because of ENOSPC error writing to drive.
"""
drive.on_enospc()
if drive.on_enospc():
self._extend_drive_soon(drive)

def _extend_drive_soon(self, drive):
if self._dispatch is None:
return
self._log.debug("Scheduling drive %s extension", drive.name)
try:
self._dispatch(
partial(self._extend_drive, drive),
timeout=config.getfloat("thinp", "extend_timeout"))
except exception.ResourceExhausted:
# Drive will be extended later by the periodic monitor.
self._log.warning(
"Executor queue full, extending drive %s in the next "
"monitoring cycle",
drive.name)

def _extend_drive(self, drive):
if not self._update_block_info([drive]):
self._log.warning(
"Cannot update block info for drive %s, retrying in "
"the next monitoring cycle",
drive.name)
return

timeout = config.getfloat("thinp", "extend_timeout")
try:
with drive.monitor_lock(timeout):
self._handle_exceeded(drive, urgent=True)
except storage.MonitorBusy:
self._log.warning(
"Timeout acquiring monitor lock for drive %s, retrying "
"in the next monitoring cycle",
drive.name)

# Monitoring volumes.

43 changes: 16 additions & 27 deletions tests/virt/thinp_monitor_test.py
@@ -205,6 +205,7 @@ def test_extend(tmp_config):

# first run: does nothing but set the block thresholds
vm.volume_monitor.monitor_volumes()
assert drv.threshold_state == BLOCK_THRESHOLD.SET

# Simulate writing to drive vdb
vdb = vm.block_stats[2]
@@ -219,22 +220,18 @@ def test_extend(tmp_config):
vm.volume_monitor.on_block_threshold(
'vdb', '/virtio/1', alloc, 1 * MiB)
assert drv.threshold_state == BLOCK_THRESHOLD.SET
assert len(vm.cif.irs.extensions) == 0

# Simulating block threshold event
vm.volume_monitor.on_block_threshold(
'vdb[1]', '/virtio/1', alloc, 1 * MiB)
assert drv.threshold_state == BLOCK_THRESHOLD.EXCEEDED

# Simulating periodic check
vm.volume_monitor.monitor_volumes()
assert len(vm.cif.irs.extensions) == 1
check_extension(vdb, drives[1], vm.cif.irs.extensions[0])
assert drv.threshold_state == BLOCK_THRESHOLD.EXCEEDED

# Simulate completed extend operation, invoking callback

simulate_extend_callback(vm.cif.irs, extension_id=0)

assert drv.threshold_state == BLOCK_THRESHOLD.SET


@@ -258,9 +255,6 @@ def test_extend_no_allocation(tmp_config):
vm.volume_monitor.on_block_threshold(
'vdb[1]', '/virtio/1', 0, 1 * MiB)
assert drv.threshold_state == BLOCK_THRESHOLD.EXCEEDED

# Simulating periodic check
vm.volume_monitor.monitor_volumes()
assert len(vm.cif.irs.extensions) == 1
check_extension(vdb, drives[1], vm.cif.irs.extensions[0])
assert drv.threshold_state == BLOCK_THRESHOLD.EXCEEDED
@@ -521,18 +515,12 @@ def test_event_received_before_write_completes(tmp_config):
vm.volume_monitor.on_block_threshold(
'vda', '/virtio/0', alloc, 1 * MiB)
assert drv.threshold_state == BLOCK_THRESHOLD.UNSET
assert len(vm.cif.irs.extensions) == 0

# Simulating block threshold event
vm.volume_monitor.on_block_threshold(
'vda[0]', '/virtio/0', alloc, 1 * MiB)
assert drv.threshold_state == BLOCK_THRESHOLD.EXCEEDED

vm.volume_monitor.monitor_volumes()

# The drive is marked for extension.
assert drv.threshold_state == BLOCK_THRESHOLD.EXCEEDED

# And try to extend.
assert len(vm.cif.irs.extensions) == 1
check_extension(vda, drives[0], vm.cif.irs.extensions[0])

@@ -563,24 +551,19 @@ def test_block_threshold_set_failure_after_drive_extended(tmp_config):
vm.volume_monitor.on_block_threshold(
'vdb', '/virtio/1', alloc, 1 * MiB)
assert drv.threshold_state == BLOCK_THRESHOLD.SET
assert len(vm.cif.irs.extensions) == 0

# Simulating block threshold event
vm.volume_monitor.on_block_threshold(
'vdb[1]', '/virtio/1', alloc, 1 * MiB)
assert drv.threshold_state == BLOCK_THRESHOLD.EXCEEDED

# Simulating periodic check
vm.volume_monitor.monitor_volumes()
assert len(vm.cif.irs.extensions) == 1

# Simulate completed extend operation, invoking callback

# Simulate setBlockThreshold failure
# Simulate completed extend operation, failing to set block threshold.
vm._dom.errors["setBlockThreshold"] = fake.Error(
libvirt.VIR_ERR_OPERATION_FAILED, "fake error")

simulate_extend_callback(vm.cif.irs, extension_id=0)

assert drv.threshold_state == BLOCK_THRESHOLD.UNSET


@@ -602,17 +585,18 @@ def test_exceeded_max_size(tmp_config):
alloc = allocation_threshold_for_resize_mb(vdb, drive) + 1
vdb["allocation"] = alloc
vm.volume_monitor.on_block_threshold('vdb[1]', '/virtio/1', alloc, 1)
assert drive.threshold_state == BLOCK_THRESHOLD.EXCEEDED

# Simulate the next monitoring cycle.
vm.volume_monitor.monitor_volumes()

# Because the drive is already extended, disable monitoring.
assert drive.threshold_state == BLOCK_THRESHOLD.DISABLED

# And no extension request should be sent.
assert len(vm.cif.irs.extensions) == 0

# Next monitoring cycle will not modify the drive.
vm.volume_monitor.monitor_volumes()
assert drive.threshold_state == BLOCK_THRESHOLD.DISABLED
assert len(vm.cif.irs.extensions) == 0


def test_resize_maxed_drive(tmp_config):
vm = FakeVM(drive_infos())
@@ -689,7 +673,12 @@ def __init__(self, drive_infos):
self._dom = FakeDomain()
self.cif = FakeClientIF(FakeIRS())
self.id = 'volume_monitor_vm'
self.volume_monitor = thinp.VolumeMonitor(self, self.log)

# Simplify testing by dispatching on the calling thread.
self.volume_monitor = thinp.VolumeMonitor(
self, self.log,
dispatch=lambda func, **kw: func())

self.block_stats = {}

disks = []
39 changes: 34 additions & 5 deletions tests/virt/thinp_test.py
@@ -21,13 +21,15 @@

import logging

import pytest

from vdsm.virt import thinp

from vdsm.common.config import config
from vdsm.common.units import MiB, GiB
from vdsm.virt.vmdevices.storage import Drive, BLOCK_THRESHOLD

import pytest
EXTEND_TIMEOUT = config.getfloat("thinp", "extend_timeout")


@pytest.mark.parametrize("enabled", [True, False])
@@ -100,43 +102,61 @@ def test_clear_threshold():

def test_on_block_threshold_drive_name_ignored():
vm = FakeVM()
mon = thinp.VolumeMonitor(vm, vm.log)
dispatch = FakeDispatch()
mon = thinp.VolumeMonitor(vm, vm.log, dispatch=dispatch)
vda = make_drive(vm.log, index=0, iface='virtio')
vm.drives.append(vda)

mon.on_block_threshold("vda", vda.path, 512 * MiB, 10 * MiB)
assert vda.threshold_state == BLOCK_THRESHOLD.UNSET
assert len(dispatch.calls) == 0


def test_on_block_threshold_indexed_name_handled():
vm = FakeVM()
mon = thinp.VolumeMonitor(vm, vm.log)
dispatch = FakeDispatch()
mon = thinp.VolumeMonitor(vm, vm.log, dispatch=dispatch)
vda = make_drive(vm.log, index=0, iface='virtio')
vm.drives.append(vda)

mon.on_block_threshold("vda[1]", vda.path, 512 * MiB, 10 * MiB)
assert vda.threshold_state == BLOCK_THRESHOLD.EXCEEDED

assert len(dispatch.calls) == 1
part, args = dispatch.calls[0]
assert part.func == mon._extend_drive
assert part.args == (vda,)
assert args == dict(timeout=EXTEND_TIMEOUT, discard=True)


def test_on_block_threshold_unknown_drive():
vm = FakeVM()
mon = thinp.VolumeMonitor(vm, vm.log)
dispatch = FakeDispatch()
mon = thinp.VolumeMonitor(vm, vm.log, dispatch=dispatch)
vda = make_drive(vm.log, index=0, iface='virtio')
vm.drives.append(vda)

mon.on_block_threshold("vdb", "/unkown/path", 512 * MiB, 10 * MiB)
assert vda.threshold_state == BLOCK_THRESHOLD.UNSET
assert len(dispatch.calls) == 0


def test_on_enospc():
vm = FakeVM()
mon = thinp.VolumeMonitor(vm, vm.log)
dispatch = FakeDispatch()
mon = thinp.VolumeMonitor(vm, vm.log, dispatch=dispatch)
vda = make_drive(vm.log, index=0, iface='virtio')
vm.drives.append(vda)

mon.on_enospc(vda)
assert vda.threshold_state == BLOCK_THRESHOLD.EXCEEDED

assert len(dispatch.calls) == 1
part, args = dispatch.calls[0]
assert part.func == mon._extend_drive
assert part.args == (vda,)
assert args == dict(timeout=EXTEND_TIMEOUT, discard=True)


def test_monitoring_needed():

@@ -184,6 +204,15 @@ def getDiskDevices(self):
return self.drives[:]


class FakeDispatch:

def __init__(self):
self.calls = []

def __call__(self, func, timeout=None, discard=True):
self.calls.append((func, dict(timeout=timeout, discard=discard)))


class FakeDomain(object):
def __init__(self):
self.thresholds = []