-
Notifications
You must be signed in to change notification settings - Fork 0
/
BatchQueueCommandHandler.cs
95 lines (79 loc) · 2.67 KB
/
BatchQueueCommandHandler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
namespace AExpense.Data.Process
{
using System;
using System.Linq;
using System.Threading.Tasks;
using AExpense.Data.Storage;
/// <summary>
/// Non-generic entry point for creating batch queue handlers, so callers can write
/// <c>BatchQueueCommandHandler.For(queue)</c> and let the message type be inferred.
/// </summary>
public static class BatchQueueCommandHandler
{
    /// <summary>
    /// Creates a <see cref="BatchProcessingQueueHandler{T}"/> bound to the given queue.
    /// </summary>
    /// <typeparam name="T">Type of the messages stored in the queue.</typeparam>
    /// <param name="queue">Queue the handler will read messages from.</param>
    /// <returns>The handler produced by <see cref="BatchProcessingQueueHandler{T}.For"/>.</returns>
    public static BatchProcessingQueueHandler<T> For<T>(IAzureQueue<T> queue) where T : AzureQueueMessage
    {
        // Pure forwarding: argument validation is left to the generic handler's own factory.
        BatchProcessingQueueHandler<T> handler = BatchProcessingQueueHandler<T>.For(queue);
        return handler;
    }
}
/// <summary>
/// Queue handler that repeatedly drains a queue in batches on a background task and
/// hands the messages to an <see cref="IBatchQueueCommand{T}"/>.
/// </summary>
/// <remarks>
/// Instances are created through <see cref="For"/> and configured fluently with
/// <see cref="WithPosionMessageQueue"/> and <see cref="Every"/> before calling <see cref="Do"/>.
/// </remarks>
/// <typeparam name="T">Type of the messages stored in the queue.</typeparam>
public class BatchProcessingQueueHandler<T> : GenericQueueHandler<T> where T : AzureQueueMessage
{
    // Number of messages requested from the queue per GetMessages call.
    private const int MaxMessagesPerBatch = 32;

    private readonly IAzureQueue<T> queue;
    private TimeSpan interval;

    // Optional; stays null unless WithPosionMessageQueue is called.
    private IAzureQueue<T> poisonMessageQueue;

    /// <summary>
    /// Initializes the handler for <paramref name="queue"/> with a default interval of 200 ms
    /// between cycles.
    /// </summary>
    /// <param name="queue">Queue to read messages from.</param>
    protected BatchProcessingQueueHandler(IAzureQueue<T> queue)
    {
        this.queue = queue;
        this.interval = TimeSpan.FromMilliseconds(200);
    }

    /// <summary>
    /// Creates a handler for the given queue.
    /// </summary>
    /// <param name="queue">Queue to read messages from.</param>
    /// <returns>A new handler bound to <paramref name="queue"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
    public static BatchProcessingQueueHandler<T> For(IAzureQueue<T> queue)
    {
        if (queue == null)
        {
            throw new ArgumentNullException("queue");
        }

        return new BatchProcessingQueueHandler<T>(queue);
    }

    /// <summary>
    /// Sets the queue that is passed to message processing as the poison-message queue.
    /// </summary>
    /// <remarks>
    /// The spelling of the method name ("Posion") is kept unchanged so existing callers keep compiling.
    /// </remarks>
    /// <param name="poisonQueue">Queue to use for poison messages.</param>
    /// <returns>This handler, to allow call chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="poisonQueue"/> is null.</exception>
    public BatchProcessingQueueHandler<T> WithPosionMessageQueue(IAzureQueue<T> poisonQueue)
    {
        if (poisonQueue == null)
        {
            throw new ArgumentNullException("poisonQueue");
        }

        this.poisonMessageQueue = poisonQueue;
        return this;
    }

    /// <summary>
    /// Sets how long the handler sleeps after each completed cycle.
    /// </summary>
    /// <param name="intervalBetweenRuns">Pause between two cycles.</param>
    /// <returns>This handler, to allow call chaining.</returns>
    public BatchProcessingQueueHandler<T> Every(TimeSpan intervalBetweenRuns)
    {
        this.interval = intervalBetweenRuns;
        return this;
    }

    /// <summary>
    /// Starts a long-running background task that calls <see cref="Cycle"/> in an endless loop
    /// with the given command. The method returns immediately; the task is not exposed.
    /// </summary>
    /// <param name="batchQueueCommand">Command that processes the messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="batchQueueCommand"/> is null.</exception>
    public virtual void Do(IBatchQueueCommand<T> batchQueueCommand)
    {
        // Fail fast on the calling thread; without this guard a null command would only
        // surface as a NullReferenceException inside the background task.
        if (batchQueueCommand == null)
        {
            throw new ArgumentNullException("batchQueueCommand");
        }

        Task.Factory.StartNew(
            () =>
            {
                while (true)
                {
                    this.Cycle(batchQueueCommand);
                }
            },
            TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Runs one processing cycle: <c>PreRun</c>, then batches of messages until an empty batch
    /// is returned, then <c>PostRun</c>, then a sleep of the configured interval.
    /// </summary>
    /// <param name="batchQueueCommand">Command that processes the messages.</param>
    protected void Cycle(IBatchQueueCommand<T> batchQueueCommand)
    {
        try
        {
            batchQueueCommand.PreRun();

            bool continueProcessing;
            do
            {
                // Materialize the batch exactly once. GetMessages may return a lazily evaluated
                // sequence; enumerating it a second time just to count it could fetch another
                // batch from the queue that would never be processed.
                var messages = this.queue.GetMessages(MaxMessagesPerBatch).ToList();

                // poisonMessageQueue may be null here if WithPosionMessageQueue was never called.
                ProcessMessages(this.queue, this.poisonMessageQueue, messages, batchQueueCommand.Run);

                // An empty batch ends the cycle.
                continueProcessing = messages.Count > 0;
            }
            while (continueProcessing);

            batchQueueCommand.PostRun();
            this.Sleep(this.interval);
        }
        catch (TimeoutException)
        {
            // Deliberately best-effort: a timeout abandons the current cycle (PostRun and the
            // sleep are skipped) and the loop in Do simply starts the next cycle.
        }
    }
}
}