# Computation Speed Demo


Comparing compute time for:

compute type | data structure | packages
---|---|---
fortran-style looping | numpy array | numpy 
fortran-style looping sped up with numba | numpy array | numpy+numba 
vectorized computation | numpy array | numpy 
vectorized parallelized computation | dask array | numpy, dask, dask.distributed

## Set up

build the conda environment in the repo (speeddemo.yml) before running this notebook

assuming you've already installed conda:

```
conda env create -f speeddemo.yml
```

if the yml doesn't work or takes forever
```
conda create --name speeddemo -c conda-forge numba dask distributed netcdf4 numpy matplotlib xarray jupyter
```


once you've created the environment
```
python -m ipykernel install --user --name speeddemo
```

import to your IDE and select the environment as your kernel

for VScode: View > Command Palette > Python: Select Interpreter > choose speeddemo, then select speeddemo as the kernel in the upper right of the notebook





In [None]:
import numpy as np
import numba as nb
import dask.array as da
from dask.distributed import Client

from time import time as timer
import matplotlib.pyplot as plt

# xr just for creating dummy data 
import xarray as xr

In [None]:
data_dir = 'data/'

## Dummy Data Creation

In [None]:
# define the spatial resolution of the data
# e.g. grids_per_degree=4 means 4 grid boxes per degree lat or lon (1/4 degree x 1/4 degree gridsize)
# enter dtype int here
grids_per_degree=24

# define number of chunks
# e.g. chunk_scale**2 total chunks, chunk_scale of 4 yield 16 chunks
# enter dtype int here, preferably in a multiple of the number of cores in your processor
chunk_scale=16

In [None]:
# grid and array 
gx=360
gy=180
nx=gx*grids_per_degree
ny=gy*grids_per_degree
nt=365

In [None]:
# function to create dummy data
def meanT_global_daily_dummy_data(nx,ny,nt,grids_per_degree,chunk_scale,tmax_xr,tmin_xr):
    # create lat lon grid centers based on dims
    lat=np.linspace(-90,90,ny+1).astype('float32') # location of grid box edges
    lat=(lat+abs((lat[-1]-lat[-2])/2))[:-1] # grid centers
    lon=np.linspace(-180,180,nx,endpoint=False).astype('float32') # grid edges
    lon=(lon+abs((lon[-1]-lon[-2])/2)) # grid centers  

    # create mean temperature 
    meanT=(tmax_xr+tmin_xr)/2    

    # expand onto a grid
    meanT_xr=meanT.expand_dims({'lon':lon,'lat':lat})

    # now create a numpy array with dims (x,y,t) as in pyaez tutorial notebooks
    meanT_np=meanT_xr.data

    # last create dask array (chunked numpy array)
    meanT_npda=da.from_array(meanT_np,chunks=(int(nx/chunk_scale),int(ny/chunk_scale),nt))

    return meanT_np, meanT_npda

In [None]:
# pull a single temperature timeseries from some local data files
# read data in as xarray dataArrays
tmax = xr.open_dataset(data_dir+'tmax_daily_singlegrid_8110.nc')['tmax'].squeeze().reset_coords(['lat','lon'],drop=True)
tmin = xr.open_dataset(data_dir+'tmin_daily_singlegrid_8110.nc')['tmin'].squeeze().reset_coords(['lat','lon'],drop=True)
tmax

In [None]:
# create data stored in 2 different ways
# the function outputs:
# numpy array with dims (nx,ny,nt)
# dask array (numpy array broken into chunk) with dims (nx,ny,nt)

meanT_np, meanT_npda = meanT_global_daily_dummy_data(nx,ny,nt,grids_per_degree,chunk_scale,tmax,tmin)

# look at the size in GB
meanT_npda

In [None]:
# plot a single grid cell timeseries
fig = plt.figure(figsize=(3,3))
plt.plot(meanT_np[0,0,:],marker='o',markersize=1,linewidth=0)

We've got daily mean temperature dummy data stored 2 different ways now:

- in a numpy array, dims (nx,ny,nt)
- in a dask array (numpy array broken into chunks), dims (nx,ny,nt)  

# Computations

Here we use a 4 different approaches for computing the following at each gridbox 
- annual mean T
- annual accumulated T
- warmest day
- coolest day

We'll demonstrate how vectorization and parallelization affect computation speed. 

Note: I purposefully picked simple calculations and functions supported by all packages so none of the demos error

## Fortran-style looping from numpy array

this is super inefficient in python

In [None]:
start=timer()

# pre-allocate arrays to store results
ann_meanT = np.empty((nx,ny)) 
ann_accT = np.empty((nx,ny))
day_warmest = np.empty((nx,ny))
day_coldest = np.empty((nx,ny)) 

# loop through each gridbox
for ix in range(nx): 
    for iy in range(ny):
        # single gridbox timeseries from numpy array 
        data_1D = meanT_np[ix,iy,:] 
        ann_meanT[ix,iy]=data_1D.mean()
        ann_accT[ix,iy]=data_1D.sum()
        day_warmest[ix,iy]=data_1D.argmax()+1
        day_coldest[ix,iy]=data_1D.argmin()+1       

tasktime = timer()-start
tasktime

In [None]:
del ann_meanT, ann_accT, day_warmest, day_coldest

on kerrie's windows desktop, 32GB RAM, grids_per_degree=8, (array size = 5.64GB): ~51.5 seconds

on kerrie's windows laptop, 32GB RAM, grids_per_degree=24, (array size=51.75GB): ~268.5 seconds

## Fortran-style looping from numpy array sped up with numba

numba speeds up loops by compiling the loops into machine code and executing the machine code

notes: 
- numba only works with a subset of numpy functions. Something as simple as np.median can cause errors and more complicated numpy functions like polynomial fitting aren't supported

In [None]:
# put everything inside a function 
start=timer()

@nb.jit(nopython=True)
def compute_stuff(nx,ny,data3D):
    # pre-allocate arrays to store results
    ann_meanT = np.empty((nx,ny)) 
    ann_accT = np.empty((nx,ny)) 
    day_warmest = np.empty((nx,ny)) 
    day_coldest = np.empty((nx,ny)) 

    for ix in range(nx): 
        for iy in range(ny):
            # single gridbox timeseries from numpy array 
            data_1D = data3D[ix,iy,:] 
            ann_meanT[ix,iy]=data_1D.mean()
            ann_accT[ix,iy]=data_1D.sum()
            day_warmest[ix,iy]=data_1D.argmax()+1
            day_coldest[ix,iy]=data_1D.argmin()+1 
            
    return ann_meanT, ann_accT, day_warmest, day_coldest       

ann_meanT, ann_accT, day_warmest, day_coldest = compute_stuff(nx,ny,meanT_np)

tasktime = timer()-start
tasktime

In [None]:
del ann_meanT, ann_accT, day_warmest, day_coldest

on kerrie's Windows desktop, 32GB RAM, grids_per_degree=8, (array size = 5.64GB): ~6.5 seconds

on kerrie's windows laptop, 32GB RAM, grids_per_degree=24, (array size=51.75GB): ~34 seconds

Numba appears to be lightning fast

BUT for smaller data (e.g. data that fits in memory) this still won't be faster than simply using vectorized numpy functions, as we will see next

for arrays bigger than memory, numba is way faster than numpy vectorization

Also, problems arise with numba for more complicated computations or use of functions that aren't supported by numba. In these cases, which is often, you can get no speed ups or even slow downs vs the fortran-style looping.

### Vectorized Computation with numpy

Numpy vectorization is always faster than loops- even loops sped up with numba. This is because numpy functions execute C scripts behind the scenes

In [None]:
start=timer()

# no need to pre-allocate
# no loops, vectorize instead
# indicate computation on the time dimension by passing axis=2

ann_meanT=meanT_np.mean(axis=2)
ann_accT=meanT_np.sum(axis=2)
day_warmest=meanT_np.argmax(axis=2)+1
day_coldest=meanT_np.argmin(axis=2)+1       

tasktime = timer()-start
tasktime

In [None]:
del ann_meanT, ann_accT, day_warmest, day_coldest

on kerrie's Windows desktop, 32GB RAM, grids_per_degree=8, (array size = 5.64GB): ~5s

on kerrie's windows laptop, 32GB RAM, grids_per_degree=24, (array size=51.75GB): ~301s wow, slower than fortan style looping! (this is only true for data bigger than memory)


numpy vectorization results in slightly faster computation than fortran-style looping with numba for arrays that are smaller than memory

it also reduces the amount of code to 4 simple lines

additionally, numpy vectorization can apply to the entirety of the code, whereas numba loops can only speed up sections of simple code that is looped

## Vectorized Parallelized Computation with dask

We can speed up the vectorized computations even further by dividing the data into chunks (chunked numpy array = dask array) and computing on multiple chunks at a time (parallelization) 

Let's look at a couple ways to do this

METHOD 1:

let dask automatically parallelize (this uses a single machine scheduler by default)

- use this if you're working with dask arrays
- dask will automatically use however many cores and threads your computer has to execute simultaneous computations
- this is super nice because not much modification of the code is necessary
- but note that this methods doesn't scale to an HPC environment

In [None]:

import dask
# dask.config.set(scheduler='threads') # threads is the default

start=timer()

# here, we are essentially sending the data in chunks to sit in worker memory once so it's not loaded up for every individual computation
# what we're actually doing is creating a "future" for each data chunk and then sending the futures to the workers to compute
# in this way we can process more data than we have memory and get the results very quickly
meanT_npda =meanT_npda.persist() 
# del meanT_np

# these lines are lazy, meaning they don't actually compute anything
# they simply store each task in a what's called a dask graph, for computation later
ann_meanT=meanT_npda.mean(axis=2)
ann_accT=meanT_npda.sum(axis=2)
day_warmest=meanT_npda.argmax(axis=2)+1
day_coldest=meanT_npda.argmin(axis=2)+1       

# when we want the results, we have to call compute
ann_meanT=ann_meanT.compute()
ann_accT=ann_accT.compute()
day_warmest=day_warmest.compute()
day_coldest=day_coldest.compute()

tasktime = timer()-start
tasktime


In [None]:
del ann_meanT, ann_accT, day_warmest, day_coldest

Whoa. Super fast and easy

on kerrie's Windows desktop, 32GB RAM, grids_per_degree=8, (array size = 5.64GB): xx seconds

on kerrie's windows laptop, 32GB RAM, grids_per_degree=24, (array size=51.75GB): ~11s WHAT?! so fast 



METHOD 2:

connect to a distributed cluster vs using the single machine default

first, we'll show code that doesn't run fast using this method and then we'll show how to fix it

In [None]:
# use all your computer's cores simulatenously, like a mini HPC cluster
# click on the cluster info expand arrow to get information about workers/threads/memory
# once you start a client in ipynb, you don't have to do it again unless you restart the kernel or if you client.close() 
# be careful not to start more than one client/cluster (you will get a warning if you do, just restart the kernel in that case and start over)

# this way uses defaults and will be different per machine
client=Client()#(processes=False)
client

# this way you can choose specific settings
# i believe this scales to a single HPC node
# to use multiple nodes, we'd have to use SlurmCluster instead of LocalCluster
# from dask.distributed import LocalCluster
# cluster=LocalCluster(n_workers=4, threads_per_worker=2, memory_limit='8GB')
# client=Client(cluster)
# client

setting up a client/cluster with dask.distributed will allow us to scale up to an HPC environment, but there are a couple of tricks....

while using the automated method worked seamlessly, distributed doesn't like large items like meanT_npda in the task graph

look what happens when we run the same exact code, except this time it's running on the distributed system

(for smaller arrays, this will run but take a long time. for larger arrays, this will error, just keep moving through this notebook past the error)


In [None]:
start=timer()

# these lines are lazy, meaning they don't actually compute anything
# they simply store each task in a what's called a dask graph, for computation later
ann_meanT=meanT_npda.mean(axis=2)
ann_accT=meanT_npda.sum(axis=2)
day_warmest=meanT_npda.argmax(axis=2)+1
day_coldest=meanT_npda.argmin(axis=2)+1       

# when we want the results, we have to call compute
ann_meanT=ann_meanT.compute()
ann_accT=ann_accT.compute()
day_warmest=day_warmest.compute()
day_coldest=day_coldest.compute()

tasktime = timer()-start
print(tasktime)

del ann_meanT, ann_accT, day_warmest, day_coldest

how do we fix this? 

# the options below only work up to a certain size data. all bigger data errors. why? If automated dask can do it, we should be able to do it here if we adjust some settings

there are a couple of options. The first option is to send the data chunks to the workers using persist, then execute the same exact code as above

voila! we still get the warning and it's slighlty slower than the single machine dask, but scalable to HPC

In [None]:
data =client.persist(meanT_npda)

start=timer()

# these lines are lazy, meaning they don't actually compute anything
# they simply store each task in a what's called a dask graph, for computation later
ann_meanT=data.mean(axis=2)
ann_accT=data.sum(axis=2)
day_warmest=data.argmax(axis=2)+1
day_coldest=data.argmin(axis=2)+1       

# when we want the results, we have to call compute
ann_meanT=ann_meanT.compute()
ann_accT=ann_accT.compute()
day_warmest=day_warmest.compute()
day_coldest=day_coldest.compute()

tasktime = timer()-start
print(tasktime)

del ann_meanT, ann_accT, day_warmest, day_coldest

what's the other option?  to use scatter like the warning message suggests

this method is faster than .persist(), but the problem with that is it requires us to make our code look very different
- put our computations inside a function
- scatter numpy data (not dask arrays) to the workers
- submit our computations to the client
- and the store the results in local variables



In [None]:
def np_comps(data):
    ann_meanT=data.mean(axis=2)
    ann_accT=data.sum(axis=2)
    day_warmest=data.argmax(axis=2)+1
    day_coldest=data.argmin(axis=2)+1     
    return ann_meanT, ann_accT, day_warmest, day_coldest

start=timer()

data_scattered = client.scatter(meanT_np)
ann_meanT,ann_accT,day_warmest,day_coldest = client.submit(np_comps, data_scattered).result()

# this also works instead of using .result()
# futures=client.submit(np_comps, data_scattered)
# ann_meanT,ann_accT,day_warmest,day_coldest=client.gather(futures)

tasktime = timer()-start
print(tasktime)

del ann_meanT, ann_accT, day_warmest, day_coldest

In [None]:
client.close()
# cluster.close()

on kerrie's Windows desktop, grids_per_degree=8, (array size = 5.64GB):

vectorized, parallelized computation is the fastest, requiring only xx seconds using dask's automated parallelization and xx-xx seconds using dask.distributed which is scalable to an HPC system 


- implementation of vectorized computation across the entire pyaez will provide the biggest speed gain
- implementation of parallelized lazy computation on the heaviest comps will provide additional pyaez speed gains
- implementation of additional parallelized lazy comps outside of the heaviest comps will slow pyaez down a little bit but will allow input of much larger datasets without running out of memory

## Caveats

We will only get dask speed gains for bigger data due to the overhead of the parallelization (see table below comparing times for different sized input data)

grids_per_degree| array size | nchunks | fortran-style python | fortran-style python +numba | vectorized python | vectorized parallelized (dask)
---|---|---|---|---|---|---
2 | 360 MB | 16 | 3s | 1.5s | 0.3s | 1s   
4 | 1.41 GB | 16 | 12.5s | 2.5s | 1.25s | 2s
8 | 5.64 GB | 16 | 51.5s | 6.5s | 5s | 3.75s
16 | 22.56 GB | 16 | 202.5s | 22.5s | 20s | 11s
16 | 22.56 GB | 64 | n/a | n/a | n/a | 10s
16 | 22.56 GB | 144 | n/a | n/a | n/a | 10s


## Recommendation

- eliminate use of loops and numba
- implement vectorization throughout the entire code
- implement dask for the heaviest computations
- later, implement more dask to allow for memory improvements