In [None]:
import numpy as np
import matplotlib.pyplot as plt

from astropy.io import fits
from astropy.modeling import models, fitting

from scipy.signal import find_peaks
from scipy.optimize import curve_fit

# Custom fitter
from astropy.modeling.fitting import Fitter, model_to_fit_params
from astropy.modeling.statistic import leastsquare
from astropy.modeling.optimizers import SLSQP
from astropy.modeling.models import custom_model
from scipy import optimize

In [None]:
# Generate parameters of initial guesses dependeing on the number of desired Gaussian components
def guess_gen(spec, n):
    guess = []
    peaks, info = find_peaks(spec, height=0., prominence=0.2, width = 0)
    for i in range(len(peaks)):
        center = peaks[i]
        width = info['widths'][i] * 0.8
        for j in np.linspace(center - width, center + width, n):
            guess.append(spec[int(j)])
            guess.append(j)
            guess.append(width / n)
    return guess

In [None]:
# Custom joint fitter
class joint(Fitter):

    supported_constraints = ['bounds']

    def __init__(self, optimizer = SLSQP, statistics = leastsquare):
        super().__init__(optimizer, statistics)

    def objective_function_I(self, fps, *args):
        model = args[0]
        weight = args[1]
        meas_g = args[-1][0]
        model[:-1].parameters = fps
        res = (model[:-1](*args[2:-1]) - meas_g)**2 * weight * (100 / max(meas_g))
        return res
    
    def objective_function_V(self, fps, *args):
        model = args[0]
        weight = args[1]
        meas_g = args[-1][0]
        meas_v = args[-1][1]
        model.parameters = fps
        res = (model[:-1](*args[2:-1]) - meas_g)**2 * (100 / max(meas_g)) + (model(*args[2:-1]) - meas_v)**2 * (100 / max(meas_v))
        res *= weight
        return res
    
    def __call__(self, model, x, y , maxiter=1000, weights = None):

        if weights is None: weights = np.ones_like(y)

        self.bounds = model.bounds
        model_copy = model.copy()

        lower = [i[0] for i in self.bounds.values()]
        upper = [i[1] for i in self.bounds.values()]
        lower = lower[:-2]
        upper = upper[:-2]

        # Fit Stokes I first and the joint fit both Stokes I and V
        init_values, _, _ = model_to_fit_params(model_copy)
        optimize.least_squares(self.objective_function_I, init_values[:-2],
                                args=(model_copy, weights, x, y),
                                max_nfev=maxiter,
                                bounds = [lower, upper]
                                )

        lower = [i[0] for i in self.bounds.values()]
        upper = [i[1] for i in self.bounds.values()]

        init_values, _, _ = model_to_fit_params(model_copy)
        self.fitparams = optimize.least_squares(self.objective_function_V, init_values,
                                            args=(model_copy, weights, x, y),
                                            max_nfev=maxiter,
                                            bounds = [lower, upper]
                                            )
        print(self.fitparams)
        return model_copy

In [None]:
# Opens the FITS file and reads the data
name = 'DR21W'
I_hdu = fits.open(name + '_I.FITS')
I_d = np.squeeze(I_hdu[0].data)
I_h = I_hdu[0].header

V_hdu = fits.open(name + '_V.FITS')
V_d = np.squeeze(V_hdu[0].data)
V_h = V_hdu[0].header

# Reading the spectrum of the selected pixel
# Maser A
x = 147
y = 134
# Maser B
x = 128
y = 127
# plt.imshow(I_d[128, :, :], origin='lower')
np.unravel_index(I_d[128, 140:160, 130:150].argmax(), I_d[128, 140:160, 130:150].shape)

spec = I_d[:, x, y]
V = V_d[:, x, y]

In [None]:
guess = guess_gen(spec, 8)
# Visualize the initial guesses
# for i in range(0, len(guess), 3):
#     plt.axvline(guess[i+1], color='r')
# plt.show()

# Turning the guesses into a model
g = models.Gaussian1D(amplitude=guess[0], mean=guess[1], stddev=guess[2],
                    bounds={'amplitude': (0, np.max(spec)), 
                            'mean': (guess[1] - 3*guess[2], guess[-2] + 3*guess[-1]), 
                            'stddev': (0, len(spec))})

for i in range(3, len(guess), 3):
    g += models.Gaussian1D(amplitude=guess[i], mean=guess[i+1], stddev=guess[i+2],
                    bounds={'amplitude': (0, np.max(spec)), 
                            'mean': (guess[1] - 3*guess[2], guess[-2] + 3*guess[-1]), 
                            'stddev': (0, len(spec))})

# Custom model of the form V(nGauss(x))    
@custom_model
def StokesV(I, alpha = 1, beta = 1):
    """Beta term in V = alpha * I + beta * dI/dv"""

    return alpha * I + beta * np.gradient(I)

stokesv_g = g | StokesV()
stokesv_g[-1].alpha.bounds = (-np.inf, np.inf)
stokesv_g[-1].beta.bounds = (-np.inf, np.inf)

In [None]:
# Defining weight and apply fit
bound_multiplier = 5
left_bound = int(guess[1] - bound_multiplier * guess[2])
right_bound = int(guess[-2] + bound_multiplier * guess[-1])
error = np.std(np.concatenate((spec[:int(left_bound)], spec[int(right_bound):])), ddof = 1)
weight = np.concatenate((np.ones(left_bound), np.ones(right_bound - left_bound)*1000 , np.ones(len(spec) - right_bound)))

print(left_bound, right_bound, error, np.max(spec))

fit_joint = fitting.JointFitter([g, stokesv_g], 
                                {g: g.param_names, stokesv_g: stokesv_g[:-1].param_names}, 
                                g.parameters)

fit = joint()
fitted = fit(stokesv_g, np.arange(len(spec)), [spec, V], maxiter = 1000000, weights = weight)

# Plotting Stokes I
fig, axs = plt.subplots(2, 1, figsize=(15, 8), sharex=True)
axs[0].set(title=name + " Stokes I Fit", ylabel = "Flux Density (Jy)")
axs[0].step(np.arange(0, len(spec)), spec, color = 'black', label = 'Data', where = 'mid')
axs[0].plot(fitted[:-1](np.arange(len(spec))), label = 'Fit', color = 'blue')
axs[0].plot(spec - fitted[:-1](np.arange(len(spec))), 'x', markersize = 2, color = 'red', label = 'Residuals')

params = fitted.parameters
for i in range(0, len(params) - 2, 3):
    axs[0].plot(np.arange(len(spec)), models.Gaussian1D(amplitude=params[i], mean=params[i+1], stddev=params[i+2])(np.arange(len(spec))), label = f'Gauss{i//3}')
    print('Amp:' , f"{params[i]:.2f}", 'Center:', f"{params[i+1]:.2f}", 'Width:', f"{params[i+2]:.2f}", sep='\t')
print('Alpha:' , f"{params[-2]:.5f}", 'Beta:', f"{params[-1]:.5f}", sep='\t')
axs[0].legend()
axs[1].plot((spec - fitted[:-1](np.arange(len(spec)))))
print('Chi2:', np.sum((spec - fitted[:-1](np.arange(len(spec))))**2))
plt.show()

In [None]:
# Plotting Stokes V
tofit = V
noise = np.std(np.concatenate((V[:int(left_bound)], V[int(right_bound):])), ddof = 1)

alpha = fitted.parameters[-2]
beta = fitted.parameters[-1]

print("Alpha: ", alpha, "Beta: ", beta)

x = np.linspace(start = I_h['CRVAL3'], stop = I_h['CRVAL3'] + (I_h['NAXIS3'] - I_h['CRPIX3']) * I_h['CDELT3'], num = I_h['NAXIS3']) / 1e9
plt.figure(figsize = (16, 7))
fig, (ax1, ax2) = plt.subplots(2, 1, figsize= (15, 8), sharex = True, sharey = False )

V_beta = V - fitted[:-1](np.arange(len(spec))) * alpha

fit = fitted(np.arange(len(spec))) - fitted[:-1](np.arange(len(spec))) * alpha
print("Chi2:", np.sum((V-fit)**2))

ax1.plot(x, V_beta, 'x', markersize = 4, color = 'k', label = "Data")
ax1.errorbar(x, V_beta, yerr = noise, fmt = 'none', ecolor = 'k', elinewidth = 1, capsize = 2)
# ax1.plot(x, StokesValpha(I_fit, alpha), label = "Alpha")
ax1.plot(x, fit, color = 'red', label = "Fit")
ax1.set(title=name + " Stokes V Fit", ylabel = "Flux Density (Jy)")
# ax1.plot(x, (V - fit),'x', label = "Residual")
ax2.plot(x, (V_beta - fit), label = "Residual")
ax2.set(title="Fit Residual", xlabel="Frequency (GHz)", ylabel = "Flux Density (Jy)")
ax1.legend()
ax2.legend()
plt.show()