Commit 8e69580

Merge pull request ceph#12978 from asheplyakov/jewel-18581
jewel: ReplicatedBackend: take read locks for clone sources during recovery

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
smithfarm committed Feb 1, 2017
2 parents c05730c + 509de4d commit 8e69580
Showing 5 changed files with 177 additions and 68 deletions.
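What the backport does, in brief: recovery of a snapshotted object can reconstruct data from the overlapping extents of neighboring clones instead of copying those bytes over the wire, and before this change nothing protected those clone sources from being modified or trimmed while recovery read them. The patch threads an ObcLockManager through the pull/push setup paths so each clone source is read-locked via try_lock_for_read() before it is used as a source, and all locks are dropped via release_locks() when the operation completes, fails, or is cancelled. The sketch below illustrates only that try-lock-and-release-as-a-group idiom, with stand-in types rather than the real Ceph classes:

```cpp
#include <shared_mutex>
#include <vector>

// Stand-in for a per-object context guarded by an rwlock.
struct Obj {
  std::shared_mutex lock;
};

// Minimal sketch of the manager idiom: collect read locks that were
// acquired opportunistically, then release them all as a group later.
class LockManager {
  std::vector<Obj*> held;
public:
  bool try_lock_for_read(Obj &o) {
    if (!o.lock.try_lock_shared())
      return false;        // someone holds it; caller skips this clone
    held.push_back(&o);
    return true;
  }
  void release_locks() {
    for (Obj *o : held)
      o->lock.unlock_shared();
    held.clear();
  }
};
```

The key property is that acquisition never blocks recovery: a clone that cannot be locked simply falls out of clone_subsets, and its bytes are copied from the peer instead.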
src/osd/PGBackend.h (6 changes: 6 additions, 0 deletions)
@@ -173,6 +173,12 @@ struct shard_info_wrapper;
     const hobject_t &hoid,
     map<string, bufferlist> &attrs) = 0;
 
+  virtual bool try_lock_for_read(
+    const hobject_t &hoid,
+    ObcLockManager &manager) = 0;
+
+  virtual void release_locks(ObcLockManager &manager) = 0;
+
   virtual void op_applied(
     const eversion_t &applied_version) = 0;
 
src/osd/ReplicatedBackend.cc (142 changes: 94 additions, 48 deletions)
@@ -118,9 +118,8 @@ void ReplicatedBackend::check_recovery_sources(const OSDMapRef osdmap)
       for (set<hobject_t, hobject_t::BitwiseComparator>::iterator j = i->second.begin();
            j != i->second.end();
            ++j) {
-        assert(pulling.count(*j) == 1);
         get_parent()->cancel_pull(*j);
-        pulling.erase(*j);
+        clear_pull(pulling.find(*j), false);
       }
       pull_from_peer.erase(i++);
     } else {
@@ -226,7 +225,16 @@ bool ReplicatedBackend::handle_message(
 void ReplicatedBackend::clear_recovery_state()
 {
   // clear pushing/pulling maps
+  for (auto &&i: pushing) {
+    for (auto &&j: i.second) {
+      get_parent()->release_locks(j.second.lock_manager);
+    }
+  }
   pushing.clear();
+
+  for (auto &&i: pulling) {
+    get_parent()->release_locks(i.second.lock_manager);
+  }
   pulling.clear();
   pull_from_peer.clear();
 }
@@ -866,25 +874,18 @@ void ReplicatedBackend::_do_push(OpRequestRef op)
 
 struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
   ReplicatedBackend *bc;
-  list<hobject_t> to_continue;
+  list<ReplicatedBackend::pull_complete_info> to_continue;
   int priority;
   C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
     : bc(bc), priority(priority) {}
 
   void finish(ThreadPool::TPHandle &handle) {
     ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
-    for (list<hobject_t>::iterator i =
-           to_continue.begin();
-         i != to_continue.end();
-         ++i) {
-      map<hobject_t, ReplicatedBackend::PullInfo, hobject_t::BitwiseComparator>::iterator j =
-        bc->pulling.find(*i);
-      assert(j != bc->pulling.end());
-      if (!bc->start_pushes(*i, j->second.obc, h)) {
+    for (auto &&i: to_continue) {
+      if (!bc->start_pushes(i.hoid, i.obc, h)) {
         bc->get_parent()->on_global_recover(
-          *i, j->second.stat);
+          i.hoid, i.stat);
       }
-      bc->pulling.erase(*i);
       handle.reset_tp_timeout();
     }
     bc->run_recovery_op(h, priority);
@@ -899,7 +900,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
 
   vector<PullOp> replies(1);
   ObjectStore::Transaction t;
-  list<hobject_t> to_continue;
+  list<pull_complete_info> to_continue;
   for (vector<PushOp>::iterator i = m->pushes.begin();
        i != m->pushes.end();
        ++i) {
@@ -1261,7 +1262,8 @@ void ReplicatedBackend::calc_head_subsets(
   const pg_missing_t& missing,
   const hobject_t &last_backfill,
   interval_set<uint64_t>& data_subset,
-  map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets)
+  map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+  ObcLockManager &manager)
 {
   dout(10) << "calc_head_subsets " << head
            << " clone_overlap " << snapset.clone_overlap << dendl;
@@ -1291,7 +1293,8 @@ void ReplicatedBackend::calc_head_subsets(
     c.snap = snapset.clones[j];
     prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
     if (!missing.is_missing(c) &&
-        cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
+        cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
+        get_parent()->try_lock_for_read(c, manager)) {
       dout(10) << "calc_head_subsets " << head << " has prev " << c
                << " overlap " << prev << dendl;
       clone_subsets[c] = prev;
@@ -1305,6 +1308,7 @@
 
   if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
     dout(10) << "skipping clone, too many holes" << dendl;
+    get_parent()->release_locks(manager);
     clone_subsets.clear();
     cloning.clear();
   }
Expand All @@ -1322,7 +1326,8 @@ void ReplicatedBackend::calc_clone_subsets(
const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets)
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
ObcLockManager &manager)
{
dout(10) << "calc_clone_subsets " << soid
<< " clone_overlap " << snapset.clone_overlap << dendl;
@@ -1356,7 +1361,8 @@
     c.snap = snapset.clones[j];
     prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
     if (!missing.is_missing(c) &&
-        cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
+        cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
+        get_parent()->try_lock_for_read(c, manager)) {
       dout(10) << "calc_clone_subsets " << soid << " has prev " << c
                << " overlap " << prev << dendl;
       clone_subsets[c] = prev;
@@ -1376,7 +1382,8 @@
     c.snap = snapset.clones[j];
     next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
     if (!missing.is_missing(c) &&
-        cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
+        cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
+        get_parent()->try_lock_for_read(c, manager)) {
       dout(10) << "calc_clone_subsets " << soid << " has next " << c
                << " overlap " << next << dendl;
       clone_subsets[c] = next;
@@ -1389,6 +1396,7 @@
 
   if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
     dout(10) << "skipping clone, too many holes" << dendl;
+    get_parent()->release_locks(manager);
     clone_subsets.clear();
     cloning.clear();
   }
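The "has prev"/"has next" hunks above all add the same third condition to the source-selection rule: a neighboring clone contributes its overlap only if it is present locally, already within last_backfill, and read-lockable right now via try_lock_for_read(). A clone that fails the lock attempt is never waited on. A runnable toy of that rule, with hypothetical names standing in for the Ceph types (hobject_t clones, interval_set overlaps, ObcLockManager):

```cpp
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

// Toy parent: hands out non-blocking read locks on object names.
struct Parent {
  std::set<std::string> busy;   // objects some other operation holds
  std::set<std::string> held;   // read locks we managed to take
  bool try_lock_for_read(const std::string &o) {
    if (busy.count(o))
      return false;             // locked elsewhere: do not block recovery
    held.insert(o);
    return true;
  }
  void release_locks() { held.clear(); }
};

int main() {
  Parent parent;
  parent.busy = {"clone2"};                      // clone2 is in use
  std::set<std::string> missing = {"clone3"};    // clone3 not on disk yet
  std::vector<std::string> candidates = {"clone1", "clone2", "clone3"};
  std::map<std::string, std::string> clone_subsets;

  for (const auto &c : candidates) {
    // present locally AND lockable -> recover those bytes by cloning
    if (!missing.count(c) && parent.try_lock_for_read(c))
      clone_subsets[c] = "overlapping extents";
    // otherwise the bytes stay in copy_subset and come from the peer
  }
  std::cout << clone_subsets.size() << std::endl;  // prints 1 (clone1)
}
```

If the surviving overlap is too fragmented (the osd_recover_clone_overlap_limit check), the code also releases every lock it just took and falls back to a plain copy.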
@@ -1450,6 +1458,7 @@ void ReplicatedBackend::prepare_pull(
   }
 
   ObjectRecoveryInfo recovery_info;
+  ObcLockManager lock_manager;
 
   if (soid.is_snap()) {
     assert(!get_parent()->get_local_missing().is_missing(
@@ -1461,10 +1470,12 @@
     SnapSetContext *ssc = headctx->ssc;
     assert(ssc);
     dout(10) << " snapset " << ssc->snapset << dendl;
-    calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
-                       get_info().last_backfill,
-                       recovery_info.copy_subset,
-                       recovery_info.clone_subset);
+    calc_clone_subsets(
+      ssc->snapset, soid, get_parent()->get_local_missing(),
+      get_info().last_backfill,
+      recovery_info.copy_subset,
+      recovery_info.clone_subset,
+      lock_manager);
     // FIXME: this may overestimate if we are pulling multiple clones in parallel...
     dout(10) << " pulling " << recovery_info << dendl;
 
@@ -1492,10 +1503,13 @@
   assert(!pulling.count(soid));
   pull_from_peer[fromshard].insert(soid);
   PullInfo &pi = pulling[soid];
+  pi.from = fromshard;
+  pi.soid = soid;
   pi.head_ctx = headctx;
   pi.recovery_info = op.recovery_info;
   pi.recovery_progress = op.recovery_progress;
   pi.cache_dont_need = h->cache_dont_need;
+  pi.lock_manager = std::move(lock_manager);
 }
 
 /*
@@ -1515,6 +1529,7 @@
   map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator> clone_subsets;
   interval_set<uint64_t> data_subset;
 
+  ObcLockManager lock_manager;
   // are we doing a clone on the replica?
   if (soid.snap && soid.snap < CEPH_NOSNAP) {
     hobject_t head = soid;
@@ -1543,10 +1558,12 @@
     map<pg_shard_t, pg_info_t>::const_iterator pi =
       get_parent()->get_shard_info().find(peer);
     assert(pi != get_parent()->get_shard_info().end());
-    calc_clone_subsets(ssc->snapset, soid,
-                       pm->second,
-                       pi->second.last_backfill,
-                       data_subset, clone_subsets);
+    calc_clone_subsets(
+      ssc->snapset, soid,
+      pm->second,
+      pi->second.last_backfill,
+      data_subset, clone_subsets,
+      lock_manager);
   } else if (soid.snap == CEPH_NOSNAP) {
     // pushing head or unversioned object.
     // base this on partially on replica's clones?
@@ -1557,10 +1574,20 @@
       obc,
       ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
       get_parent()->get_shard_info().find(peer)->second.last_backfill,
-      data_subset, clone_subsets);
+      data_subset, clone_subsets,
+      lock_manager);
   }
 
-  prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop, cache_dont_need);
+  prep_push(
+    obc,
+    soid,
+    peer,
+    oi.version,
+    data_subset,
+    clone_subsets,
+    pop,
+    cache_dont_need,
+    std::move(lock_manager));
 }
 
 void ReplicatedBackend::prep_push(ObjectContextRef obc,
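Note the ownership handoff in the hunk above: prep_push_to_replica() builds the lock set locally and then std::move()s it into prep_push(), which (per the next hunks) stores it in the peer's PushInfo, so the clone-source locks live exactly as long as the push and are released in handle_push_reply(). A small sketch of why a move-only holder fits that shape, again with stand-in types rather than the Ceph ones:

```cpp
#include <utility>
#include <vector>

// Move-only lock holder: copying would double-release, so forbid it.
struct LockManager {
  std::vector<int> held;                        // tokens for held locks
  LockManager() = default;
  LockManager(LockManager &&) = default;
  LockManager &operator=(LockManager &&) = default;
  LockManager(const LockManager &) = delete;
  LockManager &operator=(const LockManager &) = delete;
};

struct PushInfo {
  LockManager lock_manager;  // owns the clone-source locks for this push
};

int main() {
  LockManager setup;
  setup.held.push_back(42);           // pretend we locked a clone
  PushInfo pi;
  pi.lock_manager = std::move(setup); // hand ownership to per-push state
  return pi.lock_manager.held.size() == 1 ? 0 : 1;
}
```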
@@ -1574,7 +1601,7 @@ void ReplicatedBackend::prep_push(ObjectContextRef obc,
 
   prep_push(obc, soid, peer,
             obc->obs.oi.version, data_subset, clone_subsets,
-            pop, cache_dont_need);
+            pop, cache_dont_need, ObcLockManager());
 }
 
 void ReplicatedBackend::prep_push(
@@ -1584,7 +1611,8 @@
   interval_set<uint64_t> &data_subset,
   map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
   PushOp *pop,
-  bool cache_dont_need)
+  bool cache_dont_need,
+  ObcLockManager &&lock_manager)
 {
   get_parent()->begin_peer_recover(peer, soid);
   // take note.
@@ -1600,6 +1628,7 @@
   pi.recovery_progress.data_recovered_to = 0;
   pi.recovery_progress.data_complete = 0;
   pi.recovery_progress.omap_complete = 0;
+  pi.lock_manager = std::move(lock_manager);
 
   ObjectRecoveryProgress new_progress;
   int r = build_push_op(pi.recovery_info,
@@ -1730,25 +1759,28 @@ void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info,
 
 ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
   const ObjectRecoveryInfo& recovery_info,
-  SnapSetContext *ssc)
+  SnapSetContext *ssc,
+  ObcLockManager &manager)
 {
   if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
     return recovery_info;
   ObjectRecoveryInfo new_info = recovery_info;
   new_info.copy_subset.clear();
   new_info.clone_subset.clear();
   assert(ssc);
-  calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
-                     get_info().last_backfill,
-                     new_info.copy_subset, new_info.clone_subset);
+  get_parent()->release_locks(manager); // might already have locks
+  calc_clone_subsets(
+    ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
+    get_info().last_backfill,
+    new_info.copy_subset, new_info.clone_subset,
+    manager);
   return new_info;
 }
 
 bool ReplicatedBackend::handle_pull_response(
   pg_shard_t from, PushOp &pop, PullOp *response,
-  list<hobject_t> *to_continue,
-  ObjectStore::Transaction *t
-  )
+  list<pull_complete_info> *to_continue,
+  ObjectStore::Transaction *t)
 {
   interval_set<uint64_t> data_included = pop.data_included;
   bufferlist data;
@@ -1793,7 +1825,10 @@ bool ReplicatedBackend::handle_pull_response(
     }
     pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
     pi.recovery_info.oi = pi.obc->obs.oi;
-    pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc);
+    pi.recovery_info = recalc_subsets(
+      pi.recovery_info,
+      pi.obc->ssc,
+      pi.lock_manager);
   }
 
 
@@ -1829,12 +1864,10 @@
 
   if (complete) {
     pi.stat.num_objects_recovered++;
-    to_continue->push_back(hoid);
+    to_continue->push_back({hoid, pi.obc, pi.stat});
     get_parent()->on_local_recover(
       hoid, pi.recovery_info, pi.obc, t);
-    pull_from_peer[from].erase(hoid);
-    if (pull_from_peer[from].empty())
-      pull_from_peer.erase(from);
+    clear_pull(pulling.find(hoid));
     return false;
   } else {
     response->soid = pop.soid;
@@ -2172,6 +2205,7 @@ bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, Push
       stat.num_keys_recovered = reply->omap_entries.size();
       stat.num_objects_recovered = 1;
 
+      get_parent()->release_locks(pi->lock_manager);
       pushing[soid].erase(peer);
       pi = NULL;
 
@@ -2318,7 +2352,7 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
   if (is_primary()) {
     PullOp resp;
     RPGHandle *h = _open_recovery_op();
-    list<hobject_t> to_continue;
+    list<pull_complete_info> to_continue;
     bool more = handle_pull_response(
       m->from, pop, &resp,
       &to_continue, &t);
@@ -2358,10 +2392,22 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
 void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
 {
   get_parent()->failed_push(from, soid);
-  pull_from_peer[from].erase(soid);
-  if (pull_from_peer[from].empty())
-    pull_from_peer.erase(from);
-  pulling.erase(soid);
+
+  clear_pull(pulling.find(soid));
 }
+
+void ReplicatedBackend::clear_pull(
+  map<hobject_t, PullInfo, hobject_t::BitwiseComparator>::iterator piter,
+  bool clear_pull_from_peer)
+{
+  auto from = piter->second.from;
+  if (clear_pull_from_peer) {
+    pull_from_peer[from].erase(piter->second.soid);
+    if (pull_from_peer[from].empty())
+      pull_from_peer.erase(from);
+  }
+  get_parent()->release_locks(piter->second.lock_manager);
+  pulling.erase(piter);
+}
 
 int ReplicatedBackend::start_pushes(
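The new clear_pull() consolidates teardown that was previously duplicated: cancellation (check_recovery_sources), failure (_failed_push), and completion (handle_pull_response) each hand-rolled the pull_from_peer/pulling erasure, and none of them could release locks. The clear_pull_from_peer=false variant serves check_recovery_sources(), which is already iterating pull_from_peer and erases the whole per-peer entry itself. A runnable toy of the consolidated cleanup, with simplified stand-in types mirroring the new function:

```cpp
#include <map>
#include <set>
#include <string>

struct PullInfo {
  std::string from;              // peer being pulled from
  std::string soid;              // object being pulled
  std::set<std::string> locks;   // read-locked clone sources
};

using PullMap = std::map<std::string, PullInfo>;
using PeerMap = std::map<std::string, std::set<std::string>>;

// Mirrors ReplicatedBackend::clear_pull(): one place that unwinds the
// per-peer index, releases the source locks, and drops the pull state.
void clear_pull(PullMap &pulling, PeerMap &pull_from_peer,
                PullMap::iterator piter, bool clear_pull_from_peer = true) {
  PullInfo &pi = piter->second;
  if (clear_pull_from_peer) {
    pull_from_peer[pi.from].erase(pi.soid);
    if (pull_from_peer[pi.from].empty())
      pull_from_peer.erase(pi.from);
  }
  pi.locks.clear();  // stands in for release_locks(pi.lock_manager)
  pulling.erase(piter);
}

int main() {
  PullMap pulling;
  PeerMap pull_from_peer;
  pulling["obj"] = {"osd.1", "obj", {"clone1"}};
  pull_from_peer["osd.1"].insert("obj");
  clear_pull(pulling, pull_from_peer, pulling.find("obj"));
  return pulling.empty() && pull_from_peer.empty() ? 0 : 1;
}
```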
