Skip to content

Commit

Permalink
Merge "Zero-copy object-server GET responses with splice()"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Sep 26, 2014
2 parents 5d9190c + 7d0e5eb commit 0d502de
Show file tree
Hide file tree
Showing 9 changed files with 610 additions and 21 deletions.
7 changes: 7 additions & 0 deletions etc/object-server.conf-sample
Expand Up @@ -127,6 +127,13 @@ use = egg:swift#object
# an abort to occur.
# replication_failure_threshold = 100
# replication_failure_ratio = 1.0
#
# Use splice() for zero-copy object GETs. This requires Linux kernel
# version 3.0 or greater. If you set "splice = yes" but the kernel
# does not support it, error messages will appear in the object server
# logs at startup, but your object servers should continue to function.
#
# splice = no

[filter:healthcheck]
use = egg:swift#healthcheck
Expand Down
163 changes: 162 additions & 1 deletion swift/common/utils.py
Expand Up @@ -84,6 +84,11 @@
# These are lazily pulled from libc elsewhere
_sys_fallocate = None
_posix_fadvise = None
_libc_socket = None
_libc_bind = None
_libc_accept = None
_libc_splice = None
_libc_tee = None

# If set to non-zero, fallocate routines will fail based on free space
# available being at or below this amount, in bytes.
Expand All @@ -97,6 +102,13 @@

SWIFT_CONF_FILE = '/etc/swift/swift.conf'

# These constants are Linux-specific, and Python doesn't seem to know
# about them. We ask anyway just in case that ever gets fixed.
#
# The values were copied from the Linux 3.0 kernel headers.
AF_ALG = getattr(socket, 'AF_ALG', 38)
F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031)


class InvalidHashPathConfigError(ValueError):

Expand Down Expand Up @@ -292,16 +304,22 @@ def validate_configuration():
sys.exit("Error: %s" % e)


def load_libc_function(func_name, log_error=True):
def load_libc_function(func_name, log_error=True,
fail_if_missing=False):
"""
Attempt to find the function in libc, otherwise return a no-op func.
:param func_name: name of the function to pull from libc.
:param log_error: log an error when a function can't be found
:param fail_if_missing: raise an exception when a function can't be found.
Default behavior is to return a no-op function.
"""
try:
libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
return getattr(libc, func_name)
except AttributeError:
if fail_if_missing:
raise
if log_error:
logging.warn(_("Unable to locate %s in libc. Leaving as a "
"no-op."), func_name)
Expand Down Expand Up @@ -3123,3 +3141,146 @@ def parse_content_disposition(header):
attrs = attrs[len(m.group(0)):]
attributes[m.group(1)] = m.group(2).strip('"')
return header, attributes


class sockaddr_alg(ctypes.Structure):
_fields_ = [("salg_family", ctypes.c_ushort),
("salg_type", ctypes.c_ubyte * 14),
("salg_feat", ctypes.c_uint),
("salg_mask", ctypes.c_uint),
("salg_name", ctypes.c_ubyte * 64)]


_bound_md5_sockfd = None


def get_md5_socket():
"""
Get an MD5 socket file descriptor. One can MD5 data with it by writing it
to the socket with os.write, then os.read the 16 bytes of the checksum out
later.
NOTE: It is the caller's responsibility to ensure that os.close() is
called on the returned file descriptor. This is a bare file descriptor,
not a Python object. It doesn't close itself.
"""

# Linux's AF_ALG sockets work like this:
#
# First, initialize a socket with socket() and bind(). This tells the
# socket what algorithm to use, as well as setting up any necessary bits
# like crypto keys. Of course, MD5 doesn't need any keys, so it's just the
# algorithm name.
#
# Second, to hash some data, get a second socket by calling accept() on
# the first socket. Write data to the socket, then when finished, read the
# checksum from the socket and close it. This lets you checksum multiple
# things without repeating all the setup code each time.
#
# Since we only need to bind() one socket, we do that here and save it for
# future re-use. That way, we only use one file descriptor to get an MD5
# socket instead of two, and we also get to save some syscalls.

global _bound_md5_sockfd
global _libc_socket
global _libc_bind
global _libc_accept

if _libc_accept is None:
_libc_accept = load_libc_function('accept', fail_if_missing=True)
if _libc_socket is None:
_libc_socket = load_libc_function('socket', fail_if_missing=True)
if _libc_bind is None:
_libc_bind = load_libc_function('bind', fail_if_missing=True)

# Do this at first call rather than at import time so that we don't use a
# file descriptor on systems that aren't using any MD5 sockets.
if _bound_md5_sockfd is None:
sockaddr_setup = sockaddr_alg(
AF_ALG,
(ord('h'), ord('a'), ord('s'), ord('h'), 0),
0, 0,
(ord('m'), ord('d'), ord('5'), 0))
hash_sockfd = _libc_socket(ctypes.c_int(AF_ALG),
ctypes.c_int(socket.SOCK_SEQPACKET),
ctypes.c_int(0))
if hash_sockfd < 0:
raise IOError(ctypes.get_errno(),
"Failed to initialize MD5 socket")

bind_result = _libc_bind(ctypes.c_int(hash_sockfd),
ctypes.pointer(sockaddr_setup),
ctypes.c_int(ctypes.sizeof(sockaddr_alg)))
if bind_result < 0:
os.close(hash_sockfd)
raise IOError(ctypes.get_errno(), "Failed to bind MD5 socket")

_bound_md5_sockfd = hash_sockfd

md5_sockfd = _libc_accept(ctypes.c_int(_bound_md5_sockfd), None, 0)
if md5_sockfd < 0:
raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket")

return md5_sockfd


# Flags for splice() and tee()
SPLICE_F_MOVE = 1
SPLICE_F_NONBLOCK = 2
SPLICE_F_MORE = 4
SPLICE_F_GIFT = 8


def splice(fd_in, off_in, fd_out, off_out, length, flags):
"""
Calls splice - a Linux-specific syscall for zero-copy data movement.
On success, returns the number of bytes moved.
On failure where errno is EWOULDBLOCK, returns None.
On all other failures, raises IOError.
"""
global _libc_splice
if _libc_splice is None:
_libc_splice = load_libc_function('splice', fail_if_missing=True)

ret = _libc_splice(ctypes.c_int(fd_in), ctypes.c_long(off_in),
ctypes.c_int(fd_out), ctypes.c_long(off_out),
ctypes.c_int(length), ctypes.c_int(flags))
if ret < 0:
err = ctypes.get_errno()
if err == errno.EWOULDBLOCK:
return None
else:
raise IOError(err, "splice() failed: %s" % os.strerror(err))
return ret


def tee(fd_in, fd_out, length, flags):
"""
Calls tee - a Linux-specific syscall to let pipes share data.
On success, returns the number of bytes "copied".
On failure, raises IOError.
"""
global _libc_tee
if _libc_tee is None:
_libc_tee = load_libc_function('tee', fail_if_missing=True)

ret = _libc_tee(ctypes.c_int(fd_in), ctypes.c_int(fd_out),
ctypes.c_int(length), ctypes.c_int(flags))
if ret < 0:
err = ctypes.get_errno()
raise IOError(err, "tee() failed: %s" % os.strerror(err))
return ret


def system_has_splice():
global _libc_splice
try:
_libc_splice = load_libc_function('splice', fail_if_missing=True)
return True
except AttributeError:
return False

0 comments on commit 0d502de

Please sign in to comment.