Skip to content

Commit

Permalink
Merge pull request #336 from mmaslankaprv/fix-state-inconsistency-aft…
Browse files Browse the repository at this point in the history
…er-manual-deletion

Fix state inconsistency after manual deletion
  • Loading branch information
Alexander Gallego committed Dec 23, 2020
2 parents 5cf2dd6 + c5d2006 commit 9a9e644
Show file tree
Hide file tree
Showing 14 changed files with 237 additions and 37 deletions.
13 changes: 12 additions & 1 deletion src/v/raft/configuration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,18 @@ ss::future<> configuration_manager::stop() {
return ss::now();
}

ss::future<> configuration_manager::start() {
ss::future<> configuration_manager::start(bool reset) {
if (reset) {
return _storage.kvs()
.remove(
storage::kvstore::key_space::consensus, configurations_map_key())
.then([this] {
return _storage.kvs().remove(
storage::kvstore::key_space::consensus,
highest_known_offset_key());
});
}

auto map_buf = _storage.kvs().get(
storage::kvstore::key_space::consensus, configurations_map_key());
return _lock.with([this, map_buf = std::move(map_buf)]() mutable {
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class configuration_manager {
configuration_manager(
group_configuration, raft::group_id, storage::api&, ctx_log&);

ss::future<> start();
ss::future<> start(bool reset);

ss::future<> stop();
/**
Expand Down
3 changes: 2 additions & 1 deletion src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,8 @@ ss::future<> consensus::start() {
vlog(_ctxlog.info, "Starting");
return _op_lock.with([this] {
read_voted_for();
return _configuration_manager.start()

return _configuration_manager.start(is_initial_state())
.then([this] { return hydrate_snapshot(); })
.then([this] {
vlog(
Expand Down
14 changes: 14 additions & 0 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "hashing/crc32c.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "raft/configuration_manager.h"
#include "raft/consensus_client_protocol.h"
#include "raft/event_manager.h"
Expand Down Expand Up @@ -338,6 +339,19 @@ class consensus {
voter_priority next_target_priority();
voter_priority get_node_priority(model::node_id id) const;

/**
 * Reports whether this consensus group has no backing state at all: the log
 * holds no segments, its dirty and start offsets are still value-initialized,
 * and no snapshot index has been recorded.
 */
bool is_initial_state() const {
    static constexpr model::offset not_initialized{};
    // Same equality check applied to every offset that must still be unset.
    const auto is_unset = [](const auto& candidate) {
        return candidate == not_initialized;
    };

    const auto log_offsets = _log.offsets();
    if (_log.segment_count() != 0) {
        return false;
    }
    if (!is_unset(log_offsets.dirty_offset)) {
        return false;
    }
    if (!is_unset(log_offsets.start_offset)) {
        return false;
    }
    return is_unset(_last_snapshot_index);
}

// args
model::node_id _self;
raft::group_id _group;
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ set(srcs
offset_monitor_test.cc
mux_state_machine_test.cc
mutex_buffer_test.cc
manual_log_deletion_test.cc
configuration_manager_test.cc)

rp_test(
Expand Down
4 changes: 2 additions & 2 deletions src/v/raft/tests/configuration_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ struct config_manager_fixture {
raft::configuration_manager recovered(
raft::group_configuration({}), raft::group_id(1), _storage, _logger);

recovered.start().get0();
recovered.start(false).get0();

BOOST_REQUIRE_EQUAL(
recovered.get_highest_known_offset(),
Expand Down Expand Up @@ -210,7 +210,7 @@ FIXTURE_TEST(test_start_write_concurrency, config_manager_fixture) {
raft::configuration_manager new_cfg_manager(
raft::group_configuration({}), raft::group_id(1), _storage, _logger);

auto start = new_cfg_manager.start();
auto start = new_cfg_manager.start(false);
auto cfg = random_configuration();
auto add = new_cfg_manager.add(model::offset(3000), cfg);
configurations.push_back(cfg);
Expand Down
147 changes: 147 additions & 0 deletions src/v/raft/tests/manual_log_deletion_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2020 Vectorized, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "finjector/hbadger.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/record.h"
#include "model/timestamp.h"
#include "raft/consensus_utils.h"
#include "raft/tests/raft_group_fixture.h"
#include "raft/types.h"
#include "random/generators.h"
#include "storage/record_batch_builder.h"
#include "storage/tests/utils/disk_log_builder.h"
#include "storage/tests/utils/random_batch.h"
#include "test_utils/async.h"

#include <seastar/core/abort_source.hh>

#include <filesystem>
#include <system_error>
#include <vector>

/**
 * Fixture for tests that simulate an operator deleting log directories by
 * hand while the owning nodes are stopped.
 *
 * It owns one disk-backed raft group using the deletion cleanup policy. The
 * literal `3` and `1_KiB` passed to raft_group are presumably the member
 * count and the segment size -- TODO confirm against raft_group's
 * constructor.
 */
struct manual_deletion_fixture : public raft_test_fixture {
    manual_deletion_fixture()
      : gr(
        raft::group_id(0),
        3,
        storage::log_config::storage_type::disk,
        model::cleanup_policy_bitflags::deletion,
        1_KiB) {
        gr.enable_all();
    }

    /**
     * Waits for a leader, replicates two rounds of batches whose timestamps
     * are 200000 units apart, and records the first timestamp as the
     * retention cut-off used by apply_retention_policy().
     */
    void prepare_raft_group() {
        // Only the wait matters here; the elected leader id is not needed.
        wait_for_group_leader(gr);

        auto first_ts = model::timestamp::now();
        // append some entries
        // NOTE(review): the replication result is not checked here;
        // presumably validate_logs_replication() below covers it -- confirm.
        [[maybe_unused]] bool res
          = replicate_compactible_batches(gr, first_ts).get0();
        auto second_ts = model::timestamp(first_ts() + 200000);
        // append some more entries
        res = replicate_compactible_batches(gr, second_ts).get0();
        retention_timestamp = first_ts;
        validate_logs_replication(gr);
    }

    /**
     * Repeatedly compacts every member's log with `retention_timestamp`
     * until each log reports a start offset greater than zero (2s limit
     * passed to wait_for).
     */
    void apply_retention_policy() {
        wait_for(
          2s,
          [this] {
              for (auto& [_, n] : gr.get_members()) {
                  n.log
                    ->compact(storage::compaction_config(
                      retention_timestamp,
                      100_MiB,
                      ss::default_priority_class(),
                      as,
                      storage::debug_sanitize_files::yes))
                    .get0();
                  // start offset still at (or below) zero: nothing has been
                  // prefix truncated on this node yet, so retry.
                  if (n.log->offsets().start_offset <= model::offset(0)) {
                      return false;
                  }
              }
              return true;
          },
          "logs has prefix truncated");
    }

    /**
     * Disables each of the given nodes, deletes its topic directory from
     * disk, then enables the nodes again.
     */
    void remove_data(std::vector<model::node_id> nodes) {
        std::vector<std::filesystem::path> to_delete;
        to_delete.reserve(nodes.size());

        // disable and remove data
        // The directory is read before disabling the node, while the
        // member's log handle is still being accessed through the group.
        for (auto id : nodes) {
            to_delete.push_back(std::filesystem::path(
              gr.get_member(id).log->config().topic_directory()));
            gr.disable_node(id);
        }
        for (auto& path : to_delete) {
            std::filesystem::remove_all(path);
        }
        // enable back
        for (auto id : nodes) {
            gr.enable_node(id);
        }
    }

    /// Applies remove_data() to every member of the group.
    void remove_all_data() {
        std::vector<model::node_id> nodes;
        for (auto& [id, _] : gr.get_members()) {
            nodes.push_back(id);
        }
        remove_data(nodes);
    }

    raft_group gr;
    // Cut-off timestamp handed to compaction; set by prepare_raft_group().
    model::timestamp retention_timestamp;
    // Abort source passed to every compaction_config built by this fixture.
    ss::abort_source as;
};

// Scenario: after retention has advanced the start offset of every log, the
// log directories of ALL group members are deleted while the nodes are
// disabled. Once the nodes are enabled again the logs must replicate and all
// members must converge on the same commit index.
FIXTURE_TEST(
test_collected_log_recovery_admin_deletion_all, manual_deletion_fixture) {
prepare_raft_group();
// compact logs
apply_retention_policy();

// simulate admin deleting log folders. For more details look here:
//
// https://github.com/vectorizedio/redpanda/issues/321

remove_all_data();

validate_logs_replication(gr);

// Allow up to 10s for the commit indexes of all members to agree.
wait_for(
10s,
[this] { return are_all_commit_indexes_the_same(gr); },
"After recovery state is consistent");
};

// Scenario: after retention has advanced the start offset of every log, the
// log directory of a SINGLE member (node id 1) is deleted while that node is
// disabled. Once it is enabled again the logs must replicate and all members
// must converge on the same commit index.
FIXTURE_TEST(
test_collected_log_recovery_admin_deletion_one, manual_deletion_fixture) {
prepare_raft_group();
// compact logs
apply_retention_policy();

// simulate admin deleting log folders. For more details look here:
//
// https://github.com/vectorizedio/redpanda/issues/321

remove_data({model::node_id(1)});

validate_logs_replication(gr);

// Allow up to 10s for the commit indexes of all members to agree.
wait_for(
10s,
[this] { return are_all_commit_indexes_the_same(gr); },
"After recovery state is consistent");
};
2 changes: 1 addition & 1 deletion src/v/raft/tests/raft_group_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ static ss::future<bool> replicate_compactible_batches(
model::timestamp ts,
model::timeout_clock::duration tout = 1s) {
return retry_with_leader(gr, 5, tout, [ts](raft_node& leader_node) {
auto rdr = make_compactible_batches(5, 30, ts);
auto rdr = make_compactible_batches(5, 80, ts);
raft::replicate_options opts(raft::consistency_level::quorum_ack);

return leader_node.consensus->replicate(std::move(rdr), opts)
Expand Down
17 changes: 6 additions & 11 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ ss::future<> disk_log_impl::remove() {
})
.then([this] {
return _kvstore.remove(
kvstore::key_space::storage, start_offset_key());
kvstore::key_space::storage,
internal::start_offset_key(config().ntp()));
});
}
ss::future<> disk_log_impl::close() {
Expand Down Expand Up @@ -211,7 +212,7 @@ ss::future<> disk_log_impl::garbage_collect_segments(
return _kvstore
.put(
kvstore::key_space::storage,
start_offset_key(),
internal::start_offset_key(config().ntp()),
reflection::to_iobuf(start_offset))
.then([this, ptr, ctx] {
if (!is_front_segment(ptr)) {
Expand Down Expand Up @@ -630,7 +631,7 @@ ss::future<> disk_log_impl::do_truncate_prefix(truncate_prefix_config cfg) {
return _kvstore
.put(
kvstore::key_space::storage,
start_offset_key(),
internal::start_offset_key(config().ntp()),
reflection::to_iobuf(cfg.start_offset))
.then([this, cfg] {
/*
Expand Down Expand Up @@ -772,15 +773,9 @@ ss::future<> disk_log_impl::do_truncate(truncate_config cfg) {
});
}

// Builds the kvstore key for this log's start offset: the start_offset key
// type followed by a copy of the log's ntp, serialized and returned as bytes.
bytes disk_log_impl::start_offset_key() const {
    auto owned_ntp = config().ntp();
    iobuf serialized_key;
    reflection::serialize(
      serialized_key, kvstore_key_type::start_offset, std::move(owned_ntp));
    return iobuf_to_bytes(serialized_key);
}

model::offset disk_log_impl::read_start_offset() const {
auto value = _kvstore.get(kvstore::key_space::storage, start_offset_key());
auto value = _kvstore.get(
kvstore::key_space::storage, internal::start_offset_key(config().ntp()));
if (value) {
auto offset = reflection::adl<model::offset>{}.from(std::move(*value));
return offset;
Expand Down
6 changes: 0 additions & 6 deletions src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,9 @@ class disk_log_impl final : public log::impl {
friend class disk_log_builder; // for tests
friend std::ostream& operator<<(std::ostream& o, const disk_log_impl& d);

// key types used to store data in key-value store
enum class kvstore_key_type : int8_t {
start_offset = 0,
};

ss::future<model::record_batch_reader>
make_unchecked_reader(log_reader_config);

bytes start_offset_key() const;
model::offset read_start_offset() const;

ss::future<> do_compact(compaction_config);
Expand Down
48 changes: 34 additions & 14 deletions src/v/storage/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,33 +171,53 @@ ss::future<log> log_manager::manage(ntp_config cfg) {
});
}

// Reconciles kvstore state with what is on disk for the log described by
// `cfg`: when the log's work directory no longer exists, the start offset
// entry stored for its ntp is removed from the kvstore. When the directory
// exists nothing is changed.
ss::future<> log_manager::recover_log_state(const ntp_config& cfg) {
    auto offset_key = internal::start_offset_key(cfg.ntp());
    return ss::file_exists(cfg.work_directory())
      .then([this, key = std::move(offset_key)](bool dir_exists) {
          if (!dir_exists) {
              // directory was deleted, make sure we do not have any state
              // in KV store.
              return _kvstore.remove(kvstore::key_space::storage, key);
          }
          return ss::now();
      });
}

/**
 * Creates the log described by `cfg` and registers it in `_logs`.
 *
 * Fails with std::runtime_error when the configured base directory is empty
 * and asserts that the ntp is not already registered.
 *
 * - Memory storage: builds a memory-backed log, registers it, and touches
 *   the work directory.
 * - Disk storage: first reconciles kvstore state through
 *   recover_log_state(), then recovers the segments found in the work
 *   directory and builds a disk-backed log from them.
 */
ss::future<log> log_manager::do_manage(ntp_config cfg) {
    if (_config.base_dir.empty()) {
        return ss::make_exception_future<log>(std::runtime_error(
          "log_manager:: cannot have empty config.base_dir"));
    }

    vassert(
      _logs.find(cfg.ntp()) == _logs.end(), "cannot double register same ntp");

    if (_config.stype == log_config::storage_type::memory) {
        // Read the path before `cfg` is moved into the log.
        auto path = cfg.work_directory();
        auto l = storage::make_memory_backed_log(std::move(cfg));
        _logs.emplace(l.config().ntp(), l);
        // in-memory needs to write vote_for configuration
        return ss::recursive_touch_directory(path).then([l] { return l; });
    }

    // Kick off the kvstore reconciliation while `cfg` is still intact; the
    // continuation below takes ownership of it. This must precede segment
    // recovery so a log whose directory was deleted does not come back with
    // a stale start offset.
    auto state_recovered = recover_log_state(cfg);
    return state_recovered.then([this, cfg = std::move(cfg)]() mutable {
        ss::sstring path = cfg.work_directory();
        return recover_segments(
                 std::filesystem::path(path),
                 _config.sanitize_fileops,
                 cfg.is_compacted(),
                 [this] { return create_cache(); },
                 _abort_source)
          .then([this, cfg = std::move(cfg)](segment_set segments) mutable {
              auto l = storage::make_disk_backed_log(
                std::move(cfg), *this, std::move(segments), _kvstore);
              auto [_, success] = _logs.emplace(l.config().ntp(), l);
              vassert(
                success, "Could not keep track of:{} - concurrency issue", l);
              return l;
          });
    });
}

ss::future<> log_manager::remove(model::ntp ntp) {
Expand Down
1 change: 1 addition & 0 deletions src/v/storage/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class log_manager {
std::optional<batch_cache_index> create_cache();

ss::future<> dispatch_topic_dir_deletion(ss::sstring dir);
ss::future<> recover_log_state(const ntp_config&);

log_config _config;
kvstore& _kvstore;
Expand Down
Loading

0 comments on commit 9a9e644

Please sign in to comment.