Skip to content

Commit

Permalink
Add Retry for ENOTCONN errors
Browse files Browse the repository at this point in the history
Fixes: #187
Updates: #186
Signed-off-by: Aravinda Vishwanathapura <aravinda@kadalu.io>
  • Loading branch information
aravindavk committed Feb 26, 2020
1 parent 41f5cc5 commit d9f8e67
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
37 changes: 33 additions & 4 deletions csi/volumeutils.py
Expand Up @@ -7,11 +7,13 @@
import time
import logging
import threading
from errno import ENOTCONN

from jinja2 import Template

from kadalulib import execute, PV_TYPE_SUBVOL, PV_TYPE_VIRTBLOCK, \
get_volname_hash, get_volume_path, logf, makedirs, CommandException
get_volname_hash, get_volume_path, logf, makedirs, CommandException, \
retry_errors


GLUSTERFS_CMD = "/usr/sbin/glusterfs"
Expand Down Expand Up @@ -120,6 +122,10 @@ def get_pv_hosting_volumes(filters=None):

def update_free_size(hostvol, sizechange):
"""Update the free size in respective hosting Volume's stat file"""

# Check for mount availability before updating the free size
retry_errors(os.statvfs, [os.path.join(HOSTVOL_MOUNTDIR, hostvol)], [ENOTCONN])

stat_file_path = os.path.join(HOSTVOL_MOUNTDIR, hostvol, ".stat")

with statfile_lock:
Expand Down Expand Up @@ -152,8 +158,10 @@ def mount_and_select_hosting_volume(pv_hosting_volumes, required_size):
stat_file_path = os.path.join(mntdir, ".stat")
data = {}
with statfile_lock:
# Stat done before `os.path.exists` to prevent ignoring
# file not exists even in case of ENOTCONN
mntdir_stat = retry_errors(os.statvfs, [mntdir], [ENOTCONN])
if not os.path.exists(stat_file_path):
mntdir_stat = os.statvfs(mntdir)
data = {
"size": mntdir_stat.f_bavail * mntdir_stat.f_bsize,
"free_size": mntdir_stat.f_bavail * mntdir_stat.f_bsize
Expand Down Expand Up @@ -189,6 +197,9 @@ def create_virtblock_volume(hostvol_mnt, volname, size):
volhash=volhash
))

# Check for mount availability before creating virtblock volume
retry_errors(os.statvfs, [hostvol_mnt], [ENOTCONN])

# Create a file with required size
makedirs(os.path.dirname(volpath_full))
logging.debug(logf(
Expand Down Expand Up @@ -229,7 +240,7 @@ def save_pv_metadata(hostvol_mnt, pvpath, pvsize):
info_file_path = os.path.join(hostvol_mnt, "info", pvpath)
info_file_dir = os.path.dirname(info_file_path)

makedirs(info_file_dir)
retry_errors(makedirs, [info_file_dir], [ENOTCONN])
logging.debug(logf(
"Created metadata directory",
metadata_dir=info_file_dir
Expand All @@ -255,6 +266,9 @@ def create_subdir_volume(hostvol_mnt, volname, size):
volhash=volhash
))

# Check for mount availability before creating subdir volume
retry_errors(os.statvfs, [hostvol_mnt], [ENOTCONN])

# Create a subdir
makedirs(os.path.join(hostvol_mnt, volpath))
logging.debug(logf(
Expand All @@ -280,7 +294,7 @@ def create_subdir_volume(hostvol_mnt, volname, size):
count = 0
while True:
count += 1
pvstat = os.statvfs(os.path.join(hostvol_mnt, volpath))
pvstat = retry_errors(os.statvfs, [os.path.join(hostvol_mnt, volpath)], [ENOTCONN])
volsize = pvstat.f_blocks * pvstat.f_bsize
if pvsize_min < volsize < pvsize_max:
logging.debug(logf(
Expand Down Expand Up @@ -327,6 +341,10 @@ def delete_volume(volname):
volhash=vol.volhash,
hostvol=vol.hostvol
))
# Check for mount availability before deleting the volume
retry_errors(os.statvfs, [os.path.join(HOSTVOL_MOUNTDIR, vol.hostvol)],
[ENOTCONN])

volpath = os.path.join(HOSTVOL_MOUNTDIR, vol.hostvol, vol.volpath)
try:
if vol.voltype == PV_TYPE_SUBVOL:
Expand Down Expand Up @@ -388,6 +406,9 @@ def search_volume(volname):
hvol = volume['name']
mntdir = os.path.join(HOSTVOL_MOUNTDIR, hvol)
mount_glusterfs(volume, mntdir)
# Check for mount availability before checking the info file
retry_errors(os.statvfs, [mntdir], [ENOTCONN])

for info_path in [subdir_path, virtblock_path]:
info_path_full = os.path.join(mntdir, "info", info_path + ".json")
voltype = PV_TYPE_SUBVOL if "/%s/" % PV_TYPE_SUBVOL \
Expand Down Expand Up @@ -429,6 +450,10 @@ def volume_list(voltype=None):
hvol = volume['name']
mntdir = os.path.join(HOSTVOL_MOUNTDIR, hvol)
mount_glusterfs(volume, mntdir)

# Check for mount availability before listing the Volumes
retry_errors(os.statvfs, [mntdir], [ENOTCONN])

if voltype is None or voltype == PV_TYPE_SUBVOL:
get_subdir_virtblock_vols(mntdir, volumes, PV_TYPE_SUBVOL)
if voltype is None or voltype == PV_TYPE_VIRTBLOCK:
Expand All @@ -445,6 +470,8 @@ def mount_volume(pvpath, target_path, pvtype, fstype=None):
else:
execute(MOUNT_CMD, "--bind", pvpath, target_path)

os.chmod(target_path, 0o777)


def unmount_volume(target_path):
"""Unmount a Volume"""
Expand Down Expand Up @@ -478,6 +505,8 @@ def mount_glusterfs(volume, target_path):
"""Mount Glusterfs Volume"""
if not os.path.exists(target_path):
makedirs(target_path)
# Make mount point immutable if not mounted
execute("chattr", "+i", target_path)

# Ignore if already mounted
if os.path.ismount(target_path):
Expand Down
30 changes: 29 additions & 1 deletion lib/kadalulib.py
Expand Up @@ -5,6 +5,7 @@
import sys
import os
from datetime import datetime
import time

import requests
import xxhash
Expand All @@ -15,6 +16,32 @@

KADALU_VERSION = os.environ.get("KADALU_VERSION", "latest")


class TimeoutOSError(OSError):
"""Timeout after retries"""
pass # noqa # pylint: disable=unnecessary-pass


def retry_errors(func, args, errors, timeout=120, interval=2):
"""Retries given function in case of specified errors"""
starttime = int(time.time())

while True:
try:
return func(*args)
except (OSError, IOError) as err:
currtime = int(time.time())
if (currtime - starttime) >= timeout:
raise TimeoutOSError(err.errno, err.strerror) from None

if err.errno in errors:
time.sleep(interval)
continue

# Reraise the same error
raise


def makedirs(dirpath):
"""exist_ok=True parameter will raise exception even if directory
exists with different attributes. Handle EEXIST gracefully."""
Expand Down Expand Up @@ -55,7 +82,8 @@ def execute(*cmd):
Raises CommandException on error
"""
proc = subprocess.Popen(cmd, stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
stdout=subprocess.PIPE,
universal_newlines=True)
out, err = proc.communicate()
if proc.returncode != 0:
raise CommandException(proc.returncode, out.strip(), err.strip())
Expand Down

0 comments on commit d9f8e67

Please sign in to comment.