In [1]:
import pandas as pd
import numpy as np

In [2]:
class OutlierHandler:
    '''
    Handle outliers column by column, either clipping them to the bounds or
    replacing them with NaN.

    Strategies are given as a dict ``{column_name: strategy}`` where each
    strategy is a list whose first element names the method:

    - ``['quantile', [lower_quantile, upper_quantile], fill_value]``
        The bounds are the values of the column at the two quantiles (floats).

    - ``['values', [lower_bound, upper_bound], fill_value]``
        The bounds are used exactly as given.

    - ``['iqr', fill_value]``
        lower_bound = q1 - 1.5 * IQR and upper_bound = q3 + 1.5 * IQR.

    ``fill_value`` must be one of:

    - ``'clip'``: values < lower_bound become lower_bound and
      values > upper_bound become upper_bound.
    - ``'na'``: values < lower_bound or > upper_bound become NaN.

    Inputs:
    strategies: dict
        Dict specifying the outlier-handling strategy of each column.

    Attributes (populated by ``fit``):
    outlier_bounds: dict
        ``{column_name: [lower_bound, upper_bound]}``.
    fill_values: dict
        ``{column_name: 'clip' | 'na'}``.
    '''

    # Accepted spellings, checked in fit() so that a typo fails loudly instead
    # of silently selecting the IQR method or the clip behaviour.
    _METHODS = ('quantile', 'values', 'iqr')
    _FILL_VALUES = ('clip', 'na')

    def __init__(self, strategies):
        self.strategies = strategies
        self.outlier_bounds = {}
        self.fill_values = {}

    @staticmethod
    def __find_iqr_bounds(x):
        '''
        Compute lower and upper bounds using the IQR method
        (q1 - 1.5 * IQR, q3 + 1.5 * IQR).
        '''
        if not isinstance(x, pd.Series):
            x = pd.Series(x)
        q1 = x.quantile(0.25)
        q3 = x.quantile(0.75)
        iqr = q3 - q1
        return [q1 - (1.5 * iqr), q3 + (1.5 * iqr)]

    @staticmethod
    def __find_quantile_bounds(x, bounds):
        '''
        Compute lower and upper bounds using the specified lower and upper
        quantiles (``bounds[0]`` and ``bounds[1]``).
        '''
        if not isinstance(x, pd.Series):
            x = pd.Series(x)
        return [x.quantile(bounds[0]), x.quantile(bounds[1])]

    @staticmethod
    def _replace_where(values, condition, replacement):
        '''
        Return ``values`` with ``replacement`` wherever ``condition`` is True.
        '''
        # A comparison against a missing entry may itself be missing (nullable
        # dtypes); treat that as "not an outlier", like boolean .loc assignment.
        condition = condition.fillna(False).astype(bool)
        # Series.mask upcasts when needed (e.g. NaN or a float bound going into
        # an integer column). Assigning through .loc is an incompatible-dtype
        # setitem, which recent pandas versions warn about.
        return values.mask(condition, replacement)

    def fit(self, x):
        '''
        Compute and store the bounds and fill value of every configured column.

        Inputs:
        x: DataFrame (anything indexable by column name)
            Data the quantile / IQR bounds are computed from.

        Returns:
        self

        Raises:
        ValueError
            If a strategy names an unknown method or an unknown fill value.
        '''
        # Build into fresh dicts so a refit never keeps stale columns and a
        # failed fit leaves the previously fitted state untouched.
        outlier_bounds = {}
        fill_values = {}

        for col, strategy in self.strategies.items():
            method = strategy[0]
            if not (isinstance(method, str) and method in self._METHODS):
                raise ValueError(
                    f'Unknown strategy {method!r} for column {col!r}; '
                    f'expected one of {self._METHODS}'
                )

            if method == 'iqr':
                bounds = self.__find_iqr_bounds(x[col])
                fill_value = strategy[1]
            elif method == 'quantile':
                bounds = self.__find_quantile_bounds(x[col], strategy[1])
                fill_value = strategy[2]
            else:  # 'values'
                # Copy so later edits to the caller's list cannot change the
                # fitted bounds.
                bounds = list(strategy[1])
                fill_value = strategy[2]

            if not (isinstance(fill_value, str) and fill_value in self._FILL_VALUES):
                raise ValueError(
                    f'Unknown fill_value {fill_value!r} for column {col!r}; '
                    f'expected one of {self._FILL_VALUES}'
                )

            outlier_bounds[col] = bounds
            fill_values[col] = fill_value

        self.outlier_bounds = outlier_bounds
        self.fill_values = fill_values
        return self

    def transform(self, x):
        '''
        Return a copy of ``x`` in which the outliers of every configured column
        are clipped to the fitted bounds or replaced with NaN.

        Inputs:
        x: DataFrame
            Data to transform; it is copied, not modified.

        Returns:
        DataFrame

        Raises:
        KeyError
            If a configured column has no fitted bounds (``fit`` not called).
        '''
        x = x.copy()
        for col in self.strategies:
            bounds = self.outlier_bounds[col]
            lower_bound = bounds[0]
            upper_bound = bounds[1]

            if self.fill_values[col] == 'na':
                low_fill = high_fill = np.nan
            else:
                low_fill, high_fill = lower_bound, upper_bound

            values = x[col]
            # The lower side is handled first and the upper comparison runs on
            # the already-updated values.
            values = self._replace_where(values, values < lower_bound, low_fill)
            values = self._replace_where(values, values > upper_bound, high_fill)
            x[col] = values

        return x

    def __repr__(self):
        return f'OutlierHandler(strategies={self.strategies})'

### Example

In [3]:
# Toy frame with three integer columns, one per strategy demonstrated below.
df = pd.DataFrame({'a':[1,2,100,1000], 'b':[-100,45,200,2000], 'c':[540,10,59,2]})
df

Unnamed: 0,a,b,c
0,1,-100,540
1,2,45,10
2,100,200,59
3,1000,2000,2


In [4]:
# 'a': NaN outside the 0.2 / 0.8 quantiles; 'b': clip to the fixed range [0, 200];
# 'c': clip to the IQR bounds (q1 - 1.5*IQR, q3 + 1.5*IQR).
oh = OutlierHandler({'a': ['quantile', [0.2, 0.8], 'na'], 'b':['values', [0, 200], 'clip'], 'c':['iqr', 'clip']})

In [5]:
# Computes and stores the per-column bounds; fit returns the handler itself,
# which is why its repr is displayed.
oh.fit(df)

OutlierHandler(strategies={'a': ['quantile', [0.2, 0.8], 'na'], 'b': ['values', [0, 200], 'clip'], 'c': ['iqr', 'clip']})

In [6]:
# transform works on a copy of its input, so df itself is not modified.
res = oh.transform(df)
res

Unnamed: 0,a,b,c
0,,0,436.125
1,2.0,45,10.0
2,100.0,200,59.0
3,,200,2.0


In [7]:
oh.outlier_bounds # [lower_bound, upper_bound] stored for each column by fit

{'a': [1.6, 460.00000000000034], 'b': [0, 200], 'c': [-248.875, 436.125]}

In [8]:
oh.fill_values # fill action ('clip' or 'na') stored for each column by fit

{'a': 'na', 'b': 'clip', 'c': 'clip'}