Skip to content

Commit

Permalink
Merge pull request #1228 from limebell/chore/dispose-register
Browse files Browse the repository at this point in the history
Dispose `CancellationTokenRegistration`s
  • Loading branch information
limebell committed Apr 5, 2021
2 parents 8515e82 + 7868c4c commit 05b537f
Show file tree
Hide file tree
Showing 14 changed files with 66 additions and 33 deletions.
11 changes: 10 additions & 1 deletion CHANGES.md
Expand Up @@ -8,18 +8,28 @@ To be released.

### Backward-incompatible API changes

- Removed `StunMessage.Parse(Stream)` method. [[#1228]]

### Backward-incompatible network protocol changes

### Backward-incompatible storage format changes

### Added APIs

- Added `StunMessage.ParseAsync(Stream, CancellationToken)` method.
[[#1228]]

### Behavioral changes

### Bug fixes

- Fixed memory leak due to undisposed `CancellationTokenRegistration`s.
[[#1228]]

### CLI tools

[#1228]: https://github.com/planetarium/libplanet/pull/1218


Version 0.11.0
--------------
Expand Down Expand Up @@ -319,7 +329,6 @@ Released on March 30, 2021.
[#1218]: https://github.com/planetarium/libplanet/pull/1218



Version 0.10.3
--------------

Expand Down
Expand Up @@ -27,7 +27,7 @@ public async Task ParseBytes()
using (var stream = new MemoryStream(bytes))
{
var response =
(AllocateErrorResponse)await StunMessage.Parse(stream);
(AllocateErrorResponse)await StunMessage.ParseAsync(stream);
Assert.Equal(
new byte[]
{
Expand Down
Expand Up @@ -28,7 +28,7 @@ public async Task ParseBytes()
using (var stream = new MemoryStream(bytes))
{
var response =
(AllocateSuccessResponse)await StunMessage.Parse(stream);
(AllocateSuccessResponse)await StunMessage.ParseAsync(stream);
Assert.Equal(
new byte[]
{
Expand Down
Expand Up @@ -22,7 +22,7 @@ public async Task ParseBytes()
using (var stream = new MemoryStream(bytes))
{
var response =
(BindingSuccessResponse)await StunMessage.Parse(stream);
(BindingSuccessResponse)await StunMessage.ParseAsync(stream);
Assert.Equal(
new IPEndPoint(IPAddress.Parse("211.176.129.90"), 54141),
response.MappedAddress);
Expand Down
Expand Up @@ -23,7 +23,7 @@ public async Task ParseBytes()
using (var stream = new MemoryStream(bytes))
{
var response =
(ConnectionAttempt)await StunMessage.Parse(stream);
(ConnectionAttempt)await StunMessage.ParseAsync(stream);
Assert.Equal(
new byte[] { 0x21, 0x5a, 0x86, 0x01 },
response.ConnectionId);
Expand Down
Expand Up @@ -22,7 +22,7 @@ public async Task ParseBytes()

using (var stream = new MemoryStream(bytes))
{
var response = await StunMessage.Parse(stream);
var response = await StunMessage.ParseAsync(stream);
Assert.Equal(
new byte[]
{
Expand Down
Expand Up @@ -22,7 +22,7 @@ public async Task ParseBytes()

using (var stream = new MemoryStream(bytes))
{
var response = await StunMessage.Parse(stream);
var response = await StunMessage.ParseAsync(stream);
Assert.Equal(
new byte[]
{
Expand Down
Expand Up @@ -23,7 +23,7 @@ public async Task ParseBytes()

using (var stream = new MemoryStream(bytes))
{
var response = (RefreshErrorResponse)await StunMessage.Parse(stream);
var response = (RefreshErrorResponse)await StunMessage.ParseAsync(stream);
Assert.Equal(
new byte[]
{
Expand Down
Expand Up @@ -24,7 +24,7 @@ public async Task ParseBytes()
using (var stream = new MemoryStream(bytes))
{
var response =
(RefreshSuccessResponse)await StunMessage.Parse(stream);
(RefreshSuccessResponse)await StunMessage.ParseAsync(stream);
Assert.Equal(
new byte[]
{
Expand Down
13 changes: 10 additions & 3 deletions Libplanet.Stun/Stun/Messages/StunMessage.cs
Expand Up @@ -2,6 +2,7 @@
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using Libplanet.Stun.Attributes;

Expand Down Expand Up @@ -80,13 +81,19 @@ public enum MessageMethod : ushort
/// Parses <see cref="StunMessage"/> from <paramref name="stream"/>.
/// </summary>
/// <param name="stream">A view of a sequence of STUN packet's bytes.</param>
/// <param name="cancellationToken">
/// A cancellation token used to propagate notification that this
/// operation should be canceled.
/// </param>
/// <returns>A <see cref="StunMessage"/> derived on
/// bytes read from <paramref name="stream"/>.
/// </returns>
public static async Task<StunMessage> Parse(Stream stream)
public static async Task<StunMessage> ParseAsync(
Stream stream,
CancellationToken cancellationToken = default(CancellationToken))
{
var header = new byte[20];
await stream.ReadAsync(header, 0, 20);
await stream.ReadAsync(header, 0, 20, cancellationToken);

MessageMethod method = ParseMethod(header[0], header[1]);
MessageClass @class = ParseClass(header[0], header[1]);
Expand All @@ -98,7 +105,7 @@ public static async Task<StunMessage> Parse(Stream stream)
System.Array.Copy(header, 8, transactionId, 0, 12);

var body = new byte[length.ToUShort()];
await stream.ReadAsync(body, 0, body.Length);
await stream.ReadAsync(body, 0, body.Length, cancellationToken);
IEnumerable<Attribute> attributes = ParseAttributes(
body,
transactionId
Expand Down
50 changes: 33 additions & 17 deletions Libplanet.Stun/Stun/TurnClient.cs
Expand Up @@ -135,7 +135,7 @@ public async Task ReconnectTurn(int listenPort, CancellationToken cancellationTo
{
var request = new AllocateRequest((int)lifetime.TotalSeconds);
await SendMessageAsync(stream, request, cancellationToken);
response = await ReceiveMessage(request.TransactionId);
response = await ReceiveMessageAsync(request.TransactionId, cancellationToken);

if (response is AllocateErrorResponse allocError)
{
Expand Down Expand Up @@ -164,7 +164,9 @@ public async Task ReconnectTurn(int listenPort, CancellationToken cancellationTo
NetworkStream stream = _control.GetStream();
var request = new CreatePermissionRequest(peerAddress);
await SendMessageAsync(stream, request, cancellationToken);
StunMessage response = await ReceiveMessage(request.TransactionId);
StunMessage response = await ReceiveMessageAsync(
request.TransactionId,
cancellationToken);

if (response is CreatePermissionErrorResponse)
{
Expand All @@ -191,7 +193,8 @@ public async Task ReconnectTurn(int listenPort, CancellationToken cancellationTo
try
{
await SendMessageAsync(relayedStream, bindRequest, cancellationToken);
StunMessage bindResponse = await StunMessage.Parse(relayedStream);
StunMessage bindResponse =
await StunMessage.ParseAsync(relayedStream, cancellationToken);

if (bindResponse is ConnectionBindSuccessResponse)
{
Expand All @@ -216,7 +219,9 @@ public async Task ReconnectTurn(int listenPort, CancellationToken cancellationTo
NetworkStream stream = _control.GetStream();
var request = new BindingRequest();
await SendMessageAsync(stream, request, cancellationToken);
StunMessage response = await ReceiveMessage(request.TransactionId);
StunMessage response = await ReceiveMessageAsync(
request.TransactionId,
cancellationToken);

if (response is BindingSuccessResponse success)
{
Expand All @@ -237,7 +242,9 @@ public async Task ReconnectTurn(int listenPort, CancellationToken cancellationTo
var request = new RefreshRequest((int)lifetime.TotalSeconds);
await SendMessageAsync(stream, request, cancellationToken);

StunMessage response = await ReceiveMessage(request.TransactionId);
StunMessage response = await ReceiveMessageAsync(
request.TransactionId,
cancellationToken);
if (response is RefreshSuccessResponse success)
{
return TimeSpan.FromSeconds(success.Lifetime);
Expand Down Expand Up @@ -275,7 +282,7 @@ public async Task ReconnectTurn(int listenPort, CancellationToken cancellationTo
var request = new BindingRequest();
var asBytes = request.Encode(this);
await stream.WriteAsync(asBytes, 0, asBytes.Length, cancellationToken);
await StunMessage.Parse(stream);
await StunMessage.ParseAsync(stream, cancellationToken);

return true;
}
Expand Down Expand Up @@ -366,7 +373,6 @@ private async Task RefreshAllocate(CancellationToken cancellationToken)
CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<StunMessage>();
cancellationToken.Register(() => tcs.TrySetCanceled());
_responses[message.TransactionId] = tcs;
var asBytes = message.Encode(this);
await stream.WriteAsync(
Expand All @@ -387,18 +393,13 @@ private async Task ProcessMessage(CancellationToken cancellationToken)
StunMessage message;
try
{
message = await StunMessage.Parse(stream);
message = await StunMessage.ParseAsync(stream, cancellationToken);
_logger.Debug("Stun Message is: {message}", message);
}
catch (TurnClientException e)
{
_logger.Error(e, "Failed to parse StunMessage. {e}", e);
foreach (TaskCompletionSource<StunMessage> tcs in _responses.Values)
{
tcs.TrySetCanceled();
}

_responses.Clear();
ClearResponses();
break;
}

Expand Down Expand Up @@ -426,9 +427,14 @@ private async Task ProcessMessage(CancellationToken cancellationToken)
_logger.Debug($"{nameof(ProcessMessage)} is ended. Connected: {_control.Connected}");
}

private async Task<StunMessage> ReceiveMessage(byte[] transactionId)
private async Task<StunMessage> ReceiveMessageAsync(
byte[] transactionId,
CancellationToken cancellationToken)
{
StunMessage response = await _responses[transactionId].Task;
TaskCompletionSource<StunMessage> tcs = _responses[transactionId];
using CancellationTokenRegistration ctr =
cancellationToken.Register(() => tcs.TrySetCanceled());
StunMessage response = await tcs.Task;
_responses.Remove(transactionId);

return response;
Expand All @@ -440,14 +446,24 @@ private void ClearSession()
_turnTaskCts.Cancel();
_turnTaskCts.Dispose();
_turnTasks.Clear();
_responses.Clear();
ClearResponses();

foreach (TcpClient relays in _relayedClients)
{
relays.Dispose();
}
}

private void ClearResponses()
{
foreach (TaskCompletionSource<StunMessage> tcs in _responses.Values)
{
tcs.TrySetCanceled();
}

_responses.Clear();
}

private class ByteArrayComparer : IEqualityComparer<byte[]>
{
public bool Equals(byte[] x, byte[] y)
Expand Down
4 changes: 2 additions & 2 deletions Libplanet/Net/BlockCompletion.cs
Expand Up @@ -348,7 +348,7 @@ private int Demand(IEnumerable<HashDigest<SHA256>> blockHashes, bool retry)
timeoutToken,
ct
);
timeoutToken.Register(() =>
using CancellationTokenRegistration ctr = timeoutToken.Register(() =>
_logger.Debug("Timed out to wait a response from {Peer}.", peer)
);
CancellationToken linkedToken = linkedTokenSource.Token;
Expand Down Expand Up @@ -486,7 +486,7 @@ public async Task WaitAll(CancellationToken cancellationToken = default)
if (tasks.Any())
{
var tcs = new TaskCompletionSource<object>();
cancellationToken.Register(
using CancellationTokenRegistration ctr = cancellationToken.Register(
() => tcs.TrySetCanceled(),
useSynchronizationContext: false
);
Expand Down
3 changes: 2 additions & 1 deletion Libplanet/Net/NetMQTransport.cs
Expand Up @@ -411,7 +411,8 @@ public void Dispose()
Interlocked.Increment(ref _requestCount);

// FIXME should we also cancel tcs sender side too?
cancellationToken.Register(() => tcs.TrySetCanceled());
using CancellationTokenRegistration ctr =
cancellationToken.Register(() => tcs.TrySetCanceled());
await _requests.AddAsync(
new MessageRequest(reqId, message, peer, now, timeout, expectedResponses, tcs),
cancellationToken
Expand Down
2 changes: 1 addition & 1 deletion Libplanet/Net/Swarm.cs
Expand Up @@ -525,7 +525,7 @@ CancellationToken cancellationToken
CancellationToken cancellationToken = default(CancellationToken)
)
{
cancellationToken.Register(() =>
using CancellationTokenRegistration ctr = cancellationToken.Register(() =>
_logger.Information("Preloading is requested to be cancelled.")
);

Expand Down

0 comments on commit 05b537f

Please sign in to comment.