In [None]:
import math
import numpy as np
import numpy.typing as npt
import pandas as pd
from typing import List, Tuple
import matplotlib.pyplot as plt
from scipy.stats import norm


In [None]:
def orderOfMagnitude(number):
    if number == 0:
        return 0
    return math.floor(math.log(number, 10))

class value_with_error:
    def __init__(self,name: str, value: float, error: float) -> None:
        self.name = name
        self.value = value
        self.error = error

    def __str__(self) -> str:
        order = orderOfMagnitude(self.error)
        precision = max(np.abs(order) + 1, 3)
        return f"{self.name}: {self.value:.{precision}f}\u00B1{self.error:.{precision}f}"

    def __repr__(self) -> str:
        return str(self)

In [None]:
def independent_meas_linear_fit(n_param: int, x: List[float], y: List[float], y_error: List[float]) -> Tuple[List[float], List[float]]:
    C = np.ndarray((len(x), 0))
    for i in range(n_param):
        C = np.column_stack((C, x ** i))

    inter_res = C.T @ np.diag(1 / (y_error ** 2))
    var_param_est = np.linalg.inv(inter_res @ C)
    pararm_est = var_param_est @ inter_res @ y
    return pararm_est, np.diag(var_param_est) ** 0.5, C @ pararm_est
    

def sqrt_sum_of_sq(a: npt.ArrayLike) -> float:
    return np.sqrt(np.sum(np.array(a) ** 2))

def nsigma(expected: value_with_error, meas: value_with_error):
    return np.abs(expected.value-meas.value) / sqrt_sum_of_sq([expected.error, meas.error])

def error_combination(derivative: npt.ArrayLike, error: npt.ArrayLike) -> float:
    return sqrt_sum_of_sq(np.multiply(derivative, error))

def norm_hist(name: str, data: npt.ArrayLike) -> value_with_error:
    # Fit a normal distribution to the data:
    mu, std = norm.fit(data)

    # Plot the histogram
    plt.hist(data, bins=25, density=True, alpha=0.6, color='g')
    plt.xlabel(name)

    # Plot the PDF.
    xmin, xmax = plt.xlim()
    x = np.linspace(xmin, xmax, 100)
    p = norm.pdf(x, mu, std)
    plt.plot(x, p, 'k', linewidth=2)
    title = "Fit results: mu = %.4f,  std = %.4f" % (mu, std)
    plt.title(title)
    plt.show()

    return value_with_error(name+"_hist", mu, std)

def bootstrap_compare(fit: value_with_error, hist: value_with_error) -> float:
    res = np.abs(fit.value-hist.value) / fit.error
    
    print(fit)
    print(hist)
    print(f"bootstrap value comparison: {res}")
    print(f"bootstrap error comparison: fit error order e{orderOfMagnitude(fit.error)}, bootstrap error order e{orderOfMagnitude(hist.error)}")

    return res

def residue_plot(xlabel: str, ylabel: str, x: npt.ArrayLike, y: npt.ArrayLike, y_error: npt.ArrayLike, y_est: npt.ArrayLike):
    plt.grid()
    plt.errorbar(x=x, y=y-y_est, yerr=y_error, fmt='o', markersize=2)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.axhline(y = 0, linestyle = '--')
    plt.show()

In [None]:
class microlensing:
    def __init__(self, event_url: str) -> None:
        self.data = pd.read_csv(event_url+"/phot.dat", sep='\s+', comment='#', usecols=range(3), names=["JHD", "magnitude", "magnitude_error"])
        self.param_pd = pd.read_csv(event_url+"/params.dat", sep='\s+', comment='#', index_col=0, skiprows=8, names=["param", "value", "error"])
        self._ogle_calc()
        self._data_calc()
        self.par_params = {}
            
    def _ogle_calc(self):
        self.ogle = {name: value_with_error(name, float(row["value"]), float(row["error"])) for name, row in self.param_pd.iterrows()}
        self.ogle["t0"] = self.ogle.pop("Tmax")
        self.ogle["m0"] = self.ogle.pop("I0")
        self.ogle["m_bl"] = self.ogle.pop("I_bl")
        self.ogle["Imax"] = self.ogle.pop("Amax")
        for name, value in self.ogle.items():
            value.name = name + "_ogle"

    def _data_calc(self):
        self.data["norm_time"] = self.data["JHD"] - np.min(self.data["JHD"])
        m0_ogle = self.ogle["m0"]
        self.data["I"] = 10 ** (-0.4 * (self.data["magnitude"]-m0_ogle.value))
        self.data["I_error"] = 0.4 * np.log(10) * self.data["I"] * np.sqrt(self.data["magnitude_error"] ** 2 + m0_ogle.error ** 2)

    def parabolic_fit(self, mid_range: float, range_len: float):
        min_range, max_range = mid_range-range_len, mid_range+range_len
        data_cut = self.data[(self.data["JHD"]<=max_range) & (self.data["JHD"]>=min_range)]
        a, std_a, est_for_x = independent_meas_linear_fit(n_param=3, x=data_cut["JHD"]-min_range, y=data_cut["I"], y_error=data_cut["I_error"])

        t0_par, Imax_par = self._extract_parabolic_params(a, std_a, min_range)
        self.par_params["t0"] = t0_par
        self.par_params["Imax"] = Imax_par

        # plot
        plt.errorbar(x=data_cut["JHD"], y=data_cut["I"], yerr=data_cut["I_error"], fmt='o', markersize=2)
        plt.plot(data_cut["JHD"], est_for_x)
        plt.grid()
        plt.show()

        # plot residuals
        residue_plot(xlabel="time [JHD]", ylabel="Residue I", x=data_cut["JHD"], y=data_cut["I"], y_error=data_cut["I_error"], y_est=est_for_x)

        # nsigma for ogle
        print("nsigma with ogle params:")
        print(self.ogle['t0'])
        print(self.par_params['t0'])
        print(f"nsigma: {nsigma(self.ogle['t0'], self.par_params['t0'])}")
        print()
        print(self.ogle['Imax'])
        print(self.par_params['Imax'])
        print(f"nsigma: {nsigma(self.ogle['Imax'], self.par_params['Imax'])}")

        self.data_cut=data_cut
        return self.par_params
            

    def _extract_parabolic_params(self, a: npt.ArrayLike, std_a: npt.ArrayLike, time_fix: float):
        # t0_par calc
        t0_par_value = -0.5 * a[1] / a[2] + time_fix
        t0_par_error = error_combination([0.5/a[2], 0.5 * a[1]/(a[2]**2)], std_a[1:])
        t0_par = value_with_error("t0_par", t0_par_value, t0_par_error)

        # Imax_par calc
        Imax_par_value = a[0] - 0.25 * (a[1] ** 2) / a[2]
        Imax_par_error = error_combination([1, 0.5*a[1]/a[2], 0.25*(a[1]/a[2])**2], std_a)
        Imax_par = value_with_error("Imax_par", Imax_par_value, Imax_par_error)

        return t0_par, Imax_par

    def bootstrap(self, mid_range: float, range_len: float, iter: int=10000):
        min_range, max_range = mid_range-range_len, mid_range+range_len
        data_cut = self.data[(self.data["JHD"]<=max_range) & (self.data["JHD"]>=min_range)]

        t0_list, Imax_list = [], []
        for _ in range(iter):
            sample = data_cut.sample(n=len(data_cut), replace=True)
            a, std_a, _ = independent_meas_linear_fit(n_param=3, x=sample["JHD"]-min_range, y=sample["I"], y_error=sample["I_error"])
            t0_par, Imax_par = self._extract_parabolic_params(a, std_a, min_range)
            t0_list.append(t0_par.value)
            Imax_list.append(Imax_par.value)

        t0_hist = norm_hist("t0", t0_list)
        Imax_hist = norm_hist("Imax", Imax_list)

        # compare to the original fit
        print("bootstrap:")
        bootstrap_compare(self.par_params["t0"], t0_hist)
        print()

        bootstrap_compare(self.par_params["Imax"], Imax_hist)
                
       

In [None]:
ms_event = microlensing("https://www.astrouw.edu.pl/ogle/ogle4/ews/2019/blg-0035")

In [None]:
mid_range=ms_event.data["JHD"][np.argmax(ms_event.data["I"])]
range_len=30
ms_event.parabolic_fit(mid_range, range_len)

In [None]:
ms_event.bootstrap(mid_range, range_len, iter=10000)