Skip to content

Commit

Permalink
Merged #4979 Fixes metadata writes being interrupted
Browse files Browse the repository at this point in the history
  • Loading branch information
VeXocide committed Oct 29, 2015
1 parent 82c745e commit 944b402
Show file tree
Hide file tree
Showing 17 changed files with 106 additions and 138 deletions.
42 changes: 21 additions & 21 deletions src/clustering/administration/persist/branch_history_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@

void real_branch_history_manager_t::erase(
metadata_file_t::write_txn_t *write_txn,
const namespace_id_t &table_id,
signal_t *interruptor) {
const namespace_id_t &table_id) {
cond_t non_interruptor;
std::set<branch_id_t> branch_ids;
write_txn->read_many<branch_birth_certificate_t>(
mdprefix_branch_birth_certificate().suffix(uuid_to_str(table_id) + "/"),
[&](const std::string &branch_id_str, const branch_birth_certificate_t &) {
branch_id_t branch_id = str_to_uuid(branch_id_str);
branch_ids.insert(branch_id);
},
interruptor);
&non_interruptor);
for (const branch_id_t &b : branch_ids) {
write_txn->erase(
mdprefix_branch_birth_certificate().suffix(
uuid_to_str(table_id) + "/" + uuid_to_str(b)),
interruptor);
&non_interruptor);
}
}

Expand Down Expand Up @@ -54,36 +54,36 @@ bool real_branch_history_manager_t::is_branch_known(const branch_id_t &branch)

void real_branch_history_manager_t::create_branch(
branch_id_t branch_id,
const branch_birth_certificate_t &bc,
signal_t *interruptor)
THROWS_ONLY(interrupted_exc_t) {
const branch_birth_certificate_t &bc)
THROWS_NOTHING {
assert_thread();
{
metadata_file_t::write_txn_t write_txn(metadata_file, interruptor);
cond_t non_interruptor;
metadata_file_t::write_txn_t write_txn(metadata_file, &non_interruptor);
write_txn.write(
mdprefix_branch_birth_certificate().suffix(
uuid_to_str(table_id) + "/" + uuid_to_str(branch_id)),
bc,
interruptor);
&non_interruptor);
}
cache.branches.insert(std::make_pair(branch_id, bc));
}

void real_branch_history_manager_t::import_branch_history(
const branch_history_t &new_records,
signal_t *interruptor)
THROWS_ONLY(interrupted_exc_t) {
const branch_history_t &new_records)
THROWS_NOTHING {
assert_thread();
{
metadata_file_t::write_txn_t write_txn(metadata_file, interruptor);
cond_t non_interruptor;
metadata_file_t::write_txn_t write_txn(metadata_file, &non_interruptor);
for (const auto &pair : new_records.branches) {
if (!is_branch_known(pair.first)) {
cache.branches.insert(pair);
write_txn.write(
mdprefix_branch_birth_certificate().suffix(
uuid_to_str(table_id) + "/" + uuid_to_str(pair.first)),
pair.second,
interruptor);
&non_interruptor);
cache.branches.insert(pair);
}
}
}
Expand All @@ -99,17 +99,17 @@ void real_branch_history_manager_t::prepare_gc(
}

void real_branch_history_manager_t::perform_gc(
const std::set<branch_id_t> &remove_branches,
signal_t *interruptor)
THROWS_ONLY(interrupted_exc_t) {
metadata_file_t::write_txn_t write_txn(metadata_file, interruptor);
const std::set<branch_id_t> &remove_branches)
THROWS_NOTHING {
cond_t non_interruptor;
metadata_file_t::write_txn_t write_txn(metadata_file, &non_interruptor);
for (const branch_id_t &bid : remove_branches) {
if (is_branch_known(bid)) {
cache.branches.erase(bid);
write_txn.erase(
mdprefix_branch_birth_certificate().suffix(
uuid_to_str(table_id) + "/" + uuid_to_str(bid)),
interruptor);
&non_interruptor);
cache.branches.erase(bid);
}
}
}
Expand Down
19 changes: 8 additions & 11 deletions src/clustering/administration/persist/branch_history_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ class real_branch_history_manager_t : public branch_history_manager_t {
public:
static void erase(
metadata_file_t::write_txn_t *write_txn,
const namespace_id_t &_table_id,
signal_t *interruptor);
const namespace_id_t &_table_id);

real_branch_history_manager_t(
const namespace_id_t &_table_id,
metadata_file_t *_metadata_file,
Expand All @@ -22,20 +22,17 @@ class real_branch_history_manager_t : public branch_history_manager_t {
bool is_branch_known(const branch_id_t &branch) const THROWS_NOTHING;
void create_branch(
branch_id_t branch_id,
const branch_birth_certificate_t &bc,
signal_t *interruptor)
THROWS_ONLY(interrupted_exc_t);
const branch_birth_certificate_t &bc)
THROWS_NOTHING;
void import_branch_history(
const branch_history_t &new_records,
signal_t *interruptor)
THROWS_ONLY(interrupted_exc_t);
const branch_history_t &new_records)
THROWS_NOTHING;
void prepare_gc(
std::set<branch_id_t> *branches_out)
THROWS_NOTHING;
void perform_gc(
const std::set<branch_id_t> &remove_branches,
signal_t *interruptor)
THROWS_ONLY(interrupted_exc_t);
const std::set<branch_id_t> &remove_branches)
THROWS_NOTHING;

private:
namespace_id_t const table_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,8 @@ void migrate_table(const server_id_t &this_server_id,
out->write(mdprefix_table_active().suffix(uuid_to_str(table_id)), active_state, interruptor);

// The `table_raft_storage_interface_t` constructor will persist the header, snapshot, and logs
table_raft_storage_interface_t storage_interface(nullptr, out, table_id,
persistent_state, interruptor);
table_raft_storage_interface_t storage_interface(
nullptr, out, table_id, persistent_state);
}
}

Expand Down
23 changes: 12 additions & 11 deletions src/clustering/administration/persist/raft_storage_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ table_raft_storage_interface_t::table_raft_storage_interface_t(
metadata_file_t *_file,
metadata_file_t::write_txn_t *txn,
const namespace_id_t &_table_id,
const raft_persistent_state_t<table_raft_state_t> &_state,
signal_t *interruptor) :
const raft_persistent_state_t<table_raft_state_t> &_state) :
file(_file), table_id(_table_id), state(_state) {
cond_t non_interruptor;

txn->write(
mdprefix_table_raft_header().suffix(uuid_to_str(table_id)),
table_raft_stored_header_t::from_state(state),
interruptor);
&non_interruptor);

/* To avoid expensive copies of `state`, we move `state` into the snapshot and then
back out after we're done */
Expand All @@ -89,7 +90,7 @@ table_raft_storage_interface_t::table_raft_storage_interface_t(
txn->write(
mdprefix_table_raft_snapshot().suffix(uuid_to_str(table_id)),
snapshot,
interruptor);
&non_interruptor);
state.snapshot_state = std::move(snapshot.snapshot_state);
state.snapshot_config = std::move(snapshot.snapshot_config);

Expand All @@ -99,31 +100,31 @@ table_raft_storage_interface_t::table_raft_storage_interface_t(
mdprefix_table_raft_log().suffix(
uuid_to_str(table_id) + "/" + log_index_to_str(i)),
state.log.get_entry_ref(i),
interruptor);
&non_interruptor);
}
}

void table_raft_storage_interface_t::erase(
metadata_file_t::write_txn_t *txn,
const namespace_id_t &table_id,
signal_t *interruptor) {
const namespace_id_t &table_id) {
cond_t non_interruptor;
txn->erase(
mdprefix_table_raft_header().suffix(uuid_to_str(table_id)),
interruptor);
&non_interruptor);
txn->erase(
mdprefix_table_raft_snapshot().suffix(uuid_to_str(table_id)),
interruptor);
&non_interruptor);
std::vector<std::string> log_keys;
txn->read_many<raft_log_entry_t<table_raft_state_t> >(
mdprefix_table_raft_log().suffix(uuid_to_str(table_id) + "/"),
[&](const std::string &index_str, const raft_log_entry_t<table_raft_state_t> &) {
log_keys.push_back(index_str);
},
interruptor);
&non_interruptor);
for (const std::string &key : log_keys) {
txn->erase(
mdprefix_table_raft_log().suffix(uuid_to_str(table_id) + "/" + key),
interruptor);
&non_interruptor);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@ class table_raft_storage_interface_t :
metadata_file_t *file,
metadata_file_t::write_txn_t *txn,
const namespace_id_t &table_id,
const raft_persistent_state_t<table_raft_state_t> &state,
signal_t *interruptor);
const raft_persistent_state_t<table_raft_state_t> &state);

/* This method erases an existing Raft state in the metadata file. */
static void erase(
metadata_file_t::write_txn_t *txn,
const namespace_id_t &table_id,
signal_t *interruptor);
const namespace_id_t &table_id);

/* `raft_storage_interface_t` methods */
const raft_persistent_state_t<table_raft_state_t> *get();
Expand Down
41 changes: 20 additions & 21 deletions src/clustering/administration/persist/table_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,54 +219,53 @@ void real_table_persistence_interface_t::write_metadata_active(
const namespace_id_t &table_id,
const table_active_persistent_state_t &state,
const raft_persistent_state_t<table_raft_state_t> &raft_state,
signal_t *interruptor,
raft_storage_interface_t<table_raft_state_t> **raft_storage_out) {
cond_t non_interruptor;
storage_interfaces.erase(table_id);
metadata_file_t::write_txn_t write_txn(metadata_file, interruptor);
metadata_file_t::write_txn_t write_txn(metadata_file, &non_interruptor);
write_txn.erase(
mdprefix_table_inactive().suffix(uuid_to_str(table_id)),
interruptor);
table_raft_storage_interface_t::erase(&write_txn, table_id, interruptor);
&non_interruptor);
table_raft_storage_interface_t::erase(&write_txn, table_id);
write_txn.write(
mdprefix_table_active().suffix(uuid_to_str(table_id)),
state,
interruptor);
&non_interruptor);
storage_interfaces[table_id].init(new table_raft_storage_interface_t(
metadata_file, &write_txn, table_id, raft_state, interruptor));
metadata_file, &write_txn, table_id, raft_state));
*raft_storage_out = storage_interfaces[table_id].get();
}

void real_table_persistence_interface_t::write_metadata_inactive(
const namespace_id_t &table_id,
const table_inactive_persistent_state_t &state,
signal_t *interruptor) {
const table_inactive_persistent_state_t &state) {
cond_t non_interruptor;
storage_interfaces.erase(table_id);
metadata_file_t::write_txn_t write_txn(metadata_file, interruptor);
metadata_file_t::write_txn_t write_txn(metadata_file, &non_interruptor);
write_txn.erase(
mdprefix_table_active().suffix(uuid_to_str(table_id)),
interruptor);
&non_interruptor);
write_txn.write(
mdprefix_table_inactive().suffix(uuid_to_str(table_id)),
state,
interruptor);

table_raft_storage_interface_t::erase(&write_txn, table_id, interruptor);
real_branch_history_manager_t::erase(&write_txn, table_id, interruptor);
&non_interruptor);
table_raft_storage_interface_t::erase(&write_txn, table_id);
real_branch_history_manager_t::erase(&write_txn, table_id);
}

void real_table_persistence_interface_t::delete_metadata(
const namespace_id_t &table_id,
signal_t *interruptor) {
const namespace_id_t &table_id) {
cond_t non_interruptor;
storage_interfaces.erase(table_id);
metadata_file_t::write_txn_t write_txn(metadata_file, interruptor);
metadata_file_t::write_txn_t write_txn(metadata_file, &non_interruptor);
write_txn.erase(
mdprefix_table_active().suffix(uuid_to_str(table_id)),
interruptor);
&non_interruptor);
write_txn.erase(
mdprefix_table_inactive().suffix(uuid_to_str(table_id)),
interruptor);
table_raft_storage_interface_t::erase(&write_txn, table_id, interruptor);
real_branch_history_manager_t::erase(&write_txn, table_id, interruptor);
&non_interruptor);
table_raft_storage_interface_t::erase(&write_txn, table_id);
real_branch_history_manager_t::erase(&write_txn, table_id);
}

void real_table_persistence_interface_t::load_multistore(
Expand Down
7 changes: 2 additions & 5 deletions src/clustering/administration/persist/table_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,12 @@ class real_table_persistence_interface_t :
const namespace_id_t &table_id,
const table_active_persistent_state_t &state,
const raft_persistent_state_t<table_raft_state_t> &raft_state,
signal_t *interruptor,
raft_storage_interface_t<table_raft_state_t> **raft_storage_out);
void write_metadata_inactive(
const namespace_id_t &table_id,
const table_inactive_persistent_state_t &state,
signal_t *interruptor);
const table_inactive_persistent_state_t &state);
void delete_metadata(
const namespace_id_t &table_id,
signal_t *interruptor);
const namespace_id_t &table_id);

void load_multistore(
const namespace_id_t &table_id,
Expand Down
5 changes: 1 addition & 4 deletions src/clustering/immediate_consistency/backfillee.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,8 @@ backfillee_t::backfillee_t(

/* Record the branch history we got from the backfiller */
{
cross_thread_signal_t interruptor_on_bhm_thread(
interruptor, branch_history_manager->home_thread());
on_thread_t thread_switcher(branch_history_manager->home_thread());
branch_history_manager->import_branch_history(
intro.final_version_history, &interruptor_on_bhm_thread);
branch_history_manager->import_branch_history(intro.final_version_history);
}

/* Spawn the coroutine that will stream pre-items to the backfiller. */
Expand Down
19 changes: 8 additions & 11 deletions src/clustering/immediate_consistency/history.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class branch_history_combiner_t : public branch_history_reader_t {
branch_history_combiner_t(
const branch_history_reader_t *_r1,
const branch_history_reader_t *_r2)
: r1(_r1), r2(_r2) { }
: r1(_r1), r2(_r2) { }
branch_birth_certificate_t get_branch(const branch_id_t& branch)
const THROWS_ONLY(missing_branch_exc_t);
bool is_branch_known(const branch_id_t &branch) const THROWS_NOTHING;
Expand All @@ -208,7 +208,7 @@ class branch_history_combiner_t : public branch_history_reader_t {
to add branches to the branch history. This is used for a branch history which is backed
to disk. */
class branch_history_manager_t :
public branch_history_reader_t,
public branch_history_reader_t,
public home_thread_mixin_t
{
public:
Expand All @@ -219,16 +219,14 @@ class branch_history_manager_t :
*/
virtual void create_branch(
branch_id_t branch_id,
const branch_birth_certificate_t &bc,
signal_t *interruptor)
THROWS_ONLY(interrupted_exc_t) = 0;
const branch_birth_certificate_t &bc)
THROWS_NOTHING = 0;

/* Like `create_branch` but for all the records in a `branch_history_t`, atomically.
*/
virtual void import_branch_history(
const branch_history_t &new_records,
signal_t *interruptor)
THROWS_ONLY(interrupted_exc_t) = 0;
const branch_history_t &new_records)
THROWS_NOTHING = 0;

/* `prepare_gc()` fills `branches` with the IDs of all known branches. `perform_gc()`
deletes any branches whose IDs are in `remove_branches`. The standard procedure is to
Expand All @@ -238,9 +236,8 @@ class branch_history_manager_t :
std::set<branch_id_t> *branches_out)
THROWS_NOTHING = 0;
virtual void perform_gc(
const std::set<branch_id_t> &remove_branches,
signal_t *interruptor)
THROWS_ONLY(interrupted_exc_t) = 0;
const std::set<branch_id_t> &remove_branches)
THROWS_NOTHING = 0;

protected:
virtual ~branch_history_manager_t() { }
Expand Down
Loading

0 comments on commit 944b402

Please sign in to comment.