Gijs Molenaar edited this page Feb 13, 2014 · 4 revisions

The stream control mechanism has changed in a number of ways.

# The VisDataMux node

Up to now, to read an MS we would call mqs.init(inputrec,outputrec,...) and the kernel would auto-magically start feeding MS data to sinks and spigots. This was rather opaque and difficult to generalize. I have therefore divested the kernel itself of this responsibility, and moved it into a special node called the MeqVisDataMux. The mqs.init() function has been phased out.

A MeqVisDataMux (or VDM for short) node connects to a set of sinks and spigots. When you give it a specially formed request (one containing `input` and `output` fields similar to the input and output records of the former mqs.init() command), it begins reading/writing a visibility stream and activating the sinks. Once the stream is finished it returns an (empty) result. Here's an example (from an updated `MeqServer/test/matrix343.py`):

```python
def create_inputrec(msname, tile_size=1500):
    inputrec = record()
    inputrec.ms_name = msname
    inputrec.data_column_name = 'DATA'
    inputrec.snippet_size = tile_size  # renamed at Jan's request
    inputrec.selection = record(channel_start_index=25,
                                channel_end_index=40,
                                channel_increment=1,
                                selection_string='')
    return inputrec

def create_outputrec(output_column='CORRECTED_DATA'):
    outputrec = record()
    outputrec.write_flags = False
    outputrec.predict_column = output_column
    return outputrec

def _tdl_job_source_flux_fit_no_calibration(mqs, parent):
    # (... skipped ...)
    # msname: path to the MS, assumed to be defined in the skipped part
    inputrec = create_inputrec(msname)
    outputrec = create_outputrec()
    req = meq.request()
    req.input = record(ms=inputrec, python_init='MAB_read_msvis_header.py')
    req.output = record(ms=outputrec)
    mqs.execute('VisDataMux', req, wait=False)
```

A VDM will be created for you automatically if you create any sinks and/or spigots. All sinks and spigots will automatically become children of this node. This auto-created VDM is unimaginatively named VisDataMux. If you prefer to give it a different name, you can create the VDM manually, e.g.:

```python
ns.my_vdm_name << Meq.VisDataMux()
```

All sinks and spigots will again be made its children automatically. In the future we may allow trees with multiple muxes reading different data sources, in which case each mux's sinks and spigots will need to be assigned explicitly. 


## Other major changes

(Refer to the old [[StreamControl|StreamControl]] document for details of the old implementation.) 

* `tile_size` has been renamed to `snippet_size`, so as to avoid confusion with tiled solutions. 
* I/O is performed by entities called **channels** (these used to be known as "sinks", leading to obvious confusion with MeqSinks, hence the new name). A VDM maintains an input channel and an output channel. Useful channel types are: "ms" (for reading/writing an MS), "boio" (for reading/writing a BOIO file), and "default" (reads/writes nothing, sort of like `/dev/null`).
* The type of the channel is determined by a **field name** in the `input` and `output` records. For example, the code above places a subrecord called `ms` within the input record. This causes an ms-type input channel to be constructed. The contents of the `input.ms` subrecord are then used to initialize the channel. These are the same as described in [[StreamControl|StreamControl]] (with the exception of `tile_size` being renamed to `snippet_size`).
* The Python init-script, if any, is independent of the input channel, and is therefore specified directly in the `input` record, i.e. in `input.python_init` (this used to be one level deeper, inside the channel init record).
* Messages to the browser will now be slightly different, since it's a node rather than the kernel itself that is handling the I/O.
* The current/last input and output records no longer reside in forest state (not being a global property of the forest anymore). Instead, you can find a copy of them in the [[VisDataMux|VisDataMux]] state record. This will break the [[StreamControl|StreamControl]] plugin until Rob manages to update it.
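
To make the record changes above concrete, here is a minimal sketch of how an old-style MS init record could be converted to the new layout. It does not use MeqTrees at all: plain dicts stand in for records, the helper names `migrate_ms_input` and `channel_type` are hypothetical, and the assumption that the old record was a flat record holding both `tile_size` and `python_init` is mine, not taken from the StreamControl document.

```python
# Illustration only: plain dicts stand in for MeqTrees records.
# All names below are hypothetical helpers, not part of any real API.
KNOWN_CHANNEL_TYPES = ("ms", "boio", "default")


def migrate_ms_input(old_ms_rec):
    """Convert an old-style MS init record (assumed flat) to a new-style input record."""
    ms = dict(old_ms_rec)  # copy, so the caller's record is left untouched
    if "tile_size" in ms:
        # tile_size was renamed to snippet_size
        ms["snippet_size"] = ms.pop("tile_size")
    new_input = {}
    if "python_init" in ms:
        # the init-script now lives directly in the input record
        new_input["python_init"] = ms.pop("python_init")
    # the field name "ms" is what selects an ms-type channel
    new_input["ms"] = ms
    return new_input


def channel_type(io_rec):
    """Return the channel type named by a field of io_rec, or None if there is none."""
    found = [name for name in KNOWN_CHANNEL_TYPES if name in io_rec]
    if len(found) > 1:
        raise ValueError("more than one channel type given: %s" % ", ".join(found))
    return found[0] if found else None


old = {"ms_name": "demo.ms", "tile_size": 1500, "python_init": "init.py"}
new = migrate_ms_input(old)
print(new)
print(channel_type(new))
```

Rejecting a record that names more than one channel type is a design choice of this sketch; the page does not say how the VDM itself treats that case.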

# VisDataMux details

A VDM checks the types of its children, and distinguishes between spigots and sinks. Spigots only get updated with visibility data and are otherwise ignored, while sinks are polled with requests when data snippets arrive. 

At the moment, all created spigots and sinks are associated with a single VDM. This is created implicitly if the user has not defined one explicitly. This logic is taken care of at the TDL pre-processing level. 


## VDM I/O record structure

The VDM is activated by passing it a Request with an `input` subrecord (the `output` subrecord is optional; if it is not specified, the VDM works as read-only). The subrecord is structured similarly to the old [[StreamControl|StreamControl]], with an important difference: the **type** of the channel is identified by a **field name**, while the init-record for the channel is placed in a subrecord of that name. Thus, 

```python
req.input.ms = record(ms_name='...', ...)        # creates MS input channel
req.output.boio = record(file_name='...', ...)   # creates BOIO output channel
```

The VDM has a number of extra options that apply to all channel types. These are specified by optional fields directly in the `input` and `output` records: 

* `input.python_init`: the Python init-script, just like the old [[StreamControl|StreamControl]]. 
* `input.mt_queue_size`, `output.mt_queue_size`: the I/O queue size. Actual reading/writing of the channel object is always done in a separate thread. This speeds things up a bit since the VDM does not need to wait for a channel to, e.g., finish writing data to the MS. This option controls the size of the inter-thread queue. You do not normally need to change this, unless you start running out of memory (e.g. when writing huge snippets, which calls for a shorter queue), or find the VDM waiting for I/O too much (e.g. when writing many small snippets, which calls for a longer queue). The default of 256 should be OK for most cases. 
* `output.sync`: (default is True) controls whether the VDM waits for its output channel to finish writing before returning a result. If False, it will return immediately (letting the writing thread finish "in the background"). In an interactive environment you probably want sync=True, to make sure your MS is entirely written before the VDM returns.  
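
The roles of `mt_queue_size` and `sync` can be illustrated with a small standalone sketch that uses only the Python standard library. This is not the VDM's implementation: `write_snippets` is a hypothetical function, and a plain list stands in for the output channel. It only shows the general pattern of a bounded inter-thread queue, plus the choice between waiting for the writer thread and returning at once.

```python
import queue
import threading


def write_snippets(snippets, written, mt_queue_size=256, sync=True):
    """Hand snippets to a writer thread through a bounded queue.

    `written` is a list standing in for the output channel (an assumption
    made for this sketch). The writer thread is returned so that a caller
    using sync=False can still join it later.
    """
    q = queue.Queue(maxsize=mt_queue_size)
    done = object()  # sentinel marking the end of the stream

    def writer():
        while True:
            item = q.get()
            if item is done:
                break
            written.append(item)  # stand-in for a slow write to disk

    thread = threading.Thread(target=writer)
    thread.start()
    for snippet in snippets:
        q.put(snippet)  # blocks while the queue is full
    q.put(done)
    if sync:
        thread.join()  # wait until everything has been written
    return thread


out = []
write_snippets(range(10), out, mt_queue_size=2, sync=True)
print(out)
```

A small queue limits how many snippets are held in memory at once but makes the producer block more often; a larger one does the opposite. That is the trade-off the `mt_queue_size` option describes.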

## VDM sequence children

A VDM can also perform some of the functions of a [[ReqSeq|ReqSeq]]. It has three optional labelled children, `start`, `pre` and `post`. These children are normally unassigned, but you can assign them if you create a VDM explicitly. These children are called as follows: 

* When the VDM detects a new snippet of data, it creates a new Request (with the new snippet's grid) and calls the **start** child, if any. This is _before_ any spigots are populated. 
* The entire snippet is read and all spigots are populated with data. 
* If a **pre** child is attached, it is called with the new Request. 
* All sinks are called with the new Request. 
* If a **post** child is attached, it is called with the new Request. 
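
The call order described above can be sketched as a short standalone simulation. Nothing here is MeqTrees code: `process_snippet` and `named` are hypothetical names, children are modelled as plain callables, and the request is a plain dict.

```python
def process_snippet(snippet, spigots, sinks, start=None, pre=None, post=None):
    """Return the labels of the children called for one snippet, in call order."""
    calls = []
    request = {"grid": snippet["grid"]}  # stand-in for the new Request
    if start is not None:
        calls.append(start(request))     # called before any spigot is populated
    for spigot in spigots:
        calls.append(spigot(snippet))    # spigots receive the visibility data
    if pre is not None:
        calls.append(pre(request))
    for sink in sinks:
        calls.append(sink(request))      # sinks are polled with the Request
    if post is not None:
        calls.append(post(request))
    return calls


def named(label):
    """Build a dummy child that just reports its own label when called."""
    return lambda _arg: label


order = process_snippet(
    {"grid": "snippet-0"},
    spigots=[named("spigot")],
    sinks=[named("sink")],
    start=named("start"),
    post=named("post"),
)
print(order)  # ['start', 'spigot', 'sink', 'post']
```

The `pre` child is left unassigned in this usage example, so it is simply skipped, in the same way that an unassigned optional child is skipped in the sequence above.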