In [7]:
import import_ipynb #if this doesn't work, run 'pip install import-ipynb'

from Algorithms_Functions import *
from Join_Functions import *
from import_script_master import *

from sklearn.metrics import r2_score

import matplotlib.pyplot as plt

In [8]:
def Analyze(initials, test_num_start, test_num_stop, interval, algorithm):
    '''Applies an algorithm to the joined Wi-Fi scans of each test and exports the results as a CSV file.
    
    Parameters
    ----------
    initials : list of strings
        list of 2 initials indicating devices to compare (passed through to the join function)
    
    test_num_start : int
        starting test to compare
            
    test_num_stop : int
        ending test to compare (inclusive)
    
    interval : int
        comparison interval in seconds (passed through to the join function)
    
    algorithm : callable
        algorithm to use; called with one joined table as its only argument
    
    
    Returns
    ----------
    vals : dict
        maps each test number to the list of algorithm results for that test,
        one result per joined table
    
    Notes
    ----------
    A CSV file with columns 'test_num' and 'euc_dist' is written to the current
    working directory. The 'euc_dist' column holds the algorithm result whichever
    algorithm was used.
    '''
    
    test_numbers = range(test_num_start, test_num_stop + 1)
    
    #different join function is used depending on algorithm
    if algorithm == calculate_mu_universal or algorithm == euc_distance:
        join_function = join_master
    else:
        join_function = join_master_unmatched
    
    #presumably each join returns a list of joined tables for that test - it is iterated over below
    joined = {i: join_function(initials, i, interval) for i in test_numbers}
    
    vals = {}
    test_nums = []
    results = []
    
    for i in test_numbers:
        #run the algorithm exactly once per joined table so the returned values
        #and the exported values are guaranteed to be the same numbers
        results_for_test = [algorithm(table) for table in joined[i]]
        vals[i] = results_for_test
        test_nums.extend([i] * len(results_for_test))
        results.extend(results_for_test)
    
    #put test numbers and corresponding euclidian distances (or other algorithm result) into a table
    df = pd.DataFrame(data={'test_num': test_nums, 'euc_dist': results})
    
    #export data to a CSV file
    #NOTE(review): str(algorithm) of a plain function includes its memory address
    #('<function ... at 0x...>'), so the file name changes between kernel sessions - confirm this is intended
    df.to_csv(str(algorithm) +', ' + str(test_num_start) + "-" + str(test_num_stop) + '.csv' )
    
    return vals

In [9]:
#ADD COMMENTS
def Analyze_and_Plot_threshold(initials, test_num_start, test_num_stop, interval, algorithm, true_dist, path, export, threshold, subject_of_test, joined, pri_obs):
    '''Plots algorithm result vs true distance for one threshold, fits a least-squares trendline and returns statistics describing the fit.
    
    The plot is always shown; it is also saved as a png file in the current working directory when export is True.
    
    Parameters
    ----------
    initials : list of strings
        list of 2 initials indicating devices to compare (only used in the plot title and file name)
    
    test_num_start : int
        starting test to compare
            
    test_num_stop : int
        ending test to compare (inclusive)
    
    interval : int
        comparison interval in seconds (not used here, the tables are already joined)
    
    algorithm : callable
        algorithm to use; called as algorithm(table, pri_obs, threshold) for calculate_mu_thresh and
        calculate_mu_modified_thresh, and as algorithm(table, threshold) otherwise
        
    true_dist : list of ints
        list of true distances, one per test, in order that scans were recorded
        
    path : str
        not used currently
        
    export : bool
        save plots as png files?
        
    threshold : int
        threshold to test in desired algorithm
        
    subject_of_test : str
        subject of test for file naming purposes
        
    joined : dict
        joined tables keyed by test number; each value is the collection of joined tables for that test
    
    pri_obs : str
        initials of primary observer, only relevant if algorithm is calculate_mu_thresh or calculate_mu_modified_thresh
    
    
    Returns
    ----------
    None if the number of true_dist entries does not match the number of tests (a message is printed),
    otherwise a tuple of:
    
    vals : dict
        maps each test number to the list of algorithm results for that test
    
    average_percentage_error : float
        mean of |result - fitted| / |fitted| over all points (points whose fitted value is 0 count as 0)
    
    threshold : float
        threshold, as passed in
    
    a : float
        y-intercept of the line of best fit
    
    b : float
        slope of the line of best fit
    
    r2 : float
        r^2 value for the line of best fit
    
    ss_res_over_n : float
        sum of squared residuals divided by the number of points (0 if the slope is 0)
    
    ss_res_over_nm : float
        ss_res_over_n divided by the slope (0 if the slope is 0)
    
    res_over_tot_over_m : float
        (1 - r2) divided by the slope (0 if the slope is 0)
    
    Raises
    ----------
    ZeroDivisionError
        if there are no data points, or if all true distances are identical (the slope is undefined)
    '''
    
    #one true distance is needed per test; check before running any algorithm
    if (test_num_stop+1)-test_num_start != len(true_dist):
        print('true_dist entries do not match number of tests')
        return None
    
    needs_pri_obs = algorithm == calculate_mu_thresh or algorithm == calculate_mu_modified_thresh
    
    vals = {}
    x_1 = []  #true distance of every data point
    y_1 = []  #algorithm result of every data point
    for i, distance in zip(range(test_num_start, test_num_stop+1), true_dist):
        if needs_pri_obs:
            results_for_test = [algorithm(table, pri_obs, threshold) for table in joined[i]]
        else:
            results_for_test = [algorithm(table, threshold) for table in joined[i]]
        vals[i] = results_for_test
        y_1 += results_for_test
        #repeat the true distance once per result so x_1 and y_1 stay aligned
        x_1 += len(results_for_test) * [distance]
    
    #trendline (ordinary least squares, closed form)
    n = len(x_1)
    xbar = sum(x_1)/n
    ybar = sum(y_1)/n
    numerator = sum(x_i*y_i for x_i, y_i in zip(x_1, y_1)) - n * xbar * ybar
    denominator = sum(x_i**2 for x_i in x_1) - n * xbar**2
    b = numerator/denominator
    a = ybar - b * xbar
    print("y-intercept")
    print(a)
    print("slope")
    print(b)
    
    #plot stuff
    plt.scatter(x_1, y_1)
    yfit = [a + b * x_i for x_i in x_1]
    plt.plot(x_1, yfit)
    plt.xlabel('True Distance')
    plt.ylabel('RSSI Euclidean Distance')
    
    #averaged percentage error and sum of squared errors, pairing each result with
    #its own fitted value (the true distances must not be used as list indices)
    total_error = 0
    ss_res = 0
    for observed, fitted in zip(y_1, yfit):
        residual = observed - fitted
        ss_res += residual**2
        if fitted != 0:
            #a fitted value of exactly 0 would divide by zero, so that point contributes 0
            total_error += abs(residual/fitted)
    
    #n cannot be 0 here, the trendline above would already have raised
    average_percentage_error = total_error/n
    r2 = r2_score(y_1, yfit)
    
    if b == 0:
        #the slope-normalised statistics are undefined for a flat line
        ss_res_over_n = 0
        ss_res_over_nm = 0
        res_over_tot_over_m = 0
    else:
        ss_res_over_n = ss_res/n
        ss_res_over_nm = ss_res_over_n/b
        res_over_tot_over_m = (1-r2)/b
    
    plt.title(str(initials) + ', tests: ' + str(test_num_start) + '-' + str(test_num_stop) + '\n y = ' + str(b) + 'x' + ' + ' + str(a) + '\n threshold = ' + str(threshold) + '\n r^2 = ' + str(r2))

    title = str(initials) + ', tests: ' + str(test_num_start) + '-' + str(test_num_stop) + ' ' + str(subject_of_test) + ' threshold = ' + str(threshold)     
    if export==True:
        plt.savefig(title+'.png', bbox_inches = "tight")
    plt.show()
    
    
    return vals, average_percentage_error, threshold, a, b, r2, ss_res_over_n, ss_res_over_nm, res_over_tot_over_m
    

In [6]:
#ADD COMMENTS
def Analyze_and_Plot(initials, test_num_start, test_num_stop, interval, algorithm, true_dist, path, export, alg_name, pri_obs, y_lab):
    
    '''Plots algorithm result vs true distance, fits a least-squares trendline and exports statistics describing the fit into a CSV file.
    
    The plot is always shown; it is also saved as a png file when export is True. The statistics
    are printed and written as a one-row CSV file. Both files go to the current working directory.
    
    Parameters
    ----------
    initials : list of strings
        list of 2 initials indicating devices to compare
    
    test_num_start : int
        starting test to compare
            
    test_num_stop : int
        ending test to compare (inclusive)
    
    interval : int
        comparison interval in seconds (passed through to the join function)
    
    algorithm : callable
        algorithm to use; called as algorithm(table, pri_obs) for calculate_mu and
        calculate_mu_modified, and as algorithm(table) otherwise
        
    true_dist : list of ints
        list of true distances, one per test, in order that scans were recorded
        
    path : str
        not used currently
        
    export : bool
        save plots as png files?
        
    alg_name : str
        name of the algorithm for file naming purposes
        
    
    pri_obs : str
        initials of primary observer, only relevant if algorithm is calculate_mu or calculate_mu_modified
        
    y_lab : str
        y-axis label
    
    
    Returns
    ----------
    None
        if the number of true_dist entries does not match the number of tests, a message is
        printed and nothing is joined, plotted or exported
    
    Raises
    ----------
    ZeroDivisionError
        if there are no data points, or if all true distances are identical (the slope is undefined)
    '''
    
    #one true distance is needed per test; check before joining or running any algorithm
    if (test_num_stop+1)-test_num_start != len(true_dist):
        print('true_dist entries do not match number of tests')
        return None
    
    test_numbers = range(test_num_start, test_num_stop+1)
    
    #different join function is used depending on algorithm
    if algorithm == calculate_mu or algorithm == calculate_mu_modified or algorithm == calculate_mu_universal or algorithm == euc_distance or algorithm==euc_distance_norm_1 or algorithm==euc_distance_norm_2 or algorithm==euc_distance_norm_3:
        join_function = join_master
    else:
        join_function = join_master_unmatched
    joined = {i: join_function(initials, i, interval) for i in test_numbers}

    needs_pri_obs = algorithm == calculate_mu or algorithm == calculate_mu_modified
    
    x_1 = []  #true distance of every data point
    y_1 = []  #algorithm result of every data point
    for i, distance in zip(test_numbers, true_dist):
        if needs_pri_obs:
            results_for_test = [algorithm(table, pri_obs) for table in joined[i]]
        else:
            results_for_test = [algorithm(table) for table in joined[i]]
        y_1 += results_for_test
        #repeat the true distance once per result so x_1 and y_1 stay aligned
        x_1 += len(results_for_test) * [distance]
    
    #trendline (ordinary least squares, closed form)
    n = len(x_1)
    xbar = sum(x_1)/n
    ybar = sum(y_1)/n
    numerator = sum(x_i*y_i for x_i, y_i in zip(x_1, y_1)) - n * xbar * ybar
    denominator = sum(x_i**2 for x_i in x_1) - n * xbar**2
    b = numerator/denominator
    a = ybar - b * xbar
    print("y-intercept")
    print(a)
    print("slope")
    print(b)
    
    #plot stuff
    plt.scatter(x_1, y_1)
    yfit = [a + b * x_i for x_i in x_1]
    plt.plot(x_1, yfit)
    plt.xlabel('True Distance')
    plt.ylabel(y_lab)
    
    #averaged percentage error and sum of squared errors, pairing each result with
    #its own fitted value (the true distances must not be used as list indices)
    total_error = 0
    ss_res = 0
    for observed, fitted in zip(y_1, yfit):
        residual = observed - fitted
        ss_res += residual**2
        if fitted != 0:
            #a fitted value of exactly 0 would divide by zero, so that point contributes 0
            total_error += abs(residual/fitted)
    
    #n cannot be 0 here, the trendline above would already have raised
    average_percentage_error = total_error/n
    r2 = r2_score(y_1, yfit)
    
    if b == 0:
        #the slope-normalised statistics are undefined for a flat line
        ss_res_over_n = 0
        ss_res_over_nm = 0
        res_over_tot_over_m = 0
    else:
        ss_res_over_n = ss_res/n
        ss_res_over_nm = ss_res_over_n/b
        res_over_tot_over_m = (1-r2)/b
    
    plt.title(str(initials) + ', tests: ' + str(test_num_start) + '-' + str(test_num_stop) + '\n y = ' + str(b) + 'x' + ' + ' + str(a) + '\n r^2 = ' + str(r2))

    title = str(initials) + ', tests: ' + str(test_num_start) + '-' + str(test_num_stop) + ' ' + alg_name  
    if export==True:
        plt.savefig(title+'.png', bbox_inches = "tight")
    plt.show()
    
    #one-row summary table of the fit statistics
    d={'algorithm':[algorithm], 'slope':[b], 'y_int':[a], 'Avg_%_Error': [average_percentage_error], 'r2':[r2], 'ss_res_over_n':[ss_res_over_n], 'ss_res_over_nm':[ss_res_over_nm], 'res_over_tot_over_m':[res_over_tot_over_m]}
    df = pd.DataFrame(data=d)
    print(df)
    df.to_csv(str(initials) + ', tests: ' + str(test_num_start) + '-' + str(test_num_stop) + ' ' + alg_name+ '.csv')
    
    
    return