Data filters to work with the river ML framework

In [57]:
import river

In [58]:
import typing
import math
import pandas as pd
import numpy as np
import operator as o
from river import base as b
from river.stream import iter_pandas
from river.datasets import base
from river import preprocessing
from river import stats

In [59]:
# Shared toy dataset used by every filter below: six numeric features plus the class label
# in the last column, with a handful of NaNs so missing-value handling can be exercised.
global_vals = np.array([
    [1.56,     54.50,    1,        6,    22.19,    1.7,      1],
    [math.nan, 62,       1,        9.5,  math.nan, 1.80,     1],
    [1.89,     90.40,    1,        12,   25.20,    2.12,     2],
    [2.01,     100,      1,        16,   24.75,    1.96,     1],
    [1.72,     75,       2,        11,   25.35,    1.80,     2],
    [1.70,     65.0,     3,        11,   22.49,    1.73,     2],
    [1.70,     65.0,     3,        11,   22.49,    1.67,     2],
    [math.nan, math.nan, 1,        10,   math.nan, math.nan, 3],
    [1.78,     90.0,     math.nan, 8.5,  28.41,    1.78,     3],
    [1.89,     85,       3,        12,   23.80,    1.89,     2],
])
global_data = pd.DataFrame(
    global_vals,
    columns=[
        "feature_1", "feature_2", "feature_3",
        "feature_4", "feature_5", "feature_6",
        "class_value",
    ],
)
global_data


Unnamed: 0,feature_1,feature_2,feature_3,feature_4,feature_5,feature_6,class_value
0,1.56,54.5,1.0,6.0,22.19,1.7,1.0
1,,62.0,1.0,9.5,,1.8,1.0
2,1.89,90.4,1.0,12.0,25.2,2.12,2.0
3,2.01,100.0,1.0,16.0,24.75,1.96,1.0
4,1.72,75.0,2.0,11.0,25.35,1.8,2.0
5,1.7,65.0,3.0,11.0,22.49,1.73,2.0
6,1.7,65.0,3.0,11.0,22.49,1.67,2.0
7,,,1.0,10.0,,,3.0
8,1.78,90.0,,8.5,28.41,1.78,3.0
9,1.89,85.0,3.0,12.0,23.8,1.89,2.0


### Fixed Threshold Filter

In [60]:
class FixedThresholdFilter(base.SyntheticDataset):
    """
    Filter that skips instances whose feature values satisfy a fixed comparison condition.

    Every instance streamed from ``dataFrame`` is checked with ``<value> <operator> <threshold>``
    for each feature in ``features``. Instances that meet the condition (combined according to
    ``logical_operator``) are dropped; all other instances are yielded unchanged as ``(x, y)``.

    Parameters
    ----------
    dataFrame
                The stream source in the form of a pandas dataframe. The last column is used as
                the target, every other column as a feature.
    features
                A list of features that are to be checked from the current stream
    operator
                A comparison operator that is used to make up the condition, one of
                "<", "<=", "==", ">=", ">"
    threshold
                A value you want to compare feature values against. A plain comparison against
                NaN is always False, so when ``threshold`` is NaN a feature value is treated as
                meeting the condition if it is also NaN.
    logical_operator
                "OR" or "AND" (case-insensitive). Describes how the per-feature checks are
                combined: with "OR" an instance is dropped if at least one checked feature meets
                the condition; with "AND" it is dropped only if all checked features meet it.

    Raises
    ------
    KeyError
                If ``operator`` is not one of the supported comparison operators.
    ValueError
                If ``logical_operator`` is neither "OR" nor "AND".
    """

    def __init__(self,
                 dataFrame:pd.DataFrame,
                 features: typing.Tuple[b.typing.FeatureName],
                 operator,
                 threshold,
                 logical_operator="OR",
                 ):
        self.df = dataFrame
        super().__init__(
            n_features=self.df.shape[1],
            n_samples=self.df.shape[0],
            n_classes=len(self.df[self.df.columns[-1]].unique()),
            n_outputs=1,
            task=base.MULTI_CLF,
        )

        self.features = set(features)
        self.operator = operator
        self.threshold = threshold
        self.logical_operator = logical_operator

        # Supported comparison operators
        operator_dict = {
            "<": o.lt,
            "<=": o.le,
            "==": o.eq,
            ">=": o.ge,
            ">": o.gt,
        }

        # Normalise and validate the combiner: previously any value other than exactly "OR"
        # (e.g. "or" or a typo) silently behaved as "AND".
        combiner = str(logical_operator).upper()
        if combiner not in ("OR", "AND"):
            raise ValueError(
                f'logical_operator must be "OR" or "AND", got {logical_operator!r}'
            )
        self.is_or = combiner == "OR"
        self.comparison_operator = operator_dict[self.operator]

    def __iter__(self):
        feature_cols = self.df.columns[:-1]
        target_col = self.df.columns[-1]
        stream = iter_pandas(X=self.df[feature_cols], y=self.df[target_col])

        # Choose the combining rule once instead of branching on every instance
        should_drop = self.__check_or if self.is_or else self.__check_and

        for x, y in stream:
            if not should_drop(x):
                yield x, y

    def __meets_condition(self, value):
        """
        Return True if a single feature value satisfies the user specified condition.

        NaN never compares as equal/less/greater to anything, so when the threshold itself is
        NaN the value is considered to meet the condition exactly when it is also NaN.
        """
        if self.comparison_operator(value, self.threshold):
            return True
        # Short-circuits on a non-NaN threshold, so isnan(value) is only evaluated when needed
        return math.isnan(self.threshold) and math.isnan(value)

    def __check_or(self, values):
        """
        Return True (drop the instance) if at least one checked feature meets the condition.
        """
        return any(self.__meets_condition(values[f]) for f in self.features)

    def __check_and(self, values):
        """
        Return True (drop the instance) only if every checked feature meets the condition.
        """
        return all(self.__meets_condition(values[f]) for f in self.features)

        

In [61]:
# FixedThresholdFilter, single feature: drop every instance whose feature_1 is below 1.85.
# NaN never satisfies "<", so instances with a missing feature_1 are kept.
ftf_filter1 = FixedThresholdFilter(
    dataFrame=global_data,
    features=["feature_1"],
    operator="<",
    threshold=1.85,
)
for i, (x, y) in enumerate(ftf_filter1):
    print(f'Sample {i}: {[v for v in x.values()], y}')

Sample 0: ([nan, 62.0, 1.0, 9.5, nan, 1.8], 1.0)
Sample 1: ([1.89, 90.4, 1.0, 12.0, 25.2, 2.12], 2.0)
Sample 2: ([2.01, 100.0, 1.0, 16.0, 24.75, 1.96], 1.0)
Sample 3: ([nan, nan, 1.0, 10.0, nan, nan], 3.0)
Sample 4: ([1.89, 85.0, 3.0, 12.0, 23.8, 1.89], 2.0)


In [62]:
# FixedThresholdFilter with AND: drop an instance only when BOTH feature_1 and feature_6 are >= 1.85
ftf_filter2 = FixedThresholdFilter(
    dataFrame=global_data,
    features=["feature_1", "feature_6"],
    operator=">=",
    threshold=1.85,
    logical_operator="AND",
)
for i, (x, y) in enumerate(ftf_filter2):
    print(f'Sample {i}: {[v for v in x.values()], y}')

Sample 0: ([1.56, 54.5, 1.0, 6.0, 22.19, 1.7], 1.0)
Sample 1: ([nan, 62.0, 1.0, 9.5, nan, 1.8], 1.0)
Sample 2: ([1.72, 75.0, 2.0, 11.0, 25.35, 1.8], 2.0)
Sample 3: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.73], 2.0)
Sample 4: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.67], 2.0)
Sample 5: ([nan, nan, 1.0, 10.0, nan, nan], 3.0)
Sample 6: ([1.78, 90.0, nan, 8.5, 28.41, 1.78], 3.0)


In [63]:
# FixedThresholdFilter with OR: drop an instance when EITHER feature_1 or feature_6 is >= 1.75
ftf_filter3 = FixedThresholdFilter(
    dataFrame=global_data,
    features=["feature_1", "feature_6"],
    operator=">=",
    threshold=1.75,
    logical_operator="OR",
)
for i, (x, y) in enumerate(ftf_filter3):
    print(f'Sample {i}: {[v for v in x.values()], y}')

Sample 0: ([1.56, 54.5, 1.0, 6.0, 22.19, 1.7], 1.0)
Sample 1: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.73], 2.0)
Sample 2: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.67], 2.0)
Sample 3: ([nan, nan, 1.0, 10.0, nan, nan], 3.0)


In [64]:
# FixedThresholdFilter with a NaN threshold: "==" NaN matches missing values, so with OR every
# instance that has at least one missing feature is dropped.
ftf_filter4 = FixedThresholdFilter(
    dataFrame=global_data,
    features=["feature_1", "feature_2", "feature_3", "feature_4", "feature_5", "feature_6"],
    operator="==",
    threshold=math.nan,
    logical_operator="OR",
)
for i, (x, y) in enumerate(ftf_filter4):
    print(f'Sample {i}: {[v for v in x.values()], y}')

Sample 0: ([1.56, 54.5, 1.0, 6.0, 22.19, 1.7], 1.0)
Sample 1: ([1.89, 90.4, 1.0, 12.0, 25.2, 2.12], 2.0)
Sample 2: ([2.01, 100.0, 1.0, 16.0, 24.75, 1.96], 1.0)
Sample 3: ([1.72, 75.0, 2.0, 11.0, 25.35, 1.8], 2.0)
Sample 4: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.73], 2.0)
Sample 5: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.67], 2.0)
Sample 6: ([1.89, 85.0, 3.0, 12.0, 23.8, 1.89], 2.0)


### Missing Values Filter

In [65]:
class MissingDataFilter(base.SyntheticDataset):
    """
    Filter that drops instances carrying too many missing (NaN) feature values.

    Parameters
    ----------
    dataFrame
                The stream source in the form of a pandas dataframe; the last column is the
                target and every other column is a feature.
    threshold
                Fraction of the feature columns that may be missing. An instance is dropped
                once its NaN count reaches ``ceil(threshold * number_of_feature_columns)``
                (an instance with no NaN values is never dropped).

    Notes
    -----
        NaN values in the instances that are kept are replaced with 0 in the yielded ``x`` dict.
    """

    def __init__(self, dataFrame: pd.DataFrame, threshold=0.5):
        self.df = dataFrame
        label_column = self.df[self.df.columns[-1]]
        super().__init__(
            n_features=self.df.shape[1],
            n_samples=self.df.shape[0],
            n_classes=len(label_column.unique()),
            n_outputs=1,
            task=base.MULTI_CLF,
        )
        self.threshold = threshold
        # n_features also counts the target column, so subtract one to get the feature count;
        # the result is the NaN count at which an instance gets dropped.
        self.total_missing_values = math.ceil(self.threshold * (self.n_features - 1))

    def __iter__(self):
        columns = self.df.columns
        stream = iter_pandas(X=self.df[columns[:-1]], y=self.df[columns[-1]])
        for features, label in stream:
            if self.check_for_missing_values(features):
                continue  # too many missing values: skip this instance
            yield features, label

    def check_for_missing_values(self, values):
        """
        Count the NaN feature values of an instance, replacing each NaN seen with 0 in place.

        Returns True (drop the instance) as soon as the count is non-zero and reaches
        ``total_missing_values``; any NaNs after that point are left untouched because the
        instance is discarded anyway. Returns False otherwise.
        """
        nan_count = 0
        for key, value in values.items():
            if math.isnan(value):
                nan_count += 1
                values[key] = 0  # impute with 0 so River can process the kept instance
            if nan_count and nan_count >= self.total_missing_values:
                return True
        return False
                    


In [66]:
# Re-display the shared dataset for reference before testing the missing-data filter
global_data

Unnamed: 0,feature_1,feature_2,feature_3,feature_4,feature_5,feature_6,class_value
0,1.56,54.5,1.0,6.0,22.19,1.7,1.0
1,,62.0,1.0,9.5,,1.8,1.0
2,1.89,90.4,1.0,12.0,25.2,2.12,2.0
3,2.01,100.0,1.0,16.0,24.75,1.96,1.0
4,1.72,75.0,2.0,11.0,25.35,1.8,2.0
5,1.7,65.0,3.0,11.0,22.49,1.73,2.0
6,1.7,65.0,3.0,11.0,22.49,1.67,2.0
7,,,1.0,10.0,,,3.0
8,1.78,90.0,,8.5,28.41,1.78,3.0
9,1.89,85.0,3.0,12.0,23.8,1.89,2.0


In [67]:
# MissingDataFilter, threshold 0.5: ceil(0.5 * 6 features) = 3, so instances with 3 or more
# missing values are dropped; remaining NaNs are replaced with 0.
mdf_filter1 = MissingDataFilter(
    dataFrame=global_data,
    threshold=.5,
)
for i, (x, y) in enumerate(mdf_filter1):
    print(f'Sample {i}: {[v for v in x.values()], y}')

Sample 0: ([1.56, 54.5, 1.0, 6.0, 22.19, 1.7], 1.0)
Sample 1: ([0, 62.0, 1.0, 9.5, 0, 1.8], 1.0)
Sample 2: ([1.89, 90.4, 1.0, 12.0, 25.2, 2.12], 2.0)
Sample 3: ([2.01, 100.0, 1.0, 16.0, 24.75, 1.96], 1.0)
Sample 4: ([1.72, 75.0, 2.0, 11.0, 25.35, 1.8], 2.0)
Sample 5: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.73], 2.0)
Sample 6: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.67], 2.0)
Sample 7: ([1.78, 90.0, 0, 8.5, 28.41, 1.78], 3.0)
Sample 8: ([1.89, 85.0, 3.0, 12.0, 23.8, 1.89], 2.0)


In [68]:
# MissingDataFilter, threshold 0.2: ceil(0.2 * 6 features) = 2, so instances with 2 or more
# missing values are dropped.
mdf_filter2 = MissingDataFilter(
    dataFrame=global_data,
    threshold=.2,
)
for i, (x, y) in enumerate(mdf_filter2):
    print(f'Sample {i}: {[v for v in x.values()], y}')

Sample 0: ([1.56, 54.5, 1.0, 6.0, 22.19, 1.7], 1.0)
Sample 1: ([1.89, 90.4, 1.0, 12.0, 25.2, 2.12], 2.0)
Sample 2: ([2.01, 100.0, 1.0, 16.0, 24.75, 1.96], 1.0)
Sample 3: ([1.72, 75.0, 2.0, 11.0, 25.35, 1.8], 2.0)
Sample 4: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.73], 2.0)
Sample 5: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.67], 2.0)
Sample 6: ([1.78, 90.0, 0, 8.5, 28.41, 1.78], 3.0)
Sample 7: ([1.89, 85.0, 3.0, 12.0, 23.8, 1.89], 2.0)


In [69]:
# MissingDataFilter, threshold 1: ceil(1 * 6 features) = 6, so only instances whose six features
# are ALL missing are dropped (none in this dataset); every NaN is replaced with 0.
mdf_filter3 = MissingDataFilter(dataFrame=global_data,threshold=1)
for i, (x, y) in enumerate(mdf_filter3):
    print(f'Sample {i}: {[v for v in x.values()], y}')

Sample 0: ([1.56, 54.5, 1.0, 6.0, 22.19, 1.7], 1.0)
Sample 1: ([0, 62.0, 1.0, 9.5, 0, 1.8], 1.0)
Sample 2: ([1.89, 90.4, 1.0, 12.0, 25.2, 2.12], 2.0)
Sample 3: ([2.01, 100.0, 1.0, 16.0, 24.75, 1.96], 1.0)
Sample 4: ([1.72, 75.0, 2.0, 11.0, 25.35, 1.8], 2.0)
Sample 5: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.73], 2.0)
Sample 6: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.67], 2.0)
Sample 7: ([0, 0, 1.0, 10.0, 0, 0], 3.0)
Sample 8: ([1.78, 90.0, 0, 8.5, 28.41, 1.78], 3.0)
Sample 9: ([1.89, 85.0, 3.0, 12.0, 23.8, 1.89], 2.0)


### Number of Standard Deviations from the Mean (NSFM) Filter

In [70]:
class NSFM(base.SyntheticDataset):
    """
    Filter that skips instances in which a checked feature value lies more than a user specified
    number of standard deviations from that feature's running mean.

    Parameters
    ----------
    dataFrame
                The stream source in the form of a pandas dataframe; the last column is the
                target and every other column is a feature.
    features
                A list of features that are to be checked from the current stream
    threshold
                How many standard deviations (absolute z-score) away from the running mean a
                feature value may be before the instance is dropped

    Notes
    -----
        - NaN values are replaced with the current running mean of that feature.
        - The first instance is always let through, since there are no statistics to compare
          it against yet.
        - Running statistics are updated with every instance, including dropped ones, and are
          reset at the start of each iteration so iterating the filter twice gives the same
          instances.
    """

    def __init__(self,
                 dataFrame:pd.DataFrame,
                 features: typing.Tuple[b.typing.FeatureName],
                 threshold,
                 ):
        self.df = dataFrame
        super().__init__(
            n_features=self.df.shape[1],
            n_samples=self.df.shape[0],
            n_classes=len(self.df[self.df.columns[-1]].unique()),
            n_outputs=1,
            task=base.MULTI_CLF,
        )
        self.all_features = self.df.columns[:-1]
        self.features = features
        self.threshold = threshold
        # One running mean/variance tracker per feature column
        self.stats_objects = self.__fresh_stats()

    def __fresh_stats(self):
        """Return a new, empty running-variance tracker for every feature column."""
        return {f: stats.Var() for f in self.all_features}

    def __iter__(self):
        feature_cols = self.df.columns[:-1]
        target_col = self.df.columns[-1]
        stream = iter_pandas(X=self.df[feature_cols], y=self.df[target_col])

        # Start each pass from empty statistics; otherwise a second iteration would be judged
        # against statistics left over from the first one.
        self.stats_objects = self.__fresh_stats()

        for position, (x, y) in enumerate(stream):
            # Impute missing values with the running feature mean before testing the instance
            self.__remove_nan_value(x)
            # The first instance is always let through
            is_outlier = position > 0 and self.__check_or(x)
            # Statistics are updated with every instance, including the dropped ones
            self.__update_stats(x)
            if not is_outlier:
                yield x, y

    def __calculate_z_score(self,x,f):
        """
        Return how many standard deviations ``x`` lies from the running mean of feature ``f``.

        Returns 0 while the feature has zero variance (e.g. all values seen so far are equal),
        which avoids dividing by zero and lets the value through.
        """
        tracker = self.stats_objects[f]
        # Var.get() is the public accessor for the running variance; the `sigma` attribute
        # previously read here is an implementation detail that may not exist in other River versions.
        std = math.sqrt(tracker.get())
        if std == 0.0:
            return 0
        return (x - tracker.mean.get()) / std

    def __update_stats(self,values):
        """
        Update the running mean/variance of every feature column with this instance's values.
        """
        for f in self.all_features:
            self.stats_objects[f].update(values[f])

    def __remove_nan_value(self,values):
        """
        Replace NaN values in an instance, in place, with the current running mean of that feature.
        """
        for f in self.all_features:
            if math.isnan(values[f]):
                values[f] = self.stats_objects[f].mean.get()

    def __check_or(self,values):
        """
        Return True (drop the instance) if at least one checked feature's absolute z-score
        exceeds the threshold.
        """
        return any(
            abs(self.__calculate_z_score(values[f], f)) > self.threshold
            for f in self.features
        )
            
            

In [71]:
# Re-display the shared dataset for reference before the NSFM tests (its NaN values are still present)
global_data

Unnamed: 0,feature_1,feature_2,feature_3,feature_4,feature_5,feature_6,class_value
0,1.56,54.5,1.0,6.0,22.19,1.7,1.0
1,,62.0,1.0,9.5,,1.8,1.0
2,1.89,90.4,1.0,12.0,25.2,2.12,2.0
3,2.01,100.0,1.0,16.0,24.75,1.96,1.0
4,1.72,75.0,2.0,11.0,25.35,1.8,2.0
5,1.7,65.0,3.0,11.0,22.49,1.73,2.0
6,1.7,65.0,3.0,11.0,22.49,1.67,2.0
7,,,1.0,10.0,,,3.0
8,1.78,90.0,,8.5,28.41,1.78,3.0
9,1.89,85.0,3.0,12.0,23.8,1.89,2.0


In [72]:
# NSFM: drop an instance when feature_1 or feature_6 is more than 1 standard deviation
# from that feature's running mean
nsfm_filter1 = NSFM(
    dataFrame=global_data,
    features=["feature_1", "feature_6"],
    threshold=1,
)
for i, (x, y) in enumerate(nsfm_filter1):
    print(f'Sample {i}: {[v for v in x.values()], y}')

Sample 0: ([1.56, 54.5, 1.0, 6.0, 22.19, 1.7], 1.0)
Sample 1: ([1.56, 62.0, 1.0, 9.5, 22.19, 1.8], 1.0)
Sample 2: ([1.72, 75.0, 2.0, 11.0, 25.35, 1.8], 2.0)
Sample 3: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.73], 2.0)
Sample 4: ([1.7342857142857142, 73.12857142857143, 1.0, 10.0, 23.522857142857145, 1.8257142857142858], 3.0)
Sample 5: ([1.78, 90.0, 1.625, 8.5, 28.41, 1.78], 3.0)


In [73]:
# NSFM on feature_1 only: drop an instance when feature_1 is more than 1 standard deviation
# from its running mean
nsfm_filter2 = NSFM(
    dataFrame=global_data,
    features=["feature_1"],
    threshold=1,
)
for i, (x, y) in enumerate(nsfm_filter2):
    print(f'Sample {i}: {[v for v in x.values()], y}')

Sample 0: ([1.56, 54.5, 1.0, 6.0, 22.19, 1.7], 1.0)
Sample 1: ([1.56, 62.0, 1.0, 9.5, 22.19, 1.8], 1.0)
Sample 2: ([1.89, 90.4, 1.0, 12.0, 25.2, 2.12], 2.0)
Sample 3: ([1.72, 75.0, 2.0, 11.0, 25.35, 1.8], 2.0)
Sample 4: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.73], 2.0)
Sample 5: ([1.7, 65.0, 3.0, 11.0, 22.49, 1.67], 2.0)
Sample 6: ([1.7342857142857142, 73.12857142857143, 1.0, 10.0, 23.522857142857145, 1.8257142857142858], 3.0)
Sample 7: ([1.78, 90.0, 1.625, 8.5, 28.41, 1.78], 3.0)
