# Manual reduction script - long-slit spectra

## Configuration

In [None]:
#dir = 'e:/Astro/Captures/20231008_Void/'
dir = 'E:/Astro/Captures/20161006_tp_spectro_L3/'
#bias = 'Bias*.fit'
bias = 'offset-*.fit'
#darks = 'dark*.fit'
darks = 'noir-600-*.fit'
flats = 'flat*.fit'
#flats = 'flat*.fit'
sciences = 'hd158460-*.fit'
#sciences = 'deneb*.fit'
calibs = 'neon-*.fit'

trim_region = '600, 600, 2700, 1400'              # x1, y1, x2, y2 
#trim_region = None

masterbias = 'masterbias.fit'
masterdark = 'masterdark.fit'
masterflat = 'masterflat.fit'
sciencedata = 'hd158460.fit'
mastercalib = 'mastercalib.fit'

overwrite_masters = True         # recreate masters or not ?
memory_limit = 1e9               # how much memory (bytes) to allocate to combine operations ?

camera_electronic_gain = 1.2     # asi 183mm
camera_readout_noise = 2.2       # asi 183mm

## Imports libs

In [None]:
%matplotlib widget
import warnings, fnmatch, os
from time import gmtime, strftime
from pathlib import Path

import numpy as np
import matplotlib.pyplot as plt

from astropy.table import Table, QTable
from astropy import units as u
from astropy.nddata import CCDData, StdDevUncertainty
from astropy.stats import mad_std
from astropy.io import fits
from astropy.utils.exceptions import AstropyWarning

from ccdproc import Combiner, combine, subtract_bias, subtract_dark, flat_correct
from ccdproc import trim_image, Combiner, ccd_process, cosmicray_median

from align_combine import align_and_combine

warnings.simplefilter('ignore', category=AstropyWarning)
#warnings.simplefilter('ignore', category=FITSFixedWarning)


## Create logger

In [None]:
from logger_utils import logger, handler
handler.show_logs()
handler.clear_logs()
logger.setLevel('INFO')

## Create masters

### Bias

In [None]:
### prepare filenames lists
bias_files = [dir + f for f in fnmatch.filter(os.listdir(dir), bias)]
logger.info('bias files to combine: ' + repr(bias_files))

### create masterbias
bias_list =  []
if (os.path.exists(dir + masterbias)) and (overwrite_masters is False):
    logger.info(masterbias + ' not (re)created')
else:
    #bias_list = [CCDData.read(f, unit = u.adu ) for f in bias_files]
    for bias_file in bias_files:
        logger.info('bias trim {} ...'.format(bias_file))
        _bias_data = CCDData.read(bias_file, unit = u.adu )

        _bias_trim = _bias_data
        if trim_region is not None:
            _bias_trim = trim_image(_bias_data[eval(trim_region)[1]:eval(trim_region)[3], eval(trim_region)[0]:eval(trim_region)[2]]) 

        bias_trim = CCDData(_bias_trim.data.astype('float32'), unit = u.adu , header = _bias_data.header)            
        bias_list.append(bias_trim)
        _bias_data = None

    logger.info('masterbias combine...')
    #_bias_master = combine(bias_list, method = 'median', dtype = np.float32, mem_limit = memory_limit)   
    _bias_master = combine(bias_list,
                             method='average',
                             sigma_clip=True, sigma_clip_low_thresh=5, sigma_clip_high_thresh=5,
                             sigma_clip_func=np.ma.median, signma_clip_dev_func=mad_std, dtype = np.float32,
                             mem_limit = memory_limit
                            )

    bias_master = CCDData(_bias_master.data.astype('float32'), unit = u.adu , header = _bias_master.header)

    #logger.info('masterbias nomalization...')
    #bias_master.data = (bias_master.data - 1) / (10 - 1)
    
    bias_master.meta['DATE'] = strftime("%Y-%m-%dT%H:%M:%S", gmtime())
    bias_master.meta['combined'] = True
    bias_master.meta['history'] = 'combined from {} bias file(s)'.format(len(bias_files))
    bias_master.write(dir + masterbias, overwrite=True)
    logger.info('masterbias created - combined from {} bias file(s)'.format(len(bias_files)))
    bias_list = None
    _bias_master = None


### Darks

In [None]:
### prepare filenames lists
dark_files = [dir + f for f in fnmatch.filter(os.listdir(dir), darks)]
logger.info('dark files to combine : ' + repr(dark_files))

### create masterdark
darks_list = []
if (os.path.exists(dir + masterdark)) and (overwrite_masters is False):
    logger.info(masterdark + ' not (re)created')
else:
    for dark_file in dark_files:
        _dark_data = CCDData.read(dark_file, unit = u.adu )
        _dark_trim = _dark_data
        if trim_region is not None:
            logger.info('dark trim {} ...'.format(dark_file))
            _dark_trim = trim_image(_dark_data[eval(trim_region)[1]:eval(trim_region)[3], eval(trim_region)[0]:eval(trim_region)[2]]) 

        logger.info('masterbias sub {} ...'.format(dark_file))
        _dark_sub = subtract_bias(_dark_trim, bias_master)
        dark_sub = CCDData(_dark_sub.data.astype('float32'), unit = u.adu , header = _dark_data.header)
        darks_list.append(dark_sub)
        _dark_data = None
        _dark_sub = None
        _dark_trim = None
    
    logger.info('masterdark combine...')
    #dark_master = combine(darks_list, method='median', dtype=np.uint32, mem_limit = memory_limit)
    _dark_master = combine(darks_list,
                                 method='average',
                                 sigma_clip=True, sigma_clip_low_thresh=5, sigma_clip_high_thresh=5,
                                 sigma_clip_func=np.ma.median, signma_clip_dev_func=mad_std, dtype=np.float32,
                                 mem_limit=memory_limit
                                )
    
    dark_master = CCDData(_dark_master.data.astype('float32'), unit = u.adu , header = _dark_master.header)
    dark_master.meta['DATE'] = strftime("%Y-%m-%dT%H:%M:%S", gmtime())
    dark_master.meta['combined'] = True
    dark_master.meta['history'] = 'masterbias sub + combined from {} dark file(s)'.format(len(dark_files))
    dark_master.write(dir + masterdark, overwrite=True)
    logger.info('masterdark created - bias substracted + combined from {} dark file(s)'.format(len(dark_files)))
    darks_list = None
    _dark_master = None


### Flats

In [None]:
### prepare filenames lists
flat_files = [dir + f for f in fnmatch.filter(os.listdir(dir), flats)]
logger.info('flat files to combine: ' + repr(flat_files))

### create masterflat
flats_list = []

def inv_median(a):
    return 1 / np.median(a)
    
if (os.path.exists(dir + masterflat)) and (overwrite_masters is False):
    logger.info(masterflat + ' not (re)created')
else:
    for flat_file in flat_files:
        _flat_data = CCDData.read(flat_file, unit = u.adu )
        _flat_trim = _flat_data
        if trim_region is not None:
            logger.info('flat trim {} ...'.format(flat_file))
            _flat_trim = trim_image(_flat_data[eval(trim_region)[1]:eval(trim_region)[3], eval(trim_region)[0]:eval(trim_region)[2]]) 

        logger.info('masterbias sub {} ...'.format(flat_file))
        _flat_sub = subtract_bias(_flat_trim, bias_master)
        flat_sub = CCDData(_flat_sub.data.astype('float32'), unit = u.adu , header = _flat_data.header)
        flats_list.append(flat_sub)
        _flat_data = None
        _flat_sub = None
        _flat_trim = None
    
    logger.info('masterflat combine...')
    _flat_master = combine(flats_list, method='sum', dtype=np.float32, mem_limit = memory_limit)
    #_flat_master = combine(flats_list,
     #                            method='average', #scale=inv_median,
      #                           sigma_clip=True, sigma_clip_low_thresh=5, sigma_clip_high_thresh=5,
       #                          sigma_clip_func=np.ma.median, signma_clip_dev_func=mad_std, dtype=np.float32,
        #                         mem_limit=memory_limit
         #                       )
    flat_master = CCDData(_flat_master.data.astype('float32'), unit = u.adu , header = _flat_master.header)
    flat_master.meta['DATE'] = strftime("%Y-%m-%dT%H:%M:%S", gmtime())
    flat_master.meta['combined'] = True
    flat_master.meta['history'] = 'bias substracted + combined from {} flat file(s)'.format(len(flat_files))
    flat_master.write(dir + masterflat, overwrite=True)
    logger.info('masterflat created : bias substracted + combined from {} flat file(s)'.format(len(flat_files)))
    flats_list = None
    _flat_master = None


### Calibration

In [None]:
calib_files = [dir + f for f in fnmatch.filter(os.listdir(dir), calibs)]
logger.info('calibration files to combine: ' + repr(calib_files))

### create mastercalib
calibs_list = []
   
if (os.path.exists(dir + mastercalib)) and (overwrite_masters is False):
    logger.info(mastercalib + ' not (re)created')
else:
    for calib_file in calib_files:
        _calib_data = CCDData.read(calib_file, unit = u.adu )
        _calib_trim = _calib_data
        if trim_region is not None:
            logger.info('calib trim {} ...'.format(calib_file))
            _calib_trim = trim_image(_calib_data[eval(trim_region)[1]:eval(trim_region)[3], eval(trim_region)[0]:eval(trim_region)[2]]) 
        
        logger.info('masterbias sub {} ...'.format(calib_file))
        _calib_sub = subtract_bias(_calib_trim, bias_master)
        calib_sub = CCDData(_calib_sub.data.astype('float32'), unit = u.adu , header = _calib_data.header)
        calibs_list.append(calib_sub)
        _calib_data = None
        _calib_sub = None
    
    logger.info('mastercalib combine...')
    #calib_master = combine(calib_list, method='median', dtype=np.float32, mem_limit = memory_limit)
    _calib_master = combine(calibs_list,
                                 method='average', 
                                 sigma_clip=True, sigma_clip_low_thresh=5, sigma_clip_high_thresh=5,
                                 sigma_clip_func=np.ma.median, signma_clip_dev_func=mad_std, dtype=np.float32,
                                 mem_limit=memory_limit
                                )
    calib_master = CCDData(_calib_master.data.astype('float32'), unit = u.adu , header = _calib_master.header)
    calib_master.meta['DATE'] = strftime("%Y-%m-%dT%H:%M:%S", gmtime())
    calib_master.meta['combined'] = True
    calib_master.meta['history'] = 'bias substracted + combined from {} calib file(s)'.format(len(calib_files))
    calib_master.write(dir + mastercalib, overwrite=True)
    logger.info('mastercalib created : bias substracted + combined from {} calib file(s)'.format(len(calib_files)))
    calib_list = None
    _calib_master = None


## Process science files

### Reduce

In [None]:
### prepare filenames lists
science_files  = [dir + f for f in fnmatch.filter(os.listdir(dir), sciences)]
logger.info('science files : ' + repr(science_files))

if overwrite_masters is False:
    bias_master = CCDData.read(dir + masterbias, unit = u.adu )
    dark_master = CCDData.read(dir + masterdark, unit = u.adu )
    flat_master = CCDData.read(dir + masterflat, unit = u.adu )

"""
### process science files - individual science file process version
for sci_file in science_files:
    logger.info('masterbias sub {} ...'.format(sci_file))
    #sci_data = CCDData.read(sci_file, unit = u.adu )
    sci_sub_bias = subtract_bias(CCDData.read(sci_file, unit = u.adu ), bias_master)
    #sci_data = None
    logger.info('masterdark sub {} ...'.format(sci_file))
    sci_sub_dark = subtract_dark(sci_sub_bias, dark_master, scale = True, exposure_time='EXPTIME', exposure_unit=u.second)
    sci_sub_bias = None
    logger.info('masterflat divide {} ...'.format(sci_file))
    sci_div_flat = flat_correct(sci_sub_dark, flat_master)
    sci_sub_dark = None
    sci_ok = CCDData(sci_div_flat.data.astype('float32'), unit = u.adu , header = sci_div_flat.header)
    sci_div_flat = None
    logger.info('writing science file (-ppr) {}'.format(sci_file))
    sci_ok.meta['DATE'] = strftime("%Y-%m-%dT%H:%M:%S", gmtime())
    sci_ok.meta['history'] = 'bias substracted + dark substrated + flat divide'
    sci_ok.write(Path(sci_file).with_stem(f"{'ppr'}-{Path(sci_file).stem}"), overwrite=True)
"""

### process science files - ccd_process version
for sci_file in science_files:
    sci_data = CCDData.read(sci_file, unit = u.adu )
    _sci_trim = sci_data
    if trim_region is not None:
        logger.info('science trim {} ...'.format(calib_file))
        _sci_trim = trim_image(sci_data[eval(trim_region)[1]:eval(trim_region)[3], eval(trim_region)[0]:eval(trim_region)[2]]) 
    
    logger.info('processing (masterbias sub + masterdark sub + masterflat divide) {} ...'.format(sci_file))
    sci_data_processed = ccd_process(_sci_trim, 
            oscan = None, 
            gain_corrected = False, 
            trim = None, 
            error = False,
            gain = camera_electronic_gain*u.electron/u.adu ,
            readnoise = camera_readout_noise*u.electron,
            master_bias = bias_master,
            dark_frame = dark_master,
            master_flat = flat_master,
            exposure_key = 'EXPTIME',
            exposure_unit = u.second,
            dark_scale = True)
    sci_ok = CCDData(sci_data_processed.data.astype('float32'), unit = u.adu , header = sci_data.header)
    sci_data = None
    logger.info('writing science datafile (-ppr) {}'.format(sci_file))
    sci_ok.meta['DATE'] = strftime("%Y-%m-%dT%H:%M:%S", gmtime())
    sci_ok.meta['history'] = 'bias substracted + dark substrated + flat divide'
    sci_ok.write(Path(sci_file).with_stem(f"{'ppr'}-{Path(sci_file).stem}"), overwrite=True)


### Align and combine

In [None]:
if overwrite_masters is False:
    bias_master = CCDData.read(dir + masterbias, unit = u.adu )
    dark_master = CCDData.read(dir + masterdark, unit = u.adu )
    flat_master = CCDData.read(dir + masterflat, unit = u.adu )


### align images and combine using a SUM
science_files_ppr  = [dir + f for f in fnmatch.filter(os.listdir(dir), 'ppr-*' + sciences)]
logger.info('science files aligning and combining ... : ' + repr(science_files_ppr))
sci_aligned = align_and_combine(science_files_ppr, np.sum)
sci_master = CCDData(sci_aligned, unit = u.adu , header = sci_ok.header)
sci_master.meta['DATE'] = strftime("%Y-%m-%dT%H:%M:%S", gmtime())
sci_master.meta['combined'] = True
sci_master.meta['history'] = 'aligned + combined from {} sciences file(s)'.format(len(science_files))
sci_master.write(dir + 'ppr-' + sciencedata, overwrite=True)
logger.info('aligned + combined from {} sciences file(s)'.format(len(science_files)))


### Remove cosmic rays

In [None]:
if overwrite_masters is False:
    bias_master = CCDData.read(dir + masterbias, unit = u.adu )
    dark_master = CCDData.read(dir + masterdark, unit = u.adu )
    flat_master = CCDData.read(dir + masterflat, unit = u.adu )

### remove cosmic
logger.info('removing cosmic rays...')
sci_cleaned = cosmicray_median(sci_master, thresh = 5, mbox=11, rbox=11, gbox=5, error_image = np.ones(sci_master.shape))
#sci_cleaned = cosmicray_median(sci_master, thresh = 3, mbox=5, rbox=5, gbox=5, error_image = None)
sci_cleaned.meta['DATE'] = strftime("%Y-%m-%dT%H:%M:%S", gmtime())
sci_cleaned.meta['history'] = 'cosmicray median removed'
sci_cleaned.write(dir + 'final-' + sciencedata, overwrite=True)
logger.info('final-' + sciencedata + ' created')


## Display

In [None]:
master_bias = CCDData.read(dir + masterbias, unit = u.adu)
master_dark = CCDData.read(dir + masterdark, unit = u.adu)
master_flat = CCDData.read(dir + masterflat, unit = u.adu)
science_files  = [dir + f for f in fnmatch.filter(os.listdir(dir), sciences)]
ppr_science_files  = [dir + f for f in fnmatch.filter(os.listdir(dir), 'ppr-' + sciences)]
one_science = CCDData.read(science_files[0], unit = u.adu)
one_science_ppr = CCDData.read(ppr_science_files[0], unit = u.adu)
master_science = CCDData.read(dir + 'ppr-' + sciencedata, unit = u.adu)
master_science_cosmic = CCDData.read(dir + 'final-' + sciencedata, unit = u.adu)
master_calib = CCDData.read(dir + mastercalib, unit = u.adu)

#cmap = plt.cm.magma
colormap = plt.cm.magma
cuts = (5, 99.90)

fig, ((ax1, ax2), (ax3, ax4), (ax5, ax6), (ax7, ax8)) = plt.subplots(4, 2,figsize=(15, 15))
#fig.subplots_adjust(bottom=1, top=1.1, left=1, right=1.1)
fig.canvas.toolbar_position = 'bottom'


im1 = ax1.imshow(master_bias, origin='lower', aspect='auto', cmap=colormap)
ax1.xaxis.set_visible(False)
ax1.yaxis.set_visible(False)
im1.set_clim(np.percentile(master_bias, cuts))
ax1.set_title('master_bias')
cb = plt.colorbar(im1)

im2 = ax2.imshow(master_dark, origin='lower', aspect='auto', cmap=colormap)
ax2.xaxis.set_visible(False)
ax2.yaxis.set_visible(False)
im2.set_clim(np.percentile(master_dark, cuts))
ax2.set_title('master_dark')
cb = plt.colorbar(im2)

im3 = ax3.imshow(master_flat, origin='lower', aspect='auto', cmap=colormap)
ax3.xaxis.set_visible(False)
ax3.yaxis.set_visible(False)
im3.set_clim(np.percentile(master_flat, cuts))
ax3.set_title('master_flat')
cb = plt.colorbar(im3)

im4 = ax4.imshow(master_calib, origin='lower', aspect='auto', cmap=colormap)
ax4.xaxis.set_visible(False)
ax4.yaxis.set_visible(False)
im4.set_clim(np.percentile(master_calib, cuts))
ax4.set_title('master_calib')
cb = plt.colorbar(im4)

im5 = ax5.imshow(one_science, origin='lower', aspect='auto', cmap=colormap)
ax5.xaxis.set_visible(False)
ax5.yaxis.set_visible(False)
im5.set_clim(np.percentile(one_science, cuts))
ax5.set_title('one_science_raw')
cb = plt.colorbar(im5)

im6 = ax6.imshow(one_science_ppr, origin='lower', aspect='auto', cmap=colormap)
ax6.xaxis.set_visible(False)
ax6.yaxis.set_visible(False)
im6.set_clim(np.percentile(one_science_ppr, cuts))
ax6.set_title('one_science_processed')
cb = plt.colorbar(im6)

im7 = ax7.imshow(master_science, origin='lower', aspect='auto', cmap=colormap)
ax7.xaxis.set_visible(False)
ax7.yaxis.set_visible(False)
im7.set_clim(np.percentile(master_science, cuts))
ax7.set_title('master_science')
cb = plt.colorbar(im7)

im8 = ax8.imshow(master_science_cosmic.data , origin='lower', aspect='auto', cmap=colormap)
ax8.xaxis.set_visible(False)
ax8.yaxis.set_visible(False)
im8.set_clim(np.percentile(master_science_cosmic.data, cuts))
ax8.set_title('master_science_cosmicray_removed')
cb = plt.colorbar(im8)


## Create spectra 

### Extract science spectra

In [None]:
from astropy.modeling import models
from specreduce import tracing, background, extract
#sci_tr = tracing.FlatTrace(master_science, 686)   #FitTrace(image, peak_method='gaussian', guess=trace_pos)
sci_tr = tracing.FitTrace(master_science,  bins = 64, trace_model=models.Polynomial1D(degree=1), peak_method = 'gaussian')
#trace_model : one of Chebyshev1D, Legendre1D, Polynomial1D, or Spline1D
#peak_method : One of gaussian, centroid, or max. gaussian
bg = background.Background.two_sided(master_science, sci_tr, separation=80, width=50) 
extract = extract.BoxcarExtract(master_science - bg, sci_tr, width = 15)

sci_spectrum = extract()



In [None]:
cuts = (5, 99.0)
plt.figure(figsize=(10,6))
im = plt.imshow(master_science, origin='lower', aspect='auto', cmap=plt.cm.magma)
im.set_clim(np.percentile(master_science, cuts))

plt.imshow(bg.bkg_wimage, origin='lower', aspect='auto', cmap=plt.cm.gray, alpha=0.1)
plt.imshow(sci_tr.image.data, origin='lower', aspect='auto', cmap=plt.cm.gray, alpha=0.1)
plt.plot(sci_tr.trace , color='r')
plt.plot(sci_tr.trace + extract.width , color='g', linestyle='dashed', alpha=0.5)
plt.plot(sci_tr.trace - extract.width , color='g', linestyle='dashed', alpha=0.5)
cb = plt.colorbar()

plt.figure(figsize=(10,6))
plt.plot(sci_spectrum.flux, color='b',  linewidth = '0.6')


### Extract calibration spectra

In [None]:

neon_data = CCDData.read(dir + mastercalib, unit = u.adu)
plt.figure(figsize=(10,6))
plt.imshow(neon_data, origin='lower', aspect='auto', cmap=plt.cm.grey)
plt.title('neon_data')
plt.clim(np.percentile(neon_data, (10, 90)))
cb = plt.colorbar()

In [None]:
from specreduce import tracing, background, extract
#specreduce.tracing.ArrayTrace(image: NDData, trace: ndarray)
neon_tr = tracing.FlatTrace(neon_data,sci_tr.trace[int(sci_tr.shape[0] / 2.0)])   #FitTrace(image, peak_method='gaussian', guess=trace_pos)
#neon_tr = tracing.ArrayTrace(neon_data, sci_tr.trace[int(sci_tr.shape[0] / 2.0)])   #FitTrace(image, peak_method='gaussian', guess=trace_pos)
#bg = background.Background.two_sided(nccd, sci_tr, separation=50, width=50) 
extract = extract.BoxcarExtract(neon_data , neon_tr, width = 5)
neon_spectrum = extract()
plt.figure(figsize=(10,6))
plt.plot(neon_spectrum.flux, color='b',  linewidth = '0.6')


### Calibrate calibration spectra (StarEx 2400)

In [None]:
import astropy.units as u
from specreduce import WavelengthCalibration1D

pixels = [868, 1276, 2342, 3635, 4263]*u.pix
wavelength = [6506.53, 6532.88, 6598.95, 6678.28, 6717.04]*u.AA
line_list = QTable([pixels, wavelength], names=["pixel_center", "wavelength"])
test_cal = WavelengthCalibration1D(neon_spectrum, matched_line_list=line_list)
print(test_cal.residuals )
print(test_cal.fitted_model )
neon_calibrated_spectrum = test_cal.apply_to_spectrum(neon_spectrum)

plt.figure(figsize=(10,6))
plt.plot(neon_calibrated_spectrum.spectral_axis, neon_calibrated_spectrum.flux)  


### Calibrate calibration spectra (DADOS 200)

In [None]:
import astropy.units as u
from specreduce import WavelengthCalibration1D

pixels = [868, 1276, 2342, 3635, 4263]*u.pix
wavelength = [6506.53, 6532.88, 6598.95, 6678.28, 6717.04]*u.AA
line_list = QTable([pixels, wavelength], names=["pixel_center", "wavelength"])
test_cal = WavelengthCalibration1D(neon_spectrum, matched_line_list=line_list)
print(test_cal.residuals )
print(test_cal.fitted_model )
neon_calibrated_spectrum = test_cal.apply_to_spectrum(neon_spectrum)

plt.figure(figsize=(10,6))
plt.plot(neon_calibrated_spectrum.spectral_axis, neon_calibrated_spectrum.flux)  


### Apply to science spectra

In [None]:
sci_calibrated_spectrum = test_cal.apply_to_spectrum(sci_spectrum)

plt.figure(figsize=(10,6))
plt.plot(sci_calibrated_spectrum.spectral_axis, sci_calibrated_spectrum.flux)  


## Analyse spectra

### Identify lines

### Measure R, SNR, Flux, EW