Skip to content

Commit

Permalink
Merge e7c3573 into 05b537f
Browse files Browse the repository at this point in the history
  • Loading branch information
limebell committed Apr 8, 2021
2 parents 05b537f + e7c3573 commit b306792
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Expand Up @@ -23,11 +23,16 @@ To be released.

### Bug fixes

- Fixed a bug where `KademliaProtocol.BootstrapAsync()` has sent multiple
`Ping` messages to other peers. [[#1219]]
- Fixed a bug where `KademliaProtocol.CheckReplacementCacheAsync()` has
updated cached peers multiple times. [[#1219]]
- Fixed memory leak due to undisposed `CancellationTokenRegistration`s.
[[#1228]]

### CLI tools

[#1219]: https://github.com/planetarium/libplanet/pull/1219
[#1228]: https://github.com/planetarium/libplanet/pull/1218


Expand Down
42 changes: 32 additions & 10 deletions Libplanet/Net/Protocols/KademliaProtocol.cs
Expand Up @@ -76,6 +76,7 @@ public class KademliaProtocol : IProtocol

var findPeerTasks = new List<Task>();
var history = new ConcurrentBag<BoundPeer>();
var dialHistory = new ConcurrentBag<BoundPeer>();

foreach (BoundPeer peer in bootstrapPeers.Where(peer => !peer.Address.Equals(_address)))
{
Expand All @@ -86,6 +87,7 @@ public class KademliaProtocol : IProtocol
findPeerTasks.Add(
FindPeerAsync(
history,
dialHistory,
_address,
peer,
depth,
Expand Down Expand Up @@ -252,11 +254,14 @@ public async Task RebuildConnectionAsync(int depth, CancellationToken cancellati
_logger.Verbose("Rebuilding connection...");
var buffer = new byte[20];
var tasks = new List<Task>();
var history = new ConcurrentBag<BoundPeer>();
var dialHistory = new ConcurrentBag<BoundPeer>();
for (int i = 0; i < _findConcurrency; i++)
{
_random.NextBytes(buffer);
tasks.Add(FindPeerAsync(
new ConcurrentBag<BoundPeer>(),
history,
dialHistory,
new Address(buffer),
null,
depth,
Expand All @@ -266,7 +271,8 @@ public async Task RebuildConnectionAsync(int depth, CancellationToken cancellati

tasks.Add(
FindPeerAsync(
new ConcurrentBag<BoundPeer>(),
history,
dialHistory,
_address,
null,
depth,
Expand All @@ -293,16 +299,14 @@ public async Task CheckReplacementCacheAsync(CancellationToken cancellationToken
{
_logger.Verbose("Check peer {Peer}.", replacement);

await PingAsync(replacement, _requestTimeout, cancellationToken);
_table.RemoveCache(replacement);
Update(replacement);
await PingAsync(replacement, _requestTimeout, cancellationToken);
}
catch (PingTimeoutException)
{
_logger.Verbose(
"Remove stale peer {Peer} from replacement cache.",
"Removed stale peer {Peer} from replacement cache.",
replacement);
_table.RemoveCache(replacement);
}
}
}
Expand Down Expand Up @@ -569,6 +573,7 @@ private void RemovePeer(BoundPeer peer)
/// to find <see cref="Peer"/>s near <paramref name="target"/>.
/// </summary>
/// <param name="history">The <see cref="Peer"/> that searched.</param>
/// <param name="dialHistory">The <see cref="Peer"/> that ping was sent.</param>
/// <param name="target">The <see cref="Address"/> to find.</param>
/// <param name="viaPeer">The target <see cref="Peer"/> to send <see cref="FindNeighbors"/>
/// message. If null, selects 3 <see cref="Peer"/>s from <see cref="RoutingTable"/> of
Expand All @@ -581,6 +586,7 @@ private void RemovePeer(BoundPeer peer)
/// <returns>An awaitable task without value.</returns>
private async Task FindPeerAsync(
ConcurrentBag<BoundPeer> history,
ConcurrentBag<BoundPeer> dialHistory,
Address target,
BoundPeer viaPeer,
int depth,
Expand Down Expand Up @@ -613,7 +619,14 @@ private void RemovePeer(BoundPeer peer)
// target. But our implementation contains target itself for FindSpecificPeerAsync(),
// so it should be excluded in here.
found = found.Where(peer => !peer.Address.Equals(target));
await ProcessFoundAsync(history, found, target, depth, timeout, cancellationToken);
await ProcessFoundAsync(
history,
dialHistory,
found,
target,
depth,
timeout,
cancellationToken);
}

private async Task<IEnumerable<BoundPeer>> QueryNeighborsAsync(
Expand Down Expand Up @@ -695,6 +708,7 @@ private void ReceivePing(Ping ping)
/// request.
/// </summary>
/// <param name="history"><see cref="Peer"/>s that already searched.</param>
/// <param name="dialHistory"><see cref="Peer"/>s that ping sent.</param>
/// <param name="found"><see cref="Peer"/>s that found.</param>
/// <param name="target">The target <see cref="Address"/> to search.</param>
/// <param name="depth">Target depth of recursive operation. If -1 is given,
Expand All @@ -708,6 +722,7 @@ private void ReceivePing(Ping ping)
/// not online.</exception>
private async Task ProcessFoundAsync(
ConcurrentBag<BoundPeer> history,
ConcurrentBag<BoundPeer> dialHistory,
IEnumerable<BoundPeer> found,
Address target,
int depth,
Expand All @@ -731,9 +746,15 @@ private void ReceivePing(Ping ping)
List<BoundPeer> closestCandidate =
_table.Neighbors(target, _table.BucketSize, false).ToList();

Task[] awaitables = peers.Select(peer =>
PingAsync(peer, _requestTimeout, cancellationToken)
).ToArray();
Task[] awaitables = peers
.Where(peer => !dialHistory.Contains(peer))
.Select(
peer =>
{
dialHistory.Add(peer);
return PingAsync(peer, _requestTimeout, cancellationToken);
}
).ToArray();
try
{
await Task.WhenAll(awaitables);
Expand Down Expand Up @@ -790,6 +811,7 @@ private void ReceivePing(Ping ping)

findPeerTasks.Add(FindPeerAsync(
history,
dialHistory,
target,
peer,
depth == -1 ? depth : depth - 1,
Expand Down

0 comments on commit b306792

Please sign in to comment.