In [None]:
import astroalign as aa
import ccdproc as ccdp
import csv
import glob
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import warnings

from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.nddata import CCDData
from astropy.stats import mad_std
from astropy.time import Time
from astropy.utils.exceptions import AstropyWarning
from astropy.visualization import ZScaleInterval
from copy import deepcopy
from itertools import combinations
from photutils.aperture import CircularAperture, CircularAnnulus, aperture_photometry
from photutils.detection import DAOStarFinder
from photutils.utils import calc_total_error
from scipy.ndimage import shift
from scipy.spatial import cKDTree
from tqdm.notebook import tqdm

***Path***

In [None]:
# dictionaire des chemins ammenant aux images de sciences et de calibration avant et après traitement de l'image.

path = 'E:/Documents/Cours_et_administratif/FAC/Master/M2/ohp/data/'

path_dict = { 'bias' : '2024-09-23-lyon-mtp/bias/', 'flat_r' : '2024-09-24-lyon-mtp/flat/filtre_r/', 'flat_g' : '2024-09-24-lyon-mtp/flat/filtre_g/',
              'dark' : '2024-09-23-lyon-mtp/dark/', 'baseline_23' : '2024-09-23-lyon-mtp/baseline/', 'baseline_24' : '2024-09-24-lyon-mtp/baseline/',
              'obs_sec_r' : '2024-09-23-lyon-mtp/observation_secondaire/', 'obs_sec_g' : '2024-09-24-lyon-mtp/observation_secondaire/',
              'obs_pri_r' : '2024-09-24-lyon-mtp/observation_primaire/', 'baseline_23_t' : '2024-09-23-lyon-mtp_t/baseline/' , 
              'baseline_24_t' : '2024-09-24-lyon-mtp_t/baseline/', 'obs_sec_r_t' : '2024-09-23-lyon-mtp_t/observation_secondaire/',
              'obs_sec_g_t' : '2024-09-24-lyon-mtp_t/observation_secondaire/', 'obs_pri_r_t' : '2024-09-24-lyon-mtp_t/observation_primaire/' }

print(path+path_dict['bias'])

***Master bias***

In [None]:
ifc_raw = ccdp.ImageFileCollection(path+path_dict['bias'])
ifc_bias = ifc_raw.filter(imagetyp='Bias Frame')
# on ouvre le fichier où les bias se situent et par précaution on utilise un filtre qui ne prend que les bias

warnings.filterwarnings("ignore", category=AstropyWarning)  
# permet d'eviter certains warnings lié au problème de configuration de date dans le fichier fits, pas nécéssaire

master_bias = ccdp.combine(ifc_bias.files_filtered(include_path=True), unit='adu', combine='median', mem_limit=9000.e6)
# On fait la mediane des bias puisque le reforidissement du ccd à pas été mis très bas et qu'il y a des risque 
# de rayon cosmique dans les images de sciences

warnings.filterwarnings("default")  
# on réactive les warnings

master_bias.meta['COMBINED'] = True
# on enregistre dans les métadonnées de l'images qu'il s'agit d'une images combinée

master_bias.write(path+'master_bias.fit', overwrite=True) 
# sauvegarde du master bias

***Master flat***

In [None]:
ifc_raw = [ ccdp.ImageFileCollection(path+path_dict['flat_r']), ccdp.ImageFileCollection(path+path_dict['flat_g']) ]
ifc_flatH = [ ifc_raw[0].filter(imagetyp='Flat Field'), ifc_raw[1].filter(imagetyp='Flat Field') ]
# on ouvre les deux fichiers où les flat pour les deux différent filtres se situent et par précaution on utilise un filtre qui ne prend que les flats

master_bias = ccdp.CCDData.read(path+'master_bias.fit', unit='adu')
# on ouvre le master_bias

master_flatH = [None,None]
# initialisation des variables

for i in range(len(ifc_flatH)):
    warnings.filterwarnings("ignore", category=AstropyWarning)
    # permet d'eviter certains warnings lié au problème de configuration de date dans le fichier fits, pas nécéssaire
    
    master_flatH[i] = ccdp.combine(ifc_flatH[i].files_filtered(include_path=True), unit='adu', combine='average', mem_limit=9000.e6)
    # pour les flat on peux à la fois utiliser la medianne et la moyenne, la différence n'est pas très marqué
    
    warnings.filterwarnings("default")
    # on réactive les warnings
    
    master_flatH[i].meta['COMBINED'] = True
    # on enregistre dans les métadonnées de l'images qu'il s'agit d'une images combinée
    
    master_flatH[i] = ccdp.subtract_bias(master_flatH[i], master_bias)
    # on supprimes les trâces propres au phénonèmes impliqué dans les bias pour n'avoir que ceux des flat

master_flatH[0].write(path + 'master_flatH_r.fit', overwrite=True)
master_flatH[1].write(path + 'master_flatH_g.fit', overwrite=True)
# sauvegarde des master flat

***Master dark***

In [None]:
ifc_raw = ccdp.ImageFileCollection(path+path_dict['dark'])
ifc_dark = ifc_raw.filter(imagetyp='Dark Frame')
# on ouvre le fichier où les dark se situent et par précaution on utilise un filtre qui ne prend que les dark

master_bias = ccdp.CCDData.read(path+'master_bias.fit', unit='adu')
# on ouvre le master_bias

warnings.filterwarnings("ignore", category=AstropyWarning)
# permet d'eviter certains warnings lié au problème de configuration de date dans le fichier fits, pas nécéssaire

master_dark = ccdp.combine(ifc_dark.files_filtered(include_path=True), unit='adu', combine='median', mem_limit=9000.e6)
# On fait la mediane des bias puisque le reforidissement du ccd à pas été mis très bas et qu'il y a des risque 
# de rayon cosmique dans les images de sciences

warnings.filterwarnings("default")
# on réactive les warnings

master_dark.meta['COMBINED'] = True
# on enregistre dans les métadonnées de l'images qu'il s'agit d'une images combinée

master_dark = ccdp.subtract_bias(master_dark, master_bias)
# on supprimes les trâces propres au phénonèmes impliqué dans les bias pour n'avoir que ceux des dark

master_dark.write(path+'master_dark.fit', overwrite=True)
# sauvegarde du master bias

***Traitement image***

In [None]:
input = ['baseline_23', 'baseline_24', 'obs_sec_r', 'obs_sec_g','obs_pri_r' ]
output = ['baseline_23_t', 'baseline_24_t', 'obs_sec_r_t', 'obs_sec_g_t','obs_pri_r_t' ]
# initialisation des différent chemin

master_bias = ccdp.CCDData.read(path+'master_bias.fit', unit='adu')
master_dark = ccdp.CCDData.read(path+'master_dark.fit', unit='adu')
master_flatH_r = ccdp.CCDData.read(path+'master_flatH_r.fit', unit='adu')
master_flatH_g = ccdp.CCDData.read(path+'master_flatH_g.fit', unit='adu')
# ouverture des images de calibration

ccd_gain = 1.47 * u.electron / u.adu
ccd_ron = 11 * u.electron
# charactéristique du ccd

for collection in input:
    ifc_raw = ccdp.ImageFileCollection(path+path_dict[collection])
    ifc_sci = ifc_raw.filter(imagetyp='Light Frame')
    # on ouvre le fichier où les images de sciences se situent et par précaution on utilise un filtre qui ne prend que les images de sciences
    # on boucle sur l'ensemble des chemin où les images de sciences sont placé
    
    for image in ifc_sci.summary['file']:
        warnings.filterwarnings("ignore", category=AstropyWarning)
        # permet d'eviter certains warnings lié au problème de configuration de date dans le fichier fits, pas nécéssaire
        
        name = os.path.split(path+path_dict[collection]+image)[1]
        print(path+path_dict[output[input.index(collection)]]+name)
        # chaque image est pris une par une
        
        ccd_sci = ccdp.CCDData.read(image, unit='adu')
        # ouverture de l'image
        
        ccd_sci_b = ccdp.subtract_bias(ccd_sci, master_bias)
        if ccd_sci.header.get('FILTER', 'unknown') == "r'":
            ccd_sci_bf = ccdp.flat_correct(ccd_sci_b, master_flatH_r)
        if ccd_sci.header.get('FILTER', 'unknown') == "g'":
            ccd_sci_bf = ccdp.flat_correct(ccd_sci_b, master_flatH_g)
        ccd_sci_bfd = ccdp.subtract_dark(ccd_sci_bf, master_dark, exposure_time=10, exposure_unit=u.second)
        ccd_sci_bfdg = ccdp.gain_correct(ccd_sci_bfd, ccd_gain)
        # correction des différente images de callibrations
        
        ccd_sci_bfdg.write(path+path_dict[output[input.index(collection)]]+name, overwrite=True)
        # sauvegarde images corrigées
        
        warnings.filterwarnings("default")
        # on réactive les warnings

***Extraction***

In [None]:
# Fonction pour lister tous les fichiers .fit et .fits dans un dossier donné
def list_fits_files(folder):
    return [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith(('.fit', '.fits'))]

# Fonction pour calculer les distances entre les étoiles
def calculate_distances(star_positions):
    distances = []
    for (x1, y1), (x2, y2) in combinations(star_positions, 2):
        distance = np.sqrt((x2 - x1)**2 + (y2 - y1)**2)
        distances.append(distance)
    return distances

data_path = ['baseline_23_t','baseline_24_t','obs_sec_r_t','obs_sec_g_t','obs_pri_r_t']

for data_set in data_path:
    # Initialisation des paramètres
    folder_path = os.path.join(path, path_dict[data_set])
    fits_files = list_fits_files(folder_path)
    # ouverture des data set de fits
    
    num_stars_to_extract = 4
    aperture_radius = 80
    annulus_inner_radius = 130
    annulus_outer_radius = 200
    # initialisation des appeture, annulus et nombres d'étoiles d'extraction + l'étoiles d'étude
    
    intensity_data = []
    normalized_intensities = []
    target_intensities = []
    reference_intensities = []
    target_uncertainties = []
    reference_uncertainties = []
    normalized_uncertainties = []
    observation_dates = []
    filters_used = []
    reference_distances = None
    # Initialiser des listes pour stocker les résultats
    
    for fits_file in tqdm(fits_files, desc="Traitement des images FITS"):
        is_consistent = True
        # variable détectant la consistence d'extraction du flux d'une étoile à une autres
        
        print(f"Traitement du fichier : {fits_file}")
        
        with fits.open(fits_file) as hdu:
            header = hdu[0].header
            date_obs = header.get('DATE-OBS', None)
            filter_used = header.get('FILTER', 'unknown')
            filters_used.append(filter_used)
            # extraction des métadonnées du fichier
    
            image_data = CCDData(hdu[0].data, unit='adu')
            read_noise = header.get('RDNOISE', 0)
            bkg_error = np.std(image_data.data)
            error_map = calc_total_error(image_data.data, bkg_error, read_noise)
            # extraction de l'image et de l'erreur
    
        mean, median, std = np.mean(image_data.data), np.median(image_data.data), mad_std(image_data.data)
        daofind = DAOStarFinder(fwhm=20.0, threshold=5.*std)
        sources = daofind(image_data.data - median)
        # Détection des étoiles
    
        if sources is None or len(sources) < num_stars_to_extract:
            print(f"Pas assez d'étoiles détectées dans {fits_file}")
            is_consistent = False
            intensity_data.append([np.nan] * num_stars_to_extract)
            continue
            # on vérifie que le nombre d'étoiles détecté dans l'images est au minmum le nombres d'étoiles à détecter, si on ne détecte pas assez d'étoiles on saut l'images
    
        sorted_sources = sources[np.argsort(sources['flux'])[::-1]]
        star_positions = np.array([(x, y) for x, y in zip(sorted_sources['xcentroid'], sorted_sources['ycentroid'])])
        star_positions = star_positions[:num_stars_to_extract]
        # extraction des position des 4 étoiles les plus brillantes de l'images
    
        apertures = CircularAperture(star_positions, r=aperture_radius)
        annuli = CircularAnnulus(star_positions, r_in=annulus_inner_radius, r_out=annulus_outer_radius)
        # initialisation des apertures et annulus
    
        current_distances = calculate_distances(star_positions)
        # calcul de la distances entres les 4 premières étoiles
    
        if reference_distances is None:
            reference_distances = current_distances
            print(f"Distances de référence initialisées pour {fits_file}: {reference_distances}")
            # initialisation des distance de référence, on suppose que les distance dans la première images sont correcte
        
        else:
            distance_changes = [abs(current - reference) for current, reference in zip(current_distances, reference_distances)]
            max_change = max(distance_changes)
            if max_change > 10:
                print(f"Incohérence détectée dans {fits_file}")
                is_consistent = False
                # si la différence de distance entre deux valeur des distances et marqué alors ça implique qu'une étoiles n'ai pas la bonne d'une images à une autres on saut l'image
    
        if is_consistent:
            phot_table = aperture_photometry(image_data.data, apertures, error=error_map)
            intensities = phot_table['aperture_sum'][:num_stars_to_extract]
            errors = phot_table['aperture_sum_err'][:num_stars_to_extract]
            # extraction de l'intensité et de l'erreur pour l'image importé
        
            if len(intensities) < num_stars_to_extract:
                intensities = np.pad(intensities, (0, num_stars_to_extract - len(intensities)), constant_values=np.nan)
                errors = np.pad(errors, (0, num_stars_to_extract - len(errors)), constant_values=np.nan)
        
            target_intensity = intensities[1]
            target_uncertainty = errors[1]
            reference_intensity = np.mean([intensities[0], intensities[2], intensities[3]])
            reference_uncertainty = np.sqrt(np.sum(errors[[0, 2, 3]]**2)) / 3
            # extraction des intensités et erreur
            
            if (np.min([intensities[0], intensities[2], intensities[3]])-mean)>100000:
                # on vérifie que les étoiles minimales n'atteigne pas une limite minimale à partir de laquelles l'intensité sature
    
                if date_obs is not None:
                    observation_dates.append(Time(date_obs).jd)
            
                target_intensities.append(target_intensity)
                target_uncertainties.append(target_uncertainty)
                reference_intensities.append(reference_intensity)
                reference_uncertainties.append(reference_uncertainty)
            
                normalized_value = target_intensity / reference_intensity
                normalized_uncertainty = normalized_value * np.sqrt((target_uncertainty / target_intensity)**2 + (reference_uncertainty / reference_intensity)**2)
            
                normalized_intensities.append(normalized_value)
                normalized_uncertainties.append(normalized_uncertainty)
                intensity_data.append(intensities)
                # extraction des valeurs
            
                print(f"Intensités extraites pour {fits_file}")
    
        else:
            print(f"Incohérence des étoiles visée, l'intensité n'as pas été extraite pour {fits_file}")
    
    intensity_data = np.array(intensity_data)
    column_names = ['Étoile 1', 'RT And', 'Étoile 3', 'Étoile 4']
    intensity_df = pd.DataFrame(intensity_data, columns=column_names)
    # on ordonnes les intensité en fonction des étoiles
    
    # Écriture des résultats dans un fichier CSV
    with open(folder_path + '_light_curve_data.csv', 'w', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(['observation_date', 'target_intensity', 'target_intensity_uncertainties', 'reference_intensity', 
                            'reference_intensity_uncertainties', 'normalized_intensity', 'normalized_intensity_uncertainties', 'filter'])
        for i in range(len(observation_dates)):
            csvwriter.writerow([observation_dates[i], 
                                target_intensities[i],
                                target_uncertainties[i],
                                reference_intensities[i], 
                                reference_uncertainties[i],
                                normalized_intensities[i], 
                                normalized_uncertainties[i],
                                filters_used[i]])
    
    # Tracer le graphique des intensités
    plt.figure()
    plt.plot(observation_dates, target_intensities, marker='o', linestyle=' ', color='red', label='Target Star Intensity')
    plt.plot(observation_dates, reference_intensities, marker='o', linestyle=' ', color='blue', label='Reference Star Intensity')
    plt.xticks(rotation=45, ha='right')
    plt.xlabel('Date of Observation (JD)')
    plt.ylabel('Intensity')
    plt.title(f'Light Curve of the Target Star (Filter: {filters_used[0]})')
    plt.legend()
    plt.tight_layout()
    plt.savefig(folder_path + '_light_curve.png', dpi=300)
    plt.show()
    
    # Tracer la courbe de lumière normalisée
    plt.figure()
    plt.plot(observation_dates, normalized_intensities, marker='o', linestyle=' ', color='red', label='Normalized Light Curve')
    plt.xticks(rotation=45, ha='right')
    plt.xlabel('Date of Observation (JD)')
    plt.ylabel('Normalized Intensity')
    plt.title(f'Normalized Light Curve of the Target Star (Filter: {filters_used[0]})')
    plt.legend()
    plt.tight_layout()
    plt.savefig(folder_path + '_light_curve_normalised.png', dpi=300)
    plt.show()
