# Basic Exploratory Data Analysis

In [None]:
# Import necessary libraries
import os
import numbers
from copy import deepcopy
from functools import partial
from typing import Callable, Union, Optional, Any

import pandas as pd
from tabulate import tabulate
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from scipy import stats

%pip install ipyfilechooser 
from ipyfilechooser import FileChooser
from ipywidgets import interact, widgets
from IPython.display import display

# Set plotting styles: apply seaborn's "whitegrid" style so the plots drawn
# by the cells below share one look.
sns.set(style="whitegrid")

## Graphing Functions

In [2]:
def display_basic_statistics(df: pd.DataFrame) -> None:
    """
    Print a textual overview of the dataset: first 5 rows, summary statistics,
    dataset info, and the number of missing values in each column.

    :param df: The dataset to display statistics for
    :return: None (everything is written to standard output)
    """
    print("First 5 rows of the dataset:")
    print(df.head())

    print("\nSummary statistics for numeric columns:")
    print(df.describe())

    print("\nDataset info:")
    # DataFrame.info() writes its report itself and returns None, so wrapping
    # it in print() would append a stray "None" line to the output.
    df.info()

    print("\nMissing values in each column:")
    print(df.isnull().sum())


def plot_distributions(df: pd.DataFrame):
    """
    Plot the distribution of every numeric and categorical column in the dataset.

    The histograms are laid out on a roughly square grid. Numeric columns get a
    KDE overlay; categorical columns are drawn as plain histograms.

    :param df: The dataset to plot distributions for
    :return: None
    """
    df = df.select_dtypes(include=[np.number, "category"])
    num_plots = len(df.columns)
    if num_plots == 0:
        # Nothing to draw; the grid computation below would divide by zero.
        print("No numeric or categorical columns to plot.")
        return

    num_rows = int(np.ceil(num_plots ** 0.5))
    num_cols = int(np.ceil(num_plots / num_rows))
    # squeeze=False keeps `axes` a 2-D array even for a 1x1 grid, where
    # plt.subplots would otherwise return a single Axes object.
    _, axes = plt.subplots(num_rows, num_cols, figsize=(10 * num_cols, 6 * num_rows), squeeze=False)
    flat_axes = axes.ravel()

    for ax, column in zip(flat_axes, df.columns):
        # Only numeric and categorical columns survive the select_dtypes call
        # above, so a categorical check is enough to pick the plot flavour.
        is_categorical = isinstance(df[column].dtype, pd.CategoricalDtype)
        sns.histplot(df[column], kde=not is_categorical, bins=30, ax=ax)
        ax.set_title(f"Distribution of {column}")

    # Hide the grid cells that have no column assigned to them.
    for ax in flat_axes[num_plots:]:
        ax.set_visible(False)

    plt.tight_layout()
    plt.show()


def plot_scatter_matrix(df: pd.DataFrame):
    """
    Draw a pandas scatter matrix over the numeric columns of the dataset,
    with kernel density estimates on the diagonal.

    :param df: The dataset whose numeric columns are plotted against each other
    :return:
    """
    numeric_column_names = df.select_dtypes(include=[np.number]).columns
    numeric_frame = df[numeric_column_names]

    pd.plotting.scatter_matrix(numeric_frame, diagonal="kde", figsize=(15, 10))
    plt.suptitle("Scatter Matrix for Numeric Features")
    plt.tight_layout()
    plt.show()


def plot_boxplots(df: pd.DataFrame):
    """
    Draw one figure containing a box plot for every numeric column.

    :param df: The dataset whose numeric columns are summarised as box plots
    :return:
    """
    numeric_only = df.select_dtypes(include=[np.number])

    plt.figure(figsize=(10, 6))
    sns.boxplot(data=numeric_only)
    plt.title("Boxplot")
    plt.tight_layout()
    plt.show()


def plot_count_plots(df):
    """
    Draw a horizontal count plot, one figure per column, for every column
    whose dtype is categorical or object. Bars are ordered by frequency.

    :param df: The dataset to plot count plots for
    :return:
    """
    def _is_categorical(series):
        return isinstance(series.dtype, pd.CategoricalDtype) or series.dtype == object

    for column in df.columns:
        series = df[column]
        if not _is_categorical(series):
            continue

        plt.figure(figsize=(10, 6))
        # value_counts() is sorted by frequency, which fixes the bar order.
        sns.countplot(y=series, order=series.value_counts().index)
        plt.title(f"Count plot of {column}")
        plt.tight_layout()
        plt.show()


def detect_outliers(df, threshold=3):
    """
    Detect outliers in every numeric column of the dataset using a Z-test.

    Missing values are left out of the mean/standard deviation and are never
    reported as outliers.

    :param df: The dataset to detect outliers in
    :param threshold: The absolute Z-score above which a value counts as an outlier
    :return: A dict mapping each numeric column name to an array with the row
             positions (0-based, relative to ``df``) of its outliers
    """
    outliers = {}
    for column in df.columns:
        series = df[column]
        if not pd.api.types.is_numeric_dtype(series):
            continue

        # Row positions of the non-missing values. The Z-scores below are
        # computed on the NaN-free values only, so their positions must be
        # mapped back to rows of the original frame.
        valid_positions = np.flatnonzero(series.notna().to_numpy())
        z_scores = np.abs(stats.zscore(series.dropna()))
        is_outlier = np.asarray(z_scores > threshold, dtype=bool)
        outliers[column] = valid_positions[is_outlier]
    return outliers

In [4]:
def exploratory_data_analysis(data):
    """
    Run the full exploratory analysis on a dataset: basic statistics,
    distributions, pair plot, correlation matrix and heatmap, scatter matrix,
    box plots, count plots, Z-score outliers and a short textual summary.

    :param data: The dataset (DataFrame) to analyse
    :return: None (results are printed and plotted)
    """
    # The numeric view is needed by several steps; compute it once.
    numeric_data = data.select_dtypes(include=[np.number])

    display_basic_statistics(data)
    plot_distributions(data)

    print("Pair plot for numeric features:")
    sns.pairplot(numeric_data)
    # The pair plot is a grid of subplots: plt.title would only label one of
    # them, so the title is attached to the figure instead.
    plt.suptitle("Pair Plot for Numeric Features")
    plt.tight_layout()
    plt.show()

    # Correlation matrix
    print("Correlation matrix:")
    corr_matrix = numeric_data.corr()
    print(corr_matrix)

    print("Plotting correlation heatmap:")
    plt.figure(figsize=(12, 8))
    sns.heatmap(corr_matrix, annot=True, cmap="coolwarm", linewidths=0.5)
    plt.title("Correlation Matrix Heatmap")
    plt.tight_layout()
    plt.show()

    print("Plotting scatter matrix for numeric features:")
    plot_scatter_matrix(data)

    print("Plotting boxplots for numeric features:")
    plot_boxplots(data)

    print("Plotting count plots for categorical features:")
    plot_count_plots(data)

    print("Detecting outliers using Z-score method:")
    outliers = detect_outliers(data)
    print(outliers)

    # Summary
    print("\nSummary of Exploratory Data Analysis:")
    print(f"Dataset contains {data.shape[0]} rows and {data.shape[1]} columns.")
    print(f"Numeric columns: {numeric_data.columns.tolist()}")
    print(f"Categorical columns: {data.select_dtypes(include=['object', 'category']).columns.tolist()}")

# Data Augmentation

## Data Loading

In [7]:
# The dataset shared by all the interactive cells; stays None until a file is loaded.
data_frame = None


def interactive_data_load(button, file_selection: Optional[str] = None):
    """
    Load the selected CSV file into the global ``data_frame`` and preview it.

    All messages and the preview are rendered inside the ``output`` widget.
    ``data_frame`` is only replaced when the file was read successfully.

    :param button: The button that triggered the load (unused)
    :param file_selection: Path of the CSV file to load, or None if nothing was chosen
    :return: None
    """
    global data_frame

    with output:
        output.clear_output()
        if file_selection is None:
            print("No file was chosen!")
            return

        try:
            loaded = pd.read_csv(file_selection)
        except FileNotFoundError:
            print(f"Cannot find the requested file:\n\t{file_selection}")
        except (pd.errors.EmptyDataError, pd.errors.ParserError, UnicodeDecodeError) as error:
            # Report unreadable files as a message instead of a raw traceback.
            print(f"Cannot read the requested file as CSV:\n\t{file_selection}\n\t{error}")
        else:
            data_frame = loaded
            display(data_frame.head(5))

# File picker restricted to CSV files under the current working directory.
file_chooser = FileChooser(
    title="<h2>Select file to load data</h2>",
    sandbox_path=os.getcwd(),
    filter_pattern="*.csv",
)
submit_button = widgets.Button(
    description="Submit",
    # Icon names are FontAwesome names, which are lowercase ("Check" matches no icon).
    icon="check",
)
# Area where interactive_data_load prints its messages and the data preview.
output = widgets.Output()
display_box = widgets.VBox([
    file_chooser,
    submit_button,
    output,
])
display(display_box)

# The selection is read at click time, so the lambda looks it up lazily.
submit_button.on_click(lambda button: interactive_data_load(button, file_chooser.selected))

VBox(children=(FileChooser(path='C:\Users\yanna\PycharmProjects\EDA', filename='', title='<h2>Select file to l…

In [26]:
# Column names of the loaded dataset; one editor tab is created per column.
children_names = list(data_frame.columns)

# Output area for this cell. NOTE(review): this rebinds the global `output`
# that interactive_data_load also writes to — confirm that is intended.
output = widgets.Output()

def _parse_missing_tokens(raw: str) -> list:
    """
    Split a comma separated list of missing-value markers.

    Blank entries are dropped. A marker that parses as a finite or infinite
    number is returned both as text and as a number, so that it matches
    string columns as well as numeric ones.
    """
    tokens = []
    for token in raw.split(","):
        token = token.strip()
        if not token:
            continue
        tokens.append(token)
        try:
            number = float(token)
        except ValueError:
            continue
        if not np.isnan(number):
            tokens.append(number)
    return tokens


@output.capture(clear_output=True)
def update_information(column: str):
    """
    Apply the settings of the currently selected tab to its column: rename it,
    mark the listed values as missing, and convert it to the chosen type.

    :param column: Ignored; the column is taken from the title of the selected
                   tab, because a column may have been renamed since the
                   callback was registered
    :return: None (the first rows of the updated dataset are displayed)
    """
    global data_frame
    index = tab.selected_index
    if index is None:
        return
    column = tab.titles[index]

    editor = tab.children[index]
    name_input, type_dropdown, missing_input, *_ = editor.children

    new_name = name_input.value
    new_type = type_dropdown.value
    missing_tokens = _parse_missing_tokens(missing_input.value)

    if new_name != "" and new_name != column:
        if new_name in data_frame.columns:
            # Renaming onto an existing column would create duplicate columns.
            print(f"A column named {new_name!r} already exists; keeping {column!r}.")
            name_input.value = column
        else:
            tab.set_title(index, new_name)
            name_input.placeholder = new_name
            data_frame = data_frame.rename(columns={column: new_name})
            column = new_name

    if missing_tokens:
        data_frame[column] = data_frame[column].replace(missing_tokens, np.nan)

    values = data_frame[column]
    try:
        if new_type in ("str", "category"):
            # Stringify the labels but keep missing entries missing instead of
            # turning them into the text "nan"/"None".
            converted = values.astype(str).where(values.notna())
            if new_type == "category":
                converted = converted.astype("category")
        else:
            converted = values.astype(new_type)
    except (ValueError, TypeError) as error:
        # Tell the user why the column kept its old type.
        print(f"Could not convert column {column!r} to {new_type}: {error}")
    else:
        data_frame[column] = converted

    display(data_frame.head(5))

# Type names offered by the "Choose type" dropdown.
TYPE_OPTIONS = ["bool", "int", "float", "str", "category", "object"]


def _dtype_option(dtype) -> str:
    """Map a pandas/numpy dtype onto one of TYPE_OPTIONS ("object" if nothing else fits)."""
    if isinstance(dtype, pd.CategoricalDtype):
        return "category"
    if pd.api.types.is_bool_dtype(dtype):
        return "bool"
    if pd.api.types.is_integer_dtype(dtype):
        return "int"
    if pd.api.types.is_float_dtype(dtype):
        return "float"
    if isinstance(dtype, pd.StringDtype):
        return "str"
    return "object"


# One set of editor widgets per column, keyed by the original column name.
texts = {}
missing_values = {}
dropdowns = {}
buttons = {}
for name in children_names:
    buttons[name] = widgets.Button(
        description="Update",
        # Icon names are FontAwesome names, which are lowercase.
        icon="check",
    )
    # Bind `name` as a default argument so every callback keeps its own column
    # instead of the last value of the loop variable.
    buttons[name].on_click(lambda b, name=name: update_information(name))

    dropdowns[name] = widgets.Dropdown(
        options=TYPE_OPTIONS,
        description="Choose type: ",
        # The dropdown value must be one of the option strings, not a dtype object.
        value=_dtype_option(data_frame[name].dtype),

        layout=widgets.Layout(width="auto", height="auto"),
        style={"description_width": "auto"},
    )

    texts[name] = widgets.Text(
        description="New name: ",
        placeholder=name,
        value=name,

        layout=widgets.Layout(width="auto", height="auto"),
        style={"description_width": "auto"},
    )

    missing_values[name] = widgets.Text(
        description="Missing values (separate by comma ','): ",
        placeholder="NaN, -1, -",
        value="",

        layout=widgets.Layout(width="auto", height="auto"),
        style={"description_width": "auto"},
    )


# One tab per column; update_information unpacks the children in this order.
tab = widgets.Tab()
tab.children = [
    widgets.VBox([
        texts[name],
        dropdowns[name],
        missing_values[name],

        buttons[name],
    ])
    for name in children_names
]
for index, name in enumerate(children_names):
    tab.set_title(index, name)

display(tab, output)

Tab(children=(VBox(children=(Text(value='Age', description='New name: ', layout=Layout(height='auto', width='a…

Output()

## Sample Augmentation

In [40]:
def add_samples(data_frame: pd.DataFrame, num_samples: int, augmentation_factor: int,
                noise_levels: dict[str, Union[float, np.ndarray]], category_options: dict[str, list]):
    """
    Augment new samples

    ``num_samples`` rows are drawn at random without replacement. Each of them
    is replaced by ``augmentation_factor`` noisy copies; the other rows are
    kept untouched.

    :param data_frame: The data to augment (not modified; a new frame is returned)
    :param num_samples: The number of samples to base the augmentation on (negative for all)
    :param augmentation_factor: The factor by which to augment the data
    :param noise_levels: The level of noise for the augmentation for each feature:
                         a number is the standard deviation of additive Gaussian noise,
                         a square matrix holds the (unnormalised) flip weights between
                         the feature's categories. Features without an entry are copied
    :param category_options: For every feature with a flip matrix, its categories in the
                             order of the matrix rows/columns
    :return: The data with the added samples, on a fresh 0..n-1 index.
             Output size = num_samples * (augmentation_factor - 1) + len(data_frame)
    :raises TypeError: If a noise level is neither a number nor a numpy array
    """
    if num_samples < 0:
        num_samples = len(data_frame)

    # Select samples to augment. These are row POSITIONS, not index labels.
    base_positions = np.random.choice(len(data_frame), num_samples, replace=False)
    base_positions = np.tile(base_positions, augmentation_factor)
    base_samples = data_frame.iloc[base_positions]
    num_augmented = len(base_positions)

    # Augment samples
    augmented_samples = pd.DataFrame(index=pd.RangeIndex(num_augmented))
    for column in data_frame.columns:
        column_type = data_frame[column].dtype
        base_values = base_samples[column].values
        if column not in noise_levels:
            augmented_samples[column] = base_values
            continue

        noise_level = noise_levels[column]
        if isinstance(noise_level, numbers.Number):
            # Add Gaussian noise (for continuous features)
            noise = np.random.normal(0, noise_level, (num_augmented,))
            noisy_values = noise + base_values
            if pd.api.types.is_integer_dtype(column_type):
                # Round before casting back, otherwise the cast truncates
                # towards zero and biases the values.
                noisy_values = np.rint(noisy_values)
        elif isinstance(noise_level, np.ndarray):
            # Random flips according to the flipping matrix (for categorical/binary features)
            options = category_options[column]
            weights = noise_level / noise_level.sum(axis=1, keepdims=True)
            noisy_values = [
                np.random.choice(options, p=weights[options.index(value)])
                for value in base_values
            ]
        else:
            raise TypeError(
                f"Unsupported noise level for column {column!r}: "
                f"expected a number or a numpy array, got {type(noise_level).__name__}"
            )
        augmented_samples[column] = pd.Series(noisy_values, index=augmented_samples.index).astype(column_type)

    # Remove original samples by position: a label-based drop would hit the
    # wrong rows (or raise) whenever the index is not exactly 0..n-1.
    keep_mask = np.ones(len(data_frame), dtype=bool)
    keep_mask[base_positions] = False
    remaining = data_frame.iloc[keep_mask]

    # Add augmented samples; ignore_index avoids duplicated index labels.
    return pd.concat([remaining, augmented_samples], ignore_index=True)

In [49]:
# Output area for the augmentation result (rebinds the global `output`).
output = widgets.Output()


@output.capture(clear_output=True)
def interactive_sample_augmentation():
    """
    Augment the global ``data_frame`` according to the widget settings.

    Numeric columns receive Gaussian noise with a standard deviation of 10% of
    the column's standard deviation; categorical/boolean features use the flip
    matrices stored in ``matrix_data``. All other columns are copied unchanged.
    Afterwards the widgets are synchronised with the new dataset size.

    :return: None (the first rows of the augmented dataset are displayed)
    """
    global data_frame

    source_samples = source_samples_count.value
    requested_output_size = output_count.value
    if requested_output_size < 1:
        print("The requested number of total samples must be at least 1.")
        return

    # Number of noisy copies per source sample needed to reach the requested
    # size. Clamped to 1 because a smaller request would otherwise give a
    # zero/negative repeat count, which np.tile rejects when negative.
    augmentation_factor = max(
        1, int(np.ceil((requested_output_size - len(data_frame)) / source_samples) + 1)
    )

    noise_levels = deepcopy(matrix_data)
    # Only numeric columns get Gaussian noise; a standard deviation is not
    # defined for text/object columns.
    for column in data_frame.select_dtypes(include=[np.number]):
        spread = data_frame[column].std()
        if np.isfinite(spread):
            noise_levels[column] = 0.1 * spread

    augmented = add_samples(data_frame, source_samples, augmentation_factor, noise_levels, features)
    data_frame = augmented.iloc[:requested_output_size]

    source_samples_count.max = len(data_frame)
    source_samples_count.value = len(data_frame)
    output_count.value = len(data_frame)

    display(data_frame.head(10))

# How many existing rows serve as the basis for the augmentation.
source_samples_count = widgets.IntSlider(
    value=len(data_frame),
    min=1,
    max=len(data_frame),
    step=1,
    description="Number of examples to use:",
    orientation="horizontal",
    readout=True,
    readout_format="d",

    layout=widgets.Layout(width="auto", height="auto"),
    style={"description_width": "auto"},
)

# Size the dataset should have after the augmentation.
output_count = widgets.IntText(
    value=len(data_frame),
    description="Requested number for total samples:",

    layout=widgets.Layout(width="auto", height="auto"),
    style={"description_width": "auto"},
)

submit_button = widgets.Button(
    description="Augment Samples!",
    # Icon names are FontAwesome names, which are lowercase ("Check" matches no icon).
    icon="check",
    layout=widgets.Layout(width="auto", margin="0 auto 0 auto")
)
submit_button.on_click(lambda b: interactive_sample_augmentation())

###################################################################################################

# Possible values per feature: the observed categories of every categorical
# column first, then [True, False] for every boolean column.
features = {
    column: list(data_frame[column].unique().categories)
    for column in data_frame.select_dtypes(include=["category"])
}
features.update({
    column: [True, False]
    for column in data_frame.select_dtypes(include=["bool"])
})

# Each feature starts with an identity flip matrix (no value is ever changed)
# whose side length is the number of its categories.
matrix_sizes = {}
matrix_data = {}
for feature, categories in features.items():
    matrix_sizes[feature] = len(categories)
    matrix_data[feature] = np.eye(len(categories), dtype=float)

# Function to create a matrix widget for a given feature
def create_matrix_widget(size, categories, data):
    """
    Build an editable ``size`` x ``size`` matrix widget.

    Every row consists of a text box with the category name followed by one
    numeric input per column. Rows/columns beyond the known categories get an
    empty name and 0 as value.

    :param size: Number of rows and columns to show
    :param categories: Names of the known categories
    :param data: Matrix values, indexed as ``data[row, column]``
    :return: A VBox holding one HBox per row
    """
    known = len(categories)

    def build_row(i):
        label = str(categories[i]) if i < known else ""
        name_box = widgets.Text(value=label, placeholder="Category Name")
        cells = [
            widgets.FloatText(
                value=data[i, j] if i < known and j < known else 0,
                layout=widgets.Layout(width="70px"),
            )
            for j in range(size)
        ]
        return widgets.HBox([name_box] + cells)

    return widgets.VBox([build_row(i) for i in range(size)])

# Matrix widgets stored by feature
matrices = {}

# Output widget to display matrices
matrix_output = widgets.Output()

# Dropdown for selecting features
feature_dropdown = widgets.Dropdown(
    options=list(features.keys()),
    description="Feature:",
    layout={"width": "max-content"}
)

# Integer text box for adjusting the size of the matrix
size_text_box = widgets.IntText(
    # Start with the smallest matrix size; default=0 keeps the cell working
    # when the dataset has no categorical/boolean feature at all.
    value=min(matrix_sizes.values(), default=0),
    description="Matrix Size:",
    layout={"width": "max-content"}
)

# Function to update matrix display based on feature and size
def update_matrix_display(feature, size):
    """
    Rebuild the matrix widget of a feature and show it in ``matrix_output``.

    :param feature: The feature whose flip matrix is shown
    :param size: Number of rows/columns to show
    :return: None
    """
    shown_categories = features[feature][:size]  # at most `size` categories
    shown_data = matrix_data[feature][:size, :size]  # matching part of the stored matrix
    matrices[feature] = create_matrix_widget(size, shown_categories, shown_data)

    with matrix_output:
        matrix_output.clear_output()
        display(matrices[feature])

# Button to save the updates
submit_matrix_button = widgets.Button(description="Submit Changes")


# Handle submit button click
def on_submit_matrix_button_clicked(b):
    """
    Store the edited category names and flip matrix of the selected feature.

    Renamed categories are also renamed in the global ``data_frame``.

    :param b: The clicked button (unused)
    :return: None
    """
    global data_frame

    feature = feature_dropdown.value
    if feature is None:
        # No categorical/boolean feature is available, nothing to store.
        return

    size = size_text_box.value
    new_categories = []
    new_data = np.zeros((size, size), dtype=float)
    for i, row in enumerate(matrices[feature].children):
        name_input, *cells = row.children
        new_categories.append(name_input.value)  # Update category names
        for j, cell in enumerate(cells):
            new_data[i, j] = cell.value  # Update data

    # Apply all renames in a single pass. Replacing one name after the other
    # would chain them (A -> B followed by B -> C turns A into C as well).
    renames = {
        old: new
        for old, new in zip(features[feature], new_categories)
        if old != new
    }
    if renames:
        column = data_frame[feature]
        was_categorical = isinstance(column.dtype, pd.CategoricalDtype)
        renamed = column.astype(object).map(lambda value: renames.get(value, value))
        data_frame[feature] = renamed.astype("category") if was_categorical else renamed
    features[feature] = new_categories  # Save new categories

    matrix_data[feature] = new_data  # Save new data


submit_matrix_button.on_click(on_submit_matrix_button_clicked)

# Handle feature and size changes
def on_feature_change(change):
    """
    Show the matrix of the newly selected feature.

    :param change: Change event; ``change["new"]`` is the selected feature
    :return: None
    """
    feature = change["new"]
    if feature is None:
        # The dropdown has no options when there is no categorical/boolean feature.
        return
    size = matrix_sizes[feature]
    size_text_box.value = size  # Update text box to current feature's matrix size
    update_matrix_display(feature, size)


def on_size_change(change):
    """
    Resize the matrix of the selected feature.

    :param change: Change event; ``change["new"]`` is the requested matrix size
    :return: None
    """
    feature = feature_dropdown.value
    if feature is None:
        return
    # A negative size cannot be used to allocate the matrix later on.
    size = max(0, change["new"])
    matrix_sizes[feature] = size  # Update size in storage
    update_matrix_display(feature, size)


feature_dropdown.observe(on_feature_change, names="value")
size_text_box.observe(on_size_change, names="value")

###################################################################################################

# Flip-matrix editor: feature selector, size box, the matrix itself and its save button.
matrix_editor = widgets.VBox([feature_dropdown, size_text_box, matrix_output, submit_matrix_button])

# Whole augmentation panel, top to bottom.
box = widgets.VBox([source_samples_count, output_count, matrix_editor, submit_button, output])
on_feature_change({"new": feature_dropdown.value})  # Initialize display with the first feature

display(box)

VBox(children=(IntSlider(value=100, description='Number of examples to use:', layout=Layout(height='auto', wid…

## Feature Augmentation

In [50]:
def apply_transformation(data: pd.DataFrame, column: str, transformation: Callable):
    """
    Transform one column of the data element by element.

    The column is overwritten in place in ``data``.

    :param data: The data whose column is transformed
    :param column: The name of the column to transform
    :param transformation: Function applied to every value of the column
                           (e.g. lambda x: x if x > 0 else 0)
    :return: The same ``data`` object, with the transformed column
    """
    transformed = data[column].apply(transformation)
    data[column] = transformed
    return data


def add_feature(data: pd.DataFrame, base_features: list[str], new_feature_name: str, feature_function: Callable):
    """
    Derive a new column from existing ones.

    ``feature_function`` is called once per row with the values of
    ``base_features``; the results are stored in ``data`` in place.

    :param data: The data to add the feature to
    :param base_features: The names of the columns the new feature is computed from
    :param new_feature_name: The name of the column that receives the result
    :param feature_function: Function mapping one row of ``base_features`` to the new value
    :return: The same ``data`` object, with the new column
    """
    source_columns = data[base_features]
    new_values = source_columns.apply(feature_function, axis=1)
    data[new_feature_name] = new_values
    return data


def conditional_sample(distribution: Callable, condition: Callable, *args, **kwargs):
    """
    Rejection sampling: keep drawing from a distribution until a draw satisfies
    the condition.

    The loop has no upper bound, so it never ends if no draw can satisfy the condition.

    :param distribution: Callable producing one sample; called as distribution(*args, **kwargs)
    :param condition: Predicate that the returned sample has to satisfy
    :return: The first drawn sample for which ``condition`` is true
    """
    while True:
        candidate = distribution(*args, **kwargs)
        if condition(candidate):
            return candidate


def conditional_range_sample(distribution: Callable, min_value: float, max_value: float, *args, **kwargs):
    """
    Draw from a distribution until the sample falls into the half-open range
    [min_value, max_value).

    :param distribution: Callable producing one sample; called as distribution(*args, **kwargs)
    :param min_value: The smallest accepted value (inclusive)
    :param max_value: The upper bound of the accepted values (exclusive)
    :return: The first drawn sample inside the range
    """
    def in_range(sample):
        return min_value <= sample < max_value

    return conditional_sample(distribution, in_range, *args, **kwargs)

## Restrictions and Missing Data 