In [2]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pickle as pkl
from sklearn.base import BaseEstimator
import os
import numpy as np
import pickle as pkl
from data_generation import GaussianDataGenerator


import warnings

warnings.filterwarnings("ignore")

In [4]:
class AtomBaggingBase(BaseEstimator):
    # Submodel base
    def __init__(
        self,
        K,
        atom_bag_percent=1,
        select_atom_percent=0,
        random_seed=0,
        ignore_warning=False,
    ):
        """
        Args:

        This class is used to perform atom bagging
        Each object of this class is a submodel

        K (int): Number of iterations
        atom_bag_percent (float): Percentage of the original dictionary
        select_atom_percent (float): Percentage of the selected atoms
        random_seed (int): Random seed
        """

        self.K = K
        self.atom_bag_percent = np.max([0, np.min([1, atom_bag_percent])])
        self.select_atom_percent = np.max([0, np.min([1, select_atom_percent])])
        self.atom_bag_flag = atom_bag_percent < 1
        self.atom_weak_select_flag = select_atom_percent > 0

        self.indices = []
        self.s = None
        self.phi = None
        self.a = None
        self.coefficients = None
        self.r = None

        self.random_seed = random_seed
        self.ignore_warning = ignore_warning

    def fit(self, phi, s):
        pass

    def reset(self):
        self.indices = []
        self.s = None
        self.phi = None
        self.a = None
        self.coefficients = None
        self.r = None

    def fit(self, phi, s):
        return None

    def predict(self, phi_test):
        """
        Args:
        phi_test (numpy.ndarray): Test data

        Returns:
        numpy.ndarray: Predicted output
        """

        return phi_test @ self.coefficients

    def score(self, phi_test, s_test):
        s_pred = phi_test @ self.final_c
        pred_mse = np.mean((s_pred - s_test) ** 2)
        return pred_mse

    def input_coefficients(self, coefficients):
        self.coefficients = coefficients

    def update_seed(self, random_seed):
        self.random_seed = random_seed

In [5]:
class OMP_Explore(AtomBaggingBase):
    def __init__(
        self, K_lst, select_atom_percent=0, random_seed=None, ignore_warning=False
    ):
        self.K_lst = K_lst
        self.random_seed = random_seed
        self.select_atom_percent = select_atom_percent
        if select_atom_percent == 0:
            self.atom_weak_select_flag = False

        self.indices = []
        self.coefficients = None
        self.ignore_warning = ignore_warning

        self.coefficients_list = []
        self.error_list = []

    def fit(self, phi, s):
        """
        Args:
        s (numpy.ndarray): Input signal
        phi (numpy.ndarray): Dictionary
        """
        self.reset()
        self.s = s
        self.phi = phi
        self.a = np.zeros_like(self.s)
        self.coefficients = np.zeros(phi.shape[1])
        self.r = self.s.copy()
        if self.random_seed is not None:
            np.random.seed(self.random_seed)

        for k in range(np.max(self.K_lst)+1):
            inner_products = (phi.T @ self.r).flatten()
            # so that we will not select the same atom
            inner_products[self.indices] = 0
            if self.atom_weak_select_flag:
                top_ind = np.argsort(np.abs(inner_products))[::-1][
                    : int(phi.shape[1] * self.select_atom_percent)
                ]
                # randomly select one atom
                lambda_k = np.random.choice(top_ind)
            else:
                lambda_k = np.argmax(np.abs(inner_products))

            # Ordinary least squares
            X = phi[:, self.indices + [lambda_k]]

            try:
                betas = np.linalg.inv(X.T @ X) @ X.T @ self.s
            except:
                if not self.ignore_warning:
                    print("Singular matrix encountered in OMP")
                break

            # Update indices
            self.indices.append(lambda_k)

            # Update Coefficients
            self.coefficients = np.zeros(phi.shape[1])
            self.coefficients[self.indices] = betas.flatten()

            # Update Projection
            self.a = X @ betas

            # Update Residual
            self.r = self.s - self.a
            if k in self.K_lst:
                self.coefficients_list.append(self.coefficients.copy())
                self.error_list.append(np.sum(self.r**2))


        minimal_k_index = np.argmin(self.error_list)

        # Update Coefficients

        self.coefficients = self.coefficients_list[minimal_k_index]

        # Update Projection
        self.a = phi @ self.coefficients

        # Update Residual
        self.r = self.s - self.a

        return self.a, self.coefficients

    def multi_score(self, phi_test, s_test):
        """
        Args:
        phi_test (numpy.ndarray): Test data
        s_test (numpy.ndarray): Test labels

        Returns:
        numpy.ndarray: Predicted output
        """
        test_score = []
        for i in range(len(self.K_lst)):
            self.coefficients = self.coefficients_list[i]
            projection = phi_test @ self.coefficients
            residual = s_test - projection
            test_score.append(np.mean(residual**2))
        return test_score

    def reset(self):
        super().reset()
        self.coefficients_list = []
        self.error_list = []

In [16]:
def cv_split(true_signal, dictionary, cv_num):
    true_signal = true_signal.ravel()
    cv_signal = np.split(true_signal, cv_num)
    cv_dictionary = np.split(dictionary, cv_num)
    # Get the list of train and test set
    cv_res = []
    for i in range(cv_num):
        train_signal = np.concatenate(cv_signal[:i] + cv_signal[i + 1 :], axis=0)
        train_dictionary = np.concatenate(
            cv_dictionary[:i] + cv_dictionary[i + 1 :], axis=0
        )
        test_signal = cv_signal[i]
        test_dictionary = cv_dictionary[i]
        cv_res.append((train_signal, train_dictionary, test_signal, test_dictionary))
    return cv_res


def cal_cv_error(algorithm, cv_num, signal, dictionary):
    cv_res = cv_split(signal, dictionary, cv_num)
    error_lst = []
    for i in range(cv_num):
        train_signal, train_dictionary, test_signal, test_dictionary = cv_res[i]
        algorithm.fit(train_dictionary,train_signal)
        error_lst.append(algorithm.multi_score(test_dictionary,test_signal))
    return np.mean(error_lst,axis = 0)


def cv_best_K(signal, dictionary, cv_num, K_lst):
    K_cv_error = []
    OMP_tmp = OMP_Explore(K_lst, ignore_warning=True)
    K_cv_error = cal_cv_error(OMP_tmp, cv_num, signal, dictionary)
    lowest_error = np.min(K_cv_error)
    lowest_error_K = K_lst[np.argmin(K_cv_error)]
    return lowest_error, lowest_error_K, K_cv_error

In [20]:
output_path = "./memory/"  # your specified path here

if not os.path.exists(output_path):
    os.mkdir(output_path)


def cv_best_K_noise_level_multi_trial(
    N, d, m, noise_level_lst, cv_num, K_lst, trial_num, output_filename=None
):
    if output_filename is None:
        output_filename = (
            "OMP"
            + str(N)
            + "_"
            + str(d)
            + "_"
            + str(m)
            + "_"
            + str(trial_num)
            + "_"
            + str(cv_num)
            + ".pkl"
        )
    else:
        output_filename = "OMP" + output_filename + ".pkl"
    res_log = {
        "parameters": {
            "N": N,
            "d": d,
            "m": m,
            "noise_level_lst": noise_level_lst,
            "cv_num": cv_num,
            "trial_num": trial_num,
            "K_lst": K_lst,
        },
        "noise_level_best_K": [],
        "noise_level_lowest_MSE": [],
        "log": [],
    }
    noise_level_best_K = []
    noise_level_lowest_MSE = []
    for noise_level in noise_level_lst:
        print("Cross validating K under noise level: ", noise_level)
        trials_best_K_tmp = []
        MSE_loweset_K_temp = []
        for trial in range(trial_num):
            Data_Geneartor = GaussianDataGenerator(N, d, m, noise_level, trial)
            (
                true_signal,
                dictionary,
                true_indices,
                true_coefficients,
                perturbed_signal,
            ) = Data_Geneartor.shuffle()
            lowest_error, lowest_error_K, cv_err_lst = cv_best_K(
                perturbed_signal, dictionary, cv_num, K_lst
            )
            trials_best_K_tmp.append(lowest_error_K)
            MSE_loweset_K_temp.append(lowest_error)
            print(
                "Trial: ",
                trial,
                " Best K: ",
                lowest_error_K,
                " Lowest Error: ",
                lowest_error,
            )
            log_tmp = {
                "noise_level": noise_level,
                "trial": trial,
                "data": Data_Geneartor,
                "cv_error_lst": cv_err_lst,
                "lowest_error": lowest_error,
                "lowest_error_K": lowest_error_K,
            }
            res_log["log"].append(log_tmp)
        noise_level_best_K.append(np.mean(trials_best_K_tmp))
        noise_level_lowest_MSE.append(np.mean(MSE_loweset_K_temp))
        print(
            "Average best K for noise level: ",
            noise_level,
            " is: ",
            np.mean(trials_best_K_tmp),
            " with MSE: ",
            np.mean(MSE_loweset_K_temp),
        )
    res_log["noise_level_best_K"] = noise_level_best_K
    res_log["noise_level_lowest_MSE"] = noise_level_lowest_MSE
    with open(os.path.join(output_path, output_filename), "wb") as f:
        pkl.dump(res_log, f)
    print("Finished!")
    print("Log file saved to: ", os.path.join(output_path, output_filename))
    return noise_level_best_K, noise_level_lowest_MSE, res_log


noise_level_lst = [0.1, 0.3, 0.5]
N = 1000
d = 600
m = 20
trial_num = 10
cv_num = 5
K_lst = list(range(1, 21, 1))


if __name__ == "__main__":
    (
        noise_level_best_K,
        noise_level_lowest_MSE,
        res_log,
    ) = cv_best_K_noise_level_multi_trial(
        N,
        d,
        m,
        noise_level_lst,
        cv_num,
        K_lst,
        trial_num,
        output_filename="test_002_01.pkl",
    )

Cross validating K under noise level:  0.1
Trial:  0  Best K:  12  Lowest Error:  0.011804923923324655
Trial:  1  Best K:  14  Lowest Error:  0.009751582425454444
Trial:  2  Best K:  17  Lowest Error:  0.01051267407359402
Trial:  3  Best K:  17  Lowest Error:  0.011083424058424541
Trial:  4  Best K:  16  Lowest Error:  0.01062542302441949
Trial:  5  Best K:  11  Lowest Error:  0.010547946303916568
Trial:  6  Best K:  13  Lowest Error:  0.011492231874623084
Trial:  7  Best K:  17  Lowest Error:  0.012630563987436455
Trial:  8  Best K:  13  Lowest Error:  0.010954758886104447
Trial:  9  Best K:  15  Lowest Error:  0.01034220334737489
Average best K for noise level:  0.1  is:  14.5  with MSE:  0.010974573190467258
Cross validating K under noise level:  0.3
Trial:  0  Best K:  8  Lowest Error:  0.10426860801834041
Trial:  1  Best K:  2  Lowest Error:  0.09556435060037786
Trial:  2  Best K:  9  Lowest Error:  0.10275167040559799
Trial:  3  Best K:  2  Lowest Error:  0.10707146772214685
Tria