Skip to content

Commit

Permalink
use the kestrel awaitable args code
Browse files Browse the repository at this point in the history
  • Loading branch information
mgravell committed Aug 16, 2018
1 parent 699f8ab commit 9dae565
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void Execute(Action<object> action, object state)
{
action(state);
Helpers.Incr(Counter.ThreadPoolExecuted);
Helpers.Incr(action == SocketAwaitable.InvokeStateAsAction ? ((Action)state).Method : action.Method);
Helpers.Incr(action == SocketAwaitableEventArgs.InvokeStateAsAction ? ((Action)state).Method : action.Method);
}
catch (Exception ex)
{
Expand Down
162 changes: 0 additions & 162 deletions src/Pipelines.Sockets.Unofficial/SocketAwaitable.cs

This file was deleted.

146 changes: 146 additions & 0 deletions src/Pipelines.Sockets.Unofficial/SocketAwaitableEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
using System;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Pipelines.Sockets.Unofficial
{
// This type is largely similar to the type of the same name in KestrelHttpServer, with some minor tweaks:
// - when scheduing a callback against an already complete task (semi-synchronous case), prefer to use the io pipe scheduler for onward continuations, not the thread pool
// - when invoking final continuations, we detect the Inline pipe scheduler and bypass the indirection
// - the addition of an Abort concept (which invokes any pending continuations, guaranteeing failure)

/// <summary>
/// Awaitable SocketAsyncEventArgs, where awaiting the args yields either the BytesTransferred or throws the relevant socket exception
/// </summary>
public class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion
{
/// <summary>
/// Abort the current async operation (and prevent future operations)
/// </summary>
public void Abort(SocketError error = SocketError.OperationAborted)
{
if (error == SocketError.Success) throw new ArgumentException(nameof(error));
_forcedError = error;
OnCompleted(this);
}

private volatile SocketError _forcedError; // Success = 0, no field init required

private static readonly Action _callbackCompleted = () => { };

private readonly PipeScheduler _ioScheduler;

private Action _callback;

internal static readonly Action<object> InvokeStateAsAction = state => ((Action)state)();

/// <summary>
/// Create a new SocketAwaitableEventArgs instance, optionally providing a scheduler for callbacks
/// </summary>
/// <param name="ioScheduler"></param>
public SocketAwaitableEventArgs(PipeScheduler ioScheduler = null)
{
// treat null and Inline interchangeably
if ((object)ioScheduler == (object)PipeScheduler.Inline) ioScheduler = null;
_ioScheduler = ioScheduler;
}

/// <summary>
/// Get the awaiter for this instance; used as part of "await"
/// </summary>
public SocketAwaitableEventArgs GetAwaiter() => this;

/// <summary>
/// Indicates whether the current operation is complete; used as part of "await"
/// </summary>
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);

/// <summary>
/// Gets the result of the async operation is complete; used as part of "await"
/// </summary>
public int GetResult()
{
Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));

_callback = null;

if (_forcedError != SocketError.Success)
{
ThrowSocketException(_forcedError);
}

if (SocketError != SocketError.Success)
{
ThrowSocketException(SocketError);
}

return BytesTransferred;

void ThrowSocketException(SocketError e)
{
throw new SocketException((int)e);
}
}

/// <summary>
/// Schedules a continuation for this operation; used as part of "await"
/// </summary>
public void OnCompleted(Action continuation)
{
if (ReferenceEquals(Volatile.Read(ref _callback), _callbackCompleted)
|| ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
{
// this is the rare "kinda already complete" case; push to worker to prevent possible stack dive,
// but prefer the custom scheduler when possible
if (_ioScheduler == null)
{
Task.Run(continuation);
}
else
{
_ioScheduler.Schedule(InvokeStateAsAction, continuation);
}
}
}

/// <summary>
/// Schedules a continuation for this operation; used as part of "await"
/// </summary>
public void UnsafeOnCompleted(Action continuation)
{
OnCompleted(continuation);
}

/// <summary>
/// Marks the operation as complete - this should be invoked whenever a SocketAsyncEventArgs operation returns false
/// </summary>
public void Complete()
{
OnCompleted(this);
}

/// <summary>
/// Invoked automatically when an operation completes asynchronously
/// </summary>
protected override void OnCompleted(SocketAsyncEventArgs e)
{
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);

if (continuation != null)
{
if (_ioScheduler == null)
{
continuation.Invoke();
}
else
{
_ioScheduler.Schedule(InvokeStateAsAction, continuation);
}
}
}
}
}
19 changes: 6 additions & 13 deletions src/Pipelines.Sockets.Unofficial/SocketConnection.Connect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Pipelines.Sockets.Unofficial
{
partial class SocketConnection
public partial class SocketConnection
{
/// <summary>
/// Open a new or existing socket as a client
Expand All @@ -18,6 +18,7 @@ partial class SocketConnection
Func<SocketConnection, Task> onConnected = null,
Socket socket = null, string name = null)
=> ConnectAsync(endpoint, pipeOptions, pipeOptions, connectionOptions, onConnected, socket, name);

/// <summary>
/// Open a new or existing socket as a client
/// </summary>
Expand All @@ -39,18 +40,19 @@ partial class SocketConnection

SetRecommendedClientOptions(socket);

using (var args = CreateArgs(null, out _))
using (var args = new SocketAwaitableEventArgs((connectionOptions & SocketConnectionOptions.InlineConnect) == 0 ? PipeScheduler.ThreadPool : null))
{
Helpers.DebugLog(name, $"connecting to {endpoint}...");

await ConnectAsync(socket, args, endpoint);
if (!socket.ConnectAsync(args)) args.Complete();
await args;
}

Helpers.DebugLog(name, "connected");

var connection = Create(socket, sendPipeOptions, receivePipeOptions, connectionOptions, name);

if (onConnected != null) await onConnected(connection);
if (onConnected != null) await onConnected(connection).ConfigureAwait(false);

return connection;
}
Expand All @@ -74,14 +76,5 @@ internal static void SetFastLoopbackOption(Socket socket)
}
}
}

private static SocketAwaitable ConnectAsync(Socket socket, SocketAsyncEventArgs args, EndPoint endpoint)
{
args.RemoteEndPoint = endpoint;
SocketAwaitable.Reset(args);
if (!socket.ConnectAsync(args)) SocketAwaitable.OnCompleted(args);
return GetAwaitable(args);
}

}
}
5 changes: 5 additions & 0 deletions src/Pipelines.Sockets.Unofficial/SocketConnection.Flags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public enum SocketConnectionOptions
/// During async writes, the awaiter should continue on the IO thread
/// </summary>
InlineWrites = 1 << 2,

/// <summary>
/// During async connects, the awaiter should continue on the IO thread
/// </summary>
InlineConnect = 1 << 3,
}
public partial class SocketConnection
{
Expand Down
Loading

0 comments on commit 9dae565

Please sign in to comment.