Skip to content

Commit

Permalink
alternator ttl: when node is down, secondary node continues to expire
Browse files Browse the repository at this point in the history
The current implementation of the Alternator expiration (TTL) feature
has each node scan for expired partitions in its own primary ranges.
This means that while a node is down, items in its primary ranges will
not get expired.

But we note that doesn't have to be this way: If only a single node is
down, and RF=3, the items that node owns are still readable with QUORUM -
so these items can still be safely read and checked for expiration - and
also deleted.

This patch implements a fairly simple solution: When a node completes
scanning its own primary ranges, also checks whether any of its *secondary*
ranges (ranges where it is the *second* replica) has its primary owner
down. For such ranges, this node will scan them as well. This secondary
scan stops if the remote node comes back up, but in that case it may
happen that both nodes will work on the same range at the same time.
The risks in that are minimal, though, and amount to wasted work and
duplicate deletion records in CDC. In the future we could avoid this by
using LWT to claim ownership on a range being scanned.

We have a new dtest (see a separate patch), alternator_ttl_tests.py::
TestAlternatorTTL::test_expiration_with_down_node, which reproduces this
and verifies this fix. The test starts a 5-node cluster, with 1000 items
with random tokens which are due to be expired immediately. The test
expects to see all items expiring ASAP, but when one of the five nodes
is brought down, this doesn't happen: Some of the items are not expired,
until this patch is used.

Fixes #9787

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211222131933.406148-1-nyh@scylladb.com>
  • Loading branch information
nyh authored and avikivity committed Dec 26, 2021
1 parent f7b8b80 commit e4b2dfb
Showing 1 changed file with 131 additions and 10 deletions.
141 changes: 131 additions & 10 deletions alternator/ttl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
#include <seastar/coroutine/maybe_yield.hh>
#include <boost/multiprecision/cpp_int.hpp>

#include "gms/gossiper.hh"
#include "gms/inet_address.hh"
#include "inet_address_vectors.hh"
#include "locator/abstract_replication_strategy.hh"
#include "log.hh"
#include "gc_clock.hh"
#include "database.hh"
Expand Down Expand Up @@ -304,31 +308,128 @@ static size_t random_offset(size_t min, size_t max) {
return dist(re);
}

// Get a list of secondary token ranges for the given node, and the primary
// node responsible for each of these token ranges.
// A "secondary range" is a range of tokens where for each token, the second
// node (in ring order) out of the RF replicas that hold this token is the
// given node.
// In the expiration scanner, we want to scan a secondary range but only if
// this range's primary node is down. For this we need to return not just
// a list of this node's secondary ranges - but also the primary owner of
// each of those ranges.
static std::vector<std::pair<dht::token_range, gms::inet_address>> get_secondary_ranges(
const locator::effective_replication_map_ptr& erm,
gms::inet_address ep) {
const auto& tm = *erm->get_token_metadata_ptr();
const auto& sorted_tokens = tm.sorted_tokens();
std::vector<std::pair<dht::token_range, gms::inet_address>> ret;
if (sorted_tokens.empty()) {
on_internal_error(tlogger, "Token metadata is empty");
}
auto prev_tok = sorted_tokens.back();
for (const auto& tok : sorted_tokens) {
inet_address_vector_replica_set eps = erm->get_natural_endpoints(tok);
if (eps.size() <= 1 || eps[1] != ep) {
prev_tok = tok;
continue;
}
// Add the range (prev_tok, tok] to ret. However, if the range wraps
// around, split it to two non-wrapping ranges.
if (prev_tok < tok) {
ret.emplace_back(
dht::token_range{
dht::token_range::bound(prev_tok, false),
dht::token_range::bound(tok, true)},
eps[0]);
} else {
ret.emplace_back(
dht::token_range{
dht::token_range::bound(prev_tok, false),
std::nullopt},
eps[0]);
ret.emplace_back(
dht::token_range{
std::nullopt,
dht::token_range::bound(tok, true)},
eps[0]);
}
prev_tok = tok;
}
return ret;
}


// A class for iterating over all the token ranges *owned* by this shard.
// We consider a token *owned* by this shard if:
// To avoid code duplication, it is a template with two distinct cases -
// <primary> and <secondary>:
//
// In the <primary> case, we consider a token *owned* by this shard if:
// 1. This node is a replica for this token.
// 2. Moreover, this node is the *primary* replica of the token (i.e., the
// first replica in the ring).
// 3. In this node, this shard is responsible for this token.
// We will use this definition of which shard in the cluster owns which tokens
// to split the expiration scanner's work between all the shards of the
// system.
// When some of the nodes in the system are down, nobody will be expiring
// tokens owned by them, so we will need to separately detect when this
// happens and let living nodes take over the work that a dead node cannot
// do.
//
// In the <secondary> case, we consider a token *owned* by this shard if:
// 1. This node is the *secondary* replica for this token (i.e., the second
// replica in the ring).
// 2. The primary replica for this token is currently marked down.
// 3. In this node, this shard is responsible for this token.
// We use the <secondary> case to handle the possibility that some of the
// nodes in the system are down. A dead node will not be expiring expiring
// the tokens owned by it, so we want the secondary owner to take over its
// primary ranges.
//
// FIXME: need to decide how to choose primary ranges in multi-DC setup!
// We could call get_primary_ranges_within_dc() below instead of get_primary_ranges().
// NOTICE: Iteration currently starts from a random token range in order to improve
// the chances of covering all ranges during a scan when restarts occur.
// A more deterministic way would be to regularly persist the scanning state,
// but that incurs overhead that we want to avoid if not needed.
enum primary_or_secondary_t {primary, secondary};
template<primary_or_secondary_t primary_or_secondary>
class token_ranges_owned_by_this_shard {
template<primary_or_secondary_t> class ranges_holder;
// ranges_holder<primary> holds just the primary ranges themselves
template<> class ranges_holder<primary> {
const dht::token_range_vector _token_ranges;
public:
ranges_holder(const locator::effective_replication_map_ptr& erm, gms::inet_address ep)
: _token_ranges(erm->get_primary_ranges(ep)) {}
std::size_t size() const { return _token_ranges.size(); }
const dht::token_range& operator[](std::size_t i) const {
return _token_ranges[i];
}
bool should_skip(std::size_t i) const {
return false;
}
};
// ranges_holder<secondary> holds the secondary token ranges plus each
// range's primary owner, needed to implement should_skip().
template<> class ranges_holder<secondary> {
std::vector<std::pair<dht::token_range, gms::inet_address>> _token_ranges;
gms::gossiper& _gossiper;
public:
ranges_holder(const locator::effective_replication_map_ptr& erm, gms::inet_address ep)
: _token_ranges(get_secondary_ranges(erm, ep))
, _gossiper(gms::get_local_gossiper()) {}
std::size_t size() const { return _token_ranges.size(); }
const dht::token_range& operator[](std::size_t i) const {
return _token_ranges[i].first;
}
// range i should be skipped if its primary owner is alive.
bool should_skip(std::size_t i) const {
return _gossiper.is_alive(_token_ranges[i].second);
}
};

schema_ptr _s;
// _token_ranges will contain a list of token ranges owned by this node.
// We'll further need to split each such range to the pieces owned by
// the current shard, using _intersecter.
const dht::token_range_vector _token_ranges;
const ranges_holder<primary_or_secondary> _token_ranges;
// NOTICE: _range_idx is used modulo _token_ranges size when accessing
// the data to ensure that it doesn't go out of bounds
size_t _range_idx;
Expand All @@ -337,9 +438,8 @@ class token_ranges_owned_by_this_shard {
public:
token_ranges_owned_by_this_shard(database& db, schema_ptr s)
: _s(s)
, _token_ranges(db.find_keyspace(s->ks_name()).
get_effective_replication_map()->get_primary_ranges(
utils::fb_utilities::get_broadcast_address()))
, _token_ranges(db.find_keyspace(s->ks_name()).get_effective_replication_map(),
utils::fb_utilities::get_broadcast_address())
, _range_idx(random_offset(0, _token_ranges.size() - 1))
, _end_idx(_range_idx + _token_ranges.size())
{
Expand All @@ -366,6 +466,14 @@ class token_ranges_owned_by_this_shard {
if (_range_idx == _end_idx) {
return std::nullopt;
}
// If should_skip(), the range should be skipped. This happens for
// a secondary range whose primary owning node is still alive.
while (_token_ranges.should_skip(_range_idx % _token_ranges.size())) {
++_range_idx;
if (_range_idx == _end_idx) {
return std::nullopt;
}
}
_intersecter.emplace(_s->get_sharder(), _token_ranges[_range_idx % _token_ranges.size()], this_shard_id());
}
}
Expand Down Expand Up @@ -586,7 +694,7 @@ static future<bool> scan_table(
// FIXME: consider if we should ask the scan without caching?
// can we use cache but not fill it?
scan_ranges_context scan_ctx{s, proxy, std::move(column_name), std::move(member)};
token_ranges_owned_by_this_shard my_ranges(db, s);
token_ranges_owned_by_this_shard<primary> my_ranges(db, s);
while (std::optional<dht::partition_range> range = my_ranges.next_partition_range()) {
// Note that because of issue #9167 we need to run a separate
// query on each partition range, and can't pass several of
Expand All @@ -599,6 +707,19 @@ static future<bool> scan_table(
// solution for this problem.
co_await scan_table_ranges(proxy, scan_ctx, std::move(partition_ranges), abort_source, page_sem);
}
// If each node only scans its own primary ranges, then when any node is
// down part of the token range will not get scanned. This can be viewed
// as acceptable (when the comes back online, it will resume its scan),
// but as noted in issue #9787, we can allow more prompt expiration
// by tasking another node to take over scanning of the dead node's primary
// ranges. What we do here is that this node will also check expiration
// on its *secondary* ranges - but only those whose primary owner is down.
token_ranges_owned_by_this_shard<secondary> my_secondary_ranges(db, s);
while (std::optional<dht::partition_range> range = my_secondary_ranges.next_partition_range()) {
dht::partition_range_vector partition_ranges;
partition_ranges.push_back(std::move(*range));
co_await scan_table_ranges(proxy, scan_ctx, std::move(partition_ranges), abort_source, page_sem);
}
co_return true;
}

Expand Down

0 comments on commit e4b2dfb

Please sign in to comment.