Skip to content

Commit

Permalink
Bootstrap connection cleanup (#912)
Browse files Browse the repository at this point in the history
* Deserialize in to receive buffer without any pointer arithmetic.

* Creating rai socket class that has timeout capability encapsulated.

* Putting message header in its own class and removing buffer pointer arithmetic when parsing bootstrap messages.

* Converting bootstrap_server connection to a rai::socket to ensure timeouts are used when sending data.
  • Loading branch information
clemahieu authored and argakiig committed Jun 11, 2018
1 parent b50d148 commit 2900590
Show file tree
Hide file tree
Showing 9 changed files with 483 additions and 418 deletions.
20 changes: 13 additions & 7 deletions rai/core_test/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,12 @@ TEST (frontier_req, serialization)
rai::vectorstream stream (bytes);
request1.serialize (stream);
}
rai::bufferstream buffer (bytes.data (), bytes.size ());
rai::frontier_req request2;
ASSERT_FALSE (request2.deserialize (buffer));
auto error (false);
rai::bufferstream stream (bytes.data (), bytes.size ());
rai::message_header header (error, stream);
ASSERT_FALSE (error);
rai::frontier_req request2 (error, stream, header);
ASSERT_FALSE (error);
ASSERT_EQ (request1, request2);
}

Expand All @@ -299,9 +302,11 @@ TEST (block, publish_req_serialization)
rai::vectorstream stream (bytes);
req.serialize (stream);
}
rai::publish req2;
auto error (false);
rai::bufferstream stream2 (bytes.data (), bytes.size ());
auto error (req2.deserialize (stream2));
rai::message_header header (error, stream2);
ASSERT_FALSE (error);
rai::publish req2 (error, stream2, header);
ASSERT_FALSE (error);
ASSERT_EQ (req, req2);
ASSERT_EQ (*req.block, *req2.block);
Expand All @@ -318,9 +323,10 @@ TEST (block, confirm_req_serialization)
rai::vectorstream stream (bytes);
req.serialize (stream);
}
rai::confirm_req req2;
auto error (false);
rai::bufferstream stream2 (bytes.data (), bytes.size ());
auto error (req2.deserialize (stream2));
rai::message_header header (error, stream2);
rai::confirm_req req2 (error, stream2, header);
ASSERT_FALSE (error);
ASSERT_EQ (req, req2);
ASSERT_EQ (*req.block, *req2.block);
Expand Down
55 changes: 26 additions & 29 deletions rai/core_test/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ TEST (message, keepalive_serialization)
rai::vectorstream stream (bytes);
request1.serialize (stream);
}
rai::keepalive request2;
rai::bufferstream buffer (bytes.data (), bytes.size ());
ASSERT_FALSE (request2.deserialize (buffer));
auto error (false);
rai::bufferstream stream (bytes.data (), bytes.size ());
rai::message_header header (error, stream);
ASSERT_FALSE (error);
rai::keepalive request2 (error, stream, header);
ASSERT_FALSE (error);
ASSERT_EQ (request1, request2);
}

Expand All @@ -25,31 +28,27 @@ TEST (message, keepalive_deserialize)
rai::vectorstream stream (bytes);
message1.serialize (stream);
}
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
std::bitset<16> extensions;
rai::bufferstream header_stream (bytes.data (), bytes.size ());
ASSERT_FALSE (rai::message::read_header (header_stream, version_max, version_using, version_min, type, extensions));
ASSERT_EQ (rai::message_type::keepalive, type);
rai::keepalive message2;
rai::bufferstream stream (bytes.data (), bytes.size ());
ASSERT_FALSE (message2.deserialize (stream));
auto error (false);
rai::message_header header (error, stream);
ASSERT_FALSE (error);
ASSERT_EQ (rai::message_type::keepalive, header.type);
rai::keepalive message2 (error, stream, header);
ASSERT_FALSE (error);
ASSERT_EQ (message1.peers, message2.peers);
}

TEST (message, publish_serialization)
{
rai::publish publish (std::unique_ptr<rai::block> (new rai::send_block (0, 1, 2, rai::keypair ().prv, 4, 5)));
ASSERT_EQ (rai::block_type::send, publish.block_type ());
ASSERT_FALSE (publish.ipv4_only ());
publish.ipv4_only_set (true);
ASSERT_TRUE (publish.ipv4_only ());
ASSERT_EQ (rai::block_type::send, publish.header.block_type ());
ASSERT_FALSE (publish.header.ipv4_only ());
publish.header.ipv4_only_set (true);
ASSERT_TRUE (publish.header.ipv4_only ());
std::vector<uint8_t> bytes;
{
rai::vectorstream stream (bytes);
publish.write_header (stream);
publish.header.serialize (stream);
}
ASSERT_EQ (8, bytes.size ());
ASSERT_EQ (0x52, bytes[0]);
Expand All @@ -61,16 +60,13 @@ TEST (message, publish_serialization)
ASSERT_EQ (0x02, bytes[6]);
ASSERT_EQ (static_cast<uint8_t> (rai::block_type::send), bytes[7]);
rai::bufferstream stream (bytes.data (), bytes.size ());
uint8_t version_max;
uint8_t version_using;
uint8_t version_min;
rai::message_type type;
std::bitset<16> extensions;
ASSERT_FALSE (rai::message::read_header (stream, version_max, version_using, version_min, type, extensions));
ASSERT_EQ (rai::protocol_version_min, version_min);
ASSERT_EQ (rai::protocol_version, version_using);
ASSERT_EQ (rai::protocol_version, version_max);
ASSERT_EQ (rai::message_type::publish, type);
auto error (false);
rai::message_header header (error, stream);
ASSERT_FALSE (error);
ASSERT_EQ (rai::protocol_version_min, header.version_min);
ASSERT_EQ (rai::protocol_version, header.version_using);
ASSERT_EQ (rai::protocol_version, header.version_max);
ASSERT_EQ (rai::message_type::publish, header.type);
}

TEST (message, confirm_ack_serialization)
Expand All @@ -85,7 +81,8 @@ TEST (message, confirm_ack_serialization)
}
rai::bufferstream stream2 (bytes.data (), bytes.size ());
bool error;
rai::confirm_ack con2 (error, stream2);
rai::message_header header (error, stream2);
rai::confirm_ack con2 (error, stream2, header);
ASSERT_FALSE (error);
ASSERT_EQ (con1, con2);
}
44 changes: 36 additions & 8 deletions rai/core_test/message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,18 @@ TEST (message_parser, exact_confirm_ack_size)
}
ASSERT_EQ (0, visitor.confirm_ack_count);
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
parser.deserialize_confirm_ack (bytes.data (), bytes.size ());
auto error (false);
rai::bufferstream stream1 (bytes.data (), bytes.size ());
rai::message_header header1 (error, stream1);
ASSERT_FALSE (error);
parser.deserialize_confirm_ack (stream1, header1);
ASSERT_EQ (1, visitor.confirm_ack_count);
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
bytes.push_back (0);
parser.deserialize_confirm_ack (bytes.data (), bytes.size ());
rai::bufferstream stream2 (bytes.data (), bytes.size ());
rai::message_header header2 (error, stream2);
ASSERT_FALSE (error);
parser.deserialize_confirm_ack (stream2, header2);
ASSERT_EQ (1, visitor.confirm_ack_count);
ASSERT_NE (parser.status, rai::message_parser::parse_status::success);
}
Expand All @@ -98,11 +105,18 @@ TEST (message_parser, exact_confirm_req_size)
}
ASSERT_EQ (0, visitor.confirm_req_count);
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
parser.deserialize_confirm_req (bytes.data (), bytes.size ());
auto error (false);
rai::bufferstream stream1 (bytes.data (), bytes.size ());
rai::message_header header1 (error, stream1);
ASSERT_FALSE (error);
parser.deserialize_confirm_req (stream1, header1);
ASSERT_EQ (1, visitor.confirm_req_count);
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
bytes.push_back (0);
parser.deserialize_confirm_req (bytes.data (), bytes.size ());
rai::bufferstream stream2 (bytes.data (), bytes.size ());
rai::message_header header2 (error, stream2);
ASSERT_FALSE (error);
parser.deserialize_confirm_req (stream2, header2);
ASSERT_EQ (1, visitor.confirm_req_count);
ASSERT_NE (parser.status, rai::message_parser::parse_status::success);
}
Expand All @@ -121,11 +135,18 @@ TEST (message_parser, exact_publish_size)
}
ASSERT_EQ (0, visitor.publish_count);
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
parser.deserialize_publish (bytes.data (), bytes.size ());
auto error (false);
rai::bufferstream stream1 (bytes.data (), bytes.size ());
rai::message_header header1 (error, stream1);
ASSERT_FALSE (error);
parser.deserialize_publish (stream1, header1);
ASSERT_EQ (1, visitor.publish_count);
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
bytes.push_back (0);
parser.deserialize_publish (bytes.data (), bytes.size ());
rai::bufferstream stream2 (bytes.data (), bytes.size ());
rai::message_header header2 (error, stream2);
ASSERT_FALSE (error);
parser.deserialize_publish (stream2, header2);
ASSERT_EQ (1, visitor.publish_count);
ASSERT_NE (parser.status, rai::message_parser::parse_status::success);
}
Expand All @@ -143,11 +164,18 @@ TEST (message_parser, exact_keepalive_size)
}
ASSERT_EQ (0, visitor.keepalive_count);
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
parser.deserialize_keepalive (bytes.data (), bytes.size ());
auto error (false);
rai::bufferstream stream1 (bytes.data (), bytes.size ());
rai::message_header header1 (error, stream1);
ASSERT_FALSE (error);
parser.deserialize_keepalive (stream1, header1);
ASSERT_EQ (1, visitor.keepalive_count);
ASSERT_EQ (parser.status, rai::message_parser::parse_status::success);
bytes.push_back (0);
parser.deserialize_keepalive (bytes.data (), bytes.size ());
rai::bufferstream stream2 (bytes.data (), bytes.size ());
rai::message_header header2 (error, stream2);
ASSERT_FALSE (error);
parser.deserialize_keepalive (stream2, header2);
ASSERT_EQ (1, visitor.keepalive_count);
ASSERT_NE (parser.status, rai::message_parser::parse_status::success);
}
43 changes: 17 additions & 26 deletions rai/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,7 @@ TEST (node, receive_gap)
ASSERT_EQ (0, node1.gap_cache.blocks.size ());
auto block (std::make_shared<rai::send_block> (5, 1, 2, rai::keypair ().prv, 4, 0));
node1.work_generate_blocking (*block);
rai::confirm_req message;
message.block = block;
rai::confirm_req message (block);
node1.process_message (message, node1.network.endpoint ());
node1.block_processor.flush ();
ASSERT_EQ (1, node1.gap_cache.blocks.size ());
Expand Down Expand Up @@ -745,13 +744,11 @@ TEST (node, fork_flip)
system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv);
rai::keypair key1;
rai::genesis genesis;
std::unique_ptr<rai::send_block> send1 (new rai::send_block (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
rai::publish publish1;
publish1.block = std::move (send1);
auto send1 (std::make_shared<rai::send_block> (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
rai::publish publish1 (send1);
rai::keypair key2;
std::unique_ptr<rai::send_block> send2 (new rai::send_block (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
rai::publish publish2;
publish2.block = std::move (send2);
auto send2 (std::make_shared<rai::send_block> (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
rai::publish publish2 (send2);
node1.process_message (publish1, node1.network.endpoint ());
node1.block_processor.flush ();
node2.process_message (publish2, node1.network.endpoint ());
Expand Down Expand Up @@ -800,16 +797,13 @@ TEST (node, fork_multi_flip)
system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv);
rai::keypair key1;
rai::genesis genesis;
std::unique_ptr<rai::send_block> send1 (new rai::send_block (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
rai::publish publish1;
publish1.block = std::move (send1);
auto send1 (std::make_shared<rai::send_block> (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
rai::publish publish1 (send1);
rai::keypair key2;
std::unique_ptr<rai::send_block> send2 (new rai::send_block (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
rai::publish publish2;
publish2.block = std::move (send2);
std::unique_ptr<rai::send_block> send3 (new rai::send_block (publish2.block->hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (publish2.block->hash ())));
rai::publish publish3;
publish3.block = std::move (send3);
auto send2 (std::make_shared<rai::send_block> (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
rai::publish publish2 (send2);
auto send3 (std::make_shared<rai::send_block> (publish2.block->hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (publish2.block->hash ())));
rai::publish publish3 (send3);
node1.process_message (publish1, node1.network.endpoint ());
node1.block_processor.flush ();
node2.process_message (publish2, node2.network.endpoint ());
Expand Down Expand Up @@ -906,19 +900,16 @@ TEST (node, fork_open)
system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv);
rai::keypair key1;
rai::genesis genesis;
std::unique_ptr<rai::send_block> send1 (new rai::send_block (genesis.hash (), key1.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
rai::publish publish1;
publish1.block = std::move (send1);
auto send1 (std::make_shared<rai::send_block> (genesis.hash (), key1.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
rai::publish publish1 (send1);
node1.process_message (publish1, node1.network.endpoint ());
node1.block_processor.flush ();
std::unique_ptr<rai::open_block> open1 (new rai::open_block (publish1.block->hash (), 1, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)));
rai::publish publish2;
publish2.block = std::move (open1);
auto open1 (std::make_shared<rai::open_block> (publish1.block->hash (), 1, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)));
rai::publish publish2 (open1);
node1.process_message (publish2, node1.network.endpoint ());
node1.block_processor.flush ();
std::unique_ptr<rai::open_block> open2 (new rai::open_block (publish1.block->hash (), 2, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)));
rai::publish publish3;
publish3.block = std::move (open2);
auto open2 (std::make_shared<rai::open_block> (publish1.block->hash (), 2, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)));
rai::publish publish3 (open2);
ASSERT_EQ (2, node1.active.roots.size ());
node1.process_message (publish3, node1.network.endpoint ());
node1.block_processor.flush ();
Expand Down
Loading

0 comments on commit 2900590

Please sign in to comment.