In [None]:
import os
from pathlib import Path

from astropy import units as u
from astropy.nddata import CCDData
from astropy.stats import mad_std
from astropy.visualization import hist
import ccdproc as ccdp
import matplotlib.pyplot as plt
import numpy as np

from convenience_functions import show_image

In [None]:
def find_nearest_dark_exposure(
        image,
        dark_exposure_times,
        tolerance=0.5
    ) -> float:
    """
    Find the nearest exposure time of a dark frame to the exposure time of the image,
    raising an error if the difference in exposure time is more than tolerance.

    Parameters
    ----------

    image : astropy.nddata.CCDData
        Image for which a matching dark is needed. Only ``image.header['exptime']``
        is read.

    dark_exposure_times : list
        Exposure times for which there are darks. Any iterable of numbers works.

    tolerance : float or ``None``, optional
        Maximum difference, in seconds, between the image and the closest dark. Set
        to ``None`` to skip the tolerance test.

    Returns
    -------

    float
        Closest dark exposure time to the image.

    Raises
    ------

    ValueError
        If ``dark_exposure_times`` is empty.

    RuntimeError
        If ``tolerance`` is not ``None`` and the closest dark exposure differs from
        the image exposure by more than ``tolerance``.
    """
    image_exposure = image.header['exptime']
    dark_exposures = np.array(list(dark_exposure_times))

    if dark_exposures.size == 0:
        # np.argmin would raise a far less helpful ValueError on an empty array.
        raise ValueError('No dark exposure times were supplied.')

    idx = np.argmin(np.abs(dark_exposures - image_exposure))
    closest_dark_exposure = dark_exposures[idx]

    if (tolerance is not None and
            np.abs(image_exposure - closest_dark_exposure) > tolerance):
        # Report the exposure of the image that was passed in; the previous code
        # referenced an undefined name here and failed with NameError instead.
        raise RuntimeError('Closest dark exposure time is {} for flat of exposure '
                           'time {}.'.format(closest_dark_exposure, image_exposure))

    return closest_dark_exposure

def subtract_bias(
        frames : ccdp.ImageFileCollection,
        master_bias : CCDData,
        output_path : Path
    ) -> ccdp.ImageFileCollection:
    """
    Subtract bias from one or more frames and save the results

    Parameters
    ----------
    frames : ccdproc.ImageFileCollection
        Frames from which to subtract bias
    master_bias : astropy.nddata.CCDData
        Combined/Calibrated bias data
    output_path : pathlib.Path
        Output directory to save the bias-subtracted frames. Created if it
        does not exist. Files keep their original names and existing files
        of the same name are overwritten.

    Returns
    -------
    ccdproc.ImageFileCollection
        Collection of the bias-subtracted frames written to ``output_path``

    Raises
    ------
    ValueError
        If ``output_path`` is ``None``; the calibrated frames need a
        destination that is not the raw data.
    """
    if output_path is None:
        raise ValueError('output_path is required to save bias-subtracted frames.')

    output_path = Path(output_path)
    output_path.mkdir(parents=True, exist_ok=True)

    written = []
    # Reader options go through ccd_kwargs; extra keyword arguments to ccds()
    # are interpreted as header filters, not as arguments to the reader.
    for ccd, file_name in frames.ccds(return_fname=True, ccd_kwargs={'unit': 'adu'}):
        calibrated = ccdp.subtract_bias(ccd, master_bias)
        calibrated.write(output_path / file_name, overwrite=True)
        written.append(file_name)

    # Hand back the calibrated files rather than the untouched inputs.
    return ccdp.ImageFileCollection(location=output_path, filenames=written, find_fits_by_reading=False)

def subtract_dark(
        frames : ccdp.ImageFileCollection,
        master_dark : CCDData,
        output_path: Path
    ) -> ccdp.ImageFileCollection:
    """
    Subtract dark from one or more frames and save the results

    The dark is scaled by the ratio of exposure times (``scale=True``), read
    from the ``exptime`` header keyword of each frame and of the dark.

    Parameters
    ----------
    frames : ccdproc.ImageFileCollection
        Frames from which to subtract dark
    master_dark : astropy.nddata.CCDData
        Combined/Calibrated dark data
    output_path : pathlib.Path
        Output directory to save the dark-subtracted frames. Created if it
        does not exist. Files keep their original names and existing files
        of the same name are overwritten.

    Returns
    -------
    ccdproc.ImageFileCollection
        Collection of the dark-subtracted frames written to ``output_path``

    Raises
    ------
    ValueError
        If ``output_path`` is ``None``; the calibrated frames need a
        destination that is not the raw data.
    """
    if output_path is None:
        raise ValueError('output_path is required to save dark-subtracted frames.')

    output_path = Path(output_path)
    output_path.mkdir(parents=True, exist_ok=True)

    written = []
    # Reader options go through ccd_kwargs; extra keyword arguments to ccds()
    # are interpreted as header filters, not as arguments to the reader.
    for ccd, file_name in frames.ccds(return_fname=True, ccd_kwargs={'unit': 'adu'}):
        calibrated = ccdp.subtract_dark(
            ccd,
            master_dark,
            exposure_time='exptime',
            exposure_unit=u.second,
            scale=True
        )
        calibrated.write(output_path / file_name, overwrite=True)
        written.append(file_name)

    # Hand back the calibrated files rather than the untouched inputs.
    return ccdp.ImageFileCollection(location=output_path, filenames=written, find_fits_by_reading=False)

def filtered_image_collection(
        path: Path,
        imagetyp: str):
    """
    Build a collection restricted to files of one image type

    Parameters
    ----------
    path : pathlib.Path
        Directory to scan; FITS files are detected by reading them
    imagetyp : str
        Value the ``imagetyp`` header keyword must match

    Returns
    -------
    ccdproc.ImageFileCollection
        Collection located at ``path`` listing only the matching file names
    """
    everything = ccdp.ImageFileCollection(path, find_fits_by_reading=True)
    matching = everything.files_filtered(imagetyp=imagetyp, include_path=False)
    names = [str(name) for name in matching]

    return ccdp.ImageFileCollection(location=path, filenames=names, find_fits_by_reading=False)
        
def calibrate_bias(
        science_frame : CCDData,
        bias_path : Path
    ) -> CCDData:
    """
    Combine the bias frames found in a directory into a master bias frame

    Parameters
    ----------
    science_frame : astropy.nddata.CCDData
        Science frame (not used by this function)
    bias_path : pathlib.Path
        Directory to search for ``*.fits`` files whose ``imagetyp`` is bias

    Returns
    -------
    astropy.nddata.CCDData
        Sigma-clipped average of the bias frames, with ``meta['combined']``
        set to ``True``

    """
    collection = ccdp.ImageFileCollection(
        location=bias_path,
        find_fits_by_reading=False,
        glob_include='*.fits'
    )
    bias_files = collection.files_filtered(imagetyp='bias', include_path=True)

    # Clip at 5 deviations around the median, using the MAD-based std.
    combine_options = dict(
        method='average',
        sigma_clip=True,
        sigma_clip_low_thresh=5,
        sigma_clip_high_thresh=5,
        sigma_clip_func=np.ma.median,
        sigma_clip_dev_func=mad_std,
        mem_limit=350e6,
        unit='adu',
    )
    master = ccdp.combine(bias_files, **combine_options)
    master.meta['combined'] = True

    return master
    
def calibrate_dark(
        science_frame : CCDData,
        dark_path : Path=Path.cwd(),
        master_bias : CCDData=None
    ) -> ccdp.ImageFileCollection:
    """
    Calibrate and combine dark frames to create master dark frame(s)

    One combined dark is produced per distinct ``exptime`` found among the
    dark frames and written to ``dark_path`` as
    ``combined_dark_<exptime>.fits``.

    Parameters
    ----------
    science_frame : astropy.nddata.CCDData
        Science frame (not used by this function)
    dark_path : pathlib.Path
        Directory to search for ``*.fits`` dark frames; combined darks are
        also written here
    master_bias : astropy.nddata.CCDData, optional
        Master bias to subtract from every dark before combining. When
        ``None`` the darks are combined as they are.

    Returns
    -------
    ccdproc.ImageFileCollection
        Collection of the combined dark files in ``dark_path``

    """
    dark_path = Path(dark_path)

    # Skip combined darks from an earlier run: they sit in the same directory
    # and would otherwise be combined again as if they were raw frames.
    dark_frames = ccdp.ImageFileCollection(
        dark_path,
        find_fits_by_reading=False,
        glob_include='*.fits',
        glob_exclude='combined_dark_*'
    ).filter(imagetyp='dark')

    if master_bias is not None:
        dark_frames = subtract_bias(dark_frames, master_bias, Path('.') / 'dark-temp')

    dark_times = set(dark_frames.summary['exptime'])
    print(dark_times)

    combined_names = []
    for exp_time in sorted(dark_times):
        calibrated_darks = dark_frames.files_filtered(imagetyp='dark', exptime=exp_time, include_path=True)

        combined_dark = ccdp.combine(
                calibrated_darks,
                method='average',
                sigma_clip=True,
                sigma_clip_low_thresh=5,
                sigma_clip_high_thresh=5,
                sigma_clip_func=np.ma.median,
                sigma_clip_dev_func=mad_std,
                mem_limit=350e6,
                unit='adu'
        )

        combined_dark.meta['combined'] = True

        dark_file = dark_path / 'combined_dark_{:6.3f}.fits'.format(exp_time)
        combined_dark.write(dark_file, overwrite=True)

        # The collection joins ``location`` with each entry, so record the
        # bare file name; a full path would be doubled for a relative dark_path.
        combined_names.append(dark_file.name)

    return ccdp.ImageFileCollection(location=dark_path, filenames=combined_names, find_fits_by_reading=False)
        
def calibrate_flat(
        science_frame : CCDData,
        flat_path : Path,
        bias_path : Path,
        dark_path : Path,
        output_path : Path=None
    ) -> ccdp.ImageFileCollection:
    """
    Bias- and dark-subtract every flat frame and save the results

    Parameters
    ----------
    science_frame : astropy.nddata.CCDData
        Science frame; forwarded to the bias/dark builders
    flat_path : pathlib.Path
        Directory to search for flat frames
    bias_path : pathlib.Path
        Directory to search for bias frames
    dark_path : pathlib.Path
        Directory to search for dark frames
    output_path : pathlib.Path, optional
        Directory for the calibrated flats. Defaults to a
        ``calibrated-flats`` sub-directory of ``flat_path`` so the raw flats
        are never overwritten.

    Returns
    -------
    ccdproc.ImageFileCollection
        Collection of the calibrated flat frames written to ``output_path``

    """
    flat_frames = ccdp.ImageFileCollection(
        location=flat_path,
        find_fits_by_reading=True
    ).filter(imagetyp='flat')

    # The masters do not depend on the individual flat, so build them once
    # instead of once per frame.
    master_bias = calibrate_bias(science_frame, bias_path)
    # Darks are bias-subtracted because the dark is scaled by exposure time
    # below, and the bias level must not be scaled along with it.
    master_darks = calibrate_dark(science_frame, dark_path, master_bias)
    dark_times = set(master_darks.summary['exptime'])

    if output_path is None:
        output_path = Path(flat_path) / 'calibrated-flats'
    output_path = Path(output_path)
    output_path.mkdir(parents=True, exist_ok=True)

    loaded_darks = {}
    calibrated_names = []
    # The loop variable must not be called ``ccdp``: that would shadow the
    # ccdproc module for the whole function.
    for flat, file_name in flat_frames.ccds(return_fname=True, ccd_kwargs={'unit': 'adu'}):
        flat = ccdp.subtract_bias(flat, master_bias)

        # No tolerance check: the dark is scaled to the flat's exposure time.
        dark_time = find_nearest_dark_exposure(flat, dark_times, tolerance=None)
        if dark_time not in loaded_darks:
            dark_file = master_darks.files_filtered(exptime=dark_time, include_path=True)[0]
            loaded_darks[dark_time] = CCDData.read(dark_file, unit='adu')

        flat = ccdp.subtract_dark(
            flat,
            loaded_darks[dark_time],
            exposure_time='exptime',
            exposure_unit=u.second,
            scale=True
        )

        flat.write(output_path / file_name, overwrite=True)
        calibrated_names.append(file_name)

    return ccdp.ImageFileCollection(location=output_path, filenames=calibrated_names, find_fits_by_reading=False)

def generate_cache_path(
        science_frame : CCDData,
        base_path : Path
    ) -> Path:
    """
    Build the cache directory path for frames of a given image size

    Parameters
    ----------
    science_frame : astropy.nddata.CCDData
        Frame whose ``naxis1`` and ``naxis2`` header values name the cache
    base_path : pathlib.Path
        Directory under which the ``.cache`` directory lives

    Returns
    -------
    pathlib.Path
        ``base_path / '.cache' / '<naxis1>x<naxis2>'``; the directory is not
        created here
    """
    header = science_frame.header
    size_tag = '{0}x{1}'.format(header['naxis1'], header['naxis2'])

    return base_path / '.cache' / size_tag

def generate_master_bias(
        science_frame : CCDData,
        bias_path : Path,
        use_cache : bool=True
    ) -> CCDData:
    """
    Return the master bias, reading it from the cache when possible

    The cache file is ``master.fits`` in the ``bias`` sub-directory of the
    cache path for the science frame's image size, located under
    ``bias_path``.

    Parameters
    ----------
    science_frame : astropy.nddata.CCDData
        Science frame; its image size selects the cache directory
    bias_path : pathlib.Path
        Directory to search for bias frames
    use_cache : bool, optional
        When ``True`` (default) an existing cache file is returned instead
        of recombining the bias frames. The cache file is rewritten whenever
        the master bias is rebuilt.

    Returns
    -------
    astropy.nddata.CCDData
        Master bias frame
    """
    cache_path = generate_cache_path(science_frame, bias_path) / 'bias'
    cache_file = cache_path / 'master.fits'

    if use_cache and cache_file.is_file():
        return CCDData.read(cache_file)

    cache_path.mkdir(parents=True, exist_ok=True)

    ccd = calibrate_bias(science_frame, bias_path)

    if ccd is not None:
        # overwrite=True so a rebuild with use_cache=False can replace a
        # stale cache file instead of failing because it already exists.
        ccd.write(cache_file, overwrite=True)

    # Return the frame itself (previously the ccdproc module was returned).
    return ccd

def generate_master_dark(
        science_frame : CCDData,
        bias_path : Path,
        dark_path : Path,
        use_cache : bool=True
    ) -> CCDData:
    """
    Return a master dark for the science frame, using the cache when possible

    The darks are bias-subtracted and combined per exposure time; the
    combined dark whose exposure time is closest to the science frame's
    ``exptime`` is returned and cached as ``master.fits`` in the ``dark``
    sub-directory of the cache path under ``bias_path``.

    Parameters
    ----------
    science_frame : astropy.nddata.CCDData
        Science frame; its image size selects the cache directory and its
        ``exptime`` selects the dark
    bias_path : pathlib.Path
        Directory to search for bias frames
    dark_path : pathlib.Path
        Directory to search for dark frames
    use_cache : bool, optional
        When ``True`` (default) an existing cache file is returned instead
        of rebuilding the master dark

    Returns
    -------
    astropy.nddata.CCDData
        Master dark frame
    """
    cache_path = generate_cache_path(science_frame, bias_path) / 'dark'
    cache_file = cache_path / 'master.fits'

    if use_cache and cache_file.is_file():
        return CCDData.read(cache_file)

    cache_path.mkdir(parents=True, exist_ok=True)

    # calibrate_dark takes (science_frame, dark_path, master_bias): it needs
    # the bias *frame*, not the bias directory.
    master_bias = generate_master_bias(science_frame, bias_path, use_cache)
    combined_darks = calibrate_dark(science_frame, dark_path, master_bias)

    # calibrate_dark returns one combined dark per exposure time; pick the
    # one closest to the science exposure. No tolerance is enforced because
    # the dark is scaled by exposure time when it is subtracted.
    # NOTE(review): the cache is keyed by image size only, so the dark chosen
    # for the first science frame is reused for later ones — confirm that is
    # acceptable.
    dark_times = set(combined_darks.summary['exptime'])
    nearest = find_nearest_dark_exposure(science_frame, dark_times, tolerance=None)
    dark_file = combined_darks.files_filtered(exptime=nearest, include_path=True)[0]

    ccd = CCDData.read(dark_file, unit='adu')
    ccd.write(cache_file, overwrite=True)

    return ccd
    
def generate_master_flat(
        science_frame : CCDData,
        bias_path : Path,
        dark_path : Path,
        flat_path : Path,
        use_cache : bool=True
    ) -> CCDData:
    """
    Return the master flat, using the cache when possible

    The flats are calibrated, then combined with a sigma-clipped average
    after scaling each frame by the inverse of its median. The result is
    cached as ``master.fits`` in the ``flat`` sub-directory of the cache
    path under ``bias_path``.

    Parameters
    ----------
    science_frame : astropy.nddata.CCDData
        Science frame; its image size selects the cache directory
    bias_path : pathlib.Path
        Directory to search for bias frames
    dark_path : pathlib.Path
        Directory to search for dark frames
    flat_path : pathlib.Path or ``None``
        Directory to search for flat frames. ``None`` means no flats are
        available.
    use_cache : bool, optional
        When ``True`` (default) an existing cache file is returned instead
        of rebuilding the master flat

    Returns
    -------
    astropy.nddata.CCDData or ``None``
        Master flat frame, or ``None`` when ``flat_path`` is ``None`` or no
        calibrated flats were produced
    """
    if flat_path is None:
        # Callers test the result against None to skip flat correction.
        return None

    cache_path = generate_cache_path(science_frame, bias_path) / 'flat'
    cache_file = cache_path / 'master.fits'

    if use_cache and cache_file.is_file():
        return CCDData.read(cache_file)

    cache_path.mkdir(parents=True, exist_ok=True)

    # Keyword arguments: calibrate_flat's parameter order differs from this
    # function's (flat_path comes first there).
    calibrated_flats = calibrate_flat(
        science_frame,
        flat_path=flat_path,
        bias_path=bias_path,
        dark_path=dark_path
    )

    flat_files = [
        str(Path(calibrated_flats.location) / name)
        for name in calibrated_flats.files
    ]
    if not flat_files:
        return None

    def inv_median(data):
        # Normalise each flat by its median so frames with different
        # illumination levels carry equal weight in the average.
        return 1 / np.median(data)

    # NOTE(review): every flat in flat_path is combined regardless of filter;
    # confirm whether flats should be grouped by filter first.
    ccd = ccdp.combine(
            flat_files,
            method='average',
            scale=inv_median,
            sigma_clip=True,
            sigma_clip_low_thresh=5,
            sigma_clip_high_thresh=5,
            sigma_clip_func=np.ma.median,
            sigma_clip_dev_func=mad_std,
            mem_limit=350e6,
            unit='adu'
    )
    ccd.meta['combined'] = True

    ccd.write(cache_file, overwrite=True)

    return ccd
    
    
def calibrate_science(
        science_frame : CCDData,
        flat_path : Path,
        bias_path : Path,
        dark_path : Path
    ) -> CCDData:
    """
    Apply bias, dark and flat calibration to a science frame

    Each step is skipped when the corresponding master frame is ``None``.
    The dark is scaled by the ratio of the ``exptime`` header values before
    it is subtracted.

    Parameters
    ----------
    science_frame : astropy.nddata.CCDData
        Science frame to calibrate
    flat_path : pathlib.Path
        Directory to search for flat frames
    bias_path : pathlib.Path
        Directory to search for bias frames
    dark_path : pathlib.Path
        Directory to search for dark frames

    Returns
    -------
    astropy.nddata.CCDData
        Calibrated science frame
    """
    master_bias = generate_master_bias(science_frame, bias_path)
    master_dark = generate_master_dark(science_frame, bias_path, dark_path)
    master_flat = generate_master_flat(science_frame, bias_path, dark_path, flat_path)

    # ccdproc's subtract_bias/subtract_dark take no ``unit`` argument; the
    # unit travels with the CCDData objects themselves.
    if master_bias is not None:
        science_frame = ccdp.subtract_bias(science_frame, master_bias)
    if master_dark is not None:
        science_frame = ccdp.subtract_dark(
            science_frame,
            master_dark,
            exposure_time='exptime',
            exposure_unit=u.second,
            scale=True
        )
    if master_flat is not None:
        science_frame = ccdp.flat_correct(science_frame, master_flat)

    return science_frame
    
    

In [None]:
# Directory holding the science frame and its calibration data.
path = Path('/home/localadmin/2020-04-13')

# Bias and dark frames share one calibration directory.
bias_dir = path / 'Calibration'
dark_dir = path / 'Calibration'

science_frame = CCDData.read(
    path / 'PIRATE_262279_OSL_00_ds4336_CRTS_J144508_6_050514_00_B300_00_2020_04_13_01_04_39.fits',
    unit='adu',
)

# No flat directory is passed for this run (flat_path is None).
science_frame = calibrate_science(
    science_frame,
    flat_path=None,
    bias_path=bias_dir,
    dark_path=dark_dir,
)
