- Java thread is an execution path
- thread actions are implemented using `run` of `Runnable` interface
- thread **execution order is not guaranteed**
- scheduling to run via `start` method of thread class
- Thread schedule allocates portions of CPU time via _Thread Scheduler_
- there are
    - system threads (throw `Error`) 
    - user-defined threads (throw `Exception`)
- _Java threads time-slice hardware threads (processors) provided by the the CPU cores and can be interrupted at 
any time to give way to another thread_
- if less CPUs then threads
    - time sharing between threads (stop + let other thread execute)
    - order of thread execution is not predictable (stochastic)
- anology
    - cars = threads
    - physical roads = CPU cores
    - if more cars then roads, then time-sharing (or road-sharing)
- terminology
    - parallel: really happens at the same time
    - concurrent
        - executing multiple task at the same time
        - not necessarlily simultaneously
- context switch
    - the process of storing a thread's current state and later restoring the state of the thread to continue execution
    - there is often a cost associated with a context switch by way of lost time saving and reloading a thread's state


<img src=attachment:image.png width=400></img>
<img src=attachment:image-2.png width=600></img>

## Implements Runnable
- common practice > _recommmended_
- Functional interface (one abstract method)
    - takes no argument
    - returns no data

```java
@FunctionalInterface 
public interface Runnable {
    void run();
}
```

In [1]:
//  ----------------+
//                  v
class HiSayer implements Runnable {
    public void run() {
        System.out.println("hi");
    }
}

In [2]:
Thread t = new Thread(new HiSayer());
t.start();

hi

## Runnable Via Lambda
- good for small amount of actions

In [3]:
Runnable r = () -> System.out.println("Via Lambda");
Thread t = new Thread(r);
t.start();

Via Lambda

## Runnable Class vs. Lambda
- class let's you pass additional data via the constructor

In [4]:
public class CalculateAverages implements Runnable {
    private double[] scores;
    public CalculateAverages(double[] scores) {
        this.scores = scores;
    }
    
    public void run() {
      // Define work here that uses the scores object       
    }
}

## Extends Thread
- use Thread abstract class
- already implements `Runnable`
- not a flexible design > _not recommended_

In [5]:
//  --------------------+
//                      v
public class MuhSayer extends Thread {
    public void run() {
        System.out.println("muh");
    }
}

Thread t = new MuhSayer();
t.start();  // <----- required to start
// t.run();  // <----synchronously start

muh

## Thread Life Cycle
<img src=attachment:image.png width=700></img>

- you can only go from "new" to "runnable"
- from "runnable" all later states can be reached
- most of the later states can return to "runnable"


- same `Runnable` can be processed by multiple threads (parallel processing)
- life cycle phases can be checked
- same thread _can't start twice_

In [1]:
import java.util.concurrent.atomic.*;

In [4]:
AtomicBoolean firstRun = new AtomicBoolean(true);

public void sleep(long millis) {
    try {
        Thread.sleep(millis);    
    } catch (Exception e) {
    // swallowed
    }
}


Runnable r = () -> {
    
    if (firstRun.get()) {
        firstRun.set(false);    
        System.out.println("first run");
        sleep(100);
    }
    
    sleep(500);
    System.out.println("inside");
};

Thread t1 = new Thread(r);
Thread t2 = new Thread(r);
t1.start();
t2.start();
System.out.println("t1 alive: " + t1.isAlive());
System.out.println("t2 alive: " + t2.isAlive());

System.out.println("t1 state: " + t1.getState());
System.out.println("t2 state: " + t2.getState());
sleep(400);
System.out.println("t1 state: " + t1.getState());
System.out.println("t2 state: " + t2.getState());

first run
t1 alive: true
t2 alive: true
t1 state: TIMED_WAITING
t2 state: TIMED_WAITING
inside
inside
t1 state: TERMINATED
t2 state: TERMINATED


## Interrupt Thread Via Signal
- logic inside `run` is in charge of the life cycle decisions
- a thread in _runnable_ state _may_ check if it has received an interrupt signal
- a thread in _waiting_ or _timed waiting_ state 
    - _must_ catch `InterruptedException`
    - puts it back into _runnable_ state (wakeup) and runs the according logic (catch block of the exception) 
- `InterruptedException` > `RUNNABLE` state

In [8]:
Runnable r = () -> {
    Thread currentThread = Thread.currentThread();
    while (!currentThread.isInterrupted()) {
        try {
            Thread.sleep(1000);   // enter TIMED_WAITING for 1 sec
        } catch (InterruptedException e) {
            System.out.println("woke up - terminating");
            return;
        }
    }
};
Thread t = new Thread(r);
t.start();
//sleep(100);
System.out.println(t1.getState());
t.interrupt();

TERMINATED
woke up - terminating

- reacting to `interrupt` signal 
    - while in `RUNNABLE` state via `ct.isInterrupted()`
    - while in `TIMED_WAITING` `sleep` is interrupted and throws the `InterruptedException`


## Block Thread
- helps to coordinate order of execution of threads
- _monitor object_ - helps to coordinate execution order of threads
- anology
    - monitor object = traffic light
    - thread itself turns traffic light red ("I wanna be blocked")
- thread that first enters synchronized block remains in RUNNABLE state
- all other threads wait until block is released
- then next thread enters RUNNABLE

In [9]:
// note: synchronized on method level lead to syntax error, possible not supported by jshell?
public class SynchronizedTest {
    
    private AtomicInteger cnt = new AtomicInteger(0);

    public void miau() {
        System.out.println("Thread no " + cnt.incrementAndGet());
        /*   TODO understand why I dont see all 3 threads output when sleep is turned on
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println("interrupted");  // should be not called
        }
        */
    }
    
}

SynchronizedTest s = new SynchronizedTest();
Runnable r = () -> {
    synchronized (s) {
        s.miau();
    }
};

new Thread(r).start();
new Thread(r).start();
new Thread(r).start();

Thread no 1
Thread no 2
Thread no 3

<img src=attachment:image.png width=700></img>

- `a()` - only threads sharing the same instance of the class block each other. Threads on an other instance form a new thread group
- `b()` - any thread using Some.b() will block
- `c()` - guessing. similar to `a()` but for the whole instance. Access to `s` is blocked, not only to `s.a()`

## Make Thread Wait Until Notified
- suspend thread waiting indefinetly
- `wait` puts thread into waiting state against specific monitor
- any number of threads can be waiting against the same monitor
- `notify` wakes up _one_ of the waiting threads (random)
- `notifyAll` wakes up _all_ waiting threads


Note: `wait/notify/notifyAll` must be invoked within `synchronized` blocks against the same monior



In [10]:
// TODO understand why (4) is not printed
Object monitor = new Object();

Runnable r = () -> {
    try {
        synchronized (monitor) {
            System.out.println("(2) Entering waiting state");
            monitor.wait();
        }
    } catch (InterruptedException e) {
        System.out.println("(4) Got notifed");
    }
};

Thread t = new Thread(r);
System.out.println("(1) Starting thread");
t.start();
sleep(500);

System.out.println("(3) Notifying thread");
synchronized (monitor) {
    monitor.notify();
}
System.out.println("(5) Done");

(1) Starting thread
(2) Entering waiting state
(3) Notifying thread
(5) Done


## Thread Properties

In [11]:
Runnable r = () -> { /* nop */ };

### Custom Name

In [12]:
new Thread(r, "My Thread").getName()

My Thread

## Daemon or User 
- by default a thread is marked as `user`
- does not exit when main thread is done
- setting thread to daemon (`t.setDaemon(true)`) tells JVM to exit anyway even if thread is still running
- think of background task which are not business critical (e.g. logging)
- must be invoked before starting the thread

### Setting Priorities
- `t.setPriority(3)`
- depends on the platfirm 
- determines the number of CPU times slots the thread scheduler allocates to this thread
- cannot guarantee order of execution

In [13]:
Thread.MIN_PRIORITY

1

In [14]:
Thread.MAX_PRIORITY

10

In [15]:
Thread.NORM_PRIORITY

5

### join threads
- wait for other thread to terminate


_When we invoke the join() method on a thread, the calling thread goes into a waiting state. It remains in a waiting state until the referenced thread terminates._

In [16]:
Runnable r = () -> {
    System.out.println("(2) Inside job");
    sleep(500);
};

System.out.println("(1) Starting thread");
Thread t = new Thread(r);
t.start();
System.out.println("(3) Waiting for finish");
t.join();  // optionally with max wait time
System.out.println("(4) Done");

(1) Starting thread
(2) Inside job
(3) Waiting for finish
(4) Done


### Trickery - Thread start() vs. run()
- `new Thread(task).run()` 
    - If this thread was constructed using a separate Runnable run object, then that Runnable object's run method is called; otherwise, this method does nothing and returns. > blocks until done.
- `new Thread(task).start()` 
    - Causes this thread to begin execution; the Java Virtual Machine calls the run method of this thread. -> async execution

In [36]:
import java.util.stream.*;

public class Flavors {
    private static int counter;
    public static void countIceCreamFlavorsViaRun() {
        counter = 0;
        Runnable task = () -> {
            counter++;
            sleep(Math.round(Math.random() * 10));
        };
        // "range" is excluding to upper boundary
        LongStream.range(1, 500)
            // calling "run" executes the runnable **without** continuing
            .forEach(m -> new Thread(task).run());
        System.out.println(counter);
    }
    
    public static void countIceCreamFlavorsViaStart() {
        counter = 0;
        Runnable task = () -> {
            counter++;
            sleep(Math.round(Math.random() * 10));
        };
        LongStream.range(1, 500)
            // using "start" is supposed to make the result indetermined - but for some reason it is not
            .forEach(m -> new Thread(task).start());
        System.out.println(counter);
    }

}

Flavors.countIceCreamFlavorsViaRun()

499


In [35]:
Flavors.countIceCreamFlavorsViaStart()

498


# Executors
- simplify thread management
- there are different `ExecutorService` objects
- thread pool size: use numbers of CPUs: ` Runtime.getRuntime().availableProcessors()`

| Name        |  Description               |
|-------------|----------------------------|
|Fixed Thread Pool | resuses fixed numer of threads. if more tasks then threads, tasks are queued |
| Work Stealing Pool | mantains enough threads to support the given parallelism level |
| Single Thread Executor | uses single worker thread | 
| Cached Thread Pool | creates new threads as needed or reuses existing threads. **Unbounded**, usually used for many short lived tasks |
| Scheduled Thread Pool | schedules taks to execute with a delay and/ or periodically |
| Single Thread Scheduler Executor | schedules task to execute with a delay using single worker thread |
| Unconfgurable Executor Service | provides a way to "freeze" another Executur Service configuration |


### Important Methods

| Method name                        |Description                                              |
|-------------------------------------|--------------------------------------------------------|
|`void execute(Runnable command)`   |  Executes a Runnable task at some point in the future    |
|`Future<?> submit(Runnable task)`  |Executes a Runnable task at some point in the future and returns a Future representing the task  |
|`<T> Future<T> submit(Callable<T> task)`| Executes a Callable task at some point in the future and returns a Future representing the pending results of the task  |
|`<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException`|  Executes the given tasks and waits for all tasks to complete. Returns a List of Future instances, in the same order they were in the original collection |
|`<T> T <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException`|  Executes the given tasks and waits for at least one to complete. Returns a Future instance for a complete task and cancels any unfinished tasks |       
| `void ExecutorService.shutdown()`|stop accepting tasks (does _not_ shutdown int pool!) |
|`boolean ExecutorService.awaitTermination(long, TimeUnit)` | wait for all threads to complete. blocks for max given time |
|`List<Runnable> ExecutorService.shutdownNow()` |terminates the pool and returns list of still running tasks |


### Live Cycle
<img src=attachment:image.png width=600></img>

### Execute task

In [2]:
import java.util.concurrent.*;

In [18]:
Runnable r = () -> System.out.println("hi");
ExecutorService es = Executors.newSingleThreadExecutor();
// no return value
es.execute(r);
es.shutdown();

hi


### Waiting for all tasks to finish
```java
ExecutorService service = null;
try {
    service = Executors.newSingleThreadExecutor();   // Add tasks to the thread executor   
    …
} finally {
    if(service != null) service.shutdown();
}

if(service != null) {
    service.awaitTermination(1, TimeUnit.MINUTES);    // Check whether all tasks are finished
    if(service.isTerminated()) System.out.println("Finished!");   
    else System.out.println("At least one task is still running");}
```

### Callable
```java
@FunctionalInterface 
public interface Callable<V> {
    V call() throws Exception;
}
```
- allows for return value
- for this reason often prefered over Runnable
- ! can throw a **checked exception**

### Submit Callable To Obtain a Future

In [19]:
Callable c = () -> {
    sleep(500);
    return "i am done";
};

ExecutorService es = Executors.newSingleThreadExecutor();
// returns a future
Future<String> result = es.submit(c);

System.out.println(result.isDone());
sleep(500);
System.out.println(result.isDone());
System.out.println(result.get());
es.shutdown();

false
true
i am done


### Submit Collection Of Tasks

In [20]:
Callable<String> c = () -> "done";
var executor = Executors.newFixedThreadPool(3);
List<Future<String>> results = executor.invokeAll(List.of(c, c, c));
results.forEach(it -> {
    try {
        System.out.println(it.get());    
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
});

done
done
done


### Null Result when submitting a Runnable

In [29]:
var s = Executors.newSingleThreadExecutor();
var r = s.submit(() -> {});
r.get() == null

true

### Future Methods
| Method name                      |Description                                         |
|---------------------------------|------------------------------------------------------|
|`boolean isDone()`|  Returns true if the task was completed, threw an exception, or was cancelled  |
|`boolean isCancelled()`|  Returns true if the task was cancelled before it completed normally   |
|`boolean cancel(boolean mayInterruptIfRunning)`|  Attempts to cancel execution of the task and returns true if it was successfully cancelled or false if it could not be cancelled or is complete   |
|`V get()`|  **Blocks.** Retrieves the result of a task, waiting endlessly if it is not yet available |
|`V get(long timeout, TimeUnit unit)`| Retrieves the result of a task, waiting the specified amount of time. If the result is not ready by the time the timeout is reached, a **checked TimeoutException** will be thrown.|


### Schedule Example

| Method Name                  |Description                                              |
|------------------------------|----------------------------------------------------------|
| `schedule(Callable<V> callable, long delay, TimeUnit unit)`| Creates and executes a Callable task after the given delay  |
|`schedule(Runnable command, long delay, TimeUnit unit)`| Creates and executes a Runnable task after the given delay |
| `scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)`| Creates and executes a Runnable task after the given initial delay, creating a new task every period value that passes. **Long running tasks can pile up** (like good old **cron**) | 
| `scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)`|  Creates and executes a Runnable task after the given initial delay and subsequently with the given delay between the termination of one execution and the commencement of the next |

### ScheduledExecutorService extends ExecutorService
```java
public interface ScheduledExecutorService extends ExecutorService {
...
```

In [97]:
import java.nio.*;
import java.nio.file.*;

In [102]:
// - use the more specific "ScheduledExecutorService" interface to have access to the schedule* methods
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
// only takes Runnable as argument
service.scheduleWithFixedDelay(() -> System.out.println("miau"), 0, 1, TimeUnit.MINUTES);
service.shutdownNow();

miau


[java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@5ed6d334[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4d3c0b3f[Wrapped task = REPL.$JShell$231$$Lambda$605/0x00000008003d1c40@2b088a32]]]

In [22]:
Path tmpFile = Path.of("/tmp/output");
AtomicInteger cnt = new AtomicInteger(0);

Runnable t = () -> {
    try {
        Files.writeString(tmpFile, String.valueOf(cnt.incrementAndGet()) + "\n", 
            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
    catch (IOException e) {
        System.err.println(e);
    }
};

Files.deleteIfExists(tmpFile);
// pool of 3 threads
ScheduledExecutorService ses = Executors.newScheduledThreadPool(3);
// run task after 100 ms inital wait, then every 200 ms
ses.scheduleAtFixedRate(t, 100, 200, TimeUnit.MILLISECONDS);
// create an an unmodifiable Executor Servcie (e.g. to pass it to an external user)
ExecutorService es = Executors.unconfigurableScheduledExecutorService(ses);

sleep(1000);
es.shutdown();
Files.readAllLines(tmpFile);

[1, 2, 3, 4, 5]

<img src=attachment:image.png width=800></img>

## Locking Problems
- hard to troubleshoot & debug
- hardly reproducable in different environment and depends on many factors like
    - load
    - CPU available
    - memory
    - ...

### Starvation
- synchronized block of code which is entered by the thread A
- thread B using the same monitor object will wait until A is done 
- can take very long

### Livelock
- two threads asking constantly "Are you finished?"
- threads appear active but are conceptually blocked
- form an indefinite loop
- In fron of a door: "After you" - "No, after you" - "No, after you" ...

### Deadlock
- using different object monitors for synchronized blocks in reverse order
- example:
    - first thread is entering synchronized block on monitor _a_ and waits to enter synchronized block _b_
    - second thread entered synchronized block on monitor _b_ and waits to enter synchronized block _a_
- always a reverse order is involved
- they wait forever, program stops

![image.png](attachment:image.png)

## Avoding Them
- accept stochastic nature of threads
- don't try to force a certain order
- immutable design
- work with copies or read-only views

## Writing Thread-Safe Code

- stack values
    - local vars, method arguments
    - _are thread-safe_
    - local to the current thread

- heap is potentially shared
    - immutable objects on the heap are thread-safe (no mutation)
    - issues is with _mutable heap objects_ > thread-unsafe


- potential error scenarios
    - inconsistant - observed by other thread before modification is complete
    - corrupted - partially changed by another thread writing at the same time
    - compiler could cache heap values locally within a thread > thread does not notice that data has been changed by another thread

### Lack Of Thread Synchronization
<img src=attachment:image.png width=500></img>

### Volatile keyword
- disable caching of a heap variable locally in the stack
- always read from the heap

<img src=attachment:image.png width=500></img>

Example:
- `while` loop could become indefinite if compiler decides to cache variable `y` locally
- setting `y` to `volatile` ensures that the value is always accessed at its source in the heap
- slightly slows down the program 

## Atomic Action
_Can be performed in a single CPU cycle without being interrupted_

Atomic: is the property of an operation to be carried out as a single unit of execution without any interference by another thread.

- cannot be interleaved (verschachtelt)
- default atomic action
    - performed by a single CPU
    - in a single cycle
- variable asssignments
    - are atomic
    - exceptions: `long` and `double`
        - 64 bit values
        - on a 32 bit platform it takes more than 1 CPU cycle to assign them
- _non-atomic_ operations (example): `+ - / * % ++ --`
- `java.utlil.concurrent.atomic` provides classes for lock-free thread-safe programming of atomic behaviour of single variables
    - `AtomicBoolean`
    - `AtomicInteger`
    - `AtomicLong`
    - `AtomicReference<V>`
    - ...
- behave as if they are volatile

Alternative to `synchronize` every arithmatic block -> leads to deadlocks

### Thread Synchronization Using Atomic Operations
<img src=attachment:image.png width=500></img>

In [6]:
public class Shared {
    public AtomicInteger x = new AtomicInteger(0);
}

Shared s = new Shared();
Runnable r = () -> {
    int y = 0;
    while(y < 10) {
        y = s.x.incrementAndGet();
        System.out.println("shared 'x' is " + y + ", thread name: " + Thread.currentThread().getName());
        // stochastic execution time (0-100 ms)
        sleep(Math.round(Math.random()*100));
    }
};
Thread t1 = new Thread(r, "Hugo");
t1.start();
Thread t2 = new Thread(r, "Berd");
t2.start();
t2.join();
System.out.println("Done")


shared 'x' is 1, thread name: Hugo
shared 'x' is 2, thread name: Hugo
shared 'x' is 3, thread name: Hugo
shared 'x' is 4, thread name: Berd
shared 'x' is 5, thread name: Hugo
shared 'x' is 6, thread name: Hugo
shared 'x' is 7, thread name: Hugo
shared 'x' is 8, thread name: Berd
shared 'x' is 9, thread name: Berd
shared 'x' is 10, thread name: Hugo
shared 'x' is 11, thread name: Berd
Done


## Synchronized
- _A monitor (aka a lock) is a structure that supports mutual exclusion, which is the property that at most one thread is executing a particular segment of code at a given time._

_Each thread that arrives will first check if any threads are in the block. In this manner, a thread “acquires the lock” for the monitor. If the lock is available, a single thread will enter the block, acquiring the lock and preventing all other threads from entering. While the first thread is executing the block, all threads that arrive will attempt to acquire the same lock and wait for the first thread to finish. Once a thread finishes executing the block, it will release the lock, allowing one of the waiting threads to proceed_

<img src=attachment:image.png width=700></img>

- guarantees that `list.add` is only called by one thread at a time

## Intrinsic Lock Automation
- use `synchronized` version of a collection
- writing is guaranteed to be thread safe
- reading from the list requires `synchronized` 
    - supposed to avoid reading half-written data

In [8]:
import java.util.stream.*;

In [17]:
List<String> list = new ArrayList<>();
List<String> slist = Collections.synchronizedList(list);
var writer = new Thread(() -> {
        IntStream.rangeClosed(0, 10).forEach(i -> {
            String name = Thread.currentThread().getName();
            slist.add(i, name);
            sleep(50);
        });
}, "Ronny");
writer.start();

// read is not automatically synchronized
// run in a synchronized block to ensure consistency
synchronized(slist) {
    Iterator iterator = slist.iterator();
    while (iterator.hasNext()) {
        String item = (String)iterator.next();        
        System.out.println(item);
        sleep(50);
    }    
}

writer.join();

Ronny


### synchronized + order of execution

In [35]:
// - create thread pool
// - submit the tasks one after another
// - synchronized print + count ensures that the threads are executed in the right order
public class SheepManager {
    private int sheepCount = 0;
    public void incrementAndReport() {
        // this is called a "synchronized block"
        synchronized(this) {
            System.out.print((++sheepCount)+" ");
        }
    }
}    

ExecutorService service = null;
service = Executors.newFixedThreadPool(20);
var manager = new SheepManager();

for(int i = 0; i < 10; i++)
    service.submit(() -> manager.incrementAndReport());
service.shutdown();

1 2 3 4 5 6 7 8 9 10 

### Synchronize on Methods
- `synchronized` on instance method automatically synchronize on the object itself

These 2 definitions are equvivalent:

```java
private void incrementAndReport() {
    synchronized(this) {
        System.out.print((++sheepCount)+" ");
    }
}

private synchronized void incrementAndReport() {
    System.out.print((++sheepCount)+" ");
}
```

This works similarly for static, synchronization happens at the class object (the static context). These 2 definitions are equvivalent:
```java
public static void printDaysWork() {
    synchronized(SheepManager.class) {
         System.out.print("Finished work");
   }
}

public static synchronized void printDaysWork() {
    System.out.print("Finished work");
}
```

`static` synchronization is like a global lock for _all instances_ of a certain class.


## Concurrent Collections

In [62]:
var foodData = new HashMap<String, Integer>();
foodData.put("penguin", 1);
foodData.put("flamingo", 2);
for(String key: foodData.keySet())   // iterator on keySet() is not updated after first removal
    foodData.remove(key);

EvalException: null

In [66]:
// fixed by using ConcurrentHashMap
var foodData = new ConcurrentHashMap<String, Integer>();
foodData.put("penguin", 1);
foodData.put("flamingo", 2);
for(String key: foodData.keySet())
    foodData.remove(key);

### Concurrent collection classes    

| Class name   | Java Collections Framework interfaces | Elements ordered? |Sorted?| Blocking? |
|---------------|---------------------------------------|-------------------|-------|----------|
| `ConcurrentHashMap`|  ` ConcurrentMap`|  No |No |No|
| `ConcurrentLinkedQueue`  | `Queue` | Yes | No| No |
| `ConcurrentSkipListMap` | `ConcurrentMap` `SortedMap` `NavigableMap` | Yes | Yes | No |
| `ConcurrentSkipListSet` | `SortedSet` `NavigableSet`|  Yes| Yes| No |
| `CopyOnWriteArrayList` |  `List`  | Yes| No | No  |  
| `CopyOnWriteArraySet`  | `Set`  | No | No | No |
| `LinkedBlockingQueue` |  `BlockingQueue`|  Yes| No |Yes|

- assign to the common Java interface like `List` or `Map`
- `Map<String,Integer> map = new ConcurrentHashMap<>();`

### LinkedBlockingQueue
- implements `Queue`
- two additional blocking methods
    - `offer(E e, long timeout, TimeUnit unit)` - Adds an item to the end of the queue, waiting the specified time and returning false if the time elapses before space is available    
    - `poll(long timeout, TimeUnit unit)` - Retrieves and removes an item from the queue, waiting the specified time and returning null if the time elapses before the item is available

In [30]:
try {
    var blockingQueue = new LinkedBlockingQueue<Integer>();
    blockingQueue.offer(39);
    blockingQueue.offer(3, 4, TimeUnit.SECONDS);
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll(10, TimeUnit.MILLISECONDS));
}

// thrown by the timeout versions of "offer" and "poll" if timeout exceeded
catch (InterruptedException e) {
    // handle not being able to read/ write the queue after waiting
}

39
3


### Synchronized Versions of Standard Collections
- `synchronizedCollection(Collection<T> c)`
- `synchronizedList(List<T> list)`
- `synchronizedMap(Map<K,V> m)`
- `synchronizedNavigableMap(NavigableMap<K,V> m)`
- `synchronizedNavigableSet(NavigableSet<T> s)`
- `synchronizedCollection(Collection<T> c)`
- `synchronizedList(List<T> list)`
- `synchronizedMap(Map<K,V> m)`
- `synchronizedNavigableMap(NavigableMap<K,V> m)`
- `synchronizedNavigableSet(NavigableSet<T> s)`


- when to use
    - if writing a concurrent class: use concurrent collections
    - if existing code is accessed by multiple threads, then wrapping in a synchronized version is also fine

## Non-Blocking Concurrency Automation
- using `CopyOnWriteArrayList` or `CopyOnWriteArraySet`
- all mutations of the list make fresh copy exclusive for the current thread
- to read a read-only snapshot of merged content is created
- issue: memory consumption (copy for each thread)
- best suited for 
    - small collecitons with
    - **many reads**
    - little writes
    
<img src=attachment:image.png width=300></img>


In [18]:
List<String> list = new ArrayList<>();
List<String> copyOnWriteList = new CopyOnWriteArrayList(list);
var writer = new Thread(() -> {
        IntStream.rangeClosed(0, 10).forEach(i -> {
            String name = Thread.currentThread().getName();
            copyOnWriteList.add(i, name);
            sleep(50);
        });
}, "Ronny");
writer.start();

Iterator iterator = copyOnWriteList.iterator();
while (iterator.hasNext()) {
    String item = (String)iterator.next();        
    System.out.println(item);
    sleep(50);
}    

writer.join();

Ronny


### Trickery
- the concurrent collections work as expected
- the `bears` collection is a set ! -> hence the 4 items - not 6

In [38]:
List<Integer> lions = new ArrayList<>(List.of(1,2,3));
List<Integer> tigers = new CopyOnWriteArrayList<>(lions);
Set<Integer> bears = new ConcurrentSkipListSet<>();
bears.addAll(lions);

for(Integer item: tigers) tigers.add(4);
for(Integer item: bears) bears.add(5);
System.out.println(lions.size() + " " + tigers.size() + " " + bears.size());

3 6 4


## Alternative Locking Mechanisms
- `java.util.concurrent.locks`
- allows actions to be performed on an object, without interference from other threads
- write lock
    - prevents other threads from concurrently modifying the object
- read lock
    - can be acquired if write lock is not held by another thread
    - pause when some other thread is currently writing -> ensures consistency
    - allows concurrent read actions


### Reentrant Locks

- alternative to `synchronized`
- more flexible
- ! `lock` and `unLock` need to be balanced (same amount of calls)
- pros compared to synchronized
    - ability to request a lock without blocking 
    - Ability to request a lock while blocking for a specified amount of time 
    - A lock can be created with a fairness property, in which the lock is granted to threads in the order it was requested.
    
```java
// Implementation #1 with a synchronized block
Object object = new Object();
synchronized(object) {   
    // Protected code
}   
//
//
// Implementation #2 with a Lock
Lock lock = new ReentrantLock();
try {   
    lock.lock();   
    // Protected code
} finally {   
    // good practice to release in finally block
    lock.unlock();
}
```

### Lock methods 

|Method                             | Description                                                 |
|-----------------------------------|-------------------------------------------------------------|
| `void lock()`                     | Requests a lock and **blocks** until lock is acquired     |
| `boolean tryLock()`               | Requests a lock and returns **immediately**. Returns a boolean indicating whether the lock was successfully acquired |
| `boolean tryLock(long,TimeUnit)`| Requests a lock and blocks up to the specified time until lock is required. Returns a boolean indicating whether the lock was successfully acquired |
| `void unlock()`                   | Releases a lock                    |

In [22]:
import java.util.concurrent.locks.*;

In [24]:
List<String> someDates = new ArrayList<>();

ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();
Random random = new Random();

public String getRandomEntry() {
    readLock.lock();
    try {
        int randomIndex = 0;
        if (someDates.size() > 1) {
            randomIndex = random.nextInt(someDates.size()-1);
        }
        return someDates.get(randomIndex);
    // note the use of finally to guarantee calling "unlock"
    } finally {
        readLock.unlock();
    }
}    


public void addEntry(String timestamp) {
    writeLock.lock();
    try {
        someDates.add(timestamp);
    } finally {
        writeLock.unlock();
    }
}

In [26]:
import java.time.*;
import java.util.concurrent.*;

Runnable writeTask = () -> {
    addEntry(ZonedDateTime.now().toString());
    //sleep(Math.round(Math.random()*50));
};

Runnable readTask = () -> {
    getRandomEntry();
    //sleep(Math.round(Math.random()*50));
};

In [27]:
var executor = Executors.newFixedThreadPool(20);
IntStream.rangeClosed(1, 30).forEach(i -> {
    executor.execute(writeTask);
    executor.execute(readTask);
});

executor.awaitTermination(3, TimeUnit.SECONDS);
executor.shutdownNow();
someDates.size();

30

In [21]:
import java.util.concurrent.locks.*;
import java.util.stream.*;

public class Bank {
   private Lock vault = new ReentrantLock();
   int total = 0;

    public void deposit(int value) {
        if (vault.tryLock()) {
            try {            
                total += value;
            } finally {
                vault.unlock();
            }
        } else {
            // do something... and then
            // try to obtain the lock again
            deposit(value);
        }
    }
    
    public void depositWithError(int value) {
        try {
            // does not block but returns immediately with a boolean ! 
            vault.tryLock();
            total += value;
      } finally {
          vault.unlock();
      }
  }
}

var bank = new Bank();
IntStream.range(1, 10).parallel().forEach(s -> bank.deposit(s));
bank.total

45

### CyclicBarrier
- coordinate group of threads
- ! number of threads in the executor must be equal or greater then the `CyclicBarrier` threshold

In [59]:
import java.util.concurrent.*;

public class LionPenManager {
    private void removeLions() {System.out.println("Removing lions");}
    private void cleanPen() {System.out.println("Cleaning the pen");}
    private void addLions() {System.out.println("Adding lions");}
    
    public void performTask(CyclicBarrier c1, CyclicBarrier c2) {
        try {
            removeLions();
            // blocks until the 4th thread as executed c1.await()
            c1.await();
            cleanPen();
            // blocks until the 4th thread as executed c2.await()
            // also executes supplied runnable
            c2.await();
            addLions();
        } catch (InterruptedException | BrokenBarrierException e) {
            // Handle checked exceptions here
        }
    }
}

ExecutorService service = null;

try {
    service = Executors.newFixedThreadPool(4);
    var manager = new LionPenManager();
    var c1 = new CyclicBarrier(4);
    var c2 = new CyclicBarrier(4, () -> System.out.println("*** Pen Cleaned!"));

    for (int i = 0; i < 4; i++)
        service.submit(() -> manager.performTask(c1, c2));
} finally {
    if (service != null) service.shutdown();
}

Thread.sleep(500);

Removing lions
Removing lions
Removing lions
Removing lions
Cleaning the pen
Cleaning the pen
Cleaning the pen
Cleaning the pen
*** Pen Cleaned!
Adding lions
Adding lions
Adding lions
Adding lions
