In [1]:
import numpy as np
import dask.array as da
from xarrayms import xds_from_ms, xds_from_table

Load data from the Measurement Set

In [2]:
xds = list(xds_from_ms("1491291289.1GC.ms", columns=["ANTENNA1", "ANTENNA2"], 
                       group_cols=[], index_cols=[], chunks={"row": 1e9}))

Successful readonly open of default-locked table 1491291289.1GC.ms: 26 columns, 129480 rows


Access Individual elements in xds variable

Use for loop look to loop through all columns at a later stage

Find the unique baselines

In [3]:
ds = xds[0]
assert ds.ANTENNA1.dtype == ds.ANTENNA2.dtype == np.int32

Stack antenna1 and antenna2 data

In [4]:
bl = da.stack([ds.ANTENNA1.data, ds.ANTENNA2.data], axis=1)
bl.shape

(129480, 2)

Convert array to dtype int64 from int32

In [5]:
bl = bl.rechunk(-1,2).view(np.int64)
bl.shape

(129480, 1)

Get unique values (baselines)

In [6]:
ubl = da.unique(bl)

In [7]:
ubl = da.compute(ubl)[0].view(np.int32).reshape(-1,2)
ubl.shape

Successful readonly open of user-locked table 1491291289.1GC.ms: 26 columns, 129480 rows


(120, 2)

In [8]:
xds = list(xds_from_ms("1491291289.1GC.ms", columns=["TIME", "ANTENNA1", "ANTENNA2", "UVW", "FLAG_ROW"], 
                       group_cols=["SCAN_NUMBER"], index_cols=[], chunks={"row": 1e9}))

Successful readonly open of default-locked table 1491291289.1GC.ms: 26 columns, 129480 rows


**Averaging Function** (This is what I need to complete)

So what is needed is the following variables: 
 * **bl_max_ew**  (Maximum EW distance for the valid rows from uvw)
 * **rows** (Count number of valid rows, where valid_rows == True)
 * **bl_max_dist** (Maximum Baseline distance for the valid rows) 

With the above three variables, it should be possible to calculate baseline_avg_rows (the row size of the compressed data) <br />
**bl_max_dist** is calculated from all the uvw values (not just the valid ones)

I think the formular for this calculation is the following: <br/>
 number of valid rows divide by (EW max distance divide by Max baseline)

In [9]:
def _averaging_rows(time, ant1, ant2, uvw, flagged_rows, ubl=None):
    # TODO(smasoka)
    # Understand this
    # Need to index passed argument lists,
    # as atop contracts over dimensions not present in the output.
    # ant1 = ant1[0]
    # ant2 = ant2[0]
    # uvw = uvw[0][0]
    # flagged_rows = flagged_rows[0]

    print ant1.shape
    print ant2.shape
    print uvw.shape
    print flagged_rows.shape

    # Create empty array container with shape of the unique baselines
    baseline_avg_rows = np.empty(ubl.shape[0], dtype=np.int32)
    print baseline_avg_rows.shape
    print

    unflagged = flagged_rows == False
    print unflagged
    print type(unflagged)

    print "UBL"
    print "UBL SHAPE " +str(ubl.shape)
    print "UBL SIZE " +str(ubl.size)
    print

    # Foreach baseline
    for bl, (a1, a2) in enumerate(ubl):
        print "bl : %s, a1 : %s, a2 : %s" %(bl, a1, a2)
        # Find rows associated with each baseline
        # also removing flagged rows
        valid_rows = (ant1 == a1) & (ant2 == a2) & unflagged
        # depending on the unflagged, valid_rows can be reduced, smaller
        print "VALID_ROWS " +str(valid_rows.shape)
        # print
        # Maximum EW distance for each baseline
        bl_max_ew = np.abs(uvw[valid_rows, 0]).sum(axis=0)
        print "MAX EW DISTANCE " +str(bl_max_ew)

        # Figure out what the averaged number of rows will be
        # I think np.divmod is the way to do this
        # baseline_avg_rows[i] = np.divmod(valid_rows)
        # Number (count) of valid_rows for this baseline
        rows = da.where(valid_rows == True)[0].size
        baseline_num_rows.append(rows)
        # basically for each ubl (120) array, baseline_avg_rows (120) each
        # containing the number of lines/rows (int) : baseline_num_rows

        # After the BDA, the row number will change, so I need to calculate
        # how they will change so I can create dask array(with chucking)
        # to store these new compressed rows. This is where bl_max_ew variable
        # comes in. Right?

    return baseline_avg_rows

Array container to hold all the average rows

In [10]:
scan_baseline_avg_rows = []

For loop that uses dask atop function

In [11]:
for ds in xds:
    # calls _averaging_rows
    # output block pattern ("bl",) --> array of baselines
    # TIME data (row array)
    # ANTENNA1 data (row array)
    # ANTENNA2 data (row array)
    # UVW data (row array and all columns) ???
    # FLAG_ROW data (row array)
    # creates a new_axis for the number of baselines
    # pass the actual array of unique baselines
    # dtype of results
    avg_rows = da.core.atop(_averaging_rows, ("bl",),
                            ds.TIME.data, ("row",),
                            ds.ANTENNA1.data, ("row",),
                            ds.ANTENNA2.data, ("row",),
                            ds.UVW.data, ("row", "(u,v,w)"),
                            ds.FLAG_ROW.data, ("row",),
                            # Must tell dask about the number of baselines
                            new_axes={"bl": ubl.shape[0]},
                            # Pass through ubl to all instances of
                            ubl=ubl,
                            dtype=np.int32)

    scan_baseline_avg_rows.append(avg_rows)

Dask compute with argument "scan_baseline_avg_rows"

In [None]:
dask.compute(scan_baseline_avg_rows)