# Parallelism

Computers nowadays all have multiple cores that can run code in parallel. Many codes and algorithms in astronomy can be catagorized as "embarrassingly parallel", meaning that they can easily be split up into multiple tasks that each have little or no dependency on each other. An example of this is running the same image processing steps on multipel files, or computing the likelihood of a bunch of different sets of models. 

Parallelism adds extra complexity to the code, making it harder to debug and maintain. Thus, it is important to consider the relative gain of putting in parallelism versus the extra effort in developing and maintaining that code. For this reason, we also recommend that you keep parallelization code as simple as possible. For many tasks, even the most simple parallelism is sufficient. 

We will discuss a few options to do parallelism in Python in order of increasing complexity:

  1. BLAS/LAPACK
  3. Thread/process pools

TODO: monitor your CPU/RAM utilization

## BLAS/LAPACK/ATLAS/MKL

There are packages that automatically parallelize linear algebra routines like matrix dot product or singular value decomposition. BLAS/LAPACK  are the algorithms, and ATLAS, OpenBLAS, or MKL are implementations of them. If you have an Intel processor, you will be using MKL. They are quite useful for when you don't want to write your own parallelization code but want to speed up matrix routines. It requires numpy to be configured for BLAS/MKL properly. Generally if you install numpy with Anaconda, this happens automatically, so we recommend simply doing it this way. These speedups are turned on by default when configured.

However, if you wish to do your own parallelism, make sure to turn this off. Having multiple layers of parallelism will often slow down your code! 

In [1]:
# Check we have BLAS/LAPACK. Notice they are from the MKL library
import numpy as np
np.show_config()

blas_mkl_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['/home/jwang/anaconda3/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['/home/jwang/anaconda3/include']
blas_opt_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['/home/jwang/anaconda3/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['/home/jwang/anaconda3/include']
lapack_mkl_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['/home/jwang/anaconda3/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['/home/jwang/anaconda3/include']
lapack_opt_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['/home/jwang/anaconda3/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['/home/jwang/anaconda3/include']


In [2]:
import mkl
import timeit

mat = np.random.random((1000, 1500)) # 1000x1500 matrix

# Use 4 cores to parallelize matrix operations
mkl.set_num_threads(4)
print("4 cores", timeit.timeit("mat.dot(mat.T)", setup="from __main__ import mat", number=100))

# Disable MKL
mkl.set_num_threads(1)
print("1 core", timeit.timeit("mat.dot(mat.T)", setup="from __main__ import mat", number=100))

4 cores 4.90791590011213
1 core 12.856466499972157


## Running Multiple Python Instances

This might sound very unsophisticated, but it is actually a good choice for many instances. As you will learn before, Python parallelism is not the best, and if tasks are completely independent of each other, running each task as a separate Python process and saving the result as a file is a perfectly reasonable (and simple) option. This is great for batch processes such as bulk processing a bunch of files with some data reduction code.

There are many options to do this: bash script, GNU Parallel, or, as we will focus on today, a master python script. We will focus on using a python script to launch a bunch of python processes because all capabilities of shell scripting (e.g., calling bash commands with the sys module) can be done in Python, and often with much better readability. We will use python to launch a bunch of python processes.

We create a function with an argument `index` that tells each proces what chunk of the task to run. We then create a bunch of processes, give them their chunks, and call `start()` to run them. Afterwards, our master process uses `join()` to wait for each process to finish. It is important to always call `join()` at the end to ensure all processes have finished running! If a process has finished immediately, `join()` will immediately return; if a process has not finished, our master process will sleep until the process it is waiting on has finished. 

In [13]:
import multiprocessing

def matrix_loop(mat, index):
    # divide up so that we only compute one chunk of the mat.dot(mat.T) matrix
    index_start = mat.shape[0] // 10 * index
    index_end = mat.shape[0] // 10 * (index + 1)
    val  = mat[index_start:index_end].dot(mat.T)
    print("Process {0} complete".format(index))
    # could save value to a shared variable or save to a file to be used later

process_list = []

for i in range(10):
    p = multiprocessing.Process(target=matrix_loop, args=(mat, i))
    process_list.append(p)
    p.start()

for p in process_list:
    p.join()

Process 0 complete
Process 1 complete
Process 2 complete
Process 3 complete
Process 4 complete
Process 5 complete
Process 6 complete
Process 7 complete
Process 8 complete
Process 9 complete


## Pools

Manually creating threads requires a bunch of upkeep code, which is unnecessary if you are just running over a giant loop. In the spirit of keeping parallelism simple, use high-level parallelization API provided by your programming language whenever possible! It will save you time and effort (Trust me). For dividing up tasks with a for loop, use Python process `Pools`. Essentially, you can give
any number of tasks to a process `Pool` and the processes in the pool will loop through and do each one per your instructions. 

In [16]:
pool = multiprocessing.Pool(processes=4) # creae a pool with 4 worker processes

def matrix_loop(mat, index):
    # divide up so that we only compute one chunk of the mat.dot(mat.T) matrix
    index_start = mat.shape[0] // 10 * index
    index_end = mat.shape[0] // 10 * (index + 1)
    val  = mat[index_start:index_end].dot(mat.T)
    print("Process {0} complete".format(index))
    
    return val # let's return the data this time

pool_jobs = []
for i in range(10):
    job = pool.apply_async(matrix_loop, (mat, i))
    pool_jobs.append(job)
    print("Created job {0}".format(i))

for i, job in enumerate(pool_jobs):
    result = job.get() 
    print("Result {0}".format(i), result.shape)

Created job 0
Created job 1
Created job 2
Created job 3
Created job 4
Created job 5
Created job 6
Created job 7
Created job 8
Created job 9
Process 0 complete
Result 0 (100, 1000)
Process 1 complete
Result 1 (100, 1000)
Process 2 complete
Result 2 (100, 1000)
Process 3 complete
Result 3 (100, 1000)
Process 4 complete
Result 4 (100, 1000)
Process 5 complete
Result 5 (100, 1000)
Process 6 complete
Result 6 (100, 1000)
Process 7 complete
Result 7 (100, 1000)
Process 8 complete
Result 8 (100, 1000)
Process 9 complete
Result 9 (100, 1000)


# Advanced Topics in Parallelization

## Threads vs Processes

## Shared Variables

Do you really need it?
