#### Outlier Handling

In [0]:
# imports
# hides all warnings
import warnings
warnings.filterwarnings('ignore')
# import pandas for spark
import pyspark.pandas as ps
# pandas 
import pandas as pd
# numpy
import numpy as np

In [0]:
# outlier handling

# outlier limits
"""
returns: 
    upper boud & lower bound for array values or df[col] 
usage: 
    OutlierLimits(df[col]): 
"""
def colOutlierLimits(colValues, pMul=3):
    """Return the IQR-based lower and upper outlier limits for a column.

    The limits are ``q1 - pMul * iqr`` and ``q3 + pMul * iqr``, where q1 and
    q3 are the 25th and 75th percentiles of ``colValues`` and
    ``iqr = q3 - q1``.

    Args:
        colValues: numeric array-like holding the column values.
        pMul: IQR multiplier. Only 1.5, 2, 2.5 and 3 are accepted; any other
            value silently falls back to 3.

    Returns:
        Tuple ``(ll, ul)`` with the lower and upper limit.
    """
    if pMul not in (1.5, 2, 2.5, 3):
        pMul = 3
    pMul = float(pMul)
    # nanpercentile skips missing values; plain percentile would return NaN
    # limits as soon as one NaN is present, and NaN limits never flag
    # anything as an outlier.
    q1, q3 = np.nanpercentile(colValues, [25, 75])
    iqr = q3 - q1
    ll = q1 - (iqr * pMul)
    ul = q3 + (iqr * pMul)
    return ll, ul


# outlier count for column
"""
returns: 
    count of outliers in the colName
usage: 
    colOutCount(colValues)
"""
def colOutlierCount(colValues, pMul=3):
    """Return the number of values in ``colValues`` outside the outlier limits.

    A value is an outlier when it is strictly below the lower limit or
    strictly above the upper limit returned by
    ``colOutlierLimits(colValues, pMul)``.

    Args:
        colValues: numeric array-like holding the column values.
        pMul: IQR multiplier, passed through to colOutlierLimits (which
            validates it).

    Returns:
        The outlier count as an int.
    """
    # asarray lets plain lists/tuples be compared element-wise as well
    colValues = np.asarray(colValues)
    ll, ul = colOutlierLimits(colValues, pMul)
    isOutlier = (colValues > ul) | (colValues < ll)
    # Count the mask directly: the size of np.where()'s index tuple would be
    # multiplied by the number of dimensions for non-1-D input.
    return int(np.count_nonzero(isOutlier))

# outlier count for dataframe
"""
returns: 
    count of outliers in each column of dataframe
usage: 
    OutlierCount(df): 
"""
def OutlierCount(df, pMul=3):
    """Return the outlier count of every non-object column of ``df``.

    Columns whose dtype is 'object' are skipped; every other column is
    passed to colOutlierCount as ``df[col].values``.

    Args:
        df: dataframe to inspect.
        pMul: IQR multiplier, passed through to colOutlierCount.

    Returns:
        pandas Series of int64 counts indexed by column name (empty when no
        column qualifies).
    """
    dCounts = {}
    for colName in df.columns:
        if df[colName].dtypes == 'object':
            continue
        dCounts[colName] = colOutlierCount(df[colName].values, pMul)
    # Explicit dtype: an empty pd.Series() without one has a version-dependent
    # default dtype, which would turn the counts into floats or objects.
    return pd.Series(dCounts, dtype='int64')

# outlier index for column
"""
returns: 
    row index of outliers in colValues
usage: 
    colOutlierIndex(colValues)
"""
def colOutlierIndex(colValues, pMul=3):
    """Return the positions of the outliers in ``colValues``.

    Outliers are the values strictly above the upper limit or strictly below
    the lower limit given by ``colOutlierLimits(colValues, pMul)``.

    Returns:
        ``np.array`` built from the index tuple of ``np.where``, i.e. one row
        of positions per dimension of ``colValues``.
    """
    lowerLimit, upperLimit = colOutlierLimits(colValues, pMul)
    tooHigh = colValues > upperLimit
    tooLow = colValues < lowerLimit
    return np.array(np.where(tooHigh | tooLow))


# outlier index for data frame
"""
returns: 
    row index of outliers in each column of dataframe
usage: 
    OutlierIndex(df)
"""
def OutlierIndex(df, pMul=3):
    """Return the outlier positions of every non-object column of ``df``.

    Columns whose dtype is 'object' are skipped. For each remaining column
    the array returned by colOutlierIndex is stored as its ``str()``
    representation.

    Args:
        df: dataframe to inspect.
        pMul: IQR multiplier, passed through to colOutlierIndex.

    Returns:
        pandas Series (object dtype) of strings indexed by column name.
    """
    dIndexes = {}
    for colName in df.columns:
        if df[colName].dtypes == 'object':
            continue
        dIndexes[colName] = str(colOutlierIndex(df[colName].values, pMul))
    # Explicit dtype instead of growing an empty pd.Series() whose default
    # dtype depends on the pandas version.
    return pd.Series(dIndexes, dtype=object)

# outlier values for column 
"""
returns: 
    actual outliers values in the colName
usage: 
    colOutValues(colValues)
"""
def colOutlierValues(colValues, pMul=3):
    """Return the outlier values found in ``colValues``.

    Outliers are the values strictly below the lower limit or strictly above
    the upper limit given by ``colOutlierLimits(colValues, pMul)``.

    Returns:
        A new array holding only the outlier values, selected by indexing
        ``colValues`` with the positions reported by ``np.where``.
    """
    lowerLimit, upperLimit = colOutlierLimits(colValues, pMul)
    outlierPos = np.where((colValues < lowerLimit) | (colValues > upperLimit))
    return np.array(colValues[outlierPos])


# outlier values for dataframe 
"""
returns: 
    actual of outliers in each column of dataframe
usage: 
    OutlierValues(df): 
"""
def OutlierValues(df, pMul=3):
    """Return the outlier values of every non-object column of ``df``.

    Columns whose dtype is 'object' are skipped. For each remaining column
    the array returned by colOutlierValues is stored as one element.

    Args:
        df: dataframe to inspect.
        pMul: IQR multiplier, passed through to colOutlierValues.

    Returns:
        pandas Series (object dtype) indexed by column name; each element is
        the array of outlier values of that column.
    """
    dValues = {}
    for colName in df.columns:
        if df[colName].dtypes == 'object':
            continue
        dValues[colName] = colOutlierValues(df[colName].values, pMul)
    # dtype=object keeps each per-column array as a single element and avoids
    # the version-dependent default dtype of an empty pd.Series().
    return pd.Series(dValues, dtype=object)

# column level handle outlier by capping
# at lower limit & upper limit respectively
"""
returns: 
    array values or df[col].values with outliers capped at the limits
usage: 
    colHandleOutliers(df[col].values)
"""
def colHandleOutliers(colValues, pMul=3):
    """Cap the values of a column at its outlier limits.

    Values below the lower limit are replaced by the lower limit and values
    above the upper limit by the upper limit; the limits come from
    ``colOutlierLimits(colValues, pMul)``. All other values are kept.

    Returns:
        The capped values as a new array.
    """
    lowerLimit, upperLimit = colOutlierLimits(colValues, pMul)
    cappedHigh = np.where(colValues > upperLimit, upperLimit, colValues)
    return np.where(colValues < lowerLimit, lowerLimit, cappedHigh)

# data frame level handling of outliers
"""
desc:
    HandleOutliers - caps outliers in all cols in df except exclCols 
usage: 
    HandleOutliers(df, lExclCols) 
params:
    df - dataframe, lExclCols - col(s) to ignore during transformation, pMul - multiplier  
"""
def HandleOutliers(df, lExclCols=None, pMul=3):
    """Cap the outliers of every eligible column of ``df`` in place.

    Every column that is not excluded and is not of dtype 'object' is
    checked with colOutlierCount; when it contains at least one outlier its
    values are replaced by the capped values from colHandleOutliers.

    Args:
        df: dataframe to transform; it is modified in place.
        lExclCols: column name or list of column names to leave untouched.
            A single non-list value is wrapped into a list.
        pMul: IQR multiplier, passed through to the column helpers.

    Returns:
        The same ``df`` object, after modification.

    Raises:
        ValueError: if an excluded column is not a column of ``df``.
    """
    # None instead of a mutable [] default, which is shared between calls
    if lExclCols is None:
        lExclCols = []
    elif not isinstance(lExclCols, list):
        lExclCols = [lExclCols]
    colNames = df.columns.to_list()
    for vExclCol in lExclCols:
        # list.remove raises ValueError for a name that is not a column
        colNames.remove(vExclCol)
    for colName in colNames:
        # object columns are skipped, as in the other dataframe-level helpers
        if df[colName].dtypes == 'object':
            continue
        colValues = df[colName].values
        # Compare the outlier *count*: testing the array of outlier values
        # with "> 0" raises for more than one outlier and checks the sign of
        # the values rather than their presence.
        if colOutlierCount(colValues, pMul) > 0:
            colValues = colHandleOutliers(colValues, pMul)
            # NOTE(review): assigns a plain Python list; confirm the
            # dataframe type in use accepts list assignment.
            df[colName] = colValues.tolist()
    return df



In [0]:
# read csv

# Source file and its format
file_location = "/FileStore/tables/test/outliers.csv"
file_type = "csv"

# Reader options for CSV input
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Load the file into a Spark dataframe. The options below are CSV options;
# for other file types they will be ignored.
# `spark` and `display` are not defined in this file - presumably provided
# by the notebook runtime.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(df)

In [0]:
# convert dataframe to pandas spark dataframe
# (wraps the Spark dataframe `df` in a pyspark.pandas DataFrame so the
# pandas-style outlier helpers can be applied to it)
psdf = ps.DataFrame(df)
# preview the first 5 rows
print(psdf.head(5))

In [0]:
# info
print("*** Info ***")
# info() writes the summary itself and returns None, so it is called
# directly; wrapping it in print() would emit a stray "None" line.
psdf.info()

In [0]:
# check outlier count
# per-column outlier counts with the default multiplier (pMul=3),
# taken before HandleOutliers is applied
print('\n*** Outlier Count ***')
print(OutlierCount(psdf))

In [0]:
# check outlier row index
# per-column outlier positions (stringified arrays), default multiplier
print('\n*** Outlier Index ***')
print(OutlierIndex(psdf))

In [0]:
# check outlier values
# per-column arrays of the actual outlier values, default multiplier
print('\n*** Outlier Values ***')
print(OutlierValues(psdf))

In [0]:
# handle outlier 
# HandleOutliers assigns the capped columns back into psdf and returns the
# same object, so the return value is not needed here; no columns are
# excluded and the default multiplier is used
print('\n*** Handle Outliers ***')
HandleOutliers(psdf)
print("Done ...")

In [0]:
# check outlier count
# same count as before, repeated after HandleOutliers has run on psdf
print('\n*** Outlier Count ***')
print(OutlierCount(psdf))

In [0]:
# display df
# show psdf as it stands after outlier handling; `display` is not defined in
# this file - presumably a notebook built-in
display(psdf)