In [1]:
# Converts wav files to spectrogram features which are saved to a specified file

import numpy as np
import tensorflow as tf
import scipy.io.wavfile as wav
import numpy as np
from pathlib import Path

from tensorflow.lite.experimental.microfrontend.python.ops import audio_microfrontend_op as frontend_op 

# Root directory that is searched for input recordings.
wav_file_path = "Dipco/audio/"
# Glob pattern, evaluated relative to wav_file_path, selecting the wav files to convert.
wav_file_glob = "**/*U01.CH1.wav"
# Destination .npy file that receives the concatenated features of all matched clips.
features_output_fname = 'dipco_features.npy'

def generate_features_for_clip(clip_data):
    """Compute scaled micro-frontend features for one audio clip.

    Args:
        clip_data: audio samples for a single clip; converted to a tensor with
            tf.convert_to_tensor before being passed to the micro frontend.
            (Presumably 16-bit PCM sampled at 16 kHz, matching sample_rate
            below -- confirm against the input files.)

    Returns:
        numpy array holding the frontend output multiplied by 0.0390625.
    """
    frontend_settings = dict(
        sample_rate=16000,
        window_size=30,
        window_step=20,
        num_channels=40,
        upper_band_limit=7500,
        lower_band_limit=125,
        enable_pcan=True,
        min_signal_remaining=0.05,
        out_scale=1,
        out_type=tf.float32,
    )
    clip_tensor = tf.convert_to_tensor(clip_data)
    raw_features = frontend_op.audio_microfrontend(clip_tensor, **frontend_settings)

    # Fixed rescaling of the raw frontend output (0.0390625 == 10 / 256).
    return tf.multiply(raw_features, 0.0390625).numpy()

# Collect every matching recording. Sorting keeps the concatenation order (and
# therefore the saved feature file) reproducible across runs.
clips = sorted(str(clip_path) for clip_path in Path(wav_file_path).glob(wav_file_glob))

if not clips:
    # Fail early with a clear message instead of a NameError at np.save below.
    raise FileNotFoundError(
        f"No wav files matching {wav_file_glob!r} found under {wav_file_path!r}")

# Per-clip feature arrays are gathered in a list and joined once at the end;
# appending to a growing array inside the loop re-copies all previous data
# for every clip.
clip_features = []

for clip in clips:
    # NOTE(review): the file's sample rate is discarded here, while
    # generate_features_for_clip is configured for 16000 Hz -- confirm that
    # every input file matches.
    _, data = wav.read(clip)

    print("Processing", clip)
    clip_features.append(np.squeeze(generate_features_for_clip(data)))

# Stack all clips along the first axis so the result reads as one continuous
# feature stream.
features_output = np.concatenate(clip_features, axis=0)

np.save(features_output_fname, features_output)


Processing Dipco/audio/dev/S02_U01.CH1.wav
Processing Dipco/audio/dev/S04_U01.CH1.wav
Processing Dipco/audio/dev/S05_U01.CH1.wav
Processing Dipco/audio/dev/S09_U01.CH1.wav
Processing Dipco/audio/dev/S10_U01.CH1.wav
Processing Dipco/audio/eval/S01_U01.CH1.wav
Processing Dipco/audio/eval/S03_U01.CH1.wav
Processing Dipco/audio/eval/S06_U01.CH1.wav
Processing Dipco/audio/eval/S07_U01.CH1.wav
Processing Dipco/audio/eval/S08_U01.CH1.wav


In [2]:
# Given features data and a model, this computes the probability of the wakeword one stride at a time through all of the features and saves the probabilities to a file
#  - The model can be quantized or not
#  - The model can be streaming or nonstreaming

import numpy as np
import tensorflow as tf

# Input features (produced by the feature-generation cell), the TFLite model to
# evaluate, and the file that receives the per-stride probabilities.
features_fname = 'dipco_features.npy'
model_fname = '../trained_models/hey_jarvis/tflite_stream_state_external/stream_state_external.tflite'
output_probabilities_fname = 'model_probabilities.npy'

features_data = np.load(features_fname)

interpreter = tf.lite.Interpreter(model_path=model_fname)
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Input 0 is the feature input: its dtype decides whether features must be
# quantized, and its second dimension is the number of feature rows consumed
# per invocation.
is_quantized_model = (input_details[0]['dtype'] == np.int8)
input_feature_slices = input_details[0]['shape'][1]

# Sliding-window bookkeeping used by the inference loop: [start, end) is the
# current slice of feature rows, advanced by window_stride each step.
window_stride = 1
start = 0
end = input_feature_slices

running_probability = []

# Zero every input tensor (the feature input and any externally managed state
# inputs) before the first invocation. Each tensor is filled using its own
# declared dtype, because set_tensor rejects arrays whose dtype differs from
# the tensor's.
for tensor_details in input_details:
    interpreter.set_tensor(
        tensor_details['index'],
        np.zeros(tensor_details['shape'], dtype=tensor_details['dtype']))

def quantize_input_data(data, input_details):
  """Quantize float input data using the tensor's scale and zero point.

  For integer target dtypes the scaled values are rounded to the nearest
  integer and saturated to the dtype's representable range, so out-of-range
  inputs clamp instead of wrapping around.

  Args:
      data (np.array in float): input data for the interpreter
      input_details: one entry of the interpreter's get_input_details() list;
        its 'dtype' and 'quantization_parameters' ('scales', 'zero_points')
        fields are read.

  Returns:
    np.ndarray: quantized data in the dtype given by input_details['dtype']
  """
  # Get input quantization parameters
  data_type = input_details['dtype']

  input_quantization_parameters = input_details['quantization_parameters']
  input_scale = input_quantization_parameters['scales'][0]
  input_zero_point = input_quantization_parameters['zero_points'][0]

  # quantize the input data
  quantized = np.asarray(data) / input_scale + input_zero_point

  if np.issubdtype(data_type, np.integer):
    # A bare astype() truncates toward zero and does not saturate; round and
    # clip explicitly before the cast.
    limits = np.iinfo(data_type)
    quantized = np.clip(np.round(quantized), limits.min, limits.max)

  return quantized.astype(data_type)

def dequantize_output_data(data: np.ndarray,
                           output_details: dict) -> np.ndarray:
  """Convert quantized model output back to floating point.

  Args:
      data: integer data to be dequantized
      output_details: one entry of the interpreter's get_output_details()
        list; the first scale and zero point of its
        'quantization_parameters' are used.

  Returns:
      np.ndarray: scale * (data - zero_point), computed in floating point
  """
  quantization = output_details['quantization_parameters']
  scale = quantization['scales'][0]
  zero_point = quantization['zero_points'][0]

  # Cast to float before subtracting the zero point: doing the subtraction in
  # the narrow integer type could overflow and give a different result.
  shifted = data.astype(np.float32) - zero_point
  return scale * shifted

# Slide a window of feature rows [start, end) over the whole feature array,
# invoking the model once per position and recording its first output value.
# `end` is an exclusive slice bound, so the last valid window has
# end == features_data.shape[0]; the comparison must therefore be `<=`,
# otherwise the final window is never evaluated.
while end <= features_data.shape[0]:
    new_data_to_input = features_data[start:end, :]

    if is_quantized_model:
        new_data_to_input = quantize_input_data(new_data_to_input, input_details[0])

    # update indexes of streamed updates
    start += window_stride
    end += window_stride

    # Input new data (with a leading batch dimension) and invoke the interpreter
    interpreter.set_tensor(input_details[0]['index'], np.expand_dims(new_data_to_input, 0))
    interpreter.invoke()

    # get output states and feed them as inputs
    # which will be fed in the next inference cycle for externally streaming models
    # NOTE(review): this pairs output_details[s] with input_details[s] by
    # position -- confirm the model exports its state tensors in matching order.
    for s in range(1, len(input_details)):
        interpreter.set_tensor(input_details[s]['index'], interpreter.get_tensor(output_details[s]['index']))

    output = interpreter.get_tensor(output_details[0]['index'])

    if is_quantized_model:
        wakeword_probability = dequantize_output_data(output[0][0], output_details[0])
    else:
        wakeword_probability = output[0][0]

    running_probability.append(wakeword_probability)

np.save(output_probabilities_fname, running_probability)

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


In [3]:
# Takes in running probabilities and determines whether the model predicts the wake word or not
# Computes false accept and false reject rates

import numpy as np

probabilities_fname = 'model_probabilities.npy'
label_fname = ''                        # label file with wake word timestamps generated by Picovoice's benchmark setup (https://github.com/Picovoice/wake-word-benchmark)
probability_threshold = 0.5             # probability cutoff for wake word detection
probabilities_sliding_window_size = 10  # how many probabilities in the sliding window that are averaged
ignore_after_positive_setting = 74      # how many window strides after detecting the wake word to ignore before accepting the next wake word probability (currently, the model is built on spectrograms with 74 window strides)


running_probability = np.load(probabilities_fname)
probabilities_duration_h = running_probability.shape[0]*0.02/3600.0 # window stride is 0.02 s, dividing by 3600.0 to convert to hours

# Detection state: the countdown starts at its full value, so the first
# ignore_after_positive_setting strides are skipped before any detection.
ignore_after_positive = ignore_after_positive_setting
true_accept = 0
false_accept = 0

keyword_times_sec = list()
total_positive_phrases = 0
# One boolean per probability stride; True where a labelled wake word is active.
labels = np.zeros((running_probability.shape[0],), dtype=bool)

if label_fname != '':
    with open(label_fname, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing empty line), which
                # would otherwise make float('') raise ValueError.
                continue
            # float() ignores surrounding whitespace, so splitting on a bare
            # comma accepts both "start, end" and "start,end".
            keyword_times_sec.append(tuple(float(x) for x in line.split(',')))

    total_positive_phrases = len(keyword_times_sec)

    # Using Picovoice's benchmark setup https://github.com/Picovoice/wake-word-benchmark
    # Convert each (start, end) timestamp pair in seconds to 0.02 s strides and
    # mark the inclusive stride range as positive.
    for start_sec, end_sec in keyword_times_sec:
        start_frame = int(start_sec//0.02)
        end_frame = int((end_sec//0.02))
        labels[start_frame:(end_frame + 1)] = True

def running_average_detection(window_probabilities, threshold):
    """Report whether the mean of a probability window exceeds a threshold.

    Args:
        window_probabilities: probabilities inside the current sliding window.
        threshold: cutoff that the window average must strictly exceed.

    Returns:
        Truthy value when the average is strictly greater than threshold.
    """
    window_mean = np.average(window_probabilities)
    return window_mean > threshold

# Walk through the probabilities one stride at a time. After each detection
# (and at the very start) a countdown suppresses further detections for
# ignore_after_positive_setting strides.
for index in range(running_probability.shape[0]):
    if ignore_after_positive > 0:
        ignore_after_positive -= 1
        continue

    window_start = index - probabilities_sliding_window_size
    if window_start < 0:
        # Not enough history for a full window yet. A negative start would
        # wrap around in both the slice and the labels lookup below.
        continue

    # The window covers the probabilities preceding (and excluding) `index`.
    window_probabilities = running_probability[window_start:index]
    if not running_average_detection(window_probabilities, probability_threshold):
        continue

    ignore_after_positive = ignore_after_positive_setting

    # A detection counts as correct when the stride at the window start is
    # labelled as containing the wake word.
    if labels[window_start]:
        true_accept += 1
    else:
        false_accept += 1
        print("False accept at frame", index, "; timestamp", (window_start*0.02), "s; probability", np.average(window_probabilities))


if total_positive_phrases > 0:
    print("False reject rate:", (total_positive_phrases-true_accept)/total_positive_phrases)

if probabilities_duration_h > 0:
    print("False accepts per hour:", false_accept/probabilities_duration_h)
else:
    # Avoid a ZeroDivisionError when the probabilities file is empty.
    print("False accepts per hour: undefined (no probabilities loaded)")

False accept at frame 52556 ; timestamp 1050.92 s; probability 0.50478536
False accepts per hour: 0.18745098937673585
