Python API: fix the object_link operation
fvennetier committed Mar 26, 2021
1 parent 5f99b9d commit 4fac986
Showing 6 changed files with 68 additions and 24 deletions.
24 changes: 19 additions & 5 deletions oio/api/io.py
@@ -25,11 +25,13 @@
 
 from oio.common import exceptions as exc, green
 from oio.common.constants import REQID_HEADER
+from oio.common.fullpath import decode_fullpath
 from oio.common.http import parse_content_type,\
     parse_content_range, ranges_from_http_header, http_header_from_ranges
 from oio.common.http_eventlet import http_connect
 from oio.common.utils import GeneratorIO, group_chunk_errors, \
-    deadline_to_timeout, monotonic_time, set_deadline_from_read_timeout
+    deadline_to_timeout, monotonic_time, set_deadline_from_read_timeout, \
+    cid_from_name, compute_chunk_id
 from oio.common.storage_method import STORAGE_METHODS
 from oio.common.logger import get_logger
 
@@ -116,11 +118,12 @@ def write_timeout(self):
 
 class LinkHandler(_WriteHandler):
     def __init__(self, fullpath, chunk_preparer, storage_method, blob_client,
-                 headers=None, **kwargs):
+                 policy, headers=None, **kwargs):
         super(LinkHandler, self).__init__(
             chunk_preparer, storage_method, headers=headers, **kwargs)
         self.fullpath = fullpath
         self.blob_client = blob_client
+        self.policy = policy
 
     def link(self):
         content_chunks = list()
@@ -131,6 +134,7 @@ def link(self):
             handler = MetachunkLinker(
                 meta_chunk, self.fullpath, self.blob_client,
                 storage_method=self.storage_method,
+                policy=self.policy,
                 reqid=self.headers.get(REQID_HEADER),
                 connection_timeout=self.connection_timeout,
                 write_timeout=self.write_timeout, **kwargs)
@@ -681,9 +685,9 @@ def quorum_or_fail(self, successes, failures):
 
 class MetachunkLinker(_MetachunkWriter):
     """
-    Base class for metachunk linkers
+    Create new hard links for all the chunks of a metachunk.
     """
-    def __init__(self, meta_chunk_target, fullpath, blob_client,
+    def __init__(self, meta_chunk_target, fullpath, blob_client, policy,
                  storage_method=None, quorum=None, reqid=None, perfdata=None,
                  connection_timeout=None, write_timeout=None,
                  **kwargs):
@@ -693,6 +697,7 @@ def __init__(self, meta_chunk_target, fullpath, blob_client,
         self.meta_chunk_target = meta_chunk_target
         self.fullpath = fullpath
         self.blob_client = blob_client
+        self.policy = policy
         self.connection_timeout = connection_timeout or CONNECTION_TIMEOUT
         self.write_timeout = write_timeout or CHUNK_TIMEOUT
         self.logger = kwargs.get('logger', LOGGER)
@@ -704,12 +709,21 @@ def filter_kwargs(cls, kwargs):
                 'logger')}
 
     def link(self):
+        """
+        Create new hard links for all the chunks of a metachunk.
+        """
         new_meta_chunks = list()
         failed_chunks = list()
+        # pylint: disable=unbalanced-tuple-unpacking
+        acct, ct, path, vers, _ = decode_fullpath(self.fullpath)
+        cid = cid_from_name(acct, ct)
         for chunk_target in self.meta_chunk_target:
             try:
+                chunk_id = compute_chunk_id(cid, path, vers,
+                                            chunk_target['pos'],
+                                            self.policy)
                 resp, new_chunk_url = self.blob_client.chunk_link(
-                    chunk_target['url'], None, self.fullpath,
+                    chunk_target['url'], chunk_id, self.fullpath,
                     connection_timeout=self.connection_timeout,
                     write_timeout=self.write_timeout, reqid=self.reqid,
                     perfdata=self.perfdata, logger=self.logger)
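The heart of the fix: instead of passing `None` and letting the blob client draw a random destination chunk ID, `MetachunkLinker.link()` now derives a deterministic ID from the object's coordinates. A minimal standalone sketch of that derivation, using the helpers imported above (the account, container and fullpath values are invented for illustration):

    from oio.common.fullpath import decode_fullpath
    from oio.common.utils import cid_from_name, compute_chunk_id

    # Invented fullpath: account/container/path/version/content_id
    fullpath = 'myaccount/mycontainer/myobject/1616700000000000/0123456789ABCDEF'
    # pylint: disable=unbalanced-tuple-unpacking
    acct, ct, path, vers, _ = decode_fullpath(fullpath)
    cid = cid_from_name(acct, ct)
    # One deterministic chunk ID per (object version, position, policy)
    chunk_id = compute_chunk_id(cid, path, vers, '0', 'THREECOPIES')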
40 changes: 26 additions & 14 deletions oio/api/object_storage.py
@@ -479,7 +479,7 @@ def container_snapshot(self, account, container, dst_account,
         target_beans = []
         copy_beans = []
         for obj in obj_gen:
-            _, chunks = self.object_locate(
+            obj_meta, chunks = self.object_locate(
                 account, container, obj["name"], version=obj['version'],
                 **kwargs)
             fullpath = encode_fullpath(
@@ -490,16 +490,17 @@
                 logger=self.logger)
             handler = LinkHandler(
                 fullpath, chunks_by_pos, storage_method,
-                self.blob_client, **kwargs)
+                self.blob_client, policy=obj_meta['policy'],
+                **kwargs)
             try:
                 chunks_copies = handler.link()
             except exc.UnfinishedUploadException as ex:
                 self.logger.warn(
                     'Failed to upload all data (%s), deleting chunks',
                     ex.exception)
+                kwargs['cid'] = obj['container_id']
                 self._delete_orphan_chunks(
-                    ex.chunks_already_uploaded, obj['container_id'],
-                    **kwargs)
+                    ex.chunks_already_uploaded, **kwargs)
                 ex.reraise()
             t_beans, c_beans = self._prepare_meta2_raw_update(
                 chunks, chunks_copies, obj['content'])
@@ -797,6 +798,16 @@ def object_change_policy(self, account, container, obj, policy, **kwargs):
         """
         meta, stream = self.object_fetch(
             account, container, obj, **kwargs)
+        # Before we started generating predictable chunk IDs, it was possible
+        # to change to the same policy: it just renewed all chunks and updated
+        # the modification time.
+        # Now that we generate predictable chunk IDs, we must change something
+        # in the object description in order to get a different set of chunks
+        # (we don't want to change the object version).
+        if meta['policy'] == policy:
+            del stream
+            raise exc.Conflict(
+                'The object is already using the %s storage policy' % policy)
         kwargs['version'] = meta['version']
         return self.object_create_ext(
             account, container, obj_name=meta['name'],
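The comment above carries the reasoning: chunk IDs are now a pure function of container, path, version, position and policy, so re-applying the current policy would regenerate the exact same set of chunks. A hedged sketch of what a caller sees now, assuming an ObjectStorageApi instance named `api` (account, container and object names invented):

    from oio.common import exceptions as exc

    try:
        api.object_change_policy('myaccount', 'mycontainer', 'myobject',
                                 'THREECOPIES')
    except exc.Conflict:
        # The object already uses THREECOPIES: nothing would change.
        pass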
@@ -1082,9 +1093,9 @@ def object_link(self, target_account, target_container, target_obj,
             self.logger.warn(
                 'Failed to upload all data (%s), deleting chunks',
                 ex.exception)
+            kwargs['cid'] = link_meta['container_id']
             self._delete_orphan_chunks(
-                ex.chunks_already_uploaded, link_meta['container_id'],
-                **kwargs)
+                ex.chunks_already_uploaded, **kwargs)
             ex.reraise()
 
         data = {'chunks': chunks_copies,
@@ -1109,8 +1120,8 @@
         except (exc.Conflict, exc.DeadlineReached) as ex:
             self.logger.warn(
                 'Failed to commit to meta2 (%s), deleting chunks', ex)
-            self._delete_orphan_chunks(
-                chunks_copies, link_meta['container_id'], **kwargs)
+            kwargs['cid'] = link_meta['container_id']
+            self._delete_orphan_chunks(chunks_copies, **kwargs)
             raise
         return link_meta
 
@@ -1385,9 +1396,11 @@ def _object_prepare(self, account, container, obj_name, source,
 
         storage_method = STORAGE_METHODS.load(obj_meta['chunk_method'])
         if link:
+            if not policy:
+                policy = obj_meta['policy']
             handler = LinkHandler(
                 obj_meta['full_path'], None, storage_method,
-                self.blob_client, **kwargs)
+                self.blob_client, policy=policy, **kwargs)
             return obj_meta, handler, None
 
         if storage_method.ec:
@@ -1425,10 +1438,9 @@ def _object_create(self, account, container, obj_name, source,
         except exc.OioException as ex:
             self.logger.warn(
                 'Failed to upload all data (%s), deleting chunks', ex)
+            kwargs['cid'] = obj_meta['container_id']
             self._delete_orphan_chunks(
-                chunk_prep.all_chunks_so_far(),
-                obj_meta['container_id'],
-                **kwargs)
+                chunk_prep.all_chunks_so_far(), **kwargs)
             raise
 
         etag = obj_meta.get('etag')
@@ -1461,8 +1473,8 @@
         except (exc.Conflict, exc.DeadlineReached) as ex:
             self.logger.warn(
                 'Failed to commit to meta2 (%s), deleting chunks', ex)
-            self._delete_orphan_chunks(ul_chunks, obj_meta['container_id'],
-                                       **kwargs)
+            kwargs['cid'] = obj_meta['container_id']
+            self._delete_orphan_chunks(ul_chunks, **kwargs)
             raise
         return ul_chunks, ul_bytes, obj_checksum, obj_meta
 
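Throughout this file the container ID now travels through `kwargs['cid']` instead of an extra positional argument to `_delete_orphan_chunks()`. As a caller-level illustration of the fixed operation, a hedged sketch — the parameter names beyond the three `target_*` ones visible in the hunk header above are assumptions:

    # Create 'mylink' sharing the chunks of 'myobject'; the link's chunk IDs
    # are now computed from the link's own coordinates and policy.
    link_meta = api.object_link('myaccount', 'mycontainer', 'myobject',
                                'myaccount', 'mycontainer', 'mylink')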
6 changes: 6 additions & 0 deletions oio/blob/client.py
@@ -273,6 +273,8 @@ def _generate_fullchunk_copy(self, chunk, random_hex=60, **kwargs):
         Generate new chunk URLs, by replacing the last `random_hex`
         characters of the original URLs by random hexadecimal digits.
         """
+        maxlen = len(chunk) - chunk.rfind('/') - 1
+        random_hex = min(random_hex, maxlen)
         rnd = ''.join(random.choice('0123456789ABCDEF')
                       for _ in range(random_hex))
         return chunk[:-random_hex] + rnd
@@ -285,6 +287,10 @@ def chunk_link(self, target, link, fullpath, headers=None,
         hdrs = headers.copy()
         if link is None:
             link = self._generate_fullchunk_copy(target, **kwargs)
+        elif not link.startswith('http://'):
+            offset = target.rfind('/')
+            maxlen = len(target) - offset - 1
+            link = target[:offset+1] + link[:maxlen]
         hdrs['Destination'] = link
         hdrs[CHUNK_HEADERS['full_path']] = fullpath
         if write_timeout is not None:
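The new `elif` branch lets callers pass a bare chunk ID rather than a full URL: the ID is grafted onto the target's base URL and truncated so it never exceeds the original ID's length. A standalone sketch of that string manipulation, with an invented rawx address and invented chunk IDs:

    target = 'http://127.0.0.1:6201/0123456789ABCDEF0123456789ABCDEF'
    link = 'FEDCBA9876543210FEDCBA9876543210'  # bare chunk ID, not a URL
    offset = target.rfind('/')
    maxlen = len(target) - offset - 1
    link = target[:offset + 1] + link[:maxlen]
    # link == 'http://127.0.0.1:6201/FEDCBA9876543210FEDCBA9876543210'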
2 changes: 1 addition & 1 deletion oio/blob/converter.py
@@ -125,7 +125,7 @@ def _get_path(self, fd, chunk_id):
             end = start + len(chunk_part)
             chunk_path += '/' + chunk_id[start:end]
             start = end
-        chunk_path += '/' + chunk_path_split[-1]
+        chunk_path += '/' + chunk_id
         return chunk_path
 
     def cid_from_name(self, account, container):
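With predictable chunk IDs, the converter must always end the on-disk path with the full chunk ID instead of reusing the last component of the old path. A hedged reconstruction of the resulting layout, assuming a rawx volume configured with hash_width=3 and hash_depth=1 (invented values):

    chunk_id = '0123456789ABCDEF0123456789ABCDEF'
    volume = '/var/lib/oio/rawx-1'
    hash_width, hash_depth = 3, 1
    chunk_path, start = volume, 0
    for _ in range(hash_depth):
        chunk_path += '/' + chunk_id[start:start + hash_width]
        start += hash_width
    chunk_path += '/' + chunk_id  # the fix: finish with the full chunk ID
    # chunk_path == '/var/lib/oio/rawx-1/012/0123456789ABCDEF0123456789ABCDEF'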
15 changes: 13 additions & 2 deletions oio/common/utils.py
@@ -18,10 +18,10 @@
 import ctypes
 import os
 import grp
+import hashlib
 import pwd
 import fcntl
 from collections import OrderedDict
-from hashlib import sha256
 from math import sqrt
 from random import getrandbits
 from io import RawIOBase
@@ -195,7 +195,7 @@ def cid_from_name(account, ref):
     """
     Compute a container ID from an account and a reference name.
     """
-    hash_ = sha256()
+    hash_ = hashlib.new('sha256')
     for v in [account, '\0', ref]:
         if isinstance(v, text_type):
             v = v.encode('utf-8')
@@ -526,3 +526,14 @@ def compute_perfdata_stats(perfdata, prefix='upload.'):
         rawx_perfdata[prefix + 'AVG'] = avg
         rawx_perfdata[prefix + 'SD'] = sdev
         rawx_perfdata[prefix + 'RSD'] = sdev/avg
+
+
+def compute_chunk_id(cid, path, version, position, policy, hash_algo='sha256'):
+    """
+    Compute the predictable chunk ID for the specified object version,
+    position and policy.
+    """
+    base = cid + path + str(version) + str(position) + policy
+    hash_ = hashlib.new(hash_algo)
+    hash_.update(base.encode('utf-8'))
+    return hash_.hexdigest().upper()
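Since `compute_chunk_id` is a plain SHA-256 over the concatenated coordinates, the same inputs always produce the same 64-character uppercase hex ID. A quick example (all values invented):

    cid = cid_from_name('myaccount', 'mycontainer')
    chunk_id = compute_chunk_id(cid, 'myobject', 1616700000000000, '0',
                                'THREECOPIES')
    assert chunk_id == compute_chunk_id(cid, 'myobject', 1616700000000000,
                                        '0', 'THREECOPIES')
    # This determinism is why object_change_policy now refuses a same-policy
    # change: it would regenerate the exact same chunk IDs.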
5 changes: 3 additions & 2 deletions tests/functional/api/test_objectstorage.py
@@ -1640,8 +1640,9 @@ def test_change_content_2xchunksize_bytes_policy_3copies_to_single(self):
             self.chunk_size * 2, 'THREECOPIES', 'SINGLE')
 
     def test_change_content_with_same_policy(self):
-        self._test_change_policy(
-            1, 'TWOCOPIES', 'TWOCOPIES')
+        self.assertRaises(exc.Conflict,
+                          self._test_change_policy,
+                          1, 'TWOCOPIES', 'TWOCOPIES')
 
     def test_change_policy_with_versioning(self):
         self._test_change_policy(
