Skip to content

Commit

Permalink
repair: Reduce unnecessary streaming traffic
Browse files Browse the repository at this point in the history
If all the remote peers have the same checksum, we only need to fetch from
one of the peer nodes instead of all of them, since they all have the same
data anyway. There is no need to fetch from all of them.

In addition to the above optimization, if the local peer has no data, we can
skip sending the data back to the remote peers. Because all the remote peers
have the same checksum and the local peer has no data, each and every remote
peer already has all the data. There is no need to merge the remote data with
the local data and send the merged data back to the remote peers.

Refs: #1617
  • Loading branch information
asias committed Sep 25, 2016
1 parent 99e77e8 commit 7c873f0
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions repair/repair.cc
Expand Up @@ -420,11 +420,14 @@ future<partition_checksum> checksum_range(seastar::sharded<database> &db,
// Registers streaming work for a single token range of one column family.
//
// For every peer in neighbors_in, sp_in is asked to request `range` of `cf`
// in `keyspace` from that peer; for every peer in neighbors_out, sp_out is
// asked to transfer the same range of `cf` to that peer. The two peer lists
// are independent, so the set of nodes fetched from may differ from the set
// of nodes data is sent to. `db` is not used in the visible body.
//
// NOTE(review): this listing is a diff rendered without +/- markers. The
// single `neighbors` parameter and the loop over `neighbors` are presumably
// the removed pre-change lines, replaced by the neighbors_in/neighbors_out
// versions that follow them -- confirm against the repository.
static void sync_range(seastar::sharded<database>& db,
const sstring& keyspace, const sstring& cf,
const ::range<dht::token>& range,
std::vector<gms::inet_address>& neighbors,
const std::vector<gms::inet_address>& neighbors_in,
const std::vector<gms::inet_address>& neighbors_out,
streaming::stream_plan& sp_in,
streaming::stream_plan& sp_out) {
for (const auto& peer : neighbors) {
for (const auto& peer : neighbors_in) {
sp_in.request_ranges(peer, keyspace, {range}, {cf});
}
for (const auto& peer : neighbors_out) {
sp_out.transfer_ranges(peer, keyspace, {range}, {cf});
}
}
Expand Down Expand Up @@ -541,6 +544,7 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
// we set success=false so repair will fail, but we can
// still do our best to repair available replicas.
std::vector<gms::inet_address> live_neighbors;
std::vector<partition_checksum> live_neighbors_checksum;
for (unsigned i = 0; i < checksums.size(); i++) {
if (checksums[i].failed()) {
logger.warn(
Expand All @@ -555,18 +559,43 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
// (and discard) all the exceptions.
} else if (i > 0) {
live_neighbors.push_back(neighbors[i - 1]);
live_neighbors_checksum.push_back(checksums[i].get0());
}
}
if (!checksums[0].available() || live_neighbors.empty()) {
if (!checksums[0].available() || live_neighbors.empty() || live_neighbors_checksum.empty()) {
return make_ready_future<>();
}
// If one of the available checksums is different, repair
// all the neighbors which returned a checksum.
auto checksum0 = checksums[0].get();
for (unsigned i = 1; i < checksums.size(); i++) {
if (checksums[i].available() && checksum0 != checksums[i].get()) {
logger.info("Found differing range {} on nodes {}", range, live_neighbors);
sync_range(db, keyspace, cf, range, live_neighbors, sp_in, sp_out);
auto checksum0 = checksums[0].get0();
auto all_live_neighbors_have_same_checksum = std::all_of(live_neighbors_checksum.begin() + 1,
live_neighbors_checksum.end(), [&live_neighbors_checksum] (const auto& checksum) {
return checksum == live_neighbors_checksum.front();
});
std::vector<gms::inet_address> live_neighbors_in(live_neighbors);
std::vector<gms::inet_address> live_neighbors_out(live_neighbors);
if (all_live_neighbors_have_same_checksum) {
// Since all the live neighbors have the same checksum,
// we can fetch data from one of them instead of all of them.
// TODO: Choose the best node from live_neighbors, not the first one
logger.debug("Reduce live_neighbors_in {} to one node, range = {}", live_neighbors_in, range);
live_neighbors_in.resize(1);
// - If the local node has zero data and all peer nodes have
// the same data, we can skip sending data to the peer nodes.
// - If the local node has data and all peer nodes have the
// same data, we need to fetch data from one of the
// peer nodes, merge the data with the local data, and
// send it back to *all* the peer nodes.
if (checksum0 == partition_checksum()) {
logger.debug("Reduce live_neighbors_out {} to zero node, range = {}", live_neighbors_out, range);
live_neighbors_out.clear();
}
}
for (const auto& checksum : live_neighbors_checksum) {
if (checksum0 != checksum) {
logger.info("Found differing range {} on nodes {}, in = {}, out = {}", range,
live_neighbors, live_neighbors_in, live_neighbors_out);
sync_range(db, keyspace, cf, range, live_neighbors_in, live_neighbors_out, sp_in, sp_out);
return make_ready_future<>();
}
}
Expand Down

0 comments on commit 7c873f0

Please sign in to comment.