![Cover](./images/1.cover.png)

---

# Definition

### According to Wikipedia

 * Parallel computing is a type of computing “in which many calculations or processes are carried out simultaneously”.
 * Concurrent computing is a form of computing in which several computations are executed concurrently – in overlapping time periods – instead of sequentially.
 * It is possible to have parallelism without concurrency, and concurrency without parallelism.

### Motivation

 * Faster runtime
 * Improved responsiveness

---

# Parallelism vs Concurrency

![Parallel vs Concurrent](./images/2.jpeg)

---

# Concurrency: processes vs threads

![Process vs Thread](./images/3.jpeg)

---

# Preemptive vs cooperative scheduling

![Preemptive vs Cooperative](./images/4.jpeg)

---

# Parallel and Concurrent Programming in the JVM

* The JVM defines its own threading model
    * The specification does not tie it to the OS scheduler
    * A JVM thread != an OS thread (although on modern HotSpot, platform threads map 1:1 to OS threads)
    * Multithreaded JVM apps can run even on a single-threaded OS (DOS)
* JVM threads are either daemons or user threads.
* The app stops when all user threads are done.
* The JVM does not wait for daemon threads to finish.
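
The difference between daemon and user threads can be sketched as follows. This is a minimal illustration, not a canonical pattern: the function name `describeDaemon` and the thread names are made up for the example, and the `isDaemon`/`name` parameters come from `kotlin.concurrent.thread`.

```kotlin
import kotlin.concurrent.thread

// Starts one daemon thread that never finishes on its own and one short-lived user thread.
fun describeDaemon(): String {
    val daemon = thread(isDaemon = true, name = "background") {
        while (true) Thread.sleep(50) // loops until the JVM exits
    }
    val user = thread(name = "worker") { Thread.sleep(100) }
    user.join() // wait for the only additional user thread
    return "${daemon.name}: daemon=${daemon.isDaemon}, alive=${daemon.isAlive}"
}

fun main() {
    println(describeDaemon()) // background: daemon=true, alive=true
    // main returns here: no user threads remain, so the JVM exits
    // even though the daemon thread is still looping.
}
```

If `isDaemon = true` were dropped, the same program would never exit, because the looping thread would count as a user thread.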

___

# Parallel programming in the JVM

### 2 Java packages

* `java.lang` contains basic primitives: Runnable, Thread, etc.
* `java.util.concurrent` contains synchronization primitives and concurrent data structures.

### 1 Kotlin package

* `kotlin.concurrent` — Wrappers and extensions for Java classes (quality-of-life).

___

# Throwback: Single Abstract Method interfaces

SAM – an interface with a single abstract method, which can be instantiated with a lambda.

#### Java:

In [None]:
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

#### Kotlin:

In [None]:
fun interface Screamer {
    fun scream(): Unit
}

#### Example:

In [None]:
class RunnableWrapper(val runnable: Runnable)
class ScreamerWrapper(val screamer: Screamer)

In [None]:
val myWrapperFromLambda = RunnableWrapper { println("Runnable from lambda speaking!") }
val independentScreamer = Screamer { println("Boo!") }
val wrappedScreamer = ScreamerWrapper { println("Boo! x2") }

myWrapperFromLambda.runnable.run()
independentScreamer.scream()
wrappedScreamer.screamer.scream()

---

# Ways to create threads

The Java standard library provides the `Thread` class, which implements `Runnable`.  
You can inherit from `Thread` and override `run`.

In [None]:
class MyThread : Thread() {
    override fun run() {
        println("${currentThread()} is running")
    }
}

In [None]:
val myThread = MyThread()
myThread.start()

# run vs start


**Never call `Thread.run()` directly!**

`run` executes the body _on the calling thread_, while `start` creates _a new thread_ and executes `run`'s body there.

In [None]:
println("${Thread.currentThread()} is the main thread")

val myThread1 = MyThread()
val myThread2 = MyThread()
val myThread3 = MyThread()

myThread1.start()
myThread2.run()
myThread3.start()

If you execute the code above, you will most likely see the main thread printed twice, followed by the two other threads. This means that `myThread2.run`'s body was executed on the main thread.  
The two other threads took some time to start and thus appear later.

You can also implement the `Runnable` interface and pass an instance of it to a thread. The same `Runnable` instance can be passed to several threads.

In [None]:
var extremelyBadGlobalVariable = 0
val myRunnable = Runnable { println("Runnable ${++extremelyBadGlobalVariable}") }

In [None]:
val thread1 = Thread(myRunnable)
val thread2 = Thread(myRunnable)

thread1.start()
thread2.start()

In Kotlin, the preferred way to create threads is the `thread` higher-order function (HOF):

In [None]:
import kotlin.concurrent.thread

val kotlinThread = thread {
    println("I start instantly, but you can pass an option to start me manually")
}

val kotlinThreadLater = thread(start = false) {
    println("I do not start instantly, you have to start me manually")
}

In [None]:
kotlinThreadLater.start()

---

# Thread properties

Some thread properties, such as the daemon flag, cannot be changed after the thread has started, so it is safest to set all of them before calling `start`.

Main properties of a thread:

In [None]:
kotlinThread.id // Long, This is the thread's identifier
kotlinThread.name // String
kotlinThread.priority // Int, This can range from 1 to 10, with a larger value indicating higher priority
kotlinThread.isDaemon // Boolean
kotlinThread.state // Thread.State
kotlinThread.isAlive // Boolean
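
As a small sketch of why properties are best configured up front: the `thread` function accepts `name`, `priority`, and `isDaemon` as parameters, and the JVM rejects a change of the daemon flag on a live thread with `IllegalThreadStateException`. The function name `daemonFlagIsFrozenAfterStart` and the latch are illustrative assumptions.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

// Returns true if changing the daemon flag of a running thread is rejected.
fun daemonFlagIsFrozenAfterStart(): Boolean {
    val release = CountDownLatch(1)
    // Properties passed here are applied before the thread starts.
    val worker = thread(name = "configured-worker", priority = Thread.MAX_PRIORITY) {
        release.await() // keep the thread alive until the check is done
    }
    val rejected = try {
        worker.isDaemon = true // too late: the thread is already alive
        false
    } catch (e: IllegalThreadStateException) {
        true
    }
    release.countDown()
    worker.join()
    return rejected
}

fun main() {
    println(daemonFlagIsFrozenAfterStart()) // true
}
```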

# State of a thread

| state         | isAlive |
|---------------|---------|
| NEW           | false   |
| RUNNABLE      | true    |
| BLOCKED       | true    |
| WAITING       | true    |
| TIMED_WAITING | true    |
| TERMINATED    | false   |
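
The table can be checked with a short experiment. The sketch below is only one way to observe the states: the helper name `observeStates` is made up, and a `CountDownLatch` is used so that the thread reliably parks in `WAITING` instead of depending on sleep timing.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

// Walks one thread through NEW -> WAITING -> TERMINATED and records what is observed.
fun observeStates(): List<Thread.State> {
    val gate = CountDownLatch(1)
    val t = thread(start = false) { gate.await() }
    val seen = mutableListOf(t.state)          // NEW: created but not started
    t.start()
    while (t.state != Thread.State.WAITING) {  // poll until the thread parks on the latch
        Thread.sleep(1)
    }
    seen += t.state                            // WAITING
    gate.countDown()                           // let the thread finish
    t.join()
    seen += t.state                            // TERMINATED
    return seen
}

fun main() {
    println(observeStates()) // [NEW, WAITING, TERMINATED]
}
```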


# 

![Thread states](./images/5.png)

___

# Ways to manipulate a thread's state

In [26]:
val simple = thread { /* code */ } // Creates a new thread

val myThread = thread(false) { /* code */ } // Creates a new thread in the NEW state
myThread.start() // Starts the thread
myThread.join() // Causes the current thread to wait for myThread to finish
val durationInMs: Long = 3000
Thread.sleep(durationInMs) // Puts the current thread to sleep for 3 seconds
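Thread.yield() // Hints that the current thread is willing to give up the CPU; the scheduler may ignore the hint
myThread.interrupt() // Sets myThread's interruption flag; the thread itself decides how to react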
myThread.isInterrupted() // Boolean, Checks whether myThread was interrupted
Thread.interrupted() // Boolean, Checks and clears the interruption flag

false

 - The `sleep` and `yield` methods are static and apply only to the current thread, which means that you cannot suspend another thread.
 - Most blocking and waiting methods (`sleep`, `join`, `wait`, `await`) throw `InterruptedException` when the thread is interrupted.

In [28]:
import java.lang.Thread.sleep

In [32]:
val runningThread = thread {
    var counter = 0
    try {
        while (true) {
            sleep(100)
            println("Had some sleep $counter")
            counter++
        }
    } catch (e: Exception) {
        println(e)
    }
}
sleep(400)
runningThread.interrupt()

Had some sleep 0
Had some sleep 1
Had some sleep 2
java.lang.InterruptedException: sleep interrupted
sleep interrupted


# Classic worker

In [33]:
class ClassicWorker : Runnable {
    override fun run() {
        try {
            while (!Thread.interrupted()) {
                // do stuff
            }
        } catch (e: InterruptedException) {} // absolutely legal empty catch block
    }
}

The worker runs indefinitely. In the loop body it should fetch work from somewhere, execute it, and loop again. If it is interrupted, it finishes more or less gracefully.
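
To make the pattern concrete, here is one possible sketch in which the worker takes tasks from a `LinkedBlockingQueue`. The queue as the work source, the `runWorker` helper, and the latch used to wait for completion are assumptions made for the example, not part of the original worker.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Runs `taskCount` tasks on one worker thread, then stops the worker with an interrupt.
fun runWorker(taskCount: Int): Int {
    val tasks = LinkedBlockingQueue<() -> Unit>() // hypothetical work source
    val completed = AtomicInteger()
    val allDone = CountDownLatch(taskCount)

    val worker = thread(name = "worker") {
        try {
            while (!Thread.interrupted()) {
                tasks.take().invoke() // blocks while the queue is empty
            }
        } catch (e: InterruptedException) {
            // interrupted while waiting for work: leave the loop and finish
        }
    }

    val task: () -> Unit = {
        completed.incrementAndGet()
        allDone.countDown()
    }
    repeat(taskCount) { tasks.put(task) }

    allDone.await()    // every task has run
    worker.interrupt() // wakes the worker out of take()
    worker.join()
    return completed.get()
}

fun main() {
    println(runWorker(3)) // 3
}
```

The interrupt can arrive either while the worker is blocked in `take()` (handled by the `catch`) or between tasks (handled by the loop condition), which is why the classic worker checks both.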

___

# Parallelism and shared memory

## Examples of problematic interleaving

Parallel threads have access to the same shared memory.

This often leads to problems that cannot arise in a single-threaded environment.

In [34]:
class Counter {
    private var c = 0

    fun increment() {
        c++
    }
    
    fun decrement() {
        c--
    }
    
    fun value(): Int {
        return c
    }
}

Both operations on `c` are single, simple statements.

However, even simple statements can be translated into multiple steps by the virtual machine, and those steps can be interleaved.


In [42]:
val sharedCounter = Counter()

val thread1 = thread {
    sharedCounter.increment()
}
val thread2 = thread {
    sharedCounter.increment()
}

thread1.join()
thread2.join()
println(sharedCounter.value())

2


Suppose both `thread1` and `thread2` invoke increment at the same time. If the initial value of `c` is `0`, their interleaved actions might follow this sequence:
- T#1: Read value 0 from c.
- T#2: Read value 0 from c.
- T#1: Increment value — result is 1.
- T#1: Write result 1 to c.
- T#2: Increment value — result is 1.
- T#2: Write result 1 to c.

T#1's update is lost: `c` ends up as 1 instead of 2, even though `increment` was called twice.
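
With only two increments the problematic interleaving is rare, so it is easier to observe with many iterations. The following is a rough experiment rather than a proof: `UnsafeCounter` mirrors the unsynchronized `Counter`, and `racyTotal` is a made-up helper name.

```kotlin
import kotlin.concurrent.thread

class UnsafeCounter {
    private var c = 0
    fun increment() { c++ } // read, add, write: three steps that can interleave
    fun value(): Int = c
}

// Increments one shared counter from several threads without any synchronization.
fun racyTotal(threads: Int, incrementsPerThread: Int): Int {
    val counter = UnsafeCounter()
    val workers = List(threads) {
        thread { repeat(incrementsPerThread) { counter.increment() } }
    }
    workers.forEach { it.join() }
    return counter.value()
}

fun main() {
    val total = racyTotal(threads = 4, incrementsPerThread = 100_000)
    println("expected 400000, got $total")
}
```

On a multi-core machine the printed total is usually below 400000 and changes from run to run, because some increments overwrite each other.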

---

# Synchronization mechanisms

- Mutual exclusion, such as `Lock` and the `synchronized` keyword.
- Concurrent data structures and synchronization primitives.
- Atomics, which work directly with shared memory (DANGER ZONE).

# Locks

In [43]:
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

val exampleLock = ReentrantLock()

exampleLock.lock() // Acquires the lock
exampleLock.tryLock() // Boolean, Tries to acquire the lock
exampleLock.unlock() // Releases the lock
exampleLock.withLock {
    // critical section
} // Executes a lambda with the lock held (uses try/finally inside, so the lock is always released)
val exampleCondition = exampleLock.newCondition() // Creates a condition variable associated with the lock

In [44]:
class LockedCounter {
    private var c = 0

    private val lock = ReentrantLock()

    fun increment() {
        lock.withLock { c++ }
    }

    fun decrement() {
        lock.withLock { c-- }
    }

    fun value(): Int {
        return lock.withLock { c }
    }
}
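
A quick way to see the effect of the lock is to hammer a guarded counter from several threads. In this sketch `GuardedCounter` is a trimmed copy of the counter above and `lockedTotal` is a hypothetical helper.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

class GuardedCounter {
    private var c = 0
    private val lock = ReentrantLock()

    fun increment() {
        lock.withLock { c++ } // only one thread at a time runs this block
    }

    fun value(): Int = lock.withLock { c }
}

// Increments one shared, lock-protected counter from several threads.
fun lockedTotal(threads: Int, incrementsPerThread: Int): Int {
    val counter = GuardedCounter()
    val workers = List(threads) {
        thread { repeat(incrementsPerThread) { counter.increment() } }
    }
    workers.forEach { it.join() }
    return counter.value()
}

fun main() {
    println(lockedTotal(threads = 4, incrementsPerThread = 10_000)) // 40000
}
```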

# Conditions

A condition allows a thread holding a lock to wait until another thread signals it about a certain event. Internally, the `await` method releases the associated lock when called and reacquires it before returning.

In [45]:
class PositiveLockedCounter {
    private var c = 0
    private val lock = ReentrantLock()
    private val condition = lock.newCondition()

    fun increment() {
        lock.withLock {
            c++
            condition.signal()
        }
    }

    fun decrement() {
        lock.withLock {
            while (c == 0) {
                condition.await()
            }
            c--
        }
    }

    fun value(): Int {
        return lock.withLock { c }
    }
}

In [49]:
val posCounter = PositiveLockedCounter()

val decrementor = { 
    println("Trying to decrement the counter...")
    posCounter.decrement()
    println("Successfully decremented!")
}
val incrementor = {
    println("Going to sleep and then increment the counter")
    sleep(100)
    posCounter.increment()
    println("Incremented.")
}

val threads = buildList {
    repeat(5) {
        add(thread(block = decrementor))
        add(thread(block = incrementor))
    }
}

threads.forEach { it.join() }

Trying to decrement the counter...
Going to sleep and then increment the counter
Trying to decrement the counter...
Trying to decrement the counter...
Going to sleep and then increment the counter
Going to sleep and then increment the counter
Trying to decrement the counter...
Going to sleep and then increment the counter
Trying to decrement the counter...
Going to sleep and then increment the counter
Incremented.
Successfully decremented!
Incremented.
Successfully decremented!
Incremented.
Successfully decremented!
Incremented.
Successfully decremented!
Incremented.
Successfully decremented!


# ReentrantLock

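`ReentrantLock` allows the lock to be acquired multiple times by the same thread. Each acquisition must be matched by a release; `holdCount` reports how many times the current thread holds the lock.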

In [57]:
class ReentrantExample {
    private val lock = ReentrantLock()

    fun criticalSection() = lock.withLock {
        lock.withLock {
            println("${Thread.currentThread()} holds ${lock.holdCount}")
            sleep(1000)
        }
    }

    fun anotherFunction() = lock.withLock {
        criticalSection()
    }

    val isFair: Boolean
        get() = lock.isFair // Checks the fairness of the lock, constructor of ReentrantLock accepts `fair: Boolean` argument, false by default

    val queueLength: Int
        get() = lock.queueLength
}

In [58]:
val reentrantExample = ReentrantExample()

val t1 = thread {
    reentrantExample.anotherFunction()
}
val t2 = thread {
    reentrantExample.criticalSection()
}
val t3 = thread {
    reentrantExample.criticalSection()
}
sleep(100)
println("Q length: ${reentrantExample.queueLength}")
println("Lock is fair: ${reentrantExample.isFair}")
t1.join()
t2.join()
t3.join()

Thread[Thread-333,5,main] holds 3
Q length: 2
Lock is fair: false
Thread[Thread-334,5,main] holds 2
Thread[Thread-335,5,main] holds 2


# ReadWriteLock

`ReadWriteLock` allows multiple readers to access a resource concurrently but only lets a single writer modify it.

In [61]:
import java.util.concurrent.locks.*
import kotlin.concurrent.read
import kotlin.concurrent.write

val rwLock: ReadWriteLock = ReentrantReadWriteLock()

val readLock: Lock = rwLock.readLock()
val writeLock: Lock = rwLock.writeLock()

In [62]:
class ReadWriteLockedCounter {
    private var c = 0
    private val rwLock = ReentrantReadWriteLock()

    fun increment() {
        rwLock.write { c++ }
    }

    fun decrement() {
        rwLock.write { c-- }
    }

    fun value(): Int {
        return rwLock.read { c }
    }
}
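
The "many readers, one writer" rule can be observed directly. The sketch below assumes a hypothetical helper `concurrentReaders`; it relies on `ReentrantReadWriteLock.readLockCount` and on `tryLock()` of the write lock, and uses latches to keep all readers inside the read section at the same time.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.thread

// Returns (number of read locks held at once, whether a writer could get in meanwhile).
fun concurrentReaders(readers: Int): Pair<Int, Boolean> {
    val rwLock = ReentrantReadWriteLock()
    val inside = CountDownLatch(readers)
    val release = CountDownLatch(1)

    val threads = List(readers) {
        thread {
            rwLock.read {
                inside.countDown() // report: this reader holds the read lock
                release.await()    // stay inside until the measurement is done
            }
        }
    }

    inside.await()                                 // all readers are inside simultaneously
    val held = rwLock.readLockCount
    val writerGotIn = rwLock.writeLock().tryLock() // fails while readers hold the lock
    if (writerGotIn) rwLock.writeLock().unlock()

    release.countDown()
    threads.forEach { it.join() }
    return held to writerGotIn
}

fun main() {
    println(concurrentReaders(3)) // (3, false)
}
```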


---

# The synchronized statement

In the JVM, every object has an intrinsic lock associated with it (aka a monitor).

In [None]:
class SynchronizedCounter {
    private var c = 0

    fun increment() {
        synchronized(this) { c++ }
    }
}

# synchronized method

#### Java:

In [None]:
public class SynchronizedJavaCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }
}


#### Kotlin:

In [None]:
class SynchronizedKotlinCounter {
    private var c = 0

    @Synchronized
    fun increment() {
        c++
    }
}
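
Both forms lock the same monitor, namely the one of `this`. A small check using `Thread.holdsLock` illustrates this; the class `MonitorDemo` and its method names are invented for the example.

```kotlin
class MonitorDemo {
    @Synchronized
    fun viaAnnotation(): Boolean = Thread.holdsLock(this)

    fun viaBlock(): Boolean = synchronized(this) { Thread.holdsLock(this) }

    fun unsynchronized(): Boolean = Thread.holdsLock(this)
}

fun main() {
    val demo = MonitorDemo()
    println(demo.viaAnnotation())  // true: the method holds the monitor of `demo`
    println(demo.viaBlock())       // true: same monitor, acquired by the block
    println(demo.unsynchronized()) // false: no monitor is held
}
```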


---

# Concurrent blocking collections

`java.util.concurrent` is a Java package that implements both blocking and non-blocking concurrent collections. The blocking ones include:
- `SynchronousQueue` – Zero-capacity rendezvous channel (each `put` waits for a matching `take`)
- `ArrayBlockingQueue` – Fixed-capacity queue 
- `LinkedBlockingQueue` – Optionally bounded blocking queue (unbounded by default)
- `PriorityBlockingQueue` – Unbounded blocking priority queue
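
A typical use of a blocking queue is a producer/consumer hand-off. The following sketch uses `ArrayBlockingQueue` with an arbitrarily chosen capacity of 2; `produceAndConsume` is a hypothetical helper.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// A producer thread sends 1..items through a small queue; the caller sums them up.
fun produceAndConsume(items: Int): Int {
    val queue = ArrayBlockingQueue<Int>(2) // put() blocks while the queue is full
    val producer = thread {
        for (i in 1..items) queue.put(i)
    }
    var sum = 0
    repeat(items) { sum += queue.take() }  // take() blocks while the queue is empty
    producer.join()
    return sum
}

fun main() {
    println(produceAndConsume(5)) // 15
}
```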

# Concurrent non-blocking collections

The non-blocking collections in `java.util.concurrent` include:
- `ConcurrentLinkedQueue` – Non-blocking unbounded queue 
- `ConcurrentLinkedDeque` – Non-blocking unbounded deque 
- `ConcurrentHashMap` – Concurrent unordered hash map 
- `ConcurrentSkipListMap` – Concurrent sorted map
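
As an illustration, `ConcurrentHashMap.merge` performs an atomic read-modify-write per key, so a shared tally needs no external lock. The key `"page"` and the helper `countHits` are made up for this sketch.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread

// Counts hits for a single key from several threads using atomic merge().
fun countHits(threads: Int, hitsPerThread: Int): Int {
    val hits = ConcurrentHashMap<String, Int>()
    val workers = List(threads) {
        thread {
            repeat(hitsPerThread) {
                // Inserts 1 if the key is absent, otherwise atomically adds 1 to the old value.
                hits.merge("page", 1) { old, one -> old + one }
            }
        }
    }
    workers.forEach { it.join() }
    return hits.getValue("page")
}

fun main() {
    println(countHits(threads = 4, hitsPerThread = 1_000)) // 4000
}
```

Note that a separate `get` followed by `put` would not be atomic even on a concurrent map; the single `merge` call is what keeps the count exact.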

# Synchronization primitives

`java.util.concurrent` also implements concurrent data structures and synchronization primitives, such as:
- `Exchanger` – Blocking exchange 
- `Phaser` – Barrier synchronization 
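
For instance, an `Exchanger` lets exactly two threads swap values at a rendezvous point: each call to `exchange` blocks until the partner arrives. A minimal sketch, with the helper name `swap` and the message strings chosen for illustration:

```kotlin
import java.util.concurrent.Exchanger
import kotlin.concurrent.thread

// The calling thread and a helper thread swap one string each.
fun swap(): Pair<String, String> {
    val exchanger = Exchanger<String>()
    var receivedByHelper = ""
    val helper = thread {
        receivedByHelper = exchanger.exchange("from helper") // blocks until the caller arrives
    }
    val receivedByCaller = exchanger.exchange("from main")
    helper.join() // after join, the helper's write to receivedByHelper is visible
    return receivedByCaller to receivedByHelper
}

fun main() {
    println(swap()) // (from helper, from main)
}
```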

---

# Java Memory Model: Weak behaviors

There are no guarantees when it comes to ordering!

In [63]:
var x = 0
var y = 0


thread {
    x = 1
    y = 1
}

thread {
    val a = y
    val b = x

    println("$a, $b")
}

Possible outputs:
- `0, 0`
- `0, 1`
- `1, 1`
- `1, 0` (???)

#### There are no guarantees when it comes to progress!

In [70]:
var flag = false

val t1 = thread {
    while (!flag) { }
    println("I am free!")
}

val t2 = thread { 
    flag = true
}

t1.join()
t2.join()

I am free!


Possible outputs:
- "I am free!"
- No output at all: the program hangs forever

---

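The program may hang because the compiler is allowed to read the plain field `flag` once, hoist the check out of the loop, and effectively run this: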

In [None]:
thread {
    while (true) {}
    println("I am free!")
}

---

# JMM: Data-Race-Freedom Guarantee

But what does the JMM guarantee?

**Well-synchronized** programs have **simple interleaving semantics**.

Well-synchronized = Data-race-free 

Simple interleaving semantics = Sequentially consistent semantics

**Data-race-free** programs have **sequentially consistent semantics**.

---

# Volatile fields

Volatile fields can be used to restore **sequential consistency**.

In [None]:
class OrderingTest {
    @Volatile var x = 0
    @Volatile var y = 0
    fun test() {
        thread {
            x = 1
            y = 1
        }
        thread {
            val a = y
            val b = x
            println("$a, $b")
        }
    }
}

In [None]:
class ProgressTest {
    @Volatile var flag = false
    fun test() {
        thread {
            while (!flag) {}
            println("I am free!")
        }
        thread { flag = true }
    }
}

Volatile variables can be used for synchronization.

In [None]:
class OrderingTest {
    var x = 0
    @Volatile var y = 0
    fun test() {
        thread {
            x = 1
            y = 1
        }
        thread {
            val a = y
            val b = x
            println("$a, $b")
        }
    }
}

How do we know there is enough synchronization?

---

# JMM: Happens-before relation

![HB1](./images/6.png)

---

![HB2](./images/7.png)

---

![HB3](./images/8.png)

---

# JMM: Synchronizing actions

- Read and write for volatile fields
- `Lock.lock` and `Lock.unlock`
- `Thread.start` (ordered before the first action of the started thread's `run`) and `Thread.join` (ordered after the last action of the joined thread)

In [75]:
class Storage {
    @Volatile
    var x = 23
    private var y = 42
    val lock = ReentrantReadWriteLock()
    
    fun getStored() = lock.read { y }
    
    fun store(newY: Int) = lock.write { y = newY }
    
    fun print(marker: String? = null) = lock.read { println("${marker?.let { "$it: " } ?: ""}$x, $y") }
}

In [81]:
val s = Storage()
s.print() // 23, 42
val t1 = thread {
    s.store(44)
    s.x = -1
    s.print("t1")
}
val t2 = thread {
    s.print("t2")
}
val t3 = thread {
    s.x = 777
    s.store(66)
}

t1.join()
t2.join()
s.print() // never 23, 42
t3.join()
s.print()

23, 42
t1: -1, 44
t2: -1, 44
-1, 44
777, 66
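
The `Thread.start` and `Thread.join` edges are enough to hand plain (non-volatile) data between threads safely. The sketch below illustrates this under the happens-before rules above; the `Box` class and the `handOff` helper are made up for the example.

```kotlin
import kotlin.concurrent.thread

class Box {
    var data = 0 // plain field: no volatile, no lock
}

// Passes data to a child thread via start() and back via join().
fun handOff(): Pair<Int, Int> {
    val box = Box()
    box.data = 1               // written before start()
    var seenByChild = 0
    val child = thread {
        seenByChild = box.data // start() happens-before this read, so it sees 1
        box.data = 2
    }
    child.join()               // the child's writes happen-before join() returns
    return seenByChild to box.data
}

fun main() {
    println(handOff()) // (1, 2)
}
```

There is no data race here: every pair of conflicting accesses is ordered by either the start edge or the join edge.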


---

# JMM: DRF-SC again

Two events form a data race if:
- Both are memory accesses to the same field. 
- Both are **plain** (non-atomic) accesses.
- At least one of them is a *write* event.
- They are not related by *happens before*.

Data-race-free programs have sequentially consistent semantics.

A program is data-race-free if, **for every possible execution** of this program, **no** two events form a **data race**.

---

# JMM: Atomics aka Danger Zone

In [82]:
import java.util.concurrent.atomic.AtomicInteger

class AtomicCounter {
    private val c = AtomicInteger()

    fun increment() {
        c.incrementAndGet()
    }

    fun decrement() {
        c.decrementAndGet()
    }

    fun value(): Int {
        return c.get()
    }
}

Atomic classes from the `java.util.concurrent.atomic` package:
- `AtomicInteger`
- `AtomicLong`
- `AtomicBoolean`
- `AtomicReference`

And their array counterparts:
- `AtomicIntegerArray`
- `AtomicLongArray`
- `AtomicReferenceArray`


In [87]:
val atomicInteger = AtomicInteger(23)

atomicInteger.get() // Reads a value with volatile semantics
atomicInteger.set(42) // Writes a value with volatile semantics
atomicInteger.getAndSet(66) // Atomically exchanges a value
/*
    Atomically compares the value of the atomic variable with the expected value e.
    If they are equal, replaces the content of the atomic variable with the desired value v.
    Returns a boolean indicating success or failure.
 */
val t = thread {
    var attemptCount = 0
    val e = 777
    val v = 1024
    while (!atomicInteger.compareAndSet(e, v)) {
        println("${attemptCount++} failed attempts to set")
        if (attemptCount > 10) break
    }
}
thread {
    val e = 66
    val v = 777
    // Atomically compares the current value with the expected value e and, if they are equal, replaces it with the desired value v; returns the value that was read (the witness value).
    atomicInteger.compareAndExchange(e, v)
}
// getAndIncrement(), addAndGet(d), etc. perform atomic arithmetic operations on numeric atomics (AtomicInteger, AtomicLong).

t.join()
println(atomicInteger.get())

0 failed attempts to set
1024
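
`compareAndSet` is usually wrapped in a retry loop: read the current value, compute the new one, and try to install it, repeating if another thread got in first. As a sketch of that pattern (the functions `updateMax` and `concurrentMax` are hypothetical, not part of the JDK), here is an atomic "maximum" update:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Raises `target` to at least `candidate` using a CAS retry loop.
fun updateMax(target: AtomicInteger, candidate: Int) {
    while (true) {
        val current = target.get()
        if (candidate <= current) return                     // already large enough
        if (target.compareAndSet(current, candidate)) return // installed our value
        // Another thread changed the value in between: re-read and retry.
    }
}

// Computes the maximum of `values` with one thread per value.
fun concurrentMax(values: List<Int>): Int {
    val max = AtomicInteger(Int.MIN_VALUE)
    values.map { v -> thread { updateMax(max, v) } }.forEach { it.join() }
    return max.get()
}

fun main() {
    println(concurrentMax(listOf(3, 17, 5, 11))) // 17
}
```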


Atomic classes also provide methods with an explicit access mode:
* `getXXX()`
* `setXXX(v)`
* `weakCompareAndSetXXX(e, v)`
* `compareAndExchangeXXX(e, v)`

In these cases, `XXX` is an access mode: `Acquire`, `Release`, `Opaque`, `Plain`.  
You can learn more about Java Access Modes [here](https://gee.cs.oswego.edu/dl/html/j9mm.html).
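
As a rough sketch of how the `Release` and `Acquire` modes pair up (assuming JDK 9 or newer, where `setRelease` and `getAcquire` are available), a writer can publish a plain field through an atomic flag. The `Mailbox` class and the `passMessage` helper are invented for the example.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

class Mailbox {
    var payload = 0                  // plain field, published via `ready`
    val ready = AtomicBoolean(false)
}

// A writer publishes a value with a release write; a reader picks it up with acquire reads.
fun passMessage(): Int {
    val box = Mailbox()
    var received = -1
    val reader = thread {
        while (!box.ready.getAcquire()) Thread.onSpinWait() // spin until the flag is set
        received = box.payload // ordered after the acquire read that saw `true`
    }
    val writer = thread {
        box.payload = 42
        box.ready.setRelease(true) // the payload write cannot move below this store
    }
    reader.join()
    writer.join()
    return received
}

fun main() {
    println(passMessage()) // 42
}
```

Release/acquire is weaker than the volatile semantics of `get`/`set`, so it should only be chosen when the weaker ordering is understood and sufficient.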

---


# JMM: Atomics Problem

In [None]:
import java.util.concurrent.atomic.AtomicReference

class Node<T>(val value: T) {
    val next = AtomicReference<Node<T>>()
}

![Atomics Problem](./images/9.jpeg)

---

# JMM: Atomic field updaters

Use `AtomicXXXFieldUpdater` classes to directly modify volatile fields: 

In [None]:
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater

class Counter {
    @Volatile private var c = 0
    companion object {
        private val updater = AtomicIntegerFieldUpdater.newUpdater(Counter::class.java, "c")
    }
    fun increment() {
        updater.incrementAndGet(this)
    }
    fun decrement() {
        updater.decrementAndGet(this)
    }
    fun value(): Int {
        return updater.get(this)
    }
}

Starting with JDK 9, there is also the `VarHandle` class, which serves a similar purpose.

---

# Kotlin: AtomicFU

[The AtomicFU library](https://github.com/Kotlin/kotlinx-atomicfu) is the recommended way to use atomic operations in Kotlin.
* It provides `AtomicXXX` classes with an API similar to Java atomics.
* Under the hood, _a compiler plugin_ replaces usages of atomics with `AtomicXXXFieldUpdater` or `VarHandle`.
* It also provides convenient extension functions, e.g. `c.update { it + 1 }`

In [95]:
import kotlinx.atomicfu.*

class Counter {
    private val c = atomic(0)
    
    fun increment() {
        c += 1
    }
    
    fun decrement() {
        c -= 1
    }
    
    fun value(): Int {
        return c.value
    }
}
