# Example Lecture


The following is an example of a real-world talk I gave to junior engineers on how Tasks work and how to leverage coroutines for performance in C#.

---


# Coroutines, Concurrency & Parallel Threads

The worst phrase I have read in programming is _"This programming language is slower than another programming language"_. Most languages are fast enough. The faster, lower-level languages give you the tools to write faster programs, but it is also much easier to write slow code if you are not careful.

Asynchronous code is the best and the worst example of this! Many languages have their own approach to asynchronous work, and some are easier to use than others. In Python you'll often find `asyncio` (or Celery for background jobs); in TypeScript you'll find Promises (and so on...).

In C#, we have Task-based asynchrony, and a whole bunch of baggage to learn if you want your code to be fast when using Tasks.


## Preamble A - Tasks In C#

Tasks (`Task` and `Task<T>`) are the API .NET provides for representing an asynchronous operation.

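In simple terms, a synchronous method forces us to wait for it to complete before we can do anything else. An asynchronous Task, on the other hand, lets the operation carry on without blocking us (sometimes on another thread, but often, as with timers or I/O, without occupying any thread at all), so we can continue executing our code and only collect the result as and when we need it.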


Take the following async method, which waits one second, prints a value, then returns it:


```csharp
public async Task<int> WaitOneSecondThenGimmeAValue(int value) {
    var oneSecondInMilliseconds = 1000;
    await Task.Delay(oneSecondInMilliseconds);
    Console.WriteLine($"Fine then... here's the value: {value}");
    return value;
}
```

We now have a number of ways to run this task. The simplest is the `await` keyword. Applying `await` suspends the calling method and yields control back to its caller until the awaited task completes; in other words, the rest of our method is **_forced_** to wait for the task to complete (although the thread itself is free to do other work in the meantime). On top of this, if the task returns a value, `await` hands it straight back to us!

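Under the hood, the C# compiler rewrites an `async` method into a state machine, and each continuation (the code after an `await`) is scheduled to run once the awaited task completes, via the current `SynchronizationContext` or `TaskScheduler`. I think we can all agree this syntactic sugar is much nicer to handle than wiring up those callbacks ourselves.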


```csharp
var preciousValue = await WaitOneSecondThenGimmeAValue(5);
preciousValue.Display();
```
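To make the idea of continuations a little more concrete, here is a simplified sketch comparing an `await`-based method with a version whose continuation is attached by hand with `ContinueWith`. It is an analogy, not what the compiler actually emits (the generated state machine handles exceptions and synchronization contexts differently), and `GetValueAsync`, `DescribeWithAwait` and `DescribeWithContinueWith` are hypothetical names, with the delay shortened to 100 ms.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for WaitOneSecondThenGimmeAValue (delay shortened to 100 ms)
async Task<int> GetValueAsync(int value)
{
    await Task.Delay(100);
    return value;
}

// Version 1: async/await. Everything after the await becomes the continuation.
async Task<string> DescribeWithAwait(int value)
{
    int result = await GetValueAsync(value);
    return $"Fine then... here's the value: {result}";
}

// Version 2: roughly the same continuation, attached by hand.
// t has already completed when this lambda runs, so t.Result does not block
// (but if GetValueAsync had failed, t.Result would throw an AggregateException).
Task<string> DescribeWithContinueWith(int value) =>
    GetValueAsync(value).ContinueWith(
        t => $"Fine then... here's the value: {t.Result}",
        TaskScheduler.Default);

Console.WriteLine(await DescribeWithAwait(5));
Console.WriteLine(await DescribeWithContinueWith(5));
```

Both lines print the same message; the `await` version simply lets the compiler do the wiring.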

To start the task without waiting for it, we can simply call the method without the `await` keyword. However, we don't get a value printed this time...


```csharp
var preciousTask = WaitOneSecondThenGimmeAValue(5);
```

That's because the cell (our main flow of execution) finishes long before the one-second delay is up, so nothing is still waiting on the task by the time it would print.

However, if we give the task enough time to complete, we can see it running in the background:


```csharp
var preciousTask = WaitOneSecondThenGimmeAValue(5);
await Task.Delay(1100);
```

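We can then retrieve the result with `.Result`. However, `.Result` blocks the calling thread until the task finishes, which can deadlock in environments with a synchronization context (UI apps or classic ASP.NET), and it wraps any failure in an `AggregateException`, which makes error messages harder to read. `.GetAwaiter().GetResult()` still blocks, so it carries the same deadlock risk, but it rethrows the original exception, giving much nicer error messages. Wherever you can, prefer `await`.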

Notice that the cell still takes only about 1.1s to run (not 2.1s), showing that the task ran concurrently with the rest of the cell.


```csharp
var preciousTask = WaitOneSecondThenGimmeAValue(5);
await Task.Delay(1100);
// If the task hasn't finished yet, this blocks until it does
preciousTask.GetAwaiter().GetResult().Display();
```
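The error-message difference between `.Result` and `.GetAwaiter().GetResult()` is easy to see with a task that has already failed. In this sketch `failing` is a hypothetical task built with `Task.FromException`; because it has already completed, neither call blocks here, so it only illustrates the exception wrapping, not the deadlock risk.

```csharp
using System;
using System.Threading.Tasks;

// A hypothetical task that has already failed
Task<int> failing = Task.FromException<int>(new InvalidOperationException("boom"));

string resultError = "";
try { _ = failing.Result; }
catch (Exception ex) { resultError = ex.GetType().Name; }   // wrapped: AggregateException

string awaiterError = "";
try { _ = failing.GetAwaiter().GetResult(); }
catch (Exception ex) { awaiterError = ex.GetType().Name; }  // original: InvalidOperationException

Console.WriteLine($".Result threw {resultError}");
Console.WriteLine($".GetAwaiter().GetResult() threw {awaiterError}");
```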

Side note: `await` works on any type that has a suitable `GetAwaiter()` method, whether it is an instance method or an extension method, so you can technically find a way to await any type!


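```csharp
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// In a notebook/script, extension methods can be declared at the top level;
// in a regular project they must live in a non-nested static class.

// Makes a tuple of two tasks awaitable: awaiting it awaits both via Task.WhenAll
public static TaskAwaiter<T[]> GetAwaiter<T>(this (Task<T>, Task<T>) tasksTuple) {
    return Task.WhenAll<T>(tasksTuple.Item1, tasksTuple.Item2).GetAwaiter();
}

// Makes a string awaitable: it completes immediately with the string itself
public static TaskAwaiter<string> GetAwaiter(this string stringToAwait) {
    return Task.FromResult(stringToAwait).GetAwaiter();
}

public async Task WaitConcurrently() {
    // Both one-second tasks run at the same time, so this takes about one second
    await (WaitOneSecondThenGimmeAValue(5), WaitOneSecondThenGimmeAValue(5));
    await "Hello World";
}

await WaitConcurrently();
```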

## Preamble B - Thread-Safe (or Concurrent) Collections

Most collections are not thread-safe: it is not safe to write to them from several threads at once. If two threads call `Add` on the same `List<T>` at the same time, they race on its internal array and count, so rather than simply not knowing which write wins, you can lose items or even corrupt the list's internal state.

The `System.Collections.Concurrent` namespace gives us thread-safe collections (or data structures) in the standard library. Each uses a slightly different strategy to stay thread-safe, from fine-grained locking (`ConcurrentDictionary` spreads its keys across several locks) to largely lock-free techniques built on the `Interlocked` class (`ConcurrentQueue`). You can use the same kinds of primitives (`lock`, `SemaphoreSlim`, `Interlocked`) to try to build your own thread-safe collections.

- `ConcurrentQueue` (FIFO: preserves write order)
- `ConcurrentBag` (unordered; optimized for cases where the same threads both add and take items)
- `ConcurrentStack` (LIFO: last in, first out)
- `ConcurrentDictionary` (thread-safe keyed values)


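```csharp
using System.Collections.Concurrent;

var collection = new ConcurrentQueue<int>();
collection.Enqueue(42); // safe to call from many threads at once
collection.Enqueue(43);
// TryDequeue returns false instead of throwing when the queue is empty
if (collection.TryDequeue(out var first)) first.Display(); // 42 (FIFO)
```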
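To see these collections in action, this sketch hammers them from many threads with `Parallel.For`. The item count and the `i % 10` keys are arbitrary choices for illustration, and the plain `List<T>` is only safe here because every `Add` is wrapped in a `lock`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

const int Items = 10_000;

// ConcurrentBag accepts Adds from many threads at once
var bag = new ConcurrentBag<int>();
Parallel.For(0, Items, i => bag.Add(i));

// AddOrUpdate updates each key atomically; under contention the update
// delegate may run more than once, so keep it free of side effects
var countsByRemainder = new ConcurrentDictionary<int, int>();
Parallel.For(0, Items, i =>
    countsByRemainder.AddOrUpdate(i % 10, 1, (_, current) => current + 1));

// A plain List<T> needs an explicit lock around every write
var list = new List<int>();
var gate = new object();
Parallel.For(0, Items, i => { lock (gate) { list.Add(i); } });

Console.WriteLine($"bag: {bag.Count}, list: {list.Count}, key 0: {countsByRemainder[0]}");
```

This prints `bag: 10000, list: 10000, key 0: 1000`; remove the `lock` and the list count is no longer guaranteed.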


More recently, `System.Threading.Channels` has become a fast, thread-safe way to pass data between writers and readers:


```csharp
#r "nuget: System.Threading.Channels"
using System.Threading.Channels;
var channel = Channel.CreateUnbounded<int>();
await channel.Writer.WriteAsync(42);
await channel.Writer.WriteAsync(43);
channel.Writer.Complete(); // Signal no more writes; without this, ReadAllAsync below would wait forever
await foreach (var item in channel.Reader.ReadAllAsync()) {
    item.Display();
}
```
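Channels really shine when the writer and the reader run at the same time. This sketch (the capacity of 2 and the values are arbitrary) uses a bounded channel, so the producer waits whenever the buffer is full, giving natural back-pressure if the consumer falls behind.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel: WriteAsync waits while 2 items are already buffered
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Producer runs concurrently with the consumer loop below
Task producer = Task.Run(async () =>
{
    try
    {
        for (int i = 1; i <= 5; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
    }
    finally
    {
        // Completing the writer lets ReadAllAsync finish once the buffer is drained
        channel.Writer.Complete();
    }
});

// Consumer: handles each item as soon as it is available
var received = new List<int>();
await foreach (int item in channel.Reader.ReadAllAsync())
{
    received.Add(item * 10);
}
await producer; // surfaces any exception thrown by the producer

Console.WriteLine(string.Join(", ", received)); // 10, 20, 30, 40, 50
```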

## Preamble C - Options For Concurrent/Parallel Processing


### Parallel Library


```csharp
var inputList = new List<int>() {1,2,3,4,5};
var concurrentBag = new ConcurrentBag<string>();
await Parallel.ForEachAsync(
    inputList,
    new ParallelOptions()
    {
        MaxDegreeOfParallelism = 10
    },
    (produced, ct) =>
    {
        concurrentBag.Add(produced.ToString());
        return ValueTask.CompletedTask;
    }
);
concurrentBag.ToList().Display()
```

### Parallel Linq (PLinq)


```csharp
// Without .AsOrdered(), PLINQ does not guarantee the output order
var list = new List<int>() {1,2,3,4,5}.AsParallel().Select(val => val.ToString());
list.Display(); 
```
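When order matters, PLINQ can be asked to keep it. A minimal sketch using `AsOrdered()`; the squaring and the degree of parallelism of 4 are arbitrary choices for illustration.

```csharp
using System;
using System.Linq;

// AsOrdered() makes PLINQ preserve the source order in its output
// (at some cost in buffering); WithDegreeOfParallelism caps the worker count
string[] squares = Enumerable.Range(1, 20)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(4)
    .Select(val => (val * val).ToString())
    .ToArray();

Console.WriteLine(string.Join(" ", squares.Take(5))); // 1 4 9 16 25
```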

### Dataflow Library


```csharp
#r "System.Threading.Tasks.Dataflow"
using System.Threading.Tasks.Dataflow;
using System.Threading.Tasks;
using System.Threading;

// Converts ints to strings, running up to 10 transforms at once
// (output order still matches input order, because EnsureOrdered defaults to true)
var transformBlock = new TransformBlock<int, string>(
    produced => produced.ToString(),
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 10
    }
);
// Collects the transformed values so we can read them all at the end
var bufferBlock = new BufferBlock<string>();
transformBlock.LinkTo(bufferBlock);

foreach (var produced in new List<int>() {1,2,3,4,5})
{
    transformBlock.Post(produced);
}

// Signal no more input, then wait until every item has been transformed and passed on
transformBlock.Complete();
await transformBlock.Completion;
bufferBlock.Complete();
bufferBlock.TryReceiveAll(out var output);
output.Display();
```

### Reactive Extensions (System.Reactive)

```csharp
#r "nuget: System.Reactive"
#r "nuget: System.Reactive.Async, 6.0.0-alpha.18"
#r "nuget: System.Linq.Async"
using System.Linq;
using System.Reactive.Linq;

var o = new List<int>() {1,2,3,4,5}.ToObservable();
// ForEachAsync subscribes to the observable and completes when the sequence does
await o.Select(val => val.ToString()).ForEachAsync(resVal => resVal.Display());

// System.Reactive (& AsyncRx) has really, really poor docs and support - I wouldn't recommend using it unless you really, really want to.
```


---


## Problem 1 - Processing asynchronous tasks

> We have an asynchronous process that takes a chunk of time to complete (let's say 1 second). We then need to take the result and perform some post-processing on it. Furthermore, we need to do this for many tasks (let's say 10) and return the processed results in a collection.

How do we get this as fast as possible?


```csharp
// Simulates a slow asynchronous operation (e.g. an I/O call) that takes one second
public async Task<int> DoStuff(int value, bool debugOutput = true) {
    var oneSecondInMilliseconds = 1000;
    await Task.Delay(oneSecondInMilliseconds);
    if (debugOutput) Console.WriteLine($"Fine then... here's the value: {value}");
    return value;
}

// Simulates a quick post-processing step that completes immediately
public Task<string> ProcessStuff(int value) {
    return Task.FromResult((value + 1000).ToString());
}
```

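### Solution A - Iterate

We can await each task and process its result one at a time in a `foreach` loop. This is simple, but each one-second call must finish before the next one starts, so the loop takes about ten seconds.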


```csharp
var output = new List<string>();
foreach (var value in Enumerable.Range(1,10)) 
{
    var consumable = await DoStuff(value);
    var processed = await ProcessStuff(consumable);
    output.Add(processed);
}
output.Display()
```

```text
Fine then... here's the value: 1
Fine then... here's the value: 2
Fine then... here's the value: 3
Fine then... here's the value: 4
Fine then... here's the value: 5
Fine then... here's the value: 6
Fine then... here's the value: 7
Fine then... here's the value: 8
Fine then... here's the value: 9
Fine then... here's the value: 10
```


### Solution B - Iterate With Tasks

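We iterate only to start the tasks, then await them all after the loop with `Task.WhenAll`. The ten one-second delays now overlap, so producing the values takes about one second rather than ten.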


```csharp
var tasks = new List<Task<int>>();
foreach (var value in Enumerable.Range(1,10)) 
{
    tasks.Add(DoStuff(value)); // This presents a problem... What is it?
}
var produced = await Task.WhenAll<int>(tasks);
var processingTasks = new List<Task<string>>();
foreach (var producedValue in produced) {
    processingTasks.Add(ProcessStuff(producedValue));
}
var output = await Task.WhenAll<string>(processingTasks);
output.Display(); // Order preserved
```
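One common refinement, sketched here rather than offered as the intended answer to the question in the comment: chain each item's production and processing so it is processed as soon as its own value arrives, and cap how many run at once with a `SemaphoreSlim`. `DoStuffAsync`, `ProcessStuffAsync` and `ProduceThenProcess` are hypothetical stand-ins (with a 100 ms delay), and the limit of 3 is arbitrary.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for DoStuff/ProcessStuff (delay shortened to 100 ms)
async Task<int> DoStuffAsync(int value)
{
    await Task.Delay(100);
    return value;
}
Task<string> ProcessStuffAsync(int value) => Task.FromResult((value + 1000).ToString());

// At most 3 items in flight at once (an arbitrary limit for illustration)
var throttle = new SemaphoreSlim(3);

async Task<string> ProduceThenProcess(int value)
{
    await throttle.WaitAsync();
    try
    {
        // Each item is processed as soon as its own DoStuffAsync finishes,
        // rather than waiting for every producer to complete first
        return await ProcessStuffAsync(await DoStuffAsync(value));
    }
    finally
    {
        throttle.Release();
    }
}

// Task.WhenAll returns results in the same order as the input sequence
string[] output = await Task.WhenAll(Enumerable.Range(1, 10).Select(v => ProduceThenProcess(v)));
Console.WriteLine(string.Join(", ", output));
```

The trade-off is explicit: a lower limit protects the downstream service, while a higher one finishes sooner.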

### Solution C - Iterate With Channels

Similar to Solution B, but handing the tasks from one stage to the next through Channels.


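```csharp
using System.Threading.Channels;

// Stage 1: start every DoStuff call and queue the (still running) tasks
var channel1 = Channel.CreateUnbounded<Task<int>>();
foreach (var value in Enumerable.Range(1,10)) {
    await channel1.Writer.WriteAsync(DoStuff(value));
}
// Complete the writer before reading, otherwise ReadAllAsync would wait forever
channel1.Writer.Complete();

// Stage 2: await each produced value (in order) and queue its processing task
var channel2 = Channel.CreateUnbounded<Task<string>>();
await foreach (var producedTask in channel1.Reader.ReadAllAsync()) {
    await channel2.Writer.WriteAsync(ProcessStuff(await producedTask));
}
channel2.Writer.Complete();

// Drain channel2 and await every processing task (order preserved)
var processingTasks = new List<Task<string>>();
await foreach (var processingTask in channel2.Reader.ReadAllAsync()) {
    processingTasks.Add(processingTask);
}
(await Task.WhenAll(processingTasks)).Display();
```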


### Solution D - Parallel Linq

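We iterate in parallel using Parallel LINQ (PLINQ). PLINQ runs synchronous delegates, so each worker has to block on `DoStuff` with `.GetAwaiter().GetResult()`, and by default it uses at most one worker per processor core, so all ten calls only overlap on a machine with at least ten cores.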


```csharp
var output = new ConcurrentBag<string>();
// Each PLINQ worker blocks its thread while DoStuff runs
foreach (var value in Enumerable.Range(1,10).AsParallel().Select(val => DoStuff(val).GetAwaiter().GetResult())) 
{
    // Results are consumed one at a time on the calling thread
    var processed = await ProcessStuff(value);
    output.Add(processed);
}
output.Display()
```

### Solution E - Dataflow & Parallel


```csharp
using System.Threading.Tasks.Dataflow;
using System.Threading.Tasks;
using System.Threading;

// Post-processes each produced value, up to 10 at a time
var transformBlock = new TransformBlock<int, string>(
    async produced => await ProcessStuff(produced),
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 10
    }
);
// Collects the processed values so we can read them all at the end
var bufferBlock = new BufferBlock<string>();
transformBlock.LinkTo(bufferBlock);

// Run DoStuff in parallel and post each result as soon as it is ready
// (Post is thread-safe, so many workers can call it at once)
await Parallel.ForEachAsync(
    Enumerable.Range(0, 10),
    new ParallelOptions()
    {
        MaxDegreeOfParallelism = 10
    },
    async (produced, ct) =>
    {
        transformBlock.Post(await DoStuff(produced));
    }
);

// No more input: wait until every item has been processed and passed to bufferBlock
transformBlock.Complete();
await transformBlock.Completion;
bufferBlock.Complete();
bufferBlock.TryReceiveAll(out var output);
output.Display()
```

```text
Fine then... here's the value: 3
Fine then... here's the value: 9
Fine then... here's the value: 2
Fine then... here's the value: 1
Fine then... here's the value: 0
Fine then... here's the value: 6
Fine then... here's the value: 4
Fine then... here's the value: 7
Fine then... here's the value: 8
Fine then... here's the value: 5
```


### Solution F - Parallel


```csharp
var concurrentBag = new ConcurrentBag<string>();
await Parallel.ForEachAsync(
    Enumerable.Range(0, 10),
    new ParallelOptions()
    {
        MaxDegreeOfParallelism = 10
    },
    async (produced, ct) =>
    {
        // Produce, then immediately post-process, inside the same parallel worker
        concurrentBag.Add(await ProcessStuff(await DoStuff(produced)));
    }
);
concurrentBag.Display();
```
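`ConcurrentBag` hands the results back in no particular order. If order matters, one option (sketched with hypothetical stand-ins and a 100 ms delay) is to have each worker write to its own slot of a pre-sized array, which is safe because no two workers ever touch the same index. The sketch also passes the loop's `CancellationToken` through to the delay.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for DoStuff/ProcessStuff (delay shortened to 100 ms)
async Task<int> DoStuffAsync(int value, CancellationToken ct)
{
    await Task.Delay(100, ct);
    return value;
}
Task<string> ProcessStuffAsync(int value) => Task.FromResult((value + 1000).ToString());

// One slot per input; each worker only writes to its own index
var results = new string[10];
await Parallel.ForEachAsync(
    Enumerable.Range(0, results.Length),
    new ParallelOptions { MaxDegreeOfParallelism = 10 },
    async (index, ct) =>
    {
        results[index] = await ProcessStuffAsync(await DoStuffAsync(index, ct));
    });

// results are in input order: 1000 through 1009
Console.WriteLine(string.Join(", ", results));
```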

## Mid-Amble - Coroutines

![image.png](attachment:image.png)


---

## Problem 2 - Real World Paginated API calls

Imagine you make a GET request to a paginated API that includes `?pagenum=x&pagecount=y`. You want to read all pages, take each response and process all of them as quickly as possible. Which solution do you use?

Let's assume the response gives us an easy way to tell what the next page is...
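A minimal sketch of one possible shape for this, assuming a hypothetical `FetchPageAsync` that simulates the GET request and returns the page's items plus the next page number (`null` on the last page); the page size, page count and delays are made up. An async iterator (`IAsyncEnumerable<T>` with `yield return`, a coroutine-style construct) yields each page as it arrives, and `Parallel.ForEachAsync` processes pages concurrently while the iterator keeps fetching. Because each response names the next page, the fetches themselves still happen one after another.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

const int TotalPages = 4; // hypothetical size of the remote data set

// Hypothetical stand-in for GET ?pagenum=x&pagecount=y; a real version would use HttpClient
async Task<(int[] Items, int? NextPage)> FetchPageAsync(int pageNum, int pageCount)
{
    await Task.Delay(50); // simulated network latency
    int[] items = Enumerable.Range(pageNum * pageCount, pageCount).ToArray();
    int? nextPage = pageNum + 1 < TotalPages ? pageNum + 1 : (int?)null;
    return (items, nextPage);
}

// Async iterator: follows NextPage and yields each page as soon as it has been fetched
async IAsyncEnumerable<int[]> ReadAllPagesAsync(int pageCount)
{
    int? page = 0;
    while (page is int current)
    {
        var (items, next) = await FetchPageAsync(current, pageCount);
        yield return items;
        page = next;
    }
}

// Process up to 4 pages concurrently while later pages are still being fetched
var processed = new ConcurrentBag<int>();
await Parallel.ForEachAsync(
    ReadAllPagesAsync(pageCount: 5),
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (items, ct) =>
    {
        await Task.Delay(10, ct); // hypothetical per-page processing work
        foreach (int item in items) processed.Add(item * 2);
    });

Console.WriteLine($"Processed {processed.Count} items"); // Processed 20 items
```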
