mgr/volumes: cleanup libcephfs handles when stopping
Signed-off-by: Venky Shankar <vshankar@redhat.com>
vshankar committed Oct 24, 2019
1 parent a7994a0 commit 2eb0c50
Showing 3 changed files with 69 additions and 14 deletions.
22 changes: 16 additions & 6 deletions src/pybind/mgr/volumes/fs/purge_queue.py
@@ -75,10 +75,9 @@ def queue_purge_job(self, volname):
         self.jobs[volname] = []
         self.cv.notifyAll()
 
-    def cancel_purge_job(self, volname):
+    def _cancel_purge_job(self, volname):
         log.info("cancelling purge jobs for volume '{0}'".format(volname))
-        self.lock.acquire()
-        unlock = True
+        locked = True
         try:
             if not self.q.count(volname):
                 return
@@ -90,16 +89,27 @@ def cancel_purge_job(self, volname):
                     j[1].cancel_job()
             # wait for cancellation to complete
             with self.c_lock:
-                unlock = False
+                locked = False
                 self.waiting = True
                 self.lock.release()
                 while self.waiting:
                     log.debug("waiting for {0} in-progress purge jobs for volume '{1}' to " \
                               "cancel".format(len(self.jobs[volname]), volname))
                     self.c_cv.wait()
         finally:
-            if unlock:
-                self.lock.release()
+            if not locked:
+                self.lock.acquire()
+
+    def cancel_purge_job(self, volname):
+        self.lock.acquire()
+        self._cancel_purge_job(volname)
+        self.lock.release()
+
+    def cancel_all_jobs(self):
+        self.lock.acquire()
+        for volname in list(self.q):
+            self._cancel_purge_job(volname)
+        self.lock.release()
 
     def register_job(self, volname, purge_dir):
         log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))
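The hunks above split the old cancel_purge_job into a private helper plus thin public wrappers, so cancel_all_jobs can cancel every queued volume under a single lock acquisition. The helper may drop the queue lock while waiting for in-flight jobs, but always returns with it re-acquired so the wrapper's release() stays balanced. A minimal sketch of that hand-off — the real code coordinates through a second lock/condition pair (c_lock/c_cv), which this sketch collapses onto one Condition; names here are illustrative, not the module's:

```python
import threading

class Jobs(object):
    """Toy stand-in for the purge queue's cancel-and-wait locking."""
    def __init__(self):
        self.lock = threading.Lock()            # guards self.active
        self.cond = threading.Condition(self.lock)
        self.active = {}                        # volume -> in-flight job count

    def _cancel(self, name):
        # Caller holds self.lock; wait() drops it while blocked and
        # re-acquires it before returning, mirroring the locked/acquire
        # dance in _cancel_purge_job().
        while self.active.get(name, 0) > 0:
            self.cond.wait()
        self.active.pop(name, None)

    def cancel(self, name):
        with self.lock:
            self._cancel(name)

    def cancel_all(self):
        with self.lock:                         # one acquisition for all volumes
            for name in list(self.active):
                self._cancel(name)

    def job_done(self, name):                   # worker-thread side
        with self.lock:
            self.active[name] -= 1
            self.cond.notify_all()
```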
55 changes: 47 additions & 8 deletions src/pybind/mgr/volumes/fs/volume.py
@@ -2,7 +2,7 @@
 import time
 import errno
 import logging
-from threading import Lock
+from threading import Lock, Condition, Event
 try:
     # py2
     from threading import _Timer as Timer
@@ -43,11 +43,16 @@ def get_fs_handle(self):
         self.ops_in_progress += 1
         return self.fs
 
-    def put_fs_handle(self):
+    def put_fs_handle(self, notify):
         assert self.ops_in_progress > 0
         self.ops_in_progress -= 1
+        if self.ops_in_progress == 0:
+            notify()
 
-    def del_fs_handle(self):
+    def del_fs_handle(self, waiter):
+        if waiter:
+            while self.ops_in_progress != 0:
+                waiter()
         if self.is_connection_valid():
             self.disconnect()
         else:
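put_fs_handle and del_fs_handle now cooperate through a pair of callables: the op that drops the count to zero fires notify, and a deleter loops on waiter until ops_in_progress drains. A rough stand-alone sketch of that drain protocol, with hypothetical names; both callables are assumed to run under the same pool-level lock:

```python
import threading

class Handle(object):
    def __init__(self):
        self.ops_in_progress = 0

    def get(self):
        self.ops_in_progress += 1

    def put(self, notify):
        assert self.ops_in_progress > 0
        self.ops_in_progress -= 1
        if self.ops_in_progress == 0:
            notify()                  # wake the thread blocked in delete()

    def delete(self, waiter=None):
        while waiter and self.ops_in_progress != 0:
            waiter()                  # e.g. cond.wait() -- releases the lock
        # ... disconnect the underlying client here ...

lock = threading.Lock()
cond = threading.Condition(lock)
h = Handle()
with lock:
    h.get()
    h.put(notify=cond.notify_all)     # count hits zero; nobody waiting yet
    h.delete(waiter=cond.wait)        # returns immediately, count is zero
```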
@@ -108,6 +113,7 @@ def __init__(self, mgr):
         self.mgr = mgr
         self.connections = {}
         self.lock = Lock()
+        self.cond = Condition(self.lock)
         self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                 self.cleanup_connections)
         self.timer_task.start()
@@ -150,19 +156,31 @@ def put_fs_handle(self, fs_name):
         with self.lock:
             conn = self.connections.get(fs_name, None)
             if conn:
-                conn.put_fs_handle()
+                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
 
-    def _del_fs_handle(self, fs_name):
+    def _del_fs_handle(self, fs_name, wait=False):
         conn = self.connections.pop(fs_name, None)
         if conn:
-            conn.del_fs_handle()
+            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
 
-    def del_fs_handle(self, fs_name):
+    def del_fs_handle(self, fs_name, wait=False):
         with self.lock:
-            self._del_fs_handle(fs_name)
+            self._del_fs_handle(fs_name, wait)
+
+    def del_all_handles(self):
+        with self.lock:
+            for fs_name in list(self.connections.keys()):
+                log.info("waiting for pending ops for '{}'".format(fs_name))
+                self._del_fs_handle(fs_name, wait=True)
+                log.info("pending ops completed for '{}'".format(fs_name))
+            # no new connections should have been initialized since its
+            # guarded on shutdown.
+            assert len(self.connections) == 0
 
 class VolumeClient(object):
     def __init__(self, mgr):
         self.mgr = mgr
+        self.stopping = Event()
         self.connection_pool = ConnectionPool(self.mgr)
         # TODO: make thread pool size configurable
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
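Note that cond = Condition(self.lock) wraps the pool's own lock rather than a fresh one; that is what lets the waiter=lambda: self.cond.wait() side block inside _del_fs_handle while still allowing put_fs_handle (which also takes self.lock) to run and fire the notify. A small stand-alone demonstration of that property, under assumed toy names:

```python
import threading
import time

lock = threading.Lock()
cond = threading.Condition(lock)     # shares `lock`; wait() releases it
pending = {"ops": 1}

def finish_last_op():
    time.sleep(0.1)
    with lock:                       # acquirable only because wait() released it
        pending["ops"] = 0
        cond.notify_all()

threading.Thread(target=finish_last_op).start()
with lock:
    while pending["ops"] != 0:
        cond.wait()                  # the waiter= side of the hand-off
print("all ops drained")
```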
@@ -175,6 +193,15 @@ def __init__(self, mgr):
         for fs in fs_map['filesystems']:
             self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
 
+    def shutdown(self):
+        log.info("shutting down")
+        # first, note that we're shutting down
+        self.stopping.set()
+        # second, ask purge threads to quit
+        self.purge_queue.cancel_all_jobs()
+        # third, delete all libcephfs handles from connection pool
+        self.connection_pool.del_all_handles()
+
     def cluster_log(self, msg, lvl=None):
         """
         log to cluster log with default log level as WARN.
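The three steps in shutdown() are ordered deliberately: latch the stopping event so every entry point starts refusing work, then cancel the purge threads (the internal users of fs handles), and only then tear the libcephfs handles down. A condensed sketch of the same stop sequence with placeholder components:

```python
import threading

class Service(object):
    def __init__(self):
        self.stopping = threading.Event()

    def handle_command(self):
        if self.stopping.is_set():          # entry points check first
            return "ESHUTDOWN"
        return "ok"

    def cancel_background_jobs(self):       # stand-in for purge_queue.cancel_all_jobs()
        pass

    def release_handles(self):              # stand-in for connection_pool.del_all_handles()
        pass

    def shutdown(self):
        self.stopping.set()                 # 1. refuse new work
        self.cancel_background_jobs()       # 2. stop internal handle users
        self.release_handles()              # 3. safe: no one can grab a handle now

svc = Service()
svc.shutdown()
assert svc.handle_command() == "ESHUTDOWN"
```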
@@ -266,6 +293,9 @@ def create_volume(self, volname):
         """
         create volume (pool, filesystem and mds)
         """
+        if self.stopping.isSet():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+
         metadata_pool, data_pool = self.gen_pool_names(volname)
         # create pools
         r, outs, outb = self.create_pool(metadata_pool)
@@ -286,6 +316,9 @@ def delete_volume(self, volname, confirm):
         """
         delete the given module (tear down mds, remove filesystem)
         """
+        if self.stopping.isSet():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+
         self.purge_queue.cancel_purge_job(volname)
         self.connection_pool.del_fs_handle(volname)
         # Tear down MDS daemons
@@ -318,6 +351,9 @@ def delete_volume(self, volname, confirm):
         return self.remove_pool(data_pool)
 
     def list_volumes(self):
+        if self.stopping.isSet():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+
         result = []
         fs_map = self.mgr.get("fs_map")
         for f in fs_map['filesystems']:
@@ -349,6 +385,9 @@ def conn_wrapper(self, fs_handle, **kwargs):
         # note that force arg is available for remove type commands
         force = kwargs.get('force', False)
 
+        if self.stopping.isSet():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+
         # fetch the connection from the pool
         if not fs_handle:
             try:
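Every command path in volume.py now opens with the same three-line guard. isSet() is the camelCase alias of threading.Event.is_set(), kept here for Python 2 compatibility (modern Python spells it is_set()). The check could also be factored into a decorator — a hypothetical variant, not what the commit does:

```python
import errno
import functools

def reject_when_stopping(func):
    """Return ESHUTDOWN instead of running `func` once shutdown has begun."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.stopping.is_set():
            return -errno.ESHUTDOWN, "", "shutdown in progress"
        return func(self, *args, **kwargs)
    return wrapper

# usage (hypothetical):
# class VolumeClient(object):
#     @reject_when_stopping
#     def list_volumes(self):
#         ...
```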
6 changes: 6 additions & 0 deletions src/pybind/mgr/volumes/module.py
@@ -171,6 +171,12 @@ def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
         self.vc = VolumeClient(self)
 
+    def __del__(self):
+        self.vc.shutdown()
+
+    def shutdown(self):
+        self.vc.shutdown()
+
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
         try:
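In module.py, shutdown() is the hook ceph-mgr invokes when the module is being stopped, and __del__ routes interpreter teardown through the same path. Since Event.set() latches, reaching VolumeClient.shutdown() twice should be harmless: the second pass cancels an empty queue and drains an empty pool. A toy double-call check, with stand-in names:

```python
import threading

class Client(object):
    def __init__(self):
        self.stopping = threading.Event()
        self.calls = 0

    def shutdown(self):
        self.stopping.set()   # idempotent: latches once
        self.calls += 1

c = Client()
c.shutdown()   # mgr shutdown hook
c.shutdown()   # __del__ during teardown
assert c.stopping.is_set() and c.calls == 2
```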
