Skip to content

Commit

Permalink
Merge 'repair: optimise repair reader with different shard count' fro…
Browse files Browse the repository at this point in the history
…m Gusev Petr

Consider a cluster with no data, e.g. in tests. When a new node is bootstrapped with repair we iterate over all (shard, table, range), read data from all the peer nodes for the range, look for any discrepancies and heal them. Even for small num_tokens (16 in the tests) the number of affected ranges (those we need to consider) amounts to total number of tokens in the cluster, which is 32 for the second node and 48 for the third. Multiplying this by the number of shards and the number of tables in each keyspace gives thousands of ranges. For each of them we need to follow some row level repair protocol, which includes several RPC exchanges between the peer nodes and creating some data structures on them. These exchanges are processed sequentially for each shard, there are `parallel_for_each` in code, but they are throttled by the choosen memory constraints and in fact execute sequentially.

When the bootstrapping node (master) reaches a peer node and asks for data in the specific range and master shard, two options exist. If sharder parameters (primarily, `--smp`) are the same on the master and on the peer, we can just read one local shard, this is fast. If, on the other hand, `--smp` is different, we need to do a multishard query. The given range from the master can contain data from different peer shards, so we split this range into a number of subranges such that each of them contain data only from the given master shard (`dht::selective_token_range_sharder`). The number of these subranges can be quite big (300 in the tests). For each of these subranges we do `fast_forward_to` on the `multishard_reader`, and this incurs a lot of overhead, mainly becuse of `smp::submit_to`.

In this series we optimize this case. Instead of splitting the master range and reading only what's needed, we read all the data in the range and then apply the filter by the master shard. We do this if the estimated number of partitions is small (<=100).

This is the logs of starting a second node with `--smp 4`, first node was `--smp 3`:

```
with this patch
    20:58:49.644 INFO> [debug/topology_custom.test_topology_smp.1] starting server at host 127.222.46.3 in scylla-2...
    20:59:22.713 INFO> [debug/topology_custom.test_topology_smp.1] started server at host 127.222.46.3 in scylla-2, pid 1132859

without this patch
    21:04:06.424 INFO> [debug/topology_custom.test_topology_smp.1] starting server at host 127.181.31.3 in scylla-2...
    21:06:01.287 INFO> [debug/topology_custom.test_topology_smp.1] started server at host 127.181.31.3 in scylla-2, pid 1134140
```

Fixes: #14093

Closes #14178

* github.com:scylladb/scylladb:
  repair_test: add test_reader_with_different_strategies
  repair: extract repair_reader declaration into reader.hh
  repair_meta: get_estimated_partitions fix
  repair_meta: use multishard_filter reader if the number of partitions is small
  repair_meta: delay _repair_reader creation
  database.hh: make_multishard_streaming_reader with range parameter
  database.cc: extract streaming_reader_lifecycle_policy
  • Loading branch information
avikivity committed Jul 9, 2023
2 parents 61dc98b + b69bc97 commit 850d759
Show file tree
Hide file tree
Showing 5 changed files with 463 additions and 205 deletions.
87 changes: 87 additions & 0 deletions repair/reader.hh
@@ -0,0 +1,87 @@
#pragma once

#include "repair/decorated_key_with_hash.hh"
#include "readers/evictable.hh"
#include "dht/sharder.hh"
#include "reader_permit.hh"
#include "utils/phased_barrier.hh"
#include "mutation/mutation_fragment.hh"
#include "readers/mutation_fragment_v1_stream.hh"

class repair_reader {
public:
enum class read_strategy {
local,
multishard_split,
multishard_filter
};

friend std::ostream& operator<<(std::ostream& out, read_strategy s) {
switch (s) {
case read_strategy::local:
return out << "local";
case read_strategy::multishard_split:
return out << "multishard_split";
case read_strategy::multishard_filter:
return out << "multishard_filter";
};
return out << "unknown";
}
private:
schema_ptr _schema;
reader_permit _permit;
dht::partition_range _range;
// Used to find the range that repair master will work on
dht::selective_token_range_sharder _sharder;
// Seed for the repair row hashing
uint64_t _seed;
// Pin the table while the reader is alive.
// Only needed for local readers, the multishard reader takes care
// of pinning tables on used shards.
std::optional<utils::phased_barrier::operation> _local_read_op;
std::optional<evictable_reader_handle_v2> _reader_handle;
// Fragment stream of either local or multishard reader for the range
mutation_fragment_v1_stream _reader;
// Current partition read from disk
lw_shared_ptr<const decorated_key_with_hash> _current_dk;
uint64_t _reads_issued = 0;
uint64_t _reads_finished = 0;

flat_mutation_reader_v2 make_reader(
seastar::sharded<replica::database>& db,
replica::column_family& cf,
read_strategy strategy,
const dht::sharder& remote_sharder,
unsigned remote_shard);

public:
repair_reader(
seastar::sharded<replica::database>& db,
replica::column_family& cf,
schema_ptr s,
reader_permit permit,
dht::token_range range,
const dht::sharder& remote_sharder,
unsigned remote_shard,
uint64_t seed,
read_strategy strategy);

future<mutation_fragment_opt>
read_mutation_fragment();

future<> on_end_of_stream() noexcept;

future<> close() noexcept;

lw_shared_ptr<const decorated_key_with_hash>& get_current_dk() {
return _current_dk;
}

void set_current_dk(const dht::decorated_key& key);

void clear_current_dk();

void check_current_dk();

void pause();
};

0 comments on commit 850d759

Please sign in to comment.