Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/consensus/aft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -1857,7 +1857,8 @@ namespace aft
void become_leader()
{
election_index = last_committable_index();
LOG_DEBUG_FMT("Election index is {}", election_index);
LOG_DEBUG_FMT(
"Election index is {} in term {}", election_index, state->current_view);
// Discard any un-committable updates we may hold,
// since we have no signature for them. Except at startup,
// where we do not want to roll back the genesis transaction.
Expand Down
3 changes: 2 additions & 1 deletion src/kv/kv_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,9 @@ namespace kv
const std::vector<uint8_t>& request,
uint8_t frame_format) = 0;
virtual void append(const std::vector<uint8_t>& replicated) = 0;
virtual void rollback(Version v) = 0;
virtual void rollback(Version v, kv::Term) = 0;
virtual void compact(Version v) = 0;
virtual void set_term(kv::Term) = 0;
};

class Consensus : public ConfigurableConsensus
Expand Down
38 changes: 27 additions & 11 deletions src/kv/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,13 +534,6 @@ namespace kv

{
std::lock_guard<SpinLock> vguard(version_lock);
// The term should always be updated on rollback() when passed
// regardless of whether version needs to be updated or not
if (t.has_value())
term = t.value();
if (v >= version)
return;

if (v < compacted)
{
throw std::logic_error(fmt::format(
Expand All @@ -549,17 +542,35 @@ namespace kv
compacted));
}

// The term should always be updated on rollback() when passed
// regardless of whether version needs to be updated or not
if (t.has_value())
Comment thread
achamayou marked this conversation as resolved.
{
term = t.value();
}
// History must be informed of the term change, even if no
// actual rollback is required
auto h = get_history();
if (h)
{
h->rollback(v, term);
}

if (v >= version)
{
return;
}

version = v;
last_replicated = v;
last_committable = v;
rollback_count++;
pending_txs.clear();
auto h = get_history();
if (h)
h->rollback(v);
auto e = get_encryptor();
if (e)
{
e->rollback(v);
}
}

for (auto& it : maps)
Expand Down Expand Up @@ -599,6 +610,11 @@ namespace kv
{
std::lock_guard<SpinLock> vguard(version_lock);
term = t;
auto h = get_history();
if (h)
{
h->set_term(term);
}
}

DeserialiseSuccess deserialise_views(
Expand Down Expand Up @@ -1259,4 +1275,4 @@ namespace kv
return ReservedTx(this, v);
}
};
}
}
16 changes: 14 additions & 2 deletions src/node/history.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ namespace ccf
return true;
}

void rollback(kv::Version) override {}
void set_term(kv::Term) override {}

void rollback(kv::Version, kv::Term) override {}

void compact(kv::Version) override {}

Expand Down Expand Up @@ -459,6 +461,9 @@ namespace ccf
size_t sig_tx_interval;
size_t sig_ms_interval;

SpinLock term_lock;
kv::Term term = 0;

public:
HashedTxHistory(
kv::Store& store_,
Expand Down Expand Up @@ -657,8 +662,15 @@ namespace ccf
return true;
}

void rollback(kv::Version v) override
void set_term(kv::Term t) override
{
std::lock_guard<SpinLock> tguard(term_lock);
term = t;
}

void rollback(kv::Version v, kv::Term t) override
{
set_term(t);
replicated_state_tree.retract(v);
log_hash(replicated_state_tree.get_root(), ROLLBACK);
}
Expand Down