Skip to content

Commit

Permalink
Merge pull request ceph#42086 from kotreshhr/wip-51200-pacific
Browse files Browse the repository at this point in the history
pacific: mgr/volumes: Add config to insert delay at the beginning of the clone

Reviewed-by: Jos Collin <jcollin@redhat.com>
  • Loading branch information
yuriw authored and votdev committed Aug 2, 2021
2 parents 09fd677 + 984cfd6 commit 400ec39
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 8 deletions.
34 changes: 34 additions & 0 deletions qa/tasks/cephfs/test_volumes.py
Expand Up @@ -3226,6 +3226,9 @@ def test_subvolume_clone_in_progress_getpath(self):
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

# Insert delay at the beginning of snapshot clone
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)

# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)

Expand Down Expand Up @@ -3272,6 +3275,9 @@ def test_subvolume_clone_in_progress_snapshot_rm(self):
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

# Insert delay at the beginning of snapshot clone
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)

# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)

Expand Down Expand Up @@ -3317,6 +3323,9 @@ def test_subvolume_clone_in_progress_source(self):
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

# Insert delay at the beginning of snapshot clone
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)

# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)

Expand Down Expand Up @@ -3801,6 +3810,9 @@ def test_subvolume_snapshot_clone_cancel_in_progress(self):
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

# Insert delay at the beginning of snapshot clone
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)

# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)

Expand Down Expand Up @@ -4200,6 +4212,9 @@ def test_subvolume_snapshot_clone_with_upgrade(self):
# ensure metadata file is in legacy location, with required version v1
self._assert_meta_location_and_version(self.volname, subvolume, version=1, legacy=True)

# Insert delay at the beginning of snapshot clone
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)

# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)

Expand Down Expand Up @@ -4249,6 +4264,25 @@ def test_subvolume_snapshot_reconf_max_concurrent_clones(self):
max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
self.assertEqual(max_concurrent_clones, 2)

def test_subvolume_snapshot_config_snapshot_clone_delay(self):
    """
    Check that the 'snapshot_clone_delay' option of mgr/volumes can be read
    and updated, and that 'max_concurrent_clones' can be set afterwards.
    """
    delay_option = 'mgr/volumes/snapshot_clone_delay'
    clones_option = 'mgr/volumes/max_concurrent_clones'

    # before anything is configured the delay is expected to be 0
    initial_delay = int(self.config_get('mgr', delay_option))
    self.assertEqual(initial_delay, 0)

    # configure a 2 second delay and read it back
    self.config_set('mgr', delay_option, 2)
    configured_delay = int(self.config_get('mgr', delay_option))
    self.assertEqual(configured_delay, 2)

    # set the number of cloner threads to 2 and read it back
    self.config_set('mgr', clones_option, 2)
    cloner_threads = int(self.config_get('mgr', clones_option))
    self.assertEqual(cloner_threads, 2)

def test_subvolume_under_group_snapshot_clone(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
Expand Down
17 changes: 12 additions & 5 deletions src/pybind/mgr/volumes/fs/async_cloner.py
Expand Up @@ -224,12 +224,15 @@ def handle_clone_complete(volume_client, volname, index, groupname, subvolname,
log.error("failed to detach clone from snapshot: {0}".format(e))
return (None, True)

def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
finished = False
current_state = None
try:
current_state = get_clone_state(volume_client, volname, groupname, subvolname)
log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
if current_state == SubvolumeStates.STATE_PENDING:
time.sleep(snapshot_clone_delay)
log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay))
while not finished:
handler = state_table.get(current_state, None)
if not handler:
Expand All @@ -244,7 +247,7 @@ def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_t
log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
subvolname, current_state, ve))

def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
def clone(volume_client, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
log.info("cloning to subvolume path: {0}".format(clone_path))
resolved = resolve(volume_client.volspec, clone_path)

Expand All @@ -254,7 +257,7 @@ def clone(volume_client, volname, index, clone_path, state_table, should_cancel)

try:
log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
except VolumeException as ve:
log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
Expand All @@ -265,8 +268,9 @@ class Cloner(AsyncJobs):
this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
the driver. file types supported are directories, symbolic links and regular files.
"""
def __init__(self, volume_client, tp_size):
def __init__(self, volume_client, tp_size, snapshot_clone_delay):
self.vc = volume_client
self.snapshot_clone_delay = snapshot_clone_delay
self.state_table = {
SubvolumeStates.STATE_PENDING : handle_clone_pending,
SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
Expand All @@ -279,6 +283,9 @@ def __init__(self, volume_client, tp_size):
def reconfigure_max_concurrent_clones(self, tp_size):
    """
    Forward the new cloner thread count to the parent class and return
    whatever its reconfigure_max_async_threads() call returns.
    """
    parent = super(Cloner, self)
    return parent.reconfigure_max_async_threads(tp_size)

def reconfigure_snapshot_clone_delay(self, timeout):
    """
    Update the delay (in seconds) stored on this cloner.

    The stored value is handed to each clone job and ends up as the
    argument of time.sleep(), which raises ValueError for negative
    durations and TypeError for None. To keep a bad setting from breaking
    clone jobs, a negative or missing value is stored as 0 (no delay).

    :param timeout: new delay in seconds; None or a negative number means 0
    """
    if timeout is None or timeout < 0:
        timeout = 0
    self.snapshot_clone_delay = timeout

def is_clone_cancelable(self, clone_state):
    """
    Return True when clone_state is neither a complete state nor a failed
    state according to SubvolumeOpSm, False otherwise.
    """
    if SubvolumeOpSm.is_complete_state(clone_state):
        return False
    if SubvolumeOpSm.is_failed_state(clone_state):
        return False
    return True

Expand Down Expand Up @@ -344,4 +351,4 @@ def get_next_job(self, volname, running_jobs):
return get_next_clone_entry(self.vc, volname, running_jobs)

def execute_job(self, volname, job, should_cancel):
clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
2 changes: 1 addition & 1 deletion src/pybind/mgr/volumes/fs/volume.py
Expand Up @@ -51,7 +51,7 @@ def __init__(self, mgr):
super().__init__(mgr)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
# purge for leftover subvolume entries in trash. note that, if the
Expand Down
11 changes: 9 additions & 2 deletions src/pybind/mgr/volumes/module.py
Expand Up @@ -342,14 +342,19 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
'max_concurrent_clones',
type='int',
default=4,
desc='Number of asynchronous cloner threads',
)
desc='Number of asynchronous cloner threads'),
Option(
'snapshot_clone_delay',
type='int',
default=0,
desc='Delay clone begin operation by snapshot_clone_delay seconds')
]

def __init__(self, *args, **kwargs):
self.inited = False
# for mypy
self.max_concurrent_clones = None
self.snapshot_clone_delay = None
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
# Initialize config option members
Expand Down Expand Up @@ -378,6 +383,8 @@ def config_notify(self):
if self.inited:
if opt['name'] == "max_concurrent_clones":
self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
elif opt['name'] == "snapshot_clone_delay":
self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)

def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
Expand Down

0 comments on commit 400ec39

Please sign in to comment.