# Exemple de traitement de fichiers netcdf en parallèle avec multiprocessing & xarray

Dans cet exemple, nous faison la comparaison entre 2 programmes qui exécutent de façon séquentielle et asynchrone (parallèle) une analyse du contenu de plusieurs fichiers de SST.

En bouclant sur une liste de fichier:
* nous ouvrons le fichier
* affichons le nom du fichier et le pas de temps associé
* récupérons toutes les variables ayant des dimensions (time,lat,lon)
* Pour chacune de ces variables:
    * On calcule la moyenne de la variable (avec un objet ```DataFrame``` de la lib **xarray** pour gagner du temps)
    * On affiche le nom du fichier, nom de variable et sa moyenne

### Initialisation du programme

In [2]:
from netCDF4 import Dataset, num2date
from multiprocessing import Pool
from glob import glob
import xarray as xr
import timeit
import os

ImportError: No module named xarray

### Recherche des fichiers & déclaration du main

In [None]:
dataDir = '/dataref/opr/DATA/SST/OSTIA/2017101[0-2]*.nc'
ls = glob(dataDir)

if __name__ == '__main__':

### Création de la fonction de traitement getmean()

* déclaration de getmean(), ouverture d'un fichier, conversion du temps et affichage des infos de base

In [None]:
    def getmean(args):

        mode, ind, filename = args[:]

        nc = Dataset(filename, 'r')

        time = num2date(nc.variables['time'][:], nc.variables['time'].units)

        print("[{mode}] run: {file} ({time:%Y-%m-%dT%H:%M})".format(mode=mode,
                                                                    file=os.path.basename(filename),
                                                                    time=time[0]))

* Récupération des variables ayant des dimensions (time,lat,lon)

In [None]:
        vars = [var for var in nc.variables if nc.variables[var].dimensions == (u'time',u'lat',u'lon')]

* Pour chacune de ces variables:
    * On calcule la moyenne de la variable (avec un objet DataFrame de la lib xarray pour gagner du temps)
    * On affiche le nom du fichier, nom de variable et sa moyenne

In [None]:
        for var in vars:
            V = xr.DataArray(nc.variables[var][:])
            mn = V.mean().data
            print("MODE [{mode}] : file {file} / var {var} : mean = {mn} ".format(mode=mode,
                                                                                  file=os.path.basename(filename),
                                                                                  var=var, mn=mn))

* On ferme de netcdf

* On ferme de netcdf et on sort de la fonction

In [None]:
        nc.close()
    
        return None

### Corps du programme

* On instancie un Pool de "Workers" (cad. des threads pour le multiprocessing), lancés par blocs de 4 processes.

In [None]:
p = Pool(processes=4)

### Lancement séquentiel

* Et affichage du temps de traitement

In [None]:
    # Sequential run
    start_time = timeit.default_timer()
    myArgs = [('Sequential', i, f) for i, f in enumerate(ls)]
    for arg in myArgs:
        getmean(arg)

    interm_time = timeit.default_timer()
    print "ELAPSED TIME [Sequential]:", interm_time - start_time

### Lancement asynchrone (parallèle)

* Et affichage du temps de traitement

In [None]:
    # Asynchronous run
    myArgs = [('Asynchronous',i, f) for i, f in enumerate(ls)]
    p.map_async(getmean, myArgs)
    p.close()
    p.join()

    end_time = timeit.default_timer()
    print "ELAPSED TIME [Asynchronous]:", end_time - interm_time