# Multithreading in Rust

## Managing Threads

### Creating and joining threads

* Creating a thread using `std::thread::spawn`
* Joining threads to wait for their completion using `join()`

In [6]:
use std::thread;

/// Counts from 1 to 10, printing each number together with the id of the
/// thread that is executing this function, and pauses 50 ms after every line.
fn background_counting() {
    let pause = std::time::Duration::from_millis(50);
    (1..=10).for_each(|step| {
        println!("Counting {} in background {:?}", step, thread::current().id());
        thread::sleep(pause);
    });
}

// First worker: a named function is handed to `spawn` directly.
let thd_1 = thread::spawn(background_counting);

// Second worker: the same kind of loop, supplied inline as a closure.
let thd_2 = thread::spawn(|| {
    let pause = std::time::Duration::from_millis(50);
    (1..=10).for_each(|step| {
        println!("Counting {} in spawned {:?}", step, thread::current().id());
        thread::sleep(pause);
    });
});

// Wait for both workers, in spawn order; `unwrap` panics here if a worker panicked.
for worker in [thd_1, thd_2] {
    worker.join().unwrap();
}

Counting 1 in background ThreadId(2)
Counting 1 in spawned ThreadId(3)
Counting 2 in background ThreadId(2)
Counting 2 in spawned ThreadId(3)
Counting 3 in background ThreadId(2)
Counting 3 in spawned ThreadId(3)
Counting 4 in background ThreadId(2)
Counting 4 in spawned ThreadId(3)
Counting 5 in spawned ThreadId(3)
Counting 5 in background ThreadId(2)
Counting 6 in spawned ThreadId(3)
Counting 6 in background ThreadId(2)
Counting 7 in background ThreadId(2)
Counting 7 in spawned ThreadId(3)
Counting 8 in background ThreadId(2)
Counting 8 in spawned ThreadId(3)
Counting 9 in spawned ThreadId(3)
Counting 9 in background ThreadId(2)
Counting 10 in spawned ThreadId(3)
Counting 10 in background ThreadId(2)


## Sharing Data Between Threads


### Transfer Ownership

* Safely transfer ownership of data to threads


In [7]:
let data_vec = (1..=1000).collect::<Vec<i32>>();

// `move` hands ownership of `data_vec` to the closure, so the spawned thread
// owns the vector outright and it is no longer usable from this cell.
let thd_process = thread::spawn(move || {
    println!("Processing data_vec in thread {:?}", thread::current().id());
    thread::sleep(std::time::Duration::from_millis(500));
    let sum: i32 = data_vec.iter().sum();
    // Uncomment the next line to exercise the `Err` arm of the `match` below:
    // panic!("Intentional panic after processing data_vec with sum {}", sum);
    sum
});

// `join` yields the closure's return value, or the panic payload if it panicked.
match thd_process.join() {
    Ok(sum) => println!("Sum of data_vec: {}", sum),
    Err(payload) => {
        // The payload is a `Box<dyn Any + Send>`; printing it with `{:?}` only
        // shows `Any { .. }`. `panic!` stores a `String` (formatted message) or
        // a `&'static str` (literal message), so try both to recover the text.
        let reason = payload
            .downcast_ref::<String>()
            .map(String::as_str)
            .or_else(|| payload.downcast_ref::<&str>().copied())
            .unwrap_or("<non-string panic payload>");
        println!("Thread panicked: {}", reason);
    }
};
// The trailing `;` keeps the notebook from echoing the `()` value of the `match`.

Processing data_vec in thread ThreadId(4)
Sum of data_vec: 500500


()

### Shared Ownership - Immutable Data

* Using `Arc<T>` for shared ownership across threads

In [8]:
:dep rand = "0.9"

use rand::random_range;
use std::iter::from_fn;
use std::sync::Arc;

// One hundred random values in 1..=100, wrapped in an `Arc` so that several
// threads can read the same vector without copying it.
let shared_data = Arc::new(
    from_fn(|| Some(random_range(1..=100)))
        .take(100)
        .collect::<Vec<i32>>(),
);

// Each worker receives its own `Arc` handle; the block scope keeps the
// temporary handle name out of the rest of the cell.
let thd_sum = {
    let numbers = Arc::clone(&shared_data);
    thread::spawn(move || {
        let sum = numbers.iter().sum::<i32>();
        println!("Sum in thread {:?}: {}", thread::current().id(), sum);
        sum
    })
};

// Despite its name, this worker reports both the minimum and the maximum.
let thd_max = {
    let numbers = Arc::clone(&shared_data);
    thread::spawn(move || {
        let worker_id = thread::current().id();
        let min = *numbers.iter().min().unwrap();
        println!("Min in thread {:?}: {}", worker_id, min);
        let max = *numbers.iter().max().unwrap();
        println!("Max in thread {:?}: {}", worker_id, max);
        (min, max)
    })
};

// Collect the values returned by the worker closures.
let total_sum = thd_sum.join().unwrap();
let (min_value, max_value) = thd_max.join().unwrap();

println!("Final Results - Total Sum: {}, Min: {}, Max: {}", total_sum, min_value, max_value);

Min in thread ThreadId(6): 1
Max in thread ThreadId(6): 100
Sum in thread ThreadId(5): 5025
Final Results - Total Sum: 5025, Min: 1, Max: 100


### Shared Mutable State Between Threads

In [2]:
use std::sync::{Arc, Mutex};

/// A bank account whose balance can be read and changed through a shared
/// reference, which makes it usable from several threads at once.
pub struct BankAccount {
    // A plain `balance: f32` could only be changed through `&mut self`; the
    // `Mutex` allows mutation through `&self` while serialising access.
    balance: Mutex<f32>,
}

impl BankAccount {
    /// Creates an account holding `initial_balance`.
    pub fn new(initial_balance: f32) -> Self {
        BankAccount { balance: Mutex::new(initial_balance) }
    }

    /// Adds `amount` to the balance.
    ///
    /// # Panics
    /// Panics if the balance mutex is poisoned.
    pub fn deposit(&self, amount: f32) {
        *self.balance.lock().unwrap() += amount;
    }

    /// Removes `amount` from the balance.
    ///
    /// The check and the update happen under a single lock acquisition.
    ///
    /// # Errors
    /// Returns `Err("Insufficient funds")` when `amount` exceeds the balance;
    /// the balance is left unchanged in that case.
    ///
    /// # Panics
    /// Panics if the balance mutex is poisoned.
    pub fn withdraw(&self, amount: f32) -> Result<(), String> {
        let mut balance = self.balance.lock().unwrap();
        if amount > *balance {
            Err("Insufficient funds".to_string())
        } else {
            *balance -= amount;
            Ok(())
        }
    }

    /// Moves `amount` from this account to `to`, holding both balances locked
    /// for the duration of the update.
    ///
    /// # Errors
    /// Returns `Err("Insufficient funds")` when `amount` exceeds this
    /// account's balance; neither balance is changed in that case.
    ///
    /// # Panics
    /// Panics if either balance mutex is poisoned.
    fn transfer(&self, to: &BankAccount, amount: f32) -> Result<(), String> {
        // Transferring to the same account involves only one mutex; locking it
        // a second time on this thread would deadlock or panic. The balance is
        // unchanged by such a transfer, so only the funds check is performed.
        if std::ptr::eq(self, to) {
            let balance = self.balance.lock().unwrap();
            return if amount > *balance {
                Err("Insufficient funds".to_string())
            } else {
                Ok(())
            };
        }

        // Always take the two locks in the same global order (lower address
        // first). Locking "self, then to" lets two threads running opposite
        // transfers each hold one lock while waiting for the other.
        let self_first = (self as *const BankAccount) < (to as *const BankAccount);
        let (mut from_balance, mut to_balance) = if self_first {
            let from_guard = self.balance.lock().unwrap();
            let to_guard = to.balance.lock().unwrap();
            (from_guard, to_guard)
        } else {
            let to_guard = to.balance.lock().unwrap();
            let from_guard = self.balance.lock().unwrap();
            (from_guard, to_guard)
        };

        if amount > *from_balance {
            return Err("Insufficient funds".to_string());
        }

        *from_balance -= amount;
        *to_balance += amount;

        Ok(())
    }

    /// Returns a snapshot of the current balance.
    ///
    /// # Panics
    /// Panics if the balance mutex is poisoned.
    pub fn balance(&self) -> f32 {
        *self.balance.lock().unwrap()
    }
}

In [3]:
/// Runs `count` rounds; in each round deposits `amount` into `account` if its
/// balance is below 200, then sleeps for 10 ms.
///
/// `balance()` and `deposit()` are separate calls, so another thread may change
/// the balance between the check and the deposit.
fn pay_in(account: Arc<BankAccount>, amount: f32, count: usize) {
    for _ in 0..count {
        // Read the balance once so the logged figure is exactly the value the
        // decision was based on, rather than a second, possibly newer reading.
        let current = account.balance();
        if current < 200.0 {
            println!("Balance low: ${}, depositing ${}", current, amount);
            account.deposit(amount);
        }

        std::thread::sleep(std::time::Duration::from_millis(10));
    }
}

/// Attempts `count` withdrawals of `amount` from `account`, logging the outcome
/// of each attempt and sleeping for 20 ms afterwards.
fn take_out(account: Arc<BankAccount>, amount: f32, count: usize) {
    let pause = std::time::Duration::from_millis(20);
    (0..count).for_each(|_| {
        if let Err(reason) = account.withdraw(amount) {
            println!("Failed to withdraw ${}: {}", amount, reason);
        } else {
            // The balance is re-read after the withdrawal has completed.
            println!("Withdrew ${}, new balance: ${}", amount, account.balance());
        }

        std::thread::sleep(pause);
    });
}

In [4]:
/// Runs one depositing thread and one withdrawing thread against the same
/// account and waits for both to finish.
fn main() {
    // None of the `Arc` handles needs `mut`: the account is modified through
    // `&self` methods, so shared handles are enough.
    let account = Arc::new(BankAccount::new(100.0));

    let account_for_pay_in = Arc::clone(&account);
    let pay_in_thread = std::thread::spawn(move || pay_in(account_for_pay_in, 50.0, 10));

    let account_for_take_out = Arc::clone(&account);
    let take_out_thread = std::thread::spawn(move || take_out(account_for_take_out, 30.0, 10));

    pay_in_thread.join().unwrap();
    take_out_thread.join().unwrap();
}

main();

Withdrew $30, new balance: $70
Balance low: $70, depositing $50
Balance low: $120, depositing $50
Withdrew $30, new balance: $140
Balance low: $140, depositing $50
Balance low: $190, depositing $50
Withdrew $30, new balance: $210
Withdrew $30, new balance: $180
Balance low: $180, depositing $50
Withdrew $30, new balance: $200
Withdrew $30, new balance: $170
Withdrew $30, new balance: $140
Withdrew $30, new balance: $110
Withdrew $30, new balance: $80
Withdrew $30, new balance: $50


## Send and Sync

* The `Send` and `Sync` traits affect the ability to transfer and share data between threads
* Types that implement `Send` can be transferred to another thread
  * Most primitive types in Rust implement `Send`
  * Owned types like `String` and `Vec<T>` also implement `Send`
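  * Types that do not implement `Send` include raw pointers and types whose internals are not thread-safe, such as `Rc<T>` (its reference count is not atomic); `RefCell<T>`, by contrast, is `Send` whenever `T` is `Send` — it is only not `Sync`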
* Types that implement `Sync` can be referenced from multiple threads safely
  * Immutable references (`&T`) are `Sync` if `T` is `Sync`
  * Mutable references (`&mut T`) are `Sync` if `T` is `Sync` (a shared `&&mut T` only gives read access to `T`), and `Send` if `T` is `Send`
  * Types that do not implement `Sync` include `Cell<T>` and `RefCell<T>`
  * Types like `Mutex<T>` and `RwLock<T>` provide safe shared mutable access across threads

## Deadlocks

In [4]:
use std::sync::{Arc, Mutex};

// The module keeps the name `Bank` (rather than the conventional snake_case
// `bank`) because code outside the module refers to it as `Bank::...`.
#[allow(non_snake_case)]
mod Bank {
    use std::sync::{Arc, Mutex};

    /// A bank account identified by `id` whose balance is protected by a mutex.
    pub struct BankAccount {
        id: u32,
        balance: Mutex<f32>,
    }

    impl BankAccount {
        /// Creates account number `id` holding `initial_balance`.
        pub fn new(id: u32, initial_balance: f32) -> Self {
            BankAccount { id, balance: Mutex::new(initial_balance) }
        }

        /// Moves `amount` from this account to `to`, holding both balances
        /// locked for the duration of the update.
        ///
        /// # Errors
        /// Returns `Err("Insufficient funds")` when `amount` exceeds this
        /// account's balance; neither balance is changed in that case.
        ///
        /// # Panics
        /// Panics if either balance mutex is poisoned.
        pub fn transfer(&self, to: &BankAccount, amount: f32) -> Result<(), String> {
            // A transfer to the very same account involves a single mutex;
            // locking it twice on one thread would deadlock or panic. The
            // balance is unchanged, so only the funds check is performed.
            if std::ptr::eq(self, to) {
                let balance = self.balance.lock().unwrap();
                return if amount > *balance {
                    Err("Insufficient funds".to_string())
                } else {
                    Ok(())
                };
            }

            // Deadlock avoidance: every thread takes the two locks in the same
            // global order, whatever the direction of the transfer. The order
            // is by account id; the address breaks ties so that two distinct
            // accounts created with the same id are still ordered consistently.
            let self_key = (self.id, self as *const BankAccount);
            let to_key = (to.id, to as *const BankAccount);
            let (mut from_balance, mut to_balance) = if self_key < to_key {
                let from_guard = self.balance.lock().unwrap();
                let to_guard = to.balance.lock().unwrap();
                (from_guard, to_guard)
            } else {
                let to_guard = to.balance.lock().unwrap();
                let from_guard = self.balance.lock().unwrap();
                (from_guard, to_guard)
            };

            if amount > *from_balance {
                return Err("Insufficient funds".to_string());
            }

            *from_balance -= amount;
            *to_balance += amount;

            Ok(())
        }

        /// Returns a snapshot of the current balance.
        pub fn balance(&self) -> f32 {
            *self.balance.lock().unwrap()
        }

        /// Returns the account id.
        pub fn id(&self) -> u32 {
            self.id
        }
    }

    /// Performs `count` transfers of `amount` from `from` to `to`, logging the
    /// outcome of each one and sleeping 15 ms between attempts.
    ///
    /// The balances in the log line are read after the transfer's locks have
    /// been released, so they may already include another thread's transfers.
    pub fn transfer_funds(from: Arc<BankAccount>, to: Arc<BankAccount>, amount: f32, count: usize) {
        for _ in 0..count {
            match from.transfer(&to, amount) {
                Ok(_) => println!("Transferred ${} from BankAccount#{} to BankAccount#{}. New balances: from ${}, to ${}", 
                                amount, from.id(), to.id(), from.balance(), to.balance()),
                Err(e) => println!("Failed to transfer ${} from BankAccount#{} to BankAccount#{}: {}", 
                                amount, from.id(), to.id(), e),
            }

            std::thread::sleep(std::time::Duration::from_millis(15));
        }
    }

}

/// Runs two threads that transfer money between the same two accounts in
/// opposite directions, then prints the final balances.
fn main() {
    let account_a = Arc::new(Bank::BankAccount::new(1, 500.0));
    let account_b = Arc::new(Bank::BankAccount::new(2, 300.0));

    let account_a_for_transfer_1 = Arc::clone(&account_a);
    let account_b_for_transfer_1 = Arc::clone(&account_b);

    let thd_transfer_from_a_to_b = std::thread::spawn(move || {
        Bank::transfer_funds(account_a_for_transfer_1, account_b_for_transfer_1, 50.0, 10);
    });

    let account_a_for_transfer_2 = Arc::clone(&account_a);
    let account_b_for_transfer_2 = Arc::clone(&account_b);
    let thd_transfer_from_b_to_a = std::thread::spawn(move || {
        Bank::transfer_funds(account_b_for_transfer_2, account_a_for_transfer_2, 30.0, 10);
    });

    // Both workers must be joined before the balances are read; otherwise the
    // "final" figures could be printed while transfers are still in flight.
    thd_transfer_from_a_to_b.join().unwrap();
    thd_transfer_from_b_to_a.join().unwrap();

    println!("Final Balance of Account A: ${}", account_a.balance());
    println!("Final Balance of Account B: ${}", account_b.balance());
}

main();

Transferred $50 from BankAccount#1 to BankAccount#2. New balances: from $450, to $350
Transferred $30 from BankAccount#2 to BankAccount#1. New balances: from $320, to $480
Transferred $30 from BankAccount#2 to BankAccount#1. New balances: from $290, to $510
Transferred $50 from BankAccount#1 to BankAccount#2. New balances: from $460, to $340
Transferred $30 from BankAccount#2 to BankAccount#1. New balances: from $310, to $490
Transferred $50 from BankAccount#1 to BankAccount#2. New balances: from $440, to $360
Transferred $30 from BankAccount#2 to BankAccount#1. New balances: from $330, to $470
Transferred $50 from BankAccount#1 to BankAccount#2. New balances: from $420, to $380
Transferred $30 from BankAccount#2 to BankAccount#1. New balances: from $350, to $450
Transferred $50 from BankAccount#1 to BankAccount#2. New balances: from $400, to $400
Transferred $30 from BankAccount#2 to BankAccount#1. New balances: from $370, to $430
Transferred $50 from BankAccount#1 to BankAccount#2. N

## Thread Communication With Channels

* Using channels (`std::sync::mpsc`) for thread communication simplifies data transfer between threads
* Channels provide a way to send data from one thread to another safely
  * Create a channel with `mpsc::channel()` - returns a sender and a receiver
  * Send data between threads using the `send()` method on the sender
  * Receive data in the main thread using the `recv()` method or iterating over the receiver

In [5]:
use std::thread;
use std::sync::mpsc;

/// Sends one `String` from a spawned thread to the main thread over an mpsc
/// channel and prints it.
fn main() {
    let (sender, receiver) = mpsc::channel();

    // The sender is moved into the worker instead of cloned. With a clone, the
    // original sender would stay alive in `main`, and `recv()` below would
    // block forever if the worker ended without sending. With the only sender
    // owned by the worker, `recv()` returns an error once the worker is gone.
    let thd_sender = thread::spawn(move || {
        let message = "Hello from the sender thread".to_string();
        sender.send(message).unwrap();
    });

    let received_data = receiver.recv().unwrap();
    println!("Received: {}", received_data);

    thd_sender.join().unwrap();
}

main();

Received: Hello from the sender thread


### Message Passing with Structs

In [6]:
use std::thread;
use std::sync::mpsc;

/// Message type sent through the channel in this example.
struct CustomMessage {
    /// Numeric identifier of the message.
    id: u32,
    /// Text carried by the message.
    content: String,
}

/// Sends one `CustomMessage` from a spawned thread to the main thread over an
/// mpsc channel and prints its fields.
fn main() {
    let (sender, receiver) = mpsc::channel();

    // The sender is moved into the worker instead of cloned. With a clone, the
    // original sender would stay alive in `main`, and `recv()` below would
    // block forever if the worker ended without sending. With the only sender
    // owned by the worker, `recv()` returns an error once the worker is gone.
    let handle = thread::spawn(move || {
        let message = CustomMessage {
            id: 42,
            content: "Important data".to_string(),
        };
        sender.send(message).unwrap();
    });

    let received_message = receiver.recv().unwrap();
    println!("Received Message - ID: {}, Content: {}", received_message.id, received_message.content);

    handle.join().unwrap();
}

main();

Received Message - ID: 42, Content: Important data
