# testing implementation of forward step-wise regression

In [1]:
import numpy as np
import pandas as pd 
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
from sklearn.linear_model import Lasso
import connectivity.evaluation as ev # will be used in stepwise regression



In [2]:
# creaete a toy example
from sklearn.datasets import make_regression

X, y, coef = make_regression(n_samples=92, n_features=20, n_targets=5,
                                      n_informative=5, noise=10,
                                      coef=True, random_state=0)

In [78]:
class ModelMixin:
    """
    This is a class that can give use extra behaviors or functions that we want our connectivity models to have - over an above the basic functionality provided by the stanard SK-learn BaseEstimator classes
    As an example here is a function that serializes the fitted model
    Not used right now, but maybe potentially useful. Note that Mixin classes do not have Constructor!
    """

    def to_dict(self):
        data = {"coef_": self.coef_}
        return data



class WNTA(ModelMixin):

    def __init__(self, n_max, alpha):
        self.n_max = n_max # maximum number of features selected
        self.alpha = alpha # parameter used in ridge regression

    class WNiTA(Ridge, ModelMixin):
        def __init__(self, n = 1, alpha = 1):
            self.n = n
            super(Ridge, self).__init__(alpha = alpha, fit_intercept=False)
      
        def _add_features(self, X, Y):
            """
            1. start with evaluation of individual features
            2. select the one feature that results in the best performance
                ** what is the best? That depends on the selected evaluation criteria (in this case it can be R)
            3. Consider all the possible combinations of the selected feature and another feature and select the best combination
            4. Repeat 1 to 3 untill you have the desired number of features

            Args: 
            X(np.ndarray)   -    design matrix   
            Y(np.ndarray)   -    response variables
            n(int)          -    number of features to select
            """

            remaining = list(set(range(X.shape[1])) - set(self.selected)) #list containing features that are to be examined

            # 2. loop over features
            while (remaining) and (len(self.selected)< self.n): # while remaining is not empty and n features are not selected 
                scores = pd.Series(np.empty((len(remaining))), index=remaining) # the scores will be stored in this 
                for i in remaining:
            
                    feats = self.selected +[i] # list containing the current features that will be used in regression
                    # fit the model
                    ## get the features from X
                    X_feat = X[:, list(feats)]

                    ## scale X_feat
                    scale_ = np.sqrt(np.nansum(X_feat ** 2, 0) / X_feat.shape[0])
                    Xs = X_feat / scale_
                    Xs = np.nan_to_num(Xs) # there are 0 values after scaling

                    ## fit the model
                    mod = LinearRegression(fit_intercept=False).fit(Xs, Y)

                    ## get the score and put it in scores
                    scores.loc[i] = mod.score(Xs, Y, sample_weight=None)

                # find the feature/feature combination with the best score
                best = scores.idxmax()
                self.selected.append(best)
                # update remaining
                ## remove the selected feature/features from remaining
                remaining.remove(best)

            return 

        def select_features(self, X, Y, support_):
            """
            loops over voxels and select features
            """
            self.support_ = np.zeros((Y.shape[1], X.shape[1]))
            for vox in range(Y.shape[1]):

                # first use self.support_ to get the features already selected 
                self.selected = list(np.where(support_[vox, :] == 1)[0])

                self._add_features(X, Y[:, vox])

                self.support_[vox, self.selected] = int(1)

            return self.support_

        def _fit(self, X, Y, support):
            """
            """
            # get the scaling
            self.scale_ = np.sqrt(np.nansum(X ** 2, 0) / X.shape[0])
            wnta_coef = np.zeros((Y.shape[1], X.shape[1]))
            for vox in range(Y.shape[1]):
                # use ModelN.support_ to fit a regression with the selected features
                support_vox = support[vox, :]
                # get the selected features for the voxel
                selected_vox = list(np.where(support_vox == 1)[0])

                # get the selected features
                X_selected = X[:, selected_vox]

                ## scale X_feat
                scale_ = np.sqrt(np.nansum(X_selected ** 2, 0) / X_selected.shape[0])
                Xs = X_selected / scale_
                Xs = np.nan_to_num(Xs) # there are 0 values after scaling

                # scale it
                Xs = X_selected
                super(Ridge, self).fit(Xs, Y[:, vox])

                wnta_coef[vox, selected_vox] = self.coef_

            self.coef_ = wnta_coef

        def _predict(self, X):
            Xs = X / self.scale_
            Xs = np.nan_to_num(Xs) # there are 0 values after scaling
            return Xs @ self.coef_.T  # weights need to be transposed (throws error otherwise)

    def fit(self, X, Y):
        self.models = dict()
        support_updated = np.zeros((Y.shape[1], X.shape[1]))
        for n in range(1, self.n_max+1):
            self.Model = self.WN_1TA(n = n, alpha = self.alpha)
            support_updated = self.Model.select_features(X, Y, support_=support_updated) # this creates a numpy array with 1s for the selected features and 0s otherwise

            # fit_ridge
            self.Model._fit(X, Y, support_updated)

            self.models[n] = self.Model

    def predict(self):
        self.pred = dict()
        for n in range(self.n_max):
            self.pred[n] = self.models[n]._predict()


In [79]:
myModel = WNTA(n_max=5, alpha = 1)

myModel.fit(X, y)
myModel.models

{1: WN_1TA(), 2: WN_1TA(n=2), 3: WN_1TA(n=3), 4: WN_1TA(n=4), 5: WN_1TA(n=5)}

In [82]:
a = myModel.models[1]
# print(dir(a))
print(a.n)

['__abstractmethods__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_abc_impl', '_add_features', '_check_n_features', '_decision_function', '_estimator_type', '_fit', '_get_param_names', '_get_tags', '_more_tags', '_predict', '_preprocess_data', '_repr_html_', '_repr_html_inner', '_repr_mimebundle_', '_set_intercept', '_validate_data', 'alpha', 'coef_', 'copy_X', 'fit', 'fit_intercept', 'get_params', 'intercept_', 'max_iter', 'n', 'n_features_in_', 'n_iter_', 'normalize', 'predict', 'random_state', 'scale_', 'score', 'select_features', 'selected', 'set_params', 'solver', 'support_', 'to_dict', 'tol']
1


In [33]:
def forward_devel(X, Y, n = 1, selected = []):
    """
    1. start with evaluation of individual features
    2. select the one feature that results in the best performance
        ** what is the best? That depends on the selected evaluation criteria (in this case it can be R)
    3. Consider all the possible combinations of the selected feature and another feature and select the best combination
    4. Repeat 1 to 3 untill you have the desired number of features

    Args: 
    X(np.ndarray)   -    design matrix   
    Y(np.ndarray)   -    response variables
    n(int)          -    number of features to select
    """

    # 1. starting with an empty list: the list will be filled with best features eventual
    # selected = []
    # print(selected)
    remaining = list(range(X.shape[1]))

    # 2. loop over features
    while (remaining) and (len(selected)< n): # while remaining is not empty and n features are not selected 
        # print("looping")
        scores = pd.Series(np.empty((len(remaining))), index=remaining) # the scores will be stored in this 
        for i in remaining:
    
            feats = selected +[i] # list containing the current features that will be used in regression
            # fit the model
            ## get the features from X
            X_feat = X[:, list(feats)]

            ## scale X_feat
            scale_ = np.sqrt(np.nansum(X_feat ** 2, 0) / X_feat.shape[0])
            Xs = X_feat / scale_
            Xs = np.nan_to_num(Xs) # there are 0 values after scaling

            ## fit the model
            model = LinearRegression(fit_intercept=False).fit(Xs, Y)

            ## get the score and put it in scores
            # print(y.shape)
            scores.loc[i] = model.score(Xs, Y, sample_weight=None)

        # find the feature/feature combination with the best score
        best = scores.idxmax()
        selected.append(best)
        # update remaining
        ## remove the selected feature/features from remaining
        remaining.remove(best)

    return selected

In [34]:
def fit(X, Y, feature_mask = [], alpha = 2, n = 1):
        """
        feature_mask is a numpy array (#cerebellar voxel-by-#cortical parcel)
        with 1s for the selected feature for each voxel and 0s otherwise
        the default is an empty list which will be set in the fit routine
        """
        # get the scaling
        scale_ = np.sqrt(np.nansum(X ** 2, 0) / X.shape[0])

        # looping over cerebellar voxels
        num_vox = Y.shape[1]
        wnta_coef = np.zeros((Y.shape[1], X.shape[1]))

        if not feature_mask: # if the mask is empty, initialize it to be all zeros
            feature_mask = np.zeros((Y.shape[1], X.shape[1]))

        for vox in range(num_vox):
            print(f"for vox {vox}")
            selected = []
            # print(f"initial selected {selected}")
            # print(f"vox {vox}")
            # print(f"{vox}.", end = "", flush = True)
            ## get current voxel 
            y = Y[:, vox]

            if np.any(y): # there are voxels with all zeros. Those voxels are skipped and the corresponding coef will be 0
                ## use forward selection method to get the best features for each cerebellar voxel
                # get the selected features for each cerebellar voxel based off of feature_mask
                # print(selected)
                if selected:
                    # print("here")
                    selected = np.argwhere(feature_mask[vox, :] == 1)[0] # get the selected features
                    
                else:
                    # print("here2")
                    selected = []

                # print(n)
                # print(selected)
                a = forward_devel(X, y, selected = selected, n = n)
                print(f"selected: {a}")
                # print(f"a {a}")
                # print(f"len a {len(a)}")
                # print(f"selected before: {selected}")
                
                selected.append(a[0])
                # print(selected)
                # print(f"len selected after {len(selected)}")
                # print(f"selected after: {selected}")
                
                # update the feature mask
                feature_mask[vox, selected] = 1

                ## use the selected featuers to do a ridge regression 
                X_selected = X[:, selected]

                ### scale X_selected
                scale_ = np.sqrt(np.nansum(X_selected ** 2, 0) / X_selected.shape[0])
                Xs = X_selected / scale_
                Xs = np.nan_to_num(Xs) # there are 0 values after scaling

                # print(f"doing ridge regression")
                model = Ridge(alpha = np.exp(alpha), fit_intercept=False)
                model.fit(Xs, y)

                # fill in the elements of the coef
                wnta_coef[vox, selected] = model.coef_

        # self.t1 = time.time()
        # self.t1 = time.ctime(self.t1)
        # print(f"\nfitting finished at {self.t1}")
        # print(f"fitting took {self.t1 - self.t0} seconds")
        model.coef_ = wnta_coef

        return model.coef_, feature_mask

In [11]:
def forward_scikit(X, Y, n):
    """
    uses scikit learn SequentialFeatureSelector to select the best features

    """
    selector = SequentialFeatureSelector(LinearRegression(fit_intercept=False), n_features_to_select=n)
    
    # scale X
    scale_ = np.sqrt(np.nansum(X ** 2, 0) / X.shape[0])
    Xs = X / scale_
    Xs = np.nan_to_num(Xs) # there are 0 values after scaling

    selector.fit(Xs, Y)

    selected_bool = selector.get_support()
    selected = np.where(selected_bool)[0]


    return selected


In [206]:
class ModelMixin:
    """
    This is a class that can give use extra behaviors or functions that we want our connectivity models to have - over an above the basic functionality provided by the stanard SK-learn BaseEstimator classes
    As an example here is a function that serializes the fitted model
    Not used right now, but maybe potentially useful. Note that Mixin classes do not have Constructor!
    """

    def to_dict(self):
        data = {"coef_": self.coef_}
        return data

class WINNERS(ModelMixin):

    def __init__(self, n_features_to_select = 1):
        self.n_featrues_to_select = n_features_to_select
        
    def add_features(self, X, y, selected = []):
       
        """
        1. start with evaluation of individual features
        2. select the one feature that results in the best performance
            ** what is the best? That depends on the selected evaluation criteria (in this case it can be R)
        3. Consider all the possible combinations of the selected feature and another feature and select the best combination
        4. Repeat 1 to 3 untill you have the desired number of features

        Args: 
        X(np.ndarray)   -    design matrix   
        Y(np.ndarray)   -    response variables
        n(int)          -    number of features to select
        """
        print(f"the winner class n_features_to_select {self.n_featrues_to_select}")
        remaining = list(set(range(X.shape[1])) - set(selected)) #list containing features that are to be examined

        # 2. loop over features
        loop = True
        while (remaining) and (len(selected) < self.n_featrues_to_select): # while remaining is not empty and n features are not selected 
            
            scores = pd.Series(np.empty((len(remaining))), index=remaining) # the scores will be stored in this 
            for i in remaining:
        
                candidates = selected +[i] # list containing the current features that will be used in regression
                # fit the model
                ## get the features from X
                X_feat = X[:, list(candidates)]

                ## scale X_feat
                scale_ = np.sqrt(np.nansum(X_feat ** 2, 0) / X_feat.shape[0])
                Xs = X_feat / scale_
                Xs = np.nan_to_num(Xs) # there are 0 values after scaling

                ## fit the model
                mod = LinearRegression(fit_intercept=False).fit(Xs, y)

                ## get the score and put it in scores
                ## get the score
                score_i, _    = ev.calculate_R(y, mod.predict(Xs))
                scores.loc[i] = score_i
                                

            # find the feature/feature combination with the best score
            best       = scores.idxmax()
            selected.append(best)

            if len(selected) == self.n_featrues_to_select:
                print("breaking the thing")
                break
            # update remaining
            ## remove the selected feature/features from remaining
            remaining.remove(best)

        return selected

    def set_support_(self, X, Y, support_ = None):
        """
        gets the support (and updates it) for all the voxels
        support_ can then be used to get the selected features
        Ars:
            X(ndarray)          : contains regressors (cortical regions)
            Y(ndarray)          : contains responses (cerebellar voxels)
            support_(ndarray)   : initial mask to select features. None: starts from scratch with all zeros
        """

        if support_ is None:
            # starting from scratch
            self.support_= np.zeros((Y.shape[1], X.shape[1]))
            scores_init = []
        else:
            self.support_ = support_
            print(self.n_featrues_to_select)

        # loop over voxels
        for vox in range(Y.shape[1]):

            # get the selected features for the current voxel
            initial_feats = list(np.where(self.support_[vox, :] == 1)[0])

            # add features to the selected set
            feats = self.add_features(X, Y[:, vox], selected = initial_feats)

            print(f"selected features {feats}")

            # update support 
            self.support_[vox, feats] = int(1)

        return self.support_
        
class WNTA(Ridge, ModelMixin):

    def __init__(self, winner_model = None, alpha = 0, positive = False, n_features_to_select = 1):
        """
        should be initialized with an instance of WINNERS class. 
        if None is entered, it will start from scratch, create an instance of WINNERS 
        and get the support_ for selecting features. Otherwise, It uses the support_ attribute
        of the WINNERS class
        """

        super(Ridge, self).__init__(fit_intercept=False, alpha = alpha)

        if winner_model is None:
            # initialize a winner model class
            self.winner = WINNERS(n_features_to_select = n_features_to_select)
        else: 
            self.winner = winner_model
            

        self.n_features_to_select = n_features_to_select

            
 
    def fit(self, X, Y):

        # get the scaling
        self.scale_ = np.sqrt(np.nansum(X ** 2, 0) / X.shape[0])

        # first get the winners
        if hasattr(self.winner, "support_"): # if it has support_ then it's already been done 
            self.winner.n_featrues_to_select = self.n_features_to_select
            self.feature_mask = self.winner.set_support_(X, Y, self.winner.support_)
        else: # then it hasn't been done, so do it
            self.feature_mask = self.winner.set_support_(X, Y)
            
        
        # loop over voxels and fit ridge
        wnta_coef = np.zeros((Y.shape[1], X.shape[1]))
        for vox in range(Y.shape[1]):
            # get the selected features for the current voxel
            selected_vox = list(np.where(self.feature_mask[vox, :] == 1)[0])

            ## use the selected featuers to do a ridge regression 
            X_selected = X[:, selected_vox]

            ### scale X_selected
            scale_ = np.sqrt(np.nansum(X_selected ** 2, 0) / X_selected.shape[0])
            Xs = X_selected / scale_
            Xs = np.nan_to_num(Xs) # there are 0 values after scaling

            # print(f"doing ridge regression")
            super(Ridge, self).fit(Xs, Y[:, vox])

            # fill in the elements of the coef
            wnta_coef[vox, selected_vox] = self.coef_

        # set the coef_ attribute
        self.coef_ = wnta_coef

    def predict(self, X):
        Xs = X / self.scale_
        Xs = np.nan_to_num(Xs) # there are 0 values after scaling
        return Xs @ self.coef_.T  # weights need to be transposed (throws error otherwise)


In [207]:
MModel = WINNERS(n_features_to_select=2)
MModel.set_support_(X, y)

print(MModel.support_)

support is None
initial set []
the winner class n_features_to_select 2
len selected 1
number of selected 2
len selected 2
number of selected 2
breaking the thing
selected features [10, 3]
initial set []
the winner class n_features_to_select 2
len selected 1
number of selected 2
len selected 2
number of selected 2
breaking the thing
selected features [1, 3]
initial set []
the winner class n_features_to_select 2
len selected 1
number of selected 2
len selected 2
number of selected 2
breaking the thing
selected features [3, 1]
initial set []
the winner class n_features_to_select 2
len selected 1
number of selected 2
len selected 2
number of selected 2
breaking the thing
selected features [2, 4]
initial set []
the winner class n_features_to_select 2
len selected 1
number of selected 2
len selected 2
number of selected 2
breaking the thing
selected features [2, 4]
[[0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [

In [208]:
Model2 = WNTA(n_features_to_select= 3)
Model2.fit(X, y)
print(Model2.coef_.shape)
print(np.sum(Model2.coef_ != 0, axis = 1))

starting from scratch
calculating support_
support is None
initial set []
the winner class n_features_to_select 3
len selected 1
number of selected 3
len selected 2
number of selected 3
len selected 3
number of selected 3
breaking the thing
selected features [10, 3, 4]
initial set []
the winner class n_features_to_select 3
len selected 1
number of selected 3
len selected 2
number of selected 3
len selected 3
number of selected 3
breaking the thing
selected features [1, 3, 4]
initial set []
the winner class n_features_to_select 3
len selected 1
number of selected 3
len selected 2
number of selected 3
len selected 3
number of selected 3
breaking the thing
selected features [3, 1, 10]
initial set []
the winner class n_features_to_select 3
len selected 1
number of selected 3
len selected 2
number of selected 3
len selected 3
number of selected 3
breaking the thing
selected features [2, 4, 3]
initial set []
the winner class n_features_to_select 3
len selected 1
number of selected 3
len sele

In [209]:
Model3 = WNTA(n_features_to_select= 3, winner_model=MModel)
Model3.fit(X, y)
print(Model3.coef_.shape)
print(np.sum(Model3.coef_ != 0, axis = 1))

using the model
support_ already calculated
support is not None
3
initial set [3, 10]
the winner class n_features_to_select 3
len selected 3
number of selected 3
breaking the thing
selected features [3, 10, 4]
initial set [1, 3]
the winner class n_features_to_select 3
len selected 3
number of selected 3
breaking the thing
selected features [1, 3, 4]
initial set [1, 3]
the winner class n_features_to_select 3
len selected 3
number of selected 3
breaking the thing
selected features [1, 3, 10]
initial set [2, 4]
the winner class n_features_to_select 3
len selected 3
number of selected 3
breaking the thing
selected features [2, 4, 3]
initial set [2, 4]
the winner class n_features_to_select 3
len selected 3
number of selected 3
breaking the thing
selected features [2, 4, 3]
(5, 20)
[3 3 3 3 3]


In [210]:
np.array_equal(Model2.coef_, Model3.coef_)

True