Skip to content

Commit

Permalink
Merge pull request #327 from earlbread/receive-message-poller
Browse files Browse the repository at this point in the history
Move ReceiveMessageAsync to poller
  • Loading branch information
dahlia committed Jul 4, 2019
2 parents 912b002 + 3520e8f commit 9a2699b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 81 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ To be released.
index of a block that does not exist locally. [[#208], [#317]]
- Fixed a bug that `Swarm` had not dial to other peer after
`Swarm.PreloadAsync()`. [[#311]]
- Fixed an issue where unknown exceptions occurred when `Swarm<T>` receiving
a message. [[#321], [#327]]


[LiteDB]: https://www.litedb.org/
Expand Down Expand Up @@ -135,8 +137,10 @@ To be released.
[#310]: https://github.com/planetarium/libplanet/pull/310
[#311]: https://github.com/planetarium/libplanet/pull/311
[#317]: https://github.com/planetarium/libplanet/pull/317
[#321]: https://github.com/planetarium/libplanet/pull/321
[#324]: https://github.com/planetarium/libplanet/pull/324
[#326]: https://github.com/planetarium/libplanet/pull/326
[#327]: https://github.com/planetarium/libplanet/pull/327
[#329]: https://github.com/planetarium/libplanet/pull/329

Version 0.3.0
Expand Down
123 changes: 42 additions & 81 deletions Libplanet/Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ public class Swarm<T>

private readonly NetMQQueue<Message> _replyQueue;
private readonly NetMQQueue<Message> _broadcastQueue;
private readonly NetMQPoller _queuePoller;
private readonly NetMQPoller _poller;

private readonly ILogger _logger;

private TaskCompletionSource<object> _runningEvent;
private int? _listenPort;
private TurnClient _turnClient;
private CancellationTokenSource _workerCancellationTokenSource;
private CancellationToken _cancellationToken;
private IPAddress _publicIPAddress;

static Swarm()
Expand Down Expand Up @@ -140,7 +141,7 @@ public Swarm(
_router.Options.RouterHandover = true;
_replyQueue = new NetMQQueue<Message>();
_broadcastQueue = new NetMQQueue<Message>();
_queuePoller = new NetMQPoller { _replyQueue, _broadcastQueue };
_poller = new NetMQPoller { _router, _replyQueue, _broadcastQueue };

_receiveMutex = new AsyncLock();
_blockSyncMutex = new AsyncLock();
Expand Down Expand Up @@ -169,6 +170,7 @@ public Swarm(
_logger = Log.ForContext<Swarm<T>>()
.ForContext("SwarmId", loggerId);

_router.ReceiveReady += ReceiveMessage;
_replyQueue.ReceiveReady += DoReply;
_broadcastQueue.ReceiveReady += DoBroadcast;
}
Expand Down Expand Up @@ -338,7 +340,6 @@ public async Task StopAsync(
{
if (Running)
{
_router.Dispose();
_removedPeers[AsPeer] = DateTimeOffset.UtcNow;
DistributeDelta(false);

Expand All @@ -349,14 +350,16 @@ public async Task StopAsync(

_broadcastQueue.ReceiveReady -= DoBroadcast;
_replyQueue.ReceiveReady -= DoReply;
_router.ReceiveReady -= ReceiveMessage;

if (_queuePoller.IsRunning)
if (_poller.IsRunning)
{
_queuePoller.Dispose();
_poller.Dispose();
}

_broadcastQueue.Dispose();
_replyQueue.Dispose();
_router.Dispose();

foreach (DealerSocket s in _dealers.Values)
{
Expand Down Expand Up @@ -443,32 +446,26 @@ public async Task StartAsync(
CancellationTokenSource.CreateLinkedTokenSource(
_workerCancellationTokenSource.Token, cancellationToken
).Token;
_cancellationToken = workerCancellationToken;

using (await _runningMutex.LockAsync())
{
Running = true;
await PreloadAsync(
cancellationToken: workerCancellationToken);
await PreloadAsync(cancellationToken: _cancellationToken);
}

var tasks = new List<Task>
{
RepeatDeltaDistributionAsync(
distributeInterval,
workerCancellationToken),
ReceiveMessageAsync(
workerCancellationToken),
BroadcastTxAsync(
broadcastTxInterval,
cancellationToken),
Task.Run(() => _queuePoller.Run(), workerCancellationToken),
RepeatDeltaDistributionAsync(distributeInterval, _cancellationToken),
BroadcastTxAsync(broadcastTxInterval, _cancellationToken),
Task.Run(() => _poller.Run(), _cancellationToken),
};

if (behindNAT)
{
tasks.Add(BindingProxies(workerCancellationToken));
tasks.Add(RefreshAllocate(workerCancellationToken));
tasks.Add(RefreshPermissions(workerCancellationToken));
tasks.Add(BindingProxies(_cancellationToken));
tasks.Add(RefreshAllocate(_cancellationToken));
tasks.Add(RefreshPermissions(_cancellationToken));
}

await await Task.WhenAny(tasks);
Expand Down Expand Up @@ -817,68 +814,6 @@ await Task.WhenAll(
}
}

private async Task ReceiveMessageAsync(
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
NetMQMessage raw;
try
{
raw = await _router.ReceiveMultipartMessageAsync(
timeout: TimeSpan.FromMilliseconds(100),
cancellationToken: cancellationToken);
}
catch (TimeoutException)
{
// Ignore this exception because it's expected
// when there is no received message in duration.
continue;
}

_logger.Verbose($"The raw message[{raw}] has received.");
Message message = Message.Parse(raw, reply: false);
_logger.Debug($"The message[{message}] has parsed.");

// Queue a task per message to avoid blocking.
#pragma warning disable CS4014
Task.Run(
async () =>
{
// it's still async because some method it relies
// are async yet.
await ProcessMessageAsync(
message,
cancellationToken
);
},
cancellationToken);
#pragma warning restore CS4014
}
catch (InvalidMessageException e)
{
_logger.Error(
e,
"Could not parse NetMQMessage properly; ignore."
);
}
catch (TaskCanceledException e)
{
_logger.Information(e, "Task was canceled.");
}
catch (Exception e)
{
_logger.Error(
e,
"An unexpected exception occured during ReceiveMessageAsync()"
);
throw;
}
}
}

private async Task BroadcastTxAsync(
TimeSpan broadcastTxInterval,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -1696,6 +1631,32 @@ private async Task RepeatDeltaDistributionAsync(
}
}

private void ReceiveMessage(object sender, NetMQSocketEventArgs e)
{
try
{
NetMQMessage raw = e.Socket.ReceiveMultipartMessage();

_logger.Verbose($"The raw message[{raw}] has received.");
Message message = Message.Parse(raw, reply: false);
_logger.Debug($"The message[{message}] has parsed.");

// it's still async because some method it relies are async yet.
Task.Run(
async () => { await ProcessMessageAsync(message, _cancellationToken); },
_cancellationToken);
}
catch (InvalidMessageException ex)
{
_logger.Error(ex, "Could not parse NetMQMessage properly; ignore.");
}
catch (Exception ex)
{
_logger.Error(ex, "An unexpected exception occured during ReceiveMessage().");
throw;
}
}

private void DoBroadcast(object sender, NetMQQueueEventArgs<Message> e)
{
Message msg = e.Queue.Dequeue();
Expand Down

0 comments on commit 9a2699b

Please sign in to comment.