## 測試項目
1. 訓練集中的alarm聲
2. 使用pruned and quantized tflite model
4. export to cc file

In [2]:
import os
import sys
import datetime
import shutil
import numpy as np
import tensorflow as tf
# from tensorflow import keras
import zipfile
import wavio
from common import utils as U
np.set_printoptions(threshold=sys.maxsize)

In [3]:
sys.path.append(os.path.abspath("../../deployment/"))
sys.path.append(os.path.abspath("../../common/"))
sys.path.append(os.path.abspath("../../"))

In [4]:
from lib_original.bytes import to_bytes, from_bytes, byte_conversion_tests, load_data, load_raw, save_raw, save_scores
from lib_original.constants import quant_support, crops, feature_count
from lib_original.quantize import quantization_tests, get_cast

In [12]:
from datetime import datetime
import re

In [None]:
def get_cast_input_data(x, dtype = None):
    '''Retrieves the input data set, casting to target dtype'''
    x, y = self.get_input_data()

    if dtype is None:
      print('dtype not provided')
      return x  
    return get_cast(dtype)(x, axis = -2)

### Loading TFlite Model

In [5]:
tflite_quant_model_path = "./models/tflite/retrain_pruned_model_20240130161638_from_keras.tflite";
# Load quantized TFLite model
tflite_interpreter_quant = tf.lite.Interpreter(model_path=tflite_quant_model_path)

In [6]:
# model.summary()
input_details = tflite_interpreter_quant.get_input_details()
output_details = tflite_interpreter_quant.get_output_details()

print("== Input details ==")
print("name:", input_details[0]['name'])
print("shape:", input_details[0]['shape'])
print("type:", input_details[0]['dtype'])

print("\n== Output details ==")
print("name:", output_details[0]['name'])
print("shape:", output_details[0]['shape'])
print("type:", output_details[0]['dtype'])

#resize the input and output
# tflite_interpreter_quant.resize_tensor_input(input_details[0]['index'], (2, 1, 30225, 1))
# tflite_interpreter_quant.resize_tensor_input(output_details[0]['index'], (2, 2))

# input_details = tflite_interpreter_quant.get_input_details()
# output_details = tflite_interpreter_quant.get_output_details()

# print("== Input details ==")
# print("name:", input_details[0]['name'])
# print("shape:", input_details[0]['shape'])
# print("type:", input_details[0]['dtype'])

# print("\n== Output details ==")
# print("name:", output_details[0]['name'])
# print("shape:", output_details[0]['shape'])
# print("type:", output_details[0]['dtype'])

tflite_interpreter_quant.allocate_tensors();

== Input details ==
name: serving_default_input_2:0
shape: [    1     1 30225     1]
type: <class 'numpy.int8'>

== Output details ==
name: StatefulPartitionedCall:0
shape: [1 2]
type: <class 'numpy.int8'>


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


### loading dataset from npz format
1. ACDNet input length is 30225
2. sr is 44100 and 20000
3. need to convert 16K to 20000
### ACDNet Config Setting
#### Training Parameters
1. opt.batchSize = 64;
2. opt.weightDecay = 5e-4;
3. opt.momentum = 0.9;
4. opt.nEpochs = 2000;
5. opt.LR = 0.1;
6. opt.schedule = [0.3, 0.6, 0.9];
7. opt.warmup = 10; 
#### Basic Net Configuration
- nClasses = 50
- nFolds = 5
- splits = \[i for in range(1, nFolds + 1)\]
- sr = 20000
- inputLength = 30225
<br>ngth = 30225;
### How to convert 16K sound to 44.1K with python and sox

if using sox the command is as following: <br />
    sox old.wav -b 16 new.wav 
if using python you can do as following: <br />
    import soundfile
    
data, samplerate = soundfile.read('old.wav
    <br />)
soundfile.write('new.wav', data, samplerate, subtype='PCM_1
6')

In [13]:
def genDataTimeStr():
    return datetime.today().strftime('%Y-%m-%d %H:%M:%S').replace('-',"").replace(' ',"").replace(':',"");

In [14]:
def getFileList(srcDir,regex='.*\.wav'):
    results = os.listdir(srcDir)
    out_files = []
    cnt_files = 0
    for file in results:
        if os.path.isdir(os.path.join(srcDir, file)):
            out_files += getFileList(os.path.join(srcDir, file))
        elif re.match(regex, file,  re.I):  # file.startswith(startExtension) or file.endswith(".txt") or file.endswith(endExtension):
            out_files.append(os.path.join(srcDir, file))
            cnt_files = cnt_files + 1
    return out_files

## sound preprocessing functions

In [15]:
_inputLen = 30225
_nCrops = 2
def preprocess_setup():
    funcs = []
    funcs += [U.padding( _inputLen// 2),
              U.normalize(32768.0),
              U.multi_crop(_inputLen, _nCrops)]
              # U.single_crop(_inputLen)]
              # 

    return funcs

def preprocess_debug():
    debug_funcs = []
    debug_funcs += [U.padding( _inputLen// 2),
              U.normalize(32768.0),]
              # U.multi_crop(_inputLen, _nCrops)]
              # U.single_crop(_inputLen)]
              # 

    return debug_funcs

def preprocess(sound, funcs):
    for f in funcs:
        sound = f(sound)

    return sound;

In [16]:
_funcs = preprocess_setup()
# _debug_funcs = preprocess_debug()

### Read Test Wav File

In [20]:
#specify the input type:
dtype = 'int8'

positive_test_wavs = "./test_sounds/positive/"
negative_test_wavs_others = "./test_sounds/negative/othersounds/"
negative_test_wavs_siren = "./test_sounds/negative/siren/"

p_wav_list = getFileList(positive_test_wavs);
n_wav_list_other = getFileList(negative_test_wavs_others);
n_wav_list_siren = getFileList(negative_test_wavs_siren);
print(f"total alarm sounds:{len(p_wav_list)} ");
print(f"total other sounds:{len(n_wav_list_other)}")
print(f"total siren sounds:{len(n_wav_list_siren)}")
p_label = 52;
n_label = 99;

sounds_1 = [];
labels_1 = [];
sounds_2 = [];
labels_2 = [];
## sound = preprocess(sound, _funcs)
for i in p_wav_list:
    sound = wavio.read(i).data.T[0]
    start = sound.nonzero()[0].min();
    end = sound.nonzero()[0].max();
    sound = sound[start: end + 1]
    label = int(p_label);
    if len(sound)> 220500:
        sound = sound[:220500]
    sound = preprocess(sound, _funcs);
    sounds_1.append(sound);
    labels_1.append(label);
    sounds_2.append(sound);
    labels_2.append(label);
    
for j in n_wav_list_other:
    sound = wavio.read(j).data.T[0]
    start = sound.nonzero()[0].min();
    end = sound.nonzero()[0].max();
    sound = sound[start: end + 1];
    label = int(n_label);
    sound = np.float32(preprocess(sound, _funcs));
    sounds_1.append(sound);
    labels_1.append(label);

for k in n_wav_list_siren:
    sound = wavio.read(j).data.T[0]
    start = sound.nonzero()[0].min();
    end = sound.nonzero()[0].max();
    sound = sound[start: end + 1];
    label = int(n_label);
    sound = np.float32(preprocess(sound, _funcs));
    sounds_1.append(sound);
    labels_1.append(label);
    sounds_2.append(sound);
    labels_2.append(label);

print(f"sound testset 1 length: {len(sounds_1)}");
print(f"sound testset 2 length: {len(sounds_2)}");

# sound = wavio.read(test_sound_file).data.T[0]
# start = sound.nonzero()[0].min()
# end = sound.nonzero()[0].max()
# sound = sound[start: end + 1]  # Remove silent sections
# label = 18 #int(os.path.splitext(test_sound_file)[0].split('-')[-1])

# if len(sound)> 220500:
#     sound = sound[:220500]

# ec50_sound1 =  wavio.read(ec50_18_sound).data.T[0]
# start_ec50 = ec50_sound1.nonzero()[0].min()
# end_ec50 = ec50_sound1.nonzero()[0].max()
# ec50_sound1 = ec50_sound1[start_ec50:end_ec50+1]
# ec50_18_label = 18

total alarm sounds:116 
total other sounds:128
total siren sounds:91
sound testset 1 length: 335
sound testset 2 length: 207


In [21]:
# print(sounds)
# print(labels)

In [22]:
print(sounds_1[0][1].shape)
print(type(labels_1[0]))

(30225,)
<class 'int'>


In [23]:
# sound = np.expand_dims(sound, axis=1)
# sound = np.expand_dims(sound, axis=3)
# print(sound.shape)

In [26]:
test1_len = len(sounds_1);
test2_len = len(sounds_2)
len_alarm = len(p_wav_list);
len_siren = len(n_wav_list_siren);
len_other = len(n_wav_list_other);
n_total_len = len_other + len_siren ;
# s_total_len = len(n_wav_list_siren);
print(f"total alarm sounds is {len_alarm}\ntotal other sounds is {n_total_len}(other:{len_other} + siren:{len_siren})");
test1_correct_num = 0;
test2_correct_num = 0;
test1_p_correct_num = 0;
test2_p_correct_num = 0;
n_correct_num = 0;
n_correct_siren_num = 0;
n_correct_othersounds_num = 0;

for w in range(test1_len): 
    s = sounds_1[w]
    l = labels_1[w]
    s_test = np.expand_dims(s[0], axis=0);
    s_test = np.expand_dims(s_test, axis=1);
    s_test = np.expand_dims(s_test, axis=3);
    #get_cast(dtype)(sound, axis = -2)
    # s_test = np.int8(s_test);
    s_test = get_cast(dtype)(s_test, axis=-2)
    print(s_test.shape)
    tflite_interpreter_quant.set_tensor(input_details[0]['index'], s_test);
    tflite_interpreter_quant.invoke()
    pred = tflite_interpreter_quant.get_tensor(output_details[0]['index'])
    # print(f"Prediction result shape:{pred.shape}\n");
    print(f"Prediction result: {pred}, and true label: {l}")
    if l == 52: #positive
        if pred[0][0] > pred[0][1]:
            test1_correct_num += 1;
            test1_p_correct_num += 1;
    if l == 99: #negative
        if pred[0][0] < pred[0][1]:
            test1_correct_num += 1;
            n_correct_num += 1;
            
for w in range(test2_len): 
    s = sounds_2[w]
    l = labels_2[w]
    s_test = np.expand_dims(s[0], axis=0);
    s_test = np.expand_dims(s_test, axis=1);
    s_test = np.expand_dims(s_test, axis=3);
    # s_test = np.float32(s_test);
    s_test = get_cast(dtype)(s_test, axis=-2)
    # print(s_test.shape)
    # print(f"the {w+1} item's shape:\n  {s.shape}");
    # Run inference
    # print(f"s type:{type(s)}, and s shape:{s.shape}")
    tflite_interpreter_quant.set_tensor(input_details[0]['index'], s_test);
    tflite_interpreter_quant.invoke()
    pred = tflite_interpreter_quant.get_tensor(output_details[0]['index'])
    # print(f"Prediction result shape:{pred.shape}\n");
    print(f"Prediction result: {pred}, and true label: {l}")
    if l == 52: #positive
        if pred[0][0] > pred[0][1]:
            test2_correct_num += 1;
            test2_p_correct_num += 1;
    if l == 99: #negative
        if pred[0][0] < pred[0][1]:
            test2_correct_num += 1;
            n_correct_siren_num += 1;
print(f"test1: total sounds(alarm+other+siren)test result is {100*(test1_correct_num/test1_len)}");
print(f"test1: total alarm sounds result is {100*(test1_p_correct_num/len_alarm)}");
print(f"test1 total other sounds(other+siren) result is {100*n_correct_num/n_total_len}");
print("**************************************************************************************")
print(f"test2: total sounds(alarm+siren)test result is {100*(test2_correct_num/test2_len)}")
print(f"test1: total alarm sounds result is {100*(test2_p_correct_num/len_alarm)}");
print(f"test2 total siren sounds result is {100*n_correct_siren_num/len_siren}");

total alarm sounds is 116
total other sounds is 219(other:128 + siren:91)
Casting dataset to int8
(1, 1, 30225, 1)
Prediction result: [[-97  97]], and true label: 52
Casting dataset to int8
(1, 1, 30225, 1)
Prediction result: [[-84  84]], and true label: 52
Casting dataset to int8
(1, 1, 30225, 1)
Prediction result: [[-97  97]], and true label: 52
Casting dataset to int8
(1, 1, 30225, 1)
Prediction result: [[-101  101]], and true label: 52
Casting dataset to int8
(1, 1, 30225, 1)
Prediction result: [[-101  101]], and true label: 52
Casting dataset to int8
(1, 1, 30225, 1)
Prediction result: [[-113  113]], and true label: 52
Casting dataset to int8
(1, 1, 30225, 1)
Prediction result: [[-86  86]], and true label: 52
Casting dataset to int8
(1, 1, 30225, 1)
Prediction result: [[-73  73]], and true label: 52
Casting dataset to int8
(1, 1, 30225, 1)
Prediction result: [[-101  101]], and true label: 52
Casting dataset to int8
(1, 1, 30225, 1)
Prediction result: [[-97  97]], and true label: 5

In [34]:
# scores = model.predict(sound, batch_size=len(sound), verbose=0);
# print(type(scores))
# print(scores.shape)

# for res in scores:
#     max_value = res.max()的
#     max_index = np.argmax(res)
#     print(f"max value:{max_value:.5f} and index is {max_index}")
#     print('\n'.join('{}: {:.5f}'.format(*k) for k in enumerate(res)))