In [2]:
import numpy as np
import pandas as pd
import random
from typing import List, Dict

In [3]:
# Execute Algorithms.ipynb inside this kernel so that the names it defines become available here.
# NOTE(review): run_inject calls `preprocessing`, which is not defined in this notebook —
# presumably it is provided by Algorithms.ipynb; confirm.
%run Algorithms.ipynb

In [6]:
# Maps a dataset label to the path of its CSV file.
# run_inject iterates over this dict and reads each path; the label itself is not used there.
datasets = {
    'education_2_df': 'Datasets/predict-dropout-or-academic-success.csv'
}

In [5]:
def inject_z_score(df: pd.DataFrame, col_name: str, cell_value: float, zscores: List[float] = [-3, -2, 2, 3]) -> float:
    """Draw a replacement value from the 2-3 standard-deviation tail nearest to ``cell_value``.

    The column's mean and (sample) standard deviation are turned into the
    values located at each requested Z-score (``z * std + mean``). If
    ``cell_value`` is at least as close to the +2 boundary as to the -2
    boundary, the result is drawn uniformly between the +2 and +3 values;
    otherwise it is drawn uniformly between the -3 and -2 values.

    Args:
        df: Frame holding the column used to compute mean and std.
        col_name: Name of the column in ``df``.
        cell_value: Current value of the cell being replaced.
        zscores: Z-scores to evaluate. Must contain -3, -2, 2 and 3; the list
            is only read, never modified.

    Returns:
        A random value inside the selected tail band.

    Raises:
        ValueError: If ``zscores`` lacks one of -3, -2, 2 or 3.
    """
    column = df[col_name]
    mean = column.mean()
    std = column.std()

    # Column value sitting at each requested Z-score.
    values_for_z = {z: z * std + mean for z in zscores}

    missing = [z for z in (-3, -2, 2, 3) if z not in values_for_z]
    if missing:
        raise ValueError(f"zscores must contain -3, -2, 2 and 3; missing {missing}")

    diff_zneg = abs(cell_value - values_for_z[-2])
    diff_z = abs(cell_value - values_for_z[2])

    # A plain else (rather than a second comparison) guarantees a band is
    # always chosen: with a NaN distance both comparisons are False, which
    # previously left the band undefined. Such inputs now use the lower tail,
    # matching inject_mz_score.
    if diff_zneg >= diff_z:
        low, high = values_for_z[2], values_for_z[3]
    else:
        low, high = values_for_z[-3], values_for_z[-2]

    return random.uniform(low, high)


In [19]:
def inject_mz_score(df: pd.DataFrame, col_name: str, cell_value: float, mzscores: List[float] = [-3, -2, 2, 3]) -> float:
    """Draw a replacement value from the 2-3 modified-Z-score tail nearest to ``cell_value``.

    The column's median and median absolute deviation (MAD) are turned into
    the values located at each requested modified Z-score
    (``mz * (mad / 0.6745) + median``). If ``cell_value`` is at least as close
    to the +2 boundary as to the -2 boundary, the result is drawn uniformly
    between the +2 and +3 values; otherwise between the -3 and -2 values.

    Args:
        df: Frame holding the column used to compute median and MAD.
        col_name: Name of the column in ``df``.
        cell_value: Current value of the cell being replaced.
        mzscores: Modified Z-scores to evaluate. Must contain -3, -2, 2 and 3;
            the list is only read, never modified.

    Returns:
        A random value inside the selected tail band.

    Raises:
        ValueError: If ``mzscores`` lacks one of -3, -2, 2 or 3.
    """
    column = df[col_name]
    median = column.median()

    # Use the pandas median for the MAD as well: it skips NaN just like the
    # median above, whereas np.median returns NaN as soon as the column holds
    # a single missing value (which made every injected value NaN).
    mad = (column - median).abs().median()

    # Avoid a zero-width scale when more than half of the values equal the median.
    if mad == 0:
        mad = 1e-9

    scale = mad / 0.6745
    values_for_mz = {mz: mz * scale + median for mz in mzscores}

    missing = [mz for mz in (-3, -2, 2, 3) if mz not in values_for_mz]
    if missing:
        raise ValueError(f"mzscores must contain -3, -2, 2 and 3; missing {missing}")

    diff_mzneg = abs(cell_value - values_for_mz[-2])
    diff_mz = abs(cell_value - values_for_mz[2])

    # Pick the band on the side the cell already leans towards (ties and
    # anything closer to +2 go to the upper tail, everything else to the lower).
    if diff_mzneg >= diff_mz:
        low, high = values_for_mz[2], values_for_mz[3]
    else:
        low, high = values_for_mz[-3], values_for_mz[-2]

    return random.uniform(low, high)



In [7]:
def inject_iqr(df: pd.DataFrame, col_name: str, cell_value: float) -> float:
    """Draw a replacement value just outside the IQR fence nearest to ``cell_value``.

    The fences are ``Q1 - 1.5 * IQR`` and ``Q3 + 1.5 * IQR``. The result is
    drawn uniformly from a band that starts at the chosen fence and extends
    outwards by 40% of the average whisker length. If ``cell_value`` is at
    least as close to the upper fence as to the lower one, the band above the
    upper fence is used; otherwise the band below the lower fence.

    Args:
        df: Frame holding the column used to compute the quartiles.
        col_name: Name of the column in ``df``.
        cell_value: Current value of the cell being replaced.

    Returns:
        A random value inside the selected band.
    """
    column = df[col_name]
    q1 = column.quantile(0.25)
    q3 = column.quantile(0.75)
    iqr = q3 - q1

    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    # Sum of both whisker lengths; halved below to get the average whisker.
    whisker_total = (q1 - lower_bound) + (upper_bound - q3)
    band_width = (whisker_total / 2) * 0.4

    diff_upper = abs(cell_value - upper_bound)
    diff_lower = abs(cell_value - lower_bound)

    # A plain else guarantees a band is always chosen: with a NaN distance both
    # comparisons are False, which previously left the band undefined.
    if diff_lower >= diff_upper:
        low, high = upper_bound, upper_bound + band_width
    else:
        low, high = lower_bound - band_width, lower_bound

    return random.uniform(low, high)


In [8]:
def inject_random(df: pd.DataFrame, col_name: str, cell_value: float, zscores: List[int] = [-2, 0, 2]) -> float:
    """Draw a replacement value from the half of the +/-2 std range opposite to ``cell_value``.

    The column's mean and (sample) standard deviation are turned into the
    values located at each requested Z-score (``z * std + mean``). If
    ``cell_value`` is at least as close to the -2 boundary as to the +2
    boundary, the result is drawn uniformly between the 0 and +2 values;
    otherwise between the -2 and 0 values.

    Args:
        df: Frame holding the column used to compute mean and std.
        col_name: Name of the column in ``df``.
        cell_value: Current value of the cell being replaced.
        zscores: Z-scores to evaluate. Must contain -2, 0 and 2; the list is
            only read, never modified.

    Returns:
        A random value inside the selected band.

    Raises:
        ValueError: If ``zscores`` lacks one of -2, 0 or 2.
    """
    column = df[col_name]
    mean = column.mean()
    std = column.std()

    # Column value sitting at each requested Z-score.
    values_for_z = {z: z * std + mean for z in zscores}

    missing = [z for z in (-2, 0, 2) if z not in values_for_z]
    if missing:
        raise ValueError(f"zscores must contain -2, 0 and 2; missing {missing}")

    diff_zneg = abs(cell_value - values_for_z[-2])
    diff_z = abs(cell_value - values_for_z[2])

    # A plain else guarantees a band is always chosen: with a NaN distance both
    # comparisons are False, which previously left the band undefined.
    if diff_zneg <= diff_z:
        low, high = values_for_z[0], values_for_z[2]
    else:
        low, high = values_for_z[-2], values_for_z[0]

    return random.uniform(low, high)


In [9]:
def type_point(df: pd.DataFrame, column: str, cell: float) -> float:
    """Replace one cell value using a randomly chosen point-anomaly injector.

    One of inject_z_score, inject_mz_score or inject_iqr is selected with
    equal probability and called with its default thresholds.

    Args:
        df: Frame passed through to the injector.
        column: Name of the column the cell belongs to.
        cell: Current value of the cell.

    Returns:
        The value produced by the selected injector.
    """
    # Order matters: index 0 -> z-score, 1 -> modified z-score, 2 -> IQR.
    injectors = (inject_z_score, inject_mz_score, inject_iqr)
    selected = injectors[random.randint(0, 2)]
    return selected(df, column, cell)

In [10]:
def type_contextual(df: pd.DataFrame, column: str, cell: float) -> float:
    """Replace one cell value using inject_random with its default Z-scores.

    Args:
        df: Frame passed through to inject_random.
        column: Name of the column the cell belongs to.
        cell: Current value of the cell.

    Returns:
        The value produced by inject_random.
    """
    replacement = inject_random(df, col_name=column, cell_value=cell)
    return replacement

In [11]:
def _count_decimal_places(value) -> int:
    """Return how many digits follow the decimal point in ``str(value)``.

    Scientific notation is taken into account: ``1e-05`` counts as 5 places
    and ``1.5e-07`` as 8. Text without a decimal point or exponent (integers,
    ``nan``, ``inf``) counts as 0.
    """
    mantissa, _, exponent = str(value).lower().partition('e')
    places = len(mantissa.split('.')[-1]) if '.' in mantissa else 0
    if exponent:
        try:
            # A negative exponent shifts digits behind the decimal point.
            places -= int(exponent)
        except ValueError:
            # Not a numeric exponent; keep the count taken from the mantissa.
            pass
    return max(places, 0)


def round_to_significant_all_columns(df: pd.DataFrame) -> Dict[str, int]:
    """Report, for every numeric column, the largest number of decimal places in use.

    The frame is only read; nothing is rounded or modified here. The result is
    meant to be used by the caller to round values back to each column's
    existing precision.

    Args:
        df: Frame to inspect.

    Returns:
        Dict mapping each numeric column name to the maximum number of decimal
        places found among its values (0 for an empty column), as plain ints.
    """
    rounding_info = {}
    for column in df.select_dtypes(include=[np.number]).columns:
        rounding_info[column] = max(
            (_count_decimal_places(value) for value in df[column]),
            default=0,
        )
    return rounding_info

In [59]:
def inject_anomalies(df: pd.DataFrame, total_rows: float, point_contam: float, contextual_contam: float, output_file: str) -> List[tuple]:
    """Inject point and contextual anomalies into ``df`` and write the result to CSV.

    Only numeric columns with more than 50 distinct values are eligible. A
    random subset of rows is selected, and within those rows a random subset
    of eligible cells is overwritten: first by type_point, then (disjoint
    cells) by type_contextual. Afterwards every numeric column is rounded back
    to the number of decimal places it had on entry, and the frame is written
    to ``output_file`` without its index.

    ``df`` is modified in place (eligible columns are cast to float and the
    selected cells are overwritten).

    Args:
        df: Frame to contaminate.
        total_rows: Fraction of rows (0-1) that may receive anomalies.
        point_contam: Fraction of the eligible cells in the selected rows to
            overwrite with point anomalies.
        contextual_contam: Fraction of the eligible cells in the selected rows
            to overwrite with contextual anomalies.
        output_file: Path of the CSV file to write.

    Returns:
        List of ``(row_position, column_name)`` tuples for every overwritten
        cell, point anomalies first. Row positions are 0-based and therefore
        match the rows of the written CSV.

    Raises:
        ValueError: If more anomalies are requested than there are eligible
            cells, or if ``total_rows`` selects an invalid number of rows.
    """
    rounding_info = round_to_significant_all_columns(df)

    # Restrict to numeric columns so the float cast below cannot fail on text.
    used_cols = [
        col for col in df.columns
        if pd.api.types.is_numeric_dtype(df[col]) and df[col].nunique() > 50
    ]
    for col in used_cols:
        df[col] = df[col].astype(float)

    n_rows = df.shape[0]
    selected_rows = sorted(random.sample(range(n_rows), round(n_rows * total_rows)))

    # Every cell that may be overwritten, in a deterministic order so that a
    # seeded `random` yields reproducible results.
    candidate_cells = [(row, col) for row in selected_rows for col in used_cols]
    total_cells = len(candidate_cells)

    n_point = max(round(total_cells * point_contam), 0)
    n_contextual = max(round(total_cells * contextual_contam), 0)
    if n_point + n_contextual > total_cells:
        raise ValueError(
            f"Requested {n_point + n_contextual} anomalies but only {total_cells} eligible cells are available"
        )

    # One draw without replacement keeps the two anomaly kinds disjoint and
    # always terminates, unlike retrying random picks until enough are unique.
    chosen = random.sample(candidate_cells, n_point + n_contextual)
    point_cells = chosen[:n_point]
    contextual_cells = chosen[n_point:]

    # Thresholds are derived from a snapshot taken before any injection, so
    # earlier anomalies do not shift the statistics used for later ones.
    reference = df[used_cols].copy()
    col_position = {col: df.columns.get_loc(col) for col in used_cols}

    # Rows were sampled as positions, so cells are addressed positionally;
    # this stays correct when the frame's index is not 0..n-1.
    for row, col in point_cells:
        pos = col_position[col]
        df.iat[row, pos] = type_point(reference, col, df.iat[row, pos])

    for row, col in contextual_cells:
        pos = col_position[col]
        df.iat[row, pos] = type_contextual(reference, col, df.iat[row, pos])

    # Restore each numeric column's original precision.
    for col, decimal_places in rounding_info.items():
        df[col] = df[col].round(decimal_places)

    df.to_csv(output_file, index=False)
    return point_cells + contextual_cells


In [58]:
def run_inject(rows: float, point: float, contextual: float) -> List[list]:
    """Run inject_anomalies on every dataset listed in ``datasets``.

    Each CSV is read, passed through ``preprocessing`` and handed to
    inject_anomalies, which writes the contaminated copy next to the input as
    ``<name>_anomaly.csv``.

    Args:
        rows: Forwarded to inject_anomalies as ``total_rows``.
        point: Forwarded to inject_anomalies as ``point_contam``.
        contextual: Forwarded to inject_anomalies as ``contextual_contam``.

    Returns:
        One entry per dataset, each being the list returned by
        inject_anomalies for that dataset.
    """
    suffix = '.csv'
    results = []
    for file_path in datasets.values():
        frame = pd.read_csv(file_path)
        frame = preprocessing(frame, file_path)

        # Only strip a trailing ".csv". This guarantees the output path always
        # differs from the input path, so the source file is never overwritten.
        stem = file_path[:-len(suffix)] if file_path.endswith(suffix) else file_path
        output_file = f"{stem}_anomaly{suffix}"

        anomalies = inject_anomalies(
            frame,
            total_rows=rows,
            point_contam=point,
            contextual_contam=contextual,
            output_file=output_file,
        )
        results.append(anomalies)
    return results