# Image C3171 Mosaic

A quick notebook to look at extracting the pixel size and FWHM of the synthesised beam given a set of uv files to image. 

In [1]:
%matplotlib inline
import os
import sys
import glob
import shutil
import numpy as np
import pandas as pd
import subprocess as sp
import matplotlib.pyplot as plt

from dask import delayed
from dask.diagnostics import ProgressBar

import dask

In [2]:
# # Constants for the reduction for robust=2
CELL = '0.485381'
ROBUST = '2'
FWHM = '6.95,3.0'
PA = '2.2'
OFFSET = '3:32:22.0,-27:48:37'
SELFCAL = 4



In [3]:
data = '../Semester_2_models/*uv'
files = glob.glob(data)

print('The found file are: ')
print(files)
out_dir = f'../Semester_2_models/Image_R{ROBUST}'

The found file are: 
['../Semester_2_models/c3171_2.uv', '../Semester_2_models/c3171_1.uv', '../Semester_2_models/c3171_3.uv', '../Semester_2_models/c3171_4.uv', '../Semester_2_models/c3171_5.uv', '../Semester_2_models/c3171_6.uv', '../Semester_2_models/c3171_7.uv', '../Semester_2_models/c3171_8.uv', '../Semester_2_models/c3171_9.uv', '../Semester_2_models/c3171_10.uv', '../Semester_2_models/c3171_11.uv', '../Semester_2_models/c3171_12.uv', '../Semester_2_models/c3171_13.uv', '../Semester_2_models/c3171_14.uv', '../Semester_2_models/c3171_15.uv', '../Semester_2_models/c3171_16.uv', '../Semester_2_models/c3171_17.uv', '../Semester_2_models/c3171_18.uv', '../Semester_2_models/c3171_19.uv', '../Semester_2_models/c3171_20.uv', '../Semester_2_models/c3171_21.uv', '../Semester_2_models/c3171_22.uv', '../Semester_2_models/c3171_23.uv', '../Semester_2_models/c3171_24.uv', '../Semester_2_models/c3171_25.uv', '../Semester_2_models/c3171_26.uv', '../Semester_2_models/c3171_27.uv', '../Semester_2_

In [4]:
def task_str(task, kw, execute=False):
    '''Return a string to pass to subprocess run args
    task - str, the name of the task
    kw - dict, the dictionary to parse
    '''
    task = ' '.join([task]+[f'{key}={val}' for key, val in kw.items()])

    if execute:
        return run_str(task)
    else:
        return task
    
def run_str(task):
    '''Given an task string, execute it and return the subproces result
    task - str, the string produced by task_str() or equiviliant
    '''
    return sp.run(args=task, stdout=sp.PIPE, stderr=sp.PIPE, shell=True, check=True)

In [5]:
def start_up(out_dir=out_dir):
    '''Common steps to set up the processing
    '''
    if not os.path.exists(out_dir):
        print('Creating output directory...')
        os.mkdir(out_dir)

def close_down(out_dir=out_dir):
    '''Common steps to close down the processing
    '''
    if os.path.exists(out_dir):
        print(f'Deleting the output directory {out_dir}')
        shutil.rmtree(out_dir)

# @delayed
def file_out(name, out_dir=out_dir,self_rnd=None):
    '''A consistent method on crafting a filename for each task. Will
    append the name to the out_dir, as well as any extra requested options,
    like selfcal appending, and folder locations if needed
    
    name - str, name of the file/folder to make
    out_dir - str, name of the output folder
    self_rnd - int or None, the self calibration round currently up to
    '''
    if self_rnd is not None:
        out_dir = f'{out_dir}/Self_{self_rnd}'
        if not os.path.exists(out_dir):
            try:
                os.makedirs(out_dir)
            except OSError:
                pass

    return f'{out_dir}/{name}'

# @delayed
def get_val(k, key, self_rnd=None):
    '''A helper function to get the appropriate key value from dictionary
    produced by the pipeline
    
    k - dict, the dictionary produced by the pipeline
    key - str, the string of the key to extract
    self_rnd - int or None, the current self calibration round
    '''
    if self_rnd is not None:
        key = f'{key}_self{self_rnd}'
    
    return k[key]

# @delayed
def set_val(k, key, val, self_rnd=None):
    '''A helper function to consistently set values to the dictionary populated
    by the pipeline while being aware of the self-calibration round
    
    k - dict, the dictionary produced by the pipeline
    key - str, the key to add
    val - object, item to add to the dictionary
    rnd - int or None, the self calibration round
    '''
    if self_rnd is not None:
        key = f'{key}_self{self_rnd}'
    
    k[key] = val
    
    return k
    
@delayed
def get_name(uv):
    '''Make up the name of the pointing consistently
    '''
    return uv.split('/')[-1]
    
@delayed
def invert(uv, robust=ROBUST, cell=CELL, offset=OFFSET, name=None, self_rnd=None, kw=None):
    '''Call the Miriad Invert task to make up the beam
    uv - str, name of the uv file to image
    robust - str or int, number used for the weighting
    cell - str, the cell sizes to be used
    offset - str, the RA and DEC position to use for reference pixel
    name - str or None, the source name, try to figure it out if none
    out_dir - str, where to write the output files
    self_rnd - int or None, the self calibration round
    kw - dict or None, if self calibration round, then kw is the dict up to that point otherwise None
    '''
    if name is None:
        name = get_name(uv)
        print(f'name of {uv} is {name}')
    
    i_map = file_out(f'{name}.map', self_rnd=self_rnd)
    i_beam = file_out(f'{name}.beam', self_rnd=self_rnd)
    
    # if self calibrated, overwrite the supplied uv file
    if self_rnd is not None:
        uv = get_val(kw, 'vis', self_rnd=self_rnd)
    
    invert_kw = {'vis':uv, 'robust':robust, 'options':'mfs,double,sdb,mosaic',
                            'beam': i_beam, 'map': i_map, 'offset':offset, 
                            'cell':cell, 'stokes':'i', 'imsize':'5,5,beam'}
    p = task_str('invert', invert_kw, execute=True)

    if kw is None:
        kw = invert_kw
        kw['name'] = name
    else:
        kw = set_val(kw, 'map', i_map, self_rnd=self_rnd)
        kw = set_val(kw, 'beam', i_beam, self_rnd=self_rnd)
#     print(kw) 
    return kw

@delayed
def stokes_v_rms(k, self_rnd=None):
    '''Given a uv file, work out the RMS from the stokes v image
    k - dict, containing the results of the processing up to this point
    '''
    name = k['name']
    vis = get_val(k, 'vis', self_rnd=self_rnd)
    v_map = file_out(f'{name}.v.map', self_rnd=self_rnd)

    p = task_str('invert', {'vis':k['vis'], 'options':k['options'], 'stokes':'v',
                            'map':v_map, 'robust':k['robust']},
                execute=True)
    
    p1 = task_str('sigest', {'in':v_map}, execute=True)
    
    for l in p1.stdout.decode('utf-8').split('\n'):
        if 'Estimated rms' in l:
            items = l.split()
            k = set_val(k, 'v_rms', float(items[3]), self_rnd=self_rnd)
    
    return k

@delayed
def mfclean(k, sigma=5, self_rnd=None):
    '''Function to clean the image, given the outouts from invert and the
    stokes_v_rms function. 
    
    k - dict, results thus far from the pipeline
    out_dir - str, where to save the clean components
    sigma - int, how deep to clean
    self_rnd, int or None, the self calibration round
    '''
    name = k['name']
    out = file_out(f'{name}.clean', self_rnd=self_rnd)
#     cutoff = sigma * get_val(k, 'v_rms', self_rnd=self_rnd)
    cutoff = sigma * get_val(k, 'v_rms')
    
    i_map = get_val(k, 'map', self_rnd=self_rnd)
    i_beam = get_val(k, 'beam', self_rnd=self_rnd)
         
    p = task_str('mfclean', {'map':i_map, 'beam':i_beam, 'region':'perc\(66\)',
                             'niters':'5000,100', 'cutoff':cutoff, 'out':out},
                execute=True)

    k = set_val(k, 'mfclean_model', out, self_rnd=self_rnd)
    
    return k

@delayed
def restor_original(k, fwhm=FWHM, pa=PA, self_rnd=None):
    '''Restor a map given the cleaned model
    fwhm - str, the FWHM to convolve with thats passed directly to restor
    pa - str, the pa to convolve with thats passed directly to restor
    self_rnd - int or None, the self calibration round
    '''
    name = k['name']
    out = file_out(f'{name}.restor', self_rnd=self_rnd)
    i_map = get_val(k, 'map', self_rnd=self_rnd)
    i_beam = get_val(k, 'beam', self_rnd=self_rnd)
    i_model = get_val(k, 'mfclean_model', self_rnd=self_rnd)
                           
    p = task_str('restor', {'map':i_map, 'beam':i_beam, 'model':i_model,
                            'out':out, 'options':'mfs', 'fwhm':fwhm, 'pa':pa},
                 execute=True)
    
    k = set_val(k, 'restor', out, self_rnd=self_rnd)
    
    return k

@delayed
def restor(k, self_rnd=None):
    '''Restor a map given the cleaned model
    self_rnd - int or None, the self calibration round
    '''
    name = k['name']
    out = file_out(f'{name}.restor', self_rnd=self_rnd)
    i_map = get_val(k, 'map', self_rnd=self_rnd)
    i_beam = get_val(k, 'beam', self_rnd=self_rnd)
    i_model = get_val(k, 'mfclean_model', self_rnd=self_rnd)
                           
    p = task_str('restor', {'map':i_map, 'beam':i_beam, 'model':i_model,
                            'out':out, 'options':'mfs'},
                 execute=True)
    
    k = set_val(k, 'restor', out, self_rnd=self_rnd)
    
    return k

@delayed
def convol(k, fwhm=FWHM, pa=PA, self_rnd=None):
    '''Convol each of the restored image to a common resolution. 
    fwhm - str, the FWHM to convole to that is passed directly to convol
    pa - str, the pa to convole to that is passed directly to convol
    self_rnd - int or None, the self calibration round
    '''
    name = k['name']
    out = file_out(f'{name}.convol', self_rnd=self_rnd)
    i_map = get_val(k, 'restor', self_rnd=self_rnd)
                           
    p = task_str('convol', {'map':i_map, 'fwhm':fwhm, 'pa':pa,
                            'out':out, 'options':'final'},
                 execute=True)
    
    k = set_val(k, 'convol', out, self_rnd=self_rnd)
    
    return k
    
@delayed(nout=len(files))
def linmos(items, temp='linmos_temp.dat', file_name='all_days', self_rnd=None):
    '''Linmos all the restored images together
    items - list of dicts, contains all the outputs of the pipeline thus far
    '''
    if self_rnd == 0:
        self_rnd = None
    if self_rnd is not None:
        temp = f'{temp}_self{self_rnd}'
        file_name = f'{file_name}_self{self_rnd}'
    with open(f'{temp}','w') as f:
        for k in items:
#             print(get_val(k, 'restor', self_rnd=self_rnd),file=f)
            print(get_val(k, 'convol', self_rnd=self_rnd),file=f)
    
    out = file_out(f'{file_name}.linmos', self_rnd=self_rnd)
    p = task_str('linmos', {'in':f'@{temp}', 'bw':'4.096', 'out':out}, execute=True)
    
    return items
  
@delayed
def self_uvaver(k, self_rnd=None):
    '''Perform the required uvaver on a dataset to apply the calibration tables that
    is required before selcal can be used
    
    k - dict, results of processing up to this point
    self_cal - int, the self calibration round number
    '''
    # Get the input visibility file
    if self_rnd == 1:
        in_vis = k['vis']
    else:
        prev_rnd = self_rnd - 1
        in_vis = get_val(k, 'vis', self_rnd=self_rnd - 1)
    
    name = k['name']
    out = file_out(f'{name}_self{self_rnd}.vis', self_rnd=self_rnd)
    
    p = task_str('uvaver', {'vis':in_vis, 'out':out}, execute=True)
#     print(name)
#     print(p.stdout.decode('utf-8'))
#     print(p.stderr.decode('utf-8'))
    
    set_val(k, 'vis', out, self_rnd=self_rnd)
    return k
    
@delayed
def selfcal(k, self_rnd, options='mfs,phase'):
    '''Run the self calibration task given a model and visibility file
    
    k - dict, dictionary produced by the pipeline
    self_rnd - int, the current self calibration round
    options - str, options passed directly to selfcal
    '''
    
    vis = get_val(k, 'vis', self_rnd=self_rnd)
    if self_rnd == 1:
        model = k['mfclean_model']
    else:
        model = get_val(k, 'mfclean_model', self_rnd=self_rnd-1)
    
    p = task_str('selfcal', {'vis':vis, 'model': model, 'options':options, 'interval':'0.1',
                            'nfbin':'6'},
                 execute=True)
#     print(vis)
#     print(p.stdout.decode('utf-8'))
#     print(p.stderr.decode('utf-8'))

    k = set_val(k, f'selfcal_{self_rnd}_stdout', p.stdout.decode('utf-8'))
    k = set_val(k, f'selfcal_{self_rnd}_stderr', p.stderr.decode('utf-8'))
    
    return k

@delayed
def format_results(items):
    '''Process the outputs from the Dask pipeline
    '''
    return pd.DataFrame(items)

@delayed
def reduce(items):
    '''Place holder for the end of dask.compute()
    '''
    return None


In [6]:
close_down()
start_up()

names = [get_name(f) for f in files]
kw  = [invert(f, name=n) for f, n in zip(files, names)]
kw  = [stokes_v_rms(k) for k in kw]
kw  = [mfclean(k) for k in kw]
kw  = [restor(k) for k in kw]
# kw  = [convol(k) for k in kw] 
# kw  = linmos(kw)

sigma_range = np.linspace(10, 3, SELFCAL)
for i in range(1, SELFCAL+1):
    kw = [self_uvaver(k, self_rnd=i) for k in kw] 
    kw = [selfcal(k, i) for k in kw]
    
    kw = [invert(f, name=n, self_rnd=i, kw=k) for f, n, k in zip(files, names, kw)]
    kw = [mfclean(k, self_rnd=i, sigma=sigma_range[i-1]) for k in kw]
    kw = [restor(k, self_rnd=i) for k in kw]
#     kw = [convol(k, self_rnd=i) for k in kw] 
#     kw = linmos(kw, self_rnd=i)

with ProgressBar():
    df = format_results(kw).compute(num_workers=15)
    
# df = format_results(kw).compute(num_workers=10)

Creating output directory...
[########################################] | 100% Completed | 10hr 10min 30.8s


In [7]:
@delayed
def convol(k, fwhm=FWHM, pa=PA, self_rnd=None):
    '''Convol each of the restored image to a common resolution. 
    fwhm - str, the FWHM to convole to that is passed directly to convol
    pa - str, the pa to convole to that is passed directly to convol
    self_rnd - int or None, the self calibration round
    '''
    if self_rnd == 0:
        self_rnd = None
        
    name = k['name']
    out = file_out(f'{name}.convol', self_rnd=self_rnd)
    i_map = get_val(k, 'restor', self_rnd=self_rnd)
    
    if os.path.isdir(out):
        shutil.rmtree(out)
    
    p = task_str('convol', {'map':i_map, 'fwhm':fwhm, 'pa':pa,
                            'out':out, 'options':'final'},
                 execute=True)
    
    k = set_val(k, 'convol', out, self_rnd=self_rnd)
    
    return k

In [8]:
vals = [ convol(k,self_rnd=s) for s in range(0,SELFCAL+1) 
                              for i,k in df.iterrows() ]
with ProgressBar():
    ndf = format_results(vals).compute(num_works=15)

[########################################] | 100% Completed | 10min  9.6s


In [9]:
@delayed
def linmos_delayed(temp='linmos_temp.dat', mode='convol', file_name='all_days', self_rnd=None):
    '''Linmos all the restored images together
    items - list of dicts, contains all the outputs of the pipeline thus far
    '''
    if mode not in ['restor','convol']:
        return
    if self_rnd == 0:
        self_rnd = None
    if self_rnd is not None:
        temp = f'{temp}_self{self_rnd}'
        file_name = f'{file_name}_s{self_rnd}'
    temp = f'{temp}_{mode}'
    with open(f'{temp}','w') as f:
        for k in glob.glob(file_out('*convol', self_rnd=self_rnd)):
            print(k,file=f)
    
    out = file_out(f'{file_name}.{mode}.linmos', self_rnd=self_rnd)
    
    if os.path.isdir(out):
        shutil.rmtree(out)
    
    p = task_str('linmos', {'in':f'@{temp}', 'bw':'4.096', 'out':out}, execute=True)
    
    return self_rnd

In [10]:
vals = [ linmos_delayed(self_rnd=s, mode=m) for s in range(0,SELFCAL+1) 
                                            for m in ['convol']]    
with ProgressBar():
    ldf = format_results(vals).compute(num_works=10)

[########################################] | 100% Completed | 44min 32.4s
