-
-
Notifications
You must be signed in to change notification settings - Fork 252
/
Queue.cs
230 lines (200 loc) · 7.77 KB
/
Queue.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Coravel.Events.Interfaces;
using Coravel.Invocable;
using Coravel.Queuing.Broadcast;
using Coravel.Queuing.Interfaces;
using Coravel.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Coravel.Queuing
{
public class Queue : IQueue, IQueueConfiguration
{
private ConcurrentQueue<ActionOrAsyncFunc> _tasks = new ConcurrentQueue<ActionOrAsyncFunc>();
private ConcurrentDictionary<Guid, CancellationTokenSource> _tokens = new ConcurrentDictionary<Guid, CancellationTokenSource>();
private Action<Exception> _errorHandler;
private ILogger<IQueue> _logger;
private IServiceScopeFactory _scopeFactory;
private IDispatcher _dispatcher;
private int _queueIsConsuming = 0;
private int _tasksRunningCount = 0;
public Queue(IServiceScopeFactory scopeFactory, IDispatcher dispatcher)
{
this._scopeFactory = scopeFactory;
this._dispatcher = dispatcher;
}
public Guid QueueTask(Action task)
{
var job = new ActionOrAsyncFunc(task);
this._tasks.Enqueue(job);
return job.Guid;
}
public Guid QueueInvocable<T>() where T : IInvocable
{
var job = EnqueueInvocable<T>();
return job.Guid;
}
public Guid QueueInvocableWithPayload<T, TParams>(TParams payload) where T : IInvocable, IInvocableWithPayload<TParams>
{
var job = this.EnqueueInvocable<T>(invocable => {
IInvocableWithPayload<TParams> invocableWithParams = (IInvocableWithPayload<TParams>) invocable;
invocableWithParams.Payload = payload;
});
return job.Guid;
}
public (Guid, CancellationTokenSource) QueueCancellableInvocable<T>() where T : IInvocable, ICancellableTask
{
var tokenSource = new CancellationTokenSource();
var func = this.EnqueueInvocable<T>((invocable) => {
(invocable as ICancellableTask).Token = tokenSource.Token;
});
this._tokens.TryAdd(func.Guid, tokenSource);
return (func.Guid, tokenSource);
}
public Guid QueueAsyncTask(Func<Task> asyncTask)
{
var job = new ActionOrAsyncFunc(asyncTask);
this._tasks.Enqueue(job);
return job.Guid;
}
public void QueueBroadcast<TEvent>(TEvent toBroadcast) where TEvent : IEvent
{
this.QueueAsyncTask(async () => await this._dispatcher.Broadcast(toBroadcast));
}
public IQueueConfiguration OnError(Action<Exception> errorHandler)
{
this._errorHandler = errorHandler;
return this;
}
public IQueueConfiguration LogQueuedTaskProgress(ILogger<IQueue> logger)
{
this._logger = logger;
return this;
}
public async Task ConsumeQueueAsync()
{
try
{
Interlocked.Increment(ref this._queueIsConsuming);
await this.TryDispatchEvent(new QueueConsumationStarted());
var dequeuedTasks = this.DequeueAllTasks();
var dequeuedGuids = dequeuedTasks.Select(t => t.Guid);
await Task.WhenAll(
dequeuedTasks
.Select(InvokeTask)
.ToArray()
);
this.CleanTokens(dequeuedGuids);
await this.TryDispatchEvent(new QueueConsumationEnded());
}
finally
{
Interlocked.Decrement(ref this._queueIsConsuming);
}
}
public async Task ConsumeQueueOnShutdown()
{
this.CancelAllTokens();
await this.ConsumeQueueAsync();
}
public bool IsRunning => this._queueIsConsuming > 0;
public QueueMetrics GetMetrics()
{
// See https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentqueue-1.count?view=net-5.0#remarks
var waitingCount = this._tasks.IsEmpty
? 0
: this._tasks.Count;
return new QueueMetrics(this._tasksRunningCount, waitingCount);
}
private void CancelAllTokens()
{
foreach(var kv in this._tokens.AsEnumerable())
{
if(!kv.Value.IsCancellationRequested)
{
kv.Value.Cancel();
}
}
}
private ActionOrAsyncFunc EnqueueInvocable<T>(Action<IInvocable> beforeInvoked = null) where T : IInvocable
{
var func = new ActionOrAsyncFunc(async () =>
{
Type invocableType = typeof(T);
// This allows us to scope the scheduled IInvocable object
/// and allow DI to inject it's dependencies.
using (var scope = this._scopeFactory.CreateScope())
{
if (scope.ServiceProvider.GetService(invocableType) is IInvocable invocable)
{
if(beforeInvoked != null)
{
beforeInvoked(invocable);
}
await invocable.Invoke();
}
else
{
this._logger?.LogError($"Queued invocable {invocableType} is not a registered service.");
throw new Exception($"Queued invocable {invocableType} is not a registered service.");
}
}
});
this._tasks.Enqueue(func);
return func;
}
private void CleanTokens(IEnumerable<Guid> guidsForTokensToClean)
{
foreach(var guid in guidsForTokensToClean)
{
if(this._tokens.TryRemove(guid, out var token))
{
token.Dispose();
}
}
}
private List<ActionOrAsyncFunc> DequeueAllTasks()
{
List<ActionOrAsyncFunc> dequeuedTasks = new List<ActionOrAsyncFunc>(this._tasks.Count());
while (this._tasks.TryPeek(out var dummy))
{
this._tasks.TryDequeue(out var dequeuedTask);
dequeuedTasks.Add(dequeuedTask);
}
return dequeuedTasks;
}
private async Task TryDispatchEvent(IEvent toBroadcast)
{
if (this._dispatcher != null)
{
await this._dispatcher.Broadcast(toBroadcast);
}
}
private async Task InvokeTask(ActionOrAsyncFunc task)
{
try
{
Interlocked.Increment(ref this._tasksRunningCount);
this._logger?.LogInformation("Queued task started...");
await this.TryDispatchEvent(new QueueTaskStarted(task.Guid));
await task.Invoke();
this._logger?.LogInformation("Queued task finished...");
await this.TryDispatchEvent(new QueueTaskCompleted(task.Guid));
}
catch (Exception e)
{
await this.TryDispatchEvent(new DequeuedTaskFailed(task));
_errorHandler?.Invoke(e);
}
finally
{
Interlocked.Decrement(ref this._tasksRunningCount);
}
}
}
}