Import common packages

In [1]:
import os
import sys
import datetime
import copy
import shutil
from operator import itemgetter
from functools import partial
from itertools import *

Import pyspark and other packages

In [2]:
import pyspark
from pyspark import SparkContext, SparkFiles, SparkConf
from pyspark.sql import SQLContext, Row
from py4j.java_gateway import JavaGateway
#import pydoop

None


Import task-oriented packages

In [3]:
import astromatic_wrapper as aw #TODO: install aw to itaf virtual env if needed
from astropy.io import fits
import pyraf
from pyraf import iraf
sys.path.insert(0, '/home/ser/Dev/astro_engine/code')
from astro_utils import AEDirsTreeConfigurer, AEJsonConfigLoader
from fits_utils import *

<h2>Used functions</h2>

<h4>Helper functions</h4>

In [4]:
def list_to_file(list_, filename, sep='\n'):
    """Write every element of ``list_`` to ``filename``, each followed by ``sep``.

    The file is opened in text write mode, so existing content is replaced.
    Elements are converted with ``str``; ``sep`` is also written after the
    last element.
    """
    with open(filename, 'w') as out:
        out.writelines('%s%s' % (str(entry), sep) for entry in list_)

def to_list(something):
    """Return ``something`` itself if it is a ``list``, else wrap it in a one-element list.

    Only ``list`` instances pass through unchanged; tuples, strings and other
    iterables are wrapped as a single element.
    """
    if isinstance(something, list):
        return something
    return [something]

def fits_ImageHDU_from_file(filename, mode='denywrite', lazy_load_hdus=False):
    """Build an ``ImageHDU`` from the first HDU (index 0) of a FITS file.

    Parameters
    ----------
    filename : path of the FITS file to read.
    mode : open mode forwarded to ``fits.open``.
    lazy_load_hdus : forwarded to ``fits.open``.

    Returns
    -------
    A new ``fits.ImageHDU`` built from the data and header of HDU 0.
    """
    # forward the caller's arguments instead of hard-coded values
    f = fits.open(filename, mode=mode, lazy_load_hdus=lazy_load_hdus)
    try:
        return fits.ImageHDU(data=f[0].data, header=f[0].header)
    finally:
        # close the file even when reading the data or header fails
        f.close()

def filter_fits_by_header(fits_filenames, fits_path=None, match_any=False, ignore_case=True,
                          ignore_edge_spaces=True, hdu_num=0, **kwargs):
    """Select the FITS files whose header values match the given keywords.

    Parameters
    ----------
    fits_filenames : a filename or a list of filenames.
    fits_path : if not None, every filename is replaced by its basename joined
        onto this directory.
    match_any : if true, one matching keyword is enough; otherwise every
        keyword in ``kwargs`` must match.
    ignore_case : compare lower-cased string forms.
    ignore_edge_spaces : strip leading/trailing whitespace before comparing.
    hdu_num : HDU whose header is read.
    **kwargs : ``KEYWORD=value`` pairs; both sides are compared as strings.

    Returns
    -------
    List of (possibly re-rooted) filenames that passed the filter. With no
    ``kwargs`` every file is returned.
    """
    def _normalise(raw):
        # bring header and requested values to one comparable string form
        text = str(raw)
        if ignore_edge_spaces:
            text = text.strip()
        if ignore_case:
            text = text.lower()
        return text

    candidates = to_list(fits_filenames)
    if fits_path is not None:
        candidates = [os.path.join(fits_path, os.path.basename(name)) for name in candidates]

    selected = []
    for candidate in candidates:
        header = fits.getheader(candidate, hdu_num)
        hits = 0
        for keyword, wanted in kwargs.items():
            if keyword not in header:
                continue
            if _normalise(header[keyword]) == _normalise(wanted):
                hits += 1
                if match_any:
                    # one hit is sufficient in "any" mode
                    break
        if (match_any and hits) or hits == len(kwargs):
            selected.append(candidate)
    return selected

def header_keywords_values(fits_filenames, fits_path=None, keywords=['SIMPLE'], keys_keywords=True, lower_case=True,
                           split_edge_spaces=True, hdu_num=0):
    """Collect the values of selected header keywords over a set of FITS files.

    Parameters
    ----------
    fits_filenames : a filename or a list of filenames.
    fits_path : if not None, every filename is replaced by its basename joined
        onto this directory.
    keywords : a keyword or a list of keywords to look up (the default list is
        never modified).
    keys_keywords : selects the layout of the result (see Returns).
    lower_case, split_edge_spaces : accepted for interface compatibility;
        currently not used by the implementation.
    hdu_num : HDU whose header is read.

    Returns
    -------
    If ``keys_keywords`` is true: ``{keyword: {value: [filenames]}}``; a keyword
    found in no file maps to an empty dict.
    Otherwise: ``{filename: {keyword: value_or_None}}`` with one entry for
    every requested keyword.
    """
    iffs = [os.path.join(fits_path, os.path.basename(iff)) if fits_path is not None else iff
            for iff in to_list(fits_filenames)]
    kws = to_list(keywords)
    res = {}
    for iff in iffs:
        fits_file_header = fits.getheader(iff, hdu_num)
        if keys_keywords:
            for keyword in kws:
                files_by_value = res.setdefault(keyword, {})
                if keyword in fits_file_header:
                    files_by_value.setdefault(fits_file_header[keyword], []).append(iff)
        else:
            for keyword in kws:
                # accumulate per file, so every requested keyword is kept
                # rather than only the last one
                value = fits_file_header[keyword] if keyword in fits_file_header else None
                res.setdefault(iff, {})[keyword] = value
    return res

def change_header(fits_filename, fits_path=None, new_path=None, clobber=True, **kwargs):
    """Set header keywords of a FITS file and write the result.

    Parameters
    ----------
    fits_filename : name of the file to read.
    fits_path : if not None, the file is read from this directory (only the
        basename of ``fits_filename`` is used).
    new_path : directory to write to. If None, ``fits_path`` is used; if that
        is None too, the file is written next to the file that was read.
    clobber : forwarded to ``fits.writeto``.
    **kwargs : ``KEYWORD=value`` pairs assigned to the header.

    Returns
    -------
    Path of the written file.
    """
    iff = os.path.join(fits_path, os.path.basename(fits_filename)) if fits_path is not None else fits_filename
    data, header = fits.getdata(iff, header=True)
    for keyword, value in kwargs.items():
        header[keyword] = value
    if new_path is None:
        # fall back to the source directory, so that os.path.join is never
        # called with None
        new_path = fits_path if fits_path is not None else os.path.dirname(iff)
    out_filename = os.path.join(new_path, os.path.basename(iff))
    fits.writeto(out_filename, data, header, clobber=clobber)
    return out_filename

def to_header_str_format(str_value, length=8):
    """Strip surrounding whitespace, then right-pad with spaces to ``length`` characters.

    Values already longer than ``length`` after stripping are returned
    unpadded and untruncated.
    """
    trimmed = str_value.strip()
    return trimmed.ljust(length)

def apply_to_fits(fits_filenames, fits_path=None, func=None):
    """Apply ``func`` to each FITS filename and return the list of results.

    Parameters
    ----------
    fits_filenames : a filename or a list of filenames.
    fits_path : if not None, every filename is replaced by its basename joined
        onto this directory.
    func : callable taking one filename. If falsy, the list of resolved
        filenames is returned instead.
    """
    names = to_list(fits_filenames)
    if fits_path is None:
        targets = list(names)
    else:
        targets = [os.path.join(fits_path, os.path.basename(name)) for name in names]
    if not func:
        return targets
    return [func(target) for target in targets]

def images_stat(images, fields='mode,mean', return_dict=True, stdout=1, stderr=2):
    """Run IRAF ``imstat`` on one or several images.

    Parameters
    ----------
    images : an image name or a list of image names.
    fields : value assigned to the ``fields`` parameter of ``imstat``.
    return_dict : when true, the ``format`` parameter of ``imstat`` is set to
        ``'no'``. NOTE(review): despite the name, the function returns a list
        in either case.
    stdout, stderr : passed to ``iraf.imstat`` as ``Stdout`` / ``Stderr``.

    Returns
    -------
    A list with one ``iraf.imstat`` result per image, in input order.
    """
    iraf.images(_doprint=0)
    imstat_task = iraf.images.imstat
    if return_dict:
        imstat_task.setParam('format', 'no')
    imstat_task.setParam('fields', fields)
    image_list = images if isinstance(images, list) else [images]
    return [iraf.imstat(image, Stdout=stdout, Stderr=stderr) for image in image_list]

<h4>Basic reduction</h4>

In [5]:
def compute_superbias(input_images_names, paths, pipeline_config, pipeline_stage, pipeline_step):
    """Combine the bias frames among the input images into one superbias frame.

    The step configuration is read from
    ``pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step]``.
    Bias frames are selected by header keyword (``type_keyword`` /
    ``type_keyword_value`` of the input ``bias`` section) and combined with
    IRAF ``ccdred.zerocombine``.

    Parameters
    ----------
    input_images_names : a filename or a list of filenames of candidate frames.
    paths : dict mapping directory ids used in the config to real paths;
        must contain ``'temp'``.
    pipeline_config : full pipeline configuration dict (not modified).
    pipeline_stage, pipeline_step : keys selecting the step configuration.

    Returns
    -------
    Full path of the superbias file passed to ``zerocombine`` as output.

    Raises
    ------
    ValueError : if no input frame matches the bias selection.
    KeyError : if a required configuration key is missing.
    """
    # TODO: check config, stage and step for needed for this function keys
    # work on a private copy: the 'format' entries are rewritten below
    step_config = copy.deepcopy(pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step])
    step_params_config = step_config['step_params']
    software_params = step_params_config['software_params']

    input_tech_frames = step_params_config['input_params']['tech_frames']
    output_tech_frames = step_params_config['output_params']['tech_frames']
    input_tech_frames_dir = paths[input_tech_frames['frames_dir']]
    output_tech_frames_dir = paths[output_tech_frames['frames_dir']]

    input_bias_config = input_tech_frames['bias']
    input_bias_config['format'] = to_ext_format(input_bias_config['format'])
    output_bias_config = output_tech_frames['bias']
    output_bias_config['format'] = to_ext_format(output_bias_config['format'])

    input_bias_dir = os.path.join(input_tech_frames_dir, input_bias_config['sub_dir'])
    output_bias_dir = os.path.join(output_tech_frames_dir, output_bias_config['sub_dir'])

    superbias_filename = os.path.join(output_bias_dir,
                                      (output_bias_config['name'] + output_bias_config['format']))

    # keep only the frames whose header marks them as bias frames
    filter_bias_kwargs = {input_bias_config['type_keyword']: input_bias_config['type_keyword_value']}
    input_bias_filenames = filter_fits_by_header(input_images_names, input_bias_dir,
                                                 **filter_bias_kwargs)
    if not input_bias_filenames:
        raise ValueError('no bias frames matching %r found in %s' % (filter_bias_kwargs, input_bias_dir))

    iraf.noao.imred(_doprint=0)
    iraf.noao.imred.ccdred(_doprint=0)
    iraf.noao.imred.ccdred.instrument = os.path.join(paths[software_params['config']['INSTRUMENT_DIR']],
                                                     software_params['config']['INSTRUMENT_FILE'])

    zerocombine = iraf.noao.imred.ccdred.zerocombine
    zerocombine.setParam('ccdtype', input_bias_config.get('ccdtype', ''))
    if 'gain_keyword' in input_bias_config:
        # the gain is taken from the header of the first selected bias frame
        gain = fits.getheader(input_bias_filenames[0], 0)[input_bias_config['gain_keyword']]
        zerocombine.setParam('gain', gain)

    # zerocombine receives its inputs as an @-list file
    bias_list = os.path.join(paths['temp'], 'bias_list.txt')
    list_to_file(input_bias_filenames, bias_list)

    iraf.noao.imred.ccdred.zerocombine(input='@' + bias_list, output=superbias_filename,
                                       process='no', delete='no', clobber='no')

    return superbias_filename

def compute_superdark(input_images_names, paths, pipeline_config, pipeline_stage, pipeline_step):
    """Combine the dark frames among the input images into one superdark frame.

    The step configuration is read from
    ``pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step]``.
    Dark frames are selected by header keyword (``type_keyword`` /
    ``type_keyword_value`` of the input ``dark`` section) and combined with
    IRAF ``ccdred.darkcombine``.

    NOTE(review): this reads the ``'dark'`` sub-section of ``tech_frames``,
    mirroring the ``'bias'`` section used by ``compute_superbias`` — confirm
    that the JSON config provides it.

    Parameters
    ----------
    input_images_names : a filename or a list of filenames of candidate frames.
    paths : dict mapping directory ids used in the config to real paths;
        must contain ``'temp'``.
    pipeline_config : full pipeline configuration dict (not modified).
    pipeline_stage, pipeline_step : keys selecting the step configuration.

    Returns
    -------
    Full path of the superdark file passed to ``darkcombine`` as output.

    Raises
    ------
    ValueError : if no input frame matches the dark selection.
    KeyError : if a required configuration key is missing.
    """
    # TODO: check config, stage and step for needed for this function keys
    # work on a private copy: the 'format' entries are rewritten below
    step_config = copy.deepcopy(pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step])
    step_params_config = step_config['step_params']
    software_params = step_params_config['software_params']

    input_tech_frames = step_params_config['input_params']['tech_frames']
    output_tech_frames = step_params_config['output_params']['tech_frames']
    input_tech_frames_dir = paths[input_tech_frames['frames_dir']]
    output_tech_frames_dir = paths[output_tech_frames['frames_dir']]

    input_dark_config = input_tech_frames['dark']
    input_dark_config['format'] = to_ext_format(input_dark_config['format'])
    output_dark_config = output_tech_frames['dark']
    output_dark_config['format'] = to_ext_format(output_dark_config['format'])

    input_dark_dir = os.path.join(input_tech_frames_dir, input_dark_config['sub_dir'])
    output_dark_dir = os.path.join(output_tech_frames_dir, output_dark_config['sub_dir'])

    # the combined frame goes to the output directory, like the superbias
    superdark_filename = os.path.join(output_dark_dir,
                                      (output_dark_config['name'] + output_dark_config['format']))

    # keep only the frames whose header marks them as dark frames
    filter_dark_kwargs = {input_dark_config['type_keyword']: input_dark_config['type_keyword_value']}
    input_dark_filenames = filter_fits_by_header(input_images_names, input_dark_dir,
                                                 **filter_dark_kwargs)
    if not input_dark_filenames:
        raise ValueError('no dark frames matching %r found in %s' % (filter_dark_kwargs, input_dark_dir))

    iraf.noao.imred(_doprint=0)
    iraf.noao.imred.ccdred(_doprint=0)
    iraf.noao.imred.ccdred.instrument = os.path.join(paths[software_params['config']['INSTRUMENT_DIR']],
                                                     software_params['config']['INSTRUMENT_FILE'])

    darkcombine = iraf.noao.imred.ccdred.darkcombine
    darkcombine.setParam('ccdtype', input_dark_config.get('ccdtype', ''))
    if 'gain_keyword' in input_dark_config:
        # the gain is taken from the header of the first selected dark frame
        gain = fits.getheader(input_dark_filenames[0], 0)[input_dark_config['gain_keyword']]
        darkcombine.setParam('gain', gain)

    # darkcombine receives its inputs as an @-list file
    dark_list = os.path.join(paths['temp'], 'dark_list.txt')
    list_to_file(input_dark_filenames, dark_list)

    iraf.noao.imred.ccdred.darkcombine(input='@' + dark_list, output=superdark_filename,
                                       process='no', delete='no', clobber='no')

    return superdark_filename

In [6]:
def compute_supers(input_images_names, paths, pipeline_config, pipeline_stage, pipeline_step):
    """Compute the super calibration frames requested by the step configuration.

    Reads the step configuration selected by ``pipeline_stage`` /
    ``pipeline_step``. If ``'bias'`` is listed in
    ``input_params['tech_frames']['use_frames']``, ``compute_superbias`` is
    called; if ``'dark'`` is listed, ``compute_superdark`` is called once per
    configured exposure time.

    NOTE(review): work in progress — flats, filters and the computed results
    are not used yet and the function returns None. The exposure time is not
    passed to ``compute_superdark``, so every call receives the same arguments.
    """
    # TODO: check config, stage and step for needed for this function keys
    stage_steps = pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps']
    step_config = copy.deepcopy(stage_steps[pipeline_step])
    step_params_config = step_config['step_params']
    inputs = step_params_config['input_params']
    outputs = step_params_config['output_params']

    # io params (several are looked up but not consumed yet)
    exp_times = to_list(inputs['images_exp_times_secs'])
    filters = to_list(inputs['images_filters'])
    images_filter_keyword = inputs['images_filter_keyword']
    input_tech_frames_dir = paths[inputs['tech_frames']['frames_dir']]
    output_tech_frames_dir = paths[outputs['tech_frames']['frames_dir']]
    use_bias = 'bias' in inputs['tech_frames']['use_frames']
    use_dark = 'dark' in inputs['tech_frames']['use_frames']
    use_flat = 'flat' in inputs['tech_frames']['use_frames']

    step_args = (input_images_names, paths, pipeline_config, pipeline_stage, pipeline_step)

    if use_bias:
        superbias_filename = compute_superbias(*step_args)
    if use_dark:
        # one superdark per exposure time of the data images
        # if no exp_time, take exp_time with biggest list of corresponding darks
        exp_times_darks = {}
        for exp_time in exp_times:
            exp_times_darks[exp_time] = compute_superdark(*step_args)

<h4>Catalog extraction basic function</h4>

In [None]:
def create_catalogs(input_images_names, paths, pipeline_config, pipeline_stage, pipeline_step):
    """Run the configured extractor once per input image and return the catalog paths.

    The step configuration is read from
    ``pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step]``
    and deep-copied, so ``pipeline_config`` is not modified. Each image gets
    its own copy of the software parameters, so values taken from one image's
    FITS header never carry over to the next image.

    Parameters
    ----------
    input_images_names : image names used to build the image, PSF-model and
        catalog filenames via ``build_filenames_list``.
    paths : dict mapping directory ids used in the config to real paths;
        must contain ``'config'``.
    pipeline_config : full pipeline configuration dict.
    pipeline_stage, pipeline_step : keys selecting the step configuration.

    Returns
    -------
    List of full pathnames of the catalogs, one per processed image.
    """
    # TODO: check config, stage and step for needed for this function keys
    # select pipeline stage and step in config
    step_config = copy.deepcopy(pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step])
    step_params_config = step_config['step_params']
    base_software_params = step_params_config['software_params']
    input_params = step_params_config['input_params']
    output_params = step_params_config['output_params']

    # use paths to fill path-depended params TODO: preprocess config to do this
    base_software_params['config_file'] = os.path.join(paths['config'], base_software_params['config_file'])
    base_software_params['temp_path'] = paths[base_software_params['temp_path']]
    for path_param in ('FILTER_NAME', 'STARNNW_NAME'):
        base_software_params['config'][path_param] = os.path.join(paths['config'],
                                                                  base_software_params['config'][path_param])

    # input images
    input_images_format = to_ext_format(input_params['images_format'])
    input_images_dir = paths[input_params['images_dir']]
    iffs = build_filenames_list(input_images_names, new_ext=input_images_format,
                                new_path=input_images_dir)

    # input psf models, if both their format and directory are configured
    look_for_psf = 'psf_models_format' in input_params and 'psf_models_dir' in input_params
    if look_for_psf:
        input_psf_models_format = to_ext_format(input_params['psf_models_format'])
        input_psf_models_dir = paths[input_params['psf_models_dir']]
        ipfs = build_filenames_list(input_images_names, new_ext=input_psf_models_format,
                                    new_path=input_psf_models_dir)
    else:
        # placeholders, only there to keep zip() below aligned
        ipfs = [None] * len(iffs)

    # output catalogs
    output_catalogs_format = to_ext_format(output_params['catalogs_format'])
    output_catalogs_dir = paths[output_params['catalogs_dir']]
    ocfs = build_filenames_list(input_images_names, new_ext=output_catalogs_format,
                                new_path=output_catalogs_dir)

    # append catalog params required downstream, in a deterministic order
    required_checks = (('check_params_for_scamp_required', 'scamp_required_catalog_params'),
                       ('check_params_for_psfex_required', 'psfex_required_catalog_params'))
    for flag_name, required_key in required_checks:
        if output_params.get(flag_name) == 'true':
            already_present = set(base_software_params['params'])
            for required_param in pipeline_config[required_key]:
                if required_param not in already_present:
                    already_present.add(required_param)
                    base_software_params['params'].append(required_param)

    # optional mapping {software config param: FITS header keyword}
    params_from_header = step_params_config.get('data_handle', {}).get('params_from_fits_header')

    # output is a list of catalogs full pathnames
    catalogs = []

    for (iff, ipf, ocf) in zip(iffs, ipfs, ocfs):
        # private copy per image: header-derived values must not carry over
        software_params = copy.deepcopy(base_software_params)
        # TODO: is it needed or SExtractor recognizes it by itself?
        if params_from_header:
            # getheader opens and closes the file itself (no handle is left open)
            fits_file_header = fits.getheader(iff, 0)
            for param_name, fits_header_keyword in params_from_header.items():
                if fits_header_keyword in fits_file_header:
                    software_params['config'][param_name] = str(fits_file_header[fits_header_keyword])
        software_params['config']['CATALOG_NAME'] = ocf
        if look_for_psf:
            software_params['config']['PSF_NAME'] = ipf
        sextractor = aw.api.Astromatic(**software_params)
        sextractor.run(iff) #TODO: handle run status (maybe return it?)
        catalogs.append(ocf)

    return catalogs

<h4>Calculate calibration basic function</h4>

In [8]:
def calculate_calibration(input_images_names, paths, pipeline_config, pipeline_stage, pipeline_step):
    """Run the configured calibration tool on the input catalogs.

    The step configuration is read from
    ``pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step]``
    and deep-copied before use. If
    ``output_params['calculate_calibration_separately']`` equals ``'true'``,
    the tool is run once per catalog; otherwise it is run once with all
    catalog names joined by spaces.

    Parameters
    ----------
    input_images_names : image names used to build the catalog filenames.
    paths : dict mapping directory ids used in the config to real paths;
        must contain ``'config'``.
    pipeline_config : full pipeline configuration dict.
    pipeline_stage, pipeline_step : keys selecting the step configuration.

    Returns
    -------
    Always an empty list.
    """
    # TODO: check config, stage and step for needed for this function keys
    step_config = copy.deepcopy(pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step])
    step_params_config = step_config['step_params']
    software_params = step_params_config['software_params'] # TODO: deepcopy or not?

    # use paths to fill path-depended params TODO: preprocess config to do this
    software_params['config_file'] = os.path.join(paths['config'], software_params['config_file'])
    software_params['temp_path'] = paths[software_params['temp_path']]

    # input catalogs
    input_params = step_params_config['input_params']
    input_catalogs_format = to_ext_format(input_params['catalogs_format'])
    input_catalogs_dir = paths[input_params['catalogs_dir']]
    icfs = build_filenames_list(input_images_names, new_ext=input_catalogs_format,
                                    new_path=input_catalogs_dir)

    output_params = step_params_config['output_params']
    calculate_calibration_separately = output_params.get('calculate_calibration_separately') == 'true'

    if calculate_calibration_separately:
        run_inputs = icfs
    else:
        # TODO: create file with catalogs list, and pass as @catalogs_list
        run_inputs = [' '.join(icfs)]

    for run_input in run_inputs:
        scamp = aw.api.Astromatic(**software_params)
        scamp.run(run_input)

    # output is empty list
    return []

<h4>Image SWarp basic function</h4>

In [9]:
def _image_sequence_number(image_filename, extension):
    """Return the text after the last ``_`` of the basename, with ``extension`` removed as a suffix."""
    base = os.path.basename(image_filename)
    # remove the extension as a whole suffix (str.rstrip would strip a character set)
    if extension and base.endswith(extension):
        base = base[:-len(extension)]
    parts = base.rsplit('_', 1)
    if len(parts) != 2:
        raise ValueError('image name %r has no "_<number>" suffix' % image_filename)
    return parts[1]


def _numbers_to_ranges_label(numbers):
    """Collapse numeric strings into a label of runs, e.g. ``['1','2','3','7']`` -> ``'1-3_7'``."""
    # order numerically, so that e.g. '10' does not sort before '9'
    ordered = sorted(numbers, key=int)
    labels = []
    # consecutive numbers share a constant (position - value) difference
    for _key, group in groupby(enumerate(ordered), lambda pair: pair[0] - int(pair[1])):
        run = [number for _pos, number in group]
        labels.append(run[0] if len(run) == 1 else run[0] + '-' + run[-1])
    return '_'.join(labels)


def swarp_images(input_images_names, paths, pipeline_config, pipeline_stage, pipeline_step):
    """Run the configured resampling/co-addition tool on the input images.

    The step configuration is read from
    ``pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step]``
    and deep-copied before use. If ``output_params['swarp_separately']``
    equals ``'true'``, every image is processed on its own and gets its own
    output image and weightmap. Otherwise all images are passed to one run and
    the output is named ``<observation_id>_<ranges><ext>``, where ``<ranges>``
    summarises the trailing ``_<number>`` parts of the input names.

    Parameters
    ----------
    input_images_names : image names used to build input and output filenames.
    paths : dict mapping directory ids used in the config to real paths;
        must contain ``'config'``.
    pipeline_config : full pipeline configuration dict; ``'observation_id'``
        is needed in the co-addition case.
    pipeline_stage, pipeline_step : keys selecting the step configuration.

    Returns
    -------
    Tuple ``(swarped_images, weightmaps)`` of lists of full pathnames.

    Raises
    ------
    ValueError : in the co-addition case, if an input name has no
        ``_<number>`` suffix or the suffix is not an integer.
    """
    # TODO: check config, stage and step for needed for this function keys
    # select pipeline stage and step in config
    step_config = copy.deepcopy(pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step])
    step_params_config = step_config['step_params']
    software_params = step_params_config['software_params'] # TODO: deepcopy or not?
    input_params = step_params_config['input_params']
    output_params = step_params_config['output_params']

    # use paths to fill path-depended params TODO: preprocess config to do this
    software_params['config_file'] = os.path.join(paths['config'], software_params['config_file'])
    software_params['temp_path'] = paths[software_params['temp_path']]
    software_params['config']['RESAMPLE_DIR'] = paths[software_params['config']['RESAMPLE_DIR']]

    # input images
    input_images_format = to_ext_format(input_params['images_format'])
    input_images_dir = paths[input_params['images_dir']]
    iffs = build_filenames_list(input_images_names, new_ext=input_images_format,
                                new_path=input_images_dir)

    # output images
    output_images_format = to_ext_format(output_params['images_format'])
    output_images_dir = paths[output_params['images_dir']]

    # output weightmaps
    output_weightmaps_format = to_ext_format(output_params['weightmaps_format'])
    output_weightmaps_dir = paths[output_params['weightmaps_dir']]

    # output is a tuple of lists: swarped images full pathnames and weightmaps full pathnames
    swarped_images, weightmaps = [], []

    swarp_separately = output_params.get('swarp_separately') == 'true'

    if swarp_separately:
        # one output image and weightmap per input image
        offs = build_filenames_list(input_images_names, new_ext=output_images_format,
                                    new_path=output_images_dir)
        owmfs = build_filenames_list(input_images_names, new_ext=output_weightmaps_format,
                                    new_path=output_weightmaps_dir)

        for (iff, off, owmf) in zip(iffs, offs, owmfs):
            software_params['config']['IMAGEOUT_NAME'] = off
            software_params['config']['WEIGHTOUT_NAME'] = owmf
            swarp = aw.api.Astromatic(**software_params)
            swarp.run(iff)
            swarped_images.append(off)
            weightmaps.append(owmf)
    else:
        # build coadded image and weightmap names
        output_filename = str(pipeline_config['observation_id'])
        nums_ranges = _numbers_to_ranges_label(
            [_image_sequence_number(iff, input_images_format) for iff in iffs])
        output_image_name = os.path.join(
            output_images_dir, output_filename + '_' + nums_ranges + output_images_format)
        output_weightmap_name = os.path.join(
            output_weightmaps_dir, output_filename + '_' + nums_ranges + output_weightmaps_format)
        # run SWarp
        software_params['config']['IMAGEOUT_NAME'] = output_image_name
        software_params['config']['WEIGHTOUT_NAME'] = output_weightmap_name
        swarp = aw.api.Astromatic(**software_params)
        swarp.run(' '.join(iffs)) # TODO: create file with images list, and pass as @images_list
        swarped_images.append(output_image_name)
        weightmaps.append(output_weightmap_name)

    return swarped_images, weightmaps

<h4>Calculate PSF basic function</h4>

In [10]:
def calculate_psf(input_images_names, paths, pipeline_config, pipeline_stage, pipeline_step):
    """Run the configured PSF modelling tool on the input catalogs.

    The step configuration is read from
    ``pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step]``
    and deep-copied before use. If
    ``output_params['calculate_psf_separately']`` equals ``'true'``, the tool
    is run once per catalog; otherwise it is run once with all catalog names
    joined by spaces.

    Parameters
    ----------
    input_images_names : image names used to build catalog and PSF-model
        filenames.
    paths : dict mapping directory ids used in the config to real paths;
        must contain ``'config'``.
    pipeline_config : full pipeline configuration dict.
    pipeline_stage, pipeline_step : keys selecting the step configuration.

    Returns
    -------
    List of PSF-model full pathnames built from the input names (the names the
    tool is expected to produce; they are not checked for existence here).
    """
    # TODO: check config, stage and step for needed for this function keys
    step_config = copy.deepcopy(pipeline_config['pipeline_stages'][pipeline_stage]['stage_steps'][pipeline_step])
    step_params_config = step_config['step_params']
    software_params = step_params_config['software_params'] # TODO: deepcopy or not?

    # use paths to fill path-depended params TODO: preprocess config to do this
    software_params['config_file'] = os.path.join(paths['config'], software_params['config_file'])
    software_params['temp_path'] = paths[software_params['temp_path']]

    # input catalogs
    input_params = step_params_config['input_params']
    input_catalogs_format = to_ext_format(input_params['catalogs_format'])
    input_catalogs_dir = paths[input_params['catalogs_dir']]
    icfs = build_filenames_list(input_images_names, new_ext=input_catalogs_format,
                                    new_path=input_catalogs_dir)

    # output psf models
    output_params = step_params_config['output_params']
    output_psf_models_format = to_ext_format(output_params['psf_models_format'])
    output_psf_models_dir = paths[output_params['psf_models_dir']]
    # TODO: inspect BUG in PSFEx, it replaces PSF_SUFFIX only last extension part (after last .)
    software_params['config']['PSF_SUFFIX'] = ext_split(output_psf_models_format)[-1]
    software_params['config']['PSF_DIR'] = output_psf_models_dir
    opmfs = build_filenames_list(input_images_names, new_ext=output_psf_models_format,
                                    new_path=output_psf_models_dir)

    calculate_psf_separately = output_params.get('calculate_psf_separately') == 'true'

    if calculate_psf_separately:
        run_inputs = icfs
    else:
        # TODO: create file with catalogs list, and pass as @catalogs_list
        run_inputs = [' '.join(icfs)]

    for run_input in run_inputs:
        psfex = aw.api.Astromatic(**software_params)
        psfex.run(run_input)

    return opmfs

In [11]:
#catalog = aw.utils.ldac.get_table_from_ldac(catalogs_5[0])
#catalog

<h3>Spark functions for pipeline stages</h3>

In [13]:
# map function: tags each image with its exposure type so that images can be grouped by it
def split_tech_images(input_item):
    """Spark map function: key an image by the exposure type stored in its FITS header.

    Reads the pipeline configuration and the list of config files from the
    broadcast variables ``broadcast_pipeline_config`` and
    ``broadcast_config_files``. The image bytes are written to a temporary
    working directory tree, the header keyword named by
    ``pipeline_config['exposure_type_keyword']`` is read from HDU 0, and the
    working directory is removed again, also when an error occurs.

    Parameters
    ----------
    input_item : pair ``(filename, file_content)``.

    Returns
    -------
    Tuple ``(exp_type, input_item)``.
    """
    # get config from broadcast variable
    pipeline_config = broadcast_pipeline_config.value

    # working directory name: observation id plus a microsecond timestamp
    wrk_path = str(pipeline_config['observation_id']) + '_images_preparation_map_'
    wrk_path = wrk_path + datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S:%f')
    dirs = AEDirsTreeConfigurer(wrk_path=wrk_path)
    try:
        # create dirs tree
        dirs.build_dirs_tree()
        dirs.new_log_dir()
        paths = dirs.get_paths()

        # put all config files in paths['config'] dir
        config_files = broadcast_config_files.value
        for config_file in config_files:
            shutil.copyfile(SparkFiles.get(config_file), os.path.join(paths['config'],
                                                                      os.path.basename(config_file)))
        # save image to executor local file system
        image_filename = os.path.join(paths['temp'], os.path.basename(input_item[0]))
        with open(image_filename, 'wb') as fid:
            fid.write(input_item[1])
        exp_type = fits.getheader(image_filename, 0)[pipeline_config['exposure_type_keyword']]

        # TODO: paste here preprocessing pipeline, defined by users
        # need to define user preproc functions interface and create dispatcher of them
    finally:
        # TODO: paste this dirs tree removal into AEDirsTreeConfigurer class
        # always clean up, so a failing task does not leave its files on the executor
        shutil.rmtree(wrk_path, True)
    return exp_type, input_item

# reduceByKey function: combines images into one list per key
# (e.g. observation id; the key is set in the return value of the map function(s))
def join_to_list_by_key(input_item_1, input_item_2):
    """Concatenate two values into a new list; non-list values are treated as single elements."""
    merged = list(to_list(input_item_1))
    merged.extend(to_list(input_item_2))
    return merged

# current input_items are tuples (exp_type, fits)
# output after collect will be a list of dicts: key=frame_type, value=list of tuples (filename, fits)
# further processing of tech frames is done locally (it's a simple task and needs to be done only once)
def compute_super_tech_frames_dicts(input_items):
    """Turn a ``(frames_type, items)`` pair into ``{str(frames_type): [items...]}``.

    The items are copied into a new list in iteration order.
    """
    frames_type = str(input_items[0])
    return {frames_type: [item for item in input_items[1]]}

In [157]:
# makes chunks of input list with size of chunk_size
def get_chunks(input_list, chunk_size):
    """Yield consecutive slices of ``input_list`` with at most ``chunk_size`` elements each.

    The last chunk may be shorter. ``input_list`` must support ``len`` and
    slicing. ``range`` is used instead of ``xrange`` so that the generator
    works on both Python 2 and Python 3.
    """
    for start in range(0, len(input_list), chunk_size):
        yield input_list[start:start + chunk_size]

# input is a full set of images data
# input_rdd is a tuple(exp_type, list(tuple(full_pathname, bytes)))
def split_images_to_chunks(input_rdd):
    """Split the items of an ``(exp_type, items)`` pair into numbered chunks.

    The chunk size is read from ``images_split_chunk_size`` in the broadcast
    pipeline configuration.

    Returns
    -------
    List of ``(chunk_number, chunk)`` tuples, numbered from 0.
    """
    chunk_size = int(broadcast_pipeline_config.value['images_split_chunk_size'])
    return list(enumerate(get_chunks(input_rdd[1], chunk_size)))

# input is a chunk of all images data
# input_rdd is a tuple(chunk_num, list(tuple(full_pathname, bytes)))
def calibrate_images_stage(input_rdd):
    """Spark map function: run the ``images_calibration`` stage on one chunk of images.

    Reads the pipeline configuration and the list of config files from the
    broadcast variables ``broadcast_pipeline_config`` and
    ``broadcast_config_files``. The images are written to a temporary working
    directory tree, the stage steps are run in a fixed order, the results are
    loaded into memory, and the working directory is removed again, also when
    a step raises.

    Parameters
    ----------
    input_rdd : tuple ``(chunk_num, list of (full_pathname, bytes))``.

    Returns
    -------
    Tuple ``(1, output_dict)`` where ``output_dict`` maps
    ``'calibrated_images'``, ``'calibrated_images_catalogs'`` and
    ``'calibrated_images_catalogs_psf'`` to dicts keyed by file path.
    """
    input_items = input_rdd[1]

    # get config from broadcast variable
    pipeline_config = broadcast_pipeline_config.value

    # working directory name: observation id plus a microsecond timestamp
    wrk_path = str(pipeline_config['observation_id']) + '_images_calibration_map_'
    wrk_path = wrk_path + datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S:%f')

    dirs = AEDirsTreeConfigurer(wrk_path=wrk_path)
    try:
        # create dirs tree
        dirs.build_dirs_tree()
        dirs.new_log_dir()
        paths = dirs.get_paths()

        # take input images dir from json config
        # NOTE(review): looked up but not used below; images are written to paths['temp']
        input_images_dir = paths[pipeline_config['pipeline_stages']['images_calibration']\
        ['stage_params']['input_params']['images_dir']]

        # put all config files in paths['config'] dir
        config_files = broadcast_config_files.value
        for config_file in config_files:
            shutil.copyfile(SparkFiles.get(config_file), os.path.join(paths['config'],
                                                                      os.path.basename(config_file)))
        # prepare a list of images names
        # ii[0] for full files
        input_images_names = sorted([change_file_extension(os.path.basename(ii[0]), '') for ii in input_items])

        # for each item in input_items, save it to executor local file system
        for input_item in input_items:
            image_filename = os.path.join(paths['temp'], os.path.basename(input_item[0]))
            with open(image_filename, 'wb') as fid:
                fid.write(input_item[1])

        # here call steps basic functions (the order matters: each step reads
        # files written by the previous ones)
        # stage: images_calibration
        catalogs_1 = create_catalogs(input_images_names, paths, pipeline_config, 'images_calibration', 'create_calibration_catalogs')
        calculate_calibration(input_images_names, paths, pipeline_config, 'images_calibration', 'create_calibration_headers')
        swarped_images_1, _ = swarp_images(input_images_names, paths, pipeline_config, 'images_calibration', 'calibrate_images')
        catalogs_2 = create_catalogs(input_images_names, paths, pipeline_config, 'images_calibration', 'create_calibrated_images_catalogs')
        calculate_psf(input_images_names, paths, pipeline_config, 'images_calibration', 'model_psf_calibrated_images_catalogs')
        catalogs_3 = create_catalogs(input_images_names, paths, pipeline_config, 'images_calibration', 'create_calibrated_images_catalogs_psf')

        # here load result data
        output_dict = {}
        cal_images = {im: fits_ImageHDU_from_file(im) for im in swarped_images_1}
        cal_cats = {cat: aw.utils.ldac.get_table_from_ldac(cat) for cat in catalogs_2}
        cal_cats_psf = {cat: aw.utils.ldac.get_table_from_ldac(cat) for cat in catalogs_3}
        output_dict['calibrated_images'] = cal_images
        output_dict['calibrated_images_catalogs'] = cal_cats
        output_dict['calibrated_images_catalogs_psf'] = cal_cats_psf
    finally:
        # TODO: paste this dirs tree removal into AEDirsTreeConfigurer class
        # always clean up, so a failing task does not leave its files on the executor
        shutil.rmtree(wrk_path, True)

    return 1, output_dict

In [158]:
# each input dict is a dict{type:dict{fn:bytes}}
def join_to_dict_by_key(dict1, dict2):
    """Merge two two-level dicts into a new two-level dict.

    Each argument maps an outer key to an inner dict. The result contains every
    outer key found in either argument; the inner dicts that share an outer key
    are merged, and on a duplicate inner key the value from ``dict2`` wins.
    Used as the combiner passed to ``reduceByKey`` in the driver code.

    Parameters
    ----------
    dict1, dict2 : dict
        Mappings of the form ``{outer_key: {inner_key: value}}``.

    Returns
    -------
    dict
        A new dict with new inner dicts; neither argument is modified.
        The values themselves are not copied.
    """
    res = {}
    # dict1 is processed first so that dict2 entries override it on conflicts
    for source in (dict1, dict2):
        for outer_key, inner in source.items():
            # setdefault also registers outer keys whose inner dict is empty
            res.setdefault(outer_key, {}).update(inner)
    return res


# input is a dict of all calibrated images, cats and psf_cats data
# input_rdd is a tuple(1, dict{data_type:dict{filename:object}}); image objects must expose .data and .header
def coadd_images_stage(input_rdd):
    """Coadd the calibrated images of one record and build catalogs for the result.

    The calibrated images are written to a temporary working directory tree,
    the 'images_coaddition' pipeline steps are run on them, and the produced
    images and catalogs are loaded back into memory. The working directory is
    removed before returning, also when one of the steps raises.

    Parameters
    ----------
    input_rdd : tuple
        A single record (despite the name, not an RDD): ``(key, data)`` where
        ``data`` maps a data type name to a dict ``{filename: object}``. Only
        ``data['calibrated_images']`` is read here; its values must expose
        ``.data`` and ``.header``.

    Returns
    -------
    tuple
        ``(1, output_dict)``. ``output_dict`` is a shallow copy of the input
        dict extended with the keys 'calibrated_coadded_images',
        'calibrated_coadded_images_catalogs' and
        'calibrated_coadded_images_catalogs_psf', each mapping a file name to
        the loaded object.
    """
    # get json config from broadcast variable
    pipeline_config = broadcast_pipeline_config.value

    # Work on a shallow copy so the incoming record is not modified in place;
    # all input entries are still passed on together with the data computed here.
    output_dict = dict(input_rdd[1])

    # (filename, image) pairs; only the calibrated images are needed as files
    input_items = output_dict['calibrated_images'].items()

    # name of the working dir: observation id + stage tag + current timestamp
    wrk_path = str(pipeline_config['observation_id']) + '_images_coaddition_map_'
    wrk_path = wrk_path + datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S:%f')

    try:
        # create dirs tree
        dirs = AEDirsTreeConfigurer(wrk_path=wrk_path)
        dirs.build_dirs_tree()
        paths = dirs.get_paths()

        # take input images dir from json config
        stage_config = pipeline_config['pipeline_stages']['images_coaddition']
        input_images_dir = paths[stage_config['stage_params']['input_params']['images_dir']]

        # put all config files in paths['config'] dir
        for config_file in broadcast_config_files.value:
            shutil.copyfile(SparkFiles.get(config_file),
                            os.path.join(paths['config'], os.path.basename(config_file)))

        # sorted list of image names; change_file_extension(..., '') presumably
        # strips the extension -- TODO confirm against fits_utils
        input_images_names = sorted([change_file_extension(os.path.basename(item[0]), '')
                                     for item in input_items])

        # save each image to the executor local file system
        for image_name, image_hdu in input_items:
            fits.writeto(os.path.join(input_images_dir, os.path.basename(image_name)),
                         data=image_hdu.data, header=image_hdu.header)

        # here call steps basic functions
        # stage: images_coaddition
        swarped_images_2, _ = swarp_images(input_images_names, paths, pipeline_config,
                                           'images_coaddition', 'coadd_calibrated_images')
        catalogs_4 = create_catalogs(swarped_images_2, paths, pipeline_config,
                                     'images_coaddition', 'create_coadded_image_catalog')
        calculate_psf(swarped_images_2, paths, pipeline_config,
                      'images_coaddition', 'model_psf_coadded_image_catalog')
        catalogs_5 = create_catalogs(swarped_images_2, paths, pipeline_config,
                                     'images_coaddition', 'create_coadded_image_catalog_psf')

        # load result data into memory before the working dir is removed
        # (merge new data with data provided from above)
        output_dict['calibrated_coadded_images'] = {
            im: fits_ImageHDU_from_file(im) for im in swarped_images_2}
        output_dict['calibrated_coadded_images_catalogs'] = {
            cat: aw.utils.ldac.get_table_from_ldac(cat) for cat in catalogs_4}
        output_dict['calibrated_coadded_images_catalogs_psf'] = {
            cat: aw.utils.ldac.get_table_from_ldac(cat) for cat in catalogs_5}
    finally:
        # TODO: paste this dirs tree removal into AEDirsTreeConfigurer class
        # ignore_errors=True: the tree may not exist if building it failed
        shutil.rmtree(wrk_path, True)

    return 1, output_dict

In [159]:
def get_data_by_key(input_rdd, key):
    """Return the entry stored under ``key`` in the payload of one record.

    ``input_rdd`` is indexed as a pair; its second element is the mapping that
    is looked up. An empty dict is returned when ``key`` is not present.
    """
    payload = input_rdd[1]
    return payload[key] if key in payload.keys() else {}

In [160]:
def save_catalogs(input_rdd, dir_name='catalogs'):
    """Write every catalog of a ``{name: table}`` mapping to disk in LDAC format.

    Despite its name, ``input_rdd`` is a dict-like object (one element of an
    RDD). Each table is saved under the directory registered as ``dir_name``
    in the broadcast paths, using only the base name of its key; an existing
    file is overwritten (``clobber=True``).

    Returns the list of written path names, in iteration order.
    """
    saved_paths = []
    for cat_name, table in input_rdd.iteritems():
        target_dir = broadcast_paths.value[dir_name]
        destination = os.path.join(target_dir, os.path.basename(cat_name))
        aw.utils.ldac.save_table_as_ldac(table, destination, clobber=True)
        saved_paths.append(destination)
    return saved_paths

def save_images(input_rdd, dir_name='images'):
    """Write every image of a ``{name: image}`` mapping to disk as a FITS file.

    Despite its name, ``input_rdd`` is a dict-like object (one element of an
    RDD) whose values expose ``.data`` and ``.header``. Each image is saved
    under the directory registered as ``dir_name`` in the broadcast paths,
    using only the base name of its key; an existing file is overwritten
    (``clobber=True``).

    Returns the list of written path names, in iteration order.
    """
    saved_paths = []
    for image_name, image in input_rdd.iteritems():
        target_dir = broadcast_paths.value[dir_name]
        destination = os.path.join(target_dir, os.path.basename(image_name))
        fits.writeto(destination, data=image.data, header=image.header, clobber=True)
        saved_paths.append(destination)
    return saved_paths

<h3>Spark application main code (driver code)</h3>

In [176]:
# Driver-side setup: build the working directory tree and load the pipeline config.
# `paths` and `pipeline_config` are module-level names used by the cells below.
# init dirs tree for driver
dirs = AEDirsTreeConfigurer()
dirs.build_dirs_tree()
dirs.new_log_dir()
# mapping of directory names to paths (indexed below with 'config', 'temp', 'catalogs')
paths = dirs.get_paths()

# load json config for pipeline
all_configs = AEJsonConfigLoader(os.path.join(paths['config'], 'common_config.json'))
all_configs.build_config()
pipeline_config = all_configs.get_pipeline_config()

In [177]:
# Spark setup: configuration, context, shipped files and broadcast variables.
# create spark configuration (local master 'local[4]')
# NOTE(review): os.environ.get returns None when SPARK_HOME / PYSPARK_PYTHON is unset -- confirm
# these variables are exported before running this cell.
spark_conf = SparkConf().setMaster('local[4]').setAppName('PipelineTest').setSparkHome(os.environ.get('SPARK_HOME'))
# the same interpreter (PYSPARK_PYTHON) is configured for both workers and driver
spark_conf.set('spark.pyspark.python', os.environ.get('PYSPARK_PYTHON'))
spark_conf.set('spark.pyspark.driver.python', os.environ.get('PYSPARK_PYTHON'))
spark_conf.set('spark.driver.memory', '4g')
spark_conf.set('spark.files.overwrite', 'true')

# create spark context
sc = SparkContext(conf=spark_conf)
#sc = SparkContext(conf=spark_conf, gateway=gateway)

# add custom python modules to all executors
# NOTE(review): absolute, machine-specific paths -- must be adjusted on another machine
sc.addPyFile('/home/ser/Dev/astro_engine/code/astro_utils.py')
sc.addPyFile('/home/ser/Dev/astro_engine/code/fits_utils.py')

# add config files to spark context
# presumably the empty second argument makes get_fits_images_from_dir list every file in the
# config dir -- TODO confirm against fits_utils
config_files = get_fits_images_from_dir(paths['config'], '')
config_files = [os.path.join(paths['config'], os.path.basename(c)) for c in config_files]
for config_file in config_files:
    sc.addFile(config_file)

# define broadcast variables
# broadcast_paths is read by save_catalogs / save_images; broadcast_config_files and
# broadcast_pipeline_config are read by coadd_images_stage
broadcast_paths = sc.broadcast(paths)
broadcast_config_files = sc.broadcast(config_files)
broadcast_pipeline_config = sc.broadcast(pipeline_config)

In [178]:
# display the effective Spark configuration as a list of (key, value) pairs
sc.getConf().getAll()

[(u'spark.pyspark.driver.python',
  u'/home/ser/Dev/Python/2/anaconda2/bin/python'),
 (u'spark.master', u'local[4]'),
 (u'spark.driver.memory', u'4g'),
 (u'spark.app.name', u'PipelineTest'),
 (u'spark.pyspark.python', u'/home/ser/Dev/Python/2/anaconda2/bin/python'),
 (u'spark.executor.id', u'driver'),
 (u'spark.driver.host', u'10.0.2.15'),
 (u'spark.driver.port', u'43953'),
 (u'spark.rdd.compress', u'True'),
 (u'spark.serializer.objectStreamReset', u'100'),
 (u'spark.home', u'/home/ser/Dev/Spark/spark-2.1.0-bin-hadoop2.7'),
 (u'spark.app.id', u'local-1494982042526'),
 (u'spark.submit.deployMode', u'client'),
 (u'spark.files.overwrite', u'true')]

In [179]:
# Read every file under paths['temp'] as a (path, content) pair and keep only those whose
# path ends with the configured input image format.
# Note: the lambda captures the driver-side `pipeline_config`, not the broadcast variable.
images = sc.binaryFiles(paths['temp']).filter(lambda x: x[0].endswith(pipeline_config['input_images_format']))
#images = sc.binaryFiles("hdfs://localhost:9000/ser/temp")
print 'images RDD:', images
print 'observation_id:', broadcast_pipeline_config.value['observation_id']

images RDD: PythonRDD[1] at RDD at PythonRDD.scala:48
observation_id: GRB130427


In [180]:
# split images on groups by image type
# (split_tech_images and join_to_list_by_key are defined earlier in the notebook)
images_groups = images.map(split_tech_images).reduceByKey(join_to_list_by_key)
# groups whose key equals the configured exposure type are the raw images;
# every other group is treated as technical frames
tech_frames = images_groups.filter(lambda x: x[0] != pipeline_config['images_exposure_type'])
raw_images = images_groups.filter(lambda x: x[0] == pipeline_config['images_exposure_type'])

# process tech_frames
# collect() brings the per-group dicts to the driver
tech_frames_dicts_list = tech_frames.map(compute_super_tech_frames_dicts).collect()
# flatten the list of dicts into one dict; on duplicate keys the later entry wins
tech_frames_dict = dict([(key, d[key]) for d in tech_frames_dicts_list for key in d])
# NOTE(review): super_frames_dict is not referenced in the cells below -- see the TODO in the next cell
super_frames_dict = compute_supers(tech_frames_dict, paths, pipeline_config, 'preparation', 'tech_frames_preprocessing')

In [181]:
# TODO: COMPUTE SUPERFRAMES (AND APPLY THEM, so raw images will be not raw) MANUALLY, FILL THIS DICT
# super_frames_dict
# use Pydoop for interacting with HDFS within workers

In [182]:
# process raw_images and save results as a PickleFile
# optional chunking: only applied when the configured chunk size is positive
if int(broadcast_pipeline_config.value['images_split_chunk_size']) > 0:
    raw_images = raw_images.flatMap(split_images_to_chunks)
# calibrate each record, merge the per-record result dicts that share a key,
# then run coaddition on each merged record; nothing is collected here
result_data_rdd = raw_images.map(calibrate_images_stage).reduceByKey(join_to_dict_by_key).map(coadd_images_stage)#.collect()

In [183]:
#result_data_rdd[0][1].keys()

In [184]:
#%%time
#result_dir = 'test_result_2'
#result_data_rdd.saveAsPickleFile(result_dir)

On 16 images 512x512, local mode:<br>
CPU times: user 748 ms, sys: 120 ms, total: 868 ms<br>
Wall time: 1min 13s<br>
Result size: 48.5Mb<br><br>
On 60 images 512x512, local mode:<br>
CPU times: user 40 ms, sys: 4 ms, total: 44 ms<br>
Wall time: 4min 32s, 3min 56s, <br>
Result size: 242.8Mb<br>

In [185]:
#result_dir = 'test_result'
#new_rdd = sc.pickleFile(result_dir)

In [186]:
# Names of the result entries produced by the calibration and coaddition stages.
# NOTE(review): data_types is not referenced in the visible part of the notebook.
data_types = ['calibrated_coadded_images','calibrated_coadded_images_catalogs',
              'calibrated_coadded_images_catalogs_psf', 'calibrated_images', 
              'calibrated_images_catalogs', 'calibrated_images_catalogs_psf']
catalogs_types = ['calibrated_coadded_images_catalogs', 'calibrated_coadded_images_catalogs_psf',
                  'calibrated_images_catalogs', 'calibrated_images_catalogs_psf']
images_types = ['calibrated_coadded_images', 'calibrated_images']

# cache rdd: several RDDs are derived from it below
result_data_rdd.cache()

# define rdds dict for all images
# each entry is an RDD holding, per record, the dict stored under that image type
images_rdd_dict = {}
for image_type in images_types:
    # partial binds the key so the function takes a single record, as map() requires
    partial_get = partial(get_data_by_key, key=image_type)
    images_rdd_dict[image_type] = result_data_rdd.map(partial_get)

# define rdds dict for all catalogs
catalogs_rdd_dict = {}
for catalog_type in catalogs_types:
    partial_get = partial(get_data_by_key, key=catalog_type)
    catalogs_rdd_dict[catalog_type] = result_data_rdd.map(partial_get)

In [187]:
%%time
# save catalogs and images to HDFS or local driver file system, store dict of paths to saved files
# collect() is the action that triggers the pipeline defined above.
# Only the first collected element is kept ([0]); this assumes each RDD holds a single
# record -- TODO confirm.
catalogs_paths_dict = {}
images_paths_dict = {}

for cat_type, cat_rdd in catalogs_rdd_dict.iteritems():
    catalogs_paths_dict[cat_type] = cat_rdd.map(save_catalogs).collect()[0]

# coadded images go to the 'stacks' directory, calibrated images to the default 'images' one
images_paths_dict['calibrated_coadded_images'] = images_rdd_dict['calibrated_coadded_images'].map(
    lambda x: save_images(x, 'stacks')).collect()[0]
images_paths_dict['calibrated_images'] = images_rdd_dict['calibrated_images'].map(save_images).collect()[0]

CPU times: user 264 ms, sys: 88 ms, total: 352 ms
Wall time: 5min 43s


3 images 512x512<br>
CPU times: user 176 ms, sys: 12 ms, total: 188 ms<br>
Wall time: 9.77 s<br>
60 images 512x512<br>
CPU times: user 264 ms, sys: 88 ms, total: 352 ms<br>
Wall time: 5min 43s<br>

In [188]:
# force a garbage collection pass on the driver; the displayed value is what gc.collect() returns
import gc
gc.collect()

1208

In [189]:
#print files.map(extract).reduce(join_cats)
#files.map(partial(extract_1, broadcast_fits=broadcast_fits)).map(lambda x: [x]).reduce(lambda a, b: a + b)
#files.mapPartitions(rt_1).map(lambda x : [x]).reduce(lambda a, b: a + b)
#catalogs = files.map(extract_3).collect()
#len(catalogs)
#catalogs.map(lambda x: [x]).reduce(lambda a, b: a + b)
#catalogs.saveAsSequenceFile(cfg.get_paths()['catalogs'])

In [None]:
# scratch cell: path of one catalog file in the catalogs dir
# NOTE(review): `ppp` is not used anywhere in the visible part of the notebook
ppp = os.path.join(paths['catalogs'], 'GRB130427A-60sec-001_001.cat')
#catalog = aw.utils.ldac.get_table_from_ldac(catalogs_5[0])

In [191]:
# shut down the SparkContext created in the setup cell
sc.stop()