![](../nci-logo.png)

-------
# Data Performance: Parallel IO


-----

### In this notebook:

This workshop demonstrates how to perform parallel IO in Python using the multiprocessing module and MPI.

There are three ways to do concurrent programming in Python: threads, the multiprocessing module, and the Message
Passing Interface (MPI).

Python itself includes a single master lock that governs access to the
interpreter's functions, called the Global Interpreter Lock or GIL. This lock serializes
access from multiple threads to basic resources like object reference counting. We can
have as many threads as we like in a Python program, but only one at a time can use
the interpreter. Thus we can't use more than one core's worth of time when running a pure-Python program.
Thread-based code is fine for GUIs and applications that call into external libraries that
don't tie up the Python interpreter.
<img src="images/threads.png" alt="Drawing" style="width: 400px;">
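
As a minimal sketch of the case where threads do help, the snippet below uses `multiprocessing.pool.ThreadPool` to overlap blocking waits. Here `time.sleep` stands in for an external call that releases the GIL; this is an assumption made for illustration, and `fake_io` is a hypothetical name, not part of the workshop code.

```python
import time
from multiprocessing.pool import ThreadPool

def fake_io(task_id):
    """Stand-in for a blocking call (e.g. a disk or network wait) that releases the GIL."""
    time.sleep(0.2)
    return task_id

tasks = list(range(4))

# Run the tasks one after another.
start = time.time()
serial = [fake_io(t) for t in tasks]
serial_time = time.time() - start

# Run the same tasks on four threads; the waits overlap.
start = time.time()
pool = ThreadPool(4)
threaded = pool.map(fake_io, tasks)
pool.close()
pool.join()
threaded_time = time.time() - start

print('serial:   {0:.2f} s'.format(serial_time))
print('threaded: {0:.2f} s'.format(threaded_time))
```

Replacing `fake_io` with a CPU-bound pure-Python function would be expected to show little or no gain, because only one thread at a time can hold the GIL.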

Multiprocessing provides support for basic fork()-based parallel processing. It's the simplest way in Python to write parallel code that uses more than one core, and is strongly recommended for general-purpose computation. However, the parallel processes can't share a single open NetCDF4/HDF5 file, even if the file is opened read-only. Instead, multiprocessing can be used to have each process open its own file, or to do the computation in parallel and the IO in serial.
<img src="images/multiple_process.png" alt="Drawing" style="width: 400px;">
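
The second pattern (IO in serial, computation in parallel) can be sketched as follows. This is an illustrative example only: a small temporary text file stands in for the real input, and `read_rows` and `row_mean` are hypothetical helper names.

```python
import os
import tempfile
from multiprocessing import Pool

def read_rows(path):
    """Serial IO: a single process reads the whole file."""
    with open(path) as f:
        return [[float(x) for x in line.split()] for line in f]

def row_mean(row):
    """Compute step that is farmed out to the worker processes."""
    return sum(row) / len(row)

if __name__ == '__main__':
    # Write a small text file to stand in for the real input (illustrative data only).
    fd, path = tempfile.mkstemp(suffix='.txt')
    with os.fdopen(fd, 'w') as f:
        for i in range(8):
            f.write(' '.join(str(i + j) for j in range(4)) + '\n')

    rows = read_rows(path)            # IO in serial, in the parent process
    pool = Pool(processes=2)
    means = pool.map(row_mean, rows)  # computation in parallel
    pool.close()
    pool.join()
    os.remove(path)
    print(means)
```

Only the parent process touches the file, so no file handle is shared between the workers.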

MPI-based Parallel HDF5 is by far the best way to do parallel IO on NetCDF4/HDF5 files. MPI is the official flavor of parallelism supported by the HDF5 library. You can have an unlimited number of processes, all of which share the same open HDF5 file. All of them can read and write data, and modify the file's structure. Programs written this way require a little more care, but it is the most elegant and highest-performance way to use HDF5 in a parallel context.
<img src="images/mpi_hdf5.png" alt="Drawing" style="width: 400px;">


#### The following material uses the CSIRO IMOS/TERN MODIS Data Collection, which is available under the Creative Commons License 4.0 through NCI's THREDDS Data Server. For more information on the collection and licensing, please [click here](http://geonetwork.nci.org.au/geonetwork/srv/eng/catalog.search#/metadata/f3633_7369_2730_8213).

---------

<br>



## Prerequisites

#### Please load the following modules:

```
module load python/2.7.11
module load python/2.7.11-matplotlib
module load ipython/4.2.0-py2.7
module load netcdf4-python/1.2.4-py2.7

```

```
module load h5py/2.6.0-hdf5-1.8.14p-py2.7
```


#### Then launch a new Jupyter (iPython) notebook: 

`$ ipython notebook`


<br>

## Multiple processes read multiple files

Import order matters here: h5py must be imported before netCDF4.

In [None]:
import os
import time
import sys
import commands
from multiprocessing import Pool,current_process
import numpy as np
from tempfile import *
from mpi4py import MPI
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import h5py
global tmp_file
from netCDF4 import Dataset

### Useful functions

#### Print the metadata of a variable

In [None]:
def view_prt(filename,varname='all'):
    """Print the file format, dimensions, and metadata of one or all variables."""
    f=Dataset(filename,'r')
    print 'File Name: \t',filename
    print 'Format: \t',f.data_model
    for item in f.dimensions:
        print 'dimension \t',f.dimensions[item].name, f.dimensions[item].size
    print '    '
    for item in f.variables.keys():
        if varname=='all' or item==varname:
            var=f[item]
            print 'Variable: \t', item
            print 'Dimensions: \t', var.dimensions
            print 'Shape:    \t', var.shape
            print "size:  \t\t", var.size
            print "data type:  \t", var.dtype
            print "chunksize: \t", var.chunking()
            print "endian: \t", var.endian()
            print "filters: \t", var.filters()
            print "ncattrs: \t", var.ncattrs()
            print ""
    f.close()  # release the file handle once the metadata has been printed

### Multiprocessing module example

In [None]:
def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc_name = current_process().name
    print('{0} doubled to {1} by: {2}'.format(
        number, result, proc_name))
    return result

In [None]:
%%time 
numbers = [x for x in range(10)] 
pool = Pool(processes=4)
print(pool.map(doubler, numbers))
pool.close()
pool.join()

### Example Dataset: MODIS

<p>The MOD10A1 daily product is gridded in equal area tiles. Each tile consists of a 1200 km by 1200 km data array, which corresponds to 2400 by 2400 cells at 500 m resolution. The following image shows tile locations for MOD10A1 V005 data in a sinusoidal projection.</p>
https://modis-land.gsfc.nasa.gov/MODLAND_grid.html

<img src="images/sinusoidal_v5.png" alt="Drawing" style="width: 600px;">

#### Read a variable using the multiprocessing module

In [None]:
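def multi_read_wrapper(args):
    # pool.map passes a single argument to the worker, so the arguments
    # are packed into a tuple and unpacked here.
    return read_var(*args)

def read_var(filename,varname,start=0,end=2400):
    """Read varname[:, :, start:end] from filename inside a worker process."""
    proc_name = current_process().name
    print('read variable {0} of file {1} by: {2} from {3} to {4}'.format(
        varname,filename, proc_name,start,end))
    fr = Dataset(filename,'r',format='NETCDF4')
    out=fr[varname][:,:,start:end]
    fr.close()
    # The array is read only to time the IO and is not returned, so pool.map
    # collects a list of None. Add `return out` to send the data back to the
    # parent process (it is pickled in transit, which adds to the run time).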

#### Metadata of target files

In [None]:
filename='/g/data/u39/public/prep/modis-fc/FC.v302.MCD43A4/FC.v302.MCD43A4.h24v05.2000.005.nc'
varname='bare_soil'
view_prt(filename,varname)
filename='/g/data/u39/public/prep/modis-fc/FC.v302.MCD43A4/FC.v302.MCD43A4.h24v05.2001.005.nc'
varname='bare_soil'
view_prt(filename,varname)

#### Create list of target file names

In [None]:
path=[]
for year in range(2000,2008):
    filename='/g/data/u39/public/prep/modis-fc/FC.v302.MCD43A4/FC.v302.MCD43A4.h24v05.'+str(year)+'.005.nc'
    varname='bare_soil'
    path.append((filename,varname))

#### Use 1 process to read a list of target files

In [None]:
%%time
nprocs=1
pool = Pool(processes=nprocs)
val=pool.map(multi_read_wrapper, path)
pool.close()
pool.join()

#### Use 2 processes to read a list of target files

In [None]:
%%time
nprocs=2
pool = Pool(processes=nprocs)
val=pool.map(multi_read_wrapper, path)
pool.close()
pool.join()

#### Use 4 processes to read a list of target files

In [None]:
%%time
nprocs=4
pool = Pool(processes=nprocs)
val=pool.map(multi_read_wrapper, path)
pool.close()
pool.join()
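
To compare the three runs, the wall times reported by `%%time` can be turned into speedup and parallel efficiency. The sketch below uses a hypothetical helper, `speedup_table`, and the timings in it are placeholders rather than measurements.

```python
def speedup_table(timings):
    """Return {nprocs: (speedup, efficiency)} relative to the 1-process run."""
    t1 = timings[1]
    table = {}
    for nprocs, t in sorted(timings.items()):
        speedup = t1 / float(t)
        table[nprocs] = (speedup, speedup / nprocs)
    return table

# Placeholder wall times in seconds, NOT measurements:
# substitute the values printed by %%time in the cells above.
timings = {1: 8.0, 2: 5.0, 4: 4.0}
for nprocs, (s, e) in sorted(speedup_table(timings).items()):
    print('{0} processes: speedup {1:.2f}, efficiency {2:.2f}'.format(nprocs, s, e))
```

An efficiency well below 1 suggests that the reads are limited by the file system rather than by the number of processes.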

#### Use the multiprocessing module to read a single file

In [None]:
%%time
nprocs=4
pool = Pool(processes=nprocs)
varname='bare_soil'

for year in range(2000,2008):
    args=[]
    filename='/g/data/u39/public/prep/modis-fc/FC.v302.MCD43A4/FC.v302.MCD43A4.h24v05.'+str(year)+'.005.nc'
    # Open the file in the parent only to look up the variable's shape.
    f=Dataset(filename,'r')
    dimns=f[varname].shape
    f.close()
    # Give each process an equal slice of the last dimension.
    for iproc in range(nprocs):
        start=iproc*dimns[-1]//nprocs
        end=(iproc+1)*dimns[-1]//nprocs
        args.append((filename,varname,start,end))
    val=pool.map(multi_read_wrapper, args)
pool.close()
pool.join()
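
The start and end indices computed in the loop above can be checked in isolation. The sketch below wraps the same formula in a hypothetical helper, `split_range`, and uses floor division so that the bounds stay integers under both Python 2 and Python 3.

```python
def split_range(n, nparts):
    """Split range(n) into nparts contiguous (start, end) pairs."""
    return [(i * n // nparts, (i + 1) * n // nparts) for i in range(nparts)]

# 2400 cells shared among 4 processes, as in the cell above.
print(split_range(2400, 4))

# When n is not a multiple of nparts, the parts differ in length by at most one.
print(split_range(10, 4))
```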

### Multiple processes accessing a single file (Parallel HDF5)

#### Parallel HDF5 implementation layers

<img src="images/phdf5_layers.png" alt="Drawing" style="width: 400px;">

This is a typical scheme when working on Raijin with the Lustre file system. On VDI, we simply use local storage or NFS, which may give rise to different IO performance characteristics.

#### Python code to access a single file in parallel via MPI

<p>application: parallel_io.py</p>
<ul>
<li>argv[1] = 1 (collective) or 0 (independent)</li>
<li>argv[2] = file name</li>
<li>argv[3] = variable name</li>
<li>argv[4] = 0, 1, or 2: the index of the dimension to be split among MPI ranks</li>
</ul>
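
The script `parallel_io.py` is not reproduced in this notebook. As a rough sketch of what such a script could do, and not the workshop's actual implementation, each rank can work out its own slab along the chosen dimension and read it through h5py's `mpio` driver. The helper names `rank_slab` and `read_slab` and the example shape are assumptions made for illustration; `read_slab` needs mpi4py and an MPI-enabled h5py build.

```python
def rank_slab(shape, along_dim, rank, size):
    """Return a tuple of slices selecting this rank's share of `shape`,
    split evenly along dimension `along_dim`."""
    n = shape[along_dim]
    start = rank * n // size
    end = (rank + 1) * n // size
    return tuple(slice(start, end) if d == along_dim else slice(None)
                 for d in range(len(shape)))

def read_slab(filename, varname, along_dim, collective):
    """Each MPI rank reads its own slab of one variable (hypothetical helper).

    Needs mpi4py and an MPI-enabled h5py build; launch with mpirun.
    """
    from mpi4py import MPI
    import h5py

    comm = MPI.COMM_WORLD
    # Opening the file is a collective operation: every rank makes this call.
    with h5py.File(filename, 'r', driver='mpio', comm=comm) as f:
        dset = f[varname]
        sel = rank_slab(dset.shape, along_dim, comm.Get_rank(), comm.Get_size())
        if collective:
            with dset.collective:   # collective read
                return dset[sel]
        return dset[sel]            # independent read (the default)

# The slab arithmetic can be checked without MPI (the shape is an assumed example).
print(rank_slab((46, 2400, 2400), 1, 0, 2))
```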

In [None]:
%%time
nprocs=2
along_dim=1
collective=0   # 1 = collective, 0 = independent
varname='bare_soil'

for year in range(2000,2008):
    filename='/g/data/u39/public/prep/modis-fc/FC.v302.MCD43A4/FC.v302.MCD43A4.h24v05.'+str(year)+'.005.nc'
    # Arguments follow the argv order listed above.
    cmd_line='mpirun -np {0} python parallel_io.py {1} {2} {3} {4}'.format(
        nprocs,collective,filename,varname,along_dim)
    print cmd_line
    out=commands.getoutput(cmd_line)
    print out

#### Collective vs. independent

<p>MPI has two flavors of operation: collective, which means that all processes have to
participate (and in the same order), and independent, which means each process can
perform the operation (or not) whenever and in whatever order it pleases.</p>

With NetCDF4/HDF5, modifications to file metadata must be done collectively. These include:
<ul>
<li>Opening or closing a file</li>
<li>Creating or deleting datasets, groups, attributes, or named types</li>
<li>Changing a dataset's shape</li>
<li>Moving or copying objects in the file</li>
</ul>

##### Contiguous access

<img src="images/parallel_access_x.png" alt="Drawing" style="width: 400px;">

##### Non-contiguous Access

<img src="images/parallel_access_y.png" alt="Drawing" style="width: 400px;">

Collective calls can effectively reduce the number of IO operations for non-contiguous access.
A collective call means that all processes of the communicator must participate in the call, and must make such calls in the same order.
<img src="images/collective.png" alt="Drawing" style="width: 800px;">
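
To see why non-contiguous access produces many more IO requests, the sketch below counts the contiguous runs each rank would need for a 2-D array. It assumes row-major (C-order) storage with no chunking or compression, which may not match the files used here, and `contiguous_runs` is a hypothetical helper.

```python
def contiguous_runs(shape, row_range, col_range):
    """Count the contiguous runs needed to read a 2-D block from a
    row-major (C-order), unchunked array of the given shape."""
    nrows, ncols = shape
    r0, r1 = row_range
    c0, c1 = col_range
    if (c0, c1) == (0, ncols):
        return 1          # whole rows are adjacent on disk: one run
    return r1 - r0        # partial rows: one run per row

shape = (2400, 2400)
nprocs = 4
for rank in range(nprocs):
    lo, hi = rank * 2400 // nprocs, (rank + 1) * 2400 // nprocs
    by_rows = contiguous_runs(shape, (lo, hi), (0, 2400))
    by_cols = contiguous_runs(shape, (0, 2400), (lo, hi))
    print('rank {0}: split by rows -> {1} run, split by columns -> {2} runs'.format(
        rank, by_rows, by_cols))
```

Under these assumptions a row-wise split needs one request per rank, while a column-wise split needs one request per row, which is the case where merging requests collectively pays off.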

<p>application: collective_io.py</p>
<ul>
<li>argv[1] = col (collective) or idp (independent)</li>
<li>argv[2] = x (contiguous access) or y (non-contiguous access)</li>
</ul>

In [None]:
nprocs=4

for col_idp in ['col','idp']:
    for along_dim in ['y','x']:
        cmd_line='mpirun -np {0} python collective_io.py {1} {2}'.format(
            nprocs,col_idp,along_dim)
        print cmd_line
        out=commands.getoutput(cmd_line)
        print out

# References

<ul>
<li>'Parallel HDF5', Quincey Koziol, The HDF Group, 2015.</li>
<li>'Portable Parallel I/O: Handling large datasets in heterogeneous parallel environments', Michael Stephan, JULICH, 2013.</li>
<li>'Python and HDF5', Andrew Collette, 2014.</li>
</ul>