-
Notifications
You must be signed in to change notification settings - Fork 0
/
WorkQueue.cs
121 lines (109 loc) · 3.57 KB
/
WorkQueue.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
namespace ReentrantAsyncLock;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
/// <summary>
/// A <see cref="SynchronizationContext"/> in which units of work are executed one-at-a-time on the thread pool.
/// </summary>
/// <remarks>
/// <para>
/// Posted callbacks are stored in a FIFO queue. At most one pump is active at any time; it is started on the
/// thread pool by the first <see cref="Post"/> that finds the queue idle, drains the queue, and then exits.
/// </para>
/// <para>
/// Exceptions thrown by queued callbacks do not stop the pump; they are reported through
/// <see cref="ExceptionThrown"/>.
/// </para>
/// </remarks>
sealed class WorkQueue : SynchronizationContext
{
    /// <summary>
    /// Exposes exceptions thrown on this <see cref="SynchronizationContext"/>.
    /// </summary>
    /// <remarks>
    /// Raised on the pumping thread. If there are no subscribers the exception is dropped.
    /// </remarks>
    public event Action<Exception>? ExceptionThrown;

    // Pending work in FIFO order. Guarded by _gate.
    readonly Queue<Entry> _entries = new();

    // Protects _entries and _isPumping.
    readonly object _gate = new();

    // True from the moment a pump has been scheduled until it observes an empty queue. Guarded by _gate.
    bool _isPumping;

    // Delegates are created once so that Post/Send do not allocate a new delegate per call.
    static readonly Action<object?> PumpDelegate;
    static readonly SendOrPostCallback SetManualResetEventSlimDelegate;

    // Pool of reset events reused by cross-thread Send calls.
    static readonly ConcurrentBag<ManualResetEventSlim> UnusedManualResetEvents = new();

    static WorkQueue()
    {
        PumpDelegate = Pump;
        SetManualResetEventSlimDelegate = SetManualResetEventSlim;
    }

    /// <summary>
    /// Gets the ID of the thread that is currently pumping this <see cref="WorkQueue"/>, or <c>null</c> if it is
    /// not being pumped.
    /// </summary>
    /// <remarks>
    /// <para>
    /// This will be the same as <see cref="Environment.CurrentManagedThreadId"/> for all work done on this
    /// <see cref="WorkQueue"/>.
    /// </para>
    /// </remarks>
    public int? CurrentThreadId { get; private set; }

    /// <summary>
    /// Returns a new <see cref="WorkQueue"/>.
    /// </summary>
    /// <remarks>
    /// The returned instance has its own, empty queue; it shares no pending work with this one.
    /// </remarks>
    public override SynchronizationContext CreateCopy() => new WorkQueue();

    /// <summary>
    /// Queues <paramref name="d"/> to run on this <see cref="WorkQueue"/> and returns without waiting for it.
    /// </summary>
    /// <param name="d">The callback to run.</param>
    /// <param name="state">The argument passed to <paramref name="d"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="d"/> is <c>null</c>.</exception>
    public override void Post(SendOrPostCallback d, object? state)
    {
        // Fail at the call site instead of later, inside the pump, where the faulty caller is no longer visible.
        if (d is null)
        {
            throw new ArgumentNullException(nameof(d));
        }

        // Captured here so the callback later runs under the poster's execution context.
        // Capture() returns null when execution-context flow is suppressed; Pump handles that case.
        var executionContext = ExecutionContext.Capture();

        lock (_gate)
        {
            _entries.Enqueue(new Entry(d, state, executionContext));

            // An active pump will pick the new entry up; only an idle queue needs a new pump.
            if (_isPumping)
            {
                return;
            }

            _isPumping = true;
        }

        ThreadPool.QueueUserWorkItem(PumpDelegate, this, preferLocal: false);
    }

    /// <summary>
    /// Thread-pool entry point: unwraps the <see cref="WorkQueue"/> passed as state and pumps it.
    /// </summary>
    static void Pump(object? state)
    {
        var me = (WorkQueue)state!;
        me.Pump();
    }

    /// <summary>
    /// Runs queued entries on the calling thread until the queue is observed empty.
    /// </summary>
    void Pump()
    {
        CurrentThreadId = Environment.CurrentManagedThreadId;
        var oldContext = Current;

        while (true)
        {
            Entry entry;
            lock (_gate)
            {
                if (!_entries.TryDequeue(out entry))
                {
                    // The pumping flag is cleared under the same lock that Post uses to enqueue, so an entry
                    // is either seen by this loop or causes Post to schedule a fresh pump.
                    _isPumping = false;
                    CurrentThreadId = null;
                    SetSynchronizationContext(oldContext);
                    return;
                }
            }

            try
            {
                // Re-applied for every entry because a previous callback may have replaced the context.
                SetSynchronizationContext(this);

                if (entry.ExecutionContext is { } executionContext)
                {
                    ExecutionContext.Run(executionContext, new ContextCallback(entry.Callback), entry.State);
                }
                else
                {
                    // No execution context was captured at Post time; invoke directly.
                    entry.Callback(entry.State);
                }
            }
            catch (Exception e)
            {
                // A failing callback must not stop the remaining entries from running.
                ExceptionThrown?.Invoke(e);
            }
        }
    }

    /// <summary>
    /// Runs <paramref name="d"/> on this <see cref="WorkQueue"/> and blocks until it has finished.
    /// </summary>
    /// <param name="d">The callback to run.</param>
    /// <param name="state">The argument passed to <paramref name="d"/>.</param>
    /// <remarks>
    /// <para>
    /// When called from the thread that is currently pumping this queue, <paramref name="d"/> is invoked
    /// inline and any exception it throws propagates to the caller.
    /// </para>
    /// <para>
    /// When called from any other thread, <paramref name="d"/> is queued behind the pending work; an exception
    /// it throws is reported through <see cref="ExceptionThrown"/> and is not rethrown here.
    /// </para>
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="d"/> is <c>null</c>.</exception>
    public override void Send(SendOrPostCallback d, object? state)
    {
        if (d is null)
        {
            throw new ArgumentNullException(nameof(d));
        }

        // Reentrant call from work already running on this queue: this thread is the only one allowed to
        // drain the queue, so queuing the callback and then blocking would wait forever.
        if (CurrentThreadId == Environment.CurrentManagedThreadId)
        {
            d(state);
            return;
        }

        Post(d, state);

        if (!UnusedManualResetEvents.TryTake(out var mre))
        {
            mre = new ManualResetEventSlim();
        }

        // Entries run in FIFO order, so the signal entry runs only after d has completed (or thrown).
        Post(SetManualResetEventSlimDelegate, mre);
        mre.Wait();

        // Return the event to the pool in the unsignaled state for the next Send.
        mre.Reset();
        UnusedManualResetEvents.Add(mre);
    }

    /// <summary>
    /// Queue callback that signals the <see cref="ManualResetEventSlim"/> passed as state.
    /// </summary>
    static void SetManualResetEventSlim(object? state)
    {
        var mre = (ManualResetEventSlim)state!;
        mre.Set();
    }

    /// <summary>
    /// One queued unit of work: the callback, its argument, and the execution context captured when it was
    /// posted (<c>null</c> if none was captured).
    /// </summary>
    readonly record struct Entry(SendOrPostCallback Callback, object? State, ExecutionContext? ExecutionContext);
}