In [1]:
import pandas as pd
import numpy as np
import os

In [2]:
def check_duplicates(filename='pop_test.csv',main_folder='C:/Users/shiri/OneDrive/Desktop/diagnostics'):
    '''it will check for duplicate rows and single out all 
    the indices in the csv file which are duplicated'''
    
    #read in the dataframe
    df=pd.read_csv(os.path.join(main_folder,filename))
    df.columns.str.lower()
    
    #get the dataframe where we have duplicates, keep=False will keep both duplicated rows
    df_dup=df[df.duplicated(keep=False)]
    #groupby all the columns to get groups of duplicated rows
    grouped_df=df_dup.groupby(list(df.columns))

    #getting tuples of indices for duplicated rows. adding 2 to the index to match the index in csv file
    return [tuple(v.index+2) for k,v in grouped_df]
        

In [3]:
check_duplicates()

[]

In [4]:
def check_constraints(filename='pop_test.csv',dictionary_folder='dictionary',
                      main_folder='C:/Users/shiri/OneDrive/Desktop/diagnostics'):
    
    '''it will go through all the categorical columns and check for constraints violation
    by comparing each column to a dictionary file in a specified folder
    if there is a violation it will print out the indices of the csv datafile, the column involved 
    and the values which are not found in the dictionary for that specific column'''
    
    #separating date columns from the rest of the columns, named here as categ_cols
    df=pd.read_csv(os.path.join(main_folder,filename))
    df.columns=df.columns.str.lower()

    date_cols=[]
    categ_cols=[]

    for c in df.columns:
        try:
            date_col=str(int(c))
            date_cols.append(date_col)
        except ValueError:
            categ_cols.append(c)
            
    for f in categ_cols:
        try:
            df_dictionary=pd.read_excel(os.path.join(main_folder,dictionary_folder,f+'_dictionary.xlsx'))
            print(f'checking constraints for column: {f}')
            
            #check the constraints
            allowed_vals=df_dictionary.iloc[:,0].unique()
            #get unique values for column f for the main datafile
            unique_vals=df[f].unique()
            #get the difference in sets to exclude values in the main dataset not found in the dictionary
            diff=list(set(df[f].unique()).difference(set(df_dictionary.iloc[:,0].unique())))
            #get the index of the main csv file where there is an outlier value
            if len(diff)>0:
                idx=list(df[df[f].isin(diff)].index+2)
                print(f'WARNING !!! outliers in column: {f} and indices {idx} in the csv file')
                print(f'WARNING !!! the outliers are: {diff}\n')
            else:
                print('no violation for constraints\n')
                
        except:
            print(f'could not  find {f}')

In [None]:
check_constraints()

In [33]:
def detect_outliers(filename='pop_test.csv',main_folder='C:/Users/shiri/OneDrive/Desktop/diagnostics',
        mad_multiplier=3, iqr_multiplier=3):
    
    '''this function will 
    1-extract the dataframe with dates as columns
    2-loop over rows and separate rows with less than 5 datapoints from the rest
    3-apply iqr and MAD on the rows with >=5 datapoints and incase of both detect anomaly, 
    save the row index alongside the column with the anomaly'''
    
    #separating date columns from the rest of the columns, named here as categ_cols
    df=pd.read_csv(os.path.join(main_folder,filename))
    df.columns=df.columns.str.lower()

    date_cols=[]
    categ_cols=[]

    for c in df.columns:
        try:
            #have to convert back to strings because in the original dataframe they are in strings
            date_col=str(int(c))
            date_cols.append(date_col)
        except ValueError:
            categ_cols.append(c)
            
    df_values=df[date_cols]
    
    #loop over rows to separate rows with <5 datapoints from the rest
    idx_gte5=[] #indices list for greater or equal to 5 datapoints
    idx_lt5=[]  #indices list for less or equal to 5 datapoints
    outlier_idx=[]  #outliers index
    outlier_col_idx=[]
    
    for idx,row in df_values.iterrows():
        #use item() to extract the integer from numpy.int64
        if row.count().item()>=5:
            
            # Drop NaNs from the row
            cleaned_row = row.dropna()
            idx_gte5.append(idx)
            #apply IQR on the row
            Q1=cleaned_row.quantile(0.25)
            Q3=cleaned_row.quantile(0.75)
            IQR=Q3-Q1

            lower_bound = Q1 - iqr_multiplier * IQR
            upper_bound = Q3 + iqr_multiplier * IQR
            
            #if there was at least one datapoint outside the range print the index of the row and 
            #the column index of the outlier
            bool_iqr=(cleaned_row<lower_bound) | (cleaned_row>upper_bound)
            
            #apply MAD
            median = cleaned_row.median()
            absolute_deviation = np.abs(cleaned_row - median)
            mad = absolute_deviation.mean()
            
            median = row.median()
            threshold = mad * mad_multiplier
            absolute_deviation = np.abs(cleaned_row - median)
            # Identify outliers
            bool_mad = absolute_deviation > threshold
            
            #if both vote true then it then the row considered to have an outlier
            bool=bool_iqr*bool_mad
            
            if any(bool):
                outlier_idx.append(idx)
                
                #get the column indices for the outliers
                col_idx=list(cleaned_row[bool].index)
                outlier_col_idx.append(col_idx)
            
        else:
            idx_lt5.append(idx)
    
    #write the dataframe with outliers to an excel file
    df_outlier=df.iloc[outlier_idx].copy()
    #add the column indices as a column to the dataframe
    df_outlier['column_indices']=outlier_col_idx
    df_outlier.to_excel(main_folder+'/outliers.xlsx', index=False, sheet_name='Sheet1')
    
    #write the dataframe with less than 5 datapoints to an excel file
    df_lt5=df.iloc[idx_lt5].copy()
    df_lt5.to_excel(main_folder+'/df_lt5.xlsx', index=False, sheet_name='Sheet1')
    #print out the indicator names
    unique_inds=df_lt5['indicator_name'].unique()
    print('the indicators below have less than 5 datapoints:\n')
    print(unique_inds)
    print('------------------------------------------------')
    
    df_gte5=df.iloc[idx_gte5]
    df_values_gte5=df_values.iloc[idx_gte5]
    
    #apply IQR for each row
    
    

In [34]:
detect_outliers()

the indicators below have less than 5 datapoints:

['Registered deaths (number)' 'Population size (number)'
 'Causes of death (percent)']
------------------------------------------------


In [24]:
import pandas as pd
import numpy as np

# Sample DataFrame with NaN values
data = {
    'A': [10, np.nan, 3, 4,8,7,4,5,6,60],
    'B': [10, np.nan, 3, 4,8,7,4,5,6,60],
    'C': [10, np.nan, 3, 4,8,7,4,5,6,60]
}

df = pd.DataFrame(data).T

df1=df.iloc[0,:]

cleaned_row = df1.dropna()
if len(cleaned_row) == 0:
    np.nan
median = cleaned_row.median()
absolute_deviation = np.abs(cleaned_row - median)
mad = absolute_deviation.mean()
print(mad)

median = df1.median()
threshold = mad * 3
# Drop NaNs and find absolute deviations
cleaned_row = df1.dropna()
absolute_deviation = np.abs(cleaned_row - median)
# Identify outliers
bool_mad = absolute_deviation > threshold
bool_mad


7.666666666666667


0    False
2    False
3    False
4    False
5    False
6    False
7    False
8    False
9     True
Name: A, dtype: bool

In [26]:
cleaned_row = df1.dropna()

#apply IQR on the row
Q1=cleaned_row.quantile(0.25)
Q3=cleaned_row.quantile(0.75)
IQR=Q3-Q1

lower_bound = Q1 - 3 * IQR
upper_bound = Q3 + 3 * IQR

#if there was at least one datapoint outside the range print the index of the row and 
#the column index of the outlier
bool_iqr=(cleaned_row<lower_bound) | (cleaned_row>upper_bound)
bool_iqr

0    False
2    False
3    False
4    False
5    False
6    False
7    False
8    False
9     True
Name: A, dtype: bool