diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index a1dd3ffcf6..3d8418e2eb 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -23,6 +23,7 @@ add_executable (core_test message_parser.cpp memory_pool.cpp network.cpp + network_filter.cpp node.cpp node_telemetry.cpp processor_service.cpp diff --git a/nano/core_test/network_filter.cpp b/nano/core_test/network_filter.cpp new file mode 100644 index 0000000000..a0df312df0 --- /dev/null +++ b/nano/core_test/network_filter.cpp @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include + +#include + +TEST (network_filter, unit) +{ + nano::genesis genesis; + nano::network_filter filter (1); + auto one_block = [&genesis, &filter](std::shared_ptr const & block_a, bool expect_duplicate_a) { + nano::publish message (block_a); + auto bytes (message.to_bytes ()); + nano::bufferstream stream (bytes->data (), bytes->size ()); + + // First read the header + bool error{ false }; + nano::message_header header (error, stream); + ASSERT_FALSE (error); + + // This validates nano::message_header::size + ASSERT_EQ (bytes->size (), block_a->size (block_a->type ()) + header.size); + + // Now filter the rest of the stream + bool duplicate (filter.apply (bytes->data (), bytes->size () - header.size)); + ASSERT_EQ (expect_duplicate_a, duplicate); + + // Make sure the stream was rewinded correctly + auto block (nano::deserialize_block (stream, header.block_type ())); + ASSERT_NE (nullptr, block); + ASSERT_EQ (*block, *block_a); + }; + one_block (genesis.open, false); + for (int i = 0; i < 10; ++i) + { + one_block (genesis.open, true); + } + auto new_block (std::make_shared (nano::test_genesis_key.pub, genesis.open->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 10 * nano::xrb_ratio, nano::public_key (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0)); + one_block (new_block, false); + for (int i = 0; i < 10; ++i) + { + one_block (new_block, true); + } + for (int i = 0; i < 100; ++i) + { + one_block (genesis.open, false); + one_block (new_block, false); + } +} + +TEST (network_filter, many) +{ + nano::genesis genesis; + nano::network_filter filter (4); + nano::keypair key1; + for (int i = 0; i < 100; ++i) + { + auto block (std::make_shared (nano::test_genesis_key.pub, genesis.open->hash (), nano::test_genesis_key.pub, nano::genesis_amount - i * 10 * nano::xrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0)); + + nano::publish message (block); + auto bytes (message.to_bytes ()); + nano::bufferstream stream (bytes->data (), bytes->size ()); + + // First read the header + bool error{ false }; + nano::message_header header (error, stream); + ASSERT_FALSE (error); + + // This validates nano::message_header::size + ASSERT_EQ (bytes->size (), block->size + header.size); + + // Now filter the rest of the stream + // All blocks should pass through + ASSERT_FALSE (filter.apply (bytes->data (), block->size)); + ASSERT_FALSE (error); + + // Make sure the stream was rewinded correctly + auto deserialized_block (nano::deserialize_block (stream, header.block_type ())); + ASSERT_NE (nullptr, deserialized_block); + ASSERT_EQ (*block, *deserialized_block); + } +} + +TEST (network_filter, clear) +{ + nano::network_filter filter (1); + std::vector bytes1{ 1, 2, 3 }; + std::vector bytes2{ 1 }; + ASSERT_FALSE (filter.apply (bytes1.data (), bytes1.size ())); + ASSERT_TRUE (filter.apply (bytes1.data (), bytes1.size ())); + filter.clear (bytes1.data (), bytes1.size ()); + ASSERT_FALSE (filter.apply (bytes1.data (), bytes1.size ())); + ASSERT_TRUE (filter.apply (bytes1.data (), bytes1.size ())); + filter.clear (bytes2.data (), bytes2.size ()); + ASSERT_TRUE (filter.apply (bytes1.data (), bytes1.size ())); + ASSERT_FALSE (filter.apply (bytes2.data (), bytes2.size ())); +} + +TEST (network_filter, optional_digest) +{ + nano::network_filter filter (1); + std::vector bytes1{ 1, 2, 3 }; + nano::uint128_t digest{ 0 }; + ASSERT_FALSE (filter.apply (bytes1.data (), bytes1.size (), &digest)); + ASSERT_NE (0, digest); + ASSERT_TRUE (filter.apply (bytes1.data (), bytes1.size ())); + filter.clear (digest); + ASSERT_FALSE (filter.apply (bytes1.data (), bytes1.size ())); +} diff --git a/nano/lib/CMakeLists.txt b/nano/lib/CMakeLists.txt index b8468a6bcf..ad1b531d01 100644 --- a/nano/lib/CMakeLists.txt +++ b/nano/lib/CMakeLists.txt @@ -50,6 +50,7 @@ add_library (nano_lib rpcconfig.cpp stats.hpp stats.cpp + stream.hpp threading.hpp threading.cpp timer.hpp diff --git a/nano/lib/blocks.hpp b/nano/lib/blocks.hpp index a05aa59406..d355ea9030 100644 --- a/nano/lib/blocks.hpp +++ b/nano/lib/blocks.hpp @@ -3,45 +3,15 @@ #include #include #include +#include #include #include -#include -#include #include namespace nano { -// We operate on streams of uint8_t by convention -using stream = std::basic_streambuf; -// Read a raw byte stream the size of `T' and fill value. Returns true if there was an error, false otherwise -template -bool try_read (nano::stream & stream_a, T & value) -{ - static_assert (std::is_standard_layout::value, "Can't stream read non-standard layout types"); - auto amount_read (stream_a.sgetn (reinterpret_cast (&value), sizeof (value))); - return amount_read != sizeof (value); -} -// A wrapper of try_read which throws if there is an error -template -void read (nano::stream & stream_a, T & value) -{ - auto error = try_read (stream_a, value); - if (error) - { - throw std::runtime_error ("Failed to read type"); - } -} - -template -void write (nano::stream & stream_a, T const & value) -{ - static_assert (std::is_standard_layout::value, "Can't stream write non-standard layout types"); - auto amount_written (stream_a.sputn (reinterpret_cast (&value), sizeof (value))); - (void)amount_written; - assert (amount_written == sizeof (value)); -} class block_visitor; enum class block_type : uint8_t { diff --git a/nano/lib/stream.hpp b/nano/lib/stream.hpp new file mode 100644 index 0000000000..190fc0f178 --- /dev/null +++ b/nano/lib/stream.hpp @@ -0,0 +1,38 @@ + +#pragma once + +#include +#include + +namespace nano +{ +// We operate on streams of uint8_t by convention +using stream = std::basic_streambuf; +// Read a raw byte stream the size of `T' and fill value. Returns true if there was an error, false otherwise +template +bool try_read (nano::stream & stream_a, T & value) +{ + static_assert (std::is_standard_layout::value, "Can't stream read non-standard layout types"); + auto amount_read (stream_a.sgetn (reinterpret_cast (&value), sizeof (value))); + return amount_read != sizeof (value); +} +// A wrapper of try_read which throws if there is an error +template +void read (nano::stream & stream_a, T & value) +{ + auto error = try_read (stream_a, value); + if (error) + { + throw std::runtime_error ("Failed to read type"); + } +} + +template +void write (nano::stream & stream_a, T const & value) +{ + static_assert (std::is_standard_layout::value, "Can't stream write non-standard layout types"); + auto amount_written (stream_a.sputn (reinterpret_cast (&value), sizeof (value))); + (void)amount_written; + assert (amount_written == sizeof (value)); +} +} diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 44d037ba4c..4d3a3174c6 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -199,6 +199,7 @@ class message_header final uint8_t version_min; nano::message_type type; std::bitset<16> extensions; + static size_t constexpr size = sizeof (network_params::header_magic_number) + sizeof (version_max) + sizeof (version_using) + sizeof (version_min) + sizeof (type) + sizeof (/* extensions */ uint16_t); void flag_set (uint8_t); static uint8_t constexpr bulk_pull_count_present_flag = 0; diff --git a/nano/secure/CMakeLists.txt b/nano/secure/CMakeLists.txt index 87c8cc3407..cfe874160b 100644 --- a/nano/secure/CMakeLists.txt +++ b/nano/secure/CMakeLists.txt @@ -45,6 +45,8 @@ add_library (secure epoch.cpp ledger.hpp ledger.cpp + network_filter.hpp + network_filter.cpp utility.hpp utility.cpp versioning.hpp diff --git a/nano/secure/network_filter.cpp b/nano/secure/network_filter.cpp new file mode 100644 index 0000000000..572d2e57d1 --- /dev/null +++ b/nano/secure/network_filter.cpp @@ -0,0 +1,79 @@ +#include +#include +#include +#include +#include + +nano::network_filter::network_filter (size_t size_a) : +items (size_a, nano::uint128_t{ 0 }) +{ + nano::random_pool::generate_block (key, key.size ()); +} + +bool nano::network_filter::apply (uint8_t const * bytes_a, size_t count_a, nano::uint128_t * digest_a) +{ + // Get hash before locking + auto digest (hash (bytes_a, count_a)); + + nano::lock_guard lock (mutex); + auto & element (get_element (digest)); + bool existed (element == digest); + if (!existed) + { + // Replace likely old element with a new one + element = digest; + } + if (digest_a) + { + *digest_a = digest; + } + return existed; +} + +void nano::network_filter::clear (nano::uint128_t const & digest_a) +{ + nano::lock_guard lock (mutex); + auto & element (get_element (digest_a)); + if (element == digest_a) + { + element = nano::uint128_t{ 0 }; + } +} + +void nano::network_filter::clear (uint8_t const * bytes_a, size_t count_a) +{ + clear (hash (bytes_a, count_a)); +} + +template +void nano::network_filter::clear (OBJECT const & object_a) +{ + std::vector bytes; + { + nano::vectorstream stream (bytes); + object_a->serialize (stream); + } + clear (bytes.data (), bytes.size ()); +} + +void nano::network_filter::clear () +{ + nano::lock_guard lock (mutex); + items.assign (items.size (), nano::uint128_t{ 0 }); +} + +nano::uint128_t & nano::network_filter::get_element (nano::uint128_t const & hash_a) +{ + assert (!mutex.try_lock ()); + assert (items.size () > 0); + size_t index (hash_a % items.size ()); + return items[index]; +} + +nano::uint128_t nano::network_filter::hash (uint8_t const * bytes_a, size_t count_a) const +{ + nano::uint128_union digest{ 0 }; + siphash_t siphash (key, static_cast (key.size ())); + siphash.CalculateDigest (digest.bytes.data (), bytes_a, count_a); + return digest.number (); +} diff --git a/nano/secure/network_filter.hpp b/nano/secure/network_filter.hpp new file mode 100644 index 0000000000..60b99dbd08 --- /dev/null +++ b/nano/secure/network_filter.hpp @@ -0,0 +1,74 @@ + +#pragma once + +#include + +#include +#include + +#include + +namespace nano +{ +/** + * A probabilistic duplicate filter based on directed map caches, using SipHash 2/4/128 + * The probability of false negatives (unique packet marked as duplicate) is the probability of a 128-bit SipHash collision. + * The probability of false positives (duplicate packet marked as unique) shrinks with a larger filter. + * @note This class is thread-safe. + */ +class network_filter final +{ +public: + network_filter () = delete; + network_filter (size_t size_a); + /** + * Reads \p count_a bytes starting from \p bytes_a and inserts the siphash digest in the filter. + * @param \p digest_a if given, will be set to the resulting siphash digest + * @warning will read out of bounds if [ \p bytes_a, \p bytes_a + \p count_a ] is not a valid range + * @return a boolean representing the previous existence of the hash in the filter. + **/ + bool apply (uint8_t const * bytes_a, size_t count_a, nano::uint128_t * digest_a = nullptr); + + /** + * Sets the corresponding element in the filter to zero, if it matches \p digest_a exactly. + **/ + void clear (nano::uint128_t const & digest_a); + + /** + * Reads \p count_a bytes starting from \p bytes_a and digests the contents. + * Then, sets the corresponding element in the filter to zero, if it matches the digest exactly. + * @warning will read out of bounds if [ \p bytes_a, \p bytes_a + \p count_a ] is not a valid range + **/ + void clear (uint8_t const * bytes_a, size_t count_a); + + /** + * Serializes \p object_a and runs clears the resulting siphash digest. + * @return a boolean representing the previous existence of the hash in the filter. + **/ + template + void clear (OBJECT const & object_a); + + /** Sets every element of the filter to zero, keeping its size and capacity. */ + void clear (); + +private: + using siphash_t = CryptoPP::SipHash<2, 4, true>; + + /** + * Get element from digest. + * @note must have a lock on mutex + * @return a reference to the element with key \p hash_a + **/ + nano::uint128_t & get_element (nano::uint128_t const & hash_a); + + /** + * Hashes \p count_a bytes starting from \p bytes_a . + * @return the siphash digest of the contents in \p bytes_a . + **/ + nano::uint128_t hash (uint8_t const * bytes_a, size_t count_a) const; + + std::vector items; + CryptoPP::SecByteBlock key{ siphash_t::KEYLENGTH }; + std::mutex mutex; +}; +}