In [1]:
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
import functools
from IPython.display import clear_output

In [2]:
def insert_sample(d,f,fs,samp):
    """Append ``samp`` to the ``'source'`` list stored at ``d[f][fs]``.

    ``d`` is a nested dictionary keyed first by ``f`` and then by ``fs``.
    Any level that does not exist yet is created, so the first sample for a
    given ``f``/``fs`` pair starts a new one-element list.  ``d`` is modified
    in place and nothing is returned.
    """
    by_rate = d.setdefault(f, {})
    if fs not in by_rate:
        # first sample seen for this f/fs combination
        by_rate[fs] = {'source': [samp]}
        return
    by_rate[fs]['source'].append(samp)


def read_file(path='dataset.txt'):
    """Parse a dataset text file into the nested sample dictionary.

    The file is read line by line after removing every space character:

    * a line containing ``f=`` (case-insensitive) sets the current target
      frequency label; the text after ``f=`` is kept as a string,
    * a line containing ``fs=`` (case-insensitive) sets the current sample
      rate, converted with ``np.uint16``,
    * a line containing ``[`` is parsed as comma separated integers between
      ``[`` and ``]`` and stored as an ``np.int32`` array under the current
      label and sample rate via ``insert_sample``.

    Parameters
    ----------
    path : str, optional
        File to read.  Defaults to ``'dataset.txt'``, the name that used to
        be hard-coded, so existing ``read_file()`` calls behave as before.

    Returns
    -------
    dict
        ``d[f][fs]['source']`` is the list of sample arrays read for that
        label / sample-rate pair.
    """
    d = {}
    # NOTE(review): a data line that appears before any "f=" line is stored
    # under the integer key 0, while later code calls str methods on the key.
    target_f = 0
    fs = 0
    with open(path) as f:
        for line in f:
            # Drop the terminator up front instead of slicing up to
            # find('\n'): on a final line without a newline find() returns
            # -1 and the slice silently loses the last character.
            line = line.replace(" ", "").rstrip("\n")
            lowered = line.lower()

            f_pos = lowered.find("f=")
            if f_pos != -1:
                target_f = line[f_pos+2:]

            fs_pos = lowered.find("fs=")
            if fs_pos != -1:
                fs = np.uint16(line[fs_pos+3:])

            data_start = line.find('[')
            if data_start != -1:
                if fs == 0:
                    print("warning, FS not declared")
                data_end = line.find(']')
                if data_end == -1:
                    # no closing bracket: take the rest of the line
                    data_end = len(line)
                data_chunk = np.array(line[data_start+1:data_end].split(',')).astype(np.int32)
                insert_sample(d,target_f,fs,data_chunk)
    return d

In [3]:
# total samples not counting auto
def get_total_samples_by_fs(d,fs):
    """Count the source samples recorded at sample rate ``fs``.

    Every frequency entry of ``d`` that has data for ``fs`` contributes the
    length of its ``'source'`` list, except the entry keyed ``'auto'``,
    which is left out of the total.
    """
    return sum(
        len(by_rate[fs]['source'])
        for label, by_rate in d.items()
        if fs in by_rate and label != 'auto'
    )

def get_error_total_of_test(d,fs,test,strat,interp):
    """Sum the ``'error'`` flags of one test configuration at rate ``fs``.

    For every frequency entry of ``d`` that has data for ``fs``, the list at
    ``d[f][fs][test][strat][interp]['error']`` is summed and the per-entry
    sums are added together.
    """
    return sum(
        sum(by_rate[fs][test][strat][interp]['error'])
        for by_rate in d.values()
        if fs in by_rate
    )

def list_of_fs(d):
    """Return every distinct sample rate found in ``d``, in ascending order.

    The rates are the second-level keys of ``d``; each one is listed once
    and the result is ordered by its ``int`` value.
    """
    found = []
    for by_rate in d.values():
        for fs in by_rate:
            if fs not in found:
                found.append(fs)
    return sorted(found, key=int)

def list_of_tests(d):
    """Return the test names stored under the first ``f``/``fs`` entry.

    Only the first frequency and its first sample rate are inspected; every
    key of that entry other than ``'source'`` is reported as a test name.
    """
    first_by_rate = list(d.values())[0]
    first_entry = list(first_by_rate.values())[0]
    return [name for name in first_entry if name != 'source']

def list_of_strats(d):
    """Return the strategy names stored under the first test result.

    Looks at the first frequency, its first sample rate and the first test
    reported by ``list_of_tests``; every key of that test's result other
    than ``'result'`` is reported as a strategy name.
    """
    first_by_rate = list(d.values())[0]
    first_entry = list(first_by_rate.values())[0]
    first_test = list_of_tests(d)[0]
    return [name for name in first_entry[first_test] if name != 'result']

def list_of_interps(d):
    """Return the interpolation names stored under the first strategy.

    Looks at the first frequency, first sample rate, first test and first
    strategy; every key of that strategy's entry other than ``'period'`` is
    reported as an interpolation name.
    """
    first_by_rate = list(d.values())[0]
    first_entry = list(first_by_rate.values())[0]
    first_test = list_of_tests(d)[0]
    first_strat = list_of_strats(d)[0]
    strat_entry = first_entry[first_test][first_strat]
    return [name for name in strat_entry if name != 'period']

def show_basic_result(d):
    """Build a plain-text error summary table for every sample rate in ``d``.

    For each sample rate a header with the sample count (``'auto'`` entries
    excluded) is followed by one row per test / strategy / interpolation
    combination showing the number of flagged errors and the error
    percentage.

    Returns
    -------
    str
        The complete multi-line report.
    """
    fs_list = list_of_fs(d)
    tests = list_of_tests(d)
    strats = list_of_strats(d)
    interps = list_of_interps(d)
    parts = []
    for fs in fs_list:
        samples = get_total_samples_by_fs(d,fs)
        parts.append('--------------------------------\n')
        parts.append('fs: '+str(fs)+' total samples: '+str(samples)+'\n')
        parts.append(('|type').ljust(9)+'|strat'.ljust(11)+'|interp'.ljust(10)+'|err'.ljust(4)+'|%'.ljust(5)+'|\n')
        for test in tests:
            for strat in strats:
                for interp in interps:
                    errors = get_error_total_of_test(d,fs,test,strat,interp)
                    # A sample rate that only holds 'auto' recordings has a
                    # sample count of zero; report 0% instead of dividing
                    # by zero.
                    percent_error = (errors/samples)*100 if samples else 0.0
                    parts.append(('|'+test).ljust(9)+('|'+strat).ljust(11)+('|'+interp).ljust(10)+('|'+str(errors)).ljust(4))
                    parts.append(('|{:2.1f}'.format(percent_error)).ljust(5)+'|\n')
    return ''.join(parts)

In [4]:
def show_source_plot(fs,t,signal,target_f):
    """Plot the raw signal and its spectrum in the top row of a 2x2 grid.

    The left panel shows ``signal`` against ``t``.  The right panel shows
    the magnitude of the real FFT, marks the strongest non-DC bin with a
    green line and limits the x-axis to five times the target frequency.
    When ``target_f`` is not a numeric string, 150 is used for that limit.
    """
    f = np.int32(target_f) if target_f.isnumeric() else 150

    # left panel: time-domain signal
    plt.subplot(2,2,1)
    plt.title("Source signal. sample length = %i " %(len(signal)))
    plt.plot(t, signal)
    plt.axhline(0,color='black')

    # right panel: magnitude spectrum
    plt.subplot(2,2,2)
    spectrum = np.fft.rfft(signal)
    bin_freqs = np.fft.rfftfreq(signal.size, d=1/fs)
    # skip bin 0 (DC) when searching for the dominant component
    peak_bin = np.argmax(abs(spectrum[1:]))+1
    dominant = bin_freqs[peak_bin]
    resolution = ((fs/2)/(len(signal)/2))
    plt.title("FFT acc +/- %i Hz | dominant f %i Hz" %(resolution,dominant))
    plt.axvline(dominant,color="green")
    plt.plot(bin_freqs,np.abs(spectrum))
    plt.xlim(0, f*5)
    plt.tight_layout()
    plt.show()

In [5]:
def extra_plots(fs,t,result,target_f,t_idx):
    """Add the 'targeted' strategy overlays to the two result panels.

    A blue horizontal line at ``result[0]/2`` is drawn on subplots
    ``(2,2,1)`` and ``(2,2,2)``.  When ``target_f`` is a numeric string and
    ``t_idx`` lies inside the ``get_minx`` / ``get_maxx`` window of that
    frequency, the window edges are marked with vertical lines on the
    second subplot.

    Non-numeric labels such as ``'auto'`` have no search window; for them
    only the horizontal lines are drawn instead of failing on
    ``int(target_f)``.
    """
    half_level = result[0]/2
    plt.subplot(2,2,1)
    plt.axhline(half_level,color='blue')

    plt.subplot(2,2,2)
    plt.axhline(half_level,color='blue')

    if not target_f.isnumeric():
        return

    minx1 = get_minx(fs,int(target_f))
    maxx1 = get_maxx(fs,int(target_f))
    if minx1 <= t_idx <= maxx1:
        plt.axvline(t[minx1],color='green',label="min boundry")
        plt.axvline(t[maxx1],color='orange',label="max boundry")
    

def show_result_plot(fs,t,result,target_f,t_idx,strat,interp):
    """Plot a test result and a zoomed view around the detected index.

    The left panel shows the whole ``result`` curve with the point at
    ``t_idx`` highlighted.  The right panel shows eleven points centred on
    ``t_idx``, marks the centre and its two neighbours and, unless
    ``interp`` is ``'none'``, the interpolated position.  For the
    ``'targeted'`` strategy ``extra_plots`` is called first.
    """
    if strat == 'targeted':
        extra_plots(fs,t,result,target_f,t_idx)

    # full curve with the selected point
    plt.subplot(2,2,1)
    plt.plot(t,result)
    plt.plot(t[t_idx],result[t_idx],'ro')
    plt.title("f[%i]=%.2f Hz | T=%.3f ms" %(t_idx,fs/t_idx,(t_idx*1000)/fs))

    t_adj = get_interpolation(result,fs,t_idx,interp)

    # zoomed view: five points either side of the selected index
    plt.subplot(2,2,2)
    half_width = 5
    zoom = slice(t_idx-half_width, t_idx+half_width+1)
    plt.plot(t[zoom],result[zoom])

    after = t_idx+1
    before = t_idx-1
    plt.plot(t[after],result[after],"go",label = 'center +1')
    plt.plot(t[t_idx],result[t_idx],"ro",label='center')
    if interp != 'none':
        plt.plot(t_adj/fs,result[t_idx],"bo",label = 'adjusted center')
    plt.plot(t[before],result[before],"go",label = 'center -1')
    plt.legend()
    plt.title("f[%.2f]=%.2f Hz | T=%.3f ms" %(t_adj,fs/t_adj,(t_adj*1000)/fs))
    plt.tight_layout()
    plt.show()

In [6]:
def get_minx(fs,target):
    """Return the lower search index for ``target``: ``int(fs/(target+10)) - 1``."""
    return int(fs/(target+10))-1
def get_maxx(fs,target):
    """Return the upper search index for ``target``: ``int(fs/(target-10)) + 1``."""
    return int(fs/(target-10))+1

def get_min_max(fs,target):
    """Return a ``(min_idx, max_idx)`` index window around ``fs/target``.

    The window starts as ``int(fs/(target+10))`` .. ``int(fs/(target-10))``
    and is then resized depending on its width:

    * wider than 12: replaced by the centre index +/- 6,
    * narrower than 3: widened by 3 on each side,
    * exactly 3: widened by 2 on each side,
    * otherwise left unchanged.
    """
    centre = int(fs/target)
    low = int(fs/(target+10))
    high = int(fs/(target-10))
    width = high-low

    if width > 12:
        return centre-6, centre+6
    if width < 3:
        return low-3, high+3
    if width < 4:
        return low-2, high+2
    return low, high

def get_targets():
    """Return a new list with the six target frequencies, highest first."""
    return [329,246,197,146,110,82]

def get_target_order():
    """Return the order in which the targets are visited.

    The values are indices into the list returned by ``get_targets``; they
    select the frequencies in the order 329, 82, 246, 110, 146, 197.
    """
    return [0,5,1,4,3,2]

def get_range_list(fs):
    """Return the flattened index windows of all targets for rate ``fs``.

    The targets are visited in ``get_target_order`` order and each one
    contributes its ``get_min_max`` pair, giving
    ``[min0, max0, min1, max1, ...]``.
    """
    targets = get_targets()
    bounds = []
    for string_id in get_target_order():
        bounds.extend(get_min_max(fs,targets[string_id]))
    return bounds

def find_peak(r,minx,maxx,threshold):
    """Return the index of the last local maximum of ``r`` in a window.

    Indices ``minx`` .. ``maxx-1`` are scanned.  An index counts as a peak
    when its value is greater than both neighbours and greater than
    ``threshold``, and the value before it is not zero.  The first two
    scanned positions can never qualify because their "previous" values
    start out as zero.  Returns ``-1`` when no peak is found.
    """
    peak_idx = -1
    two_back = one_back = 0
    for k in range(minx,maxx):
        current = r[k]
        if two_back != 0 and two_back < one_back > current and one_back > threshold:
            peak_idx = k-1
        # slide the three-sample window forward
        two_back, one_back = one_back, current
    return peak_idx

# def find_peak2(r,minx,maxx):
#     max_val = 0
#     t_idx = -1
#     for k in range(minx,maxx):
#         if r[k] > max_val:
#             max_val = r[k]
#             t_idx = k
#     return t_idx

# def targeted_points(r,fs): #break on first target
#     target_order = get_target_order()
#     targets = get_targets()
#     threshold = r[0]/2    
#     t_idx = -1
#     for order in target_order:
#         minx,maxx = get_min_max(fs,targets[order])
#         t_idx = find_peak(r,minx,maxx,threshold)
#         if t_idx != -1:
#             break
#     return t_idx

def targeted_points(r,fs):
    """Return the index of the largest value of ``r`` inside the target windows.

    Each target's ``get_minx`` .. ``get_maxx`` window (inclusive) is scanned
    in ``get_target_order`` order while a single running maximum is kept
    across all windows.  After scanning the window of target 0 the search
    stops early if the best value so far is above ``r[0]/2`` and its index
    lies inside that window.  Returns ``-1`` when no value above zero was
    seen.
    """
    targets = get_targets()
    visit_order = get_target_order()
    threshold = r[0]/2
    best_idx = -1
    best_val = 0
    for string_id in visit_order:
        low = get_minx(fs,targets[string_id])
        high = get_maxx(fs,targets[string_id])
        for k in range(low,high+1):
            if r[k] > best_val:
                best_val, best_idx = r[k], k
        if string_id != 0:
            continue
        # early exit: a strong peak inside the window of target 0
        if r[best_idx] > threshold and get_minx(fs,targets[0]) <= best_idx <= get_maxx(fs,targets[0]):
            break
    return best_idx
    

In [7]:
def max_peak(x,fs,test):
    """Return the index of the extreme value of ``x`` in the lag search range.

    Only indices strictly between ``fs/365`` and ``fs/65`` are considered.
    The part of ``test`` before the first underscore selects the rule:

    * ``"acf"``  - first index holding the largest value above 0,
    * ``"amdf"`` - first index holding the smallest value below the
      ``int16`` maximum.

    Returns ``-1`` when nothing qualifies or the test kind is unknown.
    """
    lower = fs/365
    upper = fs/65
    kind = test.split('_')[0]
    best_idx = -1

    if kind == "acf":
        best_val = 0
        for i, value in enumerate(x):
            if lower < i < upper and value > best_val:
                best_val, best_idx = value, i
    elif kind == "amdf":
        best_val = np.iinfo(np.int16).max
        for i, value in enumerate(x):
            if lower < i < upper and value < best_val:
                best_val, best_idx = value, i
    return best_idx

def get_poi(x,fs,test,strat):
    """Return the point-of-interest index of ``x`` for the given strategy.

    ``"targeted"`` delegates to ``targeted_points`` and ``"full_range"`` to
    ``max_peak``; any other strategy yields ``-1``.
    """
    if strat == "targeted":
        return targeted_points(x,fs)
    if strat == "full_range":
        return max_peak(x,fs,test)
    return -1

    
def get_quadratic(x,t_idx):
    """Return ``t_idx`` shifted by a three-point quadratic offset.

    The offset is computed from ``x[t_idx]`` and its two neighbours as
    ``(right - left) / (2 * (2*centre - left - right))``.
    """
    left = x[t_idx-1]
    centre = x[t_idx]
    right = x[t_idx+1]
    offset = (right - left)/(2*(2*centre - left - right))
    return t_idx+offset

def get_barycentric(x,t_idx):
    """Return ``t_idx`` shifted by a three-point weighted offset.

    The offset is ``(right - left) / (left + centre + right)`` using
    ``x[t_idx]`` and its two neighbours.
    """
    left, centre, right = x[t_idx-1], x[t_idx], x[t_idx+1]
    return t_idx+(right - left) / (left + centre + right)

def get_interpolation(x,fs,t_idx,interp):
    """Return the refined position of ``t_idx`` for interpolation ``interp``.

    ``'none'`` returns ``t_idx`` unchanged, ``'quadratic'`` and
    ``'barycentric'`` delegate to the matching helper, and any other name
    returns ``None``.  ``fs`` is accepted but not used.
    """
    if interp == 'none':
        return t_idx
    if interp == 'quadratic':
        return get_quadratic(x,t_idx)
    if interp == 'barycentric':
        return get_barycentric(x,t_idx)
    return None
    
def get_frequency(x,fs,poi,interp):
    """Convert the index ``poi`` into a frequency.

    Returns ``fs`` divided by the interpolated position of ``poi``, or ``0``
    when ``poi`` is zero or negative.
    """
    if poi > 0:
        return fs/get_interpolation(x,fs,poi,interp)
    return 0


In [8]:
def get_test(x,test): # build autocorrelation result arrays
    """Compute one lag-domain value per lag ``k`` of the signal ``x``.

    ``test`` is ``"<kind>"`` or ``"<kind>_<variant>"``:

    * kind ``'acf'`` sums ``x[j] * x[j+k]`` over all valid ``j``,
    * kind ``'amdf'`` sums ``abs(x[j] - x[j+k])`` over all valid ``j``,
    * no variant divides each sum by ``len(x)``,
    * variant ``'normal'`` divides each sum by ``len(x) - k``,
    * any other variant leaves the sums undivided.

    Returns a float array with ``len(x)`` entries; unknown kinds give zeros.
    """
    n = len(x)
    r = np.zeros(n)
    parts = test.split('_')
    kind = parts[0]
    variant = parts[1] if len(parts) > 1 else ''

    for lag in range(n):
        overlap = n-lag
        if kind == 'amdf':
            for j in range(overlap):
                r[lag] += abs(x[j]-x[j+lag])
        elif kind == 'acf':
            for j in range(overlap):
                r[lag] += (x[j]*x[j+lag])

        if variant == '':
            r[lag] = r[lag]/n
        elif variant == 'normal':
            r[lag] = r[lag]/overlap
    return r

def get_results(d,target_f,fs,test,strats,interp_type):
    """Run ``test`` on every source sample of ``d[target_f][fs]``.

    Returns a dictionary shaped as::

        {'result': [array per sample],
         strat: {'period': [index per sample],
                 interp: {'f': [frequency per sample],
                          'error': [flag per sample]}}}

    ``'error'`` is only filled when ``target_f`` is a numeric string; a
    sample is flagged when the estimate differs from the target by more
    than 2.
    """
    samples = d[target_f][fs]['source']

    res_d = {'result': []}
    for strat in strats:
        strat_entry = {'period': []}
        for interp in interp_type:
            strat_entry[interp] = {'f': [], 'error': []}
        res_d[strat] = strat_entry

    has_numeric_target = target_f.isnumeric()
    for signal in samples:
        result = get_test(signal,test)
        res_d['result'].append(result)
        for strat in strats:
            strat_entry = res_d[strat]
            period = get_poi(result,fs,test,strat)  # discrete period
            strat_entry["period"].append(period)
            for interp in interp_type:
                freq = get_frequency(result,fs,period,interp)
                strat_entry[interp]["f"].append(freq)
                if has_numeric_target:
                    expected = np.int32(target_f)
                    strat_entry[interp]["error"].append(abs(expected - freq) > 2)
    return res_d
   

In [9]:
#run all tests
def run_tests(d,tests,strats,interp_type):
    """Run every test on every ``f``/``fs`` entry of ``d``.

    The dictionary returned by ``get_results`` is stored in place at
    ``d[f][fs][test]``.
    """
    for f, by_rate in d.items():
        for fs, entry in by_rate.items():
            for test in tests:
                entry[test] = get_results(d,f,fs,test,strats,interp_type)

# Load the dataset and run the selected tests; the results are stored inside
# `d` itself by run_tests.
d = read_file()
# tests = ["acf","acf_normal","amdf","iir_elyptic"]
# Test names to run; get_test splits a name at '_' into kind and variant.
tests = ["acf"]
# interp_type =["quadratic","barycentric"]
# strat = ['targeted','full_range','zc_average']
# Strategies handled by get_poi.
strats = ['targeted',"full_range"]
# Interpolation modes handled by get_interpolation.
interp_type =["quadratic","none"]
run_tests(d,tests,strats,interp_type)

In [10]:
# Text summary that is printed into `basic_info_out` further down.
text_result = show_basic_result(d)
freq_list = d.keys()

# One dropdown per selectable dimension. The FS choices are taken from the
# frequency that is initially selected in `f_select`.
f_select = widgets.Dropdown(options=freq_list,description = 'Freq',layout={'width': 'max-content'})
fs_select = widgets.Dropdown(options=d[f_select.value].keys(),description='FS',layout={'width': 'max-content'})
test_list = list_of_tests(d)
test_select = widgets.Dropdown(options=test_list,description='Test',layout={'width': 'max-content'})
strat_list = list_of_strats(d)
strat_select = widgets.Dropdown(options=strat_list,description='Strat',layout={'width': 'max-content'})
interp_list = list_of_interps(d)
interp_select = widgets.Dropdown(options=interp_list,description='Interpolate',layout={'width': 'max-content'})

# Output areas: the per-sample button grid, the text summary and the plots.
button_grid =widgets.Output(layout={'width':'auto','border': '1px solid black'})
basic_info_out =widgets.Output(layout={'width':'auto','border': '1px solid black'})
out =widgets.Output(layout={'width':'auto','border': '1px solid black'})

# Layout: source dropdowns next to test dropdowns, with the button grid below.
source_options = widgets.VBox([f_select,fs_select])
test_options = widgets.VBox([test_select,strat_select,interp_select])
options_box = widgets.HBox([source_options,test_options])
ui = widgets.VBox([options_box,button_grid])
# Index of the sample currently shown; updated by sample_selected and
# change_event through `global selected`.
selected = 0

def show_plot(f,fs,test,strat,interp,sample_idx):
    """Render one sample and its test result into the ``out`` widget.

    The previous content of ``out`` is cleared, a short text summary for the
    selected ``f``/``fs``/``test`` combination is printed, and the source and
    result figures for sample ``sample_idx`` are drawn.  Data is read from
    the module-level dictionary ``d``.
    """
    with out:
        clear_output()
        entry = d[f][fs]
        sources = entry['source']
        signal_data = sources[sample_idx]
        s_total = len(sources)
        duration = len(signal_data)/fs
        t = np.arange(0,duration,1.0/fs) # x axis time scale

        test_entry = entry[test]
        result = test_entry['result'][sample_idx]
        t_idx = test_entry[strat]['period'][sample_idx]
        err = sum(test_entry[strat][interp]['error'])

        print("f",f,"fs",fs,test,"sample",sample_idx)
        print("Num of samples in test",s_total)
        print("Errors\t",err)
        print("Error %\t {:3.2f}".format((err/s_total)*100))

        plt.rcParams['figure.figsize'] = [12, 7]
        show_source_plot(fs,t,signal_data,f)
        show_result_plot(fs,t,result,f,t_idx,strat,interp)

def sample_selected(b,f,fs,test,strat,interp):
    """Button callback: show the sample whose index is the button's label.

    The index parsed from ``b.description`` is stored in the module-level
    ``selected`` and passed to ``show_plot`` together with the remaining
    arguments.
    """
    global selected
    sample_idx = int(b.description)
    selected = sample_idx
    show_plot(f,fs,test,strat,interp,sample_idx)

def update_buttons(f,fs,test,strat,interp):
    """Build a grid with one button per stored result of the selection.

    Each button is labelled with its sample index and calls
    ``sample_selected`` when clicked.  When ``f`` is a numeric string the
    button is styled ``"danger"`` if the sample's error flag is set and
    ``"success"`` otherwise; for other labels no style is applied.

    Returns the ``widgets.GridBox`` holding the buttons.
    """
    numeric_label = f.isnumeric()
    error_list = d[f][fs][test][strat][interp]['error'] if numeric_label else []
    sample_count = len(d[f][fs][test]['result'])

    button_layout = widgets.Layout(height='auto', width='auto')
    grid_layout=widgets.Layout(grid_template_columns='auto auto auto auto auto')

    buttons = []
    for i in range(sample_count):
        if numeric_label:
            button_style = "danger" if error_list[i] else "success"
        else:
            button_style = ''
        button = widgets.Button(description=str(i), layout=button_layout,button_style=button_style)
        button.on_click(functools.partial(sample_selected,f=f,fs=fs,test=test,strat=strat,interp=interp))
        buttons.append(button)

    return widgets.GridBox(children=buttons,layout=grid_layout)

def change_event(*args):
    """Observer callback: rebuild the button grid and redraw the plots.

    Registered on the ``value`` trait of every dropdown.  The positional
    arguments supplied by the widget framework are ignored; the current
    selection is read from the dropdown widgets directly.
    """
    global selected
    # Refresh the FS choices so they match the currently selected frequency.
    fs_select.options = d[f_select.value].keys()
    sample_max = len(d[f_select.value][fs_select.value]['source'])
    # Fall back to sample 0 when the remembered index does not exist for the
    # new selection.
    if selected >= sample_max: selected = 0
    btn_grid = update_buttons(f_select.value,fs_select.value,test_select.value,strat_select.value,interp_select.value)
    #update button grid
    # NOTE(review): `display` is not imported in the visible cells;
    # presumably the notebook-provided builtin - confirm.
    with button_grid:        
        clear_output()
        display(btn_grid)
    show_plot(f_select.value,fs_select.value,test_select.value,strat_select.value,interp_select.value,selected)
        
# Re-render whenever the value of any dropdown changes.
f_select.observe(change_event, 'value')
fs_select.observe(change_event, 'value')
test_select.observe(change_event, 'value')
strat_select.observe(change_event,'value')
interp_select.observe(change_event,'value')

# init
# Initial render: button grid, text summary and the plots for sample
# `selected` (0 at this point).
with button_grid:
    btn_grid = update_buttons(f_select.value,fs_select.value,test_select.value,strat_select.value,interp_select.value)
    display(btn_grid)
with basic_info_out:
    print(text_result)
with out:
    show_plot(f_select.value,fs_select.value,test_select.value,strat_select.value,interp_select.value,selected)

# Last expression of the cell: controls and summary on the left, plots on
# the right.
widgets.HBox([widgets.VBox([ui,basic_info_out]),out])


HBox(children=(VBox(children=(VBox(children=(HBox(children=(VBox(children=(Dropdown(description='Freq', layout…