# Why Parallel programming?

## Instruction Level Parallelism

* Processor hardware optimizes low-level instruction execution
* **Instruction pipelining:** Overlapped execution of serial instructions
* **Superscalar execution:** Multiple units of one processor are used in parallel
* **Out-of-order execution:** Reorders instructions that do not have data dependencies
* **Speculative execution:** Control flow speculation and branch prediction

|                                                                                                Why ***Parallel*** computing                                                                                               |                                      Why parallel ***programming***                                     |
|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------|
| There are problems that cannot be solved at <br>current processor speeds, because performance has hit the "Brick Wall". | Automatic parallelization does not work well: it is <br>very inefficient and possible only with certain codes. |
| **Power Wall:** <br>- For physical reasons, power consumption cannot be <br>kept stable as clock frequency increases. <br><br> **Memory Wall:**<br>- Memory latency, as seen by the CPU, <br>creates a bottleneck<br><br>**Instruction Level Parallelism (ILP) Wall:** <br>- No longer cost-effective to dedicate new transistors <br>to ILP mechanisms <br>- Deeper pipelines make the power problem worse<br>- High ILP complexity effectively reduces the<br>processing speed for a given frequency | For many applications, auto-parallelization is not possible at all. |

# Types of Parallelism

| Functional Parallelism                                                                                                 | Data Parallelism                                                                                                      |
|------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------|
| Views the problem as a stream of instructions<br> that can be broken down into functions <br>to be executed simultaneously | Views the problem as a set of data that can be broken down<br>into chunks to be operated upon independently (e.g. code) |
| Each processing element performs a different function | Each processing element performs the same function on<br>a different piece of data |
| Example: pipeline, production line | Example: signal processing |
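
The data-parallel column can be illustrated with a rough sketch: a vector is split into chunks and every thread applies the same summing function to its own chunk. The helper name `parallel_sum` and the chunking scheme are assumptions made for this example, not something prescribed by these notes.

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical helper: sums `data` using `num_threads` worker threads.
long long parallel_sum(const std::vector<int>& data, std::size_t num_threads) {
    if (num_threads == 0) num_threads = 1;
    std::vector<long long> partial(num_threads, 0);  // one result slot per thread
    std::vector<std::thread> workers;
    // Chunk size, rounded up so that every element is covered.
    const std::size_t chunk = (data.size() + num_threads - 1) / num_threads;

    for (std::size_t t = 0; t < num_threads; ++t) {
        const std::size_t begin = std::min(t * chunk, data.size());
        const std::size_t end = std::min(begin + chunk, data.size());
        // Same function, different piece of data.
        workers.emplace_back([&data, &partial, t, begin, end] {
            partial[t] = std::accumulate(data.begin() + begin,
                                         data.begin() + end, 0LL);
        });
    }
    for (auto& w : workers) w.join();

    // Combine the per-thread results on the calling thread.
    return std::accumulate(partial.begin(), partial.end(), 0LL);
}
```

Each thread writes only to its own element of `partial`, so no locking is needed in this sketch. A functional-parallel design would instead give each thread a different function, for example one stage of a pipeline.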

# Memory Architecture

<img src="img/mem_arch.png" alt="Memory Architecture" width="80%" style="margin: 0 auto;">

## Shared Memory

| Uniform Memory Access (UMA)                        | Non-uniform memory access (NUMA)                  | Cache-coherent NUMA systems                                                 |
|----------------------------------------------------------|---------------------------------------------------------|---------------------------------------------------------------------------------|
| Each CPU has the same access time to <br>each memory address | Memory has affinity to a processor | Distributed database storing the location<br>and status of cache lines |
| Simple design but limited scalability                    | Access to local memory faster than <br>to remote memory | Requires fast hardware because it must <br>be queried on every memory reference |
|                                                          | Harder to program but more scalable                     |                                                                                 |

<img src="img/types_mem.png" alt="Memory Architecture" width="80%" style="margin: 0 auto;">

# Single-threaded vs. multi-threaded

* All modern, pipelined CPUs suffer from the following problem
    * When a memory reference misses L1 or L2 caches, it takes a long time until the requested word is loaded into the cache
* On-chip multithreading allows the CPU to manage multiple threads to mask these stalls
* If one thread is stalled, the CPU can run another thread and keep the hardware busy
* Four hardware threads per core are often sufficient to hide the latency.

The number of threads that the hardware can run concurrently can be queried with the `std::thread::hardware_concurrency` function. The returned value is only a hint and may be 0 if it cannot be determined.

In [3]:
#include <iostream>
#include <thread>

unsigned int n = std::thread::hardware_concurrency();
std::cout << n << " concurrent threads are supported.\n";

4 concurrent threads are supported.


<img src="img/sing-multi-thread.png" alt="Single-threaded vs. multi-threaded" width="60%" style="margin: 0 auto;">

# Concurrency in C++

* The thread library: a set of components supporting traditional threads-and-locks-style, system-level concurrent programming
    * `thread`
    * `condition_variable`
    * `mutex`
    * etc.
* The task-support library: facilities supporting task-level concurrent programming
    * `future`
    * `promise`
    * `packaged_task`
    * `async()`
* Memory model – a set of guarantees for concurrent access to memory that basically ensures that simple and ordinary access works as one would naively expect
* Support for programming without locks – fine-grained low-level mechanisms for avoiding data races

In [4]:
void hello() {
    std::cout<<"Hello Concurrent World\n";
}

In [5]:
std::thread my_thread(hello);
my_thread.join();

Hello Concurrent World


In [6]:
std::thread lambda_thread([]{
    std::cout<<"Hello Concurrent World\n";
    std::cout<<"This is another message from concurrent world\n";
});
lambda_thread.join();

Hello Concurrent World
This is another message from concurrent world


# Background Threads

* `detach()` leaves the thread to run in the background without any means of communicating with it
    * No longer joinable
    * Ownership and control passed to the C++ runtime system
* Detached threads are often called **daemon** threads, after the UNIX concept of a daemon process that runs in the background
* Typically long-running. Care is needed at program exit: when `main` returns, the process terminates and any detached thread that is still running is stopped abruptly, without unwinding its stack
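
Because the `std::thread` object no longer gives access to a detached thread, any communication has to go through separate shared state. The sketch below assumes a `std::promise` as that channel; the function name `run_detached_worker` and the value 42 are purely illustrative.

```cpp
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Starts a detached worker and waits, via a future, until it reports back.
// Returns the value produced by the background thread (or -1 on surprise).
int run_detached_worker() {
    std::promise<int> done;
    std::future<int> result = done.get_future();

    std::thread worker([p = std::move(done)]() mutable {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        p.set_value(42);  // the only channel back to the caller
    });

    worker.detach();                          // keeps running in the background
    const bool joinable = worker.joinable();  // false after detach()

    const int value = result.get();  // block until the worker has set the value
    return joinable ? -1 : value;
}
```

Waiting on the future before returning is one way to make sure the background work is complete before the program is allowed to exit.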

# Cooperative Cancellation

C++20 provides `std::stop_source` and `std::stop_token` classes to handle a purely cooperative cancellation.

The new thread class in C++20 `std::jthread` integrates with `std::stop_token` to support cooperative cancellation.
* Destroying a joinable `std::jthread` calls `request_stop()` and then `join()`.
* The thread needs to check the stop token passed into the thread function.

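```cpp
#include <stop_token>
#include <thread>

// Placeholders for the actual work; assumed to be defined elsewhere.
void do_stuff(int arg1, int arg2);
void do_another_stuff();

// std::jthread passes its stop token as the first argument.
void thread_func(std::stop_token st, int arg1, int arg2) {
    while (!st.stop_requested()) {
        do_stuff(arg1, arg2);
    }
}

void foo(int i) {
    std::jthread t(thread_func, i, 42);
    do_another_stuff();
} // destructor requests stop and joins
```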

# Identifying threads

Thread identifiers are of type `std::thread::id`

* Can be obtained in two ways
    * `get_id()` member function
    * `std::this_thread::get_id()`
* If a thread object does not have an associated thread of execution, `get_id()` returns a default-constructed `std::thread::id` object
    * Indicates “not any thread”
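
The "not any thread" case can be checked by comparing against a default-constructed `std::thread::id`. The following is a minimal sketch; `is_not_any_thread`, `IdCheck` and `check_ids` are hypothetical names introduced only for this illustration.

```cpp
#include <thread>

// True if `t` currently has no associated thread of execution.
bool is_not_any_thread(const std::thread& t) {
    return t.get_id() == std::thread::id();
}

// What was observed at each step of the demonstration below.
struct IdCheck {
    bool before_start;    // default-constructed thread object
    bool while_joinable;  // thread started, not yet joined
    bool after_join;      // thread joined
    bool ids_match;       // member get_id() vs. std::this_thread::get_id()
};

IdCheck check_ids() {
    IdCheck r{};
    std::thread t;  // no thread of execution yet
    r.before_start = is_not_any_thread(t);

    std::thread::id seen_by_worker;
    t = std::thread([&seen_by_worker] {
        seen_by_worker = std::this_thread::get_id();  // id from inside the thread
    });
    const std::thread::id from_object = t.get_id();   // id from the thread object
    r.while_joinable = is_not_any_thread(t);

    t.join();  // after join(), the object is "not any thread" again
    r.after_join = is_not_any_thread(t);
    r.ids_match = (seen_by_worker == from_object);
    return r;
}
```

Both ways of obtaining the id refer to the same thread, so `ids_match` is expected to be true, while `before_start` and `after_join` cover the "not any thread" state.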

In [7]:
#include <iostream>
#include <thread>
#include <chrono>
 
void foo() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

In [8]:
std::thread t1(foo);
std::thread::id t1_id = t1.get_id();

std::thread t2(foo);
std::thread::id t2_id = t2.get_id();

std::cout << "t1's id: " << t1_id << '\n';
std::cout << "t2's id: " << t2_id << '\n';

t1.join();
t2.join();

t1's id: 139640200578816
t2's id: 139639989991168


# Sharing data between threads

* No problem as long as all accesses are read only
* *Problems begin as soon as at least one thread modifies shared data*
* Problems often related to **Invariants**
    * **Invariants:** Statements about a particular data structure that should always be true

## Race condition

<img src="img/race_condition.png" alt="Race Condition" width="70%" style="margin: 0 auto;">

* Causes non-deterministic program behavior
    * Failure in programs expected to be deterministic
* Often benign – if all possible outcomes are acceptable
    * Example: often the order in which items are added to a queue does not matter – as long as all invariants are maintained

## Data Race condition

<img src="img/data_race.png" alt="Data Race Condition" width="70%" style="margin: 0 auto;">

<img src="img/data_race_types.png" alt="Data Race Condition Types" width="70%" style="margin: 0 auto;">
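
In C++ terms, a data race occurs when two threads access the same memory location without synchronization and at least one of them writes to it; the behavior is then undefined. As a hedged sketch of one possible fix, the counter below is a `std::atomic<int>` so that concurrent increments are well defined. The function name `count_with_atomic` is an assumption made for this example.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Increments a shared counter from several threads and returns the final value.
int count_with_atomic(int num_threads, int increments_per_thread) {
    // With a plain `int` here, the concurrent increments would be a data race.
    std::atomic<int> counter{0};

    std::vector<std::thread> threads;
    for (int t = 0; t < num_threads; ++t) {
        threads.emplace_back([&counter, increments_per_thread] {
            for (int i = 0; i < increments_per_thread; ++i) {
                counter.fetch_add(1);  // atomic read-modify-write
            }
        });
    }
    for (auto& th : threads) th.join();
    return counter.load();
}
```

With the atomic counter the result is always `num_threads * increments_per_thread`; protecting a plain `int` with a mutex would be an alternative.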

## Protecting shared data with mutexes

Mutexes ensure mutual exclusion during access to a data structure

<img src="img/mutexes.png" alt="Mutexes" width="40%">

In [9]:
#include <mutex>
#include <list>
#include <thread>
#include <vector>
#include <iostream>

#define NUMBER_OF_THREADS 10

std::list<int> some_list;
std::mutex some_mutex;

In [10]:
void add_to_list(int new_value) {
    std::lock_guard<std::mutex> guard(some_mutex);
    some_list.push_back(new_value);
}

In [11]:
// Test with Mutex
std::vector<std::thread> threads;

for (int i = 0; i < NUMBER_OF_THREADS; i++) {
    threads.emplace_back(add_to_list, i * 4);
}

for (auto& t : threads) {
    t.join();
}

std::cout << "Size of list with mutex: " << some_list.size() << std::endl;
for (auto& n : some_list) {
    std::cout << "Element in list with mutex: " << n << std::endl;
}

Size of list with mutex: 10
Element in list with mutex: 0
Element in list with mutex: 8
Element in list with mutex: 12
Element in list with mutex: 28
Element in list with mutex: 32
Element in list with mutex: 36
Element in list with mutex: 20
Element in list with mutex: 16
Element in list with mutex: 24
Element in list with mutex: 4


## Deadlock

<img src="img/deadlock_def.png" alt="Deadlock definition" width="70%" style="margin: 0 auto;">

* Can occur when multiple threads simultaneously try to lock multiple mutexes
* Example: two threads T1 and T2 and two mutexes `ma` and `mb`. If T1 holds `ma` and waits for `mb` while T2 holds `mb` and waits for `ma`, neither thread can proceed
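
One common way to avoid this situation is to acquire both mutexes in a single step with `std::lock`, which uses a deadlock-avoidance algorithm. The sketch below reuses the names `ma` and `mb` from the example; the two balances and the transfer functions are assumptions made purely for illustration.

```cpp
#include <mutex>
#include <thread>

std::mutex ma;
std::mutex mb;
int balance_a = 100;  // protected by ma
int balance_b = 100;  // protected by mb

// Deadlock-prone pattern (not used here): T1 locks ma then mb,
// while T2 locks mb then ma.

void move_a_to_b(int amount) {
    std::lock(ma, mb);  // acquires both mutexes without deadlocking
    std::lock_guard<std::mutex> ga(ma, std::adopt_lock);  // take over ownership
    std::lock_guard<std::mutex> gb(mb, std::adopt_lock);
    balance_a -= amount;
    balance_b += amount;
}

void move_b_to_a(int amount) {
    std::lock(mb, ma);  // opposite argument order is still safe
    std::lock_guard<std::mutex> gb(mb, std::adopt_lock);
    std::lock_guard<std::mutex> ga(ma, std::adopt_lock);
    balance_b -= amount;
    balance_a += amount;
}

// Runs both directions concurrently and returns the combined balance.
int run_transfers(int rounds) {
    std::thread t1([rounds] { for (int i = 0; i < rounds; ++i) move_a_to_b(1); });
    std::thread t2([rounds] { for (int i = 0; i < rounds; ++i) move_b_to_a(1); });
    t1.join();
    t2.join();
    return balance_a + balance_b;
}
```

The invariant in this sketch is that `balance_a + balance_b` stays at 200. Always locking the mutexes in the same fixed order would be another way to rule out the deadlock.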

## Unique Locks

A `std::unique_lock` does not have to own the mutex it is associated with
* Locking can be deferred
* Mutex can be unlocked before the lock object is destroyed (via `unlock()`)
* Movable
* More flexible than `std::lock_guard`, but incurs some overhead
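
The properties listed above can be sketched in a few lines. The names `queue_mutex`, `shared_values`, `acquire_queue_lock` and `append_value` are hypothetical and chosen only for this example.

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

std::mutex queue_mutex;
std::vector<int> shared_values;  // protected by queue_mutex

// Returns a lock that owns queue_mutex; ownership moves to the caller.
std::unique_lock<std::mutex> acquire_queue_lock() {
    std::unique_lock<std::mutex> lk(queue_mutex, std::defer_lock);  // not locked yet
    lk.lock();                                                      // deferred locking
    return lk;                                                      // movable
}

// Appends a value and returns the size observed while holding the lock.
std::size_t append_value(int value) {
    std::unique_lock<std::mutex> lk = acquire_queue_lock();
    shared_values.push_back(value);
    const std::size_t size = shared_values.size();
    lk.unlock();  // release early, before the lock object is destroyed
    // Work that does not touch shared_values could continue here.
    return size;
}
```

A `std::lock_guard` could not be returned from a function or unlocked early like this, which is the flexibility that `std::unique_lock` pays for with a little extra state.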

## Condition variables

* Can be used to wait for an event
* Associated with an event or condition
* One or more threads can wait for the condition to be satisfied
* Thread that establishes the condition can notify waiting thread(s) and wake them up

In [12]:
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
 
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;

In [13]:
void worker_thread() {
    // Wait until main() sends data
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return ready;});
 
    // after the wait, we own the lock.
    std::cout << "Worker thread is processing data\n";
    data += " after processing";
 
    // Send data back to main()
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";
 
    // Manual unlocking is done before notifying, to avoid waking up
    // the waiting thread only to block again (see notify_one for details)
    lk.unlock();
    cv.notify_one();
}

In [14]:
std::thread worker(worker_thread);
 
data = "Example data";
// send data to the worker thread
{
    std::lock_guard<std::mutex> lk(m);
    ready = true;
    std::cout << "main() signals data ready for processing\n";
}
cv.notify_one();

// wait for the worker
{
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return processed;});
}
std::cout << "Back in main(), data = " << data << '\n';

worker.join();

main() signals data ready for processing
Worker thread is processing data
Worker thread signals data processing completed
Back in main(), data = Example data after processing


A `std::condition_variable` can be used to wait for an event: one or more threads wait for a predicate to become true. It works together with a `std::mutex` that protects the shared data. If the predicate is not fulfilled, `wait` unlocks the mutex and blocks the thread. When another thread calls `notify_one` or `notify_all`, the waiting thread wakes up, re-acquires the mutex and checks the predicate again.

The main difference between condition variables and mutexes is that condition variables can be used to implement *temporal ordering* between threads.

## Semaphores (C++20)

* A semaphore represents a number of available "slots". If you acquire a slot on the semaphore then the count is decreased until you release the slot.
* Attempting to acquire a slot when the count is zero will either block or fail.
* A thread may release a slot without acquiring one and vice versa.

```cpp
#include <semaphore>

void do_stuff();  // placeholder, assumed to be defined elsewhere

std::counting_semaphore<5> slots(5);

void func() {
    slots.acquire();
    do_stuff(); // at most 5 threads can be here
    slots.release();
}
```

## Waiting for an asynchronous event / task

<img src="img/future.png" alt="Use of Future in C++" width="70%" style="margin: 0 auto;">

* `std::async` starts an asynchronous task for which you don't need the result right away
* Returns a `std::future` object, which will eventually hold the return value of the function
* Calling `get()` on the future blocks the calling thread until the future is ready, and then returns the value
* Allows additional arguments to be passed to the function, in the same way as `std::thread`

In [1]:
#include <string>
#include <chrono>
#include <thread>

std::string fetchDataFromDB(std::string recvdData) {
    // Simulate a slow operation: the function takes 2 seconds to complete
    std::this_thread::sleep_for(std::chrono::seconds(2));
    //Do stuff like creating DB Connection and fetching Data
    return "DB_" + recvdData;
}

In [2]:
std::string fetchDataFromFile(std::string recvdData) {
    // Simulate a slow operation: the function takes 2 seconds to complete
    std::this_thread::sleep_for(std::chrono::seconds(2));
    //Do stuff like fetching Data File
    return "File_" + recvdData;
}

In [3]:
// Single thread without async
#include <iostream>
#include <string>
#include <chrono>
#include <thread>

// Get Start Time
std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
//Fetch Data from DB
std::string dbData = fetchDataFromDB("Data");
//Fetch Data from File
std::string fileData = fetchDataFromFile("Data");
// Get End Time
auto end = std::chrono::system_clock::now();
auto diff = std::chrono::duration_cast < std::chrono::seconds > (end - start).count();
std::cout << "Total Time Taken = " << diff << " Seconds" << std::endl;
//Combine The Data
std::string data = dbData + " :: " + fileData;
//Printing the combined Data
std::cout << "Data = " << data << std::endl;

Total Time Taken = 4 Seconds
Data = DB_Data :: File_Data


With `std::async`, the same code could look something like this:

```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <string>

// fetchDataFromDB and fetchDataFromFile are the functions defined in the cells above.

// Get Start Time
std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
// Start fetching data from the DB asynchronously, on a separate thread
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
//Fetch Data from File
std::string fileData = fetchDataFromFile("Data");
//Fetch Data from DB
// Will block till data is available in future<std::string> object.
std::string dbData = resultFromDB.get();
// Get End Time
auto end = std::chrono::system_clock::now();
auto diff = std::chrono::duration_cast < std::chrono::seconds > (end - start).count();
std::cout << "Total Time Taken = " << diff << " Seconds" << std::endl;
//Combine The Data
std::string data = dbData + " :: " + fileData;
//Printing the combined Data
std::cout << "Data = " << data << std::endl;
```

## Using `std::packaged_task`

`std::packaged_task<>` is a class template that represents an asynchronous task. It encapsulates:

* A callable entity, i.e. a function, lambda function or function object.
* A shared state that stores the value returned, or the exception thrown, by the associated callable.

The idea is to have more control over the execution than with a plain `std::async` call: a `std::packaged_task` can be executed on a specific thread and at a specific time.

For more details about the difference between `std::packaged_task` and `std::async`, see this StackOverflow [Question](https://stackoverflow.com/questions/18143661/what-is-the-difference-between-packaged-task-and-async#:~:text=Use%20std%3A%3Aasync%20if,threads%20or%20call%20them%20later.)

```cpp
#include <future>
#include <iostream>
#include <string>
#include <thread>

// Fetch some data from DB
std::string getDataFromDB( std::string token)
{
    // Do some stuff to fetch the data
    std::string data = "Data fetched from DB by Filter :: " + token;
    return data;
}
int main()
{
    // Create a packaged_task<> that encapsulated the callback i.e. a function
    std::packaged_task<std::string (std::string)> task(getDataFromDB);
    // Fetch the associated future<> from packaged_task<>
    std::future<std::string> result = task.get_future();
    // Pass the packaged_task to thread to run asynchronously
    std::thread th(std::move(task), "Arg");
    // Join the thread. This blocks and returns when the thread is finished.
    th.join();
    // Fetch the result of packaged_task<> i.e. value returned by getDataFromDB()
    std::string data =  result.get();
    std::cout <<  data << std::endl;
    return 0;
}
```

Expected Output:

```bash
Data fetched from DB by Filter :: Arg
```

## Using Promises

A promise follows a similar principle to `std::packaged_task`, with the difference that a `std::promise` can set the value of its future at any point in the function, whereas a `std::packaged_task` can only do so through the return value of the function.

Because of that, a `std::promise` can also be passed as a parameter to functions that do not return a value.

A `std::promise` cannot be copied, so it always has to be passed on with `std::move`.
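
A minimal sketch of this pattern follows. The function names `load_config` and `fetch_with_promise` and the `"Config_"` prefix are assumptions made for illustration; only `std::promise`, `std::future` and `std::thread` come from the standard library.

```cpp
#include <future>
#include <string>
#include <thread>
#include <utility>

// Returns nothing; the result is reported through the promise instead.
void load_config(std::promise<std::string> result, std::string name) {
    std::string value = "Config_" + name;
    result.set_value(value);  // the future becomes ready here, mid-function
    // Further work (e.g. cleanup) could continue after the value is set.
}

std::string fetch_with_promise(const std::string& name) {
    std::promise<std::string> p;
    std::future<std::string> f = p.get_future();

    // The promise is move-only, so it is handed to the thread with std::move.
    std::thread worker(load_config, std::move(p), name);

    std::string value = f.get();  // blocks until set_value() has been called
    worker.join();
    return value;
}
```

For example, `fetch_with_promise("A")` returns `"Config_A"`. An exception could be forwarded in the same way with `set_exception` instead of `set_value`.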