In [1]:
import random
from datetime import datetime as dt

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib import colors as mcolors
from scipy.special import logsumexp
from scipy.stats import multivariate_normal
from sklearn.cluster import KMeans

In [2]:
def cc(arg):
    """Convert a matplotlib color spec to RGBA with the alpha channel set to 0.6."""
    rgba = mcolors.to_rgba(arg, alpha=0.6)
    return rgba

def all_colors():
    """Return the plotting palette as an ordered list of matplotlib color names."""
    # Built from three groups; the concatenation order defines the palette index.
    tableau_names = ["tab:blue", "tab:orange", "tab:green", "tab:red", "tab:purple",
                     "tab:brown", "tab:pink", "tab:gray", "tab:olive", "tab:cyan"]
    single_letter_names = ["b", "g", "r", "c", "m", "y", "k"]
    extra_names = ["limegreen", "cornflowerblue", "mediumblue", "darkorange",
                   "maroon", "deepskyblue", "darkmagenta"]
    return tableau_names + single_letter_names + extra_names

def random_color():
    """Pick one palette entry at random and return it converted by cc()."""
    chosen_name = random.choice(all_colors())
    return cc(chosen_name)

def get_color(index):
    """Return the palette color for `index`, converted by cc().

    The index wraps around the palette length, so callers that plot more
    series than there are palette entries reuse colors instead of raising
    IndexError. Every index that was valid before (including negative ones)
    still maps to the same color.

    Args:
        index: integer position in the palette returned by all_colors().

    Returns:
        The value cc() produces for the selected color name.
    """
    palette = all_colors()
    # Modulo keeps any integer index inside the palette bounds.
    return cc(palette[index % len(palette)])

def all_lines():
    """Return the list of line-style codes available for plotting."""
    # NOTE(review): "." is a point-marker code in matplotlib format strings, while the
    # dotted line style is ":" - confirm which one callers expect.
    styles = "- -- -. .".split(" ")
    return styles

def all_markers():
    """Return the list of marker codes available for scatter plots."""
    # Every marker code here is a single character, so the string splits cleanly.
    marker_codes = ".,ov^18*Hd"
    return list(marker_codes)

def random_marker():
    """Return one marker code chosen at random from all_markers()."""
    candidates = all_markers()
    return random.choice(candidates)

In [3]:
def custom_scatter_2D(matrix, labels, cluster_centers, n_clusters, digit, coeffs=[0, 1]):
    """Show a 2-D scatter plot of k-means clusters over two columns of `matrix`.

    Args:
        matrix: 2-D array of samples; it is split per cluster by parse_labels().
        labels: cluster label of each row of `matrix`.
        cluster_centers: accepted for interface symmetry; not used by the plot.
        n_clusters: number of clusters to draw.
        digit: digit shown in the plot title.
        coeffs: pair of column indices plotted on the x and y axes.
    """
    fig = plt.figure()
    ax = fig.add_subplot()

    groups = parse_labels(matrix=matrix, labels=labels, n_clusters=n_clusters)
    x_col, y_col = coeffs[0], coeffs[1]

    # One scatter call per cluster: color follows the cluster index, marker is random.
    for position, points in enumerate(groups):
        ax.scatter(
            x=points[:, x_col],
            y=points[:, y_col],
            s=0.5,
            color=get_color(index=position),
            marker=random_marker(),
        )

    ax.set_xlabel(f"MFCC {x_col}")
    ax.set_ylabel(f"MFCC {y_col}")
    ax.set_title(f"K-Means Result for Digit {digit} with {n_clusters} Clusters")

    plt.show()

In [4]:
def create_filter_arr(labels, cluster):
    """Return a list of booleans, True where the entry of `labels` equals `cluster`."""
    return [True if label == cluster else False for label in labels]

def parse_labels(matrix, labels, n_clusters):
    """Split `matrix` into one sub-matrix per cluster id in range(n_clusters).

    Entry i of the returned list holds the rows of `matrix` whose label equals i.
    """
    return [
        matrix[create_filter_arr(labels=labels, cluster=cluster_id)]
        for cluster_id in range(n_clusters)
    ]

def analyze_cluster(matrix, labels, cluster_centers, n_clusters):
    """Compute the covariance and mixture weight (pi) of every cluster.

    Args:
        matrix: 2-D array of samples, one row per sample.
        labels: cluster label of each row of `matrix`.
        cluster_centers: accepted for interface symmetry; not used here.
        n_clusters: number of clusters to analyze.

    Returns:
        (covariances, weights): two lists indexed by cluster id. The weight is
        the fraction of rows assigned to the cluster; the covariance is np.cov
        of the cluster's rows with columns treated as variables.
    """
    covariances, weights = [], []

    for cluster_id in range(n_clusters):
        members = matrix[create_filter_arr(labels=labels, cluster=cluster_id)]

        # Share of all samples that fell into this cluster
        weight = len(members) / len(matrix)
        # np.cov expects variables in rows, hence the transpose
        spread = np.cov(np.transpose(members))

        weights.append(weight)
        covariances.append(spread)

    return (covariances, weights)

In [5]:
class GaussParams:
    """Container for the parameters of one Gaussian mixture model."""

    def __init__(self, u, pi, cov):
        # u: component means, pi: component weights, cov: component covariances
        self.u, self.pi, self.cov = u, pi, cov

    def __str__(self):
        """Render the three parameters on separate lines, labelled u / pi / cov."""
        parts = (f"u: {self.u}", f"pi: {self.pi}", f"cov: {self.cov}")
        return "\n".join(parts)

In [6]:
# Relative locations of the data files. TRAIN_PATH and TEST_PATH are filename
# prefixes: a digit and EXT are appended to them to form a full file name.
EXT = ".txt"
DATA_PATH = "../../data/02_intermediate/"
TRAIN_PATH = DATA_PATH + "train_digits/train_0"
TEST_PATH = DATA_PATH + "test_digits/test_0"

In [7]:
# Modeling parameters
DIGITS = 10                           # number of digit classes that get a model
CLUSTERS = 4                          # k-means clusters (mixture components) per digit
USE_COEFFS = 7                        # how many leading columns feed the model
MODEL_COEFFS = range(0, USE_COEFFS)   # column indices used for modeling
PLOT_COEFFS = [0, 1]                  # the two columns shown in 2-D scatter plots

In [8]:
# Fit k-means per digit and derive the mixture parameters from its clusters
gauss_results = []
for digit in range(DIGITS):
    # Load this digit's training file and discard rows with missing values
    filename = TRAIN_PATH + str(digit) + EXT
    df = pd.read_csv(filename, skip_blank_lines=True, delimiter=' ', header=None).dropna(axis=0)

    # Keep only the columns used for modeling, as a plain array
    df_filter = df.iloc[:, MODEL_COEFFS]
    matrix = df_filter.values
    n_clusters = CLUSTERS

    # Cluster the rows; fit() returns the fitted estimator itself
    kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(matrix)
    labels, cluster_centers = kmeans.labels_, kmeans.cluster_centers_

    # Cluster centers act as the means (u); covariance and pi come from the cluster members
    cluster_covariance, cluster_pi = analyze_cluster(
        matrix=matrix,
        labels=labels,
        cluster_centers=cluster_centers,
        n_clusters=n_clusters,
    )
    gauss = GaussParams(u=cluster_centers, pi=cluster_pi, cov=cluster_covariance)
    gauss_results.append(gauss)

    # Visualize the kmeans plot as scatter in 2D
    # custom_scatter_2D(matrix=matrix, labels=labels, cluster_centers=cluster_centers, n_clusters=n_clusters, digit=digit, coeffs=PLOT_COEFFS)

In [9]:
# Gather the mixture weights (pi) of every digit's model: one row per digit,
# one column per mixture component
pi_vals = []
for index, result in enumerate(gauss_results):
    pi_vals.append(result.pi)

print("PI VALUES FOR GAUSS RESULTS (cluster result x digit)")
pd.DataFrame(pi_vals)


PI VALUES FOR GAUSS RESULTS (cluster result x digit)


Unnamed: 0,0,1,2,3
0,0.285831,0.220068,0.1939,0.300202
1,0.188372,0.289776,0.288451,0.233401
2,0.397194,0.138316,0.200959,0.263531
3,0.277424,0.263342,0.164393,0.29484
4,0.269458,0.210712,0.229145,0.290685
5,0.303267,0.22157,0.264128,0.211035
6,0.245644,0.249715,0.311885,0.192755
7,0.087546,0.28619,0.368039,0.258225
8,0.186648,0.327874,0.168503,0.316975
9,0.304529,0.154202,0.416733,0.124536


In [10]:
def get_all_dataframes(digit, write_path, read_path, stopwatch):
    """
    Get all of the dataframes for a single digit.

    The input file `{read_path}{digit}.txt` is read line by line. Runs of
    consecutive non-blank lines form one block and blank lines separate blocks.
    Each block is written to the scratch file `{write_path}{digit}.txt` and then
    parsed with pandas (space delimited, no header row), so the scratch file is
    only an intermediary for pd.read_csv.

    Args:
        digit: value inserted into both file names.
        write_path: filename prefix of the scratch file that is overwritten per block.
        read_path: filename prefix of the input file.
        stopwatch: when truthy, print the number of parsed frames and the elapsed time.

    Returns:
        List of dataframes, one per block, in file order.
    """
    start_time = dt.now()
    read_filename = f"{read_path}{digit}.txt"
    write_filename = f"{write_path}{digit}.txt"

    def read_block():
        # Parse whatever block currently sits in the scratch file
        return pd.read_csv(write_filename, skip_blank_lines=True, delimiter=' ', header=None)

    df_all = []
    line_count = 0

    f = open(write_filename, "w")
    try:
        # Open file and build out data
        with open(read_filename, "r") as file:
            for line in file:
                if len(line.strip()) != 0:
                    f.write(line)
                    line_count += 1
                elif line_count > 0:
                    # Blank line ends the block: flush it to disk before pandas reads it
                    f.close()
                    df_all.append(read_block())

                    # Reset line count and scratch file for the next block
                    line_count = 0
                    f = open(write_filename, "w")

        # The last block is not followed by a blank line when the file ends right after it
        if line_count > 0:
            f.close()
            df_all.append(read_block())
    finally:
        # The handle reopened after the last block (or left open by an error) used to
        # leak; close() has no effect on a file that is already closed.
        f.close()

    end_time = dt.now()
    total_time = (end_time - start_time).total_seconds()

    if (stopwatch):
        print(f"Parsed {len(df_all)} frames in {total_time} sec")

    return df_all
     
def print_summary(digit, total_time, correct, utterances):
    """Print a one-line, tab-separated classification summary for a digit.

    The line reports accuracy (correct / utterances as a percentage rounded to
    3 places), the raw counts, the elapsed time rounded to 3 places, and the
    current wall-clock time as HH:MM:SS.
    """
    accuracy = round(correct / utterances * 100, 3)
    cur_time = dt.now().strftime("%H:%M:%S")
    fields = [
        f"#{digit}",
        f"accuracy: {accuracy}%",
        f"correct: {correct}",
        f"utterances: {utterances}",
        f"total_time: {round(total_time, 3)} sec",
        f"cur_time: {cur_time}",
    ]
    print("\t".join(fields))

In [11]:
def classify_dataframe(df, gauss_results, debug):
    """Classify a dataframe by maximum log-likelihood under each digit's mixture.

    For every digit d in range(DIGITS) the score is the sum over all rows x_n of
    log( sum_m pi[m] * N(x_n | u[m], cov[m]) ), with u, pi and cov taken from
    gauss_results[d]. The score is accumulated in the log domain (logpdf +
    logsumexp), so tiny densities no longer underflow to zero; the old underflow
    guard, which called the never-imported `sys.exit`, is therefore gone.

    Args:
        df: dataframe with one row per frame; only the MODEL_COEFFS columns are used.
        gauss_results: indexable by digit; each entry exposes `u`, `pi` and `cov`,
            each indexable by mixture component.
        debug: when truthy, print the score of every digit.

    Returns:
        (classification, posterior_all): the index of the highest score (the first
        one on ties) and the list of DIGITS scores.
    """
    # All frames at once: one row per frame, restricted to the model columns
    frames = df.to_numpy()[:, list(MODEL_COEFFS)]
    posterior_all = []

    for d in range(DIGITS):
        # Mixture parameters of the candidate digit
        result_m = gauss_results[d]
        cov, pi, u = result_m.cov, result_m.pi, result_m.u

        if len(frames) == 0:
            # No frames means an empty sum, which is what an empty dataframe always scored
            posterior_digit = 0.0
        else:
            # log(pi_m) + log N(x_n | u_m, cov_m) for every component m and frame n;
            # atleast_1d because logpdf collapses a single frame to a scalar
            component_logs = [
                np.log(pi[m]) + np.atleast_1d(multivariate_normal.logpdf(x=frames, mean=u[m], cov=cov[m]))
                for m in range(len(u))
            ]
            # logsumexp over components gives the per-frame log mixture density;
            # summing over frames is the log of the product over frames
            posterior_digit = float(np.sum(logsumexp(component_logs, axis=0)))

        # TODO - normalize by the number of samples (is this necessary?)
        if (debug):
            print(f"digit: {d}\tposterior_digit: {posterior_digit}")
        posterior_all.append(posterior_digit)

    classification = posterior_all.index(max(posterior_all))
    return (classification, posterior_all)

In [12]:
# Filename prefixes: test input per digit, and the scratch files used while parsing it
test_read_path = DATA_PATH + "test_digits/test_0"
test_write_path = DATA_PATH + "single_person/test_0"

# Only every `classify_every`-th dataframe is classified
classify_every = 5
classify_results = []   # per true digit: how often each digit was predicted
summary_lists = []      # per true digit: [accuracy, correct, classified, time]
for digit in range(DIGITS):
    df_all = get_all_dataframes(digit=digit, write_path=test_write_path, read_path=test_read_path, stopwatch=False)
    classify_digit = [0] * DIGITS
    total_classified = 0
    correct = 0

    start_time = dt.now()
    for index, df in enumerate(df_all):
        if index % classify_every != 0:
            continue
        (classification, posterior_all) = classify_dataframe(df=df, gauss_results=gauss_results, debug=False)
        total_classified += 1
        classify_digit[classification] += 1
        if classification == digit:
            correct += 1

    classify_results.append(classify_digit)

    end_time = dt.now()
    total_time = f"{round((end_time - start_time).total_seconds(), 3)} sec"
    accuracy = f"{round(correct / total_classified * 100, 3)} %"
    summary = [accuracy, correct, total_classified, total_time]
    summary_lists.append(summary)

pd.DataFrame(summary_lists, columns=["Accuracy (%)", "Correct", "Classified", "Time (s)"])

Unnamed: 0,Accuracy (%),Correct,Classified,Time (s)
0,88.636 %,39,44,17.568 sec
1,93.182 %,41,44,15.996 sec
2,61.364 %,27,44,23.419 sec
3,68.182 %,30,44,29.04 sec
4,79.545 %,35,44,24.945 sec
5,81.818 %,36,44,20.442 sec
6,95.455 %,42,44,26.114 sec
7,72.727 %,32,44,17.675 sec
8,75.0 %,33,44,22.497 sec
9,86.667 %,39,45,19.589 sec


In [None]:
# TODO - print averages of dataframe
# TODO - write results to a data file (results folder?)
# Run MANY iterations, determine results

In [18]:
# Why is this so short now...
# NOTE(review): this cell's execution count is out of sequence with the cells above, so
# classify_results presumably came from a different (possibly interrupted) run of the
# classification loop - rerun that cell first; TODO confirm.
# Each row of classify_results holds the prediction counts for one true digit.
print("\tCONFUSION MATRIX")
pd.DataFrame(classify_results)

	CONFUSION MATRIX


Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,37,0,0,2,0,0,3,1,0,1
1,1,37,0,1,2,1,0,1,1,0
