In [1]:
import os
import sys
import numpy as np
import time
import pyaudio
import wave
import zipfile
import wavio
import random
import IPython
from queue import Queue
from threading import Thread
np.set_printoptions(threshold=sys.maxsize)

In [2]:
import tensorflow as tf




In [3]:
sys.path.append("../ipynb_files/")
sys.path.append("../")

In [4]:
from common import utils as U
from TestSharedLib.bytes import to_bytes, from_bytes, byte_conversion_tests, load_data, load_raw, save_raw, save_scores
from TestSharedLib.constants import quant_support, crops, feature_count

In [5]:
from datetime import datetime
def genDataTimeStr():
    return datetime.today().strftime('%Y-%m-%d %H:%M:%S').replace('-',"").replace(' ',"").replace(':',"");

In [6]:
def quantize_int8(x, axis):
  '''Quantization into int8_t precision, operating on x along axis'''
  scaling_factor_shape = tuple(np.append([len(x)],np.ones(x.ndim - 1, dtype = int)))
  epsilon = 0.000000001
  x_scaling_factor = (np.max(np.abs(x), axis) / 128) + epsilon
  x_scaling_factor = x_scaling_factor.reshape(scaling_factor_shape)
  # x_zero_offset = -0.5 #-0.25 #-0.25
  result = (x / x_scaling_factor) #+ x_zero_offset
  return np.rint(result).astype(np.int8)

In [7]:
tflite_quant_model_path = "../../trained_models/step_6_QAT_and_Convert2TFLite/final_qat_model_acc87.46_testacc89_20240418190654/";
quanted_interpreter = tf.lite.Interpreter(model_path=tflite_quant_model_path)
print("model loaded....")

model loaded....


### Show the model information

In [57]:
input_details = quanted_interpreter.get_input_details()
output_details = quanted_interpreter.get_output_details()

print("== Input details ==")
print("name:", input_details[0]['name'])
print("shape:", input_details[0]['shape'])
print("type:", input_details[0]['dtype'])

print("\n== Output details ==")
print("name:", output_details[0]['name'])
print("shape:", output_details[0]['shape'])
print("type:", output_details[0]['dtype'])
#allocate tensor for testing
quanted_interpreter.allocate_tensors();

== Input details ==
name: input.2_te_transform
shape: [    1     1 30225     1]
type: <class 'numpy.int8'>

== Output details ==
name: Xq.1
shape: [1 3]
type: <class 'numpy.int8'>


### sound preprocessing codes

In [58]:
_inputLen = 30225
_nCrops = 2
def preprocess_setup():
    funcs = []
    funcs += [U.padding( _inputLen// 2),
              U.normalize(32768.0),
              U.multi_crop(_inputLen, _nCrops)]
              # U.single_crop(_inputLen)]
              # 

    return funcs

def preprocess_debug():
    debug_funcs = []
    debug_funcs += [U.padding( _inputLen// 2),
              # U.normalize(32768.0),]
              U.multi_crop(_inputLen, _nCrops)]
              # U.single_crop(_inputLen)]
              # 

    return debug_funcs


def preprocess(sound, funcs):
    for f in funcs:
        sound = f(sound)
    return sound;
    

def padding(pad):
    def f(sound):
        return np.pad(sound, pad, 'constant')

    return f
    

# def random_crop(size):
#     def f(sound):
#         org_size = len(sound)
#         start = random.randint(0, org_size - size)
#         return sound[start: start + size]

#     return f



In [10]:
_funcs = preprocess_setup()

In [63]:
def doSoundClassification(input_wav=None, label=None):
    sound = wavio.read(input_wav).data.T[0]
    start = sound.nonzero()[0].min();
    end = sound.nonzero()[0].max();
    sound = sound[start: end + 1];
    if len(sound)> 220500:
        sound = sound[:220500]
    sound = np.int16(preprocess(sound, _funcs));
    # label = label;
    s_test = np.expand_dims(sound[0], axis=0);
    s_test = np.expand_dims(s_test, axis=1);
    s_test = np.expand_dims(s_test, axis=3);
    # print(f"len of s_test:{len(s_test)}, shape of s_test:{s_test.shape}")
    s_test = quantize_int8(s_test,axis=-2)
    quanted_interpreter.set_tensor(input_details[0]['index'], s_test);
    quanted_interpreter.invoke()
    pred = quanted_interpreter.get_tensor(output_details[0]['index'])
    # print(f"Prediction result shape:{pred.shape}\n");
    print(f"Prediction result: {pred}, and true label: {label}")
    # print(f"channel of inpu_wav:{len(sound)}");

### List Devices

In [60]:
# audio = pyaudio.PyAudio();
# result = []
# for i in range(audio.get_device_count()):
#     device_info = audio.get_device_info_by_index(i)
#     result.append(device_info.get("name"))
# audio.terminate()
# print(result)

In [53]:
messages = Queue()
recordings = Queue()

In [54]:
CHUNK = 8192
FORMAT = pyaudio.paInt16
CHANNELS = 1 if sys.platform == 'darwin' else 2;
RATE = 20000
RECORD_SECONDS = 2
SAMPLE_SIZE = 2
# p = pyaudio.PyAudio()

### code-snipet 2

In [69]:
def record_sound(record_second=2.5):
    RECORD_SECONDS = record_second;
    WAVE_OUTPUT_FILENAME = "mic_test_sound_{}.wav".format(genDataTimeStr());
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    output=True,
                    frames_per_buffer=CHUNK)
    
    print("進行2秒聲音錄製")
    
    frames = []
    
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        data = stream.read(CHUNK)
        frames.append(data)
    
    # stream.stop_stream()
    stream.close()
    p.terminate()
    test_wav = "./mic_record_savedir/{}".format(WAVE_OUTPUT_FILENAME);
    wf = wave.open(test_wav, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(p.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))
    wf.close()
    
    print("錄製完成，進行辨識{}".format(test_wav));
    doSoundClassification(input_wav=test_wav,label=2);
    return test_wav;
    # 

In [70]:
play_wav1 = record_sound();
IPython.display.Audio(play_wav1)

進行2秒聲音錄製
錄製完成，進行辨識./mic_record_savedir/mic_test_sound_20240424115250.wav
Prediction result: [[-27 -31  46]], and true label: 2


In [71]:
val_wav = "../../datasets/CurrentUse/wav_files/Single_Fold/train/positive/moaning_5sec_56/moaning_04_56.wav"
doSoundClassification(input_wav=val_wav)
IPython.display.Audio(play_wav1)

Prediction result: [[-27 -31  46]], and true label: None


### code snipet 3

In [9]:
# def record_microphone(chunk=1024):
#     p = pyaudio.PyAudio();
#     stream = p.open(format=FORMAT,
#                     channels=CHANNELS,
#                     rate=RATE,
#                     input=True,
#                     # input_device_index=2,
#                     frames_per_buffer=chunk)

#     frames = []

#     while not messages.empty():
#         data = stream.read(chunk)
#         frames.append(data)
#         if len(frames) >= (RATE * RECORD_SECONDS) / chunk:
#             recordings.put(frames.copy())
#             frames = []

#     stream.stop_stream()
#     stream.close()
#     p.terminate()

In [25]:
# def test_sound_classification():
#     while not messages.empty():
#         frames = recordings.get()
        
#         rec.AcceptWaveform(b''.join(frames))
#         result = rec.Result()
#         text = json.loads(result)["text"]
#         print(f"accept data....")
        
#         time.sleep(1)

In [26]:
# record_microphone()
# test_sound_classification()

### code snipet 1

In [2]:
# DURATION = 2;  # seconds

# def callback(in_data, frame_count, time_info, status):
#     print("data received.....");
#     #return (in_data, pyaudio.paContinue)


# stream = p.open(format=p.get_format_from_width(2),
#                 channels=1 if sys.platform == 'darwin' else 2,
#                 rate=20000,
#                 input=True,
#                 output=True,
#                 stream_callback=callback)

# start = time.time()
# while stream.is_active() and (time.time() - start) < DURATION:
#     pass
#     # time.sleep(1)

# stream.close()
# p.terminate()