Skip to content
Permalink
Browse files

[Core] ThreadPool: shutdown idle threads (#302)

  • Loading branch information
Eideren authored and xen2 committed Jan 19, 2019
1 parent 538f915 commit 993704f60316615c10f6665e2ace830739fa35b0
Showing with 43 additions and 61 deletions.
  1. +43 −61 sources/core/Xenko.Core/Threading/ThreadPool.cs
@@ -2,6 +2,7 @@
// Distributed under the MIT license. See the LICENSE.md file in the project root for more information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Xenko.Core.Annotations;
@@ -16,16 +17,18 @@ namespace Xenko.Core.Threading
/// </remarks>
internal class ThreadPool
{
private const long MaxIdleTimeInTicks = 5 * TimeSpan.TicksPerSecond;

public static readonly ThreadPool Instance = new ThreadPool();

private readonly int maxThreadCount = Environment.ProcessorCount + 2;
//private readonly int maxThreadCount = Environment.ProcessorCount * 2;
private readonly List<Task> workers = new List<Task>();
private readonly Queue<Action> workItems = new Queue<Action>();
private readonly ManualResetEvent workAvailable = new ManualResetEvent(false);

private SpinLock spinLock = new SpinLock();
private int activeThreadCount;
private int workingCount;
/// <summary> Usage only within <see cref="spinLock"/> </summary>
private int aliveCount;

public void QueueWorkItem([NotNull] [Pooled] Action workItem)
{
@@ -36,15 +39,14 @@ public void QueueWorkItem([NotNull] [Pooled] Action workItem)

PooledDelegateHelper.AddReference(workItem);
workItems.Enqueue(workItem);
workAvailable.Set();

if (activeThreadCount + 1 >= workers.Count && workers.Count < maxThreadCount)
var curWorkingCount = Interlocked.CompareExchange(ref workingCount, 0, 0);
if (curWorkingCount + 1 >= aliveCount && aliveCount < maxThreadCount)
{
var worker = Task.Factory.StartNew(ProcessWorkItems, workers.Count, TaskCreationOptions.LongRunning);
workers.Add(worker);
//Console.WriteLine($"Thread {workers.Count} added");
aliveCount++;
Task.Factory.StartNew(ProcessWorkItems, TaskCreationOptions.LongRunning);
}

workAvailable.Set();
}
finally
{
@@ -55,73 +57,53 @@ public void QueueWorkItem([NotNull] [Pooled] Action workItem)

private void ProcessWorkItems(object state)
{
//var spinWait = new SpinWait();

while (true)
long lastWork = Stopwatch.GetTimestamp();
TimeSpan maxIdleTime = TimeSpan.FromTicks(MaxIdleTimeInTicks);
while(true)
{
Action workItem = null;

//while (!spinWait.NextSpinWillYield)
var lockTaken = false;
bool idleForTooLong = Utilities.ConvertRawToTimestamp(Stopwatch.GetTimestamp() - lastWork) < maxIdleTime;
try
{
var lockTaken = false;
try
{
spinLock.Enter(ref lockTaken);

if (workItems.Count > 0)
{
try
{
workItem = workItems.Dequeue();
//Interlocked.Increment(ref activeThreadCount);
spinLock.Enter(ref lockTaken);

if (workItems.Count == 0)
workAvailable.Reset();
}
catch
{
}
}

//if (workItems.Count > 0)
//{
// // If we didn't consume the last work item, kick off another worker
// workAvailable.Set();
//}
if (workItems.Count > 0)
{
workItem = workItems.Dequeue();
if (workItems.Count == 0)
workAvailable.Reset();
}
finally
else if (idleForTooLong)
{
if (lockTaken)
spinLock.Exit(true);
aliveCount--;
return;
}
}
finally
{
if (lockTaken)
spinLock.Exit(true);
}

if (workItem != null)
if (workItem != null)
{
Interlocked.Increment(ref workingCount);
try
{
try
{
Interlocked.Increment(ref activeThreadCount);
workItem.Invoke();

//spinWait.Reset();
}
catch (Exception)
{
// Ignoring Exception
}
finally
{
PooledDelegateHelper.Release(workItem);
Interlocked.Decrement(ref activeThreadCount);
}
workItem.Invoke();
}
else
catch (Exception)
{
//spinWait.SpinOnce();
// Ignoring Exception
}
Interlocked.Decrement(ref workingCount);
PooledDelegateHelper.Release(workItem);
lastWork = Stopwatch.GetTimestamp();
}

// Wait for another work item to be (potentially) available
workAvailable.WaitOne();
workAvailable.WaitOne(maxIdleTime);
}
}
}

0 comments on commit 993704f

Please sign in to comment.
You can’t perform that action at this time.