# 1. Import libraries:


In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.figure_factory as ff
import plotly.graph_objects as go
import seaborn as sns
from matplotlib.colors import ListedColormap
from matplotlib.pyplot import figure
from pylab import rcParams
from scipy.cluster.hierarchy import dendrogram
from sklearn import neighbors
from sklearn import preprocessing
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.inspection import DecisionBoundaryDisplay
from sklearn.metrics import accuracy_score
from sklearn.metrics import silhouette_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
%matplotlib inline

# 2. Quick reference (handy pandas/numpy snippets):

In [2]:
#df['bar'].value_counts()       # get distinct elements and their number of occurrences
#df.shape[0]                    # get dataframe row count
#df.drop(columns=['bar'])       # remove column from dataframe
#df.copy()                      # copy dataframe
#np.ravel(df)                   # flatten array
#np.unique(array)               # find the unique elements of numpy array
#df['bar'].apply(pd.to_numeric) # convert to numeric type

# 3. Basic functions:


In [3]:
# Searching for a value across the entire dataset. Output: {column: count, column: count, ...}
def get_count_by_column(dataframe, seach, print_message=True):
    """Count, per column, the cells of ``dataframe`` that equal ``seach``.

    Parameters:
        dataframe: DataFrame to scan (all columns, all rows).
        seach: a single value or a list of values to look for. A float NaN
            among the values matches missing cells (as reported by
            ``pd.isna``) except literal ``None``, which is only matched
            when ``None`` itself is searched for.
        print_message: when falsy, the result dict is returned and nothing
            is printed; otherwise the counts are printed and ``None`` is
            returned.

    Returns:
        ``{column: count}`` containing only columns with at least one match
        (only when ``print_message`` is falsy).
    """
    values = dataframe.to_numpy()
    hits = np.isin(values, seach)

    # np.isin compares with ==, and NaN != NaN, so a NaN search would never
    # match anything; detect that case and add the missing cells explicitly.
    wanted = np.ravel(np.asarray(seach, dtype=object))
    searching_nan = any(
        isinstance(item, (float, np.floating)) and item != item
        for item in wanted
    )
    if searching_nan:
        # Keep None distinct from NaN so that separate searches for the two
        # do not report the same cells twice.
        is_none = np.frompyfunc(lambda cell: cell is None, 1, 1)(values).astype(bool)
        hits |= pd.isna(values) & ~is_none

    # argwhere yields (row, column) positions; column 1 holds the column index
    unique, counts = np.unique(np.argwhere(hits)[:, 1], return_counts=True)
    result = dict(zip(dataframe.columns[unique], counts))
    if not print_message:
        return result
    print(f"Search {seach} value per column: {dict() if not bool(result) else ''}")
    for column, data in result.items():
        print(f"\tColumn {column}: ", data)

def get_match_mask(df, regex):
    """Return a boolean array marking the cells of ``df`` that match ``regex``.

    Every cell is converted to its string form first, because
    ``Series.str.contains`` only works reliably on strings. The result is a
    ``numpy.ndarray`` with the same shape as ``df``.
    """
    flat_cells = pd.Series(df.to_numpy().ravel()).astype(str)
    flat_hits = flat_cells.str.contains(regex)
    return flat_hits.values.reshape(df.shape)

#returns the np.where(...) result (a tuple holding the array of positions) for mask rows with at least one True value
def get_true_mask_row_positions(mask, invert_mask=False):
    """Locate the rows of a 2-D boolean mask that contain at least one True.

    Parameters:
        mask: 2-D boolean mask.
        invert_mask: when true, the mask is negated with ``~`` first, so the
            rows found are those containing at least one False.
    """
    if invert_mask:
        mask = ~mask
    row_has_hit = np.any(mask, axis=1)  # collapse each row to a single flag
    return np.where(row_has_hit == True)

def get_count_by_column_match(df, regex):
    """Print, per column, how many cells of ``df`` match ``regex``.

    The match mask from ``get_match_mask`` is wrapped in a DataFrame with the
    original column names, and its True cells are counted with
    ``get_count_by_column``. Nothing is returned.
    """
    mask_frame = pd.DataFrame(get_match_mask(df, regex), columns=list(df.columns.values))
    count_by_column = get_count_by_column(mask_frame, True, False)
    print(f"Search count per column by regular expression '{regex}' (without single quotes): {dict() if not bool(count_by_column) else ''}")
    # an empty dict simply produces no per-column lines
    for column, data in count_by_column.items():
        print(f"\tColumn {column}: ", data)

def get_unique_by_column_match(df, regex):
    """Print, per column, the distinct cell values of ``df`` that match ``regex``.

    Columns without any matching cell are left out; when nothing matches at
    all, an empty dict is printed in place of the list. Nothing is returned.
    """
    mask = get_match_mask(df, regex)
    # keep only the matching cells (everything else becomes NaN), then drop
    # the rows that have no match in any column
    matched = df.where(mask, other=np.nan).dropna(how='all')
    lines = []
    for column in matched:
        distinct = pd.unique(matched[column].dropna())
        if distinct.size == 0:
            continue
        lines.append(f"\r\n\tColumn {column}: {list(distinct)}")
    print(f"Search unique value per column by regular expression '{regex}' (without single quotes): {''.join(lines) if bool(lines) else dict()}")

def analysis_dataframe_values_by_column(df, values=(0, np.nan, None, [np.inf, -np.inf]), regex=r'\W'):
    """Print a quick value report for ``df``.

    Parameters:
        df: DataFrame to inspect.
        values: iterable of search targets; each entry (a single value or a
            list of values) is passed to ``get_count_by_column`` in turn.
        regex: pattern passed to ``get_count_by_column_match`` and
            ``get_unique_by_column_match``. The default ``\\W`` matches any
            non-word character, i.e. it looks for special characters.
    """
    # The default pattern is a raw string: '\W' in a plain literal is an
    # invalid escape sequence and triggers a warning on current Python.
    for v in values:
        get_count_by_column(df, v)
    get_count_by_column_match(df, regex)
    get_unique_by_column_match(df, regex)

def show_heatmap_corr(df, fname = None):
    """Draw a heatmap of the correlation matrix returned by ``df.corr()``.

    Parameters:
        df: DataFrame whose column correlations are plotted.
        fname: optional path; when given, the figure is also saved there
            via ``plt.savefig``.
    """
    correlations = df.corr()
    sns.set(font_scale=0.8)
    figure(figsize=(4, 3), dpi=120)
    sns.heatmap(correlations)
    if fname is None:
        return
    plt.savefig(fname)

def std_scale(df, columns):
    """Standardize the selected columns with ``StandardScaler`` and report the result.

    Parameters:
        df: source DataFrame.
        columns: a list of column names; any non-list value is treated as a
            single column name.

    Returns:
        The output of ``StandardScaler().fit_transform`` for those columns.
        The mean and standard deviation of every scaled column are printed.
    """
    selected = [columns] if not isinstance(columns, list) else columns
    scaled = preprocessing.StandardScaler().fit_transform(df[selected])
    for position, name in enumerate(selected):
        scaled_column = scaled[:, position]
        print(f'Mean after standardization: {name} = {scaled_column.mean()}')
        print(f'Standard deviation after standardization: {name} = {scaled_column.std()}\n')
    return scaled

def minmax_scale(df, columns):
    """Rescale the selected columns with ``MinMaxScaler`` and report the result.

    Parameters:
        df: source DataFrame.
        columns: a list of column names; any non-list value is treated as a
            single column name.

    Returns:
        The output of ``MinMaxScaler().fit_transform`` for those columns.
        The minimum and maximum of every scaled column are printed.
    """
    selected = [columns] if not isinstance(columns, list) else columns
    scaled = preprocessing.MinMaxScaler().fit_transform(df[selected])
    for position, name in enumerate(selected):
        scaled_column = scaled[:, position]
        print(f'Min-value after min-max scaling: {name}={scaled_column.min()}')
        print(f'Max-value after min-max scaling: {name}={scaled_column.max()}\n')
    return scaled
    
def PCA_dimensionality_reduction(df, dimension, columns):
    """Project ``df`` onto ``dimension`` principal components.

    Parameters:
        df: data the PCA is fitted on and then transformed.
        dimension: value passed to ``PCA(n_components=...)``.
        columns: column names for the resulting DataFrame.

    Returns:
        A DataFrame built from the transformed data.
    """
    reducer = PCA(n_components=dimension)
    reducer.fit(df)
    return pd.DataFrame(reducer.transform(df), columns=columns)

def get_KNeighborsClassifier_accuracy_score_list(X_train, y_train, X_test, y_test, k_range=range(1,100)):
    """Evaluate a ``KNeighborsClassifier`` for every k in ``k_range``.

    For each k a new classifier is fitted on the training part and scored
    with ``accuracy_score`` on the test part.

    Parameters:
        X_train, y_train: training data; ``y_train.values.ravel()`` is used
            as the target vector.
        X_test, y_test: data the accuracy is measured on.
        k_range: iterable of ``n_neighbors`` values to try.

    Returns:
        dict mapping each k to its accuracy score.
    """
    def _accuracy_for(k):
        # fit a fresh model for this k and score it on the held-out part
        model = KNeighborsClassifier(n_neighbors=k)
        model.fit(X_train, y_train.values.ravel())
        return accuracy_score(y_test, model.predict(X_test))

    return {k: _accuracy_for(k) for k in k_range}

# Returns the key (k) with the highest score.
# The keys are ranked by their score in ascending order and the last one is
# returned; among equal scores the key inserted last wins (the sort is stable).
# Parameters:
# accuracy_score_list type:dict
def get_best_n_neighbors(accuracy_score_list):
    ranked_keys = sorted(accuracy_score_list, key=accuracy_score_list.get)
    return ranked_keys[-1]

def get_training_and_test_parts(df, target):
    """Split ``df`` into train/test parts for features and target.

    Every column other than ``target`` is a feature. The split is done by
    ``train_test_split`` with ``random_state=0``, and its result is returned
    unchanged.
    """
    is_feature = df.columns != target
    features = df.loc[:, is_feature]
    labels = df[target]
    return train_test_split(features, labels, random_state=0)

def get_value_counts_by_column(df_cr, column=False):
    """Print ``value_counts()`` for the chosen columns of ``df_cr``.

    Parameters:
        df_cr: DataFrame to inspect.
        column: ``False`` (default) or ``None`` to report every column, a
            single column label, or a list / ``pd.Index`` of column labels.
    """
    # Identity checks are deliberate: a column labelled 0 compares equal to
    # False and must not be mistaken for "no column given".
    if column is False or column is None:
        names = list(df_cr.columns)
    elif isinstance(column, (list, pd.Index)):
        names = list(column)
    else:
        names = [column]
    for name in names:
        print('Column: ', name)
        print(df_cr[name].value_counts(), '\r\n')
        
def get_features_target_split(df, target):
    """Return ``(features, labels)``: all columns except ``target``, and the ``target`` column."""
    is_feature = df.columns != target
    features = df.loc[:, is_feature]
    labels = df[target]
    return features, labels

def get_column_names_except_target(df, target):
    """Return the column labels of ``df`` as a list, leaving out ``target``."""
    is_feature = df.columns != target
    feature_names = df.columns[is_feature]
    return list(feature_names)

# 4. Graphs 

## 4.1 Basic charts:


In [4]:
def show_histogram(df, column):
    """Show a plotly histogram of ``df[column]``."""
    px.histogram(df[column], x=column).show()

def show_histogram_for_multiple_columns(df, columns):
    """Show the histograms of several columns overlaid in one plotly figure.

    Each entry of ``columns`` becomes one ``go.Histogram`` trace named after
    the column; the figure uses ``barmode='overlay'``.
    """
    traces = [go.Histogram(x=df[name], name=name) for name in columns]
    fig = go.Figure(data=traces)
    fig.update_layout(barmode='overlay')
    fig.show()
    
def show_pair_plot(df, x, y, kind='scatter'):
    """Draw a seaborn joint plot of columns ``x`` and ``y`` of ``df``.

    ``kind`` is forwarded to ``sns.jointplot`` unchanged.
    """
    sns.jointplot(data=df, x=x, y=y, kind=kind)
    
def show_multiple_plot(df, columns, fname = None):
    """Draw a seaborn pair plot for the selected columns.

    Parameters:
        df: source DataFrame.
        columns: columns of ``df`` passed to ``sns.pairplot``.
        fname: optional path; when given, the pair-plot figure is saved there.
    """
    # Python form of the IPython line magic
    #     %config InlineBackend.figure_format = 'png'
    # so that the function body is plain Python. Outside IPython there is no
    # inline backend to configure and the step is skipped.
    try:
        shell = get_ipython()
    except NameError:
        shell = None
    if shell is not None:
        shell.run_line_magic('config', "InlineBackend.figure_format = 'png'")

    df_pairplot = sns.pairplot(df[columns])
    if fname is not None:
        # pass the variable itself: the quoted "fname" wrote every plot to a
        # file literally called "fname"
        df_pairplot.fig.savefig(fname)

## 4.2 Building a classification graph that implements the voting of k nearest neighbors (KNeighborsClassifier):


In [5]:
def classify_and_plot(X, y, n_neighbors):
    """Plot k-nearest-neighbours decision regions for two-feature data.

    One figure is produced for each weighting scheme ('uniform' and
    'distance'). Each figure is saved as ``<weights>.png`` in the current
    directory and then shown.

    Parameters:
        X: array-like with exactly two feature columns (the classifier is
            evaluated on a two-column mesh).
        y: class labels, also used to colour the scatter points with a
            two-colour map.
        n_neighbors: number of neighbours for ``KNeighborsClassifier``.

    Side effects:
        Sets ``rcParams['figure.figsize']`` to ``(5, 5)``.
    """
    # positional slicing (X[:, 0]) below needs an ndarray, so DataFrames are
    # converted up front
    X = np.asarray(X)
    X_train, _, y_train, _ = train_test_split(X, y, test_size=0.33, random_state=41)

    h = .02  # step size in the mesh

    # Create color maps
    cmap_light = ListedColormap(['#FFAAAA', '#AAAAFF'])
    cmap_bold = ListedColormap(['#FF0000', '#0000FF'])

    rcParams['figure.figsize'] = 5, 5

    # The mesh [x_min, x_max]x[y_min, y_max] depends only on X, so it is
    # built once and shared by both classifiers.
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))
    mesh_points = np.c_[xx.ravel(), yy.ravel()]

    for weights in ['uniform', 'distance']:
        clf = neighbors.KNeighborsClassifier(n_neighbors, weights=weights)
        clf.fit(X_train, y_train)

        # Plot the decision boundary: one predicted class per mesh point
        Z = clf.predict(mesh_points).reshape(xx.shape)
        fig = plt.figure()
        plt.pcolormesh(xx, yy, Z, cmap=cmap_light)

        # All samples (train and test) on top of the regions
        plt.scatter(X[:, 0], X[:, 1], c=y, cmap=cmap_bold, edgecolor='k', s=20)
        plt.xlim(xx.min(), xx.max())
        plt.ylim(yy.min(), yy.max())
        plt.title("0/1 outcome classification (k = %i, weights = '%s')" % (n_neighbors, weights))
        # save before showing: some backends release the figure once the
        # show() window is closed
        fig.savefig(weights + '.png')
        plt.show()

## 4.3 Building decision boundaries for decision trees (DecisionTreeClassifier, DecisionBoundaryDisplay):


In [6]:
def get_first_series_for_pair_train(feature_number, start=0):
    """Build the first members of the index pairs used for pairwise training.

    Each n in ``range(start, feature_number)`` is repeated; the repeat count
    is ``feature_number - start`` for the first n and drops by one for every
    following n.

    Example: ``get_first_series_for_pair_train(3)`` gives
    ``[0, 0, 0, 1, 1, 2]``.
    """
    series = []
    for step, n in enumerate(range(start, feature_number)):
        repeats = max(0, feature_number - step - start)
        series.extend([n] * repeats)
    return series

def get_second_series_for_pair_train(feature_number, start=0):
    """Build the second members of the index pairs used for pairwise training.

    For the i-th step (i counted from 0) the values ``c + start + i + 1`` are
    produced for every c in ``range(start, feature_number - i)``.

    Example: ``get_second_series_for_pair_train(3)`` gives
    ``[1, 2, 3, 2, 3, 3]``. Zipped with ``get_first_series_for_pair_train(3)``
    this yields the pairs (0,1), (0,2), (0,3), (1,2), (1,3), (2,3), so with
    ``start=0`` the largest index that appears is ``feature_number`` itself.
    """
    series = []
    for step, _ in enumerate(range(start, feature_number)):
        shift = start + step + 1
        series.extend(c + shift for c in range(start, feature_number - step))
    return series

#plot_colors="ryb" is a possible value
def show_decision_trees_plot(data, y, feature_number, n_classes, plot_colors, plot_column_count=3, plot_step=0.02, start=0):
    """Plot decision-tree decision boundaries for pairs of features.

    For every column-position pair produced by
    ``get_first_series_for_pair_train`` / ``get_second_series_for_pair_train``
    a ``DecisionTreeClassifier`` is fitted on those two columns and drawn in
    its own subplot together with the training points.

    Parameters:
        data: DataFrame with the features; columns are addressed by position.
        y: class labels, one per row of ``data``.
        feature_number, start: forwarded to the two pair-series helpers.
        n_classes: number of classes to draw points for (the first
            ``n_classes`` sorted unique values of ``y``).
        plot_colors: one colour per class, e.g. ``"ryb"``.
        plot_column_count: number of subplot columns.
        plot_step: currently unused.
    """
    first_series = get_first_series_for_pair_train(feature_number, start)
    second_series = get_second_series_for_pair_train(feature_number, start)
    feature_pairs = [list(pair) for pair in zip(first_series, second_series)]

    # Ceiling division: exactly as many rows as needed. "n // cols + 1" added
    # an empty row whenever the pair count was a multiple of the column count.
    plot_row_count = -(-len(feature_pairs) // plot_column_count)

    # Use the labels actually present in y instead of assuming they are
    # 0..n_classes-1; for such integer labels the result is the same.
    classes = np.unique(y)[:n_classes]

    for pairidx, pair in enumerate(feature_pairs):
        # We only take the two corresponding features
        X = data.iloc[:, pair]

        # Train
        clf = DecisionTreeClassifier().fit(X, y)

        # Plot the decision boundary
        ax = plt.subplot(plot_row_count, plot_column_count, pairidx + 1)
        plt.tight_layout(h_pad=0.5, w_pad=0.5, pad=2.5)
        DecisionBoundaryDisplay.from_estimator(
            clf,
            X,
            cmap=plt.cm.RdYlBu,
            response_method="predict",
            ax=ax,
            xlabel=data.columns.values[pair[0]],
            ylabel=data.columns.values[pair[1]],
        )

        # Plot the training points, one colour per class
        for class_label, color in zip(classes, plot_colors):
            idx = np.where(y == class_label)[0]
            plt.scatter(
                X.iloc[idx, 0],
                X.iloc[idx, 1],
                c=color,
                label=class_label,
                edgecolor="black",
                s=15,
            )

    # The legend and axis setting apply to the current (last drawn) subplot
    plt.legend(loc="lower right", borderpad=0, handletextpad=0)
    _ = plt.axis("tight")

## 4.4 Visualizing K-Means Clustering Results on Data Reduced with PCA (KMeans, PCA)


In [7]:
# Parameters:
# reduced_data shape:(n, 2)
def show_KMeans_plot(reduced_data, n_clusters, n_init=4, title=None):
    """Fit K-means on two-dimensional data and plot the resulting clusters.

    The plot shows the cluster regions as a coloured background, the data
    points as small black dots and the centroids as white crosses.

    Parameters:
        reduced_data: array-like of shape (n, 2); DataFrames are accepted.
        n_clusters: number of clusters for ``KMeans``.
        n_init: ``n_init`` argument for ``KMeans``.
        title: plot title; ``None`` keeps the default title text.
    """
    # The positional slicing below needs an ndarray; converting here lets the
    # DataFrame returned by PCA_dimensionality_reduction be passed directly.
    reduced_data = np.asarray(reduced_data)

    kmeans = KMeans(init="k-means++", n_clusters=n_clusters, n_init=n_init)
    kmeans.fit(reduced_data)

    # Step size of the mesh. Decrease to increase the quality of the VQ.
    h = 0.02

    # Plot the decision boundary. For that, we will assign a color to each
    # point in the mesh [x_min, x_max]x[y_min, y_max].
    x_min, x_max = reduced_data[:, 0].min() - 1, reduced_data[:, 0].max() + 1
    y_min, y_max = reduced_data[:, 1].min() - 1, reduced_data[:, 1].max() + 1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))

    # Obtain labels for each point in mesh. Use last trained model.
    Z = kmeans.predict(np.c_[xx.ravel(), yy.ravel()])

    # Put the result into a color plot
    Z = Z.reshape(xx.shape)
    plt.figure(1)
    plt.clf()
    plt.imshow(
        Z,
        interpolation="nearest",
        extent=(xx.min(), xx.max(), yy.min(), yy.max()),
        cmap=plt.cm.Paired,
        aspect="auto",
        origin="lower",
    )

    plt.plot(reduced_data[:, 0], reduced_data[:, 1], "k.", markersize=2)
    # Plot the centroids as a white X
    centroids = kmeans.cluster_centers_
    plt.scatter(
        centroids[:, 0],
        centroids[:, 1],
        marker="x",
        s=169,
        linewidths=3,
        color="w",
        zorder=10,
    )
    if title is None:
        title = (
            "K-means clustering on the digits dataset (PCA-reduced X)\n"
            "Centroids are marked with white cross"
        )
    plt.title(title)
    plt.xlim(x_min, x_max)
    plt.ylim(y_min, y_max)
    plt.xticks(())
    plt.yticks(())
    plt.show()

## 4.5 HDBSCAN: visualization


In [8]:
def show_HDBSCAN_plot(X):
    """Cluster ``X`` with HDBSCAN after a 2-component PCA and scatter-plot it.

    Points are coloured by cluster label; points with a negative label are
    drawn grey. Each colour is desaturated by the point's value in
    ``clusterer.probabilities_``.

    Parameters:
        X: data passed to ``PCA(n_components=2).fit_transform``.
    """
    # Imported here because the name is not imported at the top of the file;
    # only this function needs the package.
    import hdbscan

    reduced_data = PCA(n_components=2).fit_transform(X)

    clusterer = hdbscan.HDBSCAN(min_cluster_size=15).fit(reduced_data)

    # Size the palette to the number of clusters found: a fixed 8-colour
    # palette raised IndexError as soon as a label reached 8. For up to 8
    # clusters the colours are unchanged.
    n_clusters = int(clusterer.labels_.max()) + 1
    color_palette = sns.color_palette('deep', max(8, n_clusters))
    cluster_colors = [color_palette[x] if x >= 0
                      else (0.5, 0.5, 0.5)
                      for x in clusterer.labels_]
    cluster_member_colors = [sns.desaturate(x, p) for x, p in
                             zip(cluster_colors, clusterer.probabilities_)]
    plt.scatter(*reduced_data.T, s=50, linewidth=0, c=cluster_member_colors, alpha=0.25)

## 4.6 Construction of a hierarchical cluster dendrogram


In [9]:
def plot_dendrogram(model, **kwargs):
    """Plot the dendrogram of a fitted hierarchical clustering model.

    A linkage matrix is assembled from ``model.children_``,
    ``model.distances_`` and the number of samples under each merge, then
    handed to ``scipy.cluster.hierarchy.dendrogram``.

    Parameters:
        model: object exposing ``children_``, ``distances_`` and ``labels_``.
        **kwargs: forwarded to ``dendrogram``.
    """
    n_leaves = len(model.labels_)
    subtree_sizes = np.zeros(model.children_.shape[0])

    # A child index below n_leaves is an original sample (size 1); any other
    # index refers to an earlier merge, whose size is already known.
    for row, children in enumerate(model.children_):
        subtree_sizes[row] = sum(
            1 if child < n_leaves else subtree_sizes[child - n_leaves]
            for child in children
        )

    linkage_columns = [model.children_, model.distances_, subtree_sizes]
    linkage_matrix = np.column_stack(linkage_columns).astype(float)

    # Plot the corresponding dendrogram
    dendrogram(linkage_matrix, **kwargs)

## 4.7 Building a graph for selecting the optimal number of clusters (kmeans, kmeans.inertia_, silhouette_score)


In [10]:
# Parameters:
# metric possible values:silhouette_score, squared_distances_sum
def kmeans_metric_plot(X, clusters_range, metric):
    """Plot a K-means quality metric against the number of clusters.

    Parameters:
        X: data to cluster.
        clusters_range: iterable of cluster counts to try.
        metric: ``'squared_distances_sum'`` (plots ``kmeans.inertia_``) or
            ``'silhouette_score'`` (plots ``silhouette_score(X, labels)``).

    Raises:
        ValueError: if ``metric`` is not one of the two supported names.
    """
    # (y-axis label, title) for every supported metric
    captions = {
        'squared_distances_sum': ('Sum of squared distances/Inertia', 'Elbow Method For Optimal K'),
        'silhouette_score': ('Silhouette score', 'Silhouette analysis For Optimal k'),
    }
    # Fail fast: an unknown metric used to run every KMeans fit and then
    # break inside plt.plot with an unrelated shape error.
    if metric not in captions:
        raise ValueError(
            f"Unknown metric {metric!r}; expected one of {sorted(captions)}"
        )

    # Materialize once so that a one-shot iterator can be used both for the
    # fits and for the x axis.
    cluster_counts = list(clusters_range)

    result = []
    for c in cluster_counts:
        kmeans = KMeans(n_clusters=c)
        kmeans.fit(X)
        if metric == 'squared_distances_sum':
            result.append(kmeans.inertia_)
        else:
            result.append(silhouette_score(X, kmeans.labels_))

    y_label, title = captions[metric]
    plt.plot(cluster_counts, result, 'bx-')
    plt.xlabel('Values of K')
    plt.ylabel(y_label)
    plt.title(title)
    plt.show()

## 4.8 Building a graph of information loss with a decrease in dimension:


In [11]:
def show_explained_variance_ratio_plot(PCA):
    """Plot and print the explained variance of a fitted PCA object.

    Bars show the explained variance ratio of each component and a step
    line shows the cumulative ratio. The raw ``explained_variance_``, the
    ratios and their cumulative sum are printed after the plot.

    Parameters:
        PCA: object exposing ``explained_variance_`` and
            ``explained_variance_ratio_``. Inside this function the
            parameter name hides the imported ``PCA`` class.
    """
    ratios = PCA.explained_variance_ratio_
    cumulative = ratios.cumsum()

    plt.bar(range(len(ratios)), ratios, alpha=0.5, align='center', label='Individual explained variance')
    plt.step(range(len(cumulative)), cumulative, where='mid', label='Cumulative explained variance')
    plt.ylabel('Explained variance ratio')
    plt.xlabel('Principal component index')
    plt.legend(loc='best')
    plt.tight_layout()
    plt.show()

    print('PCA.explained_variance_               ', PCA.explained_variance_)
    print('PCA.explained_variance_ratio_         ', ratios)
    print('PCA.explained_variance_ratio_.cumsum()', cumulative)