Skip to content

Commit

Permalink
Decouple read chunk size from write chunk size
Browse files Browse the repository at this point in the history
When using the copy_from option, readers and writers can have different
speeds to respectively read and write.
A reader timeout will happen if the writer is slow and is being
asked to write a lot. This is currently happening when using
the VMware store and copying from an HTTP server. The reader is reading
16MB which takes too long to upload to vCenter which is causing a
timeout from the HTTP server. The writer should be able to control the
size of the chunks being read when using copy_from: this way the writer
will write fast enough to not make the reader timeout.

This patch addresses the issue by introducing the notion of read chunk
size and write chunk size. Each store can have its own value for read
and write. The write chunk size of the destination store will be used
as the read chunk size of the source store in case of an image-create
where the copy_from option is specified.

Closes-Bug: #1336168

Signed-off-by: Arnaud Legendre <alegendre@vmware.com>
Signed-off-by: Zhi Yan Liu <zhiyanl@cn.ibm.com>

Change-Id: I4e0c563b8f3a5ced8f65fcca83d341a97729a5d4
  • Loading branch information
arnaudleg committed Jul 23, 2014
1 parent 323aba6 commit 5148c96
Show file tree
Hide file tree
Showing 14 changed files with 61 additions and 48 deletions.
17 changes: 9 additions & 8 deletions glance/api/v1/images.py
Expand Up @@ -447,9 +447,10 @@ def _external_source(self, image_meta, req):
return Controller._validate_source(source, req)

@staticmethod
def _get_from_store(context, where):
def _get_from_store(context, where, dest=None):
try:
image_data, image_size = get_from_backend(context, where)
image_data, image_size = get_from_backend(
context, where, dest=dest)
except exception.NotFound as e:
raise HTTPNotFound(explanation=e.msg)
image_size = int(image_size) if image_size else None
Expand Down Expand Up @@ -571,11 +572,15 @@ def _upload(self, req, image_meta):
:retval The location where the image was stored
"""

scheme = req.headers.get('x-image-meta-store', CONF.default_store)
store = self.get_store_or_400(req, scheme)

copy_from = self._copy_from(req)
if copy_from:
try:
image_data, image_size = self._get_from_store(req.context,
copy_from)
copy_from,
dest=store)
except Exception as e:
upload_utils.safe_kill(req, image_meta['id'])
msg = ("Copy from external source failed: %s" %
Expand All @@ -594,10 +599,6 @@ def _upload(self, req, image_meta):

image_data = req.body_file

scheme = req.headers.get('x-image-meta-store', CONF.default_store)

store = self.get_store_or_400(req, scheme)

image_id = image_meta['id']
LOG.debug("Setting image %s to status 'saving'", image_id)
registry.update_image_metadata(req.context, image_id,
Expand Down Expand Up @@ -1102,7 +1103,7 @@ def _deserialize(self, request):
if image_size is None and data is not None:
data = utils.LimitingReader(data, CONF.image_size_cap)

#NOTE(bcwaldon): this is a hack to make sure the downstream code
# NOTE(bcwaldon): this is a hack to make sure the downstream code
# gets the correct image data
request.body_file = data

Expand Down
8 changes: 5 additions & 3 deletions glance/store/__init__.py
Expand Up @@ -258,10 +258,12 @@ def get_from_backend(context, uri, **kwargs):
"""Yields chunks of data from backend specified by uri"""

loc = location.get_location_from_uri(uri)
store = get_store_from_uri(context, uri, loc)

src_store = get_store_from_uri(context, uri, loc)
dest_store = kwargs.get('dest')
if dest_store is not None:
src_store.READ_CHUNKSIZE = dest_store.WRITE_CHUNKSIZE
try:
return store.get(loc)
return src_store.get(loc)
except NotImplementedError:
raise exception.StoreGetNotSupported

Expand Down
3 changes: 2 additions & 1 deletion glance/store/base.py
Expand Up @@ -27,7 +27,8 @@

class Store(object):

CHUNKSIZE = 16 * units.Mi # 16M
READ_CHUNKSIZE = 16 * units.Mi # 16M
WRITE_CHUNKSIZE = READ_CHUNKSIZE

@staticmethod
def _unconfigured(*args, **kwargs):
Expand Down
9 changes: 5 additions & 4 deletions glance/store/filesystem.py
Expand Up @@ -89,8 +89,6 @@ class ChunkedFile(object):
something that can iterate over a large file
"""

CHUNKSIZE = 65536

def __init__(self, filepath):
self.filepath = filepath
self.fp = open(self.filepath, 'rb')
Expand All @@ -100,7 +98,7 @@ def __iter__(self):
try:
if self.fp:
while True:
chunk = self.fp.read(ChunkedFile.CHUNKSIZE)
chunk = self.fp.read(Store.READ_CHUNKSIZE)
if chunk:
yield chunk
else:
Expand All @@ -117,6 +115,9 @@ def close(self):

class Store(glance.store.base.Store):

READ_CHUNKSIZE = 64 * units.Ki
WRITE_CHUNKSIZE = READ_CHUNKSIZE

def get_schemes(self):
return ('file', 'filesystem')

Expand Down Expand Up @@ -431,7 +432,7 @@ def add(self, image_id, image_file, image_size):
try:
with open(filepath, 'wb') as f:
for buf in utils.chunkreadable(image_file,
ChunkedFile.CHUNKSIZE):
self.WRITE_CHUNKSIZE):
bytes_written += len(buf)
checksum.update(buf)
f.write(buf)
Expand Down
2 changes: 1 addition & 1 deletion glance/store/http.py
Expand Up @@ -122,7 +122,7 @@ def get(self, location):
"""
conn, resp, content_length = self._query(location, 'GET')

iterator = http_response_iterator(conn, resp, self.CHUNKSIZE)
iterator = http_response_iterator(conn, resp, self.READ_CHUNKSIZE)

class ResponseIndexable(glance.store.Indexable):
def another(self):
Expand Down
7 changes: 4 additions & 3 deletions glance/store/rbd.py
Expand Up @@ -188,7 +188,8 @@ def configure_add(self):
itself, it should raise `exception.BadStoreConfiguration`
"""
try:
self.chunk_size = CONF.rbd_store_chunk_size * units.Mi
self.READ_CHUNKSIZE = CONF.rbd_store_chunk_size * units.Mi
self.WRITE_CHUNKSIZE = self.READ_CHUNKSIZE

# these must not be unicode since they will be passed to a
# non-unicode-aware C library
Expand Down Expand Up @@ -323,7 +324,7 @@ def add(self, image_id, image_file, image_size):
if hasattr(conn, 'get_fsid'):
fsid = conn.get_fsid()
with conn.open_ioctx(self.pool) as ioctx:
order = int(math.log(self.chunk_size, 2))
order = int(math.log(self.WRITE_CHUNKSIZE, 2))
LOG.debug('creating image %(name)s with order %(order)d and '
'size %(size)d',
{'name': text_type(image_name),
Expand All @@ -345,7 +346,7 @@ def add(self, image_id, image_file, image_size):
bytes_written = 0
offset = 0
chunks = utils.chunkreadable(image_file,
self.chunk_size)
self.WRITE_CHUNKSIZE)
for chunk in chunks:
# If the image size provided is zero we need to do
# a resize for the amount we are writing. This will
Expand Down
13 changes: 7 additions & 6 deletions glance/store/s3.py
Expand Up @@ -257,8 +257,6 @@ class ChunkedFile(object):
something that can iterate over a ``boto.s3.key.Key``
"""

CHUNKSIZE = 65536

def __init__(self, fp):
self.fp = fp

Expand All @@ -267,7 +265,7 @@ def __iter__(self):
try:
if self.fp:
while True:
chunk = self.fp.read(ChunkedFile.CHUNKSIZE)
chunk = self.fp.read(Store.READ_CHUNKSIZE)
if chunk:
yield chunk
else:
Expand Down Expand Up @@ -295,6 +293,9 @@ def close(self):
class Store(glance.store.base.Store):
"""An implementation of the s3 adapter."""

READ_CHUNKSIZE = 64 * units.Ki
WRITE_CHUNKSIZE = READ_CHUNKSIZE

EXAMPLE_URL = "s3://<ACCESS_KEY>:<SECRET_KEY>@<S3_URL>/<BUCKET>/<OBJ>"

def get_schemes(self):
Expand Down Expand Up @@ -372,11 +373,11 @@ def get(self, location):
"""
key = self._retrieve_key(location)

key.BufferSize = self.CHUNKSIZE
key.BufferSize = self.READ_CHUNKSIZE

class ChunkedIndexable(glance.store.Indexable):
def another(self):
return (self.wrapped.fp.read(ChunkedFile.CHUNKSIZE)
return (self.wrapped.fp.read(self.READ_CHUNKSIZE)
if self.wrapped.fp else None)

return (ChunkedIndexable(ChunkedFile(key), key.size), key.size)
Expand Down Expand Up @@ -504,7 +505,7 @@ def _sanitize(uri):
tmpdir = self.s3_store_object_buffer_dir
temp_file = tempfile.NamedTemporaryFile(dir=tmpdir)
checksum = hashlib.md5()
for chunk in utils.chunkreadable(image_file, self.CHUNKSIZE):
for chunk in utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE):
checksum.update(chunk)
temp_file.write(chunk)
temp_file.flush()
Expand Down
13 changes: 7 additions & 6 deletions glance/store/sheepdog.py
Expand Up @@ -191,7 +191,8 @@ def configure(self):
"""

try:
self.chunk_size = CONF.sheepdog_store_chunk_size * units.Mi
self.READ_CHUNKSIZE = CONF.sheepdog_store_chunk_size * units.Mi
self.WRITE_CHUNKSIZE = self.READ_CHUNKSIZE
self.addr = CONF.sheepdog_store_address.strip()
self.port = CONF.sheepdog_store_port
except cfg.ConfigFileValueError as e:
Expand Down Expand Up @@ -231,7 +232,7 @@ def get(self, location):

loc = location.store_location
image = SheepdogImage(self.addr, self.port, loc.image,
self.chunk_size)
self.READ_CHUNKSIZE)
if not image.exist():
raise exception.NotFound(_("Sheepdog image %s does not exist")
% image.name)
Expand All @@ -250,7 +251,7 @@ def get_size(self, location):

loc = location.store_location
image = SheepdogImage(self.addr, self.port, loc.image,
self.chunk_size)
self.READ_CHUNKSIZE)
if not image.exist():
raise exception.NotFound(_("Sheepdog image %s does not exist")
% image.name)
Expand All @@ -272,7 +273,7 @@ def add(self, image_id, image_file, image_size):
"""

image = SheepdogImage(self.addr, self.port, image_id,
self.chunk_size)
self.WRITE_CHUNKSIZE)
if image.exist():
raise exception.Duplicate(_("Sheepdog image %s already exists")
% image_id)
Expand All @@ -285,7 +286,7 @@ def add(self, image_id, image_file, image_size):
try:
total = left = image_size
while left > 0:
length = min(self.chunk_size, left)
length = min(self.WRITE_CHUNKSIZE, left)
data = image_file.read(length)
image.write(data, total - left, length)
left -= length
Expand All @@ -311,7 +312,7 @@ def delete(self, location):

loc = location.store_location
image = SheepdogImage(self.addr, self.port, loc.image,
self.chunk_size)
self.WRITE_CHUNKSIZE)
if not image.exist():
raise exception.NotFound(_("Sheepdog image %s does not exist") %
loc.image)
Expand Down
5 changes: 3 additions & 2 deletions glance/store/swift.py
Expand Up @@ -31,6 +31,7 @@
from glance import i18n
from glance.openstack.common import excutils
import glance.openstack.common.log as logging
from glance.openstack.common import units
import glance.store
import glance.store.base
import glance.store.location
Expand Down Expand Up @@ -350,7 +351,7 @@ def Store(context=None, loc=None, configure=True):


class BaseStore(glance.store.base.Store):
CHUNKSIZE = 65536
READ_CHUNKSIZE = 64 * units.Ki

def get_schemes(self):
return ('swift+https', 'swift', 'swift+http', 'swift+config')
Expand Down Expand Up @@ -379,7 +380,7 @@ def _get_object(self, location, connection=None, start=None):
try:
resp_headers, resp_body = connection.get_object(
container=location.container, obj=location.obj,
resp_chunk_size=self.CHUNKSIZE, headers=headers)
resp_chunk_size=self.READ_CHUNKSIZE, headers=headers)
except swiftclient.ClientException as e:
if e.http_status == httplib.NOT_FOUND:
msg = _("Swift could not find object %s.") % location.obj
Expand Down
5 changes: 4 additions & 1 deletion glance/store/vmware_datastore.py
Expand Up @@ -30,6 +30,7 @@
from glance.openstack.common import excutils
from glance.openstack.common import gettextutils
import glance.openstack.common.log as logging
from glance.openstack.common import units
import glance.store
import glance.store.base
import glance.store.location
Expand Down Expand Up @@ -221,6 +222,8 @@ def parse_uri(self, uri):
class Store(glance.store.base.Store):
"""An implementation of the VMware datastore adapter."""

WRITE_CHUNKSIZE = units.Mi

def get_schemes(self):
return (STORE_SCHEME,)

Expand Down Expand Up @@ -366,7 +369,7 @@ def get(self, location):
from glance.store.location.get_location_from_uri()
"""
conn, resp, content_length = self._query(location, 'GET')
iterator = http_response_iterator(conn, resp, self.CHUNKSIZE)
iterator = http_response_iterator(conn, resp, self.READ_CHUNKSIZE)

class ResponseIndexable(glance.store.Indexable):

Expand Down
21 changes: 11 additions & 10 deletions glance/tests/unit/test_filesystem_store.py
Expand Up @@ -30,7 +30,6 @@
from glance.common import exception
from glance.openstack.common import units

from glance.store.filesystem import ChunkedFile
from glance.store.filesystem import Store
from glance.store.location import get_location_from_uri
from glance.tests.unit import base
Expand All @@ -43,14 +42,16 @@ class TestStore(base.IsolatedUnitTest):
def setUp(self):
"""Establish a clean test environment"""
super(TestStore, self).setUp()
self.orig_chunksize = ChunkedFile.CHUNKSIZE
ChunkedFile.CHUNKSIZE = 10
self.orig_read_chunksize = Store.READ_CHUNKSIZE
self.orig_write_chunksize = Store.WRITE_CHUNKSIZE
Store.READ_CHUNKSIZE = Store.WRITE_CHUNKSIZE = 10
self.store = Store()

def tearDown(self):
"""Clear the test environment"""
super(TestStore, self).tearDown()
ChunkedFile.CHUNKSIZE = self.orig_chunksize
Store.READ_CHUNKSIZE = self.orig_read_chunksize
Store.WRITE_CHUNKSIZE = self.orig_write_chunksize

def test_configure_add_single_datadir(self):
"""
Expand Down Expand Up @@ -191,7 +192,7 @@ def test_get_non_existing(self):

def test_add(self):
"""Test that we can add an image via the filesystem backend"""
ChunkedFile.CHUNKSIZE = 1024
Store.WRITE_CHUNKSIZE = 1024
expected_image_id = str(uuid.uuid4())
expected_file_size = 5 * units.Ki # 5K
expected_file_contents = "*" * expected_file_size
Expand Down Expand Up @@ -232,7 +233,7 @@ def test_add_with_multiple_dirs(self):
self.store.configure_add()

"""Test that we can add an image via the filesystem backend"""
ChunkedFile.CHUNKSIZE = 1024
Store.WRITE_CHUNKSIZE = 1024
expected_image_id = str(uuid.uuid4())
expected_file_size = 5 * units.Ki # 5K
expected_file_contents = "*" * expected_file_size
Expand Down Expand Up @@ -279,7 +280,7 @@ def fake_get_capacity_info(mount_point):

self.stubs.Set(self.store, '_get_capacity_info',
fake_get_capacity_info)
ChunkedFile.CHUNKSIZE = 1024
Store.WRITE_CHUNKSIZE = 1024
expected_image_id = str(uuid.uuid4())
expected_file_size = 5 * units.Ki # 5K
expected_file_contents = "*" * expected_file_size
Expand Down Expand Up @@ -347,7 +348,7 @@ def test_add_already_existing(self):
Tests that adding an image with an existing identifier
raises an appropriate exception
"""
ChunkedFile.CHUNKSIZE = 1024
Store.WRITE_CHUNKSIZE = 1024
image_id = str(uuid.uuid4())
file_size = 5 * units.Ki # 5K
file_contents = "*" * file_size
Expand All @@ -362,7 +363,7 @@ def test_add_already_existing(self):
image_id, image_file, 0)

def _do_test_add_write_failure(self, errno, exception):
ChunkedFile.CHUNKSIZE = 1024
Store.WRITE_CHUNKSIZE = 1024
image_id = str(uuid.uuid4())
file_size = 5 * units.Ki # 5K
file_contents = "*" * file_size
Expand Down Expand Up @@ -419,7 +420,7 @@ def test_add_cleanup_on_read_failure(self):
Tests the partial image file is cleaned up after a read
failure.
"""
ChunkedFile.CHUNKSIZE = 1024
Store.WRITE_CHUNKSIZE = 1024
image_id = str(uuid.uuid4())
file_size = 5 * units.Ki # 5K
file_contents = "*" * file_size
Expand Down
2 changes: 1 addition & 1 deletion glance/tests/unit/test_http_store.py
Expand Up @@ -93,7 +93,7 @@ def setUp(self):
super(TestHttpStore, self).setUp()
self.stubs = stubout.StubOutForTesting()
stub_out_http_backend(self.stubs)
Store.CHUNKSIZE = 2
Store.READ_CHUNKSIZE = 2
self.store = Store()
configure_registry_client()

Expand Down

0 comments on commit 5148c96

Please sign in to comment.