Add Kademlia buckets to comms layer #2817

Closed

Conversation

@stringhandler (Collaborator) commented Mar 30, 2021

Description

Added some Kademlia buckets to the comms layer. I don't claim to have implemented pure Kademlia here, but this splits the network into buckets according to distance, using the prefix of the node id.

There are quite a few smaller changes, which I will update and list here:

  • Added NodeId to the console wallet ui
  • Increased a block sync failure log from debug level to warn level; it was previously excluded from the logs.
  • When syncing headers from a node, it will only provide headers that it has block bodies for.
  • Removed some expect() calls
  • Renamed some methods for clarity
  • Removed HammingDistance
  • XorDistance inner type changed from byte array to u128

TODO: Get benchmarks of memorynet vs development branch

I have currently set the number of buckets to 4, meaning that for a node with a node id starting with 10110....., all node ids starting with 0 will be in bucket 4, nodes starting with 11.... will be in bucket 3, nodes starting with 100.... will be in bucket 2, nodes starting with 1010.... will be in bucket 1, and all nodes starting with 1011..... will be in bucket 0, which is considered its neighbourhood. As the size of the network grows, the number of buckets should be increased.
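As an illustration, here is a minimal sketch of the prefix-based bucketing described above, using a plain u128 for the XOR distance (the PR changes XorDistance's inner type to u128). It mirrors the description, not necessarily the exact get_bucket implementation:

```rust
// Illustrative sketch only: assign a peer to a bucket from the XOR distance of the
// two node ids, treated as a u128. Bucket 0 is the "home" bucket / neighbourhood;
// bucket `num_buckets` holds peers that differ in the very first bit.
fn bucket_index(own_id: u128, peer_id: u128, num_buckets: u32) -> u32 {
    let distance = own_id ^ peer_id;
    // Number of leading bits the two ids share (128 when the ids are equal).
    let shared_prefix = distance.leading_zeros();
    if shared_prefix >= num_buckets {
        0
    } else {
        num_buckets - shared_prefix
    }
}

fn main() {
    let own: u128 = 0b1011 << 124; // id starting with 1011...
    assert_eq!(bucket_index(own, 0b0000u128 << 124, 4), 4); // starts with 0...    -> bucket 4
    assert_eq!(bucket_index(own, 0b1100u128 << 124, 4), 3); // starts with 11...   -> bucket 3
    assert_eq!(bucket_index(own, 0b1000u128 << 124, 4), 2); // starts with 100...  -> bucket 2
    assert_eq!(bucket_index(own, 0b1010u128 << 124, 4), 1); // starts with 1010... -> bucket 1
    assert_eq!(bucket_index(own, 0b1011u128 << 124, 4), 0); // starts with 1011... -> home bucket
}
```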

Some terms that have changed:

  1. The neighbourhood is now defined as the nodes in the same bucket as we are. For num_buckets < 13^2, this will be the largest bucket

Motivation and Context

We had a number of issues where SAF messages would be sent and returned to the sending node, causing them to be repropagated in an infinite loop. By using the Kademlia buckets and only propagating strictly closer once a message is in the same bucket as we are, there is no chance of an infinite loop.

How Has This Been Tested?

Manually tested, and ran many memorynet tests

@@ -612,6 +614,7 @@ impl CommandHandler {
peer.public_key,
conn.address(),
conn.direction(),
peer.node_id.distance(&node_id).get_bucket(4).2,
Collaborator (author) comment:

This should use the config, I'm not sure if it is exposed all the way

@@ -70,7 +70,7 @@ appenders:

# Set the default logging level to "info"
root:
level: info
level: debug
Collaborator (author) comment:

This needs to be rolled back

@@ -395,6 +399,40 @@ impl DhtActor {
.await?;
Ok(peers.into_iter().map(|p| p.peer_node_id().clone()).collect())
},

CloserOnly(destination_node_id) => {
Collaborator (author) comment:

This is the main addition that needs to be reviewed


let destination_distance = node_identity.node_id().distance(&destination_node_id);
let num_buckets = config.num_network_buckets;
let k = config.num_random_nodes;
Collaborator (author) comment:

This number might need to be chosen better
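For context, a rough sketch of the kind of selection I read CloserOnly as performing with these parameters. Peer, bucket_index and select_closer_only below are illustrative stand-ins, not the actual DhtActor/broadcast code:

```rust
// Rough sketch only, not the real comms types.
struct Peer {
    node_id: u128,
}

fn bucket_index(a: u128, b: u128, num_buckets: u32) -> u32 {
    let shared = (a ^ b).leading_zeros();
    if shared >= num_buckets { 0 } else { num_buckets - shared }
}

/// Pick up to `k` known peers that sit in the destination's bucket (relative to us)
/// and are strictly closer to the destination than we are.
fn select_closer_only(own_id: u128, dest_id: u128, peers: &[Peer], num_buckets: u32, k: usize) -> Vec<u128> {
    let own_distance = own_id ^ dest_id;
    let dest_bucket = bucket_index(own_id, dest_id, num_buckets);
    let mut candidates: Vec<u128> = peers
        .iter()
        .map(|p| p.node_id)
        .filter(|id| bucket_index(own_id, *id, num_buckets) == dest_bucket) // same bucket as destination
        .filter(|id| (id ^ dest_id) < own_distance)                         // strictly closer than us
        .collect();
    candidates.sort_by_key(|id| id ^ dest_id); // closest to the destination first
    candidates.truncate(k);
    candidates
}
```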

}
}
pub async fn wait_for_connectivity(&mut self, _timeout: Duration) -> Result<(), ConnectivityError> {
// TODO: re-enable this. It seems to fail
Collaborator (author) comment:

This method seems to fail due to a race condition

.read()
.await
.in_network_region(node_id, region_node_id, n)
Ok(node_id.distance(region_node_id).get_bucket(num_network_buckets).0 == NodeDistance::zero())
Collaborator (author) comment:

TODO: These methods can be removed from the service as well

@philipr-za (Contributor) left a comment:

A fair amount of cleanup is needed in the branch: comments and whatnot.

So I think this approach works. It is a simplification of the Kademlia approach that requires more manual tuning than theirs: it will work, but it will need to be manually updated as conditions on the network change.

So, just for clarity, I want to present my understanding of Kademlia's k-bucket approach and then how I see your approach as a simplified version.

In Kademlia's k-buckets there are as many buckets as there are bits in the NodeID (160 in their paper, because that's how many bits their IDs have). The k is how many peers are maintained in a bucket; I will leave out the maintenance logic for now. The furthest bucket contains all the nodes that have a distance between 2^159 and 2^160 (half of the possible nodes in the space, the furthest half from this node). The next bucket contains nodes with a distance from 2^158 to 2^159, which is a quarter of all the possible nodes in the space: the 25% to 50% furthest nodes from this node. In Kademlia's k-buckets this continues all the way down to the last bucket, which covers 2^0 to 2^1 and holds the single node whose ID differs only in the LSB. Most buckets are of course empty. They then keep k nodes in each bucket, which means you only have k nodes of the furthest 50% of all possible nodes, k nodes of the next 25%, k nodes of the next 12.5%, k nodes of the next 6.25%, and so on. This is nice because the resolution of the buckets gets denser the closer they get to you. In other words, you have k peers for the bucket that spans 6.25% of the node space and k peers covering the furthest 50% of the node space, but the 6.25% space contains nodes much closer to you, so you care about them more.
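For reference, the classic bucket index can be computed directly from the XOR distance. A minimal sketch, assuming a 128-bit id space rather than the paper's 160 bits (illustrative, not the comms code):

```rust
/// Classic Kademlia bucket index, sketched for a 128-bit id space: bucket i holds peers
/// whose XOR distance d satisfies 2^i <= d < 2^(i+1), i.e. i = floor(log2(d)).
/// Returns None for our own id, since a distance of zero falls in no bucket.
fn kademlia_bucket(own_id: u128, peer_id: u128) -> Option<u32> {
    let distance = own_id ^ peer_id;
    if distance == 0 {
        None
    } else {
        Some(127 - distance.leading_zeros())
    }
}
```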

So when choosing nodes close to a target NodeID (tID), you figure out which of your buckets it would fall in and grab the best k nodes in that bucket. If the bucket is not full, then you select from the surrounding buckets. The paper is not very clear on how to make this selection, but whatevs.

I like this approach because it is deterministic, and because XOR is a proper distance metric, nodes whose distance from you is similar to the distance between you and tID will be closer to tID.

Now, the method implemented here is similar, but instead of extending all the way down the bit range you manually specify how many buckets there will be (let's go with 3), so you have a bucket for the furthest 50%, the next furthest 25%, and what you call the "home" bucket for the closest 12.5% of nodes. This is essentially rolling up all the lower buckets into the "home" bucket. You then choose X nodes from a given bucket when looking for the nodes to propagate a message to, which is similar to picking k nodes out of that bucket.

So your method has two parameters to tune, the number of buckets AND how many nodes are selected from a bucket to propagate to, where Kademlia just chooses the size of k.

I think this method will work fine, but I wonder if extending it to the full bit range and adding some logic to select nodes in neighbouring buckets when a bucket is not full would make it more naturally adaptive?

);
return Ok(());
}
// if self.is_managed(conn.peer_node_id()) {
Contributor comment:

I would say either add a TODO on why this might come back or just cut it out

self.random_pool.push(node_id);
} else {
debug!(target: LOG_TARGET, "Removing peer '{}' from neighbouring pool", node_id);
let bucket_size = if bucket.2 == 0 {
Contributor comment:

Not a fan of the tuple. What is .0, .1 or .2? I think you should make a struct to name these fields, as it's hard to read.

@@ -1,6 +1,6 @@
#![cfg_attr(not(debug_assertions), deny(unused_variables))]
#![cfg_attr(not(debug_assertions), deny(unused_imports))]
#![cfg_attr(not(debug_assertions), deny(dead_code))]
// #![cfg_attr(not(debug_assertions), deny(dead_code))]
Contributor comment:

👀

@@ -0,0 +1,18 @@
Node 1: 80....
Contributor comment:

Just a reminder to clean up; if we want to keep it, bang it into an md file in the docs folder?

Self(0)
}

/// Calculate the distance between two node ids using the Hamming distance.
Contributor comment:

Suggested change
/// Calculate the distance between two node ids using the Hamming distance.
/// Calculate the distance between two node ids using the XOR distance.

NODE_XOR_DISTANCE_ARRAY_SIZE
}

pub fn get_bucket(&self, num_buckets: u32) -> (XorDistance, XorDistance, u32) {
Contributor comment:

Here I think you should make a return struct naming the output fields.
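For illustration, a named return type could look something like this (field names are a guess at what .0, .1 and .2 represent, based on how the tuple is used elsewhere in the diff; not the actual code):

```rust
// Illustrative only: a named return type for get_bucket, replacing the
// (XorDistance, XorDistance, u32) tuple.
pub type XorDistance = u128; // stand-in; the PR's XorDistance wraps a u128

pub struct Bucket {
    /// Smallest distance covered by this bucket (the tuple's .0).
    pub min_distance: XorDistance,
    /// Largest distance covered by this bucket (the tuple's .1).
    pub max_distance: XorDistance,
    /// Bucket index, where 0 is the home bucket (the tuple's .2).
    pub index: u32,
}
```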

@stringhandler (Collaborator, Author) commented Apr 16, 2021

Thanks for the detailed review @philipr-za.

Firstly, your description is correct. The main difference, as you have highlighted, is that when there are fewer than k nodes in a bucket, Kademlia chooses the k closest nodes. It wasn't clear to me from the paper, but I assumed this includes nodes both closer to and further from the sender. While doing it this way would remove the manual tuning introduced in this PR, I don't think it's possible to achieve what I set out to achieve in this PR by doing so.

One of the problems we have with the current implementation in Tari comms (and the one I wanted to solve) is that messages are sent in loops, because nodes propagate to nodes further away than the destination. For example, peer A is sending a message to peer B, so it sends it to peer C, who is further away from peer B. Peer C then sends the message back to peer A and peer B. This is solved by two layers of deduplication, but both have limits that can be overrun.

In the paper, the Kademlia nodes have a key difference to our network: only the sending node repeats requests. So the sender calls out to 3 nodes (can't remember the parameter name in the paper, perhaps alpha?) in the destination bucket, and gets k results back from each. It then chooses from those 3 * k nodes and sends again. There is no propagation of messages on a node's behalf. Another key difference is: who stores the SAF messages destined for a node in a particular bucket? It cannot be the 160th bucket, since it is unlikely to be filled. Ideally, you would want k - 1 other nodes to store the SAF messages destined for you. I will admit that there is possibly some clever thinking and improvement we can make in this area, but having a manually set number of buckets and a home bucket solved this problem deterministically.

Tari's comms layer is different: nodes propagate messages on behalf of other nodes. This, IMHO, opens Tari up to a much larger attack surface and also makes it harder to optimize. Changing from nodes propagating messages to only the sender sending messages seemed like a much larger and more disruptive exercise, so I began looking at ways to work within the propagation framework.

I believe the PR is the beginning of a compromise. My philosophy is to eliminate loops and unnecessary messages in propagation by only sending closer to the destination at each stage. If I implemented Kademlia's approach of supplementing a bucket that does not have enough nodes with nodes that are further away, this could not be achieved. What we could possibly do is supplement the selection with only nodes that are closer to us than the destination bucket. That could possibly work. It does not solve the problem of who stores a SAF message, though.

In terms of parameters to tune, these are my mappings to Kademlia's parameters:

| Param | Kademlia | PR | Comment |
| --- | --- | --- | --- |
| Number of buckets | 160 | num_buckets (manually set as 3 for now) | This is actually the number of bits used to create the buckets, and how many buckets a single node will store. The actual number of distinct "home" buckets is 2^num_buckets, resulting in 2^160 for Kademlia and 2^3 = 8 buckets in the PR. |
| Propagation factor | alpha (suggested 3 in the paper) | propagation_factor | propagation_factor is used for backwards compatibility, as there are still some calls using other broadcast strategies. |
| k | k = 20 | num_random_nodes = 4 for non-home buckets, num_neighbours = 8 for the home bucket | Because the home bucket could be much larger when the num_buckets value is stale, this is set to be larger. The same param could have been used for both. I also tried to keep these values similar to the current code so that I could benchmark against it. Using a large number like 20 for both would obviously result in better benchmarks due to many more nodes being connected. |

Perhaps oversimplifying, but the structure of the comms is basically this: in the inbound pipeline, a node receives a message that is either for it (destination == me) or not. If it is for me, it is passed up the stack and processed in the domain layer. If not, there is a branch:
CASE 1: If the message is eligible for SAF (based on type of message) and is in our neighbourhood, store it and propagate it to a set of peers
CASE 2: If the message does not fit CASE 1, but has a destination, propagate it to a set of peers
CASE 3: If the message does not fit CASE 1 but has no destination, propagate it to a set of peers

In each case, it is propagated but the set of peers differs.
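Schematically, that branching looks something like this (names are illustrative, not the actual pipeline types):

```rust
// Illustrative sketch of the inbound branching described above.
enum Action {
    HandleLocally,               // destination == me
    StoreAndPropagateNeighbours, // CASE 1: SAF-eligible and in our neighbourhood
    PropagateTowardsDestination, // CASE 2: has a destination
    PropagateToRandomSet,        // CASE 3: no destination
}

fn route(is_for_me: bool, saf_eligible: bool, in_neighbourhood: bool, has_destination: bool) -> Action {
    if is_for_me {
        Action::HandleLocally
    } else if saf_eligible && in_neighbourhood {
        Action::StoreAndPropagateNeighbours
    } else if has_destination {
        Action::PropagateTowardsDestination
    } else {
        Action::PropagateToRandomSet
    }
}
```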

In the current approach, CASE 1 is propagated to the closest connected peers, but it can often happen that the closest connected node is further away than you are, resulting in a loop.
In this PR, it is stored only in the neighbourhood of the home bucket, and then only propagated closer within the neighbourhood. I'll admit that this could be improved a bit, and could also result in non-delivery if the number of buckets we choose is too high.

In case 2, which is where I believe most of the flooding is happening on the development branch, this PR is an improvement because the message will be sent to an entirely different bucket and cannot return to the sender.

In case 3, this PR has not changed the delivery strategy. I think this could probably be improved in a future PR.

There is also the case of the first node putting the message onto the network, such as wallets. I initially converted a few of those, but decided against it. It can be considered in a future PR.

# Conflicts:
#	applications/tari_base_node/src/bootstrap.rs
@sdbondi (Member) left a comment:

I like many aspects of this technique (simplicity, performance of the algo compared to current, not a massive change). After the necessary code cleanup, I think we can merge, experiment and improve from there.

Some notes mostly for my understanding:

  1. Message 'loops'

We attempted using strictly closer selection to alleviate flooding before, but due to other messaging/SAF reliability issues we erred on the side of more redundancy. Sending messages strictly closer does not require k-buckets.

Comparing this solution to an alternative that is possible without buckets:

For reference:

connections = connections

This code selects peers that are strictly closer from the pool of active connections. The same technique could be used with PeerQuery to select the closest n peers to the destination. Adding the sort by last connected is a nice/necessary addition to improve the chances of the message reaching an available peer. The active connections were used as that is the only guaranteed way to know that a peer is online at any given time. I suspect a strictly closer algo that uses the whole peer pool would be similar to selecting peers within a lower/upper bucketed boundary.

(a) No buckets: select all peers that are closer to the destination than I am, ordered by last connected
(b) With buckets: if the destination is within 0001.. and 0000.. select all peers that are closer to the destination than I am, ordered by last connected (could be 0 peers?). Otherwise, select peers within the range of the bucket ordered by last connected (could be 0 peers?).

EDIT: Not having peers to select from is a feature as we are only selecting strictly closer peers

Advantage (a) over (b):
There will always be peers available to select from

Advantage (b) over (a):
Speak directly to peers that are closer to the destination (reduce hops)
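A minimal sketch of option (a), selecting strictly closer peers from the active connections (the types and names below are stand-ins, not the actual comms API):

```rust
use std::time::Instant;

// Stand-in for an active connection's peer info; not the actual comms types.
struct ActivePeer {
    node_id: u128,
    last_connected: Instant,
}

/// Option (a): from the active connections, take peers that are strictly closer to the
/// destination than this node is, most recently connected first, capped at `n`.
fn select_strictly_closer(own_id: u128, dest_id: u128, connections: &[ActivePeer], n: usize) -> Vec<u128> {
    let own_distance = own_id ^ dest_id;
    let mut closer: Vec<&ActivePeer> = connections
        .iter()
        .filter(|p| (p.node_id ^ dest_id) < own_distance) // strictly closer than us
        .collect();
    closer.sort_by(|a, b| b.last_connected.cmp(&a.last_connected)); // most recently connected first
    closer.into_iter().take(n).map(|p| p.node_id).collect()
}
```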

  2. SAF neighbourhood

Given current node N and peer X

The current solution uses dist(N, X) < neighbour_threshold, where neighbour_threshold = the max distance of the closest k nodes to N that are online/not banned, etc. In other words, the neighbourhood criterion is dynamic - "widening" as peers go offline, narrowing as more peers enter the ID space.

(a) This leads to different neighbourhoods per node, e.g. node A may consider node B a neighbour of node C, but node B may not consider node C a neighbour, and therefore will not store SAF messages for node C.

(b) Bucketed solution
If dist(N, X) < 0001... it is in the neighbourhood.

Advantage (a) over (b):
Potentially less unnecessary redundancy of stored messages (scalability)

Advantage (b) over (a):
Much simpler to reason about
Deterministic / can be calculated by any peer and answer will be the same - which should improve reliability of SAF
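A minimal sketch contrasting the two neighbourhood checks, with placeholder names and a u128 distance:

```rust
// (a) Dynamic: X is a neighbour if its distance from N is within the max distance of
//     the k closest online peers to N; the threshold shifts as peers come and go.
fn is_neighbour_dynamic(dist_n_x: u128, neighbour_threshold: u128) -> bool {
    dist_n_x < neighbour_threshold
}

// (b) Bucketed: X is a neighbour if dist(N, X) falls in the home bucket, i.e. the two
//     ids share at least `num_buckets` leading bits. Any peer computes the same answer.
fn is_neighbour_bucketed(dist_n_x: u128, num_buckets: u32) -> bool {
    dist_n_x.leading_zeros() >= num_buckets
}
```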

Reference links

k-buckets as described in the Kademlia paper, section 2.4 (routing table); see also my DHT report.

Comment on lines 225 to 226
for (i, b) in buckets.iter().enumerate().skip(1) {
self.refresh_peer_bucket(i, b.0, b.1,self.config.num_random_nodes).await?;
Member comment:

Suggested change
for (i, b) in buckets.iter().enumerate().skip(1) {
self.refresh_peer_bucket(i, b.0, b.1,self.config.num_random_nodes).await?;
for (i, (min, max)) in buckets.iter().enumerate().skip(1) {
self.refresh_peer_bucket(i, min, max,self.config.num_random_nodes).await?;

debug!(target: LOG_TARGET, "Sending Join message to closest peers");
debug!(
target: LOG_TARGET,
"[ThisNode={}] Sending Join message to closest peers",
Member comment:

Update log message.

- renamed num_neighbour_nodes, num_random_nodes to num_nodes_in_home_bucket and num_nodes_in_other_bucket
- Changed CloserOnly to use propagation_factor instead of sending to all nodes in the bucket (k)
- changed SAF message to always use derived node id when sending
# Conflicts:
#	base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
#	base_layer/core/src/chain_storage/blockchain_database.rs
#	base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
#	base_layer/p2p/src/initialization.rs
#	comms/dht/examples/memory_net/utilities.rs
#	comms/dht/examples/memorynet.rs
#	comms/dht/src/broadcast_strategy.rs
#	comms/dht/src/config.rs
#	comms/dht/src/connectivity/mod.rs
#	comms/dht/src/network_discovery/test.rs
#	comms/dht/src/outbound/broadcast.rs
#	comms/src/peer_manager/manager.rs
#	comms/src/peer_manager/peer_storage.rs
#	docs/src/SUMMARY.md
#	integration_tests/helpers/baseNodeProcess.js
@stringhandler (Collaborator, Author) commented:

Closing, will reopen at some point in the future

aviator-app bot pushed a commit that referenced this pull request Dec 6, 2021
Description
---
- Removes peers that were previously offline and for which a re-attempted connection fails.
- Instead of attempting to replace a disconnected peer in the neighbouring and random peer pools, attempt to reconnect to the disconnected peer. If that fails, the peer is replaced.
- neighbouring peers ordered by last seen (based on @mikethetike 's PR #2817)
- Xor distance uses u128 (based on @mikethetike 's PR #2817)
- add cooldown if encountering many failed connection attempts from DHT pool peers
- add last_seen directly to peer (w/ migration)

Motivation and Context
---
By removing peers from the peer manager when offline, the number of peers stored on the network should gradually decrease.  This will only happen once all/most peers are upgraded.

How Has This Been Tested?
---
Memorynet
Manually, base node operates normally and peer list reduces (until peer sync pulls more in again from nodes without this change)