# Parallelism


In [2]:
import os
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import scipy.ndimage as ndimage
import timeit
import multiprocessing
import astropy.io.fits as fits

## BLAS/LAPACK/ATLAS/MKL

This often isn't the parallelism you are looking for, but it is a way to speed up single-threaded code that relies heavily on matrix operations.

There are libraries that automatically parallelize linear algebra routines such as matrix dot products or singular value decompositions. BLAS and LAPACK are standard interfaces for these routines, while ATLAS, OpenBLAS, and MKL are optimized, multithreaded implementations of them. If you install numpy from Anaconda's default channel on an Intel processor, it is typically linked against MKL; a pip-installed numpy typically uses OpenBLAS instead. These libraries are useful when you want faster matrix routines without writing your own parallelization code. They only help if numpy is built against them. Anaconda handles this automatically (you are probably already benefiting from it), which is why I recommend installing numpy with `conda install numpy`. Once configured, these speedups are turned on by default.

### Note on when to use

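However, if you plan to parallelize your own code, restrict these libraries to a single thread. Having multiple layers of parallelism will often slow down your code! For example, 8 worker processes that each start 8 BLAS threads on an 8-core machine put 64 threads in competition for 8 cores. You can limit these libraries to one thread in your `.bashrc` using: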

```
export OPENBLAS_NUM_THREADS=1
export MKL_NUM_THREADS=1
```
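If you would rather set these limits from inside a Python script than in your `.bashrc`, a minimal sketch is to set the same environment variables at the very top of the script, before numpy is imported anywhere. The BLAS library typically reads them when it is loaded, so setting them after `import numpy` may have no effect. Also setting `OMP_NUM_THREADS`, which several BLAS builds read, is our own cautious assumption rather than something every setup requires.

```python
import os

# Limit BLAS/OpenMP threading to one thread per process.
# This must run before numpy is imported for the first time in this process.
THREAD_VARS = ("OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS", "OMP_NUM_THREADS")
for var in THREAD_VARS:
    os.environ[var] = "1"

import numpy as np  # noqa: E402  (imported after the limits are set, on purpose)

mat = np.random.random((500, 500))
# Assuming numpy was not imported earlier, this runs on a single BLAS thread
product = mat.dot(mat.T)
print({var: os.environ[var] for var in THREAD_VARS})
```

In a notebook, this only works in a fresh kernel where no earlier cell has imported numpy.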

### How to check if you have these libraries

In [3]:
# Check we have BLAS/LAPACK. Notice they are from the MKL library
np.show_config()

blas_mkl_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['/home/jwang/miniconda3/envs/codeastro/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['/home/jwang/miniconda3/envs/codeastro/include']
blas_opt_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['/home/jwang/miniconda3/envs/codeastro/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['/home/jwang/miniconda3/envs/codeastro/include']
lapack_mkl_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['/home/jwang/miniconda3/envs/codeastro/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['/home/jwang/miniconda3/envs/codeastro/include']
lapack_opt_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['/home/jwang/miniconda3/envs/codeastro/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['/home/jwang/miniconda3/envs/code

To demonstrate the speedup from these libraries, we will time a matrix dot product with multithreading turned on and off. The code block below uses the `mkl` module (provided by the `mkl-service` package), which only has an effect if numpy is linked against MKL, Intel's library aimed primarily at Intel CPUs. If `mkl` is not installed, `import mkl` will fail and you should skip this demo. Setting the number of threads this way has the same effect as setting `MKL_NUM_THREADS` in your `.bashrc`, but it is not permanent (it only affects this notebook).

In [4]:
##### Note: the following demo only works if numpy uses MKL and the mkl-service package is installed.
import mkl

mat = np.random.random((1000, 1500)) # 1000x1500 matrix

# Use 4 threads (ideally on 4 cores) to parallelize matrix operations
mkl.set_num_threads(4)
print("4 cores", timeit.timeit("mat.dot(mat.T)", setup="from __main__ import mat", number=100))

# Restrict MKL to a single thread (MKL is still used, just not in parallel)
mkl.set_num_threads(1)
print("1 core", timeit.timeit("mat.dot(mat.T)", setup="from __main__ import mat", number=100))

4 cores 1.5699356000404805
1 core 2.944283500080928


## Running Multiple Python Instances

This might sound unsophisticated, but it is actually a good choice in many situations. Parallelism within a single Python process is limited (see "Threads vs Processes" below), so if tasks are completely independent of each other, running each task as a separate Python process and saving the result to a file is a perfectly reasonable (and simple) option. For example, this works well for bulk-processing a set of files with some data reduction code. There are many ways to do this: a bash script, GNU Parallel, or a master Python script.

## GNU Parallel

If you have a script that you want to run multiple times (e.g., once per file), you don't need to write any Python parallelization code. GNU Parallel (the `parallel` command, which can be installed on most UNIX-like systems) can handle the parallelization for you. For example, suppose we can process a single file (or any other single input) by running on the command line:

    python process.py file_01.fits

Then we can run it on `file_01.fits` to `file_99.fits` by running on the command line:

    parallel python process.py ::: file_{01..99}.fits

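And that's it! The only requirement is that your Python script takes a filename as a command-line argument (hint: use `import sys; sys.argv` to grab command line arguments in your code). By default, GNU Parallel runs one job per CPU at a time; use `-j N` (e.g., `parallel -j 4 python process.py ::: file_{01..99}.fits`) to run `N` jobs at a time instead.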
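Here is a minimal sketch of what such a `process.py` could look like. The `process_file()` function and the `_processed.fits` output name are hypothetical placeholders for your own data reduction; the point is only how the filenames arrive through `sys.argv`, whose first entry is the script name itself.

```python
import sys


def process_file(filename):
    """Hypothetical stand-in for real data reduction; returns the output filename."""
    # Hypothetical naming convention: file_01.fits -> file_01_processed.fits
    stem = filename.rsplit(".", 1)[0]
    outname = stem + "_processed.fits"
    # ... load `filename`, reduce the data, and write the result to `outname` ...
    return outname


def main(args):
    """Process every filename passed on the command line."""
    outputs = []
    for filename in args:
        outname = process_file(filename)
        print("Processed {0} -> {1}".format(filename, outname))
        outputs.append(outname)
    return outputs


if __name__ == "__main__":
    # sys.argv[0] is the script name, so the filenames start at index 1
    main(sys.argv[1:])
```

Accepting any number of filenames means the same script works both with GNU Parallel (one file per call) and when run by hand on several files.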

## Python Scripting

I am not going to show you how to do this in a bash script. Instead, we will use a Python script to launch a bunch of Python processes: anything a shell script can do (e.g., running shell commands through the `subprocess` module) can also be done in Python, often with much better readability.
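As a sketch of the "master Python script" approach, the standard-library `subprocess` module can start several independent Python interpreters and wait for them. To keep the example self-contained, each child runs a one-line `-c` program instead of a real `process.py`; in practice you would pass something like `[sys.executable, "process.py", fname]`. Unlike GNU Parallel, this launches every task at once with no cap on how many run simultaneously.

```python
import subprocess
import sys

# Hypothetical inputs; in practice these would be your real data files
filenames = ["file_{0:02d}.fits".format(i) for i in range(1, 5)]

procs = []
for fname in filenames:
    # A one-line child program standing in for process.py; it just echoes its argument
    cmd = [sys.executable, "-c", "import sys; print('processed', sys.argv[1])", fname]
    # Popen starts the child and returns immediately, so all children run concurrently
    procs.append(subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True))

# communicate() waits for each child to exit and collects what it printed
outputs = [p.communicate()[0].strip() for p in procs]
return_codes = [p.returncode for p in procs]
print(outputs)
print(return_codes)  # 0 means the child exited without an error
```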

We create a function whose argument (here, a filename) tells each process which chunk of the task to run. We then create a bunch of processes, give them their chunks, and call `start()` to run them. Afterwards, our master process uses `join()` to wait for each process to finish. It is important to always call `join()` at the end to ensure all processes have finished running! If a process has already finished, `join()` returns immediately; if it has not, our master process sleeps until that process finishes.

### Before we begin with multiprocessing, a few important points:

 * Worker processes in a `multiprocessing.Pool` are daemonic, and daemonic processes are not allowed to create child processes of their own, so you cannot nest pools. Generally, there is always a way to structure your code so that only one process needs to spawn subprocesses.
 * While Python is generally OS-agnostic, you may run into OS-specific issues with multiprocessing because it is implemented a bit differently on Linux, Mac, and Windows.
 * One key difference to remember when developing packages for multiple OSes: on Mac (Python 3.8+) and Windows, any code that starts processes needs to be wrapped in ``if __name__ == "__main__":``, otherwise you will get an error mentioning `freeze_support()` when you try to run it. This guard is not required in Jupyter notebooks, so we have merely commented where you would need it here. The reason is that on these systems new processes are started with the "spawn" method, which re-imports your main script in every child process; without the guard, each child would try to launch processes of its own. (With spawn, functions defined directly in a notebook may also not be found by the workers; moving them into a separate `.py` module and importing them avoids this.) The details are quite technical, but [here is a related blog post if you want to learn more](https://pythonspeed.com/articles/python-multiprocessing/).


In [8]:

def process_data(filename):
    """
    Let's pretend this function does something

    Args:
        filename (str): the input file to work on
    """
    # do some data processing
    print("Process {0} complete".format(filename))
    # could save value to a shared variable or save to a file to be used later

process_list = []

### If you get errors about freeze_support(), then the following code
### would need to be wrapped in `if __name__ == "__main__":` (both for loops).
for i in range(10):
    filename = "file_{0}.fits".format(i)
    p = multiprocessing.Process(target=process_data, args=(filename,))
    process_list.append(p)
    p.start()

for p in process_list:
    p.join()

Process file_0.fits complete
Process file_1.fits completeProcess file_2.fits complete

Process file_3.fits complete

Process file_4.fits completeProcess file_5.fits complete
Process file_6.fits complete
Process file_7.fits complete
Process file_8.fits complete
Process file_9.fits complete
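As a standalone script (which you would need on Mac or Windows), the cell above could be arranged as below: everything that starts processes goes inside a function that is only called under the `if __name__ == "__main__":` guard. Returning each process's `exitcode` (0 for a clean exit) is our own addition for checking that every process finished; `process_data()` is still a placeholder.

```python
import multiprocessing


def process_data(filename):
    """Placeholder for real per-file work."""
    print("Process {0} complete".format(filename))


def main():
    process_list = []
    for i in range(10):
        filename = "file_{0}.fits".format(i)
        p = multiprocessing.Process(target=process_data, args=(filename,))
        process_list.append(p)
        p.start()

    # join() blocks until each process exits
    for p in process_list:
        p.join()
    return [p.exitcode for p in process_list]


# Process creation sits behind this guard so that child processes started with
# "spawn" can re-import this file without launching processes of their own
if __name__ == "__main__":
    print(main())
```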


## Pools

Manually creating processes requires a bunch of upkeep code, which is unnecessary if you are just running over a giant loop. In the spirit of keeping parallelism simple, use a high-level parallelization API provided by your programming language whenever possible! It will save you time and effort (trust me). For dividing up the iterations of a for loop, use Python process pools (`multiprocessing.Pool`). Essentially, you can give any number of tasks to a process `Pool`, and the processes in the pool will work through them following your instructions.

When in doubt about how to parallelize code, use process pools! They are flexible and can accommodate most use cases. And since they have a standardized interface, your code will be easier for others to understand than if you home-brewed your own parallelism.
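For the common case of applying one function to every item in a list, `Pool.map()` is the simplest interface. Below is a minimal sketch with a toy `square()` function standing in for real work, written as a script with the `__main__` guard. The function must be defined at the top level of a module so the worker processes can find it, and `map()` returns the results in the same order as the inputs.

```python
import multiprocessing


def square(x):
    # Toy task; defined at module top level so worker processes can look it up
    return x * x


def main():
    # map() splits the inputs among the workers and blocks until all results are back;
    # leaving the `with` block then shuts the pool down
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, range(10))
    return results


if __name__ == "__main__":
    print(main())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```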

### Matrix dot product example

In [9]:
def matrix_pool(mat, index):
    # divide up so that we only compute one chunk of the mat.dot(mat.T) matrix
    index_start = mat.shape[0] // 10 * index
    index_end = mat.shape[0] // 10 * (index + 1)
    val = mat[index_start:index_end].dot(mat.T)
    print("Job {0} complete".format(index))
    
    return val

In [10]:
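### This entire block would need to be wrapped in `if __name__ == "__main__":` if you have freeze_support() issues
pool = multiprocessing.Pool(processes=8) # create a pool with 8 worker processes

pool_jobs = []
for i in range(10):
    # apply_async returns an AsyncResult immediately; the job runs in the background
    job = pool.apply_async(matrix_pool, (mat, i))
    pool_jobs.append(job)
    print("Created job {0}".format(i))

for i, job in enumerate(pool_jobs):
    result = job.get() # blocks until this job's result is ready
    print("Result {0}".format(i), result.shape)

# no more jobs will be submitted; wait for the worker processes to exit
pool.close()
pool.join()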

Created job 0
Created job 1
Created job 2
Created job 3
Created job 4
Created job 5
Created job 6
Created job 7
Created job 8
Created job 9
Job 0 complete
Result 0 (300, 3000)
Job 1 complete
Result 1 (300, 3000)
Job 2 complete
Result 2 (300, 3000)
Job 3 complete
Result 3 (300, 3000)
Job 4 complete
Result 4 (300, 3000)
Job 5 complete
Result 5 (300, 3000)
Job 6 complete
Result 6 (300, 3000)
Job 7 complete
Result 7 (300, 3000)
Job 8 complete
Result 8 (300, 3000)
Job 9 complete
Result 9 (300, 3000)


## Checking Resource Usage

Any time you are testing parallelized code, it is good to monitor your CPU and RAM usage (e.g., with `top` or `htop` on Linux, Activity Monitor on Mac, or Task Manager on Windows). Monitoring CPU usage helps you check whether the number of processes running in parallel matches what you intended. Parallelized code often already involves big datasets, and parallelization uses even more memory because each process typically holds its own copy of the data it works on (you can think of it as trading memory usage for runtime). So keep your resource monitor open while you develop this code. It is also a great way to debug (e.g., to identify a parallel job that hangs and never finishes).
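Alongside a graphical monitor, you can check a couple of numbers from inside Python. This sketch uses `os.cpu_count()` and the Unix-only `resource` module (not available on Windows). Note that `ru_maxrss` is reported in kilobytes on Linux but in bytes on Mac, and on Linux the `RUSAGE_CHILDREN` value reflects the largest finished child rather than the total.

```python
import os
import resource  # Unix-only: available on Linux and Mac, not on Windows

# Logical CPUs reported by the OS (hyperthreads count separately); None if unknown
n_cpus = os.cpu_count()

# Peak resident memory of this Python process so far
# (kilobytes on Linux, bytes on Mac)
peak_self = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

# Peak resident memory of child processes that have finished and been waited for
peak_children = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss

print("logical CPUs:", n_cpus)
print("peak memory, this process:", peak_self)
print("peak memory, finished children:", peak_children)
```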

# Activity: Image Processing

Run the following code to generate some fake data, then apply the image processing function below to each fake image and save the resulting data. How fast can you process the data when the images are 1000x1000? What is the speedup going from 1 core to multiple cores (is it linear)?

### Instructions

  * Run the first cell below to generate 25 images of fake data
  * Run the second cell below to run the `process_image()` function on a single image without parallelism. Time how long this takes. Let's call this time t0. 
  * Write parallelization code to parallelize the `process_image()` function to process all 25 images. Time how long it takes to process 25 images. Let's call this tp. 
  * Compute speedup = (25 * t0) / tp: the speedup factor between doing 25 images in series and parallelizing it. 
  * (Optional) Try using different numbers of processes to parallelize the activity. How does the run time change with the number of processes used? How does it relate to the number of cores on your computer? Is the speedup linear?
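One way to measure t0 and tp is with `time.perf_counter()`, which measures wall-clock time. This sketch only provides timing helpers (`time_call()` and `speedup_factor()` are our own hypothetical names) and leaves the parallelization to you; the numbers in the worked example are made up for illustration.

```python
import time


def time_call(func, *args, **kwargs):
    """Call func once and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


def speedup_factor(t0, tp, n_images=25):
    """Speedup of the parallel run (tp) over n_images serial runs of t0 each."""
    return (n_images * t0) / tp


# Usage (hypothetical): _, t0 = time_call(process_image, data)
# Worked example with made-up timings: 2.0 s per image in series,
# 12.5 s for all 25 images in parallel -> (25 * 2.0) / 12.5 = 4.0
print(speedup_factor(2.0, 12.5))  # 4.0
```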

### End Product
  * Report your best speedup factor on Piazza, and specify how many processes you used and how many cores you have.
  * (Optional): make a graph of wall clock time to process 25 images vs number of processes used. Post this on Piazza as well.  

### Roles
  * Driver: in charge of typing and running the code for this activity
  * Navigator: in charge of directing the driver what to code (can be more than one person)

In [None]:
# note, this cell might take a minute to run.

def generate_fake_data(filename, dims):
    """
    Generates a fake image with random, smoothly varying structure and saves it to a FITS file
    
    Args:
        filename (str): file to save the data to
        dims (tuple): (Ny, Nx) pair that specifies the size of the y and x dimensions
    """
    # some complicated random image generation. Feel free to ignore.
    # coordinate system in Fourier space
    u,v = np.meshgrid(np.fft.fftfreq(dims[1]), np.fft.fftfreq(dims[0]))
    phases = np.random.uniform(0, 2*np.pi, u.shape)
    rho = np.sqrt((u*dims[1])**2 + (v*dims[0])**2)
    # suppress high frequency by a squared exponential
    spectrum = np.exp(-rho**2/(np.max(rho)/50)**2)  * np.exp(1j * phases)
    filtered = np.real(np.fft.ifft2(spectrum))

    fits.writeto(filename, filtered, overwrite=True)


# generate fake data (you can choose to save it somewhere else if you want)
fileformat = os.path.join("./", "fake_{0}x{1}_{2}.fits")

ny = 1000
nx = 1000
for i in range(25):
    filename = fileformat.format(ny, nx, i)
    generate_fake_data(filename, (ny, nx) )


In [None]:
def process_image(frame, filtersize=50):
    """
    Run a high-pass filter on the data. 
    Remove the low spatial frequency (i.e., smooth features) in the image

    Args:
        frame (np.array): a 2-D image to be processed
        filtersize (int): the size of the filter. Features smaller than the filtersize will be preserved

    Returns:
        processed_frame (np.array): a 2-D image after processing
    """
    # run a median filter to smooth the image
    frame_smooth = ndimage.median_filter(frame, filtersize)

    processed_frame = frame - frame_smooth

    return processed_frame

# an example of running this on one image
with fits.open(fileformat.format(ny, nx, 0)) as hdulist:
    data = hdulist[0].data


    filt_data = process_image(data)

    fig = plt.figure(figsize=(6,3))
    ax1 = fig.add_subplot(121)
    ax1.imshow(data, cmap="inferno")
    ax1.set_title("Original")
    ax2 = fig.add_subplot(122)
    ax2.imshow(filt_data, cmap="inferno")
    ax2.set_title("Filtered")

#### Activity:
# write and time some code that runs this on all 25 images in parallel. How does the performance increase as you increase the number of processes you use?
# we recommend you use multiprocessing pool for this task

# Parallelism Extended Cut

There is a lot more to parallelism than what we covered; we are just scratching the surface. The above concepts _should_ cover the majority of parallelism use cases. However, here are some other good things to know.


## Do you really need parallelism?

We already emphasized this above, but parallelism adds complexity. Avoid parallelizing code until it is necessary. If some code takes 10 minutes to run, but you only need to run it once per week, is it worth parallelizing? The programming and upkeep costs may not be worth it. Parallelism also makes code harder to debug: you generally cannot step into the worker processes with the Python debugger, and a worker that crashes may give no useful error message as to why. GNU Parallel (above) is an alternative to writing more complicated code.

## Threads vs Processes
Python has two parallelization modules in its standard library: `multiprocessing` and `threading`. What's the difference? Why do we only use `multiprocessing`?

Threads and processes are both ways for your computer's CPUs to run tasks in parallel. Threads share the same memory, whereas processes do not. Processes have to communicate through shared variables (see below) or through slower I/O methods (writing to files, communicating over sockets). Most languages use threads to parallelize computations because sharing the same memory is very convenient and saves on resources. However, Python has the "Global Interpreter Lock" (GIL), which allows only one thread at a time to execute Python code (for complicated consistency reasons). Parallelism in astronomy generally involves computationally intensive tasks, so only being able to run one of them at a time would defeat the purpose. That's why you will generally only see multiprocessing in astronomy-related software development.

Threads are more common outside of astronomy; they are generally useful for "I/O-bound tasks" as opposed to the "CPU-bound tasks" that we usually encounter in astronomy. Any time your tasks spend a lot of time sleeping or waiting, such as making multiple web API queries (e.g., multiple database queries) or waiting for files to be created, threads are a better choice because they use less memory and can access all of your variables without you having to define shared variables.
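A minimal sketch of the I/O-bound case using `concurrent.futures.ThreadPoolExecutor` from the standard library. The `fake_query()` function is a hypothetical stand-in that just sleeps, imitating a slow web or database query; because a sleeping thread does not hold the GIL, the four waits overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_query(query_id):
    # Hypothetical stand-in for an I/O-bound call; sleeping releases the GIL
    time.sleep(0.5)
    return "result {0}".format(query_id)


start = time.perf_counter()
# Four threads wait concurrently, so four 0.5 s waits take roughly 0.5 s in total
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fake_query, range(4)))
elapsed = time.perf_counter() - start

print(results)  # ['result 0', 'result 1', 'result 2', 'result 3']
print("elapsed: {0:.2f} s".format(elapsed))
```

Run serially, the same four queries would take about 2 seconds.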

The one notable exception is that many `numpy` functions release the GIL while their compiled C code runs. This means that if your code is dominated by `numpy` matrix operations such as dot products, threads can run those operations in parallel despite the GIL.
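As a sketch of this, the chunked matrix product from the Pools example can be run with threads instead of processes, so every thread reads the same `mat` without copying it. This assumes the product runs in GIL-releasing compiled code, as `numpy` dot products do. Keep in mind that a multithreaded BLAS may already parallelize each `dot()` call, so limit the BLAS threads as described in the BLAS section before measuring any speedup.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np

rng = np.random.default_rng(0)
mat = rng.random((1000, 1500))
n_chunks = 4
# Row indices for each chunk (array_split also handles uneven division)
row_chunks = np.array_split(np.arange(mat.shape[0]), n_chunks)


def chunk_dot(rows):
    # Threads share memory, so each one slices the same `mat` (a view, not a copy);
    # the matrix product itself runs in compiled code that releases the GIL
    return mat[rows[0]:rows[-1] + 1].dot(mat.T)


with ThreadPoolExecutor(max_workers=n_chunks) as executor:
    pieces = list(executor.map(chunk_dot, row_chunks))

result = np.vstack(pieces)
print(result.shape)  # (1000, 1000)
```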

## Shared Variables

Processes do not share memory by default. For processes to read, write, or access the same variables, we have to declare shared memory. Python's `multiprocessing` module provides the shared structures you should generally be using: queues and arrays. Queues (`multiprocessing.Queue`) allow processes to pass messages (pickled Python objects) to each other. Arrays (`multiprocessing.Array`) live in shared memory, so large datasets can be accessed by multiple processes without duplicating them (possibly saving a lot of memory). Use these special arrays and queues because they come with built-in synchronization: queues are process-safe, and each individual read or write of a `multiprocessing.Array` is protected by a lock. This spares you from writing low-level synchronization code, which we should avoid unless we absolutely need it. One caveat: a read-modify-write such as `arr[i] += 1` is not atomic, so wrap it in `with arr.get_lock():` if several processes may update the same element.
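A minimal sketch combining both: each process fills its own slice of a shared `multiprocessing.Array` (viewed as a numpy array through `np.frombuffer()`, without copying) and reports back through a `Queue`. The `fill_chunk()` task and the squared values it writes are purely illustrative. Because each process writes only to its own slice, no extra locking is needed here.

```python
import multiprocessing

import numpy as np


def fill_chunk(shared, start, stop, queue):
    # View the shared buffer as a float64 numpy array; this does not copy the data
    arr = np.frombuffer(shared.get_obj(), dtype=np.float64)
    # Each worker writes only its own slice, so no extra locking is needed
    arr[start:stop] = np.arange(start, stop) ** 2
    # Send a small message back to the parent through the queue
    queue.put((start, stop))


def main(n=12, n_workers=3):
    # "d" is a C double, matching np.float64; the default lock=True adds a lock
    shared = multiprocessing.Array("d", n)
    queue = multiprocessing.Queue()
    bounds = [i * n // n_workers for i in range(n_workers + 1)]

    procs = [
        multiprocessing.Process(target=fill_chunk, args=(shared, lo, hi, queue))
        for lo, hi in zip(bounds[:-1], bounds[1:])
    ]
    for p in procs:
        p.start()
    # Read all messages before joining so no process waits on unflushed queue data
    messages = sorted(queue.get() for _ in procs)
    for p in procs:
        p.join()

    result = np.frombuffer(shared.get_obj(), dtype=np.float64).copy()
    return result, messages


if __name__ == "__main__":
    result, messages = main()
    print(result)
    print(messages)
```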
