Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single active_transactions mutex for vote_blocking #1350

Merged
merged 7 commits into from
Nov 14, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions rai/core_test/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ TEST (votes, check_signature)
node1.active.start (send1);
auto votes1 (node1.active.roots.find (send1->root ())->election);
ASSERT_EQ (1, votes1->last_votes.size ());
std::unique_lock<std::mutex> lock (node1.active.mutex);
auto vote1 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1));
vote1->signature.bytes[0] ^= 1;
ASSERT_EQ (rai::vote_code::invalid, node1.vote_processor.vote_blocking (transaction, vote1, rai::endpoint (boost::asio::ip::address_v6 (), 0)));
Expand Down Expand Up @@ -906,12 +907,16 @@ TEST (votes, add_old)
node1.active.start (send1);
auto votes1 (node1.active.roots.find (send1->root ())->election);
auto vote1 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send1));
std::unique_lock<std::mutex> lock (node1.active.mutex);
node1.vote_processor.vote_blocking (transaction, vote1, node1.network.endpoint ());
lock.unlock ();
rai::keypair key2;
auto send2 (std::make_shared<rai::send_block> (genesis.hash (), key2.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
auto vote2 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send2));
votes1->last_votes[rai::test_genesis_key.pub].time = std::chrono::steady_clock::now () - std::chrono::seconds (20);
lock.lock ();
node1.vote_processor.vote_blocking (transaction, vote2, node1.network.endpoint ());
lock.unlock ();
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (rai::test_genesis_key.pub));
ASSERT_EQ (send1->hash (), votes1->last_votes[rai::test_genesis_key.pub].hash);
Expand Down Expand Up @@ -942,12 +947,16 @@ TEST (votes, add_old_different_account)
ASSERT_EQ (1, votes2->last_votes.size ());
auto vote1 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send1));
auto transaction (system.nodes[0]->store.tx_begin ());
std::unique_lock<std::mutex> lock (node1.active.mutex);
auto vote_result1 (node1.vote_processor.vote_blocking (transaction, vote1, node1.network.endpoint ()));
lock.unlock ();
ASSERT_EQ (rai::vote_code::vote, vote_result1);
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_EQ (1, votes2->last_votes.size ());
lock.lock ();
auto vote2 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send2));
auto vote_result2 (node1.vote_processor.vote_blocking (transaction, vote2, node1.network.endpoint ()));
lock.unlock ();
ASSERT_EQ (rai::vote_code::vote, vote_result2);
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_EQ (2, votes2->last_votes.size ());
Expand Down Expand Up @@ -975,11 +984,15 @@ TEST (votes, add_cooldown)
node1.active.start (send1);
auto votes1 (node1.active.roots.find (send1->root ())->election);
auto vote1 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 1, send1));
std::unique_lock<std::mutex> lock (node1.active.mutex);
node1.vote_processor.vote_blocking (transaction, vote1, node1.network.endpoint ());
lock.unlock ();
rai::keypair key2;
auto send2 (std::make_shared<rai::send_block> (genesis.hash (), key2.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
auto vote2 (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 2, send2));
lock.lock ();
node1.vote_processor.vote_blocking (transaction, vote2, node1.network.endpoint ());
lock.unlock ();
ASSERT_EQ (2, votes1->last_votes.size ());
ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (rai::test_genesis_key.pub));
ASSERT_EQ (send1->hash (), votes1->last_votes[rai::test_genesis_key.pub].hash);
Expand Down
1 change: 1 addition & 0 deletions rai/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1824,6 +1824,7 @@ TEST (node, fork_invalid_block_signature_vote_by_hash)
auto vote (std::make_shared<rai::vote> (rai::test_genesis_key.pub, rai::test_genesis_key.prv, 0, vote_blocks));
{
auto transaction (system.nodes[0]->store.tx_begin_read ());
std::unique_lock<std::mutex> lock (system.nodes[0]->active.mutex);
system.nodes[0]->vote_processor.vote_blocking (transaction, vote, system.nodes[0]->network.endpoint ());
}
while (system.nodes[0]->block (send1->hash ()))
Expand Down
29 changes: 23 additions & 6 deletions rai/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,10 +791,19 @@ void rai::vote_processor::process_loop ()
active = true;
lock.unlock ();
{
std::unique_lock<std::mutex> active_single_lock (node.active.mutex);
auto transaction (node.store.tx_begin_read ());
uint64_t count (1);
for (auto & i : votes_l)
{
vote_blocking (transaction, i.first, i.second);
// Free active_transactions mutex each 100 processed votes
if (count % 100 == 0)
{
active_single_lock.unlock ();
active_single_lock.lock ();
}
count++;
}
}
lock.lock ();
Expand Down Expand Up @@ -827,7 +836,7 @@ rai::vote_code rai::vote_processor::vote_blocking (rai::transaction const & tran
{
auto max_vote (node.store.vote_max (transaction_a, vote_a));
result = rai::vote_code::replay;
if (!node.active.vote (vote_a))
if (!node.active.vote (vote_a, true))
{
result = rai::vote_code::vote;
}
Expand Down Expand Up @@ -1441,7 +1450,7 @@ vote_uniquer (block_uniquer)
{
BOOST_LOG (log) << boost::str (boost::format ("Found a representative at %1%") % endpoint_a);
// Rebroadcasting all active votes to new representative
auto blocks (this->active.list_blocks ());
auto blocks (this->active.list_blocks (true));
for (auto i (blocks.begin ()), n (blocks.end ()); i != n; ++i)
{
if (*i != nullptr)
Expand Down Expand Up @@ -3048,13 +3057,17 @@ bool rai::active_transactions::add (std::pair<std::shared_ptr<rai::block>, std::
}

// Validate a vote and apply it to the current election if one exists
bool rai::active_transactions::vote (std::shared_ptr<rai::vote> vote_a)
bool rai::active_transactions::vote (std::shared_ptr<rai::vote> vote_a, bool single_lock)
{
std::shared_ptr<rai::election> election;
bool replay (false);
bool processed (false);
{
std::lock_guard<std::mutex> lock (mutex);
std::unique_lock<std::mutex> lock;
if (!single_lock)
{
lock = std::unique_lock<std::mutex> (mutex);
}
for (auto vote_block : vote_a->blocks)
{
rai::election_vote_result result;
Expand Down Expand Up @@ -3094,10 +3107,14 @@ bool rai::active_transactions::active (rai::block const & block_a)
}

// List of active blocks in elections
std::deque<std::shared_ptr<rai::block>> rai::active_transactions::list_blocks ()
std::deque<std::shared_ptr<rai::block>> rai::active_transactions::list_blocks (bool single_lock)
{
std::deque<std::shared_ptr<rai::block>> result;
std::lock_guard<std::mutex> lock (mutex);
std::unique_lock<std::mutex> lock;
if (!single_lock)
{
lock = std::unique_lock<std::mutex> (mutex);
}
for (auto i (roots.begin ()), n (roots.end ()); i != n; ++i)
{
result.push_back (i->election->status.winner);
Expand Down
4 changes: 2 additions & 2 deletions rai/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ class active_transactions
bool add (std::pair<std::shared_ptr<rai::block>, std::shared_ptr<rai::block>>, std::function<void(std::shared_ptr<rai::block>)> const & = [](std::shared_ptr<rai::block>) {});
// If this returns true, the vote is a replay
// If this returns false, the vote may or may not be a replay
bool vote (std::shared_ptr<rai::vote>);
bool vote (std::shared_ptr<rai::vote>, bool = false);
// Is the root of this block in the roots container
bool active (rai::block const &);
std::deque<std::shared_ptr<rai::block>> list_blocks ();
std::deque<std::shared_ptr<rai::block>> list_blocks (bool = false);
void erase (rai::block const &);
void stop ();
bool publish (std::shared_ptr<rai::block> block_a);
Expand Down