# GCM Filters Scaling Benchmark

Run on Casper full node (36 cores)

In [1]:
import xarray as xr
xr.__version__

'0.18.2'

In [2]:
import dask
dask.__version__

'2021.05.1'

In [3]:
import numpy as np
np.__version__

'1.18.2'

In [4]:
import cupy as cp
cp.__version__

ImportError: CuPy is not correctly installed.

If you are using wheel distribution (cupy-cudaXX), make sure that the version of CuPy you installed matches with the version of CUDA on your host.
Also, confirm that only one CuPy package is installed:
  $ pip freeze

If you are building CuPy from source, please check your environment, uninstall CuPy and reinstall it with:
  $ pip install cupy --no-cache-dir -vvvv

Check the Installation Guide for details:
  https://docs-cupy.chainer.org/en/latest/install.html

original error: libcublas.so.10: cannot open shared object file: No such file or directory

In [5]:
import dask.array as dsa

In [6]:
import gcm_filters as gf
gf.__version__

'0.1'

In [7]:
from contextlib import contextmanager
import time
import pandas as pd

class DiagnosticTimer:
    def __init__(self):
        self.diagnostics = []

    @contextmanager
    def time(self, **kwargs):
        tic = time.time()
        yield
        toc = time.time()
        kwargs["runtime"] = toc - tic
        self.diagnostics.append(kwargs)

    @property
    def df(self):
        return pd.DataFrame(self.diagnostics)

In [8]:
def make_data(shape, chunks, gpu=False):
    nt, ny, nx = shape
    da = xr.DataArray(dsa.random.random(shape, chunks=chunks), dims=['time', 'y', 'x'])
    mask_data = dsa.ones((ny, nx))
    mask_data[(ny // 4):(3 * ny // 4), (nx // 4):(3 * nx // 4)] = 0
    wet_mask = xr.DataArray(mask_data, dims=['y', 'x'])
    
    da_masked = da.where(wet_mask)
    
    if gpu:
        raise NotImplementedError("Can't get cupy working :(")

    filter = gf.Filter(
        filter_scale=4,
        dx_min=1,
        filter_shape=gf.FilterShape.TAPER,
        grid_type=gf.GridType.REGULAR_WITH_LAND,
        grid_vars={'wet_mask': wet_mask}
    )
    filter

    da_filtered = filter.apply(da_masked, dims=['y', 'x'])
    return da_masked, da_filtered

In [9]:
from dask.distributed import Client, LocalCluster

In [10]:
# strong scaling - problem size stays the same

shape = 360, 1024, 1024
chunks = (10,) + shape[1:]

unfiltered, filtered = make_data(shape, chunks)

diag_timer_strong = DiagnosticTimer()

for threads_per_worker in [1, 3, 9]:
    max_workers = 36 // threads_per_worker
    worker_step = max_workers // 4
    cluster = LocalCluster(threads_per_worker=threads_per_worker, n_workers=1)
    client = Client(cluster)
    for nworkers in [1] + list(range(worker_step, max_workers + 1, worker_step)):
        cluster.scale(nworkers)
        client.wait_for_workers(nworkers)
        assert len(client.ncores()) == nworkers
        ncores = sum(client.ncores().values())
        details = dict(ncores=ncores, nworkers=nworkers, shape=shape, chunks=chunks,
                       nbytes=filtered.data.nbytes, dtype=str(filtered.dtype))
        with diag_timer_strong.time(operation='unfiltered_mean', **details):
            unfiltered.data.mean().compute()
        with diag_timer_strong.time(operation='filtered_mean', **details):
            filtered.data.mean().compute()
        print(diag_timer_strong.df.iloc[-2:])
    client.close()
    cluster.close()

         operation  ncores  nworkers              shape            chunks  \
0  unfiltered_mean       1         1  (360, 1024, 1024)  (10, 1024, 1024)   
1    filtered_mean       1         1  (360, 1024, 1024)  (10, 1024, 1024)   

       nbytes    dtype     runtime  
0  3019898880  float64    5.003284  
1  3019898880  float64  274.039091  
         operation  ncores  nworkers              shape            chunks  \
2  unfiltered_mean       9         9  (360, 1024, 1024)  (10, 1024, 1024)   
3    filtered_mean       9         9  (360, 1024, 1024)  (10, 1024, 1024)   

       nbytes    dtype    runtime  
2  3019898880  float64   0.775494  
3  3019898880  float64  36.032485  
         operation  ncores  nworkers              shape            chunks  \
4  unfiltered_mean      18        18  (360, 1024, 1024)  (10, 1024, 1024)   
5    filtered_mean      18        18  (360, 1024, 1024)  (10, 1024, 1024)   

       nbytes    dtype    runtime  
4  3019898880  float64   0.501118  
5  3019898880

In [11]:
# weak scaling - problem size stays the same

diag_timer_weak = DiagnosticTimer()

for threads_per_worker in [1, 3, 9]:
    max_workers = 36 // threads_per_worker
    worker_step = max_workers // 4
    cluster = LocalCluster(threads_per_worker=threads_per_worker, n_workers=1)
    client = Client(cluster)
    for nworkers in [1] + list(range(worker_step, max_workers + 1, worker_step)):
        cluster.scale(nworkers)
        client.wait_for_workers(nworkers)
        assert len(client.ncores()) == nworkers
        ncores = sum(client.ncores().values())
        
        shape = ncores * 10 * 4, 1024, 1024
        chunks = (10,) + shape[1:]
        unfiltered, filtered = make_data(shape, chunks)

        details = dict(ncores=ncores, nworkers=nworkers, shape=shape, chunks=chunks,
                       nbytes=filtered.data.nbytes, dtype=str(filtered.dtype))
        with diag_timer_weak.time(operation='unfiltered_mean', **details):
            unfiltered.data.mean().compute()
        with diag_timer_weak.time(operation='filtered_mean', **details):
            filtered.data.mean().compute()
        print(diag_timer_weak.df.iloc[-2:])
    client.close()
    cluster.close()

         operation  ncores  nworkers             shape            chunks  \
0  unfiltered_mean       1         1  (40, 1024, 1024)  (10, 1024, 1024)   
1    filtered_mean       1         1  (40, 1024, 1024)  (10, 1024, 1024)   

      nbytes    dtype    runtime  
0  335544320  float64   0.644294  
1  335544320  float64  30.876005  
         operation  ncores  nworkers              shape            chunks  \
2  unfiltered_mean       9         9  (360, 1024, 1024)  (10, 1024, 1024)   
3    filtered_mean       9         9  (360, 1024, 1024)  (10, 1024, 1024)   

       nbytes    dtype    runtime  
2  3019898880  float64   0.894022  
3  3019898880  float64  34.990347  
         operation  ncores  nworkers              shape            chunks  \
4  unfiltered_mean      18        18  (720, 1024, 1024)  (10, 1024, 1024)   
5    filtered_mean      18        18  (720, 1024, 1024)  (10, 1024, 1024)   

       nbytes    dtype    runtime  
4  6039797760  float64   0.846153  
5  6039797760  float64

In [12]:
from datetime import datetime

now = datetime.now().isoformat()[:19]
diag_timer_weak.df.to_csv(f'data/scaling_weak_cpu_{now}.csv', index=False)
diag_timer_strong.df.to_csv(f'data/scaling_strong_cpu_{now}.csv', index=False)