-
Notifications
You must be signed in to change notification settings - Fork 2
/
LowLevel_Task.cs
58 lines (47 loc) · 2.09 KB
/
LowLevel_Task.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
using System;
using System.Net.Http;
using System.Threading;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading.Tasks; //Task.WhenAll
namespace ThrottledParallelism.Strategies
{
public class LowLevel_Task : IGovernedParallelDownloader
{
static readonly HttpClient client = new HttpClient();
public Task DownloadThemAllAsync(IEnumerable<Uri> uris, ProcessResult processResult, byte maxThreads)
{
//Single Producer
var sharedUris = Producer(uris); //!SPOT: There is no workerSynchronizer
//!SPOT: Ordering is important here < GetConsumingEnumerable() is blocking
//Multiple Consumers < Fork phase
var consumers = new List<Task>(maxThreads); //!SPOT: Replaced wrapper Task to Task collection
for (var i = 0; i < maxThreads; i++)
consumers.Add(ConsumerAsync(sharedUris, processResult)); //!SPOT: HOT task, starts automatically
//Waiting for all workers to finish < Join phase
return Task.WhenAll(consumers)
.ContinueWith(_ => sharedUris?.Dispose());
}
BlockingCollection<Uri> Producer(IEnumerable<Uri> uris)
{
var sharedUris = new BlockingCollection<Uri>();
foreach (var uri in uris)
sharedUris.Add(uri);
//Signaling producing is over
sharedUris.CompleteAdding();
return sharedUris;
}
async Task ConsumerAsync(BlockingCollection<Uri> uris, ProcessResult processResult)
{
//If we would process in parallel then it would break the contract
//Process sequential
foreach (var uri in uris.GetConsumingEnumerable()) //!SPOT: This is blocking!
await WorkerAsync(uri, processResult);
}
async Task WorkerAsync(Uri uri, ProcessResult processResult)
{
var content = await client.GetStringAsync(uri).ConfigureAwait(false);
processResult(Thread.CurrentThread.ManagedThreadId.ToString(), content);
}
}
}