Skip to content

Commit

Permalink
rgw: don't update bucket index multiple times in overwrite
Browse files Browse the repository at this point in the history
Instead of this sequence for overwrites:
 prepare (index)
   write (head) [-EEXIST]
 cancel (index)
 read (head)
 prepare (index)
   write (head)
 complete (index)

We now do:
 prepare (index)
   write (head) [-EEXIST]
   read (head)
   write (head)
 complete (index)

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
  • Loading branch information
yehudasa committed Feb 6, 2017
1 parent 6f27f60 commit 7f4818f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 44 deletions.
88 changes: 53 additions & 35 deletions src/rgw/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2732,7 +2732,18 @@ RGWObjState *RGWObjectCtx::get_state(rgw_obj& obj) {
/*
 * Reset the state cached for obj in objs_state while keeping the is_atomic
 * and prefetch_data flags of the existing entry. When objs_state has no
 * entry for obj, nothing is changed.
 */
void RGWObjectCtx::invalidate(rgw_obj& obj)
{
RWLock::WLocker wl(lock); /* presumably a scoped write lock on `lock` -- confirm against RWLock */
/* NOTE(review): this text is a diff rendering without +/- markers; the
 * erase-by-key statement below looks like the pre-change line that the
 * find/erase/re-insert sequence after it replaces. If both were live, the
 * find() would never locate an entry. Confirm against the actual file. */
objs_state.erase(obj);
auto iter = objs_state.find(obj);
if (iter == objs_state.end()) {
return;
}
/* remember the two flags that must survive the reset */
bool is_atomic = iter->second.is_atomic;
bool prefetch_data = iter->second.prefetch_data;

objs_state.erase(iter);

/* re-create the entry for obj (assuming a map-like operator[] that
 * default-constructs the value) and put the saved flags back on it */
auto& s = objs_state[obj];
s.is_atomic = is_atomic;
s.prefetch_data = prefetch_data;
}

void RGWObjectCtx::set_atomic(rgw_obj& obj) {
Expand Down Expand Up @@ -6289,8 +6300,10 @@ int RGWRados::swift_versioning_restore(RGWObjectCtx& obj_ctx,
* Returns: 0 on success, -ERR# otherwise.
*/
int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_size,
map<string, bufferlist>& attrs, bool assume_noent)
map<string, bufferlist>& attrs, bool assume_noent,
void *_index_op)
{
RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);
rgw_bucket bucket;
rgw_rados_ref ref;
RGWRados *store = target->get_store();
Expand All @@ -6316,7 +6329,12 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
bool is_olh = state->is_olh;

bool reset_obj = (meta.flags & PUT_OBJ_CREATE) != 0;
r = target->prepare_atomic_modification(op, reset_obj, meta.ptag, meta.if_match, meta.if_nomatch, false);

const string *ptag = meta.ptag;
if (!ptag && !index_op->get_optag()->empty()) {
ptag = index_op->get_optag();
}
r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false);
if (r < 0)
return r;

Expand Down Expand Up @@ -6390,7 +6408,6 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
if (!op.size())
return 0;

string index_tag;
uint64_t epoch;
int64_t poolid;

Expand All @@ -6399,23 +6416,17 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si

bool versioned_target = (meta.olh_epoch > 0 || !obj.get_instance().empty());

index_tag = state->write_tag;

bool versioned_op = (target->versioning_enabled() || is_olh || versioned_target);

RGWBucketInfo& bucket_info = target->get_bucket_info();

RGWRados::Bucket bop(store, bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj, state);

if (versioned_op) {
index_op.set_bilog_flags(RGW_BILOG_FLAG_VERSIONED_OP);
index_op->set_bilog_flags(RGW_BILOG_FLAG_VERSIONED_OP);
}


r = index_op.prepare(CLS_RGW_OP_ADD);
if (r < 0)
return r;
if (!index_op->is_prepared()) {
r = index_op->prepare(CLS_RGW_OP_ADD, &state->write_tag);
if (r < 0)
return r;
}

r = ref.ioctx.operate(ref.oid, &op);
if (r < 0) { /* we can expect to get -ECANCELED if object was replaced under,
Expand All @@ -6436,7 +6447,7 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
ldout(store->ctx(), 0) << "ERROR: complete_atomic_modification returned r=" << r << dendl;
}

r = index_op.complete(poolid, epoch, size, accounted_size,
r = index_op->complete(poolid, epoch, size, accounted_size,
meta.set_mtime, etag, content_type, &acl_bl,
meta.category, meta.remove_objs);
if (r < 0)
Expand Down Expand Up @@ -6476,7 +6487,7 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
return 0;

done_cancel:
int ret = index_op.cancel();
int ret = index_op->cancel();
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: index_op.cancel()() returned ret=" << ret << dendl;
}
Expand All @@ -6490,10 +6501,7 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
* should treat it as a success
*/
if (meta.if_match == NULL && meta.if_nomatch == NULL) {
if (r == -ECANCELED || r == -ENOENT ||
(r == -EEXIST && !assume_noent)) /* if assume_noent, we want to send back error so that
* we'd be called again with assume_noent == false
*/ {
if (r == -ECANCELED || r == -ENOENT || r == -EEXIST) {
r = 0;
}
} else {
Expand Down Expand Up @@ -6526,16 +6534,21 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
/*
 * Write the object's metadata through _do_write_meta(), retrying once when
 * the optimistic first attempt reports -EEXIST.
 *
 * size, accounted_size and attrs are passed through unchanged to
 * _do_write_meta(). Returns the result of the last _do_write_meta() call.
 *
 * A single UpdateIndex object is created here and handed to both attempts,
 * so the retry operates on the same index op instance as the first attempt.
 */
int RGWRados::Object::Write::write_meta(uint64_t size, uint64_t accounted_size,
map<string, bufferlist>& attrs)
{
RGWBucketInfo& bucket_info = target->get_bucket_info();

RGWRados::Bucket bop(target->get_store(), bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, target->get_obj());

/* with no if_match/if_nomatch condition, first try assuming the object
 * does not exist yet */
bool assume_noent = (meta.if_match == NULL && meta.if_nomatch == NULL);
int r;
if (assume_noent) {
/* NOTE(review): this text is a diff rendering without +/- markers; the
 * four-argument call below looks like the pre-change line replaced by the
 * five-argument call that follows it. Confirm against the actual file. */
r = _do_write_meta(size, accounted_size, attrs, assume_noent);
r = _do_write_meta(size, accounted_size, attrs, assume_noent, (void *)&index_op);
if (r == -EEXIST) {
/* the object already exists: fall through to the second attempt */
assume_noent = false;
}
}
if (!assume_noent) {
/* NOTE(review): as above, the four-argument call appears to be the
 * pre-change line of the diff. */
r = _do_write_meta(size, accounted_size, attrs, assume_noent);
r = _do_write_meta(size, accounted_size, attrs, assume_noent, (void *)&index_op);
}
return r;
}
Expand Down Expand Up @@ -8301,12 +8314,12 @@ int RGWRados::Object::Delete::delete_obj()
RGWBucketInfo& bucket_info = target->get_bucket_info();

RGWRados::Bucket bop(store, bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj, state);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj);

index_op.set_bilog_flags(params.bilog_flags);


r = index_op.prepare(CLS_RGW_OP_DEL);
r = index_op.prepare(CLS_RGW_OP_DEL, &state->write_tag);
if (r < 0)
return r;

Expand Down Expand Up @@ -8412,7 +8425,7 @@ int RGWRados::delete_obj_index(rgw_obj& obj)
}

RGWRados::Bucket bop(this, bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj, NULL);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj);

real_time removed_mtime;
int r = index_op.complete_del(-1 /* pool */, 0, removed_mtime, NULL);
Expand Down Expand Up @@ -9068,13 +9081,13 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj,

bufferlist bl;
RGWRados::Bucket bop(this, bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj, state);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj);

if (state) {
string tag;
append_rand_alpha(cct, tag, tag, 32);
state->write_tag = tag;
r = index_op.prepare(CLS_RGW_OP_ADD);
r = index_op.prepare(CLS_RGW_OP_ADD, &state->write_tag);

if (r < 0)
return r;
Expand Down Expand Up @@ -9306,7 +9319,7 @@ int RGWRados::SystemObject::Read::stat(RGWObjVersionTracker *objv_tracker)
stat_params.lastmod, stat_params.obj_size, objv_tracker);
}

int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op)
int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_tag)
{
if (blind) {
return 0;
Expand All @@ -9319,15 +9332,20 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op)
return ret;
}

if (obj_state && obj_state->write_tag.length()) {
optag = string(obj_state->write_tag.c_str(), obj_state->write_tag.length());
if (write_tag && write_tag->length()) {
optag = string(write_tag->c_str(), write_tag->length());
} else {
if (optag.empty()) {
append_rand_alpha(store->ctx(), optag, optag, 32);
}
}

return store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags);
int r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags);
if (r < 0) {
return r;
}
prepared = true;
return 0;
}

int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch,
Expand Down Expand Up @@ -12999,9 +13017,9 @@ int RGWRados::delete_obj_aio(rgw_obj& obj, rgw_bucket& bucket,

if (keep_index_consistent) {
RGWRados::Bucket bop(this, bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj, astate);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj);

ret = index_op.prepare(CLS_RGW_OP_DEL);
ret = index_op.prepare(CLS_RGW_OP_DEL, &astate->write_tag);
if (ret < 0) {
lderr(cct) << "ERROR: failed to prepare index op with ret=" << ret << dendl;
return ret;
Expand Down
23 changes: 14 additions & 9 deletions src/rgw/rgw_rados.h
Original file line number Diff line number Diff line change
Expand Up @@ -2461,7 +2461,8 @@ class RGWRados

int _do_write_meta(uint64_t size, uint64_t accounted_size,
map<std::string, bufferlist>& attrs,
bool assume_noent);
bool assume_noent,
void *index_op);
int write_meta(uint64_t size, uint64_t accounted_size,
map<std::string, bufferlist>& attrs);
int write_data(const char *data, uint64_t ofs, uint64_t len, bool exclusive);
Expand Down Expand Up @@ -2553,17 +2554,17 @@ class RGWRados
RGWRados::Bucket *target;
string optag;
rgw_obj obj;
RGWObjState *obj_state;
uint16_t bilog_flags;
uint16_t bilog_flags{0};
BucketShard bs;
bool bs_initialized;
bool bs_initialized{false};
bool blind;
bool prepared{false};
public:

UpdateIndex(RGWRados::Bucket *_target, rgw_obj& _obj, RGWObjState *_state) : target(_target), obj(_obj), obj_state(_state), bilog_flags(0),
bs(target->get_store()), bs_initialized(false) {
blind = (target->get_bucket_info().index_type == RGWBIType_Indexless);
}
/*
 * Build an index update helper for _obj in the bucket wrapped by _target.
 * bs is constructed from the target's store; blind is set when the bucket's
 * index_type is RGWBIType_Indexless. Members not listed in the initializer
 * list (bilog_flags, bs_initialized, prepared) rely on their in-class
 * default initializers.
 */
UpdateIndex(RGWRados::Bucket *_target, rgw_obj& _obj) : target(_target), obj(_obj),
bs(target->get_store()) {
blind = (target->get_bucket_info().index_type == RGWBIType_Indexless);
}

int get_bucket_shard(BucketShard **pbs) {
if (!bs_initialized) {
Expand All @@ -2581,7 +2582,7 @@ class RGWRados
bilog_flags = flags;
}

int prepare(RGWModifyOp);
int prepare(RGWModifyOp, const string *write_tag);
int complete(int64_t poolid, uint64_t epoch, uint64_t size,
uint64_t accounted_size, ceph::real_time& ut,
const string& etag, const string& content_type,
Expand All @@ -2591,6 +2592,10 @@ class RGWRados
ceph::real_time& removed_mtime, /* mtime of removed object */
list<rgw_obj_key> *remove_objs);
int cancel();

/* Returns a pointer to this object's optag member; the string may be empty. */
const string *get_optag() { return &optag; }

/* Returns the prepared flag, which is initialized to false in the class. */
bool is_prepared() { return prepared; }
};

struct List {
Expand Down

0 comments on commit 7f4818f

Please sign in to comment.