# SDP Working Set & Communication Analysis

This notebook attempts to calculate how much working memory and communication we need to run SDP pipelines. See SDP Memo 038 (Pipeline Working Sets & Communication) for details.

In [None]:
from __future__ import print_function
import sys
import math
from ipywidgets import ToggleButtons, Text, Layout, interactive
sys.path+=['..']
from sdp_par_model import reports
from sdp_par_model.config import PipelineConfig
from sdp_par_model.parameters.definitions import *
from sdp_par_model.parameters.definitions import Constants as c

verbose_display     = ['Overview', 'Details', 'Debug']
def toggles(opts, *args, **kwargs): return ToggleButtons(options=opts, *args, **kwargs)
def adjusts(*args): return Text(*args, layout=Layout(width='100%'))

In [None]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {return false;}

## Visibilities

The assumption is that visibilities expose us to the greatest memory pressure, as they are large and multiply up quickly once you consider that we also need model and facet visibilities around. Therefore we should stream them in small chunks. Here we calculate the visibility rates, and how much time we have to work on them before we overflow buffers.

In [None]:
def entry(title, val, unit='', doc=''):
    return (title, val, unit, doc)
def visibility_analysis(tp, Qspeed, Nnode, Npredict, Mvisnode):
    
    Mbuf = tp.Npp * tp.Rvis.eval_sum(tp.baseline_bins) * tp.Nbeam * tp.Mvis * tp.Tobs
    Rvis = tp.Mvis * tp.Nbeam * tp.Npp * tp.Rvis_ingest
    Nmajor = 1 + tp.Nmajortotal
    Rvisnode = Qspeed * (Rvis * Nmajor) / float(Nnode)
    Rpredictnode = Npredict * Rvisnode
    twork = Mvisnode * c.giga / (Rvisnode + Rpredictnode)
    Msnapnode = tp.Mvis * tp.Npp * tp.Rvis_ingest * tp.Tsnap / Nnode
    Cmemprice = 0.5 # €/GB
 
    return [
        entry('-- Visibilities --', None, None),
        entry('Q_speed = ', Qspeed, '', 'Compute time scale compared with ingest'),
        entry('t_major = ', tp.Tobs / Qspeed / Nmajor, 's', 'Average time we have to run one major loop'),
        entry('M_buf/node =', Mbuf / Nnode / c.giga, 'GB', 'Visibility buffer size we need per node to hold the observation'),
        entry('R_vis/node = ', Rvisnode / c.giga, 'GB/s', 'Minimum visibility buffer read rate to sustain to read visibilities once per major loop'),
        entry('M_snap/node = ', Msnapnode / c.giga, "GB", 'Size of snapshot data per node (without predict)'),
        entry('R_predict/node = ', Rpredictnode / c.giga, 'GB/s', 'Rate we need to produce predicted visibilities at to match visibility buffer rate'),
        entry('M_vis/node =', Mvisnode, 'GB', 'Assumed memory we have per node for holding visibilities'),
        entry('t_work =', twork, 's', 'Time we can work on every visibility before we run out of memory'),
        entry('C_work =', Nnode * Cmemprice * (Rvisnode + Rpredictnode) / c.giga, '€/s', 'Cost for extending the above number by one second')
    ]

## Imaging

The second most important data items is images (or grids). Especially for the Mid telescope, those are going to be large enough that we cannot afford to have too many copies of them lying around.

Therefore the idea here is that we distribute image data around the cluster, and have visibility streaming processes load/write the data "on demand". This can cause significant amount of communication unless we permit at least some redundancy.

In [None]:
def get_image_size(tp):
    Mfacet = tp.Mpx * tp.Npix_linear**2
    Mimage = Mfacet * tp.Nfacet**2
    return Mfacet, Mimage

def imaging_analysis(tp, Qspeed, Nnode, Nsubbands):
    Mfacet, Mimage = get_image_size(tp)
    
    # Calculating image size including facetting, and therefore overlap. I think this is more fair.
    Mimagenodemin = max(Nsubbands * tp.Npp * tp.Ntt * Mimage / Nnode, Mfacet)
    Rphrot = Qspeed * (tp.products.get(Products.PhaseRotationPredict,{}).get('Rflop',0) + \
                       tp.products.get(Products.PhaseRotation,{}).get('Rflop',0))
    
    return [
        entry('-- Imaging --', None, None),
        entry('N_subbands = ', Nsubbands, '', 'Minimum number of frequencies we *need* to image separately'),
        entry('N_pp N_tt M_image = ', tp.Npp * tp.Ntt * Mimage / c.tera, 'TB', 'Continuum image/grid size - data a single visibility updates'),
        entry('N_facet = ', tp.Nfacet, '', 'How much we split the image into facets'),
        entry('R_phaserotate = ', Rphrot / c.peta, 'Pflop/s', 'Compute cost for facetting (imaging+predict)'),
        entry('M_image/node,max = ', tp.Npp * tp.Ntt * Mfacet / c.giga, 'GB', 'Facet data per node, assuming no distribution in polarisation/Taylor terms'),
        entry('M_image/node,min = ', Mimagenodemin / c.giga, 'GB', 'Facet data per node, assuming complete distribution in polarisation/Taylor terms')
    ]

def _get_distribution(tp, Nnode, Nsubbands, Ndist_pptt):
    Ndist_ft = max(1, float(Nnode / (Nsubbands * tp.Nfacet**2 * Ndist_pptt)))
    Nseq = Nsubbands * tp.Nfacet**2 * Ndist_pptt * Ndist_ft / Nnode
    return Ndist_ft, Nseq

def imaging_data_analysis(tp, Qspeed, Nnode, Nisland, Nsubbands, Ndist_pptt):
    Mfacet, Mimage = get_image_size(tp)
    Ndist_ft, Nseq = _get_distribution(tp, Nnode, Nsubbands, Ndist_pptt)
    
    Mimagenode = max(Ndist_ft * Nsubbands * tp.Npp * tp.Ntt * Mimage / Nnode / Nseq, Mfacet)
    
    Nmajor = 1 + tp.Nmajortotal
    Tsnapwork = tp.Tsnap / Qspeed / Nmajor / Nseq
    Rimagenode = Mimagenode / Tsnapwork
    Tobswork = tp.Tobs / Qspeed / Nmajor / Nseq
    Routnode = Mimagenode / Tobswork
    
    entries_data = [
        entry('-- Imaging Data --', None, None),
        entry('N_dist,pp/tt = ', Ndist_pptt, '', 'Distribution degree in polarisation / Taylor Terms'),
        entry('N_dist,f/t = ', Ndist_ft, '', 'Remaining distribution degree in frequency / time to get enough parallelism'),
        entry('N_seq = ', Nseq, '', ''),
        entry('M_image/node = ', Mimagenode / c.giga, 'GB', 'Resulting facet data per node'),
        entry('t_snap,work = ', Tsnapwork, 's', 'Time every node has to work on a snapshot (assuming no time distribution)'),
        entry('R_image/node = ', Rimagenode / c.giga, 'GB/s', 'Data rate for "spilling" facets to storage every snapshot'),
        entry('R_out/node = ', Routnode / c.giga, 'GB/s', 'Data rate for "spilling" facets to storage every snapshot'),
    ]

    # Number of nodes visibility data needs to be distributed to. Frequency/time distribution
    # doess not need communication, as we can assume visibilities to have been appropriately
    # distributed beforehand
    Ncopy = max(1, Nseq * Nnode / Ndist_ft / Nsubbands) # Nfacet**2 * Ndist_pptt
    Rfacet = Qspeed * float(tp.Rfacet_vis) / Nnode

    # Every node needs to send a facet visibility stream to Ncopy-1 nodes
    Rdistributenode = Qspeed * tp.Rfacet_vis / Nnode * (Ncopy - 1)

    # Every island node needs to send at least (Ncopy-Nisland) off-island nodes a facet
    # visibility stream. Stream emitted by an island is therefore Nisland times higher.
    Rdistributeisland = Qspeed * tp.Rfacet_vis / Nnode * max(0, Ncopy - Nisland) * Nisland
    
    return Mimagenode, Rdistributenode, Rdistributeisland, entries_data + [
        entry('-- Imaging Communication --', None, None),
        entry('N_copy = ', Ncopy, '', 'Number of times we need to copy (facet!) visibilities from each node'),
        entry("R_facet =", Rfacet / c.mega, "MB/s"),
        entry('R_distribute/node =', float(Rdistributenode / c.giga), "GB/s", 'Facet visibility output and input rate of every node'),
        entry('R_distribute/island =', float(Rdistributeisland / c.giga), "GB/s", 'Facet visibility output and input rate of every island'),
        entry('R_distribute/island2island =', float(Qspeed * tp.Rfacet_vis / Nnode * Nisland**2 / c.giga), "GB/s", 'Maximum visibility exchange between pair of islands involved in all-to-all')
    ]

## Calibration

Finally we need to calibrate the instrument, which involves solving a number of calibration problems with (very) different scopes into the visibility data. To prevent this from blowing up our working set, we assume that calibration problem input can be reduced in a map/reduce manner. To be specific, we assume that by the time we actually start solving a particular particular problem, we have reduced the input data down to something smaller than 2 visibilities per slot.

This allows us to calculate the overhead of calibration with a simple tree reduction model.

In [None]:
def calibration_analysis(tp, Qspeed, Nnode, Nsubbands, Ndist_pptt):
    
    # 2 sets (model+predicted) of visibilities per polarisation and baseline
    Ndist_ft, Nseq = _get_distribution(tp, Nnode, Nsubbands, Ndist_pptt)
    Tsnapwork = tp.Tsnap / Qspeed / (1 + tp.Nmajortotal) / Nseq
    
    Mcal = 2 * tp.Mvis * tp.Npp * tp.Nbl
    MGcal = Mcal * tp.Ncal_G_obs / Nnode
    MBcal = Mcal * tp.Ncal_B_obs / Nnode
    MIcal = Mcal * tp.Ncal_I_obs / Nnode
    RGcal_snap = Mcal * tp.Ncal_G_solve / Tsnapwork
    RBcal_snap = Mcal * tp.Ncal_B_solve / Tsnapwork
    RIcal_snap = Mcal * tp.Ncal_I_solve / Tsnapwork    
    
    return MGcal+MBcal+MIcal, RGcal_snap + RBcal_snap + RIcal_snap, [
        entry('-- Calibration --', None, None),
        entry('M_cal = ', Mcal / c.mega, "MB", 'Size of input into an individual calibration "problem"'),
        entry('', ['Gain', 'Band', 'Ion'], "", ''),
        entry('N_cal = ', [tp.Ncal_G_obs, tp.Ncal_B_obs, tp.Ncal_I_obs], "k", 'Calibration problem count'),
        entry('M_cal/node = ', [MGcal / c.giga, MBcal / c.giga, MIcal / c.giga], "GB", 'Calibration working set per node (assuming perfect distribution)'),
        entry('N_cal,snap = ', [tp.Ncal_G_solve, tp.Ncal_B_solve, tp.Ncal_I_solve], "",
              'Number of calibration problems overlapping a snapshot and subband'),
        entry('M_cal,snap = ', [Mcal*tp.Ncal_G_solve/c.giga, Mcal*tp.Ncal_B_solve/c.giga, Mcal*tp.Ncal_I_solve/c.giga], "GB",
              'Number of calibration problems overlapping a snapshot and subband'),
        entry('R_cal,snap = ', [RGcal_snap / c.giga, RBcal_snap / c.giga, RIcal_snap / c.giga], "GB/s",
              'Data rate per node for exchanging calibration problems after every snapshot')
    ]
    

## Summary

In the end, the working set is given by:

1. Streaming visibilities, given as parameter
2. Two images+grids (predict+imaging)
3. The calibration problems

And communication rates are given by the sum of:

1. Facet streams for imaging (predict + imaging)
2. Calibration streams

In [None]:
def calculate_working_set(telescope, band, pipeline, adjusts,
                          Nnode=1500, Nisland=56,
                          Rflop=15, Npredict=4, Mvisnode=64,
                          Ntt=3, Nfacet=7, Ndist_pptt=4):
    """
    :param Nnode: Number of nodes in cluster
    :param Nisland: Number of nodes in a compute island
    :param Qspeed: How much faster we need to process compared with ingest
       (to leave space for other pipelines to run)
    :param Npredict: Number of separate predicts we need to do
    :param Mvisnode: Memory per node to cache raw visibilities [GB]
    :param Ntt: Number of Taylor terms
    :param Nfacet: Number of facets (horizontal+vertical)
      Reduces working set and increases distribution, but requires more flops (phase rotation)
    :param Ndist_pptt: Degree of distribution in polarisation and Taylor terms.
      Reduces working set and increases distribution, but also increases communication.
    """

    adjusts_ = ("Nfacet=%d Ntt=%d" % (Nfacet, Ntt)) + adjusts
    config = PipelineConfig(telescope, pipeline, band, adjusts=adjusts_)
    if not config.is_valid()[0]:
        print(*config.is_valid()[1])
        return
    
    # Do calculations
    tp = config.calc_tel_params()
    Nsubbands = tp.Nsubbands
    if pipeline in [Pipelines.DPrepB, Pipelines.DPrepC]:
        Nsubbands = tp.Nf_out
    Qspeed = Rflop / float(tp.Rflop) * c.peta
        
    # Perform analyses
    entries = []
    def add_entry(title, val, unit='', doc=''):
        entries.append((title, val, unit, doc))
    entries.extend(visibility_analysis(tp, Qspeed, Nnode, Npredict, Mvisnode))
    entries.extend(imaging_analysis(tp, Qspeed, Nnode, Nsubbands))
    Mws_img, Rnode_img, Risland_img, img_entries = \
      imaging_data_analysis(tp, Qspeed, Nnode, Nisland, Nsubbands, Ndist_pptt)
    entries.extend(img_entries)
    Mws_cal, Rcal, cal_entries = calibration_analysis(tp, Qspeed, Nnode, Nsubbands, Ndist_pptt)
    entries.extend(cal_entries)
    
    # Compose summary
    add_entry('-- Summary --', None, None)
    Mworkset = Mvisnode * c.giga + 4 * Mws_img + Mws_cal
    add_entry('Mworkset/node = ', Mworkset / c.giga, "GB")
    add_entry('Rnode = ', (2 * Rnode_img + Rcal) / c.giga, "GB/s")
    add_entry('Risland = ', (2 * Risland_img + Rcal) / c.giga, "GB/s")
    add_entry('Rflop = ', Qspeed * float(tp.Rflop) / c.peta, 'Pflop/s')
    add_entry('Rflop/node = ', Qspeed * float(tp.Rflop) / Nnode / c.tera, 'Tflop/s')
    reports.show_table("Working Set Analysis", *zip(*entries))
    
# Make interactive layout, reserving a fairly large area to prevent "blinking"
widget = interactive(calculate_working_set,
                     telescope=toggles(Telescopes.available_teles),
                     band=toggles(Bands.available_bands),
                     pipeline=toggles(Pipelines.imaging, value=Pipelines.ICAL),
                     adjusts=adjusts("Tobs=6*3600"),
                     Nnode=(1,3000), Nisland=(1,500),
                     Rflop=(1,20,0.1), Npredict=(1,10), Mvisnode=(1,128),
                     Ntt=(1,5), Nfacet=(1,16), Ndist_pptt=(1,4*4))
# widget.layout.height = '500px'
widget