Skip to content

Commit

Permalink
Merge pull request #69 from cjmurph/fix/ChannelTcpListenerCloseUsedCh…
Browse files Browse the repository at this point in the history
…annels

Keep track of used channels and close the connection on stop
  • Loading branch information
jgauffin committed May 11, 2016
2 parents 3d64785 + 59eed95 commit e0e0280
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions src/Griffin.Framework/Griffin.Core/Net/ChannelTcpListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Griffin.Net.Protocols;
using Griffin.Net.Protocols.MicroMsg;
using Griffin.Net.Protocols.Serializers;
using System.Linq;

namespace Griffin.Net
{
Expand All @@ -17,6 +18,7 @@ namespace Griffin.Net
public class ChannelTcpListener : IMessagingListener
{
private readonly ConcurrentStack<ITcpChannel> _channels = new ConcurrentStack<ITcpChannel>();
private readonly ConcurrentDictionary<string, ITcpChannel> _usedChannels = new ConcurrentDictionary<string, ITcpChannel>();
private IBufferSlicePool _bufferPool;
private ITcpChannelFactory _channelFactory;
private ChannelTcpListenerConfiguration _configuration;
Expand All @@ -38,16 +40,10 @@ public ChannelTcpListener(ChannelTcpListenerConfiguration configuration)

/// <summary>
/// </summary>
public ChannelTcpListener()
{
Configure(
new ChannelTcpListenerConfiguration(
public ChannelTcpListener() : this(new ChannelTcpListenerConfiguration(
() => new MicroMessageDecoder(new DataContractMessageSerializer()),
() => new MicroMessageEncoder(new DataContractMessageSerializer()))
);

ChannelFactory = new TcpChannelFactory();
}
() => new MicroMessageEncoder(new DataContractMessageSerializer())))
{ }

/// <summary>
/// Port that the server is listening on.
Expand Down Expand Up @@ -149,6 +145,9 @@ public virtual void Stop()
{
_shuttingDown = true;
_listener.Stop();
var tasks = _usedChannels.Values.Select(x => x.CloseAsync()).ToArray();
if (tasks.Any())
System.Threading.Tasks.Task.WaitAll(tasks);
}

/// <summary>
Expand Down Expand Up @@ -226,6 +225,8 @@ private void OnAcceptSocket(IAsyncResult ar)
channel.MessageReceived = OnMessage;
channel.Assign(socket);

_usedChannels.TryAdd(channel.ChannelId, channel);

var args = OnClientConnected(channel);
if (!args.MayConnect)
{
Expand All @@ -246,6 +247,8 @@ private void OnAcceptSocket(IAsyncResult ar)

private void OnChannelDisconnect(ITcpChannel source, Exception exception)
{
ITcpChannel removed;
_usedChannels.TryRemove(source.ChannelId, out removed);
OnClientDisconnected(source, exception);
source.Cleanup();
_channels.Push(source);
Expand Down

0 comments on commit e0e0280

Please sign in to comment.