In [None]:
import torch
from torch.utils.data import DataLoader
import os
import numpy as np

In [5]:
import concurrent.futures
import itertools

In [None]:
class Fitting():
    def __init__(
        self,
        datatype,
        sigma=0.4, # None if unknown
        default_shift=1,
        batch_indices=(0,-1) # indices for the test data, in order to distribute jobs
    ):
        self.device = torch.device('cuda:3' if torch.cuda.is_available() else 'cpu')
        print(f'Using device: {self.device}')
        self.datatype = datatype
        if self.datatype=='SHO':
            from data_sho import damped_sho_np as func
            from data_sho import DataGenerator
        elif self.datatype=='SineGaussian':
            from data_sinegaussian import sine_gaussian_np as func
            from data_sinegaussian import DataGenerator
        elif self.datatype=='LIGO':
            pass # TODO: Implement LIGO data handling
        else:
            raise ValueError(f'Unknown datatype: {self.datatype}')
        self.func = func
        self.datadir = f'/ceph/submit/data/user/k/kyoon/KYoonStudy/ssm_regression/{self.datatype}'
        self.modeldir = os.path.join(self.datadir, 'models')
        self.test_dict = torch.load(os.path.join(self.datadir, 'test.pt'))
        self.test_data = DataGenerator(self.test_dict)
        self.test_dataloader = DataLoader(
            self.test_data,
            batch_size=1,
            shuffle=False
        )
        self.num_points=200
        self.n_repeats=10
        self.sigma = sigma

    def lmfit(self, max_events=None, max_workers=4):
        from lmfit import Model, Parameters
        model = Model(self.func)
        params = Parameters()
        params.add('shift', value=1.0, vary=False) # Default shift value
        if self.datatype == 'SHO':
            params.add('omega_0', min=0.1, max=1.9)
            params.add('beta', min=0., max=0.5)
        elif self.datatype == 'SineGaussian':
            params.add('f_0', min=0.1, max=1.9)
            params.add('tau', min=1., max=4.)
        elif self.datatype == 'LIGO':
            pass # TODO: Implement LIGO model parameter hints
        else:
            raise ValueError(f'Unknown datatype: {self.datatype}')

        def fit_one(args):
            idx, (theta_u, theta_s, data_u, data_s) = args
            t_vals_np = torch.linspace(start=-1, end=10, steps=self.num_points)
            y = data_u[0][0].to(device='cpu')
            y_np = y.numpy()
            result = model.fit(y_np, params, t=t_vals_np)
            return idx, result.fit_report()

        # Prepare data iterator (limit events if max_events is set)
        data_iter = enumerate(self.test_dataloader)
        if max_events is not None:
            data_iter = itertools.islice(data_iter, max_events)

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(fit_one, args) for args in data_iter]
            for future in concurrent.futures.as_completed(futures):
                idx, report = future.result()
                print(f'Fit result for event {idx}: {report}')

    def bilby(self, nlive=1000, sampler='dynesty', max_events=None, max_workers=4):
        import bilby
        from bilby.core.prior import Uniform
        from bilby.core.likelihood import GaussianLikelihood
        priors = {}
        if self.datatype=='SHO':
            priors['omega_0'] = Uniform(0.1, 1.9, name='omega_0', latex_label='$\omega_0$')
            priors['beta'] = Uniform(0, 0.5, name='beta', latex_label='$\beta$')
            injection_parameters = dict(omega_0=1., beta=0.3, shift=1)
        elif self.datatype=='SineGaussian':
            priors['f_0'] = Uniform(0.1, 1.9, name='f_0', latex_label='$f_0$')
            priors['tau'] = Uniform(1., 4., name='tau', latex_label='$\tau$')
            injection_parameters = dict(f_0=1., tau=2.5, shift=1)
        def bilby_one(args):
            idx, (theta_u, theta_s, data_u, data_s) = args
            y = data_u[0][0].to(device='cpu')
            t_vals_np = self.t_vals.numpy()
            y_np = y.numpy()
            log_l = GaussianLikelihood(t_vals_np, y_np, self.func, sigma=self.sigma)
            result = bilby.run_sampler(
                likelihood=log_l, priors=priors, sampler=sampler,
                nlive=nlive, npool=4, save=True, clean=True,
                injection_parameters=injection_parameters,
                output_dir=self.savedir,
                label=self.datatype
            )
            return idx, result.fit_report()
    
        data_iter = enumerate(self.test_dataloader)
        if max_events is not None:
            data_iter = itertools.islice(data_iter, max_events)

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(bilby_one, args) for args in data_iter]
            for future in concurrent.futures.as_completed(futures):
                idx, report = future.result()
                print(f'Fit result for event {idx}: {report}')

In [4]:
fitting = Fitting(datatype='SHO')
fitting.lmfit()

Using device: cuda:3


  self.test_dict = torch.load(os.path.join(self.datadir, 'test.pt'))


tensor([0.5934, 0.0266, 1.0000])


  warn("Using UFloat objects with std_dev==0 may give unexpected results.")


Fit result for event 0: [[Model]]
    Model(damped_sho_np)
[[Fit Statistics]]
    # fitting method   = leastsq
    # function evals   = 53
    # data points      = 200
    # variables        = 2
    chi-square         = 29.0780669
    reduced chi-square = 0.14685892
    Akaike info crit   = -381.666638
    Bayesian info crit = -375.070004
    R-squared          = 0.70230752
[[Variables]]
    shift:    1 (fixed)
    omega_0:  0.60066423 +/- 0.00720122 (1.20%) (init = 0.1)
    beta:     3.9003e-06 +/- 0.00691021 (177170.59%) (init = 0)
tensor([0.1395, 0.2813, 1.0000])
Fit result for event 1: [[Model]]
    Model(damped_sho_np)
[[Fit Statistics]]
    # fitting method   = leastsq
    # function evals   = 68
    # data points      = 200
    # variables        = 2
    chi-square         = 33.0872371
    reduced chi-square = 0.16710726
    Akaike info crit   = -355.833949
    Bayesian info crit = -349.237314
    R-squared          = 0.35799721
[[Variables]]
    shift:    1 (fixed)
    omega_0:

  spercent = f'({abs(par.stderr/par.value):.2%})'


tensor([1.6439, 0.3293, 1.0000])
Fit result for event 3354: [[Model]]
    Model(damped_sho_np)
[[Fit Statistics]]
    # fitting method   = leastsq
    # function evals   = 44
    # data points      = 200
    # variables        = 2
    chi-square         = 31.4390650
    reduced chi-square = 0.15878316
    Akaike info crit   = -366.053228
    Bayesian info crit = -359.456593
    R-squared          = 0.24701097
[[Variables]]
    shift:    1 (fixed)
    omega_0:  1.47083257 +/- 0.07820797 (5.32%) (init = 0.1)
    beta:     0.30067978 +/- 0.05231973 (17.40%) (init = 0)
tensor([0.6873, 0.4113, 1.0000])
Fit result for event 3355: [[Model]]
    Model(damped_sho_np)
[[Fit Statistics]]
    # fitting method   = leastsq
    # function evals   = 41
    # data points      = 200
    # variables        = 2
    chi-square         = 30.6393181
    reduced chi-square = 0.15474403
    Akaike info crit   = -371.206655
    Bayesian info crit = -364.610020
    R-squared          = 0.36059065
[[Variables]]
 

In [None]:
import torch
from torch.utils.data import DataLoader
from scipy.optimize import curve_fit
import numpy as np

# Example model function for curve fitting (replace with your actual model)
def model_func(x, a, b, c):
    return a * np.exp(-b * x) + c

# Load .pt files (assuming each file contains a tensor of samples)
# Replace 'your_data.pt' with your actual file names
data = torch.load('your_data.pt')  # shape: (N, features)
targets = torch.load('your_targets.pt')  # shape: (N,)

# Create DataLoader
dataset = TensorDataset(data, targets)
dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

results = []

for x_tensor, y_tensor in dataloader:
    x = x_tensor.numpy().flatten()
    y = y_tensor.numpy().flatten()
    try:
        # Fit the model to the data
        popt, pcov = curve_fit(model_func, x, y, maxfev=10000)
        perr = np.sqrt(np.diag(pcov))  # uncertainties
        results.append({'params': popt, 'uncertainties': perr})
    except Exception as e:
        results.append({'params': None, 'uncertainties': None, 'error': str(e)})

# results now contains fitted parameters and uncertainties for each sample