# Rust Crash Course - 04 - Threads and Concurrency

One of the main goals of Rust is *fearless concurrency*.

In the following, the use of threads and related concurrency concepts in Rust is explained.

The contents are mainly based on the Rust Book by Steve Klabnik and Carol Nichols: https://doc.rust-lang.org/book/

## Threads

Rust allows one to create a new operating system thread by calling ``thread::spawn()`` from ``std::thread``.

In [None]:
use std::thread;

To let other threads run, a thread can be put to sleep by calling ``thread::sleep()`` with a duration of type ``std::time::Duration``.

In [None]:
use std::time::Duration;

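The code passed to ``thread::spawn()`` is a *closure*, i.e., an anonymous function written as ``|| { ... }``, which the new thread executes.

Note that the following example ends with a call to ``thread::sleep()`` so that the spawned thread has time to finish: the function itself does not wait for the thread, and when the main thread ends, all spawned threads are shut down, whether or not they have finished.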

In [None]:
fn thread_or_main_thread() {
    
    // start new thread that executes a loop
    thread::spawn(|| {
        for i in 1..10 {
            println!("THREAD OUTPUT: {}", i);
            thread::sleep(Duration::from_millis(1));   // sleep and allow execution of other threads
        }
    });

    // execute a loop in main thread
    for i in 1..5 {
        println!("MAIN OUTPUT: {}", i);
        thread::sleep(Duration::from_millis(1));       // sleep and allow execution of other threads
    }
}

thread_or_main_thread();
println!("Function execution finished ...");

thread::sleep(Duration::from_millis(100));             // wait for threads to finish (function does not wait)

However, waiting for threads with ``thread::sleep()`` is unreliable: the chosen delay may be too short for the thread to finish, or unnecessarily long.

A better way is to use the return value of ``thread::spawn()``, a ``JoinHandle<T>``, where ``T`` is the return type of the closure. Calling ``join()`` on the handle blocks the calling thread until the spawned thread has finished and returns a ``Result`` containing the thread's return value of type ``T`` (or an error if the thread panicked). Here, ``unwrap()`` is used for trivial error handling.

In [None]:
fn thread_or_main_thread_join() {
    
    // start new thread that executes a loop
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("THREAD OUTPUT: {}", i);
            thread::sleep(Duration::from_millis(1));   // sleep and allow execution of other threads
        }
    });

    // execute a loop in main thread
    for i in 1..5 {
        println!("MAIN OUTPUT: {}", i);
        thread::sleep(Duration::from_millis(1));       // sleep and allow execution of other threads
    }
    
    handle.join().unwrap();                            // block until the spawned thread has finished
}

thread_or_main_thread_join();
println!("Function execution finished ...");
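The example above does not use the thread's return value. The following minimal sketch (the function names ``sum_in_thread`` and ``join_reports_panic`` are hypothetical) shows how a result travels back through ``JoinHandle<T>``, and that ``join()`` returns an error if the thread panicked.

```rust
use std::thread;

/// Hypothetical helper: spawns a thread that sums 1..=n and hands the result back.
fn sum_in_thread(n: u64) -> u64 {
    // The closure returns a u64, so `spawn` yields a JoinHandle<u64>.
    let handle: thread::JoinHandle<u64> = thread::spawn(move || (1..=n).sum::<u64>());
    // join() blocks until the thread has finished and returns Result<u64, _>.
    handle.join().unwrap()
}

/// Hypothetical helper: returns true if joining a panicked thread yields an error.
fn join_reports_panic() -> bool {
    let handle: thread::JoinHandle<()> = thread::spawn(|| panic!("something went wrong"));
    // Instead of unwrap(), inspect the Result: a panic in the thread becomes Err.
    handle.join().is_err()
}
```

Calling ``sum_in_thread(10)`` in a cell should return ``55``. Since the value is handed back only after the thread has finished, no shared state is needed for this pattern.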

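Often, threads need to process data from the calling thread. However, the compiler cannot know how long the spawned thread will run, so it cannot guarantee that data merely borrowed from the calling function is still valid while the thread uses it. Therefore, ``thread::spawn()`` does not accept closures that borrow local variables, and the data has to be moved into the new thread by putting the ``move`` keyword in front of the closure.

In the following example, ``primes`` is an array of ``i32``, which implements the ``Copy`` trait, so ``move`` copies the array into the closure and the main thread can keep using its own copy. For a type that is not ``Copy``, such as ``Vec<i32>``, the main thread could no longer use the variable after the move.

Try to remove ``move`` and see what happens.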

In [None]:
fn thread_move() {
    
    let primes: [i32; 8] = [2, 3, 5, 7, 11, 13, 17, 19];   // data from main thread
    
    let handle = thread::spawn(move || {                   // defines that required data is moved into thread
        for i in 0..8 {
            println!("THREAD OUTPUT: {}", primes[i]);
            thread::sleep(Duration::from_millis(1));       // sleep and allow execution of other threads
        }
    });

    for i in 0..8 {
        println!("MAIN OUTPUT: {}", primes[i]);
        thread::sleep(Duration::from_millis(1));           // sleep and allow execution of other threads
    }
    
    handle.join().unwrap();
}

thread_move();
println!("Function execution finished ...");
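For a type that is not ``Copy``, ``move`` really transfers ownership. A minimal sketch, using a hypothetical function ``square_in_thread``: the ``Vec<i32>`` is moved into the thread, and ownership of the result is handed back to the caller via ``join()``.

```rust
use std::thread;

/// Hypothetical helper: moves a Vec (not Copy) into a thread, which squares every
/// element and returns the new Vec through its JoinHandle.
fn square_in_thread(data: Vec<i32>) -> Vec<i32> {
    let handle = thread::spawn(move || {
        // `data` is now owned by this closure; the caller can no longer use it.
        data.into_iter().map(|x| x * x).collect::<Vec<i32>>()
    });
    // Ownership of the resulting Vec comes back to the caller via join().
    handle.join().unwrap()
}
```

Using ``data`` in ``square_in_thread`` after the ``thread::spawn()`` call would be rejected by the compiler, since the value has been moved.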

## Mutual Exclusion for Shared Data with ``Mutex<T>``

A common scenario in concurrent computations is the sharing of mutable variables. In many programming languages, this is handled by a *mutex* (short for *mutual exclusion*), which ensures that only one thread at a time can access the shared data.

In Rust, a mutex can be instantiated by calling ``Mutex::new()`` from ``std::sync::Mutex``.

For detailed information, see https://doc.rust-lang.org/std/sync/struct.Mutex.html.

In [None]:
use std::sync::Mutex;

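To lock the mutex, one has to call ``lock()``, which blocks the current thread until the lock is available. It returns a ``Result`` that is only an error if another thread panicked while holding the lock (the mutex is then *poisoned*); the subsequent ``unwrap()`` simply panics in that case and is not in focus here.

On success, ``lock()`` returns a smart pointer of type ``MutexGuard<T>``, through which the variable behind the mutex can be accessed by dereferencing with ``*``.

When the ``MutexGuard<T>`` goes out of scope, e.g. at the end of a function, it is dropped and the mutex is unlocked automatically.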

In [None]:
fn print_mutex(mtx: &Mutex<u32>) {
    println!("MUTEX DEBUG:   {:?}", mtx);
}

fn zero_mutex(mtx: &Mutex<u32>) {
    let mut n = mtx.lock().unwrap();       // mutex lock is acquired
    *n = 0;                                // dereferencing allows access to mutex variable
}                                          // mutex is unlocked automatically after n leaves scope

let m = Mutex::new(5);

print_mutex(&m);
zero_mutex(&m);
print_mutex(&m);
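The lock is held exactly as long as the ``MutexGuard<T>`` is alive. A minimal sketch (the function name ``lock_lifetime_demo`` is hypothetical) uses the non-blocking ``try_lock()`` to observe the lock state and ``drop()`` to release the guard before the end of its scope.

```rust
use std::sync::Mutex;

/// Hypothetical demo: returns (locked while guard alive, free after drop, final value).
fn lock_lifetime_demo() -> (bool, bool, u32) {
    let m = Mutex::new(5u32);

    let mut guard = m.lock().unwrap();   // lock acquired, guard borrows `m`
    *guard += 1;                         // modify the protected value through the guard

    // While `guard` is alive, the mutex is locked: try_lock() fails instead of blocking.
    let locked_while_held = m.try_lock().is_err();

    drop(guard);                         // release the lock before the end of the scope

    // The mutex is free again; the temporary guard is dropped at the end of the statement.
    let free_after_drop = m.try_lock().is_ok();

    let value = *m.lock().unwrap();      // copy the value out, guard dropped immediately
    (locked_while_held, free_after_drop, value)
}
```

Releasing a lock early like this keeps critical sections short, which reduces the time other threads spend waiting in ``lock()``.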

## Sharing Data by Reference Counting with ``Rc<T>``

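The smart pointer ``Rc<T>`` from ``std::rc::Rc`` enables multiple owners of the same data in single-threaded code. It counts the references to the shared data (the *strong count*): ``Rc::clone()`` only increments this count instead of copying the data, and the data is dropped when the count reaches zero.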

For details, see https://doc.rust-lang.org/std/rc/struct.Rc.html.

In [None]:
use std::rc::Rc;

struct DayString {
    prefix: String,
    day: Rc<String>,
}

let day_str = String::from("day");                  // create data on the heap
let day_str_rc: Rc<String> = Rc::new(day_str);      // create reference counter for that data

println!("Rc count = {} (after creation)", Rc::strong_count(&day_str_rc));

{ 
    let monday_str = DayString { prefix: String::from("mon"), day: Rc::clone(&day_str_rc) };
    println!("Monday String: {}{}", monday_str.prefix, monday_str.day);
    println!("Rc count = {} (after monday clone)", Rc::strong_count(&day_str_rc));

    let tuesday_str = DayString { prefix: String::from("tues"), day: Rc::clone(&day_str_rc) };
    println!("Tuesday String: {}{}", tuesday_str.prefix, tuesday_str.day);
    println!("Rc count = {} (after tuesday clone)", Rc::strong_count(&day_str_rc));

    {
        let wednesday_str = DayString { prefix: String::from("wednes"), day: Rc::clone(&day_str_rc) };
        println!("Wednesday String: {}{}", wednesday_str.prefix, wednesday_str.day);
        println!("Rc count = {} (after wednesday clone)", Rc::strong_count(&day_str_rc));
    };
    
    println!("Rc count = {} (after wednesday clone left scope)", Rc::strong_count(&day_str_rc));
};

println!("Rc count = {} (after all clones left scope)", Rc::strong_count(&day_str_rc));
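A minimal sketch (the function name ``rc_clone_shares`` is hypothetical) makes the counting explicit: ``Rc::ptr_eq()`` confirms that a clone points to the same allocation, and dropping the clone decrements the count.

```rust
use std::rc::Rc;

/// Hypothetical demo: returns (count with two owners, same allocation?, count after drop).
fn rc_clone_shares() -> (usize, bool, usize) {
    let a = Rc::new(String::from("day"));
    let b = Rc::clone(&a);                   // increments the counter, no deep copy
    let count_with_two = Rc::strong_count(&a);
    let same_allocation = Rc::ptr_eq(&a, &b);
    drop(b);                                 // dropping a clone decrements the counter
    (count_with_two, same_allocation, Rc::strong_count(&a))
}
```

Calling ``rc_clone_shares()`` should return ``(2, true, 1)``.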

## Atomic Reference Counting with ``Arc<T>``

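Although ``Rc<T>`` enables multiple references to shared data, it is not safe to use in multi-threaded scenarios, because it updates its reference count with ordinary, non-atomic operations. Two threads changing the count at the same time could therefore produce wrong counts, leading to memory leaks or to data being freed while still in use. For this reason, ``Rc<T>`` does not implement the ``Send`` trait, and the Rust compiler rejects any attempt to move it into another thread.

The thread-safe counterpart is ``Arc<T>`` (*atomically reference counted*) from ``std::sync::Arc``. It updates the reference count with atomic operations, which have more overhead than the plain operations of ``Rc<T>`` but are safe to use from multiple threads. Like ``Rc<T>``, it only gives shared read access to the data; to mutate the data, it is combined with a ``Mutex<T>``, as in the following example.

For details, see https://doc.rust-lang.org/std/sync/struct.Arc.html.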

In [None]:
use std::sync::Arc;

fn count_with_threads() {
    
    let cntr = Arc::new(Mutex::new(0));                // create atomic reference counter for mutex on counter
    let mut handle_vec = vec![];                       // vector to collect all thread handles

    for _ in 0..42 {                                   // _ represents a not used (loop) variable
        let cntr_thread = Arc::clone(&cntr);           // clone the Arc (increments the reference count)
        let handle = thread::spawn(move || {           // spawn a new thread that owns the cloned Arc
            let mut n = cntr_thread.lock().unwrap();   // get lock on counter value
            *n += 1;                                   // increase counter value
        });
        handle_vec.push(handle);                       // push thread handle to handle vector
    }

    for handle in handle_vec {                         // wait for all threads to finish
        handle.join().unwrap();
    }

    println!("Counter Value: {}", *cntr.lock().unwrap());
}

count_with_threads();
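If the threads only read the shared data, ``Arc<T>`` alone is enough and no ``Mutex<T>`` is needed. A minimal sketch, assuming a hypothetical function ``parallel_sum`` that splits the data into one chunk per thread and combines the partial sums returned through the join handles:

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: sums `data` using `n_threads` threads that share it read-only.
fn parallel_sum(data: Vec<i64>, n_threads: usize) -> i64 {
    assert!(n_threads > 0, "need at least one thread");
    let data = Arc::new(data);
    let chunk_size = (data.len() + n_threads - 1) / n_threads; // ceiling division
    let mut handles = Vec::new();

    for t in 0..n_threads {
        let data = Arc::clone(&data);            // each thread gets its own Arc
        handles.push(thread::spawn(move || {
            // clamp the chunk bounds so trailing threads may get an empty slice
            let start = (t * chunk_size).min(data.len());
            let end = ((t + 1) * chunk_size).min(data.len());
            data[start..end].iter().sum::<i64>()
        }));
    }

    // combine the partial sums returned through the join handles
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

For example, ``parallel_sum((1..=100).collect(), 4)`` should return ``5050``. Since every thread returns its own partial result, there is no mutable shared state to protect.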

## Exercise

The following exercise helps to practice what has been covered so far.

### Multi-Threaded Average Computation

Extend the given function so that it spawns 32 threads, one per index of the three measured time series. Each thread computes the average of the three values at its index, so that together the threads produce the averaged time series of 32 values, which is then printed in 8 rows of 4 values.

In [None]:
fn average_with_threads() {

    let time_series_1 = vec![0.3746159125, 0.2361180109, 0.3378222126, 0.3889137705,
                             0.7939929678, 0.8965447566, 0.0025118861, 0.6385968099,
                             0.8674781412, 0.6601930348, 0.9572774544, 0.9381198140,
                             0.3056447072, 0.2665908600, 0.0133173369, 0.8197983090,
                             0.7433749131, 0.0439387530, 0.6669699395, 0.9390223500,
                             0.4035295925, 0.8354899994, 0.2600206713, 0.8082156640,
                             0.0838627935, 0.8768263590, 0.5783809980, 0.6945670911,
                             0.5793987002, 0.8084666539, 0.8561550624, 0.9307076209];

    let time_series_2 = vec![0.0189625067, 0.2439122623, 0.0438636327, 0.7645780703,
                             0.7963895690, 0.5217134415, 0.7228504208, 0.1318460353,
                             0.6853588089, 0.4007476913, 0.6026601135, 0.1334964979,
                             0.4791455524, 0.7088239179, 0.2397349057, 0.7939479349,
                             0.1995520750, 0.6500906015, 0.8553767667, 0.2266357112,
                             0.9149782985, 0.9970827177, 0.4288467251, 0.2903316867,
                             0.5334403306, 0.0004823761, 0.1138787079, 0.8923116925,
                             0.1291085839, 0.9031569160, 0.4396658674, 0.0958836336];
    
    let time_series_3 = vec![0.8141894986, 0.1330894087, 0.0913975503, 0.8819964392,
                             0.6968876854, 0.8336690037, 0.5928581439, 0.5477573420,
                             0.5994034220, 0.6489357528, 0.6719878308, 0.5026737468,
                             0.2456916874, 0.0245392908, 0.6129218998, 0.7244434205,
                             0.5609155413, 0.9067474801, 0.1528659589, 0.2726414966,
                             0.2081765468, 0.7501682686, 0.2350835574, 0.2784958920,
                             0.1980785938, 0.7129439713, 0.6218828190, 0.7592176281,
                             0.1686769254, 0.0147286985, 0.8693620120, 0.9322303511];

    let mut time_series_avg: Vec<f64> = Vec::new();
    time_series_avg.resize(time_series_1.len(), 0.0);

    // TODO
    
    let mut handle_vec = vec![];

    // loop through 32 threads
    for i in 0..32 {

        // TODO

        // spawn threads
        let handle = thread::spawn(move || {

            // TODO

            println!("THREAD {:02}: DONE!", i);
        });

        handle_vec.push(handle);
    }

    // join handles
    for handle in handle_vec {
        handle.join().unwrap();
    }

    // output averaged time series

    // TODO

    println!("Time Series Average:");
    for i in 0..8 {
        println!("{:.10}, {:.10}, {:.10}, {:.10}",
                 // TODO
                )
    }

}

average_with_threads();

// ### Correct Output ###
//
// Time Series Average:
// 0.4025893059, 0.2043732273, 0.1576944652, 0.6784960933
// 0.7624234074, 0.7506424006, 0.4394068169, 0.4394000624
// 0.7174134574, 0.5699588263, 0.7439751329, 0.5247633529
// 0.3434939823, 0.3333180229, 0.2886580475, 0.7793965548
// 0.5012808431, 0.5335922782, 0.5584042217, 0.4794331859
// 0.5088948126, 0.8609136619, 0.3079836513, 0.4590144142
// 0.2717939060, 0.5300842355, 0.4380475083, 0.7820321372
// 0.2923947365, 0.5754507561, 0.7217276473, 0.6529405352