71 changes: 48 additions & 23 deletions daemon/ovirt_imageio/client/_api.py
Expand Up @@ -29,7 +29,7 @@


def upload(filename, url, cafile, buffer_size=io.BUFFER_SIZE, secure=True,
progress=None, proxy_url=None):
progress=None, proxy_url=None, max_workers=io.MAX_WORKERS):
"""
Upload filename to url
Expand All @@ -51,27 +51,44 @@ def upload(filename, url, cafile, buffer_size=io.BUFFER_SIZE, secure=True,
proxy_url (str): Proxy url on the host running imageio as proxy, used
if url is not accessible.
e.g. https://{proxy.server}:{port}/images/{ticket-id}.
max_workers (int): Maximum number of worker threads to use.
"""
if callable(progress):
progress = ProgressWrapper(progress)

info = qemu_img.info(filename)
if progress:
progress.size = info["virtual-size"]

with _open_nbd(filename, info["format"], read_only=True) as src, \
_open_http(
url,
"w",
cafile=cafile,
secure=secure,
proxy_url=proxy_url) as dst:
io.copy(src, dst, buffer_size=buffer_size, progress=progress)
# Open the destination backend to get number of workers.
with _open_http(
url,
"r+",
cafile=cafile,
secure=secure,
proxy_url=proxy_url) as dst:

max_workers = min(dst.max_writers, max_workers)

    # Open the source backend using available workers + extra worker used
    # for getting image extents.
with _open_nbd(
filename,
info["format"],
read_only=True,
shared=max_workers + 1) as src:

# Upload the image to the server.
io.copy(
src,
dst,
max_workers=max_workers,
buffer_size=buffer_size,
progress=progress,
name="upload")


def download(url, filename, cafile, fmt="qcow2", incremental=False,
buffer_size=io.BUFFER_SIZE, secure=True, progress=None,
proxy_url=None):
proxy_url=None, max_workers=io.MAX_WORKERS):
"""
Download url to filename.
Expand All @@ -96,33 +113,40 @@ def download(url, filename, cafile, fmt="qcow2", incremental=False,
proxy_url (str): Proxy url on the host running imageio as proxy, used
        if url is not accessible.
e.g. https://{proxy.server}:{port}/images/{ticket-id}.
max_workers (int): Maximum number of worker threads to use.
"""
if incremental and fmt != "qcow2":
raise ValueError(
"incremental={} is incompatible with fmt={}"
.format(incremental, fmt))

# Open the source backend to get number of workers and image size.
with _open_http(
url,
"r",
cafile=cafile,
secure=secure,
proxy_url=proxy_url) as src:
size = src.size()
if progress:
progress.size = size

qemu_img.create(filename, fmt, size=size)
# Create a local image. We know that this image is zeroed, so we don't
# need to zero during copy.
qemu_img.create(filename, fmt, size=src.size())

max_workers = min(src.max_readers, max_workers)

# Open the destination backend.
with _open_nbd(filename, fmt, shared=max_workers) as dst:

with _open_nbd(filename, fmt) as dst:
# We created new empty file, no need to zero.
# Download the image from the server to the local image.
io.copy(
src,
dst,
dirty=incremental,
max_workers=max_workers,
buffer_size=buffer_size,
zero=False,
progress=progress)
progress=progress,
name="download")


class ProgressWrapper:
Expand All @@ -136,7 +160,7 @@ def __init__(self, update):


@contextmanager
def _open_nbd(filename, fmt, read_only=False):
def _open_nbd(filename, fmt, read_only=False, shared=1):
with _tmp_dir("imageio-") as base:
sock = UnixAddress(os.path.join(base, "sock"))
with qemu_nbd.run(
Expand All @@ -146,10 +170,11 @@ def _open_nbd(filename, fmt, read_only=False):
read_only=read_only,
cache=None,
aio=None,
discard=None):
nbd_url = urlparse(sock.url())
discard=None,
shared=shared):
url = urlparse(sock.url())
mode = "r" if read_only else "r+"
yield nbd.open(nbd_url, mode)
yield nbd.open(url, mode=mode)


def _open_http(transfer_url, mode, cafile=None, secure=True, proxy_url=None):
Expand Down
111 changes: 72 additions & 39 deletions daemon/test/io_test.py
Expand Up @@ -28,6 +28,15 @@
]


class FakeProgress:
    """Fake progress reporter that records every reported chunk size."""

    def __init__(self):
        # Chronological record of the values passed to update().
        self.updates = []

    def update(self, n):
        # Keep individual values (not a running sum) so tests can
        # inspect how the copy reported progress per extent.
        self.updates.append(n)


@pytest.mark.parametrize("src_fmt,dst_fmt", [
("raw", "raw"),
("qcow2", "qcow2"),
Expand Down Expand Up @@ -74,26 +83,38 @@ def test_copy_nbd_to_nbd(tmpdir, src_fmt, dst_fmt, zero):
dst_sock = UnixAddress(tmpdir.join("dst.sock"))
dst_url = urlparse(dst_sock.url())

with qemu_nbd.run(src, src_fmt, src_sock, read_only=True), \
qemu_nbd.run(dst, dst_fmt, dst_sock), \
# Note: We need extra worker for reading extents for source.
max_workers = 2
with qemu_nbd.run(
src, src_fmt, src_sock,
read_only=True,
shared=max_workers + 1), \
qemu_nbd.run(
dst, dst_fmt, dst_sock,
shared=max_workers), \
nbd.open(src_url, "r") as src_backend, \
nbd.open(dst_url, "r+") as dst_backend:

    # Because we copy to a new image, we can always use zero=False, but we
    # test both to verify that the result is the same.
io.copy(src_backend, dst_backend, zero=zero)
io.copy(src_backend, dst_backend, max_workers=max_workers, zero=zero)

qemu_img.compare(src, dst)


@pytest.mark.parametrize("buffer_size", [128, 1024])
@pytest.mark.parametrize("zero", ZERO_PARAMS)
def test_copy_generic(zero):
@pytest.mark.parametrize("progress", [None, FakeProgress()])
def test_copy_generic(buffer_size, zero, progress):
size = 1024
chunk_size = size // 2

src_backing = bytearray(b"x" * chunk_size + b"\0" * chunk_size)
dst_backing = bytearray((b"y" if zero else b"\0") * size)

src = memory.Backend(
mode="r",
data=bytearray(b"x" * chunk_size + b"\0" * chunk_size),
data=src_backing,
extents={
"zero": [
image.ZeroExtent(0 * chunk_size, chunk_size, False),
Expand All @@ -102,23 +123,31 @@ def test_copy_generic(zero):
}
)

dst = memory.Backend(
"r+", data=bytearray((b"y" if zero else b"\0") * size))
dst = memory.Backend("r+", data=dst_backing)

io.copy(src, dst, buffer_size=128, zero=zero)
io.copy(
src, dst,
max_workers=1,
buffer_size=buffer_size,
zero=zero,
progress=progress)

assert dst.size() == src.size()
assert dst.data() == src.data()
assert dst_backing == src_backing


@pytest.mark.parametrize("buffer_size", [128, 1024])
@pytest.mark.parametrize("zero", ZERO_PARAMS)
def test_copy_read_from(zero):
@pytest.mark.parametrize("progress", [None, FakeProgress()])
def test_copy_read_from(buffer_size, zero, progress):
size = 1024
chunk_size = size // 2

src_backing = bytearray(b"x" * chunk_size + b"\0" * chunk_size)
dst_backing = bytearray((b"y" if zero else b"\0") * size)

src = memory.Backend(
mode="r",
data=bytearray(b"x" * chunk_size + b"\0" * chunk_size),
data=src_backing,
extents={
"zero": [
image.ZeroExtent(0 * chunk_size, chunk_size, False),
Expand All @@ -127,23 +156,31 @@ def test_copy_read_from(zero):
}
)

dst = memory.ReaderFrom(
"r+", data=bytearray((b"y" if zero else b"\0") * size))
dst = memory.ReaderFrom("r+", data=dst_backing)

io.copy(src, dst, buffer_size=128)
io.copy(
src, dst,
max_workers=1,
buffer_size=buffer_size,
zero=zero,
progress=progress)

assert dst.size() == src.size()
assert dst.data() == src.data()
assert dst_backing == src_backing


@pytest.mark.parametrize("buffer_size", [128, 1024])
@pytest.mark.parametrize("zero", ZERO_PARAMS)
def test_copy_write_to(zero):
@pytest.mark.parametrize("progress", [None, FakeProgress()])
def test_copy_write_to(buffer_size, zero, progress):
size = 1024
chunk_size = size // 2

src_backing = bytearray(b"x" * chunk_size + b"\0" * chunk_size)
dst_backing = bytearray((b"y" if zero else b"\0") * size)

src = memory.WriterTo(
mode="r",
data=bytearray(b"x" * chunk_size + b"\0" * chunk_size),
data=src_backing,
extents={
"zero": [
image.ZeroExtent(0 * chunk_size, chunk_size, False),
Expand All @@ -152,16 +189,20 @@ def test_copy_write_to(zero):
}
)

dst = memory.Backend(
"r+", data=bytearray((b"y" if zero else b"\0") * size))
dst = memory.Backend("r+", data=dst_backing)

io.copy(src, dst, buffer_size=128, zero=zero)
io.copy(
src, dst,
max_workers=1,
buffer_size=buffer_size,
zero=zero,
progress=progress)

assert dst.size() == src.size()
assert dst.data() == src.data()
assert dst_backing == src_backing


def test_copy_dirty():
@pytest.mark.parametrize("progress", [None, FakeProgress()])
def test_copy_dirty(progress):
size = 1024
chunk_size = size // 4

Expand All @@ -183,27 +224,19 @@ def test_copy_dirty():
}
)

dst = memory.Backend("r+", data=bytearray(b"\0" * size))
dst_backing = bytearray(b"\0" * size)
dst = memory.Backend("r+", data=dst_backing)

io.copy(src, dst, dirty=True)
io.copy(src, dst, dirty=True, max_workers=1, progress=progress)

assert dst.data() == (
assert dst_backing == (
b"a" * chunk_size +
b"\0" * chunk_size +
b"c" * chunk_size +
b"\0" * chunk_size
)


class FakeProgress:
    """Test double that records each value passed to update()."""

    def __init__(self):
        # Values reported via update(), in call order.
        self.updates = []

    def update(self, n):
        self.updates.append(n)


@pytest.mark.parametrize("zero", ZERO_PARAMS)
def test_copy_data_progress(zero):
size = 1024
Expand All @@ -230,7 +263,7 @@ def test_copy_data_progress(zero):
dst = memory.Backend("r+", data=bytearray(b"\0" * size))

p = FakeProgress()
io.copy(src, dst, zero=zero, progress=p)
io.copy(src, dst, max_workers=1, zero=zero, progress=p)

# Report at least every extent.
assert len(p.updates) >= 4
Expand Down Expand Up @@ -264,7 +297,7 @@ def test_copy_dirty_progress():
dst = memory.Backend("r+", bytearray(b"\0" * size))

p = FakeProgress()
io.copy(src, dst, dirty=True, progress=p)
io.copy(src, dst, dirty=True, max_workers=1, progress=p)

# Report at least every extent.
assert len(p.updates) >= 4
Expand Down