Skip to content

Commit

Permalink
[FOLD] Prepare preferred ledger by branch test:
Browse files Browse the repository at this point in the history
Create a test that forks using the current preferred branching
approach that will not fork using the preferred by branch algorithm.

Update Peer to retry acquiring ledgers and txSets if a request times
out.
  • Loading branch information
bachase committed Dec 7, 2017
1 parent febedc4 commit 9949483
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 23 deletions.
170 changes: 170 additions & 0 deletions src/test/consensus/Consensus_test.cpp
Expand Up @@ -825,6 +825,175 @@ class Consensus_test : public beast::unit_test::suite
BEAST_EXPECT(sim.synchronized());
}


// Helper collector for testPreferredByBranch
// Invasively disconnects network at bad times to cause splits
struct Disruptor
{
csf::PeerGroup& network;
csf::PeerGroup& groupCfast;
csf::PeerGroup& groupCsplit;
csf::SimDuration delay;
bool reconnected = false;

Disruptor(
csf::PeerGroup& net,
csf::PeerGroup& c,
csf::PeerGroup& split,
csf::SimDuration d)
: network(net), groupCfast(c), groupCsplit(split), delay(d)
{
}

template <class E>
void
on(csf::PeerID, csf::SimTime, E const&)
{
}


void
on(csf::PeerID who, csf::SimTime, csf::FullyValidateLedger const& e)
{
using namespace std::chrono;
// As soon as the the fastC node fully validates C, disconnect
// ALL c nodes from the network. The fast C node needs to disconnect
// as well to prevent it from relaying the validations it did see
if (who == groupCfast[0]->id &&
e.ledger.seq() == csf::Ledger::Seq{2})
{
network.disconnect(groupCsplit);
network.disconnect(groupCfast);
}
}

void
on(csf::PeerID who, csf::SimTime, csf::AcceptLedger const& e)
{
// As soon as anyone generates a child of B or C, reconnect the
// network so those validation make it through
if (!reconnected && e.ledger.seq() == csf::Ledger::Seq{3})
{
reconnected = true;
network.connect(groupCsplit, delay);
}
}


};

void
testPreferredByBranch()
{
using namespace csf;
using namespace std::chrono;

// Simulate network splits that are prevented from forking when using
// preferred ledger by trie. This is a contrived example that involves
// excessive network splits, but demonstrates the safety improvement
// from the preferred ledger by trie approach.

// Consider 10 validating nodes that comprise a single common UNL
// Ledger history:
// 1: A
// _/ \_
// 2: B C
// _/ _/ \_
// 3: D C' |||||||| (8 different ledgers)

// - All nodes generate the common ledger A
// - 2 nodes generate B and 8 nodes generate C
// - Only 1 of the C nodes sees all the C validations and fully
// validates C. The rest of the C nodes disconnect split at just
// the right time such that they never see any C validations but
// their own.
// - The C nodes continue and generate 8 different child ledgers.
// - Meanwhile, the D nodes only saw 1 validation for C and 2 validations
// for C.
// - The network reconnects and the validations for generation 3 ledgers
// are observed (D and the 8 C's)
// - In the old approach, 2 votes for D outweights 1 vote for each C'
// so the network would avalanche towards D and fully validate it
// EVEN though C was fully validated by one node
// - In the new approach, 2 votes for D are not enough to outweight the
// 8 implicit votes for C, so nodes will avalanche to C instead


ConsensusParms const parms{};
Sim sim;

// Goes A->B->D
PeerGroup groupABD = sim.createGroup(2);
// Single node that initially fully validates C before the split
PeerGroup groupCfast = sim.createGroup(1);
// Generates C, but fails to fully validate before the split
PeerGroup groupCsplit = sim.createGroup(7);

PeerGroup groupNotFastC = groupABD + groupCsplit;
PeerGroup network = groupABD + groupCsplit + groupCfast;

SimDuration delay = round<milliseconds>(0.2 * parms.ledgerGRANULARITY);
SimDuration fDelay = round<milliseconds>(0.1 * parms.ledgerGRANULARITY);

network.trust(network);
// C must have a shorter delay to see all the validations before the
// other nodes
network.connect(groupCfast, fDelay);
// The rest of the network is connected at the same speed
(network - groupCfast).connect(network - groupCfast, delay);

Disruptor dc(network, groupCfast, groupCsplit, delay);
sim.collectors.add(dc);

// Consensus round to generate ledger A
sim.run(1);
BEAST_EXPECT(sim.synchronized());

// Next round generates B and C
// To force B, we inject an extra transaction in to those nodes
for(Peer * peer : groupABD)
{
peer->txInjections.emplace(
peer->lastClosedLedger.seq(), Tx{42});
}
// The Disruptor will ensure that nodes disconnect before the C
// validations make it to all but the fastC node
sim.run(1);

// We are no longer in sync, but have not yet forked:
// 9 nodes consider A the last fully validated ledger and fastC sees C
BEAST_EXPECT(!sim.synchronized());
BEAST_EXPECT(sim.branches() == 1);

// Run another round to generate the 8 different C' ledgers
for (Peer * p : network)
p->submit(Tx(static_cast<std::uint32_t>(p->id)));
sim.run(1);

// Still not forked
BEAST_EXPECT(!sim.synchronized());
BEAST_EXPECT(sim.branches() == 1);

// Disruptor will reconnect all but the fastC node
sim.run(1);
BEAST_EXPECT(!sim.synchronized());

if(BEAST_EXPECT(sim.branches() == 1))
{
// New approach will not fork and will resync once the fast node
// reconnects for a few rounds
network.connect(groupCfast, fDelay);
sim.run(2);
BEAST_EXPECT(sim.synchronized());
BEAST_EXPECT(sim.branches() == 1);

}
else // old approach caused a fork
{
BEAST_EXPECT(sim.branches(groupNotFastC) == 1);
BEAST_EXPECT(sim.synchronized(groupNotFastC) == 1);
}
}
void
run() override
{
Expand All @@ -839,6 +1008,7 @@ class Consensus_test : public beast::unit_test::suite
testConsensusCloseTimeRounding();
testFork();
testHubNetwork();
testPreferredByBranch();
}
};

Expand Down
46 changes: 37 additions & 9 deletions src/test/csf/Peer.h
Expand Up @@ -213,9 +213,9 @@ struct Peer
//! TxSet associated with a TxSet::ID
bc::flat_map<TxSet::ID, TxSet> txSets;

// Ledgers and txSets that we have already attempted to acquire
bc::flat_set<Ledger::ID> acquiringLedgers;
bc::flat_set<TxSet::ID> acquiringTxSets;
// Ledgers/TxSets we are acquiring and when that request times out
bc::flat_map<Ledger::ID,SimTime> acquiringLedgers;
bc::flat_map<TxSet::ID,SimTime> acquiringTxSets;

//! The number of ledgers this peer has completed
int completedLedgers = 0;
Expand Down Expand Up @@ -380,16 +380,30 @@ struct Peer
Ledger const*
acquireLedger(Ledger::ID const& ledgerID)
{
using namespace std::chrono;

auto it = ledgers.find(ledgerID);
if (it != ledgers.end())
return &(it->second);

// Don't retry if we already are acquiring it
if(!acquiringLedgers.emplace(ledgerID).second)
// No peers
if(net.links(this).empty())
return nullptr;

// Don't retry if we already are acquiring it and haven't timed out
auto aIt = acquiringLedgers.find(ledgerID);
if(aIt!= acquiringLedgers.end())
{
if(scheduler.now() < aIt->second)
return nullptr;
}


SimDuration minDuration{10s};
for (auto const& link : net.links(this))
{
minDuration = std::min(minDuration, link.data.delay);

// Send a messsage to neighbors to find the ledger
net.send(
this, link.target, [ to = link.target, from = this, ledgerID ]() {
Expand All @@ -400,11 +414,13 @@ struct Peer
// requesting peer where it is added to the available
// ledgers
to->net.send(to, from, [ from, ledger = it->second ]() {
from->acquiringLedgers.erase(ledger.id());
from->ledgers.emplace(ledger.id(), ledger);
});
}
});
}
acquiringLedgers[ledgerID] = scheduler.now() + 2 * minDuration;
return nullptr;
}

Expand All @@ -416,12 +432,22 @@ struct Peer
if (it != txSets.end())
return &(it->second);

// Don't retry if we already are acquiring it
if(!acquiringTxSets.emplace(setId).second)
// No peers
if(net.links(this).empty())
return nullptr;

// Don't retry if we already are acquiring it and haven't timed out
auto aIt = acquiringTxSets.find(setId);
if(aIt!= acquiringTxSets.end())
{
if(scheduler.now() < aIt->second)
return nullptr;
}

SimDuration minDuration{10s};
for (auto const& link : net.links(this))
{
minDuration = std::min(minDuration, link.data.delay);
// Send a message to neighbors to find the tx set
net.send(
this, link.target, [ to = link.target, from = this, setId ]() {
Expand All @@ -432,11 +458,13 @@ struct Peer
// requesting peer, where it is handled like a TxSet
// that was broadcast over the network
to->net.send(to, from, [ from, txSet = it->second ]() {
from->acquiringTxSets.erase(txSet.id());
from->handle(txSet);
});
}
});
}
acquiringTxSets[setId] = scheduler.now() + 2 * minDuration;
return nullptr;
}

Expand Down Expand Up @@ -663,7 +691,7 @@ struct Peer
std::size_t const count = validations.numTrustedForLedger(ledger.id());
std::size_t const numTrustedPeers = trustGraph.graph().outDegree(this);
quorum = static_cast<std::size_t>(std::ceil(numTrustedPeers * 0.8));
if (count >= quorum)
if (count >= quorum && oracle.isAncestor(fullyValidatedLedger, ledger))
{
issue(FullyValidateLedger{ledger, fullyValidatedLedger});
fullyValidatedLedger = ledger;
Expand Down Expand Up @@ -830,7 +858,7 @@ struct Peer
lastClosedLedger.parentID(),
earliestAllowedSeq());

// Between rounds, we take the majority ledger and use the
// Between rounds, we take the majority ledger and use the
Ledger::ID const bestLCL =
getPreferredLedger(lastClosedLedger.id(), valDistribution);

Expand Down
9 changes: 8 additions & 1 deletion src/test/csf/PeerGroup.h
Expand Up @@ -62,7 +62,6 @@ class PeerGroup

PeerGroup(std::set<Peer*> const& peers) : peers_{peers.begin(), peers.end()}
{

}

iterator
Expand Down Expand Up @@ -101,6 +100,14 @@ class PeerGroup
return std::find(peers_.begin(), peers_.end(), p) != peers_.end();
}

bool
contains(PeerID id)
{
return std::find_if(peers_.begin(), peers_.end(), [id](Peer const* p) {
return p->id == id;
}) != peers_.end();
}

std::size_t
size() const
{
Expand Down
21 changes: 18 additions & 3 deletions src/test/csf/Sim.h
Expand Up @@ -65,6 +65,7 @@ class Sim
// Use a deque to have stable pointers even when dynamically adding peers
// - Alternatively consider using unique_ptrs allocated from arena
std::deque<Peer> peers;
PeerGroup allPeers;

public:
std::mt19937_64 rng;
Expand Down Expand Up @@ -113,7 +114,9 @@ class Sim
j);
newPeers.emplace_back(&peers.back());
}
return PeerGroup{newPeers};
PeerGroup res{newPeers};
allPeers = allPeers + res;
return res;
}

//! The number of peers in the simulation
Expand All @@ -136,20 +139,32 @@ class Sim
void
run(SimDuration const& dur);

/** Check whether all peers in the network are synchronized.
/** Check whether all peers in the group are synchronized.
Nodes in the network are synchronized if they share the same last
fully validated and last generated ledger.
*/
bool
synchronized(PeerGroup const & g) const;


/** Check whether all peers in the network are synchronized
*/
bool
synchronized() const;

/** Calculate the number of branches in the network.

/** Calculate the number of branches in the group.
A branch occurs if two peers have fullyValidatedLedgers that are not on
the same chain of ledgers.
*/
std::size_t
branches(PeerGroup const & g) const;

/** Calculate the number of branches in the network
*/
std::size_t
branches() const;

};
Expand Down

0 comments on commit 9949483

Please sign in to comment.