Skip to content

Commit

Permalink
Merge pull request #964 from longfin/feature/relaunch-transport
Browse files Browse the repository at this point in the history
Add Swarm<T>.LastMessageTimestamp
  • Loading branch information
longfin committed Aug 26, 2020
2 parents 742b0c7 + 6a6b97c commit 8e8af7f
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Expand Up @@ -144,6 +144,7 @@ To be released.
- Added `SwarmOptions` class. [[#926]]
- Added `PeerChainState` struct. [[#936]]
- Added `Swarm<T>.GetPeerChainStateAsync()` method. [[#936]]
- Added `Swarm<T>.LastMessageTimestamp` property. [[#964]]
- Added `Block<T>.EvaluationDigest` property. [[#931], [#935]]
- Added `BlockHeader.EvaluationDigest` property. [[#931], [#935]]
- Added `Block<T>.PreEvaluationHash` property. [[#931], [#935]]
Expand Down Expand Up @@ -285,6 +286,7 @@ To be released.
[#959]: https://github.com/planetarium/libplanet/issues/959
[#954]: https://github.com/planetarium/libplanet/pull/954
[#963]: https://github.com/planetarium/libplanet/pull/963
[#964]: https://github.com/planetarium/libplanet/pull/964
[sleep mode]: https://en.wikipedia.org/wiki/Sleep_mode


Expand Down
6 changes: 5 additions & 1 deletion Libplanet.Tests/Net/Protocols/TestTransport.cs
Expand Up @@ -80,6 +80,8 @@ internal class TestTransport : ITransport

public IEnumerable<BoundPeer> Peers => Protocol.Peers;

public DateTimeOffset? LastMessageTimestamp { get; private set; }

internal ConcurrentBag<Message> ReceivedMessages { get; }

internal IProtocol Protocol { get; }
Expand Down Expand Up @@ -241,7 +243,7 @@ public void SendPing(Peer target, TimeSpan? timeSpan = null)

Task.Run(() =>
{
(Protocol as KademliaProtocol).PingAsync(
_ = (Protocol as KademliaProtocol).PingAsync(
boundPeer,
timeSpan,
default(CancellationToken));
Expand Down Expand Up @@ -341,6 +343,7 @@ public void BroadcastMessage(Address? except, Message message)
"Received reply {Reply} of message with identity {identity}.",
reply,
message.Identity);
LastMessageTimestamp = DateTimeOffset.UtcNow;
ReceivedMessages.Add(reply);
Protocol.ReceiveMessage(reply);
MessageReceived.Set();
Expand Down Expand Up @@ -454,6 +457,7 @@ private void ReceiveMessage(Message message)
});
}

LastMessageTimestamp = DateTimeOffset.UtcNow;
ReceivedMessages.Add(message);
Protocol.ReceiveMessage(message);
MessageReceived.Set();
Expand Down
30 changes: 30 additions & 0 deletions Libplanet.Tests/Net/SwarmTest.cs
Expand Up @@ -2080,6 +2080,36 @@ public async Task GetPeerChainStateAsync()
}
}

[Fact(Timeout = Timeout)]
public async Task LastMessageTimestamp()
{
Swarm<DumbAction> swarm1 = _swarms[0];
Swarm<DumbAction> swarm2 = _swarms[1];

Assert.Null(swarm1.LastMessageTimestamp);

try
{
await StartAsync(swarm1);
Assert.Null(swarm1.LastMessageTimestamp);
DateTimeOffset bootstrappedAt = DateTimeOffset.UtcNow;
await BootstrapAsync(swarm2, swarm1.AsPeer);
await StartAsync(swarm2);

Assert.NotNull(swarm1.LastMessageTimestamp);
Assert.InRange(
swarm1.LastMessageTimestamp.Value,
bootstrappedAt,
DateTimeOffset.UtcNow
);
}
finally
{
await StopAsync(swarm1);
await StopAsync(swarm2);
}
}

private async Task<Task> StartAsync<T>(
Swarm<T> swarm,
IImmutableSet<Address> trustedStateValidators = null,
Expand Down
2 changes: 2 additions & 0 deletions Libplanet/Net/ITransport.cs
Expand Up @@ -12,6 +12,8 @@ internal interface ITransport : IDisposable

IEnumerable<BoundPeer> Peers { get; }

DateTimeOffset? LastMessageTimestamp { get; }

Task StartAsync(CancellationToken cancellationToken = default(CancellationToken));

Task RunAsync(CancellationToken cancellationToken = default(CancellationToken));
Expand Down
3 changes: 3 additions & 0 deletions Libplanet/Net/NetMQTransport.cs
Expand Up @@ -180,6 +180,8 @@ internal class NetMQTransport : ITransport

public IEnumerable<BoundPeer> Peers => Protocol.Peers;

public DateTimeOffset? LastMessageTimestamp { get; private set; }

/// <summary>
/// Whether this <see cref="NetMQTransport"/> instance is running.
/// </summary>
Expand Down Expand Up @@ -582,6 +584,7 @@ private void ReceiveMessage(object sender, NetMQSocketEventArgs e)
_differentAppProtocolVersionEncountered);
_logger.Debug("A message has parsed: {0}, from {1}", message, message.Remote);
MessageHistory.Enqueue(message);
LastMessageTimestamp = DateTimeOffset.UtcNow;

try
{
Expand Down
8 changes: 6 additions & 2 deletions Libplanet/Net/Swarm.cs
Expand Up @@ -129,7 +129,6 @@ static Swarm()

DateTimeOffset now = createdAt.GetValueOrDefault(
DateTimeOffset.UtcNow);
LastReceived = now;
TxReceived = new AsyncAutoResetEvent();
BlockHeaderReceived = new AsyncAutoResetEvent();
BlockAppended = new AsyncAutoResetEvent();
Expand Down Expand Up @@ -182,7 +181,12 @@ static Swarm()

public Peer AsPeer => Transport.AsPeer;

public DateTimeOffset LastReceived { get; private set; }
/// <summary>
/// The last time when any message was arrived.
/// It can be <c>null</c> if no message has been arrived yet.
/// </summary>
public DateTimeOffset? LastMessageTimestamp =>
Running ? Transport.LastMessageTimestamp : (DateTimeOffset?)null;

public IDictionary<Peer, DateTimeOffset> LastSeenTimestamps
{
Expand Down

0 comments on commit 8e8af7f

Please sign in to comment.