# Brief introduction to parallel programming with Python

You've had a brief introduction to computer architecture and should already know that (once you've exhausted algorithmic improvements and efficient libraries such as NumPy) the main path to increased performance is parallelism --- i.e., exploiting the ability of your computer to do multiple things at once.

How many processors/cores/threads does your computer have?
* my laptop has 1 processor with 14 physical cores and supports a total of 22 (hyper)threads
* yours might have more cores

We routinely use multiple cores when working on the computer --- e.g., doing homework in the word processor while watching Netflix in the browser.

At SBU we have the SeaWulf supercomputer:
* It contains over 165 computers (also called nodes) --- actually, now over 450 nodes
* Each computer has 2 processors, for a total of 28 cores and 56 threads per computer, and 128 Gbyte of memory
* In total there are 4620 cores and 21 Tbyte of memory --- actually, now over 20K cores

In addition to running faster we often need to use parallel computing to use more memory.
* There is one computer in SeaWulf that has 72 cores (144 threads) and 3 Tbyte of memory. A fantastic machine for big data analytics.

Lots of standard packages can internally use parallelism.

Here we explore how to use parallel programming as a general purpose tool using all of the cores within a single computer.

There are two related concepts:
* concurrency --- two or more actions that can occur in any order, including at the same time.
  * `(a+b) + (c+d)` --- yes --- the operations in parentheses are independent, but note the final (center) addition must be evaluated last.
  * `a + b*c` --- no --- you must evaluate the multiplication before the addition
* parallelism --- operations actually occur at the same time. This requires concurrency for correctness, and adds simultaneous execution for performance.
  * In HPC (high-performance computing), performance is treated with nearly the same level of concern as correctness.

Also, 
* asynchronous execution --- overlapped/interleaved execution of operations, e.g.,
  1. start loading a web page
  2. switch to another tab and start loading another web page
  3. drink coffee while waiting for one of the pages to finish loading
  4. read whichever page finishes loading first
* useful when individual operations may take a long time but don't necessarily consume a lot of computer time (e.g., waiting for network services, a human, etc.)
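Here is one way to get this kind of overlap in Python, as a sketch only. `load_page` is a hypothetical stand-in that simulates a slow download with `time.sleep`, the load times are made up, and the threads come from the standard `concurrent.futures` module (`asyncio` is another common choice). Because the "work" is almost entirely waiting, the three waits overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def load_page(name, seconds):
    '''Hypothetical stand-in for loading a web page: it just waits'''
    time.sleep(seconds)      # waiting uses almost no CPU time
    return name

pages = {"tab A": 0.3, "tab B": 0.1, "tab C": 0.2}   # made-up load times (s)

start = time.perf_counter()
finished = []
with ThreadPoolExecutor(max_workers=len(pages)) as executor:
    # Steps 1 and 2: start every page loading without waiting for the others
    futures = [executor.submit(load_page, name, secs) for name, secs in pages.items()]
    # Step 4: handle each page as soon as it finishes, in completion order
    for future in as_completed(futures):
        finished.append(future.result())
        print("finished loading", finished[-1])
elapsed = time.perf_counter() - start

# The waits overlap, so the total is close to the longest wait (0.3 s)
# rather than the sum of all the waits (0.6 s)
print("total %.2f s" % elapsed)
```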


To begin, let's pick a problem to solve.

Do you remember Tim?   We were interested in computing the probability of him falling into a canal on the way home.

A simplified version of the problem is the following random process.  In 1D, Tim is initially located at the origin.  I.e.,
$$y(0)=0$$
where $y(t)$ is his position as a function of time ($t$) or, equivalently, the number of steps.
Each step he takes is chosen from a uniform random distribution between $-1$ and $1$.  I.e.,
$$y(t+1) = y(t) + U[-1,1]$$
where $U[-1,1]$ denotes a random number drawn uniformly from $[-1,1]$, independently for each step.

We want to answer the question "When (i.e., at what time or after how many steps) does Tim first reach distance $Y$ from the origin, and how does this time depend upon $Y$?"
* This is called the first-crossing time
* It is important in many applications including chemistry, electronics, biology, etc.
* It is interesting to also examine the time and/or probability of returning to the origin.
* Also interesting is how first-crossing times and return probability change with the number of dimensions.

To do this we will write a function to run a single random walk and return the time (number of steps) taken to reach $Y$.

In [None]:
import random
def random_walk(Y):
    ''' Execute a random walk and return the number of steps to reach +/- Y'''
    y = 0.0
    n = 0
    while abs(y) < Y:
        y += (random.random()-0.5)*2.0
        n += 1
    return n

In [None]:
print(random_walk(10))
print(random_walk(10))
print(random_walk(10))
print(random_walk(10))

It's a random number!  Duh.  

So instead we need to compute the average (or expectation value) over many walks.

Write a function to average the number of steps needed over `Nsample` walks

In [None]:
def mean_length(Y,Nsample=300):
    mean = 0
    for sample in range(Nsample):
        mean += random_walk(Y)
    return mean/Nsample
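Because each walk is random, the average is itself noisy, so it helps to report an error bar with it. A sketch of a variant that also returns the standard error of the mean; `mean_and_error` is a hypothetical name, and `random_walk` is repeated so the cell is self-contained.

```python
import math
import random

def random_walk(Y):
    ''' Execute a random walk and return the number of steps to reach +/- Y'''
    y = 0.0
    n = 0
    while abs(y) < Y:
        y += (random.random()-0.5)*2.0
        n += 1
    return n

def mean_and_error(Y, Nsample=300):
    '''Hypothetical helper: sample mean of the walk length and its standard error'''
    samples = [random_walk(Y) for _ in range(Nsample)]
    mean = sum(samples) / Nsample
    # unbiased sample variance; the standard error of the mean is sqrt(var/Nsample)
    var = sum((s - mean)**2 for s in samples) / (Nsample - 1)
    return mean, math.sqrt(var / Nsample)

random.seed(1)    # fixed seed only so that reruns give the same numbers
mean, err = mean_and_error(10)
print("%.1f +/- %.1f" % (mean, err))
```

Quadrupling `Nsample` should roughly halve the error bar.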

Run it for Y = 10, 100, 200

In [None]:
mean_length(10)

In [None]:
mean_length(100)

In [None]:
mean_length(200)

Wow ... as we increase Y it takes longer and longer.  It would be nice to make this calculation faster.

Let's write a program that, given a list of Ys, runs each calculation (task) in parallel and returns the results to us.

We will use the Python standard `multiprocessing` module.
* https://docs.python.org/dev/library/multiprocessing.html 

Specifically, we will be using a process pool. Imagine a pool of workers waiting for a manager to give them tasks to execute. The manager (or main process)
* passes each worker data describing the work that needs doing
  * if there is a lot of data, the overhead of passing it needs to be offset by the amount of work to be done
* collects the result as each worker finishes a task and gives that worker more work
* once all the work is done, sends the workers home (terminates the worker processes)

The processes clearly need to know not just some data (in our case the value of $Y$) but also what code to execute. Thus they need access to your Python program.

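[On Windows and macOS, new processes are started with the `spawn` method by default, and the workers then cannot find functions you defined interactively in a notebook cell. A common workaround is to put such functions in a separate `.py` file and import them.]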

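First, a simple example. Recall that Python's built-in `map` applies a function to each element of a sequence (as in the first cell below); `pool.map` does the same but distributes the calls across the worker processes. We will write a parallel program to compute the sine of each number in a list --- we don't expect to see any speedup since there is so little work to do compared with the overhead of communication between manager and worker.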


In [None]:
for value in map(lambda x: x**2, [1,2,3,4,5,6,7]):
    print(value)

In [None]:
import math
import time
import multiprocessing as mp

# For use in a script
#if __name__ == "__main__":
#    mp.freeze_support() # only needed when freezing the program into a Windows executable
# and indent everything below to be protected by the if

NUM_WORKERS = 16

start_time = time.time()

with mp.Pool(processes=NUM_WORKERS) as pool:
    results = pool.map(math.sin, [0.1,0.2,0.3,-2.3,77.0])

end_time = time.time()   
print(results)

print("Time : %ssecs" % (end_time - start_time))

Python `with` statement
* https://effbot.org/zone/python-with-statement.htm
* https://www.geeksforgeeks.org/with-statement-in-python/
* https://preshing.com/20110920/the-python-with-statement-by-example/
* https://docs.python.org/3/reference/compound_stmts.html#the-with-statement
* https://realpython.com/python-with-statement/
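The `with` statement guarantees that clean-up code runs when the block ends, which is how `with mp.Pool(...) as pool:` shuts the workers down. As a sketch of the same mechanism, here is a hypothetical `timer` context manager, built with the standard `contextlib.contextmanager` decorator, that could replace the manual `start_time`/`end_time` bookkeeping above.

```python
import time
from contextlib import contextmanager

@contextmanager
def timer(label, record):
    '''Hypothetical helper: time the body of a with-block, store it in record[label]'''
    start = time.perf_counter()
    try:
        yield                  # the body of the with-block runs here
    finally:
        # runs even if the body raises an exception, so the timing is always recorded
        record[label] = time.perf_counter() - start
        print("%s : %.3f secs" % (label, record[label]))

timings = {}
with timer("sleep", timings):
    time.sleep(0.1)            # stand-in for real work, e.g. a pool.map call
```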

Python exception handling
* https://docs.python.org/3/tutorial/errors.html
* https://docs.python.org/3/library/exceptions.html
* https://docs.python.org/3/reference/compound_stmts.html#the-try-statement
* https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
* https://www.programiz.com/python-programming/exception-handling
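A minimal sketch of the pieces of `try` these links describe: `raise` signals an error, `except` handles it, `else` runs only on success, and `finally` always runs (the natural place to close a connection). `check_reply` is a hypothetical helper, similar in spirit to the `customer` check later in this notebook.

```python
def check_reply(value, expected="no"):
    '''Hypothetical helper: raise ValueError if a reply is not what we expected'''
    if value != expected:
        raise ValueError("Expecting %r but received %r" % (expected, value))
    return value

log = []
for reply in ["no", "yes"]:
    try:
        check_reply(reply)
    except ValueError as exc:
        log.append("error: %s" % exc)    # handle the failure instead of crashing
    else:
        log.append("ok: %s" % reply)     # runs only if no exception was raised
    finally:
        log.append("cleanup")            # always runs, e.g., to close a connection
print(log)
```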

Some key ingredients when using a script rather than a Jupyter notebook:
* One process (the manager) runs the script as its main program --- but we don't want the worker processes to execute the full script (with the `spawn` start method, the default on Windows and macOS, each worker imports your script), otherwise they too would make pools of workers that in turn would make more workers ...
* So we use the `if __name__ == "__main__":` technique to stop that code from being executed in the workers
* For more details see https://docs.python.org/dev/library/multiprocessing.html#multiprocessing-programming


Also, on Windows `mp.freeze_support()` is needed only if the program will be frozen into a stand-alone executable; when used, it *must* be the first line after `if __name__ == "__main__":`.

`NUM_WORKERS` controls the number of processes we will create.
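Putting these ingredients together, a minimal sketch of the sine example as a stand-alone script. The file name, `work`, `main`, and the worker count are assumptions for illustration; the key point is that the workers only need to import the top-level definitions, while the pool is created inside the guarded block.

```python
# parallel_sin.py -- a hypothetical script name
import math
import multiprocessing as mp

NUM_WORKERS = 4    # assumption: choose to suit your machine, e.g. mp.cpu_count()

def work(value):
    '''Executed by the workers; defined at top level so they can import it'''
    return math.sin(value)

def main():
    with mp.Pool(processes=NUM_WORKERS) as pool:
        return pool.map(work, [0.1, 0.2, 0.3, -2.3, 77.0])

if __name__ == "__main__":
    mp.freeze_support()    # no effect unless this is a frozen Windows executable
    # Only the manager gets here; workers that import this file skip this block
    print(main())
```

Run it from a terminal with `python parallel_sin.py`.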

OK, let's do the real problem. Instead of executing `math.sin(value)` we want to execute `mean_length(Y)`
* modify the program to do this
* run it
* modify the number of workers --- how does the execution time vary?

Our list of Y values will be.

In [None]:
Ylist = [10,20,30,40,50,60,70,80,90,100,110,120,130,140,150]
print(Ylist)

In [None]:
import math
import time
import multiprocessing as mp

NUM_WORKERS = 2

start_time = time.time()

with mp.Pool(processes=NUM_WORKERS) as pool:
    results = pool.map(mean_length, Ylist)

end_time = time.time()   
print(results)

print("Time : %ssecs" % (end_time - start_time))



This is what I get on my laptop (in a Windows virtual machine with 2 cores):

* 1 worker  --- 30.0s
* 2 workers --- 18.5s
* 4 workers --- 16.6s

On my laptop under Linux
* 1 worker  --- 23.7s
* 2 workers --- 14.4s
* 4 workers --- 14.4s

On SeaWulf login node
* 1 worker  --- 32.0s
* 2 workers --- 15.7s
* 4 workers --- 9.6s
* 15 workers --- 5.9s

Plot the results from the simulation

In [None]:
Ylist

In [None]:
T = [313.9866666666667, 1150.01, 2672.4133333333334, 4621.51, 7085.17, 11509.996666666666, 15458.16, 20049.47, 22710.116666666665, 31472.48, 38479.166666666664, 42849.293333333335, 54146.223333333335, 57255.62333333334, 70433.5]

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.plot(Ylist,T);

In [None]:
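# For large Y, Wald's identity gives E[steps] ~ Y**2 / Var(step) = 3*Y**2,
# since a U[-1,1] step has variance 1/3 (overshooting Y adds a correction of order Y)
Texact = [3*Y*Y for Y in Ylist]
plt.plot(Ylist,T,"r-", Ylist,Texact, "b-");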

Can we make the calculation run even faster?
* order of evaluations
* chunking
* imap
* imap_unordered
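A sketch combining three of these ideas, meant to be saved and run as a script. `task` and `run` are hypothetical names; handing out the largest Y first is a heuristic based on the cost growing roughly like $Y^2$ (as in the plot above), so that one long task started last does not leave the other workers idle. Since `imap_unordered` returns results in completion order, each result carries its own Y.

```python
import random
import time
import multiprocessing as mp

def random_walk(Y):
    '''Number of steps for a 1D walk with U[-1,1] steps to reach +/- Y'''
    y, n = 0.0, 0
    while abs(y) < Y:
        y += (random.random() - 0.5) * 2.0
        n += 1
    return n

def mean_length(Y, Nsample=300):
    return sum(random_walk(Y) for _ in range(Nsample)) / Nsample

def task(Y):
    '''Return Y together with its result so out-of-order results can be matched up'''
    return Y, mean_length(Y)

def run(Ylist, num_workers=4):
    # largest (most expensive) tasks first
    order = sorted(Ylist, reverse=True)
    results = {}
    with mp.Pool(processes=num_workers) as pool:
        # chunksize=1: the tasks are few and large, so per-task overhead is small
        for Y, T in pool.imap_unordered(task, order, chunksize=1):
            results[Y] = T
    return [results[Y] for Y in Ylist]    # back in the original order

if __name__ == "__main__":
    start = time.time()
    print(run([10, 20, 30, 40]))
    print("Time : %.2f secs" % (time.time() - start))
```

For many tiny tasks (such as the sine example) a larger `chunksize` usually pays off instead, because it cuts the number of messages between manager and workers.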



What is the fastest we can run this calculation?

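Concept of critical path --- the sequence of dependent computational steps that limits execution time; no number of workers can finish the calculation faster than its critical path.

Concept of performance model --- a simple formula that predicts execution time from a few parameters, such as Amdahl's law below.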

**Amdahl's law.**  Imagine that it takes you $S$ seconds to get ready for work (pencil sharpening, making coffee, planning, etc.), and that you have lots of independent tasks which need doing that together take $P$ seconds. Your execution time with just one worker (you) is
$$ T(1) = S + P $$
But if you had $w$ workers your execution time could be
$$ T(w) = S +  \frac{P}{w} $$

($P$ for parallel work; $S$ for sequential work)

In [None]:
import numpy as np
S = 100
P = 10000
T = lambda w : S + P/w 
wlist = np.arange(1,201,1)
plt.plot(wlist, np.log10(T(wlist)));


How much faster can we make the calculation run?  I.e., if we had an infinite number of processors (I *want* that computer!!) how fast could it run? 

If we are using $w$ workers the speedup over using 1 worker is given by
$$ \text{speedup} = \frac{T(1)}{T(w)} = \frac{S+P}{S+P/w}  $$

If we have an infinite number of processors, then $P/w \rightarrow 0$ and
$$ \text{speedup} = \frac{S+P}{S} \approx \frac{P}{S}$$
where we assumed that $P \gg S$.

I.e., the speedup is limited by the amount of sequential work you have.  

Amdahl's law is often said to be **cruel**. If you want a 100-fold speedup, then only 1% of your work can be sequential. If you want a 1,000,000-fold speedup, then only 0.0001% can be sequential. Note that the largest computers in the world perform $O(10^{18})$ floating-point operations per second using millions of cores!!!

In our example above, with $S=100$ and $P=10000$, a speedup of only about 100 (precisely $(S+P)/S = 101$) is possible no matter how hard you try.
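The same law is often written in terms of the sequential fraction $f = S/(S+P)$: dividing the numerator and denominator of the speedup by $S+P$ gives $1/(f + (1-f)/w)$. A small sketch (the function names are hypothetical) that reproduces the numbers quoted above:

```python
def amdahl_speedup(f, w):
    '''Speedup on w workers when a fraction f = S/(S+P) of the 1-worker time is
    sequential; algebraically the same as (S+P)/(S+P/w)'''
    return 1.0 / (f + (1.0 - f) / w)

def max_speedup(f):
    '''Limit of amdahl_speedup as w -> infinity'''
    return 1.0 / f

S, P = 100, 10000
f = S / (S + P)
print(max_speedup(f))               # (S+P)/S, about 101
print(amdahl_speedup(f, 100))       # 100 workers give only about half of that
print(amdahl_speedup(0.01, 10**6))  # 1% sequential: a million workers still give < 100
```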

In [None]:
speedup = lambda w : (S+P)/(S+P/w)
plt.plot(wlist, speedup(wlist));

Ideal speedup is equal to the number of workers (processors) used.

The concept of efficiency is also useful. Efficiency is defined as the ratio of the achieved speedup to the ideal speedup.
$$\text{efficiency}(w) = \frac{\text{speedup}(w)}{w} = \frac{S+P}{wS+P}$$

In [None]:
eff = lambda w : (S+P)/(w*S+P)
plt.plot(wlist,eff(wlist));

Efficiency is not an abstract concept --- it translates into time, money spent for cloud services, energy consumed, $CO_2$ produced, etc.

Can we make the computation of a single random walk run in parallel?
* Not as defined --- it is an inherently sequential process

Use the multiprocessing module to parallelize this Monte Carlo algorithm to compute $\pi$.
(https://www.geeksforgeeks.org/estimating-value-pi-using-monte-carlo/)


In [None]:
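import math
import random   # keep the module name: `from random import random` would shadow it and break random_walk

def sample_one_point():
    ''' Returns 1 if a random point in the unit square lies inside the quarter circle, else 0 '''
    x,y = random.random(),random.random()
    rsq = x*x + y*y
    if rsq <= 1:
        return 1
    else:
        return 0
    
def sample_one_batch(Npoint = 10000):
    ''' Estimates pi: (area of quarter circle)/(area of square) = pi/4 '''
    total = 0.0
    for i in range(Npoint):
        total += sample_one_point()
    return 4*total/Npoint

# Average over independent batches and estimate the statistical error of the mean
total = 0.0
totalsq = 0.0
Nbatch = 100
for i in range(Nbatch):
    sample = sample_one_batch()
    total += sample
    totalsq += sample**2

mean = total/Nbatch
meansq = totalsq/Nbatch
err = math.sqrt((meansq - mean**2)/Nbatch)
print("%.5f\u00B1%.5f" % (mean, err))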

In [None]:
with mp.Pool(processes=NUM_WORKERS) as pool:
    results = pool.map(sample_one_batch, [10000]*10)

In [None]:
print(results)

In [None]:
import numpy as np

def batch(junk):
    return sample_one_batch(100000)

start = time.time()

NUM_WORKERS = 4

results = []
with mp.Pool(processes=NUM_WORKERS) as pool:
    for value in pool.imap_unordered(batch, range(1000), chunksize=4):
        results.append(value)
    
print("used %.2fs" % (time.time()-start))
#print(results)
results = np.array(results)
# error of the mean over batches (std alone is the spread of a single batch)
print("%.5f\u00B1%.5f" % (results.mean(), results.std()/np.sqrt(len(results))))


Can you speed this up using NumPy?

In [None]:
import numpy as np

def sample_one_batch(Npoint = 10000):
    # A fresh Generator seeded from OS entropy on every call: worker processes
    # started by fork would otherwise all inherit (and repeat) the same
    # np.random global state
    rng = np.random.default_rng()
    x = rng.random(Npoint)
    y = rng.random(Npoint)
    rsq = x*x + y*y
    return 4*np.sum(rsq<=1)/Npoint

def batch(junk):
    return sample_one_batch(100000)

start = time.time()

results = []
with mp.Pool(processes=NUM_WORKERS) as pool:
    for value in pool.imap_unordered(batch, range(1000), chunksize=4):
        results.append(value)
    
print("used %.2fs" % (time.time()-start))
#print(results)
results = np.array(results)
print("%.5f +/- %.5f" % (results.mean(), results.std()/np.sqrt(len(results))))




We got about a 10x speedup switching from Python to NumPy ... what's the moral?  

Start close to home for optimization:
1. Do less work by only computing what you need.
2. Use the best algorithm.
3. Make efficient use of a single process (NumPy, external libraries, etc.), which lets one process exploit the parallelism inside the processor (e.g., vector instructions).
4. Use multi-thread (intra-process) and/or multi-process parallelism.

Coming back to Tim. Can you speed up the simulation of one Tim using NumPy?

No.  It is sequential and is not working on a vector of data.

But multiple, **independent** Tims?  Yes.  This requires moving the loop over samples inside the loop over steps and having a vector of samples.  This process of loop interchange and promotion of data structures (scalar to vector) is common when optimizing code.
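A sketch of that loop interchange for the random walk, assuming NumPy's `Generator` API; `mean_length_numpy` is a hypothetical name. The scalar position `y` becomes a vector with one entry per Tim, the loop over steps becomes the outer loop, and a boolean mask keeps track of which walks are still going.

```python
import numpy as np

def mean_length_numpy(Y, Nsample=300, rng=None):
    '''Hypothetical vectorized mean_length: all Nsample walks advance together'''
    rng = np.random.default_rng() if rng is None else rng
    y = np.zeros(Nsample)                  # scalar position promoted to a vector
    n = np.zeros(Nsample, dtype=int)       # number of steps taken by each walk
    active = np.ones(Nsample, dtype=bool)  # walks that have not yet reached +/- Y
    while active.any():                    # the loop over steps is now outermost
        k = np.count_nonzero(active)
        y[active] += rng.uniform(-1.0, 1.0, k)   # the loop over samples, vectorized
        n[active] += 1
        active = np.abs(y) < Y
    return n.mean()

print(mean_length_numpy(10, rng=np.random.default_rng(0)))
```

Near the end only a few long walks remain active, so the vectors shrink and the speedup over the scalar loop drops off.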

## Communicating processes

This is an advanced topic --- for most purposes try to start with `map` or `imap` or `imap_unordered`.

For parallel programming, my rule-of-thumb is that for each factor of 10 speedup via parallelism there is about a factor of 2 increase in the programming cost
* Fugaku, the fastest computer in the world in 2020 and 2021, has about 150,000 processors
* Programs that can use Fugaku efficiently are about 32-64x more expensive to write than programs for just one core (e.g., 1 month of programming versus 2-5 years).  
* Is your problem worth that effort?
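Written as a formula, this rule of thumb says the cost grows like $2^{\log_{10}(\text{speedup})} = \text{speedup}^{\log_{10} 2} \approx \text{speedup}^{0.3}$. A quick check (the function name is hypothetical) that about 150,000 processors lands in the 32-64x range quoted above:

```python
import math

def programming_cost_factor(speedup):
    '''Rule of thumb from the text: each factor of 10 in speedup costs a factor of 2'''
    return 2.0 ** math.log10(speedup)

for s in [10, 100, 150_000]:
    print(s, round(programming_cost_factor(s), 1))
```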

Sometimes processes need to coordinate with each other and exchange data in order to parallelize a calculation.  Examples include:
* Client-server: A process is running in the background responding to requests --- e.g., database or web server
* Domain decomposition: E.g., computational fluid dynamics simulation of the atmosphere
* Parallel linear algebra: operating on large matrices

**Example**: Use `Process` and `Pipe` from `multiprocessing` to create a server process connected via a pipe. It should forever
* receive a string, and 
* reply with the received value with "X" appended (e.g., if you received "hello" reply with "helloX")
* if the value is "stop", reply as usual, then close the connection and return
Test the server by sending some test strings and then "stop".  The main process should also join the child process to clean up.


In [None]:
import multiprocessing as mp

def server(connection):
    while True:
        value = connection.recv()
        connection.send(value + "X")
        if value == "stop":
            break
    connection.close()

if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=server, args=(child_conn,))
    p.start()
    for msg in ["a","fred","Who am I?", "stop"]:
        parent_conn.send(msg)
        print("sent", msg, "received", parent_conn.recv())
    p.join()

**Example:** Using `Process` and `Pipe` from `multiprocessing`, write a program that 
* creates a "server" process connected to the main program with a pipe
* the server process *forever* receives values from the pipe
  * if the value is "hello" it replies "yes"
  * if the value is "bye" it replies "no", closes the pipe, and returns
* the main process 
  * sends "hello" 10 times to the server, printing out the reply each time
  * sends "bye", checks that the reply is "no", and then joins the process and exits


In [None]:
import multiprocessing as mp

def server(connection):
    while True:
        value = connection.recv()
        if value == "hello":
            connection.send("yes")
        elif value == "bye":
            connection.send("no")
            break
        else:
            connection.close()
            raise ValueError("Server: Expecting only hello or bye but received: %s" % str(value))
            
    connection.close()

def customer(connection):
    for i in range(10):
        connection.send("hello")
        value = connection.recv()
        print("I received", value)
    connection.send("bye")
    value = connection.recv()
    if value != "no":
        raise ValueError("Customer: Expecting 'no' but received %s" % str(value))

if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=server, args=(child_conn,))
    print(p.name)
    p.start()
    customer(parent_conn) 
    p.join()

## Solution of partial differential equation in parallel

[Don't worry about the calculus --- it will vanish very soon.]

We will employ domain decomposition to solve the differential equation (an eigenvalue problem)
$$
 \frac{\partial^2 }{\partial x^2} f(x) = - f(x)
$$
on the domain $[0,\pi]$ with boundary conditions $f(0) = f(\pi) = 0$.  
* A solution is $f(x) = \alpha \sin(x)$ for any value of constant $\alpha$.

We will approximate the second derivative using the three-point stencil
$$
 \frac{\partial^2 f(x)}{\partial x^2} \approx \frac{1}{h^2} \left(f(x+h) + f(x-h) - 2 f(x)\right)
$$
where $h$ is the spacing between grid points in the $x$-dimension --- i.e., $x_i = i h$.  
* Note that to compute the second derivative at point $x_i$ we need the function values at $x_{i-1}$, $x_i$, and $x_{i+1}$.
* We also use the notation $f_i = f(x_i)$
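For smooth $f$ the stencil's error is about $\frac{h^2}{12} f''''(x)$, so halving $h$ should cut it by roughly a factor of 4. A quick numerical check on $f(x) = \sin(x)$, where $f'' = -f$ exactly; `stencil_error` is a hypothetical helper.

```python
import numpy as np

def stencil_error(N):
    '''Max error of the 3-point stencil on [0, pi] for f(x) = sin(x), where f'' = -f'''
    x = np.linspace(0.0, np.pi, N)
    h = x[1] - x[0]
    f = np.sin(x)
    d2 = (f[2:] + f[:-2] - 2*f[1:-1]) / h**2     # interior points x_1 .. x_{N-2}
    return np.max(np.abs(d2 - (-f[1:-1])))

e1 = stencil_error(51)     # h = pi/50
e2 = stencil_error(101)    # h = pi/100
print(e1, e2, e1/e2)       # the ratio should be close to 4, i.e. O(h**2)
```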

Finally, we will update using Euler's method
$$
 f(x) \leftarrow f(x) + dt \frac{\partial^2 }{\partial x^2} f(x)
$$
for some small value $dt$ interpreted as a time step (in imaginary time if you are really interested).

Thus, to solve the equation we 
1. Initialize $f(x_i)$ at the grid points $x_i = i h$, ensuring $f(0) = f(\pi) = 0$.
2. Repeat until converged $f(x_i) \leftarrow f(x_i) + dt \left(f(x_{i+1}) + f(x_{i-1}) - 2 f(x_i)\right) / h^2$


The next cell defines necessary parameters and functions

In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt   # used by doplot below

N = 51              # No. of points used --- need this to be odd
h = math.pi/(N-1)   # Element size
dt = 0.4*h**2       # Time step (must be less than (1/2)*h**2)
nstep = int(5.0/dt) # No. of steps required to converge
print("    N", N)
print("    h %.4f" % h)
print("   dt %.4f" %dt)
print("nstep", nstep)

# We'll solve for $f(x_i)$ at these points
x = np.linspace(0.0,math.pi,N)

def guess(x):
    return x*(math.pi-x) / (0.25*math.pi**2)

def diff2(f,h):
    ''' Estimates d2f/dx2 using 3 point stencil '''
    N = f.shape[0]
    df2 = np.zeros(N)
    df2[1:-1] = (f[2:]+f[:-2] - 2*f[1:-1])*h**-2
    #for i in range(1,N-1):
    #    df2[i] = (f[i+1] + f[i-1] - 2*f[i])/h**2
    return df2

def euler(f,dt,h):
    ''' Advances one time step using f(t+dt) = f(t) + dt * d2f/dx2 '''
    return f + dt * diff2(f,h)

def doplot(f, x):
    fac = math.sin(x[N//4])/f[N//4]
    f = fac*f # rescale to make comparison to exact easier 
    fig, ax1 = plt.subplots()
    ax2 = ax1.twinx()
    ax1.plot(x, f, 'b-', x, np.sin(x), 'k-')
    ax1.set_ylabel("f(x)")
    ax1.set_xlabel("x")
    ax2.plot(x, f-np.sin(x), 'r-')
    ax2.set_ylabel("error");


**First,** solve across the entire domain $[0,\pi]$

In [None]:
f = guess(x)
for step in range(nstep):
    f = euler(f,dt,h)
    
doplot(f, x)

**Second,** solve by dividing the domain into two $[0,\frac{\pi}{2}] + [\frac{\pi}{2}, \pi]$

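The domains will be split between points $x_{m-1}$ and $x_m = \frac{\pi}{2}$, with $m=\frac{N-1}{2}$: the left side owns $x_0, \dots, x_{m-1}$ and the right side owns $x_m, \dots, x_{N-1}$. Each side also stores one extra "ghost" point holding a copy of its neighbor's boundary value.

Because of this, the solutions are coupled.  
* On the left, to compute the second derivative at point $x_{m-1}$ we will need the value at $x_{m}$
* On the right, to compute the second derivative at point $x_{m}$, we will need the value at $x_{m-1}$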
* These values must be exchanged every time step

But now --- the left and right sides can be updated in parallel!
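* How do we know this? In the cell below, the updates of `fleft` and `fright` can be done in any order: each `euler` call reads only its own array, and the two sides are coupled only through the exchange of boundary values that follows.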

In [None]:
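f = guess(x)
m = N//2
fleft = f[0:m+1]    # owns x_0..x_{m-1}; fleft[-1] is a ghost copy of x_m
fright = f[m-1:]    # owns x_m..x_{N-1}; fright[0] is a ghost copy of x_{m-1}

for step in range(nstep):
    fleft = euler(fleft, dt, h)     # these two updates are independent
    fright = euler(fright, dt, h)
    fleft[-1] = fright[1]           # refresh the ghosts from the neighbor's owned points
    fright[0] = fleft[-2]

f = np.concatenate((fleft[:-1],fright[1:]))   # drop the ghost points
doplot(f, x)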
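The same ghost-point bookkeeping carries over to any number of subdomains. Below is a serial sketch, assuming pieces of nearly equal size; `euler_step`, `split`, and `solve_decomposed` are hypothetical helpers (`euler_step` does the same update as `euler`, written inline). In a parallel version each piece would live in its own process and the two assignments in the exchange loop would become pipe messages, as in the two-process code below.

```python
import numpy as np

def euler_step(f, dt, h):
    '''One Euler step f <- f + dt*f'' at interior points; the two end points are unchanged'''
    g = f.copy()
    g[1:-1] += dt * (f[2:] + f[:-2] - 2*f[1:-1]) / h**2
    return g

def split(f, K):
    '''Cut f into K pieces; piece k owns f[edges[k]:edges[k+1]] plus a ghost point
    on each side that borders another piece'''
    N = f.shape[0]
    edges = [k * N // K for k in range(K + 1)]
    return [f[max(a - 1, 0):min(b + 1, N)].copy() for a, b in zip(edges[:-1], edges[1:])]

def solve_decomposed(f, K, dt, h, nstep):
    pieces = split(f, K)
    for step in range(nstep):
        # independent updates: each piece could be handled by its own process
        pieces = [euler_step(p, dt, h) for p in pieces]
        # exchange ghost values across each internal boundary
        for k in range(K - 1):
            left, right = pieces[k], pieces[k + 1]
            left[-1] = right[1]    # left's ghost  <- right's first owned point
            right[0] = left[-2]    # right's ghost <- left's last owned point
    # drop the ghost points and glue the owned parts back together
    owned = [p[(1 if k > 0 else 0):(len(p) - 1 if k < K - 1 else len(p))]
             for k, p in enumerate(pieces)]
    return np.concatenate(owned)
```

For example, `solve_decomposed(guess(x), 4, dt, h, nstep)` should reproduce the single-domain result up to rounding.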

**Third,** make it run in parallel using just two processes.
* We create a new process and number the processes:
  * the original process as `id=0`
  * the new process as `id=1`
* Process `0` will simulate the left and process `1` the right
* When we start the new process we pass it `conn, id, dt, h, nstep, fright`
* Each process iterates, exchanging boundary values every time step
* When finished, process `1` sends its solution `fright` to process `0`, which assembles the full solution


In [None]:
def solve(conn, id, dt, h, nstep, f):
    for step in range(nstep):
        f = euler(f, dt, h)
        if id == 0:
            f[-1] = conn.recv()
            conn.send(f[-2])
        else:
            conn.send(f[1])
            f[0] = conn.recv()

    if id == 0:
        fright = conn.recv()
        return np.concatenate((f[:-1],fright[1:]))
    else:
        conn.send(f)
        conn.close()

f = guess(x)
m = N//2
fleft = f[0:m+1]
fright = f[m-1:]

parent_conn, child_conn = mp.Pipe()
p = mp.Process(target=solve, args=(child_conn, 1, dt, h, nstep, fright))
p.start()
f = solve(parent_conn, 0, dt, h, nstep, fleft) 
p.join()

doplot(f, x)