
# Benchmark

## Dataset and Software
An ALMA Band 9/10 Chemical Survey of NGC 6334I dataset was chosen to benchmark the ngCASA (version 0.0.7) dirty imaging implementation against [CASA release 5.6.1-8.el7](https://casa.nrao.edu/download/distro/casa-pipeline/release/el7/casa-pipeline-release-5.6.1-8.el7.tar.gz) and the cube refactor in [CAS-9386](https://open-bamboo.nrao.edu/browse/CASA-C6DPT10-57/artifact/shared/tarfile/casa-CAS-9386-73.tar.xz).

The ASDM dataset can be obtained from the ALMA Science Archive (https://almascience.nrao.edu/asax/) by entering uid://A002/Xcb8a93/Xc096 in the ASDM UID column search.

To convert the dataset from the archival ASDM format to a measurement set (MS), use the ```importasdm``` task in CASA:
```python
importasdm(asdm="uid___A002_Xcb8a93_Xc096.asdm.sdm",vis="uid___A002_Xcb8a93_Xc096.ms")
```
Currently, cngi-prototype (version 0.0.50) cannot convert data to the LSRK reference frame, whereas CASA imaging changes the frequency reference frame to LSRK. Therefore, to ensure the same computation is done in both benchmarks, the dataset is converted to LSRK beforehand using the ```mstransform``` task in CASA:
```python
mstransform(vis="uid___A002_Xcb8a93_Xc096.ms",outputvis="combined_spw_uid___A002_Xcb8a93_Xc096.ms",spw="33,35,37,39,41,43,45,47",combinespws=True,regridms=True,nchan=7680,outframe="LSRK")
```
The spectral windows are also combined into a single data description (DDI). The resulting MS has:
- **Rows** : 706146
- **Channels** : 7680
- **Polarizations** : 2
- **Uncompressed Size of Visibilities** : 173.54 GB

To convert the MS to the vis.zarr format used by ngCASA, the cngi-prototype (version 0.0.50) function ```convert_ms``` is used:
```python
from cngi.conversion import convert_ms

# chunk_shape is (time, baseline, chan, pol)
convert_ms(infile="combined_spw_uid___A002_Xcb8a93_Xc096.ms", chunk_shape=(23, 903, 10, 2))
```
- **Visibility Data Dimensions (time, baseline, chan, pol)** : 782, 903, 7680, 2
- **Zarr Chunk Shape (time, baseline, chan, pol)** : 23, 903, 10, 2
- **Uncompressed Size of a Zarr Chunk** : 6.65 MB
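
The sizes above follow directly from the array shapes. A minimal sketch of the arithmetic, assuming the visibilities are stored as 16-byte `complex128` values (the assumption that reproduces the quoted figures) and decimal units (1 MB = 10^6 bytes):

```python
import math

# Assumption: visibilities are complex128 (16 bytes per value).
BYTES_PER_VIS = 16

def nbytes(shape, itemsize=BYTES_PER_VIS):
    """Uncompressed size in bytes of an array with the given shape."""
    return math.prod(shape) * itemsize

vis_shape = (782, 903, 7680, 2)   # (time, baseline, chan, pol)
zarr_chunk = (23, 903, 10, 2)     # chunk_shape passed to convert_ms

n_rows = vis_shape[0] * vis_shape[1]
print(n_rows)                              # 706146
print(round(nbytes(vis_shape) / 1e9, 2))   # 173.54 (GB)
print(round(nbytes(zarr_chunk) / 1e6, 2))  # 6.65 (MB)
```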

An uncompressed chunk size of 6.65 MB was chosen to follow the guidelines in the Zarr tutorial (https://zarr.readthedocs.io/en/stable/tutorial.html, see the Chunk optimizations section). For this dataset, multiplying the time and baseline dimensions yields the number of rows in the MS (782 × 903 = 706146). This is not always the case, because the number of observing antennas can change during an observation; the ```convert_ms``` function fills the resulting missing values with ```np.nan```.
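
The gap filling can be pictured as scattering each MS row into a dense (time, baseline) grid that starts out full of `np.nan`. The following is a simplified sketch of that idea with toy inputs, not the actual `convert_ms` implementation; the per-row `time_idx` and `baseline_idx` arrays are hypothetical inputs that would have to be derived from the MS `TIME`, `ANTENNA1` and `ANTENNA2` columns.

```python
import numpy as np

def rows_to_grid(values, time_idx, baseline_idx, n_time, n_baseline):
    """Scatter per-row values into a dense (time, baseline, ...) array.

    Grid cells with no corresponding MS row (e.g. an antenna missing for
    part of the observation) are left as NaN.
    """
    grid = np.full((n_time, n_baseline) + values.shape[1:], np.nan,
                   dtype=np.result_type(values.dtype, np.float64))
    grid[time_idx, baseline_idx] = values
    return grid

# Toy example: 2 times x 3 baselines, but only 5 rows were recorded.
vals = np.arange(5, dtype=np.float64)
t = np.array([0, 0, 0, 1, 1])
b = np.array([0, 1, 2, 0, 2])  # baseline 1 is missing at time 1
grid = rows_to_grid(vals, t, b, n_time=2, n_baseline=3)
print(grid)  # grid[1, 1] is NaN, every other cell holds a row value
```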

## Hardware Setup

The node cvpost020 on the cvpost cluster was reserved for the benchmarks. It has two sockets with Intel E5-2640 v3 CPUs (16 cores in total) and 256 GB of RAM. The MS and vis.zarr datasets are stored on the CV Lustre file system.

## Benchmark
The benchmark measures the time to create a dirty image cube (500x500 pixels, 7680 channels, 2 polarizations, 30.72 GB uncompressed). The available memory is limited to 64 GB (4 GB per core on the 16-core node). Therefore, the visibility data to be imaged (173.54 GB) is larger than the available memory.
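
To put the numbers in perspective, the sketch below computes the output cube size and how many times the visibility data exceeds the 64 GB limit. It assumes 8-byte (`float64`) image pixels and one image plane per channel and polarization for the dataset's 7680 channels; both are assumptions for illustration.

```python
import math

GB = 1e9  # decimal gigabytes, as used in the text

image_shape = (500, 500, 7680, 2)         # (l, m, chan, pol)
image_bytes = math.prod(image_shape) * 8  # assumption: float64 pixels
vis_bytes = 173.54 * GB
memory_limit = 64 * GB

print(round(image_bytes / GB, 2))          # 30.72
print(round(vis_bytes / memory_limit, 2))  # 2.71
```

With the visibilities alone roughly 2.7 times the memory budget, the data has to be streamed through memory in channel chunks, which is what ```chanchunks``` (CASA) and the Dask channel chunking (ngCASA) described below do.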

## CASA Commands

A shell script is used to launch CASA with different numbers of MPI processes (1 to 17):
```sh
#!/bin/bash
# Select the CASA build to benchmark (uncomment the CAS-9386 lines for the cube refactor)
export CASA_VERSION=CASA_5.6.1-8
export CASAPATH=/.lustre/cv/users/jsteeb/CASA/casa-pipeline-release-5.6.1-8.el7
#export CASA_VERSION=CAS-9386-73
#export CASAPATH=/.lustre/cv/users/jsteeb/casa-CAS-9386-73
# Read by benchmark_cvcluster_casa.py
export chanchunks=-1
export parallel=True
# {1..17} brace expansion requires bash rather than a POSIX sh
for i in {1..17}
do
    xvfb-run -d "$CASAPATH/bin/mpicasa" -N "$i" "$CASAPATH/bin/casa" --nogui --logfile "benchmark_cvcluster_casa_n${i}.log" -c benchmark_cvcluster_casa.py
done
```
The CASA benchmarking Python script, ```benchmark_cvcluster_casa.py```:
```python
import os

casalog.filter('INFO3')  # casalog is predefined in the CASA shell

# Imaging inputs
vis_name = '/.lustre/cv/users/jsteeb/NGCASA/data/combined_spw_uid___A002_Xcb8a93_Xc096.ms'
niter = 0  # no deconvolution: dirty image only
gridder = 'standard'
imsize = [500, 500]
cell = ['0.02arcsec']
specmode = 'cube'
weighting = 'natural'

# Settings exported by the launch script
chanchunks = int(os.getenv('chanchunks'))
parallel = os.getenv('parallel') == 'True'

# Remove the image products and temporary lattices left by the previous run
imagename = '/.lustre/cv/users/jsteeb/NGCASA/temp'
os.system('rm -rf ' + imagename + '*')
os.system('rm -rf /.lustre/cv/users/jsteeb/NGCASA/TempLattice*')

tclean(vis=vis_name, imagename=imagename, imsize=imsize, cell=cell, stokes='XXYY',
       specmode=specmode, gridder=gridder, weighting=weighting, niter=niter,
       chanchunks=chanchunks, parallel=parallel)
```
```chanchunks``` is set to -1 so that CASA automatically calculates the number of channel chunks needed to stay within the memory limit. That limit is set by adding the following line to the ```~/.casarc``` file:

```
system.resources.memory: 64000
```


## ngCASA Commands

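To avoid thread oversubscription when using the Dask.distributed ```Client```, set the following environment variables so that the BLAS and OpenMP libraries do not start their own thread pools inside each Dask thread: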
```sh
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
```
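
If the variables cannot be exported in the shell, a common alternative is to set them from Python before NumPy (or anything else that loads a BLAS or OpenMP runtime) is imported, since these libraries typically read them only once, when their thread pools are initialized. A minimal sketch:

```python
import os

# Must run before numpy is imported: the BLAS and OpenMP runtimes it
# loads read these variables when they first initialize.
for var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS"):
    os.environ[var] = "1"

import numpy as np  # noqa: E402  (imported after the environment is set)
```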
The ngCASA benchmarking Python script:
```python
if __name__ == '__main__':
    import os
    import time
    import xarray as xr
    from dask.distributed import Client
    from ngcasa.synthesis.imaging import make_imaging_weights, make_dirty_image
    from cngi.vis import applyflags

    # Open with Dask chunks larger than the on-disk Zarr chunks (see below)
    local_store_file = 'data/combined_spw_uid___A002_Xcb8a93_Xc096.vis.zarr/0'
    vis_dataset = xr.open_zarr(store=local_store_file, chunks={'time': 782, 'chan': 40}, consolidated=True)

    # Flag data
    vis_dataset = applyflags(vis_dataset, flags=['FLAG', 'FLAG_ROW'])

    # Make imaging weights (kept in memory, not written to disk)
    storage_parms = {}
    storage_parms['to_disk'] = False

    imaging_weights_parms = {}
    imaging_weights_parms['weighting'] = 'natural'
    imaging_weights_parms['chan_mode'] = 'cube'
    vis_dataset = make_imaging_weights(vis_dataset, imaging_weights_parms, storage_parms)

    # Make dirty image
    grid_parms = {}
    grid_parms['chan_mode'] = 'cube'
    grid_parms['imsize'] = [500, 500]
    grid_parms['cell'] = [0.02, 0.02]
    grid_parms['oversampling'] = 100
    grid_parms['support'] = 7
    grid_parms['fft_padding'] = 1.2

    # Writing the image cube to disk is part of the timed work
    storage_parms['to_disk'] = True
    storage_parms['outfile'] = 'data/cube_image_A002_Xcb8a93_Xc096.img.zarr'

    max_threads = 17
    memory_limit = '64GB'
    n_worker = 1  # single node: one worker whose threads share memory

    for threads_per_worker in range(1, max_threads + 1):
        print('Threads per worker', threads_per_worker)
        client = Client(n_workers=n_worker, threads_per_worker=threads_per_worker, memory_limit=memory_limit)
        print(client.scheduler_info()['services'])

        os.system('rm -fr ' + storage_parms['outfile'])
        start = time.time()
        img_dataset = make_dirty_image(vis_dataset, grid_parms, storage_parms)
        time_to_calc_and_store = time.time() - start
        client.close()

        print('Time to create and store cube image', time_to_calc_and_store)
        # Append one " n_worker threads seconds" record per run
        with open('combined_spw_uid___A002_Xcb8a93_Xc096.txt', 'a') as bench_file:
            bench_file.write(" %d %d %f \n" % (n_worker, threads_per_worker, time_to_calc_and_store))
```
Parallelism comes from a single Dask worker with multiple threads, because only one node is used and its memory is shared. If more nodes are added, the number of workers should be increased accordingly (one worker per node).

The Zarr chunks are 6.65 MB (time:23, baseline:903, chan:10, pol:2), which is too small to give a Dask thread meaningful work. Therefore, the Dask chunk size is increased to 903.87 MB (time:782, baseline:903, chan:40, pol:2). The data remains chunked along the channel axis because, in cube mode, the channels are imaged independently.
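
The effect of the rechunking can be quantified with a little arithmetic. The sketch below, assuming 16-byte `complex128` visibilities, computes the Dask chunk size, how many Zarr chunks each Dask chunk spans, and how many Dask chunks there are along each axis.

```python
import math

BYTES_PER_VIS = 16  # assumption: complex128 visibilities

shape = (782, 903, 7680, 2)     # (time, baseline, chan, pol)
zarr_chunk = (23, 903, 10, 2)   # on-disk chunking from convert_ms
dask_chunk = (782, 903, 40, 2)  # chunks={'time': 782, 'chan': 40}

def n_blocks(shape, chunk):
    """Number of chunks along each axis."""
    return tuple(math.ceil(s / c) for s, c in zip(shape, chunk))

dask_chunk_mb = math.prod(dask_chunk) * BYTES_PER_VIS / 1e6
zarr_per_dask = math.prod(d // z for d, z in zip(dask_chunk, zarr_chunk))

print(round(dask_chunk_mb, 2))      # 903.87
print(zarr_per_dask)                # 136 Zarr chunks per Dask chunk
print(n_blocks(shape, dask_chunk))  # (1, 1, 192, 1)
```

Because each Dask chunk is an exact multiple of the Zarr chunk along every axis, every Zarr chunk is read by exactly one of the 192 channel-chunk tasks.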


## Benchmark Results

### Timing 
The ngCASA implementation outperformed both CASA and the cube refactor on the larger-than-memory benchmark. The Fortran gridding code in CASA is slightly more efficient than the Numba just-in-time (JIT) compiled Python code in ngCASA. However, ngCASA handles the chunked data more efficiently and has no intermediate steps that write data to disk (CASA generates TempLattice files to store intermediate data on disk). With 16 threads/processes, ngCASA was 1.83 times faster than the cube refactor and 4.00 times faster than CASA. The cube refactor was only tested with 8 to 16 MPI processes.
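
The speedups are ratios of wall-clock times at the same number of threads/processes. A minimal sketch of reading the per-run records that the ngCASA script above appends to its results file (one ` n_worker threads seconds` line per run) and computing a speedup against a reference timing; the reference time would come from the corresponding CASA run and is a hypothetical input here, and the numbers in the usage example are placeholders, not benchmark results.

```python
def read_bench_file(lines):
    """Parse ' n_worker threads seconds' records into {threads: seconds}.

    Typical use: with open('combined_spw_uid___A002_Xcb8a93_Xc096.txt') as f:
                     timings = read_bench_file(f)
    """
    timings = {}
    for line in lines:
        fields = line.split()
        if len(fields) != 3:
            continue  # skip blank or malformed lines
        _n_worker, threads, seconds = fields
        timings[int(threads)] = float(seconds)
    return timings

def speedup(reference_seconds, candidate_seconds):
    """How many times faster the candidate is than the reference."""
    return reference_seconds / candidate_seconds

# Usage with made-up placeholder numbers (not benchmark results):
records = [" 1 1 400.000000 ", " 1 16 50.000000 "]
ngcasa = read_bench_file(records)
print(speedup(200.0, ngcasa[16]))  # 4.0
```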

![bench_time](https://raw.githubusercontent.com/casangi/ngcasa/master/docs/bench_time.png)


### Memory and CPU Monitoring
A shell command was used to record the resident set size (RSS) and the percentage CPU utilization every 50 seconds. The resident set size is the portion of a process's memory that is held in main memory (https://en.wikipedia.org/wiki/Resident_set_size).

```sh
# Every 50 s, print the rss (KiB) and %CPU of all python processes, then a separator line
logpid() {
    while sleep 50; do
        ps -e -o rss -o pcpu,command | grep python | grep -v grep
        echo xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    done
}
logpid | tee /.lustre/cv/users/jsteeb/NGCASA/ngcasa.log
```
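
The log written by `logpid` consists of snapshots separated by lines of `x` characters; each snapshot lists one `rss pcpu command` line per matching process, with `rss` in kibibytes as reported by `ps`. A minimal sketch of reducing it to total memory and CPU per snapshot, assuming the quantity of interest is the sum over all matching processes (the log text in the usage example is made up):

```python
def parse_logpid(text):
    """Return a list of (total_rss_gb, total_pcpu) tuples, one per snapshot."""
    snapshots = []
    rss_kib, pcpu = 0, 0.0
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        if set(line) == {"x"}:  # snapshot separator
            snapshots.append((rss_kib * 1024 / 1e9, pcpu))
            rss_kib, pcpu = 0, 0.0
            continue
        fields = line.split(None, 2)  # rss, pcpu, command
        rss_kib += int(fields[0])
        pcpu += float(fields[1])
    return snapshots

# Usage with a made-up two-snapshot log (placeholder values):
log = """1000000  95.0 python bench.py
500000  5.0 python worker.py
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
2000000 150.5 python bench.py
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
"""
for gb, cpu in parse_logpid(log):
    print(f"{gb:.3f} GB  {cpu:.1f}% CPU")
```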
The figure below shows the memory usage for all the runs (the cube refactor covers only 8 to 16 MPI processes). Along the time axis, the number of threads/processes increases from run to run. Both CASA and ngCASA kept memory usage below 30 GB, while the cube refactor's memory usage spiked to just below 50 GB. The cube refactor not only used more of the available memory but also used the most processing power, peaking at 1554% CPU.

![bench_top](https://raw.githubusercontent.com/casangi/ngcasa/master/docs/bench_top.png)

Further improvements to the ngCASA implementation are being explored, such as changing the Dask chunking and memory management (https://distributed.dask.org/en/latest/worker.html). The ```~/.config/dask/distributed.yaml``` file can be edited to allow Dask to use more of the available memory; the default file contains:
```yaml
distributed:
  worker:
    # Fractions of worker memory at which we take action to avoid memory blowup
    # Set any of the lower three values to False to turn off the behavior entirely
    memory:
      target: 0.60  # target fraction to stay below
      spill: 0.70  # fraction at which we spill to disk
      pause: 0.80  # fraction at which we pause worker threads
      terminate: 0.95  # fraction at which we terminate the worker
```
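
The same thresholds can also be changed from Python with `dask.config.set`, which accepts dotted keys matching the YAML hierarchy. For a local cluster it should be called before the `Client` is created so that the workers it starts are expected to pick up the values. The fractions below are illustrative only, not tuned values:

```python
import dask

# Illustrative values only: let the worker fill more of its memory_limit
# before it starts spilling to disk or pausing its threads.
dask.config.set({
    "distributed.worker.memory.target": 0.80,
    "distributed.worker.memory.spill": 0.85,
    "distributed.worker.memory.pause": 0.90,
    "distributed.worker.memory.terminate": 0.95,
})

print(dask.config.get("distributed.worker.memory.target"))  # 0.8
```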