In [None]:
import numpy as np
from scipy.integrate import solve_ivp
import pysindy as ps
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt
from scipy.io import loadmat
from sklearn.metrics import mean_squared_error
import random
from scipy.optimize import minimize
import cvxpy as cp

In [None]:
data = loadmat("data/burgers.mat")
time = np.ravel(data["t"])
x = np.ravel(data["x"])
u = np.real(data["usol"])
dt = time[1] - time[0]
dx = x[1] - x[0]
#u_noise = add_noise(u, 20)
#u_noise = np.reshape(u, (len(x), len(time), 1))

In [None]:
def add_noise(data, percentage=0):
    rmse = mean_squared_error(u, np.zeros(data.shape), squared=False)
    data = data + np.random.normal(0, (percentage/100)*rmse, u.shape)
    return data

def savgol_denoise_1D(data, window_length=11,poly_order=3):
    denoised_data = savgol_filter(data, window_length, poly_order)  
    return denoised_data

def l1_trend_filter(y, lamb=0):
    n = len(y)
    x = cp.Variable(n)
    # Construct 2nd order difference matrix D2
    D = np.diff(np.eye(n), axis=0)
    D2 = np.diff(D, axis=0)
    # Define the objective
    objective = cp.Minimize(0.5 * cp.norm(x - y, 2)**2 + lamb * cp.norm(D2 @ x, 1))
    # Solve the problem
    problem = cp.Problem(objective)
    # Try using another solver
    try:
        problem.solve()
    except cp.SolverError:
        try:
            problem.solve(solver=cp.SCS)
        except cp.SolverError:
            print("SCS also failed, trying another solver...")
            problem.solve(solver=cp.OSQP)
    return x.value

In [None]:
def burgers_pareto_l1_lambda(u):
    lambdas = np.logspace(-5, 1, 50)
    solution_norms = []
    residual_norms = []

    for lambda_reg in lambdas:
        u_denoised = np.zeros_like(u)
        # Apply L1 Trend Filter
        for j in range(u.shape[1]):
            u_denoised[:, j, 0] = l1_trend_filter(u[:, j, 0], lambda_reg)
        residual = u - u_denoised
        solution_norms.append(np.linalg.norm(u_denoised))
        residual_norms.append(np.linalg.norm(residual))
    
    solution_norms = np.array(solution_norms)
    residual_norms = np.array(residual_norms)
    curvature = np.gradient(np.gradient(solution_norms, residual_norms), residual_norms)
    max_curvature_idx = np.argmax(np.abs(curvature))
    optimal_lambda = lambdas[max_curvature_idx]
    
    return optimal_lambda

In [None]:
def burgers_pareto_savgol_w(u):
    windows = [i for i in range(7, 99, 2)]
    solution_norms = []
    residual_norms = []

    for w in windows:
        u_denoised = np.zeros_like(u)
        # Apply savgol filter
        for j in range(u.shape[1]):
            u_denoised[:, j, 0] = savgol_denoise_1D(u[:, j, 0], w, 3)
        residual = u - u_denoised
        solution_norms.append(np.linalg.norm(u_denoised))
        residual_norms.append(np.linalg.norm(residual))
    
    solution_norms = np.array(solution_norms)
    residual_norms = np.array(residual_norms)
    curvature = np.gradient(np.gradient(solution_norms, residual_norms), residual_norms)
    max_curvature_idx = np.argmax(np.abs(curvature))
    optimal_w = windows[max_curvature_idx]
    
    return optimal_w

In [None]:
num_iterations = 500

noise_levels = [0,5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,100]
threshold = 0.0005

true_coeffs = {'x0_11': 0.1, 'x0x0_1': -1.0}

for p in noise_levels: 
    
    success_count = 0
    success_count_savgol = 0
    success_count_l1 = 0
    success_count_savl1 = 0
    success_count_l1sav = 0
    
    coeff_errors = []
    coeff_errors_savgol = []
    coeff_errors_l1 = []
    coeff_errors_savl1 = []
    coeff_errors_l1sav = []
    
    wrong_terms_list = []
    wrong_terms_savgol_list = []
    wrong_terms_l1_list = []
    wrong_terms_savl1_list = []
    wrong_terms_l1sav_list = []
    
    print(f"Testing noise level: {p}")

    for _ in range(num_iterations):
        
        data = loadmat("data/burgers.mat")
        time = np.ravel(data["t"])
        x = np.ravel(data["x"])
        u = np.real(data["usol"])
        dt = time[1] - time[0]
        dx = x[1] - x[0]

        u = add_noise(u, p)
        u = np.reshape(u, (len(x), len(time), 1))
        
        # Define weak form PDE library
        library_functions = [lambda x: x, lambda x: x * x]
        library_function_names = [lambda x: x, lambda x: x + x]

        # Need to define the 2D spatiotemporal grid before calling the library
        X, T = np.meshgrid(x, time)
        XT = np.asarray([X, T]).T
        pde_lib = ps.WeakPDELibrary(
            library_functions=library_functions,
            function_names=library_function_names,
            derivative_order=2,
            spatiotemporal_grid=XT,
            is_uniform=True,
            K=1000,
        )
        
        u_savgol = np.empty_like(u)
        u_l1 = np.empty_like(u)
        u_savl1 = np.empty_like(u)
        u_l1sav = np.empty_like(u)
        
        u_pareto_lambda = burgers_pareto_l1_lambda(u)
        u_pareto_w = burgers_pareto_savgol_w(u)
    
        # Apply Savitzky-Golay filter
        for j in range(u.shape[1]):
            u_savgol[:, j, 0] = savgol_denoise_1D(u[:, j, 0], u_pareto_w, 3)

        # Apply L1 Trend Filter
        for j in range(u.shape[1]):
            u_l1[:, j, 0] = l1_trend_filter(u[:, j, 0], u_pareto_lambda)
   
        savgol_pareto_lambda = burgers_pareto_l1_lambda(u_savgol)
        l1_pareto_w = burgers_pareto_savgol_w(u_l1)
        #print(f"savgol_pareto_lambda: {savgol_pareto_lambda}")
        # Apply combined Savitzky-Golay and L1 Trend Filter (both directions)
        for j in range(u.shape[1]):
            u_savl1[:, j, 0] = l1_trend_filter(u_savgol[:, j, 0], savgol_pareto_lambda)
        for j in range(u.shape[1]):
            u_l1sav[:,j,0] = savgol_denoise_1D(u_l1[:, j, 0], l1_pareto_w, 3)
            
               
        optimizer = ps.SR3(
            threshold=0.1, thresholder="l0", tol=1e-8, normalize_columns=True, max_iter=1000
        )   
        
        model = ps.SINDy(feature_library=pde_lib, optimizer=optimizer)
        model_savgol = ps.SINDy(feature_library=pde_lib, optimizer=optimizer)
        model_l1 = ps.SINDy(feature_library=pde_lib, optimizer=optimizer)
        model_savl1 = ps.SINDy(feature_library=pde_lib, optimizer=optimizer)
        model_l1sav = ps.SINDy(feature_library=pde_lib, optimizer=optimizer)
        
        model.fit(u, quiet=True)
        coeffs = model.coefficients()
        #print("no filter")
        #model.print()
        model_savgol.fit(u_savgol, quiet=True)
        coeffs_savgol = model_savgol.coefficients()
        #print("savgol")
        #model_savgol.print()
        model_l1.fit(u_l1, quiet=True)
        coeffs_l1 = model_l1.coefficients()
        #print("l1")
        #model_l1.print()
        model_savl1.fit(u_savl1, quiet=True)
        coeffs_savl1 = model_savl1.coefficients()
        #print("savl1")
        #model_savl1.print()
        model_l1sav.fit(u_l1sav, quiet=True)
        coeffs_l1sav = model_l1sav.coefficients()
        #print("l1sav")
        #model_l1sav.print()
        
        all_terms = model.get_feature_names()

        # For the original model
        non_zero_indices = np.where(np.abs(coeffs) > threshold)[1] # the [1] is used since coeffs is 2D
        active_terms = [all_terms[i] for i in non_zero_indices]
        # For the savgol model
        non_zero_indices_savgol = np.where(np.abs(coeffs_savgol) > threshold)[1]
        active_terms_savgol = [all_terms[i] for i in non_zero_indices_savgol]
        # For the tikhonov model
        non_zero_indices_l1 = np.where(np.abs(coeffs_l1) > threshold)[1]
        active_terms_l1 = [all_terms[i] for i in non_zero_indices_l1]
        # For the double model
        non_zero_indices_savl1 = np.where(np.abs(coeffs_savl1) > threshold)[1]
        active_terms_savl1 = [all_terms[i] for i in non_zero_indices_savl1]
        non_zero_indices_l1sav = np.where(np.abs(coeffs_l1sav) > threshold)[1]
        active_terms_l1sav = [all_terms[i] for i in non_zero_indices_l1sav]
        

        desired_terms = set(['x0_11', 'x0x0_1'])

        # Check if the model's active terms match the desired terms
        if set(active_terms) == desired_terms:
            success_count += 1
        if set(active_terms_savgol) == desired_terms:
            success_count_savgol += 1   
        if set(active_terms_l1) == desired_terms:
            success_count_l1 += 1   
        if set(active_terms_savl1) == desired_terms:
            success_count_savl1 += 1   
        if set(active_terms_l1sav) == desired_terms:
            success_count_l1sav += 1   
            
        
        # Coefficient Errors
        coeff_error = 0
        coeff_error_savgol = 0
        coeff_error_l1 = 0
        coeff_error_savl1 = 0
        coeff_error_l1sav = 0
        for term in desired_terms:
            idx = all_terms.index(term)
            coeff_error += abs(coeffs[0, idx] - true_coeffs[term])
            coeff_error_savgol += abs(coeffs_savgol[0, idx] - true_coeffs[term])
            coeff_error_l1 += abs(coeffs_l1[0, idx] - true_coeffs[term])
            coeff_error_savl1 += abs(coeffs_savl1[0, idx] - true_coeffs[term])
            coeff_error_l1sav += abs(coeffs_l1sav[0, idx] - true_coeffs[term])

        coeff_errors.append(coeff_error)
        coeff_errors_savgol.append(coeff_error_savgol)
        coeff_errors_l1.append(coeff_error_l1)
        coeff_errors_savl1.append(coeff_error_savl1)
        coeff_errors_l1sav.append(coeff_error_l1sav)
        
        # Wrong terms count
        wrong_terms = [term for term in active_terms if term not in desired_terms]
        wrong_terms_savgol = [term for term in active_terms_savgol if term not in desired_terms]
        wrong_terms_l1 = [term for term in active_terms_l1 if term not in desired_terms]
        wrong_terms_savl1 = [term for term in active_terms_savl1 if term not in desired_terms]
        wrong_terms_l1sav = [term for term in active_terms_l1sav if term not in desired_terms]

        wrong_terms_list.append(len(wrong_terms))
        wrong_terms_savgol_list.append(len(wrong_terms_savgol))
        wrong_terms_l1_list.append(len(wrong_terms_l1))
        wrong_terms_savl1_list.append(len(wrong_terms_savl1))
        wrong_terms_l1sav_list.append(len(wrong_terms_l1sav))
            
            
    # Compute average and standard deviation for each metric
    # Compute success rates
    success_rate = (success_count / num_iterations) * 100
    success_rate_savgol = (success_count_savgol / num_iterations) * 100
    success_rate_l1 = (success_count_l1 / num_iterations) * 100
    success_rate_savl1 = (success_count_savl1 / num_iterations) * 100
    success_rate_l1sav = (success_count_l1sav / num_iterations) * 100

    print(f"Success rate using original data: {success_rate}%")
    print(f"Success rate using savgol data: {success_rate_savgol}%")
    print(f"Success rate using l1 data: {success_rate_l1}%")
    print(f"Success rate using savl1 data: {success_rate_savl1}%")
    print(f"Success rate using l1sav data: {success_rate_l1sav}%")
    
    print(f"Average coefficient error using original data: {np.mean(coeff_errors)} ± {np.std(coeff_errors)}")
    print(f"Average coefficient error using savgol data: {np.mean(coeff_errors_savgol)} ± {np.std(coeff_errors_savgol)}")
    print(f"Average coefficient error using l1 data: {np.mean(coeff_errors_l1)} ± {np.std(coeff_errors_l1)}")
    print(f"Average coefficient error using savl1 data: {np.mean(coeff_errors_savl1)} ± {np.std(coeff_errors_savl1)}")
    print(f"Average coefficient error using l1sav data: {np.mean(coeff_errors_l1sav)} ± {np.std(coeff_errors_l1sav)}")
                                                                                          
    print(f"Average number of wrong terms using original data: {np.mean(wrong_terms_list)} ± {np.std(wrong_terms_list)}")
    print(f"Average number of wrong terms using savgol data: {np.mean(wrong_terms_savgol_list)} ± {np.std(wrong_terms_savgol_list)}")
    print(f"Average number of wrong terms using l1 data: {np.mean(wrong_terms_l1_list)} ± {np.std(wrong_terms_l1_list)}")
    print(f"Average number of wrong terms using savl1 data: {np.mean(wrong_terms_savl1_list)} ± {np.std(wrong_terms_savl1_list)}") 
    print(f"Average number of wrong terms using l1sav data: {np.mean(wrong_terms_l1sav_list)} ± {np.std(wrong_terms_l1sav_list)}")