repair: Adjust parallelism according to memory size (#4696)
After commit 8a0c4d5 (Merge "Repair switch to rpc stream" from Asias),
we increased the row buffer size for repair from 512KiB to 32MiB per
repair instance. We allow repairing 16 ranges (16 repair instances) in
parallel per repair request, so a node can consume 16 * 32MiB = 512MiB
per user-requested repair. In addition, the repair master node can hold
data from all the repair followers, so the memory usage on the repair
master can exceed 512MiB. We need a way to limit this memory usage.

In this patch, we limit the total memory used by repair to 10% of the
shard memory. The number of ranges that can be repaired in parallel is:

max_repair_ranges_in_parallel = max_repair_memory / max_repair_memory_per_range.

For example, if each shard has 4096MiB of memory, then we will have
max_repair_ranges_in_parallel = 4096MiB * 10% / 32MiB = 12
(409.6MiB / 32MiB, rounded down); see the sketch below.
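The same sizing arithmetic as a minimal standalone C++ sketch (not part of
the patch; the patch computes the budget with * 0.1 in main.cc, while this
uses integer division for brevity, and assumes a 64-bit size_t):

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
        const size_t shard_memory = size_t(4096) << 20;      // 4096MiB of shard memory
        const size_t max_repair_memory = shard_memory / 10;  // the 10% repair budget
        const size_t per_range = size_t(32) << 20;           // 32MiB row buffer per range
        // Clamp to at least 1 so repair can always make progress on tiny shards.
        const size_t in_parallel = std::max(size_t(1), max_repair_memory / per_range);
        std::printf("max_repair_ranges_in_parallel = %zu\n", in_parallel);  // prints 12
    }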

Fixes #4675
asias authored and avikivity committed Aug 12, 2019
1 parent e6cde72 commit 131acc0
Showing 5 changed files with 27 additions and 10 deletions.
3 changes: 2 additions & 1 deletion main.cc
@@ -939,7 +939,8 @@ int main(int ac, char** av) {
 });
 });
 }).get();
-repair_service rs(gossiper);
+auto max_memory_repair = db.local().get_available_memory() * 0.1;
+repair_service rs(gossiper, max_memory_repair);
 repair_init_messaging_service_handler(rs, sys_dist_ks, view_update_generator).get();
 supervisor::notify("starting storage service", true);
 auto& ss = service::get_local_storage_service();
17 changes: 13 additions & 4 deletions repair/repair.cc
@@ -217,9 +217,16 @@ tracker& repair_tracker() {
 }
 }
 
-tracker::tracker(size_t nr_shards)
+tracker::tracker(size_t nr_shards, size_t max_repair_memory)
     : _shutdown(false)
     , _repairs(nr_shards) {
+    auto nr = std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range()));
+    rlogger.info("Setting max_repair_memory={}, max_repair_memory_per_range={}, max_repair_ranges_in_parallel={}",
+        max_repair_memory, max_repair_memory_per_range(), nr);
+    _range_parallelism_semaphores.reserve(nr_shards);
+    while (nr_shards--) {
+        _range_parallelism_semaphores.emplace_back(semaphore(nr));
+    }
     _the_tracker = this;
 }
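Note the std::max with size_t(1) above: if max_repair_memory were smaller
than one 32MiB per-range buffer, the division would yield zero semaphore
units and every repair would block forever, so the constructor guarantees
at least one range can be repaired at a time.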

@@ -314,6 +321,10 @@ void tracker::abort_all_repairs() {
     rlogger.info0("Aborted {} repair job(s)", count);
 }
 
+semaphore& tracker::range_parallelism_semaphore() {
+    return _range_parallelism_semaphores[engine().cpu_id()];
+}
+
 void check_in_shutdown() {
     repair_tracker().check_in_shutdown();
 }
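This accessor hands each shard its own semaphore, indexed by
engine().cpu_id(). Since every element of _range_parallelism_semaphores is
touched by exactly one shard, the limit is enforced per shard without any
cross-shard synchronization, matching how the replaced thread_local
semaphore behaved.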
@@ -1234,13 +1245,11 @@ struct repair_options {
 };
 
 
-static thread_local semaphore ranges_parallelism_semaphore(16);
-
 static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
     if (ri->row_level_repair()) {
         // repair all the ranges in limited parallelism
         return parallel_for_each(ri->ranges, [ri] (auto&& range) {
-            return with_semaphore(ranges_parallelism_semaphore, 1, [ri, &range] {
+            return with_semaphore(repair_tracker().range_parallelism_semaphore(), 1, [ri, &range] {
                 check_in_shutdown();
                 ri->check_in_abort();
                 ri->ranges_index++;
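For readers unfamiliar with the Seastar idiom in this hunk, here is a
minimal self-contained sketch (not from this repository, and assuming a
2019-era Seastar tree where parallel_for_each lives in
core/future-util.hh) showing how with_semaphore caps the concurrency of
parallel_for_each; the limit of 4, the 16 elements, and the sleep are
placeholder values:

    #include <seastar/core/app-template.hh>
    #include <seastar/core/future-util.hh>
    #include <seastar/core/semaphore.hh>
    #include <seastar/core/shared_ptr.hh>
    #include <seastar/core/sleep.hh>
    #include <vector>

    using namespace seastar;
    using namespace std::chrono_literals;

    int main(int argc, char** argv) {
        app_template app;
        return app.run(argc, argv, [] {
            // parallel_for_each starts every iteration eagerly; the semaphore
            // admits at most 4 of them past the wait at any one time.
            auto sem = make_lw_shared<semaphore>(4);
            auto ranges = make_lw_shared<std::vector<int>>(16);
            return parallel_for_each(*ranges, [sem] (int&) {
                return with_semaphore(*sem, 1, [] {
                    return sleep(10ms);  // stand-in for repairing one range
                });
            }).finally([sem, ranges] {});  // keep state alive until all finish
        });
    }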
9 changes: 8 additions & 1 deletion repair/repair.hh
@@ -229,8 +229,13 @@ private:
     // Map repair id into repair_info. The vector has smp::count elements, each
     // element will be accessed by only one shard.
     std::vector<std::unordered_map<int, lw_shared_ptr<repair_info>>> _repairs;
+    // Each element in the vector is the semaphore used to control the maximum
+    // number of ranges that can be repaired in parallel. Each element will be
+    // accessed by only one shard.
+    std::vector<semaphore> _range_parallelism_semaphores;
+    static const size_t _max_repair_memory_per_range = 32 * 1024 * 1024;
 public:
-    explicit tracker(size_t nr_shards);
+    explicit tracker(size_t nr_shards, size_t max_repair_memory);
     ~tracker();
     void start(int id);
     void done(int id, bool succeeded);
@@ -244,6 +249,8 @@ public:
     std::vector<int> get_active() const;
     size_t nr_running_repair_jobs();
     void abort_all_repairs();
+    semaphore& range_parallelism_semaphore();
+    static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; }
 };
 
 future<uint64_t> estimate_partitions(seastar::sharded<database>& db, const sstring& keyspace,
6 changes: 3 additions & 3 deletions repair/row_level.cc
@@ -2203,7 +2203,7 @@ class row_level_repair {
 
 size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) {
     // Max buffer size per repair round
-    return is_rpc_stream_supported(algo) ? 32 * 1024 * 1024 : 256 * 1024;
+    return is_rpc_stream_supported(algo) ? tracker::max_repair_memory_per_range() : 256 * 1024;
 }
 
 // Step A: Negotiate sync boundary to use
@@ -2547,10 +2547,10 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subscriber {
     }
 };
 
-repair_service::repair_service(distributed<gms::gossiper>& gossiper)
+repair_service::repair_service(distributed<gms::gossiper>& gossiper, size_t max_repair_memory)
     : _gossiper(gossiper)
     , _gossip_helper(make_shared<row_level_repair_gossip_helper>())
-    , _tracker(smp::count) {
+    , _tracker(smp::count, max_repair_memory) {
     _gossiper.local().register_(_gossip_helper);
 }

2 changes: 1 addition & 1 deletion repair/row_level.hh
@@ -36,7 +36,7 @@ struct repair_service {
     distributed<gms::gossiper>& _gossiper;
     shared_ptr<row_level_repair_gossip_helper> _gossip_helper;
     tracker _tracker;
-    repair_service(distributed<gms::gossiper>& gossiper);
+    repair_service(distributed<gms::gossiper>& gossiper, size_t max_repair_memory);
    ~repair_service();
 };

