-
Notifications
You must be signed in to change notification settings - Fork 2
/
LowLevel_Task_AttachToParent.cs
68 lines (57 loc) · 2.67 KB
/
LowLevel_Task_AttachToParent.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
using System;
using System.Net.Http;
using System.Threading;
using System.Collections.Generic;
using System.Collections.Concurrent; //BlockingCollection
using System.Threading.Tasks; //Task, Task.Factory.StartNew, TaskCreationOption
namespace ThrottledParallelism.Strategies
{
public class LowLevel_Task_AttachToParent : IGovernedParallelDownloader
{
static readonly HttpClient client = new HttpClient();
//https://kudchikarsk.com/tasks-in-csharp/task-parallelism-c/#attaching-child-tasks-to-a-parent-task
public Task DownloadThemAllAsync(IEnumerable<Uri> uris, ProcessResult processResult, byte maxThreads)
{
var sharedUris = new BlockingCollection<Uri>(); //!SPOT: can't using block here
var consumerSynchronizer = new Task(() => //!SPOT: Replaced CountdownEvent to Task
{
//Multiple Consumers
for (var i = 0; i < maxThreads; i++)
{
//Spans child jobs < Fork phase
Task.Factory.StartNew<Task>( //!SPOT: Replaced QueueUserWorkItem to StartNew << it is a hot task
async state =>
{
var urls = (BlockingCollection<Uri>)state;
await ConsumerAsync(urls, processResult);
},
sharedUris,
TaskCreationOptions.AttachedToParent)
.Unwrap(); //!SPOT: Flattens Task<Task> to Task
}
//Single Producer
Producer(sharedUris, uris);
}); //!SPOT: it's just a declaration << it is a cold task
consumerSynchronizer.Start();
//Waiting for all workers to finish < Join phase
return consumerSynchronizer
.ContinueWith(_ => sharedUris?.Dispose());
}
void Producer(BlockingCollection<Uri> sharedUris, IEnumerable<Uri> uris)
{
foreach (var uri in uris)
sharedUris.Add(uri);
//Signaling producing is over
sharedUris.CompleteAdding();
}
async Task ConsumerAsync(BlockingCollection<Uri> uris, ProcessResult processResult)
{
foreach (var uri in uris.GetConsumingEnumerable())
await WorkerAsync(uri, processResult);
}
Task WorkerAsync(Uri uri, ProcessResult processResult)
=> client.GetStringAsync(uri)
.ContinueWith(downloadTask => processResult(Thread.CurrentThread.ManagedThreadId.ToString(), downloadTask.Result),
TaskContinuationOptions.NotOnFaulted);
}
}