In [None]:
def load_display_file_wf(file_name): # file_name e.g.: 'audiofile.wav'
    """Print basic WAV header information and return the first channel.

    Args:
        file_name: Path of the WAV file to read, e.g. ``'audiofile.wav'``.

    Returns:
        A tuple ``(x, sr)``. ``x`` is the sample array produced by
        ``wavfile.read`` (only column 0 when the data has two or more
        dimensions, otherwise the data unchanged) and ``sr`` is the sample
        rate returned by the same call.
    """
    # The wave module is used only for the printed header summary.
    with wave.open(file_name, 'rb') as wave_file:
        n_frames = wave_file.getnframes()
        frame_rate = wave_file.getframerate()
        print("Number of channels in file:", wave_file.getnchannels())
        print("Samplerate/ data per second:", frame_rate)
        print("Total data in file:", n_frames)
        print("Total file time:", n_frames / frame_rate)

    # Decode the samples exactly once (the file used to be read twice and the
    # first result thrown away).
    sr, data = wavfile.read(file_name)
    x = data[:, 0] if data.ndim >= 2 else data
    return x, sr

In [None]:
def reduce_file_noise_wf (file_name,x,sr):
    """Denoise a signal, write it beside the source file and load it back.

    Args:
        file_name: Name of the source file; everything before its last ``.``
            is used as the stem of the output name.
        x: Signal passed to ``nr.reduce_noise`` as ``y``.
        sr: Sample rate passed to ``nr.reduce_noise`` and ``wavfile.write``.

    Returns:
        ``(x_nr, sr_nr, wav_file_nr)``: the signal and sample rate obtained by
        loading the written file with ``librosa.load(..., sr=None)``, and the
        name of that file (``<stem>_nr.wav``).
    """
    denoised = nr.reduce_noise(y=x, sr=sr)

    stem = file_name.rsplit('.', maxsplit=1)[0]
    wav_file_nr = f"{stem}_nr.wav"
    wavfile.write(wav_file_nr, sr, denoised)

    # Round-trip through the file so the caller gets what librosa decodes.
    x_nr, sr_nr = librosa.load(wav_file_nr, sr=None)
    return x_nr, sr_nr, wav_file_nr

In [None]:
def wav_file_trim_wf (wav_file_nr, start_time, end_time):
    """Cut a time window out of a WAV file, export it and load the result.

    Args:
        wav_file_nr: Path of the WAV file to trim (a string).
        start_time: Start of the window; multiplied by 1000 and truncated to
            an int before slicing (i.e. seconds, assuming the AudioSegment is
            indexed in milliseconds as in pydub).
        end_time: End of the window, converted the same way.

    Returns:
        ``(x_nr_ex, sr_nr_ex, trim_file, f_trim_file, trim_file_viz,
        f_name_2)`` where ``x_nr_ex``/``sr_nr_ex`` come from ``librosa.load``
        on the export, ``trim_file`` is the value returned by
        ``extract.export`` (left open because it is handed to the caller),
        ``f_trim_file`` is the exported file name
        (``<base>_nr_extract.wav``), ``trim_file_viz`` is that name prefixed
        with the current working directory, and ``f_name_2`` is the exported
        name without its ``.wav`` extension.
    """
    start_ms = int(start_time * 1000)
    end_ms = int(end_time * 1000)
    song = AudioSegment.from_wav(wav_file_nr)
    extract = song[start_ms:end_ms]

    # The output name is derived from the argument itself. The function used
    # to read an undeclared global ``file_name`` here, which raised NameError
    # whenever that global was missing. A trailing '_nr' is stripped before it
    # is re-added so that 'audio_nr.wav' still yields 'audio_nr_extract.wav'.
    base_name = wav_file_nr.rsplit('.', maxsplit=1)[0]
    if base_name.endswith('_nr'):
        base_name = base_name[:-len('_nr')]
    f_name_2 = base_name + '_nr' + '_extract'
    f_trim_file = f_name_2 + '.wav'
    trim_file_viz = os.getcwd() + '/' + f_trim_file

    trim_file = extract.export(f_trim_file, format="wav")
    x_nr_ex, sr_nr_ex = librosa.load(trim_file, sr=None)
    return x_nr_ex, sr_nr_ex, trim_file, f_trim_file, trim_file_viz, f_name_2

In [None]:
def detect_silence_t(fs, signal, audio_segment, silence_thresh_rms, min_silence_len=50,seek_step=1):
    """Find silent stretches using an RMS threshold and a zero-crossing test.

    A window of ``min_silence_len`` is slid over the segment in steps of
    ``seek_step``. A window counts as silent when the mean of
    ``librosa.feature.rms`` over it is ``<= silence_thresh_rms`` and the sum
    of ``librosa.feature.zero_crossing_rate`` over it is exactly 0. Silent
    window starts are then merged into ranges.

    Args:
        fs: Sample rate of ``signal``; ``fs / 1000`` is used as samples per
            window-position unit (i.e. positions are milliseconds if ``fs``
            is in Hz).
        signal: Sample array that is sliced for the feature computation.
        audio_segment: Only its ``len()`` is used, as the total length in the
            same unit as ``min_silence_len``.
        silence_thresh_rms: Upper bound on the mean RMS of a silent window.
        min_silence_len: Window length (default 50).
        seek_step: Step between successive window starts (default 1).

    Returns:
        A list of ``[start, end]`` pairs, or ``[]`` when the segment is
        shorter than ``min_silence_len`` or no window qualifies.
    """
    seg_len = len(audio_segment)
    if seg_len < min_silence_len:
        return []

    last_slice_start = seg_len - min_silence_len
    slice_starts = list(range(0, last_slice_start + 1, seek_step))
    # Make sure the final possible window is examined even when the step does
    # not land on it exactly.
    if last_slice_start % seek_step:
        slice_starts.append(last_slice_start)

    FRAME_LENGTH = 25
    HOP_LENGTH = 10
    samples_per_ms = fs / 1000
    # The analysed window now follows min_silence_len; it used to be fixed at
    # 50 regardless of the argument (identical for the default value).
    window_samples = int(min_silence_len * samples_per_ms)

    silence_starts = []
    for start in slice_starts:
        first_sample = int(start * samples_per_ms)
        audio_slice_sig = signal[first_sample:first_sample + window_samples]

        zcr_audio_slice = librosa.feature.zero_crossing_rate(
            y=audio_slice_sig, frame_length=FRAME_LENGTH, hop_length=HOP_LENGTH)
        rms_audio_slice = librosa.feature.rms(
            y=audio_slice_sig, frame_length=FRAME_LENGTH, hop_length=HOP_LENGTH)

        if np.mean(rms_audio_slice) <= silence_thresh_rms and np.sum(zcr_audio_slice) == 0:
            silence_starts.append(start)

    if not silence_starts:
        return []

    # Merge window starts into ranges: a new range begins only when the next
    # start is neither adjacent to the previous one nor within one window
    # length of it.
    silent_ranges = []
    prev_i = silence_starts[0]
    current_range_start = prev_i

    for silence_start_i in silence_starts[1:]:
        continuous = (silence_start_i == prev_i + seek_step)
        silence_has_gap = silence_start_i > (prev_i + min_silence_len)

        if not continuous and silence_has_gap:
            silent_ranges.append([current_range_start,
                                  prev_i + min_silence_len])
            current_range_start = silence_start_i
        prev_i = silence_start_i

    silent_ranges.append([current_range_start,
                          prev_i + min_silence_len])
    return silent_ranges

def detect_nonsilent_t(fs, signal, audio_segment, silence_thresh_rms, min_silence_len=50, seek_step=1):
    """Return the ranges of a segment that ``detect_silence_t`` did not flag.

    Args:
        fs, signal, audio_segment, silence_thresh_rms: Passed straight to
            ``detect_silence_t``; ``len(audio_segment)`` is also used as the
            total segment length.
        min_silence_len: Window length forwarded to ``detect_silence_t``.
        seek_step: Step forwarded to ``detect_silence_t``.

    Returns:
        A list of ``[start, end]`` pairs complementing the silent ranges:
        ``[[0, len(audio_segment)]]`` when nothing is silent and ``[]`` when
        the first silent range spans the whole segment.
    """
    # Forward the caller's settings; they used to be replaced by the literal
    # values 50 and 1, so non-default arguments were silently ignored.
    silent_ranges = detect_silence_t(fs, signal, audio_segment, silence_thresh_rms,
                                     min_silence_len=min_silence_len, seek_step=seek_step)
    len_seg = len(audio_segment)

    if not silent_ranges:
        return [[0, len_seg]]

    if silent_ranges[0][0] == 0 and silent_ranges[0][1] == len_seg:
        return []

    prev_end_i = 0
    nonsilent_ranges = []

    # Each gap between the end of one silent range and the start of the next
    # is a non-silent range.
    for start_i, end_i in silent_ranges:
        nonsilent_ranges.append([prev_end_i, start_i])
        prev_end_i = end_i

    # Trailing sound after the last silent range.
    if end_i != len_seg:
        nonsilent_ranges.append([prev_end_i, len_seg])

    # A segment that starts with silence produces an empty leading range.
    if nonsilent_ranges[0] == [0, 0]:
        nonsilent_ranges.pop(0)

    return nonsilent_ranges

In [None]:
def event_detection_seg_wf(fs, signal, seg_time, trim_file, segment_size_t):
    """Detect pauses and non-silent ranges inside one time window of a file.

    The RMS threshold is the mean, over chunks of ``segment_size_t * fs``
    samples of ``signal / 2**15``, of each chunk's mean ``librosa`` RMS. The
    window ``seg_time`` is then cut from ``trim_file``, exported to
    ``wav_file_trim_seg_extract.wav`` in the working directory (the same
    scratch name on every call, so it is overwritten each time) and analysed
    with ``detect_silence_t`` / ``detect_nonsilent_t`` using
    ``min_silence_len=50`` and ``seek_step=1``.

    Args:
        fs: Sample rate used to size the chunks of ``signal``.
        signal: Sample array used only for the threshold computation.
        seg_time: ``(start, end)`` of the window; each value is multiplied by
            1000 and truncated to an int before slicing the audio.
        trim_file: Source accepted by ``AudioSegment.from_wav``.
        segment_size_t: Chunk length; multiplied by ``fs`` to get samples.

    Returns:
        ``(silence_p, silence_rem, silence_thresh_rms)``: arrays of
        ``(start, stop)`` pairs divided by 1000 for the silent and the
        non-silent ranges, and the threshold that was used.
    """
    signal_n = signal / (2**15)
    segment_size = int(segment_size_t * fs)

    # Kept as a plain list: the last chunk is usually shorter than the others
    # and building an ndarray from ragged rows raises on NumPy >= 1.24.
    segments = [signal_n[x:x + segment_size]
                for x in range(0, len(signal_n), segment_size)]

    rms_all = [np.mean(librosa.feature.rms(y=s)) for s in segments]
    silence_thresh_rms = np.mean(rms_all)

    # Cut the requested window out of the file and write it to a scratch WAV.
    start_s, end_s = seg_time
    song = AudioSegment.from_wav(trim_file)
    extract = song[int(start_s * 1000):int(end_s * 1000)]

    f = 'wav_file_trim_seg'
    # export() hands back the open output file; close it so the handle is not
    # leaked on every call.
    extract.export(f + '_extract.wav', format="wav").close()
    trim_file_seg = os.getcwd() + '/' + f + '_extract.wav'

    # Load the scratch file once and reuse it for both detections.
    myaudio = AudioSegment.from_wav(trim_file_seg)
    signal_t, fs_t = librosa.load(trim_file_seg, sr=None)

    # Detect pauses
    silence_1 = detect_silence_t(fs_t, signal_t, myaudio, silence_thresh_rms,
                                 min_silence_len=50, seek_step=1)
    silence_p = np.array([((start / 1000), (stop / 1000)) for start, stop in silence_1])

    # Everything that is not a pause is kept as 'speech'.
    silence_2 = detect_nonsilent_t(fs_t, signal_t, myaudio, silence_thresh_rms,
                                   min_silence_len=50, seek_step=1)
    silence_rem = np.array([((start / 1000), (stop / 1000)) for start, stop in silence_2])

    return silence_p, silence_rem, silence_thresh_rms

In [None]:
# Segment-wise detection
def event_detection_seg_wf_window(signal, fs, segment_size_t_window, trim_file):
    """Run ``event_detection_seg_wf`` window by window over a whole signal.

    The signal (divided by ``2**15``) is cut into consecutive windows of
    ``segment_size_t_window * fs`` samples. Each window is analysed
    separately and the returned times are shifted by the window's start time
    so they are relative to the beginning of the signal.

    Args:
        signal: Sample array of the whole file.
        fs: Sample rate of ``signal``.
        segment_size_t_window: Window length; multiplied by ``fs`` to get
            samples and used as the spacing of the window start times.
        trim_file: Passed through to ``event_detection_seg_wf``.

    Returns:
        ``(all_pauses, all_speech, seg_time_leng)``: two lists with one array
        per window (pause ranges and non-silent ranges), and the end time of
        the last window.

    Raises:
        ValueError: If ``signal`` is empty, so no window can be formed.
    """
    signal_n_file = signal / (2**15)
    segment_size_file = int(segment_size_t_window * fs)

    # Kept as a plain list: the last window is usually shorter and building
    # an ndarray from ragged rows raises on NumPy >= 1.24.
    segments_file = [signal_n_file[x:x + segment_size_file]
                     for x in range(0, len(signal_n_file), segment_size_file)]
    if not segments_file:
        raise ValueError("signal is empty; there are no segments to analyse")

    # End of each window from the cumulative sample count; start of each
    # window as a multiple of the window length. Multiplying an integer range
    # guarantees exactly one start per window, which a float-step arange
    # does not.
    seg_end_times = np.cumsum([len(seg) for seg in segments_file]) / fs
    seg_start_times = np.arange(len(segments_file), dtype=float) * segment_size_t_window
    seg_time = np.vstack((seg_start_times, seg_end_times)).T

    all_seg_details_silence_p_file = []
    all_seg_details_silence_rem_file = []

    for leng, seg_file in enumerate(segments_file):
        # NOTE(review): ``segment_size_t`` is not a parameter of this function;
        # it is resolved as a module-level name at call time. Confirm that
        # this is intended rather than ``segment_size_t_window``.
        seg_pauses, seg_speech, silence_thresh_rms = event_detection_seg_wf(
            fs, seg_file, seg_time[leng], trim_file, segment_size_t)

        # Shift window-relative times to file-relative times.
        offset = seg_start_times[leng]
        all_seg_details_silence_p_file.append(seg_pauses + offset)
        all_seg_details_silence_rem_file.append(seg_speech + offset)

    seg_time_leng = seg_time[-1][1]
    return all_seg_details_silence_p_file, all_seg_details_silence_rem_file, seg_time_leng

In [None]:
# Combine event data for entire file
def _drop_repeated_boundaries(ranges):
    """Flatten ``ranges`` and re-pair only the values that occur exactly once."""
    from collections import Counter

    flat = [value for pair in ranges for value in pair]
    occurrences = Counter(flat)
    kept = np.array([value for value in flat if occurrences[value] == 1])
    return np.reshape(kept, (len(kept) // 2, 2))


def detect_proper_events(all_seg_details_silence_p_file,all_seg_details_silence_rem_file,speech_time_threshold,pause_time_threshold, f_name_current, seg_time_leng):
    """Combine per-segment pause/speech ranges into one labelled event table.

    Steps, in order:
      1. Concatenate the non-empty per-segment arrays.
      2. Set the end of whichever list finishes last to ``seg_time_leng``.
      3. Remove boundary values that occur more than once within a list
         (this joins ranges that touch) and re-pair the remainder.
      4. Label speech ranges ``'s'``/``'us'`` and pause ranges ``'p'``/``'up'``
         depending on whether ``(end - start) * 1000`` exceeds the matching
         threshold.
      5. Sort all events by end time, keep only rows between the first and
         last speech row, drop ``'us'`` and ``'up'`` rows.
      6. Merge consecutive rows carrying the same label and compute
         ``Time_diff = Time_end - Time_start``.

    Args:
        all_seg_details_silence_p_file: Sequence of per-segment pause ranges
            (arrays or lists of ``(start, end)``); empty entries are ignored.
        all_seg_details_silence_rem_file: Same, for non-silent ranges.
        speech_time_threshold: Minimum ``(end - start) * 1000`` for ``'s'``.
        pause_time_threshold: Minimum ``(end - start) * 1000`` for ``'p'``.
        f_name_current: Unused; kept for interface compatibility.
        seg_time_leng: End time assigned in step 2.

    Returns:
        ``(silence_p_final_end, silence_rem_final_end, ddf, long_p, short_p,
        long_p_durations, short_p_durations)``: index of ``'p'`` rows, index
        of ``'s'`` rows, the event table, index of ``'p'`` rows with
        ``Time_diff >= 0.15``, index of ``'p'`` rows with ``Time_diff < 0.15``,
        and the ``Time_diff`` values of the latter two as lists.

    Raises:
        ValueError: From ``np.concatenate`` when either input has no
            non-empty entry, or from ``min`` when there is no speech row.
    """
    # Emptiness is tested with len(): comparing an ndarray with ``[]`` is
    # ambiguous (or an error) on current NumPy versions.
    speech_segments = [seg for seg in all_seg_details_silence_rem_file if len(seg) > 0]
    pause_segments = [seg for seg in all_seg_details_silence_p_file if len(seg) > 0]
    silence_p_in = np.concatenate(pause_segments, axis=0)
    silence_rem_in = np.concatenate(speech_segments, axis=0)

    # Whichever list ends last is stretched to the end of the analysed audio.
    if silence_p_in[-1][1] > silence_rem_in[-1][1]:
        silence_p_in[-1][1] = seg_time_leng

    if silence_rem_in[-1][1] > silence_p_in[-1][1]:
        silence_rem_in[-1][1] = seg_time_leng

    silence_rem = _drop_repeated_boundaries(silence_rem_in)
    silence_p = _drop_repeated_boundaries(silence_p_in)

    initial_labels_speech = ['s' if (end - start) * 1000 > speech_time_threshold else 'us'
                             for start, end in silence_rem]
    initial_labels_pause = ['p' if (end - start) * 1000 > pause_time_threshold else 'up'
                            for start, end in silence_p]

    data_s = np.array(list(zip(silence_rem[:, 0], silence_rem[:, 1], initial_labels_speech)))
    data_p = np.array(list(zip(silence_p[:, 0], silence_p[:, 1], initial_labels_pause)))

    # Mixing numbers and labels yields a string array, hence the casts back
    # to float after the frame is built.
    events = pd.DataFrame(np.concatenate((data_p, data_s)),
                          columns=["Time_start", "Time_end", "Labels"])
    events["Time_start"] = events["Time_start"].astype(float)
    events["Time_end"] = events["Time_end"].astype(float)
    events = events.sort_values('Time_end').reset_index(drop=True)

    # Keep only the rows from the first to the last speech event.
    speech_rows = events.loc[(events['Labels'] == 's') | (events['Labels'] == 'us')]
    df_s_min = min(speech_rows.index)
    df_s_max = max(speech_rows.index)
    events = events[events.index > df_s_min - 1]
    events = events[events.index < df_s_max + 1]

    # Discard the events that failed their duration threshold.
    events = events.reset_index()
    data_labels = np.array(events.Labels)
    events = events.drop(np.where(data_labels == "us")[0])
    events = events.drop(np.where(data_labels == "up")[0])

    # ``dddf`` stays bound to the frame as it is here; ``ddf`` is rebound to a
    # new frame by every replace() below.
    dddf = events.reset_index()
    ddf = dddf
    ddf["Labels_X"] = ddf["Labels"].shift()

    # Merge runs of equal labels: the earlier row takes over the later row's
    # end time and the later row is dropped.
    # NOTE(review): DataFrame.replace is value-based and applies to every
    # column, not just Time_end of row i-1. Runs of three or more rows rely on
    # that, but it can also rewrite equal values in the 'index'/'level_0'
    # bookkeeping columns - confirm before depending on those columns.
    drop_chk = []
    for i in dddf.index:
        if i != 0:
            if dddf.Labels[i] == dddf.Labels_X[i]:
                ddf = ddf.replace(ddf.Time_end[i - 1], dddf.Time_end[i])
                drop_chk.append(i)

    ddf = ddf.drop(index=drop_chk, axis=0)
    ddf["Time_diff"] = ddf["Time_end"] - ddf["Time_start"]

    is_pause = ddf["Labels"] == 'p'
    long_p = ddf[is_pause & (ddf["Time_diff"] >= 0.15)].index
    short_p = ddf[is_pause & (ddf["Time_diff"] < 0.15)].index

    silence_p_final_end = ddf[is_pause].index
    silence_rem_final_end = ddf[ddf["Labels"] == 's'].index

    long_p_durations = ddf.loc[long_p, "Time_diff"].tolist()
    short_p_durations = ddf.loc[short_p, "Time_diff"].tolist()

    return silence_p_final_end, silence_rem_final_end, ddf, long_p, short_p, long_p_durations, short_p_durations

In [None]:
def viz_S_P_A (trim_file_viz,ddf ):    
    x_nr_ex, sr_nr_ex = librosa.load(trim_file_viz,sr= None)
    %matplotlib notebook
    plt.figure(figsize=(12,5))
    for i in ddf.index:
        if (ddf["Labels"][i] == 'p') & (ddf["Time_diff"][i] > 0.15):
            start = ddf.Time_start[i]
            end = ddf.Time_end[i]
            plt.axvspan(start, end, color='r',alpha=0.2)
        if (ddf["Labels"][i] == 'p') & (ddf["Time_diff"][i] <= 0.15):
            start = ddf.Time_start[i]
            end = ddf.Time_end[i]
            plt.axvspan(start, end, color='blue',alpha=0.2)
        if (ddf["Labels"][i] == 's') :
            start = ddf.Time_start[i]
            end = ddf.Time_end[i]
            plt.axvspan(start, end, color='g', alpha=0.2)

    red_patch = mpatches.Patch(color='r',alpha=0.2, label='Long Pause (>150 ms)')
    blue_patch = mpatches.Patch(color='blue',alpha=0.2, label='Short Pause (<=150 ms)')
    green_patch = mpatches.Patch(color='g',alpha=0.2, label='Vocal Event (>100 ms)')
    plt.legend(handles=[red_patch,blue_patch,green_patch])

    
    librosa.display.waveshow(x_nr_ex, sr = sr_nr_ex)
    plt.xlabel('Time',fontsize=15)
    plt.ylabel('Amplitude',fontsize=15)

In [None]:
def _duration_summary(durations_s):
    """Return ``(mean, sample std dev, coefficient of variation)`` or NaNs."""
    mean_s = statistics.mean(durations_s) if len(durations_s) > 0 else numpy.nan
    if len(durations_s) > 1:
        return mean_s, statistics.stdev(durations_s), variation(durations_s)
    # Spread is undefined for fewer than two events.
    return mean_s, numpy.nan, numpy.nan


def event_statistics_wf(f_name_current, f_trim_file, x_nr_ex, sr_nr_ex,silence_p_final_end, silence_rem_final_end, speech_time_threshold,pause_time_threshold,ddf, long_p, short_p):
    """Summarise the detected speech and pause events of one file.

    Args:
        f_name_current: Path of the unclipped WAV file; opened with ``wave``
            to obtain its duration and reported as ``File_Name``.
        f_trim_file: Unused; kept for interface compatibility.
        x_nr_ex: Samples of the clipped signal (only ``len()`` is used).
        sr_nr_ex: Sample rate of the clipped signal.
        silence_p_final_end: Index labels of the pause rows in ``ddf``.
        silence_rem_final_end: Index labels of the speech rows in ``ddf``.
        speech_time_threshold: Reported as ``Speech Time Threshold_ms``.
        pause_time_threshold: Reported as ``Pause Time Threshold_ms``.
        ddf: Event table with a ``Time_diff`` column; values are multiplied
            by 1000 and treated as milliseconds.
        long_p: Unused; kept for interface compatibility.
        short_p: Unused; kept for interface compatibility.

    Returns:
        ``(df, ddf)``: a one-row DataFrame of statistics and the event table
        that was passed in. Mean, standard deviation and CV are NaN when
        there are too few events to compute them.
    """
    # The context manager closes the file, which used to stay open.
    with wave.open(f_name_current) as wav_in:
        Total_Duration_Unclipped_s = wav_in.getnframes() / wav_in.getframerate()

    time_ex = (len(x_nr_ex) / sr_nr_ex) * 1000  # total time of clipped file in ms

    all_pauses = [ddf.Time_diff[s] * 1000 for s in silence_p_final_end]
    all_speech = [ddf.Time_diff[sp] * 1000 for sp in silence_rem_final_end]
    pause_dur = sum(all_pauses)
    speech_dur = sum(all_speech)

    mean_speech_s, Std_dev_speech_s, CV_speech_duration_s = _duration_summary(
        [x / 1000 for x in all_speech])
    mean_pause_s, Std_dev_pause_s, CV_pause_duration_s = _duration_summary(
        [x / 1000 for x in all_pauses])

    # Insertion order of this dict defines the column order of the result.
    data = {'File_Name': [f_name_current],
        'Speech Time Threshold_ms': [speech_time_threshold],
        'Pause Time Threshold_ms': [pause_time_threshold],
        'Percent_Pause': [(pause_dur / time_ex) * 100],
        'Percent_Speech': [(speech_dur / time_ex) * 100],
        'Total_Duration_Unclipped_s': [Total_Duration_Unclipped_s],
        'Total_Duration_Clipped_s': [time_ex / 1000],
        'Speech_Duration_s': [speech_dur / 1000],
        'Pause_Duration_s': [pause_dur / 1000],
        'Speech_Events': [len(all_speech)],
        'Pause_Events': [len(all_pauses)],
        'Mean Speech_s': [mean_speech_s],
        'Std Dev Speech': [Std_dev_speech_s],
        'CV Speech': [CV_speech_duration_s],
        'Mean Pause_s': [mean_pause_s],
        'Std Dev Pause': [Std_dev_pause_s],
        'CV Pause': [CV_pause_duration_s]
        }
    df = pd.DataFrame(data, columns=list(data))

    return df, ddf