Skip to content

Commit

Permalink
spannetty
Browse files Browse the repository at this point in the history
  • Loading branch information
to11mtm committed Oct 19, 2021
1 parent 0fbcfa7 commit 3afd84d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 19 deletions.
3 changes: 2 additions & 1 deletion src/core/Akka.Remote/Akka.Remote.csproj
Expand Up @@ -13,8 +13,9 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="1.0.4" />
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" />
<!--<PackageReference Include="DotNetty.Handlers" Version="0.6.0" />-->
<PackageReference Include="Google.Protobuf" Version="$(ProtobufVersion)" />
<PackageReference Include="SpanNetty.Handlers" Version="0.7.2012.2221" />
</ItemGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<DefineConstants>$(DefineConstants);RELEASE</DefineConstants>
Expand Down
28 changes: 20 additions & 8 deletions src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs
Expand Up @@ -14,6 +14,7 @@
using DotNetty.Buffers;
using DotNetty.Common.Concurrency;
using DotNetty.Transport.Channels;

using ILoggingAdapter = Akka.Event.ILoggingAdapter;

namespace Akka.Remote.Transport.DotNetty
Expand Down Expand Up @@ -81,22 +82,30 @@ public override Task ConnectAsync(IChannelHandlerContext ctx, EndPoint remoteAdd
return ctx.ConnectAsync(remoteAddress, localAddress);
}

public override Task DisconnectAsync(IChannelHandlerContext ctx)

//public override Task DisconnectAsync(IChannelHandlerContext ctx)
public override void Disconnect(IChannelHandlerContext ctx, IPromise promise)
{
_log.Info("Channel {0} disconnect", ctx.Channel);
return ctx.DisconnectAsync();
base.Disconnect(ctx,promise);
//return ctx.DisconnectAsync();
}

public override Task CloseAsync(IChannelHandlerContext ctx)
//public override Task CloseAsync(IChannelHandlerContext ctx)
public override void Close(IChannelHandlerContext ctx, IPromise promise)
{
_log.Info("Channel {0} close", ctx.Channel);
return ctx.CloseAsync();
//return ctx.CloseAsync();
base.Close(ctx, promise);
}

public override Task DeregisterAsync(IChannelHandlerContext ctx)
//public override Task DeregisterAsync(IChannelHandlerContext ctx)
public override void Deregister(IChannelHandlerContext ctx, IPromise promise)
{

_log.Debug("Channel {0} deregister", ctx.Channel);
return ctx.DeregisterAsync();
base.Deregister(ctx, promise);
//return ctx.DeregisterAsync();
}

public override void ChannelRead(IChannelHandlerContext ctx, object message)
Expand All @@ -110,14 +119,17 @@ public override void ChannelRead(IChannelHandlerContext ctx, object message)
ctx.FireChannelRead(message);
}

public override Task WriteAsync(IChannelHandlerContext ctx, object message)
//public override Task WriteAsync(IChannelHandlerContext ctx, object message)
public override void Write(IChannelHandlerContext ctx, object message, IPromise promise)
{

if (_log.IsDebugEnabled)
{
// have to force a .ToString() here otherwise the reference count on the buffer might be illegal
_log.Debug("Channel {0} writing a message ({1}) of type [{2}]", ctx.Channel, message?.ToString(), message == null ? "NULL" : message.GetType().TypeQualifiedName());
}
return ctx.WriteAsync(message);
base.Write(ctx, message, promise);
//return ctx.WriteAsync(message);
}

public override void Flush(IChannelHandlerContext ctx)
Expand Down
13 changes: 9 additions & 4 deletions src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs
Expand Up @@ -10,6 +10,7 @@
using System.Threading.Tasks;
using DotNetty.Transport.Channels;
using Akka.Configuration;
using DotNetty.Common.Concurrency;

namespace Akka.Remote.Transport.DotNetty
{
Expand Down Expand Up @@ -133,18 +134,22 @@ public override void ExceptionCaught(IChannelHandlerContext context, Exception e
context.FireExceptionCaught(exception);
}

public override Task DisconnectAsync(IChannelHandlerContext context)
//public override Task DisconnectAsync(IChannelHandlerContext context)
public override void Disconnect(IChannelHandlerContext context, IPromise promise)
{
// Try to flush one last time if flushes are pending before disconnect the channel.
ResetReadAndFlushIfNeeded(context);
return context.DisconnectAsync();
//return context.DisconnectAsync();
base.Disconnect(context, promise);
}

public override Task CloseAsync(IChannelHandlerContext context)
//public override Task CloseAsync(IChannelHandlerContext context)
public override void Close(IChannelHandlerContext context, IPromise promise)
{
// Try to flush one last time if flushes are pending before disconnect the channel.
ResetReadAndFlushIfNeeded(context);
return context.CloseAsync();
context.CloseAsync(promise);
//return context.CloseAsync();
}

public override void ChannelWritabilityChanged(IChannelHandlerContext context)
Expand Down
9 changes: 5 additions & 4 deletions src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs
Expand Up @@ -18,6 +18,7 @@
using Akka.Configuration;
using Akka.Event;
using Akka.Util;
using Akka.Util.Internal;
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Common.Utilities;
Expand Down Expand Up @@ -81,7 +82,7 @@ public override void ExceptionCaught(IChannelHandlerContext context, Exception e
{
var listener = s.Result;
RegisterListener(channel, listener, msg, remoteSocketAddress);
channel.Configuration.AutoRead = true; // turn reads back on
channel.Configuration.IsAutoRead = true; // turn reads back on
}, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.NotOnFaulted);
op = handle;
}
Expand Down Expand Up @@ -190,7 +191,7 @@ public override async Task<(Address, TaskCompletionSource<IAssociationEventListe
// Block reads until a handler actor is registered
// no incoming connections will be accepted until this value is reset
// it's possible that the first incoming association might come in though
newServerChannel.Configuration.AutoRead = false;
newServerChannel.Configuration.IsAutoRead = false;
ConnectionGroup.TryAdd(newServerChannel);
ServerChannel = newServerChannel;

Expand All @@ -206,7 +207,7 @@ public override async Task<(Address, TaskCompletionSource<IAssociationEventListe
LocalAddress = addr;
// resume accepting incoming connections
#pragma warning disable 4014 // we WANT this task to run without waiting
AssociationListenerPromise.Task.ContinueWith(result => newServerChannel.Configuration.AutoRead = true,
AssociationListenerPromise.Task.ContinueWith(result => newServerChannel.Configuration.IsAutoRead = true,
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
#pragma warning restore 4014

Expand All @@ -230,7 +231,7 @@ public override async Task<(Address, TaskCompletionSource<IAssociationEventListe

public override async Task<AssociationHandle> Associate(Address remoteAddress)
{
if (!ServerChannel.Open)
if (!ServerChannel.IsOpen)
throw new ChannelException("Transport is not open");

return await AssociateInternal(remoteAddress).ConfigureAwait(false);
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs
Expand Up @@ -115,7 +115,7 @@ public override void ChannelActive(IChannelHandlerContext context)
void InitInbound(IChannel channel, IPEndPoint socketAddress, object msg)
{
// disable automatic reads
channel.Configuration.AutoRead = false;
channel.Configuration.IsAutoRead = false;

_associationEventListener.ContinueWith(r =>
{
Expand Down Expand Up @@ -169,7 +169,7 @@ public TcpAssociationHandle(Address localAddress, Address remoteAddress, DotNett

public override bool Write(ByteString payload)
{
if (_channel.Open)
if (_channel.IsOpen)
{
var data = ToByteBuffer(_channel, payload);
_channel.WriteAndFlushAsync(data);
Expand Down

0 comments on commit 3afd84d

Please sign in to comment.