Skip to content

Commit

Permalink
Probabilistic network packet filter (#2543)
Browse files Browse the repository at this point in the history
* Extract stream.hpp from blocks.hpp

* Add a probabilistic filter based on direct mapped caches, keyed by 128-bit SipHash.

Co-Authored-By: Colin LeMahieu <clemahieu@gmail.com>

The filter receives an array of bytes representing a network packet, and is checked for duplicity of the packet. The probability of a false duplicate is marginal, but not zero, and decreases with the size of the filter. The probability of a false non-duplicate is the infinitesimal probability of a 128-bit SipHash collision.

Items are not normally erased. Instead, if a new item is different from the one at the insertion index (digest % capacity), the old item is replaced. There is also a function to erase an element from the filter, if the digest matches it.

The filter state is protected by a mutex, whereas hashing is performed while not holding it.

Uses 1MB of memory for every 64k elements.

* Remove explicit instantiations

* Remove alias

* Optionally set digest in ::apply and add ::clear method directly from a digest
  • Loading branch information
guilhermelawless committed Feb 11, 2020
1 parent f28580d commit dc3ad45
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 31 deletions.
1 change: 1 addition & 0 deletions nano/core_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ add_executable (core_test
message_parser.cpp
memory_pool.cpp
network.cpp
network_filter.cpp
node.cpp
node_telemetry.cpp
processor_service.cpp
Expand Down
111 changes: 111 additions & 0 deletions nano/core_test/network_filter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#include <nano/core_test/testutil.hpp>
#include <nano/node/common.hpp>
#include <nano/secure/buffer.hpp>
#include <nano/secure/common.hpp>
#include <nano/secure/network_filter.hpp>

#include <gtest/gtest.h>

TEST (network_filter, unit)
{
nano::genesis genesis;
nano::network_filter filter (1);
auto one_block = [&genesis, &filter](std::shared_ptr<nano::block> const & block_a, bool expect_duplicate_a) {
nano::publish message (block_a);
auto bytes (message.to_bytes ());
nano::bufferstream stream (bytes->data (), bytes->size ());

// First read the header
bool error{ false };
nano::message_header header (error, stream);
ASSERT_FALSE (error);

// This validates nano::message_header::size
ASSERT_EQ (bytes->size (), block_a->size (block_a->type ()) + header.size);

// Now filter the rest of the stream
bool duplicate (filter.apply (bytes->data (), bytes->size () - header.size));
ASSERT_EQ (expect_duplicate_a, duplicate);

// Make sure the stream was rewinded correctly
auto block (nano::deserialize_block (stream, header.block_type ()));
ASSERT_NE (nullptr, block);
ASSERT_EQ (*block, *block_a);
};
one_block (genesis.open, false);
for (int i = 0; i < 10; ++i)
{
one_block (genesis.open, true);
}
auto new_block (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.open->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 10 * nano::xrb_ratio, nano::public_key (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
one_block (new_block, false);
for (int i = 0; i < 10; ++i)
{
one_block (new_block, true);
}
for (int i = 0; i < 100; ++i)
{
one_block (genesis.open, false);
one_block (new_block, false);
}
}

TEST (network_filter, many)
{
nano::genesis genesis;
nano::network_filter filter (4);
nano::keypair key1;
for (int i = 0; i < 100; ++i)
{
auto block (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.open->hash (), nano::test_genesis_key.pub, nano::genesis_amount - i * 10 * nano::xrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));

nano::publish message (block);
auto bytes (message.to_bytes ());
nano::bufferstream stream (bytes->data (), bytes->size ());

// First read the header
bool error{ false };
nano::message_header header (error, stream);
ASSERT_FALSE (error);

// This validates nano::message_header::size
ASSERT_EQ (bytes->size (), block->size + header.size);

// Now filter the rest of the stream
// All blocks should pass through
ASSERT_FALSE (filter.apply (bytes->data (), block->size));
ASSERT_FALSE (error);

// Make sure the stream was rewinded correctly
auto deserialized_block (nano::deserialize_block (stream, header.block_type ()));
ASSERT_NE (nullptr, deserialized_block);
ASSERT_EQ (*block, *deserialized_block);
}
}

TEST (network_filter, clear)
{
nano::network_filter filter (1);
std::vector<uint8_t> bytes1{ 1, 2, 3 };
std::vector<uint8_t> bytes2{ 1 };
ASSERT_FALSE (filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_TRUE (filter.apply (bytes1.data (), bytes1.size ()));
filter.clear (bytes1.data (), bytes1.size ());
ASSERT_FALSE (filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_TRUE (filter.apply (bytes1.data (), bytes1.size ()));
filter.clear (bytes2.data (), bytes2.size ());
ASSERT_TRUE (filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_FALSE (filter.apply (bytes2.data (), bytes2.size ()));
}

TEST (network_filter, optional_digest)
{
nano::network_filter filter (1);
std::vector<uint8_t> bytes1{ 1, 2, 3 };
nano::uint128_t digest{ 0 };
ASSERT_FALSE (filter.apply (bytes1.data (), bytes1.size (), &digest));
ASSERT_NE (0, digest);
ASSERT_TRUE (filter.apply (bytes1.data (), bytes1.size ()));
filter.clear (digest);
ASSERT_FALSE (filter.apply (bytes1.data (), bytes1.size ()));
}
1 change: 1 addition & 0 deletions nano/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ add_library (nano_lib
rpcconfig.cpp
stats.hpp
stats.cpp
stream.hpp
threading.hpp
threading.cpp
timer.hpp
Expand Down
32 changes: 1 addition & 31 deletions nano/lib/blocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,15 @@
#include <nano/crypto/blake2/blake2.h>
#include <nano/lib/errors.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/stream.hpp>
#include <nano/lib/utility.hpp>

#include <boost/property_tree/ptree_fwd.hpp>

#include <cassert>
#include <streambuf>
#include <unordered_map>

namespace nano
{
// We operate on streams of uint8_t by convention
using stream = std::basic_streambuf<uint8_t>;
// Read a raw byte stream the size of `T' and fill value. Returns true if there was an error, false otherwise
template <typename T>
bool try_read (nano::stream & stream_a, T & value)
{
static_assert (std::is_standard_layout<T>::value, "Can't stream read non-standard layout types");
auto amount_read (stream_a.sgetn (reinterpret_cast<uint8_t *> (&value), sizeof (value)));
return amount_read != sizeof (value);
}
// A wrapper of try_read which throws if there is an error
template <typename T>
void read (nano::stream & stream_a, T & value)
{
auto error = try_read (stream_a, value);
if (error)
{
throw std::runtime_error ("Failed to read type");
}
}

template <typename T>
void write (nano::stream & stream_a, T const & value)
{
static_assert (std::is_standard_layout<T>::value, "Can't stream write non-standard layout types");
auto amount_written (stream_a.sputn (reinterpret_cast<uint8_t const *> (&value), sizeof (value)));
(void)amount_written;
assert (amount_written == sizeof (value));
}
class block_visitor;
enum class block_type : uint8_t
{
Expand Down
38 changes: 38 additions & 0 deletions nano/lib/stream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@

#pragma once

#include <cassert>
#include <streambuf>

namespace nano
{
// We operate on streams of uint8_t by convention
using stream = std::basic_streambuf<uint8_t>;
// Read a raw byte stream the size of `T' and fill value. Returns true if there was an error, false otherwise
template <typename T>
bool try_read (nano::stream & stream_a, T & value)
{
static_assert (std::is_standard_layout<T>::value, "Can't stream read non-standard layout types");
auto amount_read (stream_a.sgetn (reinterpret_cast<uint8_t *> (&value), sizeof (value)));
return amount_read != sizeof (value);
}
// A wrapper of try_read which throws if there is an error
template <typename T>
void read (nano::stream & stream_a, T & value)
{
auto error = try_read (stream_a, value);
if (error)
{
throw std::runtime_error ("Failed to read type");
}
}

template <typename T>
void write (nano::stream & stream_a, T const & value)
{
static_assert (std::is_standard_layout<T>::value, "Can't stream write non-standard layout types");
auto amount_written (stream_a.sputn (reinterpret_cast<uint8_t const *> (&value), sizeof (value)));
(void)amount_written;
assert (amount_written == sizeof (value));
}
}
1 change: 1 addition & 0 deletions nano/node/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ class message_header final
uint8_t version_min;
nano::message_type type;
std::bitset<16> extensions;
static size_t constexpr size = sizeof (network_params::header_magic_number) + sizeof (version_max) + sizeof (version_using) + sizeof (version_min) + sizeof (type) + sizeof (/* extensions */ uint16_t);

void flag_set (uint8_t);
static uint8_t constexpr bulk_pull_count_present_flag = 0;
Expand Down
2 changes: 2 additions & 0 deletions nano/secure/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ add_library (secure
epoch.cpp
ledger.hpp
ledger.cpp
network_filter.hpp
network_filter.cpp
utility.hpp
utility.cpp
versioning.hpp
Expand Down
79 changes: 79 additions & 0 deletions nano/secure/network_filter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#include <nano/crypto_lib/random_pool.hpp>
#include <nano/lib/locks.hpp>
#include <nano/secure/buffer.hpp>
#include <nano/secure/common.hpp>
#include <nano/secure/network_filter.hpp>

nano::network_filter::network_filter (size_t size_a) :
items (size_a, nano::uint128_t{ 0 })
{
nano::random_pool::generate_block (key, key.size ());
}

bool nano::network_filter::apply (uint8_t const * bytes_a, size_t count_a, nano::uint128_t * digest_a)
{
// Get hash before locking
auto digest (hash (bytes_a, count_a));

nano::lock_guard<std::mutex> lock (mutex);
auto & element (get_element (digest));
bool existed (element == digest);
if (!existed)
{
// Replace likely old element with a new one
element = digest;
}
if (digest_a)
{
*digest_a = digest;
}
return existed;
}

void nano::network_filter::clear (nano::uint128_t const & digest_a)
{
nano::lock_guard<std::mutex> lock (mutex);
auto & element (get_element (digest_a));
if (element == digest_a)
{
element = nano::uint128_t{ 0 };
}
}

void nano::network_filter::clear (uint8_t const * bytes_a, size_t count_a)
{
clear (hash (bytes_a, count_a));
}

template <typename OBJECT>
void nano::network_filter::clear (OBJECT const & object_a)
{
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
object_a->serialize (stream);
}
clear (bytes.data (), bytes.size ());
}

void nano::network_filter::clear ()
{
nano::lock_guard<std::mutex> lock (mutex);
items.assign (items.size (), nano::uint128_t{ 0 });
}

nano::uint128_t & nano::network_filter::get_element (nano::uint128_t const & hash_a)
{
assert (!mutex.try_lock ());
assert (items.size () > 0);
size_t index (hash_a % items.size ());
return items[index];
}

nano::uint128_t nano::network_filter::hash (uint8_t const * bytes_a, size_t count_a) const
{
nano::uint128_union digest{ 0 };
siphash_t siphash (key, static_cast<unsigned int> (key.size ()));
siphash.CalculateDigest (digest.bytes.data (), bytes_a, count_a);
return digest.number ();
}
74 changes: 74 additions & 0 deletions nano/secure/network_filter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@

#pragma once

#include <nano/lib/numbers.hpp>

#include <crypto/cryptopp/seckey.h>
#include <crypto/cryptopp/siphash.h>

#include <mutex>

namespace nano
{
/**
* A probabilistic duplicate filter based on directed map caches, using SipHash 2/4/128
* The probability of false negatives (unique packet marked as duplicate) is the probability of a 128-bit SipHash collision.
* The probability of false positives (duplicate packet marked as unique) shrinks with a larger filter.
* @note This class is thread-safe.
*/
class network_filter final
{
public:
network_filter () = delete;
network_filter (size_t size_a);
/**
* Reads \p count_a bytes starting from \p bytes_a and inserts the siphash digest in the filter.
* @param \p digest_a if given, will be set to the resulting siphash digest
* @warning will read out of bounds if [ \p bytes_a, \p bytes_a + \p count_a ] is not a valid range
* @return a boolean representing the previous existence of the hash in the filter.
**/
bool apply (uint8_t const * bytes_a, size_t count_a, nano::uint128_t * digest_a = nullptr);

/**
* Sets the corresponding element in the filter to zero, if it matches \p digest_a exactly.
**/
void clear (nano::uint128_t const & digest_a);

/**
* Reads \p count_a bytes starting from \p bytes_a and digests the contents.
* Then, sets the corresponding element in the filter to zero, if it matches the digest exactly.
* @warning will read out of bounds if [ \p bytes_a, \p bytes_a + \p count_a ] is not a valid range
**/
void clear (uint8_t const * bytes_a, size_t count_a);

/**
* Serializes \p object_a and runs clears the resulting siphash digest.
* @return a boolean representing the previous existence of the hash in the filter.
**/
template <typename OBJECT>
void clear (OBJECT const & object_a);

/** Sets every element of the filter to zero, keeping its size and capacity. */
void clear ();

private:
using siphash_t = CryptoPP::SipHash<2, 4, true>;

/**
* Get element from digest.
* @note must have a lock on mutex
* @return a reference to the element with key \p hash_a
**/
nano::uint128_t & get_element (nano::uint128_t const & hash_a);

/**
* Hashes \p count_a bytes starting from \p bytes_a .
* @return the siphash digest of the contents in \p bytes_a .
**/
nano::uint128_t hash (uint8_t const * bytes_a, size_t count_a) const;

std::vector<nano::uint128_t> items;
CryptoPP::SecByteBlock key{ siphash_t::KEYLENGTH };
std::mutex mutex;
};
}

0 comments on commit dc3ad45

Please sign in to comment.