client: Support multiple connections
Reimplement io.copy() using multiple connections and a thread pool.

Every worker thread opens connections to the source and destination
backends and processes I/O requests submitted to a work queue by the main
thread.
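
For illustration only (not part of this patch), a copy with four workers
could be driven like this, assuming src and dst are already connected
backend objects that support clone():

    from ovirt_imageio._internal import io

    # Fan the copy out over four worker threads, each using its own
    # connections to the backends.
    io.copy(src, dst, max_workers=4, buffer_size=4 * 1024**2)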

Multiple connections can be used only when the imageio daemon reports the
number of allowed readers and writers. When accessing an old imageio daemon
we use a single connection for reading or writing. Reading image extents is
always done in a separate thread, so we can read data from an image while
getting the next extent.
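
A hedged sketch of how the worker count may be capped; the option names
max_readers and max_writers are assumptions, not taken from this patch:

    # Old daemons that do not report reader/writer limits get one connection.
    max_workers = min(io.MAX_WORKERS,
                      server_options.get("max_readers", 1),
                      server_options.get("max_writers", 1))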

Testing in the scale lab shows up to 101% improvement compared with
imageio 2.0.6.

All tests were done with a 100 GiB image containing 48 GiB of data. Rate is
the virtual copy rate (virtual size / elapsed seconds).
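
For example, the 2.0.6 raw upload baseline: 100 GiB = 102400 MiB, and
102400 MiB / 170.78 s ≈ 599.6 MiB/s, matching the rate column below.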

Used upload_disk.py and download_disk.py from ovirt-engine-sdk with
a small tweak to select the current host for the transfer.

The server has 64 CPUs and 250 GiB of RAM. During the test 47 idle VMs
were running, but CPU usage was mostly idle.

Storage is iSCSI over a 10Gbit network. The management network is only
1Gbit, so I did only a few tests for remote transfers. iSCSI multipathing
was not configured, so this system can probably show even better performance.

For reference I also include timings of uploading the image directly from
local storage to a volume and downloading a volume to local storage using
qemu-img convert.

Higher change(%) values are better.

version   test            trans  workers    time(s)  rate(MiB/s)   change(%)
============================================================================
2.0.6     upload raw      unix         1    170.78    599.62      (baseline)
2.0.6     downld raw      unix         1    320.38    319.62      (baseline)
----------------------------------------------------------------------------
2.0.8     upload raw      unix         4     84.71   1208.32            +101
2.0.8     upload raw      unix         2     94.86   1075.20             +79
2.0.8     upload raw      unix         1    151.67    675.15             +12
2.0.8     upload qcow2    unix         4     89.25   1146.88               -
2.0.8     downld raw      unix         4    168.02    609.45             +90
2.0.8     downld raw      unix         1    296.69    345.14              +8
2.0.8     downld qcow2    unix         4    182.35    564.65               -
2.0.8     downld qcow2    unix         1    306.01    334.62               -
----------------------------------------------------------------------------
2.0.8     upload raw      http[1]      4    471.47    217.19               -
2.0.8     downld raw      http[1]      4    510.41    200.62               -
----------------------------------------------------------------------------
-        convert up[2] from raw        8    138.29    740.47             +23
-        convert up[2] from qcow2      8     90.74   1128.49             +88
-        convert down[3] to raw        8    194.08    527.61             +65
-        convert down[3] to qcow2      8    249.36    410.65             +28

[1] Using the 1Gbit management network

[2] Convert image to qcow2 disk:

    qemu-img convert -f {raw|qcow2} -O qcow2 -t none -W {image} {/dev/vg/lv}

[3] Convert qcow2 disk to image:

    qemu-img convert -f qcow2 -O {raw|qcow2} -T none -W {/dev/vg/lv} {image}

Here are transfer stats from one of the workers when uploading a raw image
with 4 workers on the imageio side.

connection 1 ops, 84.369468 s
dispatch 138 ops, 84.253179 s
operation 138 ops, 84.217723 s
read 1558 ops, 7.878163 s, 12.06 GiB, 1.53 GiB/s
write 1558 ops, 75.615316 s, 12.06 GiB, 163.38 MiB/s
zero 27 ops, 0.639572 s, 13.09 GiB, 20.47 GiB/s
flush 1 ops, 0.017855 s

We can see that the bottleneck is writing data to storage.
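
Roughly: each of the 4 workers read and wrote about 12 GiB and zeroed about
13 GiB, so together they copied ~48 GiB of data and zeroed ~52 GiB in ~85
seconds. Writes took ~75 s per worker versus ~8 s for reads, so writes
dominate the elapsed time: actual write throughput is about
48 GiB / 85 s ≈ 580 MiB/s, while the virtual rate also counts the zeroed
extents (100 GiB / 85 s ≈ 1208 MiB/s).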

Change-Id: Id1ffad521ca5349da7cace4d49d261fde081f48d
Bug-Url: https://bugzilla.redhat.com/1591439
Signed-off-by: Nir Soffer <nsoffer@redhat.com>
nirs committed Jun 10, 2020
1 parent b4c5dd3 commit 97f2e27
Showing 3 changed files with 402 additions and 116 deletions.
336 changes: 282 additions & 54 deletions daemon/ovirt_imageio/_internal/io.py
@@ -12,101 +12,329 @@
from __future__ import absolute_import

import logging
import threading

# Limit maximum zero and copy size to ensure frequent progress updates when
# handling large extents.
from collections import deque, namedtuple
from contextlib import closing
from functools import partial

from . import util

# Limit maximum zero and copy size to spread the workload better to multiple
# workers and ensure frequent progress updates when handling large extents.
MAX_ZERO_SIZE = 1024**3
MAX_COPY_SIZE = 128 * 1024**2

# NBD hard limit.
MAX_BUFFER_SIZE = 32 * 1024**2

# TODO: Needs testing.
# TODO: Needs more testing.
BUFFER_SIZE = 4 * 1024**2
MAX_WORKERS = 4

log = logging.getLogger("io")


def copy(src, dst, dirty=False, buffer_size=BUFFER_SIZE, zero=True,
         progress=None):
def copy(src, dst, dirty=False, max_workers=MAX_WORKERS,
         buffer_size=BUFFER_SIZE, zero=True, progress=None, name="copy"):

    buffer_size = min(buffer_size, MAX_BUFFER_SIZE)
    buf = bytearray(buffer_size)

    if dirty:
        _copy_dirty(src, dst, buf, progress)
    else:
        _copy_data(src, dst, buf, zero, progress)
    with Executor(name=name) as executor:
        # This is a bit ugly. We get src and dst backends, to keep the same
        # interface as the non-concurrent version. We use the src backend here
        # to iterate over image extents. We need to clone the src backend
        # max_workers times, and the dst backend (max_workers - 1) times.

        # The first worker clones src and uses dst itself.
        executor.add_worker(
            partial(Handler, src.clone, lambda: dst, buffer_size, progress))

        # The rest of the workers clone both src and dst.
        for _ in range(max_workers - 1):
            executor.add_worker(
                partial(Handler, src.clone, dst.clone, buffer_size, progress))

        if progress:
            progress.size = src.size()

    dst.flush()
        # Submit requests to executor.
        if dirty:
            _submit_dirty_extents(src, executor, progress)
        else:
            _submit_data_extents(src, executor, zero, progress)


def _copy_data(src, dst, buf, zero, progress):
def _submit_data_extents(src, executor, zero=True, progress=None):
    """
    Generic copy that can work with either a new destination image or an
    existing image. When using a new image that is known to be zeroed, you
    can use zero=False to skip zeroing.
    """
    for ext in src.extents("zero"):
        if not ext.zero:
            _copy_extent(src, dst, ext, buf, progress)
            executor.submit(Request(COPY, ext.start, ext.length))
        elif zero:
            _zero_extent(dst, ext, progress)
            executor.submit(Request(ZERO, ext.start, ext.length))
        elif progress:
            progress.update(ext.length)


def _copy_dirty(src, dst, buf, progress):
def _submit_dirty_extents(src, executor, progress=None):
    """
    This is for incremental backup using qcow2 format, and we assume a new
    empty image, so we never want to zero anything.
    """
    for ext in src.extents("dirty"):
        if ext.dirty:
            _copy_extent(src, dst, ext, buf, progress)
            executor.submit(Request(COPY, ext.start, ext.length))
        elif progress:
            progress.update(ext.length)


def _copy_extent(src, dst, ext, buf, progress):
    for start, length in _split(ext.start, ext.length, MAX_COPY_SIZE):
        src.seek(start)
        dst.seek(start)
# Request ops.
ZERO = "zero"
COPY = "copy"
STOP = "stop"


class Request(namedtuple("Request", "op,start,length")):

        if hasattr(dst, "read_from"):
            dst.read_from(src, length, buf)
        elif hasattr(src, "write_to"):
            src.write_to(dst, length, buf)
    def __new__(cls, op, start=0, length=0):
        return tuple.__new__(cls, (op, start, length))


class Executor(object):

    def __init__(self, name="executor", queue_depth=32):
        self._name = name
        self._workers = []
        self._queue = Queue(queue_depth)

    # Public interface.

    def add_worker(self, handler_factory):
        name = "{}/{}".format(self._name, len(self._workers))
        w = Worker(handler_factory, self._queue, name=name)
        self._workers.append(w)

    def submit(self, req):
        """
        Submit request to queue. Blocks if the queue is full.
        """
        for req in self._split(req):
            self._queue.put(req)

    def stop(self):
        """
        Stop the executor when pending requests are processed. Blocks until all
        workers exit.
        """
        log.debug("Stopping executor %s", self._name)
        for _ in self._workers:
            self._queue.put(Request(STOP))
        self._join_workers()
        if self._queue.closed:
            raise RuntimeError("Execution failed")

    def abort(self):
        """
        Drops pending requests and terminates all workers. Blocks until all
        workers exit.
        """
        log.debug("Aborting executor %s", self._name)
        self._queue.close()
        self._join_workers()

    # Private.

    def _join_workers(self):
        for w in self._workers:
            w.join()

    def __enter__(self):
        return self

    def __exit__(self, t, v, tb):
        if t is None:
            # Normal shutdown.
            self.stop()
        else:
            _generic_copy(src, dst, start, length, buf)
            # Do not hide exception in user context.
            try:
                self.abort()
            except Exception:
                log.exception("Error aborting executor")

        if progress:
            progress.update(length)
    def _split(self, req):
        """
        Spread workload on all workers by splitting large requests.
        """
        step = MAX_ZERO_SIZE if req.op == ZERO else MAX_COPY_SIZE
        start = req.start
        length = req.length

        while length > step:
            yield Request(req.op, start, step)
            start += step
            length -= step

def _zero_extent(dst, ext, progress):
    # TODO: Assumes complete zero(); works with the nbd and http backends but
    # not with the file backend.
    for start, length in _split(ext.start, ext.length, MAX_ZERO_SIZE):
        dst.seek(start)
        dst.zero(length)
        if progress:
            progress.update(length)
        yield Request(req.op, start, length)


def _split(start, length, max_length):
class Worker(object):

    def __init__(self, handler_factory, queue, name="worker"):
        self._handler_factory = handler_factory
        self._queue = queue
        self._name = name

        log.debug("Starting worker %s", name)
        self._thread = util.start_thread(self._run, name=name)

    def join(self):
        log.debug("Waiting for worker %s", self._name)
        self._thread.join()

    def _run(self):
        try:
            log.debug("Worker %s started", self._name)
            handler = self._handler_factory()
            with closing(handler):
                while True:
                    req = self._queue.get()
                    if req.op is ZERO:
                        handler.zero(req)
                    elif req.op is COPY:
                        handler.copy(req)
                    elif req.op is STOP:
                        handler.flush(req)
                        break
        except Closed:
            log.debug("Worker %s cancelled", self._name)
        except Exception:
            self._queue.close()
            log.exception("Worker %s failed", self._name)
        else:
            log.debug("Worker %s finished", self._name)


class Handler(object):

    def __init__(self, src_factory, dst_factory, buffer_size=BUFFER_SIZE,
                 progress=None):
        # Connecting to backend server may fail. Don't leave open connections
        # after failures.
        self._src = src_factory()
        try:
            self._dst = dst_factory()
        except Exception:
            self._src.close()
            raise

        self._buf = bytearray(buffer_size)
        self._progress = progress

    def zero(self, req):
        # TODO: Assumes complete zero(); not compatible with file backend.
        self._dst.seek(req.start)
        self._dst.zero(req.length)
        if self._progress:
            self._progress.update(req.length)

    def copy(self, req):
        self._src.seek(req.start)
        self._dst.seek(req.start)

        if hasattr(self._dst, "read_from"):
            self._dst.read_from(self._src, req.length, self._buf)
        elif hasattr(self._src, "write_to"):
            self._src.write_to(self._dst, req.length, self._buf)
        else:
            self._generic_copy(req)

        if self._progress:
            self._progress.update(req.length)

    def flush(self, req):
        self._dst.flush()

    def close(self):
        # Error while closing the destination backend should fail the
        # operation. Error in closing source is not fatal, but we want to know
        # about it.
        try:
            self._dst.close()
        finally:
            try:
                self._src.close()
            except Exception:
                log.exception("Error closing %s", self._src)

    def _generic_copy(self, req):
        # TODO: Assumes complete readinto() and write(); not compatible with
        # file backend.
        step = len(self._buf)
        todo = req.length

        while todo > step:
            self._src.readinto(self._buf)
            self._dst.write(self._buf)
            todo -= step

        with memoryview(self._buf)[:todo] as view:
            self._src.readinto(view)
            self._dst.write(view)


class Closed(Exception):
    """
    Split big range to smaller ones.
    Raised when trying to access a closed queue.
    """
    while length > max_length:
        yield start, max_length
        length -= max_length
        start += max_length

    yield start, length

class Queue(object):
    """
    A simple queue supporting cancellation.
    Once a queue is closed, putting items or getting items will raise a Closed
    exception. This makes it easy to cancel a group of threads waiting on the
    queue.
    """

    def __init__(self, max_size):
        self._cond = threading.Condition(threading.Lock())
        self._queue = deque(maxlen=max_size)
        self._closed = False

    @property
    def closed(self):
        return self._closed

    def put(self, item):
        with self._cond:
            self._wait_while(length=self._queue.maxlen)
            self._queue.append(item)
            self._cond.notify()

def _generic_copy(src, dst, start, length, buf):
    # TODO: Assumes complete readinto() and write(); works with the nbd and
    # http backends but not with the file backend.
    step = len(buf)
    todo = length
    def get(self):
        with self._cond:
            self._wait_while(length=0)
            item = self._queue.popleft()
            if len(self._queue) == self._queue.maxlen - 1:
                self._cond.notify()
            return item

    while todo > step:
        src.readinto(buf)
        dst.write(buf)
        todo -= step
    def _wait_while(self, length):
        if self._closed:
            raise Closed
        while len(self._queue) == length:
            self._cond.wait()
            if self._closed:
                raise Closed

    with memoryview(buf)[:todo] as last:
        src.readinto(last)
        dst.write(last)
    def close(self):
        with self._cond:
            self._closed = True
            self._queue.clear()
            self._cond.notify_all()
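
# Not part of this commit: a minimal usage sketch of the Executor/Request
# machinery above, with a toy handler standing in for real backend
# connections.
class PrintHandler(object):

    def zero(self, req):
        print("zero", req.start, req.length)

    def copy(self, req):
        print("copy", req.start, req.length)

    def flush(self, req):
        print("flush")

    def close(self):
        pass


with Executor(name="demo") as executor:
    for _ in range(4):
        executor.add_worker(PrintHandler)
    executor.submit(Request(COPY, 0, 64 * 1024**2))
    executor.submit(Request(ZERO, 64 * 1024**2, 512 * 1024**2))
# Leaving the with block calls stop(), which queues a STOP request for every
# worker and waits for all of them to flush and exit.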
