Skip to content

Commit

Permalink
rgw: keep track of written_objs correctly
Browse files Browse the repository at this point in the history
Fixes: http://tracker.ceph.com/issues/15886

Only add a rados object to the written_objs list if the write
was successful. Otherwise, if the write is canceled for some
reason, we'd remove an object that we didn't write to. This was
a problem in the case where there are multiple writes that go to
the same part. The second writer should fail the write, since
we do an exclusive write. However, we added the object's name
to the written_objs list anyway, which was a real problem when
the old processor was disposed (as it was clearing the objects).

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
  • Loading branch information
yehudasa committed May 18, 2016
1 parent e219e85 commit 8c46999
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
5 changes: 3 additions & 2 deletions src/rgw/rgw_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1684,12 +1684,13 @@ static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data

do {
void *handle;
rgw_obj obj;

int ret = processor->handle_data(data, ofs, hash, &handle, &again);
int ret = processor->handle_data(data, ofs, hash, &handle, &obj, &again);
if (ret < 0)
return ret;

ret = processor->throttle_data(handle, need_to_wait);
ret = processor->throttle_data(handle, obj, need_to_wait);
if (ret < 0)
return ret;

Expand Down
41 changes: 25 additions & 16 deletions src/rgw/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio()
if (is_complete)
return;

list<rgw_obj>::iterator iter;
set<rgw_obj>::iterator iter;
bool is_multipart_obj = false;
rgw_obj multipart_obj;

Expand All @@ -926,7 +926,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio()
* details are described in #11749
*/
for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) {
rgw_obj &obj = *iter;
const rgw_obj &obj = *iter;
if (RGW_OBJ_NS_MULTIPART == obj.ns) {
ldout(store->ctx(), 5) << "NOTE: we should not process the multipart object (" << obj << ") here" << dendl;
multipart_obj = *iter;
Expand Down Expand Up @@ -955,7 +955,6 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t
obj_len = abs_ofs + bl.length();

if (!(obj == last_written_obj)) {
add_written_obj(obj);
last_written_obj = obj;
}

Expand All @@ -965,7 +964,6 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t
bl,
((ofs != 0) ? ofs : -1),
exclusive, phandle);

return r;
}

Expand All @@ -984,6 +982,11 @@ int RGWPutObjProcessor_Aio::wait_pending_front()
}
struct put_obj_aio_info info = pop_pending();
int ret = store->aio_wait(info.handle);

if (ret >= 0) {
add_written_obj(info.obj);
}

return ret;
}

Expand All @@ -1007,11 +1010,12 @@ int RGWPutObjProcessor_Aio::drain_pending()
return ret;
}

int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait)
{
if (handle) {
struct put_obj_aio_info info;
info.handle = handle;
info.obj = obj;
pending.push_back(info);
}
size_t orig_size = pending.size();
Expand Down Expand Up @@ -1042,7 +1046,7 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
return 0;
}

int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive)
int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive)
{
if (ofs >= next_part_ofs) {
int r = prepare_next_part(ofs);
Expand All @@ -1051,10 +1055,12 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
}
}

*pobj = cur_obj;

return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}

int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again)
int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again)
{
*again = false;

Expand Down Expand Up @@ -1103,7 +1109,7 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
we could be racing with another upload, to the same
object and cleanup can be messy */
int ret = write_data(bl, write_ofs, phandle, exclusive);
int ret = write_data(bl, write_ofs, phandle, pobj, exclusive);
if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
if (hash) {
hash->Update((const byte *)bl.c_str(), bl.length());
Expand Down Expand Up @@ -1185,12 +1191,13 @@ int RGWPutObjProcessor_Atomic::complete_writing_data()
}
if (pending_data_bl.length()) {
void *handle;
int r = write_data(pending_data_bl, data_ofs, &handle, false);
rgw_obj obj;
int r = write_data(pending_data_bl, data_ofs, &handle, &obj, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
return r;
}
r = throttle_data(handle, false);
r = throttle_data(handle, obj, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
return r;
Expand Down Expand Up @@ -3658,7 +3665,8 @@ class RGWRadosPutObj : public RGWGetDataCB

do {
void *handle;
int ret = processor->handle_data(bl, ofs, NULL, &handle, &again);
rgw_obj obj;
int ret = processor->handle_data(bl, ofs, NULL, &handle, &obj, &again);
if (ret < 0)
return ret;

Expand All @@ -3669,7 +3677,7 @@ class RGWRadosPutObj : public RGWGetDataCB
ret = opstate->renew_state();
if (ret < 0) {
ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
int r = processor->throttle_data(handle, false);
int r = processor->throttle_data(handle, obj, false);
if (r < 0) {
ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
}
Expand All @@ -3680,7 +3688,7 @@ class RGWRadosPutObj : public RGWGetDataCB
need_opstate = false;
}

ret = processor->throttle_data(handle, false);
ret = processor->throttle_data(handle, obj, false);
if (ret < 0)
return ret;
} while (again);
Expand Down Expand Up @@ -4240,12 +4248,13 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,

do {
void *handle;
rgw_obj obj;

ret = processor.handle_data(bl, ofs, NULL, &handle, &again);
ret = processor.handle_data(bl, ofs, NULL, &handle, &obj, &again);
if (ret < 0) {
return ret;
}
ret = processor.throttle_data(handle, false);
ret = processor.throttle_data(handle, obj, false);
if (ret < 0)
return ret;
} while (again);
Expand Down Expand Up @@ -4794,7 +4803,7 @@ int RGWRados::Object::Delete::delete_obj()
return 0;
}

int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& obj,
int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& obj,
int versioning_status, uint16_t bilog_flags)
{
RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
Expand Down
19 changes: 10 additions & 9 deletions src/rgw/rgw_rados.h
Original file line number Diff line number Diff line change
Expand Up @@ -1487,7 +1487,7 @@ class RGWRados
int complete_atomic_modification();

public:
Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, const rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
ctx(_ctx), obj(_obj), bs(store),
state(NULL), versioning_disabled(false),
bs_initialized(false) {}
Expand Down Expand Up @@ -1857,7 +1857,7 @@ class RGWRados
int bucket_suspended(rgw_bucket& bucket, bool *suspended);

/** Delete an object.*/
virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, rgw_obj& src_obj,
virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, const rgw_obj& src_obj,
int versioning_status, uint16_t bilog_flags = 0);

/* Delete a system object */
Expand Down Expand Up @@ -2317,8 +2317,8 @@ class RGWPutObjProcessor
store = _store;
return 0;
}
virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again) = 0;
virtual int throttle_data(void *handle, bool need_to_wait) = 0;
virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again) = 0;
virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0;
virtual void complete_hash(MD5 *hash) {
assert(0);
}
Expand All @@ -2331,6 +2331,7 @@ class RGWPutObjProcessor

/* Bookkeeping record for one write queued on the processor's pending list
 * (filled in and pushed by RGWPutObjProcessor_Aio::throttle_data()). */
struct put_obj_aio_info {
/* aio handle for the write; passed to store->aio_wait() when the entry is popped */
void *handle;
/* object the write was issued against; added to written_objs only if aio_wait() returns >= 0 */
rgw_obj obj;
};

class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
Expand All @@ -2347,17 +2348,17 @@ class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
protected:
uint64_t obj_len;

list<rgw_obj> written_objs;
set<rgw_obj> written_objs;

void add_written_obj(const rgw_obj& obj) {
written_objs.push_back(obj);
written_objs.insert(obj);
}

int drain_pending();
int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);

public:
int throttle_data(void *handle, bool need_to_wait);
int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait);

RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
virtual ~RGWPutObjProcessor_Aio();
Expand Down Expand Up @@ -2392,7 +2393,7 @@ class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
RGWObjManifest manifest;
RGWObjManifest::generator manifest_gen;

int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive);
int write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive);
virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
map<string, bufferlist>& attrs,
const char *if_match = NULL, const char *if_nomatch = NULL);
Expand Down Expand Up @@ -2425,7 +2426,7 @@ class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
void set_extra_data_len(uint64_t len) {
extra_data_len = len;
}
virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again);
virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again);
virtual void complete_hash(MD5 *hash);
bufferlist& get_extra_data() { return extra_data_bl; }

Expand Down

0 comments on commit 8c46999

Please sign in to comment.