<a href="https://colab.research.google.com/github/rkurniawati/java-parallel-stream-patternlets/blob/master/java-parallel-stream-patternlets.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Java Parallel Stream Patternlet Notebook

Adapted to Java parallel stream by Ruth Kurniawati (Westfield State University) based on the [PDC book](https://pdcbook.calvin.edu/pdcbook/RaspberryPiHandout/) from [CSInParallel](https://csinparallel.org/index.html). 

This notebook contains patternlet examples in Java parallel stream. Patterns are reusable solution for commonly occuring problems. The parallel design patternlets are minimalist patterns to teach parallel programming. The orignal patternlets were originally written in the C language for the OpenMP and MPI by Joel Adams: 

> Adams, Joel C. "Patternlets: A Teaching Tool for Introducing Students to Parallel Design Patterns." 2015 IEEE International Parallel and Distributed Processing Symposium Workshop. IEEE, 2015.

However, OpenMP and MPI are not available in Java. In this notebook, we will be using the higher level concurrency objects, specifically parallel streams. These objects are included in the Java Development Kit (JDK) starting from version 8.

# Multicore Systems and Multi-Threading

Before proceeding with the examples, let's investigate the computer that this notebook is running on. For this, let's use the `lscpu` command. 




In [None]:
!lscpu

## Cores, Processes and Threads



If you run `lscpu` in the notebook, you may see output similar to below:

```
Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              2
On-line CPU(s) list: 0,1
Thread(s) per core:  2
Core(s) per socket:  1
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               79
Model name:          Intel(R) Xeon(R) CPU @ 2.20GHz
Stepping:            0
CPU MHz:             2199.998
BogoMIPS:            4399.99
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:           32K
L1i cache:           32K
L2 cache:            256K
L3 cache:            56320K
NUMA node0 CPU(s):   0,1
Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single ssbd ibrs ibpb stibp fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx smap xsaveopt arat md_clear arch_capabilities
```

This output means that the computer has 2 CPUs -- however, there is actually only one physical core but this core can execute 2 execution threads. 

The `lscpu` command tells us a LOT of useful information, including the number of available cores. In this case we know that there is one socket (or chip) with 1 physical core, where each core can support 2 thread. On larger systems, it is common to see multiple threads supported per core. This is an example of **simultaneous multi-threading** (SMT, or Hyperthreading on Intel systems). A **core** can be thought of as the compute unit of the CPU. It includes registers, an ALU, and control units.

Before we can discuss what a thread is, we must first discuss what a process is. A **process** can be thought of as an abstraction of a running program. When you type a command into the command line and press Enter, the Bash shell launches a process associated with that program executable. Each process contains a copy of the code and data of the program executable, and its own allocation of the stack and heap.

A **thread** is a light-weight process. While each thread gets its own stack allocation, it shares the heap, code and data of the parent process. As a result, all the threads in a multi-threaded process can access a common pool of memory. This is why multi-threading is commonly referred to as shared memory programming. A single-threaded process is also referred to as a serial process or program.

### Process Execution
A multicore CPU allows multiple processes to execute simultaneously, or in **parallel**. While the terms concurrency and parallel are related, it is useful to think of concurrency as a software/OS-level concept, while parallel as a hardware/execution concept. A multi-threaded program, while capable of parallel execution, runs concurrently on a system with only a single CPU core.

## Thread Execution

The primary goal of creating multi-threaded programs is to decrease the speed of a program’s execution. In a program that is perfectly parallelizable (that is, all components are paralleizable), it is usually possible to distribute the work associated with a program equally among all the threads. For a program _p_ whose work is equally distributed among _t_ threads, it will take roughly _p_/_t_ time, if executed on _t_ cores.

For example, if a multi-threaded process that is perfectly parallelized takes 100 seconds to execute on one core, on a multi-core system with 4 cores the program will take approximately 100/4 seconds = 25 seconds to execute.

## Leveraging Multiple Cores
While multicore processors are ubiquitous in today’s world, most of the popular programming languages were designed to support single-thread execution. However, several native libraries are available for supporting multi-threading in popular languages like C/C++ and FORTRAN.

One of these libraries is the Open MultiProcessing (OpenMP), a popular API for shared memory programming, and a standard since 1997. A key benefit of OpenMP over explicit threading libraries like POSIX threads is the ability to incrementally add parallelism to a program. For standard threaded programs, it is usually necessary to write a lot of extra code to add multi-threading to a program. Instead, OpenMP employs a series of pragmas, or special compiler directives, that tell the compiler how to parallelize the code.

OpenMP library is only available for C/C++ and Fortran languages. For Java, Pyjama compiler and runtime provide support for OpenMP-like directive. More information about Pyjama can be found in the paper below:

Vikas, Nasser Giacaman, and Oliver Sinnen. 2013. Pyjama: OpenMP-like implementation for Java, with GUI extensions. In <i>Proceedings of the 2013 International Workshop on Programming Models and Applications for Multicores and Manycores</i> (<i>PMAM '13</i>). Association for Computing Machinery, New York, NY, USA, 43–52. DOI:https://doi.org/10.1145/2442992.2442997

If you want to explore parallel programming using Pyjama in Java, please check out the Pyjama version of this notebook. 

In the rest of this notebook, we will use Java parallel stream to explore small patterns (_patternlets_) in parallel programming. 

A quick tutorial on Java parallel stream can be found [here](https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html)

# A Simple Parallel Program

## The SPMD Patternlet


A patternlet is a small program that succinctly illustrates common patterns in parallel programming. The first patternlet we will study is Single Program, Multiple Data (SPMD). Let’s start by examining Spmd2.java, a program that uses a parallel stream to make it easy to run a portion of the program on multiple threads. Note that the variables `id` and `numThreads` are shared among the threads.

In [None]:
%%writefile Spmd.java
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

class Spmd {

    private static String threadName;
    private static int numThreads;

    public static void main(String[] args) throws Exception {

        // check and parse argument
        if (args.length == 0) {
            System.out.println("Usage Spmd numThreads.");
            System.out.println("Number of threads should be >= 1");
            return;
        }

        numThreads = Integer.parseInt(args[0]);
        if (numThreads < 1) {
            System.out.println("Usage Spmd numThreads.");
            System.out.println("Number of threads should be >= 1");
            return;
        }

        // launch the parallel stream using the custom pool
        ForkJoinPool customPool = new ForkJoinPool(numThreads);
        customPool.submit(() -> 
            IntStream.range(0, numThreads).parallel().forEach(i -> {
                threadName = Thread.currentThread().getName();
                Thread.yield();
                String message = "Hello from " +  threadName + " from a pool of " + numThreads;
                System.out.println(message);
            })
        ).get();

        System.out.println("Done.");
    }
}

The executeCode call caused the the block of code within the curly braces be run on separate threads. Prior to the line containing the call to executeCode, the program is run serially. Within the executeCode function a thread pool with the specified number of threads is started and each thread runs the same block of code in the curly braces. Each thread is assigned its own name and runs separate copies of the code between the curly braces. Afterwards when shutdown is called, we wait for all threads to terminate and the execution threads get combined back into a single-threaded process (known as joining). Conceptually, the process looks like the following.

<img src=https://pdcbook.calvin.edu/pdcbook/RaspberryPiHandout/_images/ForkJoin_SPMD.png >

## Running the Program

Just like in the HelloWorld example, first we need to use compile the source code using <code>javac</code>. 

In [None]:
!javac Spmd.java

Now, we're ready to run the Spmd program. Let's specify that you'd like to have 10 threads by supplying this number in the command line argument. 

In [None]:
!java Spmd 10

Try running the program a few times with 10 threads (press the run button in the cell above). Observe the output. Occasionally something will be amiss. Do you notice it?

## Race Conditions

Watch this [video](
https://d32ogoqmya1dw8.cloudfront.net/files/csinparallel/raceconditions_workshop.mov) to help you understand what's going on. Note that the video is made for the C++ version of the program, however the underlying issue is the same. 

The Spmd program has a race condition where there are more than one threads modifying a shared variable. Which shared variable(s) is/are causing the problem?

## Fixing the code

For this example, the race condition can be avoided by ensuring that each threads has its own copy of `id` and `numThreads` variables. Instead of declaring them as `private static` member variables in the class, we declare them as local variables inside the task executed in each parallel stream.

In [None]:
%%writefile Spmd2.java
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

class Spmd2 {


    public static void main(String[] args) throws Exception {

        // check and parse argument
        if (args.length == 0) {
            System.out.println("Usage Spmd2 numThreads.");
            System.out.println("Number of threads should be >= 1");
            return;
        }

        int poolNumThreads = Integer.parseInt(args[0]);
        if (poolNumThreads < 1) {
            System.out.println("Usage Spmd2 numThreads.");
            System.out.println("Number of threads should be >= 1");
            return;
        }

        // launch the parallel stream using the custom pool
        ForkJoinPool customPool = new ForkJoinPool(poolNumThreads);
        customPool.submit(() -> 
            IntStream.range(0, poolNumThreads).parallel().forEach(i -> {
                String threadName = Thread.currentThread().getName();
                int numThreads = customPool.getParallelism();
                Thread.yield();
                String message = "Hello from " +  threadName + " from a pool of " + numThreads;
                System.out.println(message);
            })
        ).get();

        System.out.println("Done.");
    }
}

Let's compile and run this modified program.

In [None]:
!javac Spmd2.java

In [None]:
!java Spmd2 10  

Were you able to reproduce the race condition using the corrected program? Why should you also declare `numThreads` as a private variable?

# Running Loops in Parallel

Next we will consider a program that has a loop in it. An iterative for loop is a remarkably common pattern in all programming, primarily used to perform a calculation N times, often over a set of data containing N elements, using each element in turn inside the for loop.

If there are no dependencies between the iterations (i.e. the order of them is not important), then the code inside the loop can be split between forked threads. However, the programmer must first decide how to partition the work between the threads. Specifically, how many and which iterations of the loop will each thread complete on its own?

The **data decomposition** pattern describes the way how work is distributed across multiple threads. This chapter presents two patternlets, parallelLoop-equalChunks and parallelLoop-chunksOf1, that describe two common data decomposition strategies.

## Parallel Loop, Equal Chunks

Let's do more experiment stream and see how the work is divided into equal chunks. 

In [None]:
%%writefile ParallelLoopEqualChunks.java
import java.util.stream.IntStream;

public class ParallelLoopEqualChunks {
    static final int REPS = 16;
    static int numReps = REPS;

    public static void main(String[] args) throws Exception {

        // check and parse argument
        if (args.length >= 1) {
            numReps = Integer.parseInt(args[0]);
        }

        IntStream.range(0, numReps)
                .parallel()
                .forEach(i -> System.out.println("Thread " +
                        Thread.currentThread().getName() + " performed iteration " + i));
        System.out.println("Done.");

    }
}


The `parallel()` call does the following:
- Call the [SplitIterator](https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html) class to split the streams into chunks
- Assign each chunks to threads in the common thread pool for parallel execution
- At the end of the scope of the parallel stream (before we reach the `System.out.println("Done."), the execution will wait until all the threads are done executing.

As in our previous example, the code up to the call to `Stream.parallel()` is run serially. The code that is following the `parallel()` call is run in parallel, with a subset of iterations assigned to each thread. After the implicit join at the end of the parallel stream, the program once again is a single-threaded process that executes serially to completion.

In the above program, REPS is set to 16. If the program is run with 2 threads, then each thread gets approximately half the number of iterations of the loop. If the program is run with 4 threads, each thread gets about 1/4th of the number of iterations (see illustration below).

<img src="https://pdcbook.calvin.edu/pdcbook/RaspberryPiHandout/_images/ParallelFor_Chunks-4_threads-1.png">

## Try It Out

Try compile and run the program using the following commands below.

In [None]:
!javac ParallelLoopEqualChunks.java

In [None]:
!java ParallelLoopEqualChunks 16

Try running the program a few times with 4 threads. How does the work in the for loop get assigned to the threads?

### Controlling the Number of Threads

To control the number of threads in a parallel stream, you have to create a fork-join thread pool and assign the execution of the parallel thread to this custom pool. 

In [None]:
%%writefile ParallelLoopEqualChunksCustomPool.java
import java.util.stream.IntStream;
import java.util.concurrent.ForkJoinPool;

public class ParallelLoopEqualChunksCustomPool {
    static final int REPS = 16;
    static int numReps = REPS;

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Usage ParallelLoopEqualChunks numberOfRepetitions numberOfThreads");
        }

        // check and parse arguments
        final int numReps = (args.length > 0) ? Integer.parseInt(args[0]) : 16;
        final int numThreads = (args.length > 1) ? Integer.parseInt(args[0]) : Runtime.getRuntime().availableProcessors();
        
        System.out.println("Number of threads " + numThreads);

        ForkJoinPool customPool = new ForkJoinPool(numThreads);
        customPool.submit(() -> 
            IntStream.range(0, numReps)
                    .parallel()
                    .forEach(i -> {
                        System.out.println("Thread " + Thread.currentThread().getName() + " completed iteration " + i);
                    })
        ).get();
        System.out.println("Done.");

    }
}


Let's try it out:

In [None]:
!javac ParallelLoopEqualChunksCustomPool.java

Run 16 iterations using 4 threads:

In [None]:
!java ParallelLoopEqualChunksCustomPool 16 4

Run 16 iterations using 8 threads:

In [None]:
!java ParallelLoopEqualChunksCustomPool 16 8

### Unequal Iterrations

Also try using a different number of threads. Pick a number so that the number iterations cannot be equally divided by the number of threads, such as 5. 

In [None]:
!java ParallelLoopEqualChunksCustomPool 22 4

What happens to the extra iterations?

This equal-chunk decomposition is especially useful in the following scenarios:
- Each iteration of the loop takes the same amount of time to finish
- The loop involves accesses to data in consecutive memory locations (e.g. an array), allowing the program to take advantage of spatial locality.

## Another way to do parallel loop 

At this point, you have seen how we can use Java ThreadPool and ForJoinPool to run tasks in parallel. Java allows for a higher abstraction using parallel stream. A SplitIterator for the stream will try to split the stream (and the work) equally among threads. By default, a parallel stream will use the common
thread pool; however you can also choose to provide your own thread pool.



In [None]:
%%writefile ParallelLoopEqualChunks.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

/**
 * 
 * To specify the number of thread in the ForkJoinPool, specify the 
 * java.util.concurrent.ForkJoinPool.common.parallelism system property. 
 * 
 * For example:
 *
 *    java -Djava.util.concurrent.ForkJoinPool.common.parallelism=100 ParallelLoopEqualChunks
 */
public class ParallelLoopEqualChunks {
    static final int REPS = 16;

    static class ChunkExecutor implements Callable<Void> {
        private int startIndex, endIndex;

        public ChunkExecutor(int startIndex, int endIndex) {
            this.startIndex = startIndex;
            this.endIndex = endIndex; // not inclusive
        }

        @Override
        public Void call() throws Exception {
            for(int i = startIndex; i < endIndex; i++) {
                System.out.println("Thread " + Thread.currentThread().getName() + " performed iteration " + i);
            }
            return null;
        }
    }

    public static void main(String[] args) {
        
        // check and parse argument
        int numReps = REPS;
        if (args.length >= 1) {
            numReps = Integer.parseInt(args[0]);
        }

        // initialize the thread pool
        ForkJoinPool fjp = new ForkJoinPool();
        int numThreads = fjp.getParallelism();
        System.out.println("Number of repetitions " + numReps);
        System.out.println("Number of parallel threads " + numThreads);

        // divide the work
        List<ChunkExecutor> tasks = new ArrayList<>();
        int startIndex = 0;

        for(int i = 0; i < numThreads; i++) {
            int leftOver = (numReps % numThreads <= i) ? 0 : 1; 
            
            // if REPS is not divisible evenly, spread it over threads
            int chunkSize = numReps / numThreads + leftOver;

            assert(startIndex+leftOver+chunkSize <= numReps);

            tasks.add(new ChunkExecutor(startIndex, startIndex + chunkSize));
            startIndex += chunkSize;
        }

        fjp.invokeAll(tasks);

        System.out.println("Done.");

    }
}



In [None]:
!javac ParallelLoopEqualChunks.java

In [None]:
!java ParallelLoopEqualChunks 16

## Parallel Loop, Chunks of 1

In some cases, it makes sense to have iterations assigned to threads in “round-robin” style. In other words, iteration 0 goes to thread 0, iteration 1 goes to thread 1, iteration 2 goes to thread 2, and so on.

Let's examine the code below that directs Pyjama to do this.

In [None]:
%%writefile ParallelLoopChunksOf1.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

/**
 * 
 * To specify the number of thread in the ForkJoinPool, specify the java.util.concurrent.ForkJoinPool.common.parallelism system property. For example:
 *
 *    java -Djava.util.concurrent.ForkJoinPool.common.parallelism=100 ParallelLoopEqualChunks
 */
public class ParallelLoopChunksOf1 {
    static final int REPS = 16;

    static class ChunkExecutor implements Callable<Void> {
        private int startIndex, endIndex, step;

        public ChunkExecutor(int startIndex, int endIndex, int step) {
            this.startIndex = startIndex;
            this.endIndex = endIndex; // not inclusive
            this.step = step;
        }

        @Override
        public Void call() throws Exception {
            for(int i = startIndex; i < endIndex; i+=step) {
                System.out.println("Thread " + Thread.currentThread().getName() + " performed iteration " + i);
            }
            return null;
        }
    }

    public static void main(String[] args) {
        
        // check and parse argument
        int numReps = REPS;
        if (args.length >= 1) {
            numReps = Integer.parseInt(args[0]);
        }

        // initialize the thread pool
        ForkJoinPool fjp = new ForkJoinPool();
        int size = fjp.getParallelism();

        // divide the work
        List<ChunkExecutor> tasks = new ArrayList<>();
        for(int i = 0; i < size; i++) {
            tasks.add(new ChunkExecutor(i, numReps, size));
        }

        fjp.invokeAll(tasks);

        System.out.println("Done.");

    }
}


The code is nearly identical to the previous program. The difference is in the `omp` directive. The `omp parallel for` directive has a new `schedule` clause which specifies the way iterations should be assigned to threads. The `static` keyword indicates that the the compiler should assign work to each thread at compile time (a **static** scheduling policy). The `1` indicates that the chunk size should be 1 iteration. Therefore, the above code would have 16 total chunks.

In the case where the number of chunks exceed the number of theads, each successive chunk is assigned to a thread in round-robin fashion.

### Try It Out

Let's compile and run the code. 

In [None]:
!javac ParallelLoopChunksOf1.java

In [None]:
!java ParallelLoopChunksOf1 16

Compare the work assignment in this program with the previous program that uses `#omp for` directive.

## Dynamic Scheduling

In some cases, it is beneficial for the assignment of loop iterations to occur at run-time. Both Java fork join thread pool and parallel stream will assign work to threads at runtime. The fork join thread pool thread will be assigned any available task in its queue and if needed it can steal work from another thread. 

An example of programs that will benefit from dynamic scheduling are shown below. The amount of work time the thread will sleep will be different and thread that has longer sleep time will require more "work" from the thread (occupying the thread for a longer time). 

Try experimenting with them and see how the tasks get assigned to different threads with changes in "workload" (sleep time). 



In [None]:
%%writefile DynamicSchedulingStream.java
import java.util.stream.IntStream;

public class DynamicSchedulingStream {
    static final int REPS = 16;

    static void sleepALittle(int numMillis) {
        try { 
            Thread.sleep(numMillis); 
        } catch(InterruptedException e) {
            // do nothing
        }
    }

    public static void main(String[] args) throws Exception {
        int numReps = 16;
        // check and parse argument
        if (args.length >= 1) {
            numReps = Integer.parseInt(args[0]);
        }

        IntStream.range(0, numReps)
                .parallel()
                .forEach(i -> {
                    final int sleepTime = ((i % 6 == 0) ? 1000 : 1);
                    System.out.println("Thread " + Thread.currentThread().getName() + " about to sleep " + sleepTime + " ms");
                    sleepALittle(sleepTime);
                    System.out.println("Thread " + Thread.currentThread().getName() + " finished sleeping " + sleepTime + " ms");
                });
        System.out.println("Done.");

    }
}


In [None]:
!javac DynamicSchedulingStream.java

In [None]:
!java DynamicSchedulingStream 16 

Below you will find another example of dynamic scheduling, but this time we utilize a ForkJoinPool.

In [None]:
%%writefile DynamicSchedulingForkJoinPool.java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

public class DynamicSchedulingForkJoinPool {
    static final int REPS = 16;

    static void sleepALittle(int numMillis) {
        try { 
            Thread.sleep(numMillis); 
        } catch(InterruptedException e) {
            // do nothing
        }
    }

    public static void main(String[] args) {
        
        // check and parse argument
        int numReps = REPS;
        if (args.length >= 1) {
            numReps = Integer.parseInt(args[0]);
        }

        // initialize the thread pool
        ForkJoinPool fjp = new ForkJoinPool();
        int numThreads = fjp.getParallelism();
        System.out.println("Number of repetitions " + numReps);
        System.out.println("Number of parallel threads " + numThreads);

        Random r = new Random();
        List<Callable<Void>> tasks = new ArrayList<>();
        for(int i = 0; i < numReps; i++) {
            final int sleepTime = ((i % 6 == 0) ? 1000 : 1);

            tasks.add(() -> {
                System.out.println("Thread " + Thread.currentThread().getName() + " about to sleep " + sleepTime + " ms");
                sleepALittle(sleepTime);
                System.out.println("Thread " + Thread.currentThread().getName() + " finished sleeping " + sleepTime + " ms");
                return null;
            });
        }

        fjp.invokeAll(tasks);
    }
}


In [None]:
!javac DynamicSchedulingForkJoinPool.java

In [None]:
!java DynamicSchedulingForkJoinPool

# Parallel Sum

Very often, loops are used with an accumulator variable to compute a a single value from a set of values, such as the sum of integers in an array or list. Fortunately, Java stream provides a *reduce* method, an implementation of the [*reduction*](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operation, which will aid us in this process. The reduction pattern is one of a group of patterns called **collective communication** patterns because the threads must somehow work together to create the final desired single value.


## Initial Attempt

Let's try implementing parallel sum without using the _reduce_ method. 

The `Reduction0` program below will create an array with 1 million integers that are generated randomly in the [0-1000) range. Note that `sequentialSum` sums up the array sequentially. The code in `parallelSum` uses OpenMP's parallel and for directives to attempt to sum up the array in parallel. Examine the two functions carefully. 

In [None]:
%%writefile ReductionRace.java
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class ReductionRace {
    private static final long SIZE = 1_000_000;
    private static final int MAX = 1_000;

    // the total in sequential and parallel
    private static int seqTotal, parTotal1, parTotal2;

    static void sequentialSum(List<Integer> randomInts) {
        randomInts.stream().forEach(i->seqTotal = seqTotal+i);
    }

    // parallel sum using a common pool
    static void parallelSum1(List<Integer> randomInts) {
        randomInts.parallelStream().forEach(
            i-> parTotal1= parTotal1 + i
        );
    }

    // parallel sum using a custom pool
    static void parallelSum2(List<Integer> randomInts, ForkJoinPool customThreadPool) {
        try {
            customThreadPool.submit(
                () ->{
                    randomInts.parallelStream().forEach(i->parTotal2 = parTotal2 + i); 
                    return null;
                }).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        // generate a stream of random integer in [0..MAX)
        List<Integer> randomInts = (new Random(123)).ints(0, MAX).limit(SIZE)
                .mapToObj(x -> x).collect(Collectors.toList());

        // sequential 
        long startTime = System.currentTimeMillis();
        sequentialSum(randomInts);
        long seqTime = System.currentTimeMillis() - startTime;
        System.out.println("Seq total " + seqTotal + ", calculated in " + seqTime + " ms.");

        // using common pool
        startTime = System.currentTimeMillis();
        parallelSum1(randomInts);
        long parTime1 = System.currentTimeMillis() - startTime;
        System.out.println("Parallel total " + parTotal1 + ", calculated in " + parTime1 + " ms.");

        // use custom pool
        ForkJoinPool customThreadPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        startTime = System.currentTimeMillis();
        parallelSum2(randomInts, customThreadPool);
        long parTime2 = System.currentTimeMillis() - startTime;
        System.out.println("Parallel total " + parTotal2 + ", calculated in " + parTime2 + " ms.");
    }
}

In [None]:
!javac ReductionRace.java

In [None]:
!java ReductionRace

Note that the sum from the sequentialSum is different from the sum from the parallelSum. Try running the program multiple times. 

Note that the set of random numbers generated are the same each time you run the program since we supplied the seed for the random number generator to be `123`.  

Are the results of the `sequentialSum` function changing? How about the results of the `parallelSum` function? Can you explain why the results of sequential and parallel sum are different?

How many threads were used to calculate the sum in `parallelSum`? What happens when you change this?

## Fixing the parallelSum function

Let's try a different version of the parallelSum function (below). Note that this version use the _reduce_ method. 

The notion of a reduction comes from the mathematical operation _reduce_, in which a collection of values are combined into a single value via a common mathematical function. Summing up a collection of values is a natural example of reduction. 


In [None]:
%%writefile Reduction.java
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class Reduction {
    private static final long SIZE = 1_000_000;
    private static final int MAX = 1_000;

    static int sequentialSum(List<Integer> randomInts) {
        return randomInts.stream().reduce(0, Integer::sum);
    }

    // parallel sum using a common pool
    static int parallelSum1(List<Integer> randomInts) {
        return randomInts.parallelStream().reduce(0, Integer::sum);
    }

    // parallel sum using a custom pool
    static int parallelSum2(List<Integer> randomInts, ForkJoinPool customThreadPool) {
        try {
            return customThreadPool.submit(
                        () -> randomInts.parallelStream().reduce(0, Integer::sum)
                    ).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return 0;
    }
    
    public static void main(String[] args) {
        // generate a stream of random integer in [0..MAX)
        List<Integer> randomInts = (new Random(123)).ints(0, MAX).limit(SIZE)
                .mapToObj(x -> x).collect(Collectors.toList());

        // sequential 
        long startTime = System.currentTimeMillis();
        int seqTotal = sequentialSum(randomInts);
        long seqTime = System.currentTimeMillis() - startTime;
        System.out.println("Seq total " + seqTotal + ", calculated in " + seqTime + " ms.");

        // using common pool
        startTime = System.currentTimeMillis();
        int parTotal1 = parallelSum1(randomInts);
        long parTime1 = System.currentTimeMillis() - startTime;
        System.out.println("Parallel total " + parTotal1 + ", calculated in " + parTime1 + " ms.");

        // use custom pool
        ForkJoinPool customThreadPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        startTime = System.currentTimeMillis();
        int parTotal2 = parallelSum2(randomInts, customThreadPool);
        long parTime2 = System.currentTimeMillis() - startTime;

        System.out.println("Parallel total " + parTotal2 + ", calculated in " + parTime2 + " ms.");
    }

}

In [None]:
!javac Reduction.java

In [None]:
!java Reduction

Try running the program a few times and compare the results of `sequentialSum` and `parallelSum` again. Are they the same? 



## Something to ponder

What do you think happen when you specify the `reduce` function? 

Can you fix `parallelSum1` and `parallelSum2` functions so that they will produce the correct result without using the `reduce` clause?



The sum example that we experimented with was a fairly small example. We will look at larger examples in the following sections. 

# Integration using the Trapezoidal Rule

## Estimating the Area Under a Curve

As our next example, let’s look at the problem of estimating the area under a curve. If you have taken a calculus course, you may recognize this problem as the Riemann sum or Trapezoidal Rule, which approximates the area under the curve (i.e. the integral) by splitting the area under the curve into a series of trapezoids.

In the following programs, we attempt to use the trapezoid rule to approximate the integral:

  $\int_0^{\pi} sin(x)_{dx}$

using $2^{20}$ equal subdivisions. The answer from this computation should be 2.0. This [video](https://d32ogoqmya1dw8.cloudfront.net/files/csinparallel/workshops/numerical_integration_1_thread.mov) shows how a single thread would solve this problem. 


In the following program, a single thread serially computes the area of each trapezoid and adds all the trapezoids together into one value. A Java implementation of this program may look like the following:

In [None]:
%%writefile Integration.java
import java.util.stream.IntStream;
import java.util.List;
import java.util.function.DoubleBinaryOperator;
import java.util.stream.Collectors;

class Integration {
    static double integral; // variable to accumulate answer

    static double f(double x) {
        return Math.sin(x);
    }
    
    public static void main(String[] args) {
        //Variables

        final double a = 0.0, b = Math.PI;         //limits of integration
        int n = 1048576;                           //number of subdivisions = 2^20
        final double h = (b - a) / n;              //width of each subdivision

        //sum up all the trapezoids
        List<Integer> indexes = IntStream.range(0, n+1).mapToObj(i->i)
            .collect(Collectors.toList());
        indexes.stream().forEach(i -> integral += f(a+i*h));;

        integral = integral * h;
        System.out.printf("With %d trapezoids, our estimate of the integral from \n", n);
        System.out.printf("%f to %f is %f\n", a,b,integral);
    } 
}

Let's compile and run this as a normal Java program:

In [None]:
!javac Integration.java

In [None]:
!java Integration

## Parallel Integration - First Attempt

Let’s now consider how we can use multiple threads to approximate the area under the curve in parallel. One strategy would be to assign each thread a subset of the total set of subdivisions, so that each thread separately computes its assigned set of trapezoids.

This [video](https://d32ogoqmya1dw8.cloudfront.net/files/csinparallel/workshops/numerical_integration_4_threads.mov) illustrates how 4 threads would work together to approximate the area under the curve.

Here's an attempt to parallelize the serial version above using a parallel stream:

In [None]:
%%writefile Integration1.java
import java.util.stream.IntStream;
import java.util.List;
import java.util.function.DoubleBinaryOperator;
import java.util.stream.Collectors;

class Integration1 {
    static double integral; // variable to accumulate answer

    static double f(double x) {
        return Math.sin(x);
    }
    
    public static void main(String[] args) {
        //Variables

        final double a = 0.0, b = Math.PI;         //limits of integration
        int n = 1048576;                           //number of subdivisions = 2^20
        final double h = (b - a) / n;              //width of each subdivision

        //sum up all the trapezoids
        List<Integer> indexes = IntStream.range(0, n+1).mapToObj(i->i)
            .collect(Collectors.toList());
        indexes.parallelStream().forEach(i -> integral += f(a+i*h));;

        integral = integral * h;
        System.out.printf("With %d trapezoids, our estimate of the integral from \n", n);
        System.out.printf("%f to %f is %f\n", a,b,integral);
    } 
}

In [None]:
!javac Integration1.java

In [None]:
!java Integration

If our parallel program is implemented correctly, the program should estimate the area under the curve as 2.00, which would be identical to the output of the serial program.

Below is the same program that uses a custom thread pool to allow you to specify the number of threads. Try running the program with different number of threads: 4, 2, 1. When will it produce the correct/expected value (2.0)?

In [None]:
%%writefile Integration1CustomPool.java
import java.util.stream.IntStream;
import java.util.List;
import java.util.function.DoubleBinaryOperator;
import java.util.stream.Collectors;
import java.util.concurrent.ForkJoinPool;

class Integration1CustomPool {
    static double integral; // variable to accumulate answer

    static double f(double x) {
        return Math.sin(x);
    }
    
    public static void main(String[] args) throws Exception {
        int numThreads = Runtime.getRuntime().availableProcessors();
        if (args.length >= 1) {
            numThreads = Integer.parseInt(args[0]);
            if (numThreads < 1) {
                System.out.println("Usage " + Integration1CustomPool.class.getName() + " numThreads.");
                System.out.println("Number of threads should be >= 1");
                return;
            }
        }
        System.out.println("Number of threads " + numThreads);

        //Variables

        final double a = 0.0, b = Math.PI;         //limits of integration
        int n = 1048576;                           //number of subdivisions = 2^20
        final double h = (b - a) / n;              //width of each subdivision

        //sum up all the trapezoids
        ForkJoinPool customThreadPool = new ForkJoinPool(numThreads);

        List<Integer> indexes = IntStream.range(0, n+1).mapToObj(i->i)
            .collect(Collectors.toList());
        customThreadPool.submit(
            () -> indexes.parallelStream().forEach(i -> integral += f(a+i*h))
        ).get();

        integral = integral * h;
        System.out.printf("With %d trapezoids, our estimate of the integral from \n", n);
        System.out.printf("%f to %f is %f\n", a,b,integral);
    } 
}

In [None]:
!javac Integration1CustomPool.java

In [None]:
!java Integration1CustomPool 4

In [None]:
!java Integration1CustomPool 2

In [None]:
!java Integration1CustomPool 1

## Parallel Integration - Second Attempt

Now, let's use the previously discussed `reduction` function to fix the issue that we observed in previous section. Here's the improved version below.

In [None]:
%%writefile Integration2.java
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.List;
import java.util.function.DoubleBinaryOperator;

class Integration2 {
    static double f(double x) {
        return Math.sin(x);
    }
    
    public static void main(String[] args) {
        //Variables

        final double a = 0.0, b = Math.PI;         //limits of integration
        int n = 1048576;                //number of subdivisions = 2^20
        final double h = (b - a) / n;         //width of each subdivision

        //sum up all the trapezoids
        List<Integer> indexes = IntStream.range(0, n+1).mapToObj(i->i)
            .collect(Collectors.toList());
        double integral = indexes.parallelStream().mapToDouble(i -> f(a+i*h))
           .reduce(0.0, Double::sum);

        integral = integral * h;
        System.out.printf("With %d trapezoids, our estimate of the integral from \n", n);
        System.out.printf("%f to %f is %f\n", a,b,integral);
    } 
}

As with the previous reduction patternlet example, we again have an accumulator variable in the loop, this time called integral. Thus our reduction clause reads reduction(+:integral).

Compile and run the above program with multiple threads using the following commands:

In [None]:
!javac Integration2.java

In [None]:
!java Integration2

Did you get the expected result? (2.0)

**Note**: 

Because we did not explicitly add an additional static or dynamic clause to the pragma on line 24 on the working version above, the default behavior of assigning equal amounts of work doing consecutive iterations of the loop (i.e. static scheduling) was used to decompose the problem onto threads. In this case, since the number of trapezoids used was 1048576, then with 4 threads, thread 0 will do the first 1048576/4 trapezoids, thread 1 the next 1048576/4 trapezoids, and so on.

# Performance Study - Drug Design Exemplar

Let’s look at a larger example. An important problem in Biology is that of drug design. The goal is to find small molecules, called _ligands_, that are good candidates for use as drugs.

This is a very rough simulation of a program to compute how well a set of short protein ligands (each a possible drug) matches a given longer protein string. In the real software programs that do this, the matching is quite sophisticated, targeting possible "receptor" sites on the protein.

Here is an image illustrating the concept of the ligand (represented by small sticks in center) binding to areas of the protein (represented by ribbon structure):


<img src="https://pdcbook.calvin.edu/pdcbook/RaspberryPiHandout/_images/proteinligand.jpg">

For the real versions of this code and our simulated case, the longer the ligand or the longer the protein, the longer it takes for the matching and score of the match to complete.

We have created a default fake protein in the code. This can be changed on the command line.

We create the list of possible ligands by choosing random lengths (controlled by the `maxligand` variable). Thus, each ligand can be of a different length.

## The Drug Design Exemplar

Unlike previous code examples, this example is a larger program where you can practice measuring the performance improvement of a parallel program.

An implementation of the exemplar is below. Try changing the number of threads and measure the speed-up. 

In [None]:
%%writefile DDStream.java
import java.util.Random;
import java.util.stream.IntStream;
import java.util.concurrent.ForkJoinPool;

public class DDStream {
    static Random rand = new Random(42);

    static String[] cannedLigands = 
        {"razvex", "qudgy", "afrs", "sst", "pgfht", "rt", 
        "id", "how", "aaddh",  "df", "os", "hid", 
        "sad", "fl", "rd", "edp", "dfgt", "spa"};

    // Ligand Score pair
    static class LSPair {
        String ligand;
        int score;
    
        public LSPair(String ligand, int score) {
            this.ligand = ligand;
            this.score = score;
        }

        @Override
        public String toString() {
            return "["+ligand+","+score+"]";
        }
    }

    // returns arbitrary string of lower-case letters of length at most max_ligand
    static String makeLigand(int maxLigandLength) {

        int len = rand.nextInt(maxLigandLength+1);
        if (len == 0) len++; // don't create a 0-character ligand
                
        StringBuilder sb = new StringBuilder();
        for (int i = 0;  i < len;  i++) 
            sb.append((char) ('a' + rand.nextInt(26)));  
        return sb.toString();
    }
  
    private static String[] generateLigands(int numLigands, int maxLigandLength, boolean useCanned) {
        // If we use canned ligands, use as many of them as we can, then fill the rest with randomly generated ligands. 
        // Otherwise, create a set of ligands whose length randomly varies from 1 to args.maxLigand

        String[] result = new String[numLigands];

        if (useCanned) {
            for(int i = 0; i < Math.min(numLigands, cannedLigands.length); i++) {
                result[i] = cannedLigands[i];
            }
        }

        for(int i = useCanned ? cannedLigands.length : 0; i < numLigands; i++) {
            result[i] = makeLigand(maxLigandLength);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {

        if (args.length != 4) {
            System.out.println("Usage DDStream numThreads numLigands maxLigandLength protein useCanned printLigands");

            // the example string below is one of Dijkstra's famous quotes
            System.out.println("   Example: java -cp .:Pyjama.jar DDStream 4 10 8 \"Simplicity is a great virtue but it requires hard work to achieve it and education to appreciate it\" false true\n");
        }

        int numThreads = 4;
        if (args.length >= 1) {
            numThreads = Integer.parseInt(args[0]);
        }

        final int numLigands = (args.length >= 2) ? Integer.parseInt(args[1]) : 12;

        int maxLigandLength = 6;
        if (args.length >= 3) {
            maxLigandLength = Integer.parseInt(args[2]);
        }

        final String protein = (args.length >= 4) ? 
            args[3] :"the cat in the hat wore the hat to the cat hat party";

        System.out.println("Number of threads: " + numThreads);
        System.out.println("Number of ligands: "+numLigands);
        System.out.println("Max ligand length: "+ maxLigandLength);
        System.out.println("Protein: "+ protein);
        System.out.println();

        // Things to do: 
        // 1. Generate the requested numLigands w/ maxLigandLength
        // 2. Calculate the matching score for each ligand vs the given protein
        //    Score is calculated based on the number of character in the ligand that
        //    appears in the same order in the protein. 
        // 3. Find the ligand(s) with the highest score

        long start = System.currentTimeMillis();
        String[] ligands = generateLigands(numLigands, maxLigandLength, args.length >= 5 && args[4].equals("true"));

        // print the ligands if desired
        if (args.length >= 6 && args[5].equals("true")) {
            System.out.println("Here are the ligands");
            for(String l : ligands) {
                System.out.println(l);
            }
        }
        
        // map each ligand to (ligand, score)
        // also keep track of the maxScore 
        LSPair[] ligandsWScore = new LSPair[numLigands];

        ForkJoinPool customThreadPool = new ForkJoinPool(numThreads);

        int maxScore = customThreadPool.submit(
            () -> IntStream.range(0, numLigands).parallel().map(
                i -> {
                    String ligand = ligands[i];
                    int score = calcScore(ligand, protein);
                    ligandsWScore[i] = new LSPair(ligand, score);
                    return score;
                }
            ).reduce(Math::max)
        ).get().getAsInt();

        // find the ligands whose score is maxScore
        // this is a reduce operation
        StringBuilder sb = new StringBuilder();
        for(int i = 0; i < numLigands; i++) {
            if (ligandsWScore[i].score == maxScore) {
                if (sb.length() > 0) sb.append(", ");
                sb.append(ligandsWScore[i].ligand);
            }
        }

        long end = System.currentTimeMillis();
        System.out.println("The maximum score is " + maxScore);
        System.out.println("Achieved by ligand(s) "+ sb.toString());
        System.out.println("Calculation time " + (end-start) + " ms");
    }

    /**
     * Match a ligand (str1) and the protein. Count the number of characters in str1
     * that appear in the same seq in str2 (there can be any number of intervening chars)
     * @param str1 first string
     * @param str2 second string
     * @return number of matches
     */
    private static int calcScore(String str1, String str2) {
        // no match if either is empty string
        if (str1.length() == 0 || str2.length() == 0) return 0;

        if (str1.charAt(0) == str2.charAt(0)) {
            return 1 + calcScore(str1.substring(1), str2.substring(1));
        }
        return Math.max(
            calcScore(str1, str2.substring(1)), calcScore(str1.substring(1), str2));
    }
}

In [None]:
!javac DDStream.java

In [None]:
!java DDStream

Try running it with more ligands and longer ligands length. Observe how much longer it takes for the program to finish.

In [None]:
!java DDStream 4 20 6 "your time is limited, so don't waste it living someone else's life"

## Speedup

**Speedup** is the ratio of the execution time of a serial program to its parallel execution. Specifically,

  $S_n= \frac{T_1}{T_n}$

Where $T_1$ is the time it takes to execute a program on 1 thread, while $T_n$ is the time it takes to execute a program on n threads. Some important notes:

 - A speedup greater than 1 indicates that there is benefit to the parallel implementation. A speedup of x means that the parallel code is x times faster.
 - A speedup less than 1 indicates that there is no benefit to parallelism.

To correctly calculate speedup, we must first measure the running time of a program. In this case, we will use `System.currentTimeMillis` to measure how long it takes for the program to finish, starting from the ligand generation step.



## Performance study

 Try running the program using different number of threads and different scheduling strategy and complete the table below.

|         | 1 Thread | 2 Threads | 3 Threads | 4 Threads |
|---------|----------|-----------|-----------|-----------|
|Time (s) |          |           |           |           |



How does the run time of dynamic vs static scheduling compare when the program is run with multiple threads?

We are using Python to assisst us with the speed up calculation below. 

Try calculate the speed-up for each of the (thread number, scheduling) combination. Replace the `??` in the Python code below with numbers from your table. 

In [None]:
#lists holding measured times (floating point)
#TODO: Fill in arrays below (code will not compile otherwise!)
#Time (n threads)  1   2  3  4
dd_times =       [ ??, ??, ??, ??]

#compute speedup
speedup  = [round(dd_times[0]/dd_times[i],2)   for i in range(1,4)]

#print results
print("Speed-up:")
print(speedup)


## Summary - Work Stealing Scheduling

In many cases, parallel threads are scheduled using a static scheduling where it is assumed that all components take about the same amount of time. However, if some components take longer than others, a **load balancing** issue can arise. In the case of the drug design example, different ligands take longer to compute than others. Therefore, a work-stealing scheduling approach is better.

Next, note that speedup is non-linear, especially at higher number of threads. While increasing the number of threads is supposed to generally _improve_ the run time of a program, it usually does not scale linearly with the number of cores. This occurs for a number of reasons. First, at higher number of threads, it is more likely that the serial components of a program (such as thread creation overhead, and any other necessary serial operations that must take place before the parallel code can run) start dominating run time (see **Amdahl’s Law**). Second, **resource contention** on the CPU or memory from other processes can cause slow downs. A related measure called **efficiency** measures how _well_ a program uses the cores assigned to it.

Further Reading on Performance: [Dive into Systems, Chapter 14.4](https://diveintosystems.org/antora/diveintosystems/1.0/SharedMemory/performance.html)