# ⚡️Parallel Programming in .NET

<img src=images/multi-cores.jpg>

https://learn.microsoft.com/en-us/dotnet/api/system.threading 👈

- Thread Management: Thread, ThreadStart, ParameterizedThreadStart
- Task Parallel Library: Task, Task<T>, TaskCompletionSource<T>, Parallel, ParallelOptions
- Thread Pooling: ThreadPool
- Synchronization Primitives: Monitor, Mutex, Semaphore, SemaphoreSlim, SpinLock, SpinWait
    - Locking Primitives: ReaderWriterLock, ReaderWriterLockSlim, LockCookie
    - Barrier
    - CountdownEvent
    - ManualResetEvent, ManualResetEventSlim, AutoResetEvent
- Atomic Operations and Memory Visibility: Interlocked, Volatile
- Timers: Timer
- Thread Safety: ThreadLocal, LazyInitializer

## 1- Threading Primitives

In [None]:
//ThreadPool
using static System.Console;

void work(Object stateInfo) 
{
    WriteLine("Hello from the thread pool.");
}

ThreadPool.QueueUserWorkItem(work);
WriteLine("Main thread does some work, then sleeps.");
Thread.Sleep(1000);

WriteLine("Main thread exits.");

- https://learn.microsoft.com/en-us/archive/msdn-magazine/2006/march/net-matters-abortable-thread-pool 👈

In [None]:
//LazyInitializer
using System.Threading;

class ExpensiveData { }

ExpensiveData data = null;
bool dataInitialized = false;  
object dataLock = new object();  

ExpensiveData dataToUse = LazyInitializer.EnsureInitialized(ref data, ref dataInitialized, ref dataLock);

In [None]:
using System.Threading;

readonly static object lockObject = new object(); // 👈 static?

void work()
{
    lock (lockObject)
    {
        // Critical section code
    }
}

void workUsingMonitor()
{
    Monitor.Enter(lockObject);
    try
    {
        // Critical section code
    }
    finally
    {
        Monitor.Exit(lockObject);
    }
}

In [None]:
var namedMutex = new Mutex(false, "Global\\MyMutex"); // OS level support is used

- A Mutex puts the waiting thread to sleep, while SpinWait keeps the thread "spinning"; when the expected wait is very short, spinning is more efficient because it avoids a context switch
    - A Mutex can be system wide (a named mutex synchronizes different applications)
- Monitor is another thread synchronization primitive; it is similar to Mutex but works only within one process, is cheaper, and adds Wait/Pulse for signalling
    - The higher-level language construct lock is built on Monitor
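
The Wait/Pulse feature of Monitor mentioned above can be sketched as a small hand-off between two threads. This is a minimal illustration, not a production queue; the names `gate`, `queue` and `received` are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

var gate = new object();
var queue = new Queue<int>();
var received = new List<int>();

// Consumer: waits (releasing the lock) until the producer pulses.
var consumer = new Thread(() =>
{
    for (int taken = 0; taken < 3; taken++)
    {
        lock (gate)
        {
            while (queue.Count == 0)   // always re-check the condition after waking up
                Monitor.Wait(gate);    // releases gate, sleeps, re-acquires gate on wake-up
            received.Add(queue.Dequeue());
        }
    }
});
consumer.Start();

// Producer: enqueues under the same lock and wakes one waiting thread.
for (int i = 1; i <= 3; i++)
{
    lock (gate)
    {
        queue.Enqueue(i);
        Monitor.Pulse(gate);
    }
}

consumer.Join();
Console.WriteLine(string.Join(", ", received)); // 1, 2, 3
```

Checking `queue.Count` before waiting means a pulse sent before the consumer reaches `Monitor.Wait` is not lost.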

__Resources__
- https://learn.microsoft.com/en-us/dotnet/standard/threading/overview-of-synchronization-primitives

In [None]:
// SpinLock
using System.Threading;

var spinLock = new SpinLock();

void work()
{
    bool lockTaken = false;

    try
    {
        spinLock.Enter(ref lockTaken);

        // Critical section
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is in the critical section.");
        Thread.Sleep(1000); // Simulate some work (demo only; real SpinLock critical sections should be very short)
    }
    finally
    {
        if (lockTaken)
            spinLock.Exit();
    }
}

var thread1 = new Thread(work);
var thread2 = new Thread(work);

thread1.Start();
thread2.Start();

thread1.Join();
thread2.Join();

- https://khurram-aziz.github.io/posts/net-4-barrier-class 👈

In [None]:
//Barrier
using System.Threading;

var barrier = new Barrier(0, b => // initial participant count, plus the post-phase action that runs once all participants have signaled
{
    Console.WriteLine($"{b.ParticipantCount} are going...");
});

void friendLogic(int identifier)
{
    Console.WriteLine($"{identifier} is interested");
    barrier.AddParticipant();

    Thread.Sleep(1000);

    if (identifier % 2 == 0)
    {
        Console.WriteLine($"{identifier} is not interested anymore");
        barrier.RemoveParticipant();
    }
    else
        barrier.SignalAndWait(); // we have reached and now waiting at barrier
}

var friend1 = Task.Run(() => friendLogic(1));
var friend2 = Task.Run(() => friendLogic(2));
var friend3 = Task.Run(() => friendLogic(3));
var friend4 = Task.Run(() => friendLogic(4));
var friend5 = Task.Run(() => friendLogic(5));

Task.WaitAll(friend1, friend2, friend3, friend4, friend5);

In [None]:
//Semaphore
using static System.Console;
using System.Threading;

var semaphore = new SemaphoreSlim(3);

void AccessResource(int taskNumber)
{
    WriteLine($"Task {taskNumber} waiting to enter...");
    semaphore.Wait();

    try
    {
        WriteLine($"Task {taskNumber} has entered.");
        Thread.Sleep(1000); // Simulate some work
        WriteLine($"Task {taskNumber} is leaving.");
    }
    finally
    {
        semaphore.Release();
    }
}

var tasks = new Task[5];
for (int i = 0; i < tasks.Length; i++)
{
    int taskNumber = i; // Capture the current value of i
    tasks[i] = Task.Run(() => AccessResource(taskNumber));
}

Task.WaitAll(tasks);

In [None]:
Illustration by GPT

[Thread A] ----> | LOCK | ----> [Critical Section] ----> | UNLOCK | ----> [Thread A Continues]
[Thread B] ----> | WAIT | (Blocked until Thread A finishes and unlocks)


[Thread A] ----> [Checkpoint] --\
                                 \
[Thread B] ----> [Checkpoint] ----+----> [All Threads Proceed]
                                 /
[Thread C] ----> [Checkpoint] --/


[Thread A] ----> | Semaphore (2) | ----> [Resource Access]
[Thread B] ----> | Semaphore (2) | ----> [Resource Access]
[Thread C] ----> | Semaphore Full (Waits) |
(Semaphore count is 2, so only two threads can access the resource simultaneously)

__Signalling__

In [None]:
using System.Diagnostics;

var stopwatch = new Stopwatch();
stopwatch.Start();

void print(string s) => Console.WriteLine($"{stopwatch.Elapsed} - {s}");

int count = 3;
var countdown = new CountdownEvent(count);

for (int i = 0; i < count; i++)
{
    var taskNumber = i;     // capture a copy; by the time the task runs, i has most probably reached its final value
    _ = Task.Run(() =>      // Closure
    {
        print($"Task {i} is starting.");                // deliberately uses i instead of taskNumber; likely prints 3
        Thread.Sleep(new Random().Next(1000, 3000));    // Simulating work
        print($"Task {taskNumber} is completed.");
        countdown.Signal(); // Signal that the task is done
    });
}

print("Main thread is waiting for tasks to complete...");
countdown.Wait(); // Wait for all tasks to signal
print("All tasks have completed. Main thread is resuming.");

stopwatch.Stop();


- https://khurram-aziz.github.io/posts/net-4-countdownevent 👈
- There are other signalling primitives; EventWaitHandle, AutoResetEvent, ManualResetEvent
- Monitor has Wait and Pulse
- https://arghya.xyz/articles/thread-synchronization-part-three
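
As a rough sketch of the reset-event style of signalling listed above, the example below uses ManualResetEventSlim as a gate that several workers wait on. The worker count and the 200 ms pause are arbitrary choices for the demo.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var ready = new ManualResetEventSlim(false); // starts unsignaled (gate closed)
int started = 0;

// Three workers block at the gate until it is opened.
var workers = new Task[3];
for (int i = 0; i < workers.Length; i++)
{
    workers[i] = Task.Run(() =>
    {
        ready.Wait();                        // blocks until Set() is called
        Interlocked.Increment(ref started);
    });
}

Thread.Sleep(200);                           // give the workers time to reach the gate
Console.WriteLine($"Before Set: {Volatile.Read(ref started)} started"); // Before Set: 0 started
ready.Set();                                 // manual reset: releases every waiter and stays open

Task.WaitAll(workers);
Console.WriteLine($"After Set: {started} started");                      // After Set: 3 started
```

An AutoResetEvent would instead release a single waiter per `Set()` and close again automatically.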

## 2- Timers ⏰

__System.Timers.Timer__
- general-purpose; not tied to any specific UI framework
- best suited for non-UI related tasks: background processing, data polling or scheduling

__System.Windows.Forms.Timer__
- specifically designed for Windows Forms applications
- raised events are tied to the UI thread
- best suited for UI related tasks; updating UI at regular intervals, animations or handling user inputs

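__System.Threading.Timer__
- general-purpose; not tied to any specific UI framework
- lightweight; the callback runs on a thread pool thread
- resolution is that of the system clock (typically around 15 ms on Windows), so it is not a high-resolution or real-time timer
- best suited for background polling, timeouts and other periodic asynchronous operations

- System.Diagnostics.Stopwatch (not a timer; measures elapsed time)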
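
A minimal sketch of System.Threading.Timer, assuming a 100 ms period and stopping after three ticks (both values are arbitrary). The callback runs on a thread pool thread, so shared state is updated with Interlocked.

```csharp
using System;
using System.Threading;

int ticks = 0;
var done = new ManualResetEventSlim(false);

// dueTime 0 fires the first callback immediately; period repeats it every 100 ms.
var timer = new Timer(_ =>
{
    int n = Interlocked.Increment(ref ticks);
    Console.WriteLine($"Tick {n} on thread {Environment.CurrentManagedThreadId}");
    if (n == 3)
        done.Set();
}, null, dueTime: 0, period: 100);

done.Wait();       // block the main thread until the third tick
timer.Dispose();   // stop scheduling further callbacks
Console.WriteLine($"Stopped after at least 3 ticks (saw {Volatile.Read(ref ticks)})");
```

System.Timers.Timer offers the same behaviour through an Elapsed event, and System.Windows.Forms.Timer raises its Tick event on the UI thread.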

## 3- Task Parallel Library

In [None]:
using System.Threading.Tasks;

//Run
Task task1 = Task.Run(() => { /* Do work */ });
// if we are not waiting / getting result; it will run in background

//Result
Task<int> task2 = Task.Run(() => { return 42; });
int result = task2.Result; // .Result will block calling thread

//FromResult
Task<int> task3 = Task.FromResult(42);

//FromException
Task task4 = Task.FromException(new InvalidOperationException());

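//FromCanceled
CancellationTokenSource cts = new CancellationTokenSource();
cts.Cancel(); // FromCanceled throws ArgumentOutOfRangeException unless cancellation was already requested
Task task5 = Task.FromCanceled(cts.Token);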

TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
Task<int> task6 = tcs.Task;
// Later, when the result is available:
tcs.SetResult(42); // waiting / blocking thread will get the result / completion

//ContinueWith
var task = Task.Run(() => {
    // Do some work
}).ContinueWith(t => {
    // we can check completed, faulted, cancelled etc using t
    // Continuation work
});

//Wait
Task task11 = Task.Run(() => { /* Do work */ });
task11.Wait();

//WaitAll
Task[] tasksWaitAll = { task1, task2, task3 };
Task.WaitAll(tasksWaitAll);

//WaitAny
Task[] tasksWaitAny = { task1, task2, task3 };
int completedTaskIndex = Task.WaitAny(tasksWaitAny);

//Delay
await Task.Delay(1000); // Waits for 1 second

//WhenAll
Task[] tasksWhenAll = { task1, task2, task3 };
await Task.WhenAll(tasksWhenAll);

//WhenAny
Task[] tasksWhenAny = { task1, task2, task3 };
Task firstCompleted = await Task.WhenAny(tasksWhenAny);

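__Difference between Wait and When__
- Wait methods (Wait, WaitAll, WaitAny) block the calling thread; When methods (WhenAll, WhenAny) do not block and return another task that we can await or Wait on
- Wait methods are synchronous; When methods are asynchronous
- Wait and WaitAll rethrow task exceptions on the calling thread wrapped in an AggregateException; When methods return a task, so failures surface when that task is awaited, and the task can be used for continuations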
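
The exception difference can be sketched as follows. `FailAfter` is a hypothetical helper written for this example; it fails with an InvalidOperationException after a delay.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: a task that fails after the given delay.
async Task FailAfter(int delayMs, string message)
{
    await Task.Delay(delayMs);
    throw new InvalidOperationException(message);
}

int waitAllCount = 0, whenAllCount = 0;

// Blocking: Task.WaitAll throws an AggregateException carrying every failure.
try
{
    Task.WaitAll(FailAfter(50, "first"), FailAfter(100, "second"));
}
catch (AggregateException ex)
{
    waitAllCount = ex.InnerExceptions.Count;
    Console.WriteLine($"WaitAll: AggregateException with {waitAllCount} inner exceptions");
}

// Non-blocking: awaiting Task.WhenAll rethrows a single exception;
// the full set stays available on the returned task's Exception property.
Task all = Task.WhenAll(FailAfter(50, "first"), FailAfter(100, "second"));
try
{
    await all;
}
catch (InvalidOperationException ex)
{
    whenAllCount = all.Exception?.InnerExceptions.Count ?? 0;
    Console.WriteLine($"await WhenAll: caught '{ex.Message}', task holds {whenAllCount} exceptions");
}
```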

__Execution Flow and Tasks__
- 👉 Thread Pool is fire and forget
    - We can implement Abortable Thread Pool; https://learn.microsoft.com/en-us/archive/msdn-magazine/2006/march/net-matters-abortable-thread-pool 👈
    - https://github.com/topics/thread-pool?l=c%23
- 👉 Task is "started" / "running" when you get it from Task.Run or an async method (a task created with new Task must be started explicitly)
- 👉 Waiting appropriately: One has to Wait/Join/get result (optionally)
- ✅ Task Composition / Continuations
- ✅ Deadlock prevention
- ✅ Exception Handling
- ✅ Synchronization and Signalling Primitives
- ⚠️ How many threads to run? Queuing everything at once can starve the system, and a sudden burst of load has to wait for the pool to ramp up
    - Parallel class; Separating Responsibility 🤝
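
One way to address the "how many threads" concern is to cap the Parallel class with ParallelOptions. The sketch below assumes a cap of 4 and 20 work items (arbitrary values) and records the highest number of iterations observed running at once.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var gate = new object();
int running = 0, peak = 0;
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 }; // arbitrary cap for the demo

Parallel.For(0, 20, options, _ =>
{
    lock (gate)
    {
        running++;
        peak = Math.Max(peak, running);   // highest concurrency seen so far
    }

    Thread.Sleep(100);                    // Simulate work

    lock (gate) { running--; }
});

Console.WriteLine($"Peak concurrency: {peak} (cap was {options.MaxDegreeOfParallelism})");
```

The caller only states the limit; partitioning the range and scheduling the iterations stays with the Parallel class.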

<img src=images/parallel-programming.png>

In [None]:
using static System.Console;
using static System.DateTime;

using System.Threading;
using System.Threading.Tasks;

void work(int i)
{
    try // thread localized exception handling
    {
        Thread.Sleep(5000);
    }
    catch { }
}

WriteLine(Now);

try // orchestration level exception handling
{
    Parallel.Invoke(    // Separating Responsibility 🤝
        () => work(0), () => work(1), () => work(2), () => work(3),
        () => work(4), () => work(5), () => work(6), () => work(7)
    );
}
catch { }

WriteLine(Now); // should take slightly more than 5s

In [None]:
using static System.Console;
using static System.DateTime;

using System.Threading;
using System.Threading.Tasks;

void work(int i) { Thread.Sleep(5000); }
//async Task work(int i) { await Task.Delay(1000); }

WriteLine(Now);

Parallel.For(0, 25, work); // about 5s only if all 25 iterations run at once; with fewer available threads it typically takes longer

//var ct = CancellationToken.None;
//await Parallel.ForAsync(0, 25, async (i, ct) => await work(i)); // should take slightly more than 5s

WriteLine(Now);

- There's also Parallel.ForEach and Parallel.ForEachAsync
- Task-based Asynchronous Programming
- Data Parallelism
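
A small sketch of Parallel.ForEachAsync (available from .NET 6), assuming 25 items, a 100 ms simulated delay and a concurrency cap of 5. Unlike Thread.Sleep inside Parallel.For, awaiting Task.Delay does not hold a thread while waiting.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int processed = 0;
var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }; // arbitrary cap for the demo

await Parallel.ForEachAsync(Enumerable.Range(0, 25), options, async (item, cancellationToken) =>
{
    await Task.Delay(100, cancellationToken); // asynchronous "work"
    Interlocked.Increment(ref processed);     // the counter is shared across iterations
});

Console.WriteLine($"Processed {processed} items"); // Processed 25 items
```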

__Resources__
- https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/task-parallel-library-tpl 👈
- https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming

## 4- Real World Example

In [None]:
using System.Threading.Tasks;

Task task = Task.Run(() =>
{
    Console.WriteLine("Task is starting.");
    Thread.Sleep(5000);
    Console.WriteLine("Task is completed.");
});

Thread.Sleep(2000);
task.Wait(); // wait was missing 👈
Console.WriteLine("Main thread terminating");

In [None]:
using System.Threading.Tasks;

// 2
try
{
    // 1
    Task task =
    Task.Run(() =>
    {
        // 3
        try
        {
            Console.WriteLine("Step 1 done");
            throw new InvalidOperationException("This is an unhandled exception in the task.");
            Console.WriteLine("Step 2 done");
        // 3
        }
        catch (Exception ex)
        {
            throw;
        }

        Console.WriteLine("All steps done");
    });
    // 1
    task.Wait(); // Wait for the task to complete
// 2
}
catch (AggregateException ex) // exception handling was missing
{
    Console.WriteLine("Caught an aggregate exception: " + ex.Message);
    foreach (var innerEx in ex.InnerExceptions)
    {
        Console.WriteLine("Inner exception: " + innerEx.Message);
    }
}

Thread.Sleep(2000);

In [None]:
volatile bool loop = true;  // 👈
//int loop = 1; // 1 represents true, 0 represents false

void keepWriting()
{
    try
    {
        while(loop)
        //while (Interlocked.CompareExchange(ref loop, 1, 1) == 1) Interlocked doesnt support boolean
        {
            Thread.Sleep(1000);
            Console.WriteLine("Ticking...");
        }
    }
    catch { }
}

_ = Task.Run(() => keepWriting());
Thread.Sleep(5000);
loop = false;
//Interlocked.Exchange(ref loop, 0);
Thread.Sleep(1000);


In [None]:
using static System.Console;

using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

void scanFolder(string folder, bool inParallel)
{
    long totalSize = 0;
    var stopWatch = new Stopwatch();
    stopWatch.Start();

    string[] files = Directory.GetFiles(folder);

    if (!inParallel)
    {
        foreach(var f in files)
        {
            var fi = new FileInfo(f);
            totalSize += fi.Length;
        }
    }
    else
    {
        Parallel.For(0, files.Length, index =>
        {
            var fi = new FileInfo(files[index]);
            long size = fi.Length;
            Interlocked.Add(ref totalSize, size);   // 👈
        });
    }

    stopWatch.Stop();

    WriteLine($"{files.Length:N0} files, {totalSize:N0} bytes, took {stopWatch.Elapsed}");
}

scanFolder(@"C:\Windows\System32", inParallel: false);
scanFolder(@"C:\Windows\System32", inParallel: true);

# 🗄️Concurrent Collections

- ConcurrentBag: Represents a thread-safe, unordered collection of objects
- There's also Concurrent Dictionary, Queue, Stack
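
As a quick illustration of ConcurrentDictionary, the sketch below counts words from several threads at once. The word list is made up for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var words = new[] { "lock", "task", "lock", "thread", "task", "lock" };
var counts = new ConcurrentDictionary<string, int>();

// Many threads may update the same key; AddOrUpdate applies each increment atomically.
Parallel.ForEach(words, word =>
{
    counts.AddOrUpdate(word, addValue: 1, updateValueFactory: (_, current) => current + 1);
});

Console.WriteLine($"lock={counts["lock"]}, task={counts["task"]}, thread={counts["thread"]}");
// lock=3, task=2, thread=1
```

With a plain Dictionary the same loop could lose updates or corrupt the dictionary, because read-modify-write on a key is not atomic.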

In [None]:
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var bag = new ConcurrentBag<int>();
var countdownEvent = new CountdownEvent(3); // signaled once by each of the 3 producers

for (int i = 0; i < 3; i++)
{
    int producerId = i;

    _ = Task.Run(() =>
    {
        try
        {
            for (int j = 0; j < 5; j++)
            {
                bag.Add(producerId * 10 + j);
                Thread.Sleep(100); // Simulate work
            }
        }
        finally
        {
            countdownEvent.Signal();
        }
    });
}

var consumer = Task.Run(() =>
{
    while (true)
    {
        if (bag.TryTake(out int item))
        {
            Console.WriteLine($"Processed {item}");
            Thread.Sleep(50); // Simulate work
        }
        else if (countdownEvent.IsSet)
        {
            if (bag.IsEmpty) // If no more items are being added and the bag is empty, exit the loop
            {
                Console.WriteLine("All items processed.");
                break;
            }
        }
        else
        {
            // If the bag is empty but not all producers are done, wait a bit before retrying
            Thread.Sleep(100);
        }
    }
});

consumer.Wait();

BlockingCollection is a thread-safe collection class that provides:
- An implementation of the Producer-Consumer pattern.
- Concurrent adding and taking of items from multiple threads.
- Optional maximum capacity.
- Insertion and removal operations that block when collection is empty or full.
- Insertion and removal "try" operations that do not block or that block up to a specified period of time.
- Encapsulates any collection type that implements IProducerConsumerCollection<T>
- Cancellation with cancellation tokens.
- Two kinds of enumeration with foreach:
    - Read-only enumeration
    - Enumeration that removes items as they are enumerated
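
The blocking, bounded and "consuming enumeration" features listed above can be sketched together. The capacity of 2 and the five items are arbitrary values for the demo.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var queue = new BlockingCollection<int>(boundedCapacity: 2); // Add blocks while 2 items are pending
var consumed = new List<int>();

var producer = Task.Run(() =>
{
    for (int i = 1; i <= 5; i++)
        queue.Add(i);          // blocks when the collection is full
    queue.CompleteAdding();    // tells consumers that no more items will arrive
});

var consumer = Task.Run(() =>
{
    // Removes items as they are enumerated; the loop ends once the
    // collection is marked complete and has been drained.
    foreach (int item in queue.GetConsumingEnumerable())
        consumed.Add(item);
});

Task.WaitAll(producer, consumer);
Console.WriteLine(string.Join(", ", consumed)); // 1, 2, 3, 4, 5
```

CompleteAdding gives the consumer a clean way to stop, so it does not need to poll Count or guess when the producer is done.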

In [None]:
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;

var blockingCollection = new BlockingCollection<string>();

void print(string s) => Console.WriteLine($"{DateTime.Now} {s}");

var producer = Task.Factory.StartNew(() =>
{
    for(int i = 1; i <= 20; i ++)
    {
        blockingCollection.Add("value " + i);
        Thread.Sleep(400); // lets throw some fixed values to be processed periodically
    }
    print("Producer is done!");
});

bool fastWorkerRunning = true;

bool checkForNotFinished()
{
    if (blockingCollection.Count > 0)
        return true;
    else if (fastWorkerRunning)
        return true;
    else
    {
        Thread.Sleep(500);
        return blockingCollection.Count > 0;
    }
}

var consumerSlow = Task.Factory.StartNew(() =>
{
    while(checkForNotFinished())
    {                    
        print("Slow Worker: " + blockingCollection.Take());
        Thread.Sleep(1000); //slow processing
    }

    print("Slow Worker is quitting...");
});

var consumerFast = Task.Factory.StartNew(() =>
{
    var stopWatch = new Stopwatch();
    stopWatch.Start();
    while (true)
    {
        print("Fast Worker: " + blockingCollection.Take());
        Thread.Sleep(300); //lets take some time to take it in

        if (stopWatch.Elapsed.TotalSeconds > 2)
        {
            print("Fast Worker is quitting...");
            fastWorkerRunning = false;
            break;
        }
    }
});

// everything is started; we just need to Wait

producer.Wait();
consumerSlow.Wait();
consumerFast.Wait();

print("Finished");

- https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent 👈
    - Partitioner 👈

## 1- Real World Example

In [None]:
using System.Collections.Concurrent;

var flags = new ConcurrentDictionary<string, bool>();

bool setFlag(string key) => flags.TryAdd(key, true); // from concurrent dictionary; if we succeed we get true
// indirectly one thread who will be able to add flag will get to know about it

In [None]:
if (setFlag("Connection_Successful"))
    Console.WriteLine("Connection is successfully established."); // we dont want to log too much; just once

string vmName = "AzureVM11";
string flagKey = $"Machine.InternalDnsName:{vmName}";
if (setFlag(flagKey))
    Console.WriteLine($"Failing to determine DNS name of {vmName}");

## 2- Distributed Computing - Collections and Constructs

- https://www.nuget.org/packages/StackExchange.Redis.Collections
- https://khurram-aziz.github.io/posts/zookeeper-higher-level-constructs 👈
- https://redis.io/docs/latest/develop/use/patterns/distributed-locks
    - Leader Election
- https://dapr.io
    - https://docs.dapr.io/concepts/building-blocks-concept 👈

# 💾PLINQ

In [None]:
using System.Linq;

var numbers = Enumerable.Range(1, 10000);
var evens = from n in numbers.AsParallel()
            where n % 2 == 0
            select n;

Console.WriteLine($"{evens.Count()} even numbers out of {numbers.Count()} total");

<img src=images/plinq.png>

In [None]:
using System.Linq;
using System.Threading.Tasks;

int ExpensiveFunction(int n)
{
    Thread.Sleep(100);
    return n;
}

var numbers = Enumerable.Range(1, 1000);

var queryA = from n in numbers.AsParallel()
             where n % 2 > 0
             select ExpensiveFunction(n);   //good for PLINQ  
var queryB = from n in numbers.AsParallel()
             where n % 2 > 0
             select n;                      //no point of using PLINQ

Console.WriteLine(DateTime.Now);
queryA.ToList();
Console.WriteLine(DateTime.Now);

- https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/introduction-to-plinq
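
PLINQ queries can also be tuned. The sketch below assumes a degree of parallelism of 4 (an arbitrary value) and uses AsOrdered so the results keep the source order.

```csharp
using System;
using System.Linq;

var numbers = Enumerable.Range(1, 20);

var squares = numbers
    .AsParallel()
    .AsOrdered()                    // preserve source order in the results
    .WithDegreeOfParallelism(4)     // use at most 4 concurrent tasks
    .Where(n => n % 2 == 0)
    .Select(n => n * n)
    .ToArray();

Console.WriteLine(string.Join(", ", squares));
// 4, 16, 36, 64, 100, 144, 196, 256, 324, 400
```

Without AsOrdered the same values are produced, but their order is not guaranteed.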

# 🏁Terminating Things

<img src=images/while-loop.png>

In [None]:
using System.Diagnostics;
using System.Net.Http;
using System.Threading;

async Task<int> processUrlAsync(HttpClient client, string url, CancellationToken token)
{
    try
    {
        HttpResponseMessage response = await client.GetAsync(url, token);
        byte[] content = await response.Content.ReadAsByteArrayAsync(token);
        Console.WriteLine($"{url,-60} {content.Length,10:#,#}");

        return content.Length;
    }
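    catch(Exception ex) // important: an exception escaping here would fault the whole Parallel.ForEach; note that it also swallows OperationCanceledException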
    {
        Console.WriteLine($"{url,-60} Failed {ex.GetType()}");
        return 0;
    }
}

async Task sumPageSizesAsync(string[] urls, CancellationToken token)
{
    var stopwatch = Stopwatch.StartNew();

    int total = 0;
    //foreach (var url in urls)
    Parallel.ForEach(urls, url =>
    {
        var client = new HttpClient() { MaxResponseContentBufferSize = 1_000_000 };
        int contentLength = processUrlAsync(client, url, token).Result;
        Interlocked.Add(ref total, contentLength); // total is shared across threads; a plain += would race
    });

    stopwatch.Stop();

    Console.WriteLine($"Total bytes returned:  {total:#,#}");
    Console.WriteLine($"Elapsed time:          {stopwatch.Elapsed}");
}


var urls = new string[] {
    "https://learn.microsoft.com",
    "https://learn.microsoft.com/aspnet/core",
    "https://learn.microsoft.com/azure",
    "https://learn.microsoft.com/azure/devops",
    "https://learn.microsoft.com/dotnet",
    "https://learn.microsoft.com/dynamics365",
    "https://learn.microsoft.com/education",
    "https://learn.microsoft.com/enterprise-mobility-security",
    "https://learn.microsoft.com/gaming",
    "https://learn.microsoft.com/graph",
    "https://learn.microsoft.com/microsoft-365",
    "https://learn.microsoft.com/office",
    "https://learn.microsoft.com/powershell",
    "https://learn.microsoft.com/sql",
    "https://learn.microsoft.com/surface",
    "https://learn.microsoft.com/system-center",
    "https://learn.microsoft.com/visualstudio",
    "https://learn.microsoft.com/windows",
    "https://learn.microsoft.com/xamarin" };

var ct = new CancellationTokenSource();

try
{
    ct.CancelAfter(2000);
    await sumPageSizesAsync(urls, ct.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Tasks cancelled: timed out");
}
finally
{}

- Threading Features https://learn.microsoft.com/en-us/dotnet/standard/threading/overview-of-synchronization-primitives
- Task Parallel Library; https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/task-parallel-library-tpl
- PLINQ; https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/introduction-to-plinq
- System.Collections.Concurrent; https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent
- Dataflow; https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library

# 🎈Async Programming

In [None]:
using System.Threading.Tasks;
using System.Diagnostics;

// explicit
var get2ndPrime = new Task<int>(() => 2);
// a task created with the constructor is "cold": it does not run until Start() is called
get2ndPrime.Start();
//we need to join the task, or wait for the task to complete

get2ndPrime.Wait();
var result = get2ndPrime.Result;

var secondPrime = Task.Run(() => 2) // Task.Run returns a task that is already started
    .Result; // The Result property call is blocking call

async Task<int> GetNthPrime(int n)
{
    await Task.Delay(100);
    return await Task.FromResult(2);
}

int result1 = GetNthPrime(2).Result;
int result2 = await GetNthPrime(2); // to use await; we will need to async / color the caller

In [None]:
using System.Threading;

var random = new Random(Guid.NewGuid().GetHashCode());

// we want to make this async
bool CpuHeavyMethod()
{
    var next = random.Next();
    Thread.Sleep(1000);
    return next > (int.MaxValue / 2);
}
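
One common way to make such a method usable from async code is to offload it to the thread pool with Task.Run. The sketch below repeats a stand-in for the method so it is self-contained; `CpuHeavyMethodAsync` is a name chosen for the example.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the synchronous method above.
bool CpuHeavyMethod()
{
    Thread.Sleep(1000);
    return Random.Shared.Next() > int.MaxValue / 2;
}

// Asynchronous wrapper: the synchronous work runs on a thread pool thread.
Task<bool> CpuHeavyMethodAsync() => Task.Run(() => CpuHeavyMethod());

var stopwatch = Stopwatch.StartNew();
bool[] results = await Task.WhenAll(
    CpuHeavyMethodAsync(), CpuHeavyMethodAsync(), CpuHeavyMethodAsync());

// The three calls overlap, so this is expected to take about 1s rather than 3s.
Console.WriteLine($"{results.Length} results in {stopwatch.Elapsed}");
```

The wrapper does not make the work cheaper; it only frees the caller while a pool thread does the work.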

In [None]:
using System.Threading.Tasks;
using System.Diagnostics;

async ValueTask<int> GetNthPrime(int n)
{
    if (n < 10)
        return 2;
    else
    {
        await Task.Delay(100);
        return await ValueTask.FromResult(2);  
    }
}

for(int i = 0; i < 50; i++)
    _ = GetNthPrime(5).Result;

__Further Readings__

- https://learn.microsoft.com/en-us/dotnet/csharp/asynchronous-programming
- https://learn.microsoft.com/en-us/dotnet/csharp/asynchronous-programming/async-scenarios
- https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/task-based-asynchronous-programming
- https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns
    - TAP: https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap
    - EAP: https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/event-based-asynchronous-pattern-overview
    - APM: https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/asynchronous-programming-model-apm

# 🎁Dataflow (Task Parallel Library)

- Sources & Targets
    - ISourceBlock<TOutput>
    - ITargetBlock<TInput>
    - IPropagatorBlock<TInput, TOutput>
    - Blocks
        - BufferBlock<T>
        - BroadcastBlock<T>
        - WriteOnceBlock<T>
    - Executions
        - ActionBlock<T>
        - TransformBlock<TIn, TOut>
        - TransformManyBlock<TIn, TOut>
    - Grouping
        - BatchBlock<T>
        - JoinBlock<T1, T2>
        - BatchedJoinBlock<T1, T2>
- Connecting Blocks
    - DataflowBlockOptions
    - ExecutionDataflowBlockOptions
    - GroupingDataflowBlockOptions
- Filtering
- Message Passing
    - Post, SendAsync
    - Receive, ReceiveAsync
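
The message passing methods can be sketched with a single BufferBlock. This assumes the System.Threading.Tasks.Dataflow assembly is referenced; the message strings are made up for the example.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

var buffer = new BufferBlock<string>();

bool accepted = buffer.Post("first");       // synchronous; returns false if the block declines the message
await buffer.SendAsync("second");           // asynchronous; waits if a bounded block is full

string a = buffer.Receive();                // synchronous; blocks until a message is available
string b = await buffer.ReceiveAsync();     // asynchronous counterpart

Console.WriteLine($"{a}, {b}");             // first, second
```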

In [None]:
#r "System.Threading.Tasks.Dataflow"

In [None]:
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks.Dataflow;

void print(string s) => Console.WriteLine($"[{Thread.CurrentThread.ManagedThreadId}] {s}");

var downloadString = new TransformBlock<string, string>(async uri =>
{
    print($"Downloading {uri}...");
    return await new HttpClient(
        //new HttpClientHandler { AutomaticDecompression = System.Net.DecompressionMethods.GZip }
        ).GetStringAsync(uri);
});

var createWordList = new TransformBlock<string, string[]>(text =>
{
    print("Creating word list...");
    char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
    text = new string(tokens);
    return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

var filterWordList = new TransformBlock<string[], string[]>(words =>
{
    print("Filtering word list...");
    return words
        .Where(word => word.Length > 3)
        .Distinct()
        .ToArray();
});

var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
    print("Finding reversed words...");
    var wordsSet = new HashSet<string>(words);
    return from word in words.AsParallel()
        let reverse = new string(word.Reverse().ToArray())
        where word != reverse && wordsSet.Contains(reverse)
        select word;
});

var printReversedWords = new ActionBlock<string>(reversedWord =>
{
    print($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});

In [None]:
// downloadString -> createWordList -> filterWordList -> findReversedWords -> printReversedWords

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);

In [None]:
print("Starting...");

// "The Iliad of Homer" by Homer: http://www.gutenberg.org/cache/epub/16452/pg16452.txt
downloadString.Post("https://en.wikipedia.org/wiki/Palindrome");
//it gets triggered
//we can do something else here
downloadString.Complete(); // marking pipeline that we are done
//we can do more stuff here
printReversedWords.Completion.Wait();

print("Finished!");

- https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library
- https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline

# 🔀Channels

In [None]:
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>(); // any numbers of producers or consumers can use this channel
//var channel = Channel.CreateBounded<int>(10);  // maximum capacity of messages

In [None]:
using System.Threading;
using System.Threading.Channels;

record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

void print(string s) => Console.WriteLine($"{DateTime.Now} {s}");
    
var channel = Channel.CreateUnbounded<Coordinates>();
ChannelWriter<Coordinates> writer = channel.Writer;
ChannelReader<Coordinates> reader = channel.Reader;

var consumerSlow = Task.Run(async () =>   // Task.Run unwraps the async lambda; StartNew would return Task<Task>, which WaitAll does not fully wait for
{
    for (int i = 0; i < 5; i++)
    {
        Thread.Sleep(1000);
        var coordinate = await reader.ReadAsync();
        print($"Slow: {coordinate}");
    }
});
var consumerFast = Task.Run(async () =>   // Task.Run unwraps the async lambda; StartNew would return Task<Task>, which WaitAll does not fully wait for
{
    for (int i = 0; i < 5; i++)
    {
        Thread.Sleep(500);
        var coordinate = await reader.ReadAsync();
        print($"Fast: {coordinate}");
    }
});

var producer = Task.Factory.StartNew(() =>
{
    var deviceId = Guid.NewGuid();

    for (int i = 0; i < 9; i++)
        writer.TryWrite(new Coordinates(deviceId, 0, 0));

    print($"Waiting for 5sec");
    Thread.Sleep(5000);

    if (writer.TryWrite(new Coordinates(deviceId, 10, 10)))
        print($"Sent the last one");
    else
        print($"Failed");
    
    /*
     * above we are producing 10 values and consuming 10 values
     * in real world we often dont know how many values to consume
     * in these cases we can use "punctuations"
     *      producer sends special data telling that its done
     *      in the above example; it will need to learn how many consumers
     *      are still running and send that many "end of data" punctuation
     *      we can use Task's IsCompleted; or our orchestrator can tell how
     *      many consumers are running
     *      message queues / event buses have exchange / routing concept
     *      a single message from producer is broadcasted to all listeners
     */
     
});

Task.WaitAll(producer, consumerSlow, consumerFast);

Thread.Sleep(1000);
print("Finished");

In [None]:
static void ProduceWithWhileAndTryWrite(ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
            coordinates = tempCoordinates;
    }
}

static async ValueTask ConsumeWithWhileAsync(ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}
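
Instead of hand-made "end of data" messages, the writer can mark the channel complete. The sketch below assumes two competing consumers and ten items (arbitrary values); `ConsumeAsync` is a helper written for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Hypothetical helper: reads until the channel is completed and drained.
async Task<List<int>> ConsumeAsync(ChannelReader<int> reader)
{
    var items = new List<int>();
    await foreach (int item in reader.ReadAllAsync())
        items.Add(item);
    return items;
}

// Two consumers compete for items; neither needs to know how many will arrive.
Task<List<int>> consumerA = ConsumeAsync(channel.Reader);
Task<List<int>> consumerB = ConsumeAsync(channel.Reader);

for (int i = 1; i <= 10; i++)
    channel.Writer.TryWrite(i);   // succeeds while an unbounded channel is open
channel.Writer.Complete();        // a single completion signal reaches every reader

List<int>[] results = await Task.WhenAll(consumerA, consumerB);
int total = results.Sum(r => r.Count);
Console.WriteLine($"A read {results[0].Count}, B read {results[1].Count}, total {total}");
```

How the ten items split between the two consumers varies from run to run; only the total is fixed.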

## 1- Java Streams are cool

__Channels and Java's Streams API Similarities__

- Asynchronous Processing: Both decouple the code that produces data from the code that consumes it. Channels do so asynchronously (producers and consumers run as independent tasks); Java streams do so lazily, with the terminal operation pulling elements through the pipeline, and they run concurrently only when made parallel
- Backpressure: Both keep a fast producer from overrunning a slow consumer. A bounded channel makes WriteAsync wait while the channel is full; a Java stream is pull-based, so the source produces an element only when the consumer asks for one
- Functional Programming: Both can be used in a declarative, composable style. Java streams offer map, filter and reduce directly; channels expose only read and write operations, and libraries such as Open.ChannelExtensions add pipeline operators on top

__Channels and Java's Streams API Differences__
- Data Flow Direction: Both carry data one way, from producer to consumer. A channel, however, is a queue that many writers can push into and many readers can pull from over time, while a Java stream is a single-use pipeline consumed once by its terminal operation
- Error Handling: A channel writer can complete the channel with an exception (Complete or TryComplete), which readers then observe. A Java stream has no separate error channel; an exception thrown in any stage propagates to the caller of the terminal operation
- Synchronization: Channels are asynchronous: writing to an unbounded channel never waits, and WriteAsync on a full bounded channel waits without blocking a thread. Java streams are synchronous by default: the pipeline runs on the calling thread unless parallel() is used

*__Similar but not exactly same__*
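
Backpressure on the .NET side can be sketched with a bounded channel. The capacity of 2 is an arbitrary value; the default full mode makes writers wait.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(capacity: 2);

bool first = channel.Writer.TryWrite(1);
bool second = channel.Writer.TryWrite(2);
bool third = channel.Writer.TryWrite(3);        // the channel is full, so this write is refused
Console.WriteLine($"{first} {second} {third}"); // True True False

// WriteAsync does not fail; it completes only once a reader frees a slot.
ValueTask pending = channel.Writer.WriteAsync(3);
bool pendingBeforeRead = pending.IsCompleted;
Console.WriteLine(pendingBeforeRead);           // False

int item = await channel.Reader.ReadAsync();    // frees one slot
await pending;                                  // the waiting write now goes through
Console.WriteLine($"Read {item}; pending write completed");
```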

__Resources__
- https://learn.microsoft.com/en-us/dotnet/core/extensions/channels
- https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels

In [None]:
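import java.util.function.Supplier;
import java.util.stream.Stream;

Stream.generate(new Supplier<Integer>() {   // declared, instantiated, and passed in one expression
    int a = 0;                              // anonymous inner class with state; sadly, C# has no direct equivalent
    int b = 1;

    public Integer get() {
        int temp = a;                       // current Fibonacci number
        a = b;
        b = a + temp;                       // advance the pair (a, b)
        return temp;
    }
}).limit(10).forEach(System.out::println);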

In [None]:
IEnumerable<int> FibonacciSequence()
{
    (int a, int b) = (0, 1);
    while (true)
    {
        yield return a;
        (a, b) = (b, a + b);
    }
}

foreach (var number in FibonacciSequence().Take(10))
    Console.WriteLine(number);

FibonacciSequence().Take(10).ToList().ForEach(n => Console.WriteLine(n));

In [None]:
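using System.Linq;

// The lambda mutates the captured variables a and b, so this only works
// when the sequence is evaluated sequentially (do not add AsParallel here).
(int a, int b) = (0, 1);
Enumerable.Range(0, 10)
    .Select(index =>
    {
        int r = a;
        (a, b) = (b, a + b);
        return r;
    })
    .ToList()
    .ForEach(n => Console.WriteLine(n));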

## 2- Declarative Programming with Channels

- sync.cs

In [None]:
#r "nuget: Open.ChannelExtensions, 8.3.0"

In [None]:
using System.Threading.Channels;
using System.Linq;
using Open.ChannelExtensions;

class ThumbnailGenerator
{
    public async Task<object> ReadMetaDataAsync(string filePath)
    { return await Task.FromResult<object>(null); }

    public async Task<object> CreateImageAsync(object frontMatter)
    { return await Task.FromResult<object>(null); }

    public async Task SaveImageAsync()
    { await Task.CompletedTask; }
}

var directory = @"C:\Windows\System32"; // verbatim string: backslashes are not escaped
var generator = new ThumbnailGenerator();

await Channel
    .CreateBounded<string>(50000)
    .Source(Directory.GetFiles(directory))
    .PipeAsync(
        maxConcurrency: 2,
        capacity: 100,
        transform: async filePath =>
        {
            var metaData = await generator.ReadMetaDataAsync(filePath);
            return (filePath, metaData);
        })
    .Filter(tuple => tuple.Item2 != null)
    .PipeAsync(
        maxConcurrency: 10,
        capacity: 20,
        transform: async tuple =>
        {
            var (filePath, metaData) = tuple;
            var image = await generator.CreateImageAsync(metaData);

            return (filePath, metaData, image);
        })
    .ReadAllAsync(async tuple =>
    {
        var (filePath, metaData, image) = tuple;
        //some logic

        await generator.SaveImageAsync();
    });


## 3- Dataflow vs Channels

__Dataflow__
- Complex Pipelines and Workflows: When you need to process data through multiple stages, with each stage potentially running in parallel and asynchronously
- Customizable Task Scheduling: When you need fine-grained control over task scheduling, execution, and resource management
- Message Passing Between Blocks: When you need to pass messages between different processing blocks with options for filtering, batching, and transforming data
- Backpressure Handling: When you need to handle backpressure by controlling the flow of data through the pipeline to prevent overloading of downstream components
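
A minimal sketch of such a pipeline, assuming the System.Threading.Tasks.Dataflow library is available (on some target frameworks it is a separate NuGet package). The method name `RunDataflowPipelineAsync` and the squaring stage are hypothetical, chosen only to show linked blocks, parallelism, and bounded capacity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical two-stage pipeline: square each number, then collect the results.
static async Task<List<int>> RunDataflowPipelineAsync(IEnumerable<int> inputs)
{
    var results = new List<int>();

    var square = new TransformBlock<int, int>(
        n => n * n,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4, // this stage may process items in parallel
            BoundedCapacity = 8         // backpressure: SendAsync waits when the block is full
        });

    // ActionBlock processes one item at a time by default, so List<int> is safe here.
    var collect = new ActionBlock<int>(n => results.Add(n));

    // Link the stages and let completion flow downstream.
    square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (int n in inputs)
        await square.SendAsync(n);

    square.Complete();
    await collect.Completion;
    return results;
}
```

TransformBlock keeps output in input order by default, so the results come back in order even though the stage runs in parallel.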

__Channels__
- High-Performance Producer-Consumer Scenarios: When you need a low-overhead way to implement producer-consumer patterns
- Streaming Data: When you need to process a continuous stream of data with minimal latency
- Simple Pipelines: When you need a simpler pipeline without the complex features of TPL Dataflow
- Custom Buffering Strategies: When you need custom buffering strategies and precise control over how data is read and written
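
As one example of a custom buffering strategy, a bounded channel can be configured to drop the oldest item instead of making the writer wait. The helper name `KeepLatest` is hypothetical; the sketch writes everything first and then reads what survived.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical helper: keeps only the most recent `capacity` items.
static List<int> KeepLatest(int capacity, IEnumerable<int> items)
{
    var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
    {
        // When the buffer is full, discard the oldest item to make room.
        FullMode = BoundedChannelFullMode.DropOldest
    });

    foreach (int item in items)
        channel.Writer.TryWrite(item); // does not wait in DropOldest mode
    channel.Writer.Complete();

    var kept = new List<int>();
    while (channel.Reader.TryRead(out int value))
        kept.Add(value);
    return kept;
}
```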

# 🎁Namespace of the Day

## System.IO.Pipelines

- This namespace provides a library for building high-performance I/O pipelines. It's particularly useful for scenarios involving streams and network programming.
- Key classes include Pipe, PipeReader, and PipeWriter.
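
A small round trip through a Pipe, as a sketch only. It assumes System.IO.Pipelines is referenced (it may need the NuGet package on some target frameworks), and the helper name `RoundTripAsync` is hypothetical. For brevity the writer finishes before the reader starts, which is only suitable for small payloads; real code would run both sides concurrently.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper: writes a string into a Pipe and reads it back.
static async Task<string> RoundTripAsync(string message)
{
    var pipe = new Pipe();

    // Producer side: write the bytes, then signal completion.
    await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(message));
    await pipe.Writer.CompleteAsync();

    // Consumer side: read until the writer has completed.
    var bytes = new List<byte>();
    while (true)
    {
        ReadResult result = await pipe.Reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        bytes.AddRange(buffer.ToArray());

        // Tell the pipe that everything in this buffer has been consumed.
        pipe.Reader.AdvanceTo(buffer.End);

        if (result.IsCompleted)
            break;
    }

    await pipe.Reader.CompleteAsync();
    return Encoding.UTF8.GetString(bytes.ToArray());
}
```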

These namespaces and libraries provide a rich set of tools for writing parallel and concurrent applications in C#. They help manage synchronization, communication, and workload distribution across multiple threads and processors.

# 🎯 Composing Code

__Break Down__
- Procedural Programming
- Object Oriented Programming
- Functional Programming

__Control Flow__
- Linear Control Flow
- Control Structures: Conditional Structures (if/else)
- Control Structures: Looping Structures
- Control Structures: Branching Structure (switch)
- Control Structures: Jump Statements (goto, return, continue, break, yield)
- Control Structures: Exception Handling

__Execution Flow__
- Threading
- Concurrency
- Parallelism
- Asynchronous Programming
- Task Scheduling

__Data Flow__
- Data Flow Programming
- Producer-Consumer Model
- Reactive Programming
- Pipeline
- Stream Processing

__Resources__
- https://devblogs.microsoft.com/pfxteam
- https://github.com/dotnet/samples/tree/main/csharp/parallel/ParallelExtensionsExtras
