In [1]:
from __future__ import division
import os
import sys
import glob
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
%matplotlib inline
%precision 4
plt.style.use('ggplot')

In [2]:
%load_ext Cython

In [3]:
from numba import jit, typeof, int32, int64, float32, float64

In [4]:
import random

# Writting parallel code

The goal is to desing parallel programs that are flexible, efficient and simple.

Step 0: Start by profiling a serial program to identify bottlenecks

Step 1: Are there for opportunities for parallelism?

1. Can tasks be perforemd in parallel?
   
   A. Function calls 
   
   B.Loops

2. Can data be split and operated on in parallel?

    A. Decomposition of arrays along rows, columns, blocks
    
    B. Decomposition of trees into sub-trees

3. Is there a pipeline with a sequence of stages?

    A. Data preprocesing and analysis
    
    B. Graphics rendering


Step 2: What is the nature of the parallelism?

1. Linear

    Embarassingly parallel programs
    
2. Recursive

    Adaptive partitioning methods
    
Step 3: What is the granularity?

1. 10s of jobs

2. 1000s of jobs

Step 4: Choose an algorihtm

1. Organize by tasks

2. Organize by data

3. Organize by flow

Step 5: Map to program and data structures

Program structures

Single program multiple data (SPMD)

Master/worker

Loop parallelism

Fork/join

Data structures

Shared data

Shared queue

Distributed array

Step 6: Map to parallel environment

Multi-core shared memrory

Cython with OpenMP

multiprocessing

IPython.cluster

Multi-computer

IPython.cluster

MPI

Hadoop / Spark

GPU

CUDA

OpenCL

Step 7: Execute, debug, tune in parallel environment

## Example: Estimating Pi

This is clearly a toy example, but the template cna be used for most embarassingly parallel problems. First we see how much we can speed-up the serial code by the use of compilation, then we apply parallel processing for a furhter linear speed-up in the number of processors.

In [5]:
def pi_python(n):
    s = 0
    for i in range(n):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return s/n

In [6]:
stats = %prun -r -q pi_python(1000000)

 

In [7]:
stats.sort_stats('time').print_stats(5)

         4000004 function calls in 1.273 seconds

   Ordered by: internal time
   List reduced from 6 to 5 due to restriction <5>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.746    0.746    1.273    1.273 <ipython-input-5-5ece63c5fe8a>:1(pi_python)
  2000000    0.425    0.000    0.527    0.000 random.py:367(uniform)
  2000000    0.102    0.000    0.102    0.000 {method 'random' of '_random.Random' objects}
        1    0.000    0.000    1.273    1.273 {built-in method builtins.exec}
        1    0.000    0.000    1.273    1.273 <string>:1(<module>)




<pstats.Stats at 0x1f2c186e9e8>

In [8]:
def pi_numpy(n):
    xs = np.random.uniform(-1, 1, (n,2))
    return 4.0*((xs**2).sum(axis=1).sum() < 1)/n

In [9]:
@jit
def pi_numba(n):
    s = 0
    for i in range(n):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            s += 1
    return s/n

In [10]:
n = int(1e5)
%timeit pi_python(n)
%timeit pi_numba(n)
%timeit pi_numpy(n)

91.1 ms ± 811 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
1.23 ms ± 17.2 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
4.24 ms ± 46.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


The bigger the problem, the more scope there is for parallelism

Amhdahls’ law says that the speedup from parallelization is bounded by the ratio of parallelizable to irreducibly serial code in the aloorithm. However, for big data analysis, Gustafson’s Law is more relevant. This says that we are nearly always interested in increasing the size of the parallelizable bits, and the ratio of parallelizable to irreducibly serial code is not a static quantity but depends on data size. For example, Gibbs sampling has an irreducibly serial nature, but for large samples, each iteration may be able perform PDF evaluations in parallel for zillions of data points.

## Multiprocessing

In [11]:
import multiprocessing

num_procs = multiprocessing.cpu_count()
num_procs

8

In [12]:
%%file pimulti.py
import numpy as np
def pi_numpy2(n):
    xs = np.random.uniform(-1, 1, (n,2))
    return 4.0*((xs**2).sum(axis=1).sum() < 1)/n

Overwriting pimulti.py


In [13]:
import pimulti as pm
n = int(1e5)
%time pi_numpy(n)


Wall time: 5.01 ms


0.0

In [14]:
%%time
def pi_multiprocessing(n):
   
    num_procs = multiprocessing.cpu_count()
    """Split a job of length n into num_procs pieces."""
    m = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(m)
    results = pool.map(pm.pi_numpy2, [n//m]*m)
    pool.close()
    return(np.mean(results))
pi_multiprocessing(n)

Wall time: 1.3 s


In [None]:
n = int(1e10)
%time pi_numpy(n)
%time pi_multiprocessing(n)

MemoryError: 