# Building Pipelines

We now have all of the pieces needed to build a complete pipeline in the Bifrost framework.  Although it is hard to run a full multi-threaded Bifrost pipeline inside a Jupyter notebook we will look at a few examples.

Let's start with a simple pipeline that generates random data in one block and writes it to disk in another.  The generator block looks like:

In [1]:
import os
import json
import time
import numpy
import threading

import bifrost

class GeneratorOp(object):
    def __init__(self, oring, ntime_gulp=250, 
                 shutdown_event=None, core=None):
        self.oring   = oring
        self.ntime_gulp   = ntime_gulp
        if shutdown_event is None:
            shutdown_event = threading.Event()
        self.shutdown_event = shutdown_event
        self.core    = core
        
    def shutdown(self):
        self.shutdown_event.set()
          
    def main(self):
        with self.oring.begin_writing() as oring:
            navg = 24
            tint = navg / 25e3
            tgulp = tint * self.ntime_gulp
            nbeam = 1
            chan0 = 1234
            nchan = 16*184
            npol = 4
            
            ohdr = {'time_tag': int(int(time.time())*196e6),
                    'seq0':     0, 
                    'chan0':    chan0,
                    'cfreq0':   chan0*25e3,
                    'bw':       nchan*25e3,
                    'navg':     navg,
                    'nbeam':    nbeam,
                    'nchan':    nchan,
                    'npol':     npol,
                    'pols':     'XX,YY,CR,CI',
                    'complex':  False,
                    'nbit':     32}
            ohdr_str = json.dumps(ohdr)
            
            ogulp_size = self.ntime_gulp*nbeam*nchan*npol*4      # float32
            oshape = (self.ntime_gulp,nbeam,nchan,npol)
            self.oring.resize(ogulp_size)
            
            prev_time = time.time()
            with oring.begin_sequence(time_tag=ohdr['time_tag'], header=ohdr_str) as oseq:
                while not self.shutdown_event.is_set():
                    with oseq.reserve(ogulp_size) as ospan:
                        
                        odata = ospan.data_view(numpy.float32).reshape(oshape)
                        odata[...] = numpy.random.randn(*oshape)
                        
                        curr_time = time.time()
                        while curr_time - prev_time < tgulp:
                            time.sleep(0.01)
                            curr_time = time.time()

The `GeneratorOp` block is implemented as an object with a `main` method that is intended to be launched via `threading.Thread`.  This setup is consistent with the Bifrost asychronous model where each block is asynchronous but operations on spans/gulps within a particular block are synchronous.  The `__init__` method sets up the block and defines the ring that is to be used and the CPU core that the `main` method will be bound to when `main` is started.  The `main` method does the heavy lifting here.  Inside this method:

 1. The output ring is prepared for writing.
 2. The parameters of the generated data (number of time samples, number of channels, etc.) and other meta data are defined and dumped to a JSON object.
 3.  The output sequence is started.
 4.  The innermost loops starts and puts random data into the output sequence span by span until a shutdown event breaks out of this loop.  The speed of iteration through this inner loop is controlled by a call to `time.sleep` so that the data rate can be limited.

The writer block looks like:

In [2]:

class WriterOp(object):
    def __init__(self, iring, ntime_gulp=250, guarantee=True, core=None):
        self.iring      = iring
        self.ntime_gulp = ntime_gulp
        self.guarantee  = guarantee
        self.core       = core
        
    def main(self):
        for iseq in self.iring.read(guarantee=self.guarantee):
            ihdr = json.loads(iseq.header.tostring())
            
            print("Writer: Start of new sequence:", str(ihdr))
            
            time_tag = ihdr['time_tag']
            navg     = ihdr['navg']
            nbeam    = ihdr['nbeam']
            chan0    = ihdr['chan0']
            nchan    = ihdr['nchan']
            chan_bw  = ihdr['bw'] / nchan
            npol     = ihdr['npol']
            pols     = ihdr['pols']
            pols     = pols.replace('CR', 'XY_real')
            pols     = pols.replace('CI', 'XY_imag')
            
            igulp_size = self.ntime_gulp*nbeam*nchan*npol*4        # float32
            ishape = (self.ntime_gulp,nbeam,nchan,npol)
            
            prev_time = time.time()
            iseq_spans = iseq.read(igulp_size)
            for ispan in iseq_spans:
                if ispan.size < igulp_size:
                    continue # Ignore final gulp
                    
                idata = ispan.data_view(numpy.float32).reshape(ishape)
                with open(f"{time_tag}.dat", 'wb') as fh:
                    fh.write(idata.tobytes())
                    print('  ', fh.name, '@', os.path.getsize(fh.name))
                    
                time_tag += navg * self.ntime_gulp * (int(196e6) // int(25e3))

The `WriterOp` block is structured in the same was as `GeneratorOp` block but uses the "input ring" structure for accessing data.

To run these two blocks as a pipeline you would using the following inside a `if __name__ == '__main__':` block:

In [3]:
write_ring = bifrost.ring.Ring(name="write")

ops = []
ops.append(GeneratorOp(write_ring, ntime_gulp=250, core=0))
ops.append(WriterOp(write_ring, ntime_gulp=250, core=1))

threads = [threading.Thread(target=op.main) for op in ops]
for thread in threads:
    thread.start()
    
# Don't run forever
time.sleep(3)
ops[0].shutdown()

for thread in threads:
    thread.join()
print("Done")

  # Remove the CWD from sys.path while we load stuff.


Writer: Start of new sequence: {'time_tag': 317552125968000000, 'seq0': 0, 'chan0': 1234, 'cfreq0': 30850000.0, 'bw': 73600000.0, 'navg': 24, 'nbeam': 1, 'nchan': 2944, 'npol': 4, 'pols': 'XX,YY,CR,CI', 'complex': False, 'nbit': 32}
   317552125968000000.dat @ 11776000
   317552126015040000.dat @ 11776000
   317552126062080000.dat @ 11776000
   317552126109120000.dat @ 11776000
   317552126156160000.dat @ 11776000
   317552126203200000.dat @ 11776000
   317552126250240000.dat @ 11776000
   317552126297280000.dat @ 11776000
   317552126344320000.dat @ 11776000
   317552126391360000.dat @ 11776000
   317552126438400000.dat @ 11776000
   317552126485440000.dat @ 11776000
   317552126532480000.dat @ 11776000
   317552126579520000.dat @ 11776000
   317552126626560000.dat @ 11776000
   317552126673600000.dat @ 11776000
   317552126720640000.dat @ 11776000
   317552126767680000.dat @ 11776000
Done


  if __name__ == '__main__':


This creates the ring that connects the two blocks, creates the `threading.Thread` instances that run each block, and starts the blocks.

If you wanted to insert a third block between `GeneratorOp` and `WriterOp` it might look like:

In [4]:
class CopyOp(object):
    def __init__(self, iring, oring, ntime_gulp=250, guarantee=True, core=-1):
        self.iring = iring
        self.oring = oring
        self.ntime_gulp = ntime_gulp
        self.guarantee = guarantee
        self.core = core
        
    def main(self):
        with self.oring.begin_writing() as oring:
            for iseq in self.iring.read(guarantee=self.guarantee):
                ihdr = json.loads(iseq.header.tostring())
                
                print("Copy: Start of new sequence:", str(ihdr))
                
                time_tag = ihdr['time_tag']
                navg     = ihdr['navg']
                nbeam    = ihdr['nbeam']
                chan0    = ihdr['chan0']
                nchan    = ihdr['nchan']
                chan_bw  = ihdr['bw'] / nchan
                npol     = ihdr['npol']
                pols     = ihdr['pols']
                pols     = pols.replace('CR', 'XY_real')
                pols     = pols.replace('CI', 'XY_imag')

                igulp_size = self.ntime_gulp*nbeam*nchan*npol*4        # float32
                ishape = (self.ntime_gulp,nbeam,nchan,npol)
                self.iring.resize(igulp_size, igulp_size*5)
                
                ogulp_size = igulp_size
                oshape = ishape
                self.oring.resize(ogulp_size)
                
                ohdr = ihdr.copy()
                ohdr_str = json.dumps(ohdr)
                
                iseq_spans = iseq.read(igulp_size)
                with oring.begin_sequence(time_tag=time_tag, header=ohdr_str) as oseq:
                    for ispan in iseq_spans:
                        if ispan.size < igulp_size:
                            continue # Ignore final gulp
                            
                        with oseq.reserve(ogulp_size) as ospan:
                            idata = ispan.data_view(numpy.float32)
                            odata = ospan.data_view(numpy.float32)    
                            odata[...] = idata

This `CopyOp` block combines the characteristics of reading and writing from rings to copy data from one ring to the other.

In [5]:
copy_ring  = bifrost.ring.Ring(name="copy")
write_ring = bifrost.ring.Ring(name="write")

ops = []
ops.append(GeneratorOp(copy_ring, ntime_gulp=250, core=0))
ops.append(CopyOp(copy_ring, write_ring, ntime_gulp=250, core=1))
ops.append(WriterOp(write_ring, ntime_gulp=250, core=2))

threads = [threading.Thread(target=op.main) for op in ops]
for thread in threads:
    thread.start()
    
# Don't run forever
time.sleep(3)
ops[0].shutdown()

for thread in threads:
    thread.join()
print("Done")

  if sys.path[0] == '':
  # Remove the CWD from sys.path while we load stuff.


Copy: Start of new sequence: {'time_tag': 317552126948000000, 'seq0': 0, 'chan0': 1234, 'cfreq0': 30850000.0, 'bw': 73600000.0, 'navg': 24, 'nbeam': 1, 'nchan': 2944, 'npol': 4, 'pols': 'XX,YY,CR,CI', 'complex': False, 'nbit': 32}
Writer: Start of new sequence: {'time_tag': 317552126948000000, 'seq0': 0, 'chan0': 1234, 'cfreq0': 30850000.0, 'bw': 73600000.0, 'navg': 24, 'nbeam': 1, 'nchan': 2944, 'npol': 4, 'pols': 'XX,YY,CR,CI', 'complex': False, 'nbit': 32}
   317552126948000000.dat @ 11776000
   317552126995040000.dat @ 11776000
   317552127042080000.dat @ 11776000
   317552127089120000.dat @ 11776000
   317552127136160000.dat @ 11776000
   317552127183200000.dat @ 11776000
   317552127230240000.dat @ 11776000
   317552127277280000.dat @ 11776000
   317552127324320000.dat @ 11776000
   317552127371360000.dat @ 11776000
   317552127418400000.dat @ 11776000
   317552127465440000.dat @ 11776000
   317552127512480000.dat @ 11776000
   317552127559520000.dat @ 11776000
   317552127606560

  # This is added back by InteractiveShellApp.init_path()
  if __name__ == '__main__':


These examples should provide a starting point for building pipelines in Bifrost.  Although the examples run purely on the CPU, GPU versions can also be create, either through copying to GPU memory within each block or by using rings in the 'cuda' memory space.  If 'cuda' rings the user needs to ensure that all memory copies to/from the ring are executed before the span is released.  Otherwise, corruption of the ring's contents can happen.  In the next section we will look at logging from inside the blocks to help understand performance.