## Preprocessing PSF data for SNR/eigenPSF reconstruction tests

In [13]:
import sys
sys.path.append('../')
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
import pickle
import random
from ShapePipe.shapepipe.pipeline import file_io

## Test parameters

- CCD = 38
- Nb of eigenPSF = 16
- Gaussian noise std dev =  0.4e-3 (SNR ~ 40dB)
- Total nb of stars (per set) = 50
- Train stars = 80% (40 stars)
- Test stars = 20% (10 stars)
- The star positions assigned to the train and test sets are drawn randomly once, at the start, and are then kept fixed across all the 50-star sets.

In [14]:
# Experiment configuration

CCDn = 38          # CCD used for the test
eigenPSFn = 16     # number of eigenPSFs
sigmaN = 0.4e-3    # Gaussian noise standard deviation (SNR ~ 40dB)
star_nb = 50       # total number of stars per set
train_per = 0.8    # fraction of each set used for training

# Split sizes: floor(fraction * set size) stars for training, the remainder for testing
train_star_nb = np.floor(star_nb * train_per).astype(int)
test_star_nb = star_nb - train_star_nb


In [15]:
# Load the PSF catalogue stored in the NPY file
raw_ccd = np.load('../JB-data/npy-data/psf_ccd_38.npy', allow_pickle=True)
cat = raw_ccd.item()  # extract the single dictionary held by the loaded array

# Quick inspection: available fields first, then the shape of each of them
print(cat.keys())
for field in ('vignet', 'e1_true', 'e2_true', 'fwhm', 'CCD_n', 'X', 'Y'):
    print(cat[field].shape)

# Catalogue size and dimensions of a single star image
total_nb_stars = cat['X'].shape[0]
xdim, ydim = cat['vignet'][0, :, :].shape

dict_keys(['vignet', 'e1_true', 'e2_true', 'fwhm', 'CCD_n', 'X', 'Y'])
(24823, 51, 51)
(24823,)
(24823,)
(24823,)
(24823,)
(24823,)
(24823,)


In [10]:
# Define fits saving function

def save_fits(vignet, xwin, ywin, train_bool, CCDn, id_nb):
    """Save one star set into a FITS catalogue.

    The output file is named
    ``<train|test>_star_selection-<CCDn>-<id_nb>.fits`` and is written to the
    ``train/`` or ``test/`` sub-directory of the preprocessed-data folder.

    Parameters
    ----------
    vignet
        Star images, stored under the ``VIGNET`` column.
    xwin
        Star X positions, stored under the ``XWIN_IMAGE`` column.
    ywin
        Star Y positions, stored under the ``YWIN_IMAGE`` column.
    train_bool
        Truthy to save into the training directory, falsy to save into the
        test directory.
    CCDn
        CCD number; written zero-padded on two digits in the file name.
    id_nb
        Star-set number; written zero-padded on six digits in the file name.
    """
    data = {'VIGNET': vignet, 'XWIN_IMAGE': xwin, 'YWIN_IMAGE': ywin}

    base_dir = '/Users/tliaudat/Documents/PhD/codes/venv_p3/tests/preprocessed_data/'

    # Plain truthiness instead of `== True` / `elif == False`: the old form
    # left `saving_path` undefined (UnboundLocalError) for any other value.
    if train_bool:
        prefix = base_dir + 'train/' + 'train_star_selection'
    else:
        prefix = base_dir + 'test/' + 'test_star_selection'

    # `%02d` rather than `%2d`: a single-digit CCD number used to be padded
    # with a blank, putting a space inside the file name. Two-digit CCD
    # numbers produce exactly the same name as before.
    number_scheme = "-%02d-%06d" % (CCDn, id_nb)
    saving_path = prefix + number_scheme + '.fits'

    fits_file = file_io.FITSCatalog(
        saving_path,
        open_mode=file_io.BaseCatalog.OpenMode.ReadWrite,
        SEx_catalog=True,
    )
    # An existing FITS file is passed as `sex_cat_path`
    # (presumably used as a SExtractor-style template -- confirm in ShapePipe).
    fits_file.save_as_fits(
        data,
        sex_cat_path=base_dir + 'example-fits.fits',
    )
    

In [11]:
# Extract the different star sets (of 50 stars) from the overall star catalog

# Prepare the stars: sorted unique grid coordinates, plus, for every star,
# the index of its coordinate inside the corresponding grid.
X_grid, x_index = np.unique(cat['X'], return_inverse=True) # 5
Y_grid, y_index = np.unique(cat['Y'], return_inverse=True) # 10

# Identify each star with its position id.
# The row stride is the actual number of Y grid points instead of a
# hard-coded 10, so ids stay unique if the grid layout changes.
star_id = (len(Y_grid) * x_index + y_index).astype(int)

# Count how many stars are there at each position (ids 0 .. star_nb - 1)
star_quantity = np.bincount(star_id, minlength=star_nb)[:star_nb]

# Number of star sets to be created: limited by the least populated position
nb_star_sets = np.min(star_quantity)

# Generate the star sets to be used.
# star_sets[p, s] is the catalog index of the star at position p in set s.
star_sets = np.zeros((star_nb, nb_star_sets), dtype=int)
# One counter per *position*. It was previously sized by nb_star_sets, which
# only worked while nb_star_sets >= star_nb and raised IndexError otherwise.
star_counter = np.zeros((star_nb,), dtype=int)

for it, star_position in enumerate(star_id):
    star_set_nb = star_counter[star_position]

    # Stars beyond the nb_star_sets-th one at a given position are dropped
    if star_set_nb < nb_star_sets:
        star_sets[star_position, star_set_nb] = it
        star_counter[star_position] += 1

# Generate the random test positions in the star field
# The positions will be maintained throughout the star sets
# NOTE: the generator is not seeded, so the split differs between runs.
rand_seq = np.random.permutation(star_nb)
train_ids = rand_seq[0:train_star_nb]
test_ids = rand_seq[train_star_nb:]

In [12]:
# Build and save every star set

# Train sets: stars at the training positions, with Gaussian noise added
for it_set in range(nb_star_sets):
    # Catalog indices of the training stars belonging to this set
    train_stars = star_sets[train_ids[:train_star_nb], it_set]

    # Star positions
    myset_X = cat['X'][train_stars].astype(np.float64)
    myset_Y = cat['Y'][train_stars].astype(np.float64)

    # Star images plus noise; one noise map per star, drawn in star order
    noise = sigmaN * np.random.randn(train_star_nb, xdim, ydim)
    myset_vignet = (cat['vignet'][train_stars, :, :] + noise).astype(np.float32)

    train_bool = True
    save_fits(myset_vignet, myset_X, myset_Y, train_bool, CCDn, it_set)


# Test sets: stars at the test positions, left noiseless
for it_set in range(nb_star_sets):
    # Catalog indices of the test stars belonging to this set
    test_stars = star_sets[test_ids[:test_star_nb], it_set]

    # Star positions
    myset_X = cat['X'][test_stars].astype(np.float64)
    myset_Y = cat['Y'][test_stars].astype(np.float64)

    # Star images (without noise)
    myset_vignet = cat['vignet'][test_stars, :, :].astype(np.float32)

    train_bool = False
    save_fits(myset_vignet, myset_X, myset_Y, train_bool, CCDn, it_set)

In [22]:
# Store the unique X and Y coordinates of the position grid for later plotting
x_positions = np.unique(cat['X'])
y_positions = np.unique(cat['Y'])
np.save('X_unique', x_positions)
np.save('Y_unique', y_positions)