In [1]:
from peakaboo.peak_classify import data_grouping
from peakaboo.peak_classify import cluster_classifier
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans

In [2]:
def data_grouping(index_df, height_df, fwhm_df, threshold):
    """Flatten per-peak tables into one long table and drop weak peaks.

    Every cell ``(i, j)`` of ``index_df`` yields one output row holding the
    value at the same ``.loc[i, j]`` label in ``index_df``, ``height_df``
    and ``fwhm_df``, plus the row number ``i``.

    Args:
        index_df: table whose ``shape`` drives the iteration; its values
            fill the ``Position`` column.
        height_df: table read at the same labels; fills ``Height``.
        fwhm_df: table read at the same labels; fills ``Width``.
        threshold: rows whose absolute ``Height`` is strictly below this
            value are removed.

    Returns:
        A DataFrame with columns ``Position``, ``Height``, ``Width`` and
        ``Time``, missing values replaced by 0, weak rows removed and the
        index renumbered from 0.
    """
    # shape[1] is deliberately read inside the comprehension so it is only
    # touched when there is at least one row to walk.
    records = [
        [index_df.loc[row, col], height_df.loc[row, col],
         fwhm_df.loc[row, col], row]
        for row in range(index_df.shape[0])
        for col in range(index_df.shape[1])
    ]

    peaks = pd.DataFrame(
        records, columns=['Position', 'Height', 'Width', 'Time']
    ).fillna(value=0)

    # Missing heights were filled with 0 above, so they are filtered out here
    # whenever threshold is positive.
    weak_rows = peaks[abs(peaks.Height) < threshold].index
    return peaks.drop(weak_rows).reset_index(drop=True)

In [3]:
def test_data_grouping():
    """Check data_grouping against malformed, empty and over-filtered input.

    Raises:
        AssertionError: if an expected exception is not raised, or if a
            result that should be empty contains rows.
    """
    # A NumPy array has no ``.loc`` accessor, so the call must fail.
    index_df = np.zeros((2, 2))
    height_df = pd.DataFrame([1, 2, 3])
    fwhm_df = pd.DataFrame([4, 5, 6])
    threshold = 1
    try:
        data_grouping(index_df, height_df, fwhm_df, threshold)
    except AttributeError:
        pass
    else:
        # Fail loudly: a print here would let the test pass unnoticed.
        raise AssertionError(
            'Incorrect data type passed without an AttributeError. '
            'Check peak_finding_master output')

    # An empty index frame gives nothing to iterate over.
    index_df = pd.DataFrame()
    height_df = pd.DataFrame([1, 2, 3])
    fwhm_df = pd.DataFrame([4, 5, 6])
    threshold = 1
    t = data_grouping(index_df, height_df, fwhm_df, threshold)
    assert len(t) == 0, "Index data frame is empty"

    # Labels present in index_df are missing from the empty height frame.
    index_df = pd.DataFrame([1, 2, 3])
    height_df = pd.DataFrame()
    fwhm_df = pd.DataFrame([4, 5, 6])
    threshold = 1
    try:
        data_grouping(index_df, height_df, fwhm_df, threshold)
    except KeyError:
        pass
    else:
        raise AssertionError(
            'Height data frame empty but no KeyError was raised. '
            'Check peak_finding_master output')

    # Every height (1..9) is below the threshold, so all rows are dropped.
    index_df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
    height_df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
    fwhm_df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
    threshold = 10
    t = data_grouping(index_df, height_df, fwhm_df, threshold)
    assert len(t) == 0, "Threshold is too high"

In [4]:
# Run the data_grouping checks; a failed assert raises AssertionError here.
test_data_grouping()

In [None]:
def cluster_classifier(index_df, corrected_output, min_points=20,
                       random_state=None):
    """Group rows of ``corrected_output`` into peaks with k-means.

    The number of clusters equals the number of columns of ``index_df``.
    Clustering uses every column of ``corrected_output`` except the last
    two (presumably Position and Height when the frame comes from
    data_grouping; verify against the caller).

    Args:
        index_df: table whose column count sets the number of clusters.
        corrected_output: table of peak rows to cluster.
        min_points: minimum number of rows a cluster needs to be kept.
            Defaults to 20, the value previously hard-coded.
        random_state: forwarded to ``KMeans`` so results can be made
            reproducible. The default ``None`` keeps the original
            unseeded behaviour.

    Returns:
        dict mapping ``'peak_<k>'`` to a list of full row arrays, one entry
        per cluster ``k`` that holds at least ``min_points`` rows.

    Raises:
        ValueError: raised by ``KMeans.fit`` when ``corrected_output`` has
            no usable samples or features.
    """
    found_peak = index_df.shape[1]

    # Materialise the array once instead of rebuilding it per row. Fitting
    # and predicting on the same plain array also avoids scikit-learn's
    # feature-name mismatch warning.
    rows = corrected_output.values
    features = rows[:, :-2]

    cluster = KMeans(n_clusters=found_peak,
                     random_state=random_state).fit(features)
    # One batched prediction replaces one predict() call per row.
    labels = cluster.predict(features)

    cluster_dict = {'peak_%s' % k: [] for k in range(found_peak)}
    for row, label in zip(rows, labels):
        cluster_dict['peak_%s' % label].append(row)

    # Sparse clusters are treated as noise and dropped.
    return {name: members for name, members in cluster_dict.items()
            if len(members) >= min_points}

In [5]:
def test_cluster_classifier():
    """Check cluster_classifier on empty, sparse and well-populated input.

    Raises:
        AssertionError: if the expected ValueError is missing, sparse
            clusters are kept, or the dense cluster is not returned intact.
    """
    # An empty frame cannot be clustered; KMeans is expected to reject it.
    index_df = pd.DataFrame([[1, 1, 5], [1, 2, 10], [1, 2, 6]])
    corrected_output = pd.DataFrame()
    try:
        cluster_classifier(index_df, corrected_output)
    except ValueError:
        pass
    else:
        # Fail loudly: a print here would let the test pass unnoticed.
        raise AssertionError(
            'ValueError not raised for empty input dataframe.')

    # Nine rows over three clusters: no cluster can reach 20 members.
    index_df = pd.DataFrame([[1, 1, 5], [1, 2, 10], [1, 2, 6]])
    corrected_output = pd.DataFrame([[1, 1, 1, 1], [1, 2, 1, 1], [5, 5, 5, 1],
                                     [1, 1, 2, 2], [2, 2, 1, 2], [10, 7, 6, 2],
                                     [1, 2, 2, 3], [2, 1, 3, 3], [6, 6, 6, 3]])
    t = cluster_classifier(index_df, corrected_output)
    assert len(t) == 0, \
        "Did not truncate sparse peaks"

    # Twenty nearby rows plus one far outlier, split into two clusters.
    index_df = pd.DataFrame([[1, 1], [1, 2], [1, 2]])
    corrected_output = pd.DataFrame([[1, 1, 1, 1], [1, 2, 1, 1], [5, 5, 5, 1],
                                     [1, 1, 2, 2], [2, 2, 1, 2], [10, 7, 6, 2],
                                     [1, 2, 2, 3], [2, 1, 3, 3], [6, 6, 6, 3],
                                     [1, 2, 2, 3], [2, 1, 3, 3], [6, 6, 6, 3],
                                     [1, 2, 2, 3], [2, 1, 3, 3], [6, 6, 6, 3],
                                     [1, 2, 2, 3], [2, 1, 3, 3], [6, 6, 6, 3],
                                     [1, 2, 2, 3], [2, 1, 3, 3], [100, 100, 6, 3]])
    t = cluster_classifier(index_df, corrected_output)
    assert len(t) == 1, \
        "Did not properly classify peaks"
    # KMeans label numbering is arbitrary, so the surviving cluster may be
    # 'peak_0' or 'peak_1'; inspect whichever one was kept.
    (surviving_peak,) = t.values()
    assert len(surviving_peak) == 20, \
        "Dictionary did not populate properly"

In [6]:
# Run the cluster_classifier checks; a failed assert raises AssertionError here.
test_cluster_classifier()