# Parallel and Asynchronous Programming with Stream and CompletableFuture

### Parallel vs Asynchronous

1. Nature of the problems: both are about speeding up computation
2. When to use **parallel**: processing a collection of data, where the same operation is applied to the elements in parallel (Stream)
3. When to use **asynchronous**: solving a big problem by dividing it into smaller, independent problems and combining their results (CompletableFuture)
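A minimal sketch of the asynchronous approach, assuming a big problem that splits into two independent sub-problems. The names AsyncSketch, priceOf and taxFor are hypothetical and only for illustration. Each sub-problem is started with CompletableFuture.supplyAsync, and thenCombine merges the two results once both are available:

```java
import java.util.concurrent.CompletableFuture;

final class AsyncSketch {
    // Hypothetical sub-problem 1 (stands in for a slow computation or lookup)
    static int priceOf(String item) {
        return item.length() * 10;
    }

    // Hypothetical sub-problem 2, independent of the first one
    static int taxFor(String item) {
        return item.length();
    }

    // Starts both sub-problems asynchronously (on the common ForkJoinPool unless
    // an executor is passed) and combines their results when both complete
    static CompletableFuture<Integer> total(String item) {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> priceOf(item));
        CompletableFuture<Integer> tax = CompletableFuture.supplyAsync(() -> taxFor(item));
        return price.thenCombine(tax, Integer::sum);
    }

    public static void main(String[] args) {
        // join() blocks only here, when the combined value is actually needed
        System.out.println(total("book").join()); // 44
    }
}
```

Only the final join() waits; until then the two sub-problems can run concurrently.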

### Parallel Streams

##### Collection pipeline pattern
Functional composition: the output of each operation becomes the input of the next
https://martinfowler.com/articles/collection-pipeline/

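###### In Java
- Lazy evaluation: intermediate operations run only when a terminal operation is invoked
- Pure functions: no side effects, which makes them easy to parallelize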
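Lazy evaluation can be observed directly. In this sketch (LazySketch and pipeline are hypothetical names), the filter records every element it looks at. The side effect is there only to make evaluation visible; a real pipeline should keep its lambdas pure:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class LazySketch {
    // Builds the pipeline; `visited` records each element the filter examines
    static Stream<Integer> pipeline(List<Integer> visited) {
        return Stream.of(1, 2, 3, 4, 5)
                .filter(n -> {
                    visited.add(n); // side effect only for observation
                    return n % 2 == 0;
                })
                .map(n -> n * 10);
    }

    public static void main(String[] args) {
        List<Integer> visited = new ArrayList<>();
        Stream<Integer> stream = pipeline(visited); // only intermediate operations: nothing runs yet
        System.out.println("before terminal operation: " + visited); // []
        // findFirst is a short-circuiting terminal operation: it stops at the first match
        System.out.println("first result: " + stream.findFirst().get()); // 20
        System.out.println("after terminal operation: " + visited); // [1, 2]
    }
}
```

Elements 3, 4 and 5 are never examined, because the terminal operation already has its answer after element 2.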

```java
import java.util.stream.Stream;
```

#### From Imperative to Declarative (Functional)
https://sookocheff.com/post/fp/what-is-functional-programming/

```java
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

System.out.println("Imperative programming: How we do?");

int sum = 0;

for (int num : numbers) {
    if (num % 2 == 0) {
        sum += num;
    }
}

System.out.println("Sum of even numbers: " + sum);
```

```
Imperative programming: How we do?
Sum of even numbers: 6
```


```java
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

System.out.println("Declarative programming: What we do?");

int sum = numbers.stream()
                .filter(num -> num % 2 == 0)
                .mapToInt(num -> num)
                .sum();

System.out.println("Sum of even numbers computed sequentially: " + sum);
```

```
Declarative programming: What we do?
Sum of even numbers computed sequentially: 6
```


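Stream: an internal iterator ("auto-pilot"), where the library drives the iteration and we only describe what to do with each element

Lambdas: should be pure functions (no side effects), so the library is free to run them in any order and on any thread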
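A small sketch contrasting the two kinds of iteration (IteratorSketch and its methods are hypothetical names). With an external iterator our code owns the loop, so the order and the thread are fixed by us; with a stream the library owns the loop, which is what lets it switch to parallel execution without changing our code:

```java
import java.util.Iterator;
import java.util.List;

final class IteratorSketch {
    // External iterator: our code drives the loop with hasNext()/next()
    static int sumExternal(List<Integer> numbers) {
        int sum = 0;
        Iterator<Integer> it = numbers.iterator();
        while (it.hasNext()) {
            sum += it.next();
        }
        return sum;
    }

    // Internal iterator: the stream drives the loop; we only supply the behavior
    static int sumInternal(List<Integer> numbers) {
        return numbers.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);
        System.out.println(sumExternal(numbers) + " " + sumInternal(numbers)); // 15 15
    }
}
```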

#### Before and after Java 8
- Before Java 8, in the imperative style, multithreading was hard and complicated because *the structure of concurrent code was very different from the structure of sequential code*
- From Java 8 on (Streams), *the structure of concurrent code is the same as the structure of sequential code*, which makes multithreaded code a lot simpler

#### Stream to Parallel Stream / Sequential to Parallel Execution

Example: Let's make the previous code block parallel:

```java
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

System.out.println("Structure of code remains the same after parallelizing");

int sum = numbers.parallelStream()
                .filter(num -> num % 2 == 0)
                .mapToInt(num -> num)
                .sum();

System.out.println("Sum of even numbers computed concurrently: " + sum);
```

```
Structure of code remains the same after parallelizing
Sum of even numbers computed concurrently: 6
```


```java
List<Integer> numbers = Arrays.asList(1, 2, 3);

public void sleep(long ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
}

public int transform(int number) {
    sleep(1000);
    return number;
}

numbers.stream()
    .map(num -> transform(num))
    .forEach(System.out::println);
```

```
1
2
3
```


```java
List<Integer> numbers = Arrays.asList(1, 2, 3);

public void sleep(long ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
}

public int transform(int number) {
    sleep(1000);
    return number;
}

numbers.parallelStream()
    .map(num -> transform(num))
    .forEach(System.out::println);
```

```
2
3
1
```


##### parallelStream() vs parallel()
- When we create the stream from a source we own (e.g. a Collection), use parallelStream()
  - Example: numbers.parallelStream() in the previous code block, where we own the List of numbers
- When we receive a stream from somewhere else, use parallel()
  - Example: a method that takes a stream as a parameter and processes it:

```java
public int transform(int number) {
    return number;
}

// We don't own the source of this stream, so we switch it to parallel mode with parallel()
public void workWithStream(Stream<Integer> stream) {
    stream.parallel()
        .map(num -> transform(num))
        .forEach(System.out::println);
}

workWithStream(numbers.stream());
```

```
1
3
2
```


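#### Observing Threads

- Java 1 - Threads
- Java 5 - ExecutorService (pool of threads)
    - Problem: pool-induced deadlock (every thread in the pool is blocked waiting for tasks that are queued in the same pool and can never run)
- Java 7 - ForkJoinPool
    - Work stealing: idle threads take queued tasks from busy threads
- Java 8 - Streams
    - Parallel streams use the common ForkJoinPool (ForkJoinPool.commonPool()) by default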
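To see what ForkJoinPool adds over a plain ExecutorService, here is a sketch of the classic fork/join pattern (ForkJoinSketch, SumTask and the THRESHOLD value are hypothetical choices, not anything the Stream API uses internally). A task splits its range, forks one half so an idle worker can steal it, computes the other half itself, then joins. While waiting in join(), a pool worker can run other pending tasks instead of simply blocking, which is how the pool avoids the deadlock described above:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class ForkJoinSketch {
    // Sums arr[from, to) by splitting the range until it is small enough
    static final class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // assumed cut-off for sequential work
        private final int[] arr;
        private final int from;
        private final int to;

        SumTask(int[] arr, int from, int to) {
            this.arr = arr;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += arr[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(arr, from, mid);
            SumTask right = new SumTask(arr, mid, to);
            left.fork();                     // queue the left half; an idle worker may steal it
            long rightSum = right.compute(); // work on the right half in the current thread
            return rightSum + left.join();   // combine both halves
        }
    }

    static long sum(int[] arr) {
        return ForkJoinPool.commonPool().invoke(new SumTask(arr, 0, arr.length));
    }

    public static void main(String[] args) {
        int[] arr = new int[10_000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = i + 1;
        }
        System.out.println(sum(arr)); // 50005000
    }
}
```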

```java
System.out.println("Sequential Execution");

List<Integer> numbers = Arrays.asList(1, 2, 3);

public int transform(int number) {
    System.out.println("transform: " + number + "--" + Thread.currentThread());
    return number;
}

numbers.stream()
    .map(num -> transform(num))
    .forEach(System.out::println);
```

```
Sequential Execution
transform: 1--Thread[IJava-executor-0,5,main]
1
transform: 2--Thread[IJava-executor-0,5,main]
2
transform: 3--Thread[IJava-executor-0,5,main]
3
```


```java
System.out.println("Parallel Execution");

List<Integer> numbers = Arrays.asList(1, 2, 3);

public int transform(int number) {
    System.out.println("transform: " + number + "--" + Thread.currentThread());
    return number;
}

numbers.stream()
    .parallel()
    .map(num -> transform(num))
    .forEach(System.out::println);
```

```
Parallel Execution
transform: 2--Thread[IJava-executor-0,5,main]
2
transform: 3--Thread[ForkJoinPool.commonPool-worker-27,5,main]
transform: 1--Thread[ForkJoinPool.commonPool-worker-5,5,main]
1
3
```


##### Both parallel() and sequential()
When both parallel() and sequential() appear in a pipeline, the entire pipeline runs either in parallel or sequentially, depending on whichever call comes last
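The "last call wins" rule can be checked without printing threads, using isParallel() on the stream (LastCallWins and its methods are hypothetical names for this sketch):

```java
import java.util.stream.Stream;

final class LastCallWins {
    static boolean parallelThenSequential() {
        return Stream.of(1, 2, 3)
                .parallel()
                .map(n -> n * 2)
                .sequential() // last mode-setting call: the whole pipeline is sequential
                .isParallel();
    }

    static boolean sequentialThenParallel() {
        return Stream.of(1, 2, 3)
                .sequential()
                .map(n -> n * 2)
                .parallel()   // last call wins again, this time in favor of parallel
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(parallelThenSequential()); // false
        System.out.println(sequentialThenParallel()); // true
    }
}
```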

```java
System.out.println("Sequential Execution as it is the last one");

List<Integer> numbers = Arrays.asList(1, 2, 3);

public int transform(int number) {
    System.out.println("transform: " + Thread.currentThread());
    return number;
}

numbers.stream()
    .parallel()
    .map(num -> transform(num))
    .sequential()
    .forEach(System.out::println);
```

```
Sequential Execution as it is the last one
transform: Thread[IJava-executor-0,5,main]
1
transform: Thread[IJava-executor-0,5,main]
2
transform: Thread[IJava-executor-0,5,main]
3
```


#### Streams vs Reactive Streams

| Streams | Reactive Streams |
| --- | --- |
| Sequential vs parallel | Synchronous vs asynchronous |
| The entire pipeline is either sequential or parallel (no segments) | Depends on the operator: subscribeOn affects the whole pipeline (no segments), observeOn switches execution from that point on (segments) |

#### Order of execution
- Sequential: elements are processed one after another in the collection's iteration order, whether the collection is ordered (List) or unordered (Set)
- Parallel: the order in which elements are processed is not guaranteed, whether the collection is ordered (List) or unordered (Set)

For parallel execution, ordering is generally imposed by terminal operations (such as forEachOrdered)
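A sketch of how the terminal operation decides the observed order in a parallel stream (OrderSketch and its methods are hypothetical names). forEach may visit elements in any order; forEachOrdered and collect on an ordered source both respect encounter order, even though the work is still spread across threads:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class OrderSketch {
    // Parallel forEach: elements arrive in whatever order the threads reach them
    static List<Integer> viaForEach(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, n).parallel().boxed().forEach(seen::add);
        return seen;
    }

    // Parallel forEachOrdered: the action is applied in encounter order
    static List<Integer> viaForEachOrdered(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, n).parallel().boxed().forEachOrdered(seen::add);
        return seen;
    }

    // collect on an ordered source keeps encounter order, even in parallel
    static List<Integer> viaCollect(int n) {
        return IntStream.rangeClosed(1, n).parallel()
                .map(x -> x * 10)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaForEach(10));        // order may vary between runs
        System.out.println(viaForEachOrdered(10)); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(viaCollect(5));         // [10, 20, 30, 40, 50]
    }
}
```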

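#### Controlling the order
##### Inherently ordered:
- map (each result keeps the position of its input element)
- findFirst (returns the first matching element in encounter order, even in parallel)

##### Explicit ordering is needed / ordered counterpart:
- forEachOrdered: this is not sequential execution; the elements are still processed in parallel, but the action is applied in encounter order

##### Operations that can be parallelized without worrying about the order

- map
- filter
- reduce (provided the operation is associative and the initial value is an identity value)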
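A sketch of the difference between findFirst and its unordered counterpart findAny on a parallel stream (FindSketch and its methods are hypothetical names). findFirst always honors encounter order; findAny may return whichever matching element some thread finds first:

```java
import java.util.List;
import java.util.Optional;

final class FindSketch {
    static final List<Integer> NUMBERS = List.of(1, 2, 3, 4, 5, 6, 7, 8);

    // Always the first even number in encounter order, even in parallel
    static Optional<Integer> firstEven() {
        return NUMBERS.parallelStream().filter(n -> n % 2 == 0).findFirst();
    }

    // Some even number; in parallel the result may differ between runs
    static Optional<Integer> anyEven() {
        return NUMBERS.parallelStream().filter(n -> n % 2 == 0).findAny();
    }

    public static void main(String[] args) {
        System.out.println(firstEven().get()); // 2
        System.out.println(anyEven().get());   // one of 2, 4, 6, 8
    }
}
```

findAny gives the pipeline more freedom, so it can be cheaper in parallel when any match will do.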

#### Map - Parallel

```java
List<Integer> numbers = Arrays.asList(1, 2, 3);

public int transform(int number) {
    System.out.println("transform: " + number + " : " + Thread.currentThread());
    sleep(2000);
    return number;
}

public void printIt(int number) {
    System.out.println("printIt: " + number + " : " + Thread.currentThread());
}

numbers.stream()
    .parallel()
    .map(num -> transform(num))
    .forEachOrdered(num -> printIt(num));
```

```
transform: 2 : Thread[IJava-executor-0,5,main]
transform: 3 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
transform: 1 : Thread[ForkJoinPool.commonPool-worker-27,5,main]
printIt: 1 : Thread[ForkJoinPool.commonPool-worker-27,5,main]
printIt: 2 : Thread[ForkJoinPool.commonPool-worker-27,5,main]
printIt: 3 : Thread[ForkJoinPool.commonPool-worker-27,5,main]
```


#### Filter - Parallel

```java
List<Integer> numbers = Arrays.asList(1, 2, 3);

public void sleep(long ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
}

public boolean check(int number) {
    System.out.println("transform: " + number + " : " + Thread.currentThread());
    sleep(2000);
    return true;
}

public void printIt(int number) {
    System.out.println("printIt: " + number + " : " + Thread.currentThread());
}

numbers.stream()
    .parallel()
    .filter(num -> check(num))
    .forEachOrdered(num -> printIt(num));
```

```
transform: 2 : Thread[IJava-executor-0,5,main]
transform: 3 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
transform: 1 : Thread[ForkJoinPool.commonPool-worker-27,5,main]
printIt: 1 : Thread[ForkJoinPool.commonPool-worker-27,5,main]
printIt: 2 : Thread[ForkJoinPool.commonPool-worker-27,5,main]
printIt: 3 : Thread[ForkJoinPool.commonPool-worker-27,5,main]
```


#### Reduce - Parallel

```java
List<Integer> numbers = Arrays.asList(1, 2, 3);

public int add(int total, int e) {
    int result = total + e;
    System.out.println("total: " + total + " e: " + e + " result: " + result + " : " + Thread.currentThread());
    return result;
}

System.out.println("Sequential Reduce");
System.out.println(numbers.stream()
                        .reduce(0, (total, e) -> add(total, e)));

System.out.println("");

System.out.println("Parallel Reduce");
System.out.println(numbers.stream()
                    .parallel()
                    .reduce(0, (total, e) -> add(total, e)));
```

```
Sequential Reduce
total: 0 e: 1 result: 1 : Thread[IJava-executor-0,5,main]
total: 1 e: 2 result: 3 : Thread[IJava-executor-0,5,main]
total: 3 e: 3 result: 6 : Thread[IJava-executor-0,5,main]
6

Parallel Reduce
total: 0 e: 2 result: 2 : Thread[IJava-executor-0,5,main]
total: 0 e: 3 result: 3 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
total: 0 e: 1 result: 1 : Thread[ForkJoinPool.commonPool-worker-27,5,main]
total: 2 e: 3 result: 5 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
total: 1 e: 5 result: 6 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
6
```


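#### Use an identity value as the initial value whenever a reduce operation is executed in parallel

Identity value: a value that leaves every other value unchanged under the operation. For addition it's 0 (0 + x = x), for multiplication it's 1 (1 * x = x), and so on. In parallel, each chunk of the stream starts from the initial value, so a non-identity initial value is counted once per chunk.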
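A compact sketch of the rule (IdentitySketch and its methods are hypothetical names). With a true identity the parallel result is fixed; with a non-identity seed the seed is applied once per chunk, so the result depends on how the stream was split:

```java
import java.util.List;

final class IdentitySketch {
    static final List<Integer> NUMBERS = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    // 0 is the identity for +, so every chunk starts from a neutral value
    static int sum() {
        return NUMBERS.parallelStream().reduce(0, Integer::sum);
    }

    // 1 is the identity for *
    static int product() {
        return NUMBERS.parallelStream().reduce(1, (a, b) -> a * b);
    }

    // Non-identity seed: 10 is added once per chunk, so the result is
    // 55 + 10 * (number of chunks), which varies with the split
    static int sumSeededWrongly() {
        return NUMBERS.parallelStream().reduce(10, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sum());              // 55
        System.out.println(product());          // 3628800
        System.out.println(sum() + 10);         // 65: add the offset after reducing
        System.out.println(sumSeededWrongly()); // depends on the number of chunks
    }
}
```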

```java
List<Integer> numbers = Arrays.asList(1, 2, 3);

public int add(int total, int e) {
    int result = total + e;
    System.out.println("total: " + total + " e: " + e + " result: " + result + " : " + Thread.currentThread());
    return result;
}

System.out.println("Sequential Reduce");
System.out.println(numbers.stream()
                        .reduce(10, (total, e) -> add(total, e)));

System.out.println("");

System.out.println("Parallel Reduce");
System.out.println(numbers.stream()
                    .parallel()
                    .reduce(10, (total, e) -> add(total, e)));
```

```
Sequential Reduce
total: 10 e: 1 result: 11 : Thread[IJava-executor-0,5,main]
total: 11 e: 2 result: 13 : Thread[IJava-executor-0,5,main]
total: 13 e: 3 result: 16 : Thread[IJava-executor-0,5,main]
16

Parallel Reduce
total: 10 e: 2 result: 12 : Thread[IJava-executor-0,5,main]
total: 10 e: 3 result: 13 : Thread[ForkJoinPool.commonPool-worker-9,5,main]
total: 12 e: 13 result: 25 : Thread[ForkJoinPool.commonPool-worker-9,5,main]
total: 10 e: 1 result: 11 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
total: 11 e: 25 result: 36 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
36
```


##### The correct way to do the above operation is to reduce with the identity value 0 and add 10 after the reduce operation

```java
List<Integer> numbers = Arrays.asList(1, 2, 3);

public int add(int total, int e) {
    int result = total + e;
    System.out.println("total: " + total + " e: " + e + " result: " + result + " : " + Thread.currentThread());
    return result;
}

System.out.println("Sequential Reduce");
System.out.println(numbers.stream()
                        .reduce(10, (total, e) -> add(total, e)));

System.out.println("");

System.out.println("Parallel Reduce");
System.out.println(numbers.stream()
                    .parallel()
                    .reduce(0, (total, e) -> add(total, e)) + 10);
```

```
Sequential Reduce
total: 10 e: 1 result: 11 : Thread[IJava-executor-0,5,main]
total: 11 e: 2 result: 13 : Thread[IJava-executor-0,5,main]
total: 13 e: 3 result: 16 : Thread[IJava-executor-0,5,main]
16

Parallel Reduce
total: 0 e: 2 result: 2 : Thread[IJava-executor-0,5,main]
total: 0 e: 3 result: 3 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
total: 0 e: 1 result: 1 : Thread[ForkJoinPool.commonPool-worker-9,5,main]
total: 2 e: 3 result: 5 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
total: 1 e: 5 result: 6 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
16
```


#### How many threads should I create?

Number of threads <= Number of cores / (1 - blocking factor)

where the blocking factor is the fraction of time a task spends blocked (e.g. waiting for I/O) instead of using the CPU, 0 <= blocking factor < 1

##### Computation intensive:

The blocking factor is close to 0:

Number of threads <= Number of cores

General practice: Number of threads = Number of cores (Java's implementation of ForkJoinPool.commonPool() uses (Number of cores - 1) worker threads, plus the calling (main) thread, which also takes part in the work)

##### I/O intensive:

If the blocking factor is 0.5:

Number of threads <= 2 * Number of cores
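The sizing rule can be written as a small helper. This is a sketch: PoolSizing and maxThreads are hypothetical names, and the blocking factor is an input you would have to estimate or measure for your own workload:

```java
import java.util.concurrent.ForkJoinPool;

final class PoolSizing {
    // Upper bound from the rule: threads <= cores / (1 - blockingFactor)
    static int maxThreads(int cores, double blockingFactor) {
        if (blockingFactor < 0 || blockingFactor >= 1) {
            throw new IllegalArgumentException("blocking factor must be in [0, 1)");
        }
        return (int) (cores / (1 - blockingFactor));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("computation intensive (0.0): " + maxThreads(cores, 0.0));
        System.out.println("I/O intensive (0.5): " + maxThreads(cores, 0.5));
        // By default the common pool's parallelism is cores - 1 (and at least 1)
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```

On a 16-core machine this gives 16 threads for a blocking factor of 0 and 32 for a blocking factor of 0.5, matching the two cases above.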

```java
System.out.println("Number of cores: " + Runtime.getRuntime().availableProcessors());
```

```
Number of cores: 16
```


```java
System.out.println(ForkJoinPool.commonPool());
```

```
java.util.concurrent.ForkJoinPool@7b5a2aef[Running, parallelism = 15, size = 5, active = 0, running = 0, steals = 20, tasks = 0, submissions = 0]
```


```java
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);

public void sleep(long ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
}

public int transform(int number) {
    System.out.println("transform: " + number + " : " + Thread.currentThread());
    sleep(1000);
    return number;
}

// parallelStream() already returns a parallel stream, so no extra .parallel() call is needed
numbers.parallelStream()
    .map(num -> transform(num))
    .forEach(e -> {});
```

```
transform: 13 : Thread[IJava-executor-0,5,main]
transform: 15 : Thread[ForkJoinPool.commonPool-worker-3,5,main]
transform: 16 : Thread[ForkJoinPool.commonPool-worker-13,5,main]
transform: 14 : Thread[ForkJoinPool.commonPool-worker-17,5,main]
transform: 19 : Thread[ForkJoinPool.commonPool-worker-21,5,main]
transform: 12 : Thread[ForkJoinPool.commonPool-worker-27,5,main]
transform: 17 : Thread[ForkJoinPool.commonPool-worker-19,5,main]
transform: 4 : Thread[ForkJoinPool.commonPool-worker-15,5,main]
transform: 9 : Thread[ForkJoinPool.commonPool-worker-7,5,main]
transform: 7 : Thread[ForkJoinPool.commonPool-worker-5,5,main]
transform: 5 : Thread[ForkJoinPool.commonPool-worker-25,5,main]
transform: 18 : Thread[ForkJoinPool.commonPool-worker-9,5,main]
transform: 3 : Thread[ForkJoinPool.commonPool-worker-23,5,main]
transform: 11 : Thread[ForkJoinPool.commonPool-worker-11,5,main]
transform: 20 : Thread[ForkJoinPool.commonPool-worker-31,5,main]
transform: 2 : Thread[ForkJoinPool.commonPool-worke
```

#### Configuring the number of threads JVM-wide
-Djava.util.concurrent.ForkJoinPool.common.parallelism=100
##### Problem
This setting applies to the whole JVM, so for computation-intensive operations the threads compete for the same cores and performance degrades


```java
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);

public void sleep(long ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
}

public void process(Stream<Integer> stream) throws InterruptedException {
    // Dedicated pool with 100 threads, separate from the common pool
    ForkJoinPool pool = new ForkJoinPool(100);
    // The terminal operation (forEach) is invoked from a task running in this pool
    pool.submit(() -> stream.forEach(e -> {}));
    pool.shutdown();
    // Wait up to 10 seconds for the submitted pipeline to finish
    pool.awaitTermination(10, TimeUnit.SECONDS);
}

public int transform(int number) {
    System.out.println("transform: " + number + " : " + Thread.currentThread());
    sleep(1000);
    return number;
}

// Only an intermediate operation here: nothing runs until process() calls forEach
process(numbers.parallelStream()
            .map(num -> transform(num)));
```

```
transform: 13 : Thread[ForkJoinPool-1-worker-115,5,main]
transform: 2 : Thread[ForkJoinPool-1-worker-31,5,main]
transform: 1 : Thread[ForkJoinPool-1-worker-117,5,main]
transform: 14 : Thread[ForkJoinPool-1-worker-5,5,main]
transform: 11 : Thread[ForkJoinPool-1-worker-145,5,main]
transform: 18 : Thread[ForkJoinPool-1-worker-87,5,main]
transform: 12 : Thread[ForkJoinPool-1-worker-59,5,main]
transform: 16 : Thread[ForkJoinPool-1-worker-175,5,main]
transform: 20 : Thread[ForkJoinPool-1-worker-89,5,main]
transform: 19 : Thread[ForkJoinPool-1-worker-205,5,main]
transform: 5 : Thread[ForkJoinPool-1-worker-33,5,main]
transform: 15 : Thread[ForkJoinPool-1-worker-231,5,main]
transform: 10 : Thread[ForkJoinPool-1-worker-61,5,main]
transform: 6 : Thread[ForkJoinPool-1-worker-203,5,main]
transform: 9 : Thread[ForkJoinPool-1-worker-3,5,main]
transform: 8 : Thread[ForkJoinPool-1-worker-147,5,main]
transform: 4 : Thread[ForkJoinPool-1-worker-63,5,main]
transform: 17 : Thread[ForkJoinPool-1-worker-173,
```

From the example above it's clear that **the thread or thread pool in which the terminal operation is invoked is the one executing the whole stream pipeline**. This is observed ForkJoinPool behavior, not something the Stream API documents as a guarantee.

With lazy evaluation, a parallel stream might use more resources than needed
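A variant of the custom-pool pattern that returns the pipeline's result instead of waiting with awaitTermination. This is a sketch under the same caveat as above (that the pipeline's subtasks run in the pool that invokes the terminal operation is implementation behavior, not a documented contract); CustomPoolSketch and sumInPool are hypothetical names:

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class CustomPoolSketch {
    // Runs a parallel pipeline inside a dedicated pool and returns its result
    static int sumInPool(List<Integer> numbers, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The terminal operation (sum) is invoked from a task of this pool;
            // join() waits for that task and returns its value
            return pool.submit(() -> numbers.parallelStream()
                            .mapToInt(Integer::intValue)
                            .sum())
                    .join();
        } finally {
            pool.shutdown(); // release the pool's threads once we are done
        }
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
        System.out.println(sumInPool(numbers, 4)); // 210
    }
}
```

Shutting the pool down in finally keeps each call from leaking threads, which matters if a helper like this is called repeatedly.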