Try make IOQueue auto-parallelizing
Applies the technique from dotnet/runtime#35330 to IOQueue.
tmds committed May 15, 2020
1 parent 8675632 commit 778f8d5
Showing 2 changed files with 33 additions and 29 deletions.
60 changes: 32 additions & 28 deletions src/Servers/Kestrel/Transport.Sockets/src/Internal/IOQueue.cs
@@ -2,6 +2,7 @@
 // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
 
 using System;
+using System.Runtime.CompilerServices;
 using System.Collections.Concurrent;
 using System.IO.Pipelines;
 using System.Threading;
@@ -11,54 +12,57 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
     internal class IOQueue : PipeScheduler, IThreadPoolWorkItem
     {
         private readonly ConcurrentQueue<Work> _workItems = new ConcurrentQueue<Work>();
-        private int _doingWork;
+        private int _processingRequested;
 
-        public override void Schedule(Action<object> action, object state)
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private void ScheduleToProcessWork()
         {
-            _workItems.Enqueue(new Work(action, state));
-
-            // Set working if it wasn't (via atomic Interlocked).
-            if (Interlocked.CompareExchange(ref _doingWork, 1, 0) == 0)
+            // Schedule a thread pool work item to process Work.
+            // Only one work item is scheduled at any given time to avoid over-parallelization.
+            // When the work item begins running, this field is reset to 0.
+            if (Interlocked.CompareExchange(ref _processingRequested, 1, 0) == 0)
             {
-                // Wasn't working, schedule.
                 System.Threading.ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
             }
         }
 
+        public override void Schedule(Action<object> action, object state)
+        {
+            _workItems.Enqueue(new Work(action, state));
+
+            ScheduleToProcessWork();
+        }
+
         void IThreadPoolWorkItem.Execute()
         {
-            while (true)
+            // Ensure processing is requested when new work is queued.
+            Interlocked.Exchange(ref _processingRequested, 0);
+
+            ConcurrentQueue<Work> workItems = _workItems;
+            if (!workItems.TryDequeue(out Work work))
             {
-                while (_workItems.TryDequeue(out Work item))
-                {
-                    item.Callback(item.State);
-                }
+                return;
+            }
 
-                // All work done.
+            int startTimeMs = Environment.TickCount;
 
-                // Set _doingWork (0 == false) prior to checking IsEmpty to catch any missed work in interim.
-                // This doesn't need to be volatile due to the following barrier (i.e. it is volatile).
-                _doingWork = 0;
+            // Schedule a work item to parallelize processing of work.
+            ScheduleToProcessWork();
 
-                // Ensure _doingWork is written before IsEmpty is read.
-                // As they are two different memory locations, we insert a barrier to guarantee ordering.
-                Thread.MemoryBarrier();
+            while (true)
+            {
+                work.Callback(work.State);
 
-                // Check if there is work to do
-                if (_workItems.IsEmpty)
+                // Avoid this work item delaying other items on the ThreadPool queue.
+                if (Environment.TickCount - startTimeMs >= 15)
                 {
-                    // Nothing to do, exit.
                     break;
                 }
 
-                // Is work, can we set it as active again (via atomic Interlocked), prior to scheduling?
-                if (Interlocked.Exchange(ref _doingWork, 1) == 1)
+                if (!workItems.TryDequeue(out work))
                 {
-                    // Execute has been rescheduled already, exit.
-                    break;
+                    return;
                 }
-
-                // Is work, wasn't already scheduled so continue loop.
             }
         }
 
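Taken together, the new Execute is a drain loop with two twists: it clears _processingRequested before dequeuing, so a Schedule that races with the drain can always request a fresh work item (no enqueue is ever lost), and it requests a second work item before it starts processing, so the thread pool can begin draining the same queue in parallel whenever this item stays busy. The 15 ms check then hands the thread back so one hot IOQueue cannot monopolize it. Below is a minimal, self-contained sketch of the same technique; the class name and plain Action payloads are illustrative (the real IOQueue wraps callbacks in a Work struct), and the final re-request after the timed exit is a conservative addition of mine, since the diff is cut off before the end of Execute.

using System;
using System.Collections.Concurrent;
using System.Threading;

// Illustrative stand-in for IOQueue (name and Action payload are assumptions).
internal sealed class AutoParallelizingQueue : IThreadPoolWorkItem
{
    private readonly ConcurrentQueue<Action> _workItems = new ConcurrentQueue<Action>();
    private int _processingRequested;

    public void Schedule(Action action)
    {
        _workItems.Enqueue(action);
        ScheduleToProcessWork();
    }

    private void ScheduleToProcessWork()
    {
        // The 0 -> 1 transition wins the right to queue a work item, so at most
        // one request is outstanding at any time.
        if (Interlocked.CompareExchange(ref _processingRequested, 1, 0) == 0)
        {
            ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
        }
    }

    void IThreadPoolWorkItem.Execute()
    {
        // Clear the flag before dequeuing: any Schedule that runs after this
        // point can request another work item, so no enqueue is ever lost.
        Interlocked.Exchange(ref _processingRequested, 0);

        if (!_workItems.TryDequeue(out Action work))
        {
            return; // another work item already drained the queue
        }

        int startTimeMs = Environment.TickCount;

        // Request a successor before processing: if this item stays busy,
        // the thread pool can drain the same queue in parallel.
        ScheduleToProcessWork();

        while (true)
        {
            work();

            // Hand the thread back after ~15 ms so other pool items are not
            // delayed (the int subtraction is safe across TickCount wraparound).
            if (Environment.TickCount - startTimeMs >= 15)
            {
                break;
            }

            if (!_workItems.TryDequeue(out work))
            {
                return;
            }
        }

        // Conservative addition (the diff is truncated here): re-request
        // processing so the timed exit can never strand queued items.
        ScheduleToProcessWork();
    }
}

Parallelism therefore grows only while the queue keeps refilling: each busy work item requests at most one successor, and a successor that finds the queue empty exits immediately.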
2 changes: 1 addition & 1 deletion src/Servers/Kestrel/Transport.Sockets/src/SocketTransportOptions.cs
@@ -14,7 +14,7 @@ public class SocketTransportOptions
         /// <remarks>
         /// Defaults to <see cref="Environment.ProcessorCount" /> rounded down and clamped between 1 and 16.
         /// </remarks>
-        public int IOQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16);
+        public int IOQueueCount { get; set; } = 1;
 
         /// <summary>
         /// Wait until there is data available to allocate a buffer. Setting this to false can increase throughput at the cost of increased memory usage.
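Because a single auto-parallelizing IOQueue can now recruit additional thread pool workers on demand, the default drops from one queue per core (capped at 16) to a single queue. Applications can still pin the previous behavior explicitly. A sketch, assuming the standard generic-host template (Program and Startup are the usual template names, assumed to exist; UseSockets(Action<SocketTransportOptions>) is the existing extension method on IWebHostBuilder):

using System;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

public static class Program
{
    public static void Main(string[] args) => CreateHostBuilder(args).Build().Run();

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureWebHostDefaults(webBuilder =>
            {
                webBuilder
                    .UseStartup<Startup>() // Startup class assumed to exist
                    .UseSockets(options =>
                    {
                        // Restore the previous default: one IOQueue per core, capped at 16.
                        options.IOQueueCount = Math.Min(Environment.ProcessorCount, 16);
                    });
            });
}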
