In [56]:
import numpy as np
from torch import hub
from torch import from_numpy
import torch.onnx
import pyaudio

# PyAudio setup
num_samples = 3024             # frames pulled from the input stream per read() call
FORMAT = pyaudio.paInt16       # sample format handed to the input stream
CHANNELS = 1
SAMPLE_RATE = 16000            # rate handed to the input stream and to the VAD model
CHUNK = SAMPLE_RATE // 10      # frames_per_buffer: one tenth of a second of audio

# Shared PyAudio instance used to enumerate devices and open streams.
audio = pyaudio.PyAudio()

# Recording / detection settings (not referenced in this part of the notebook).
recording_length = 15          # in seconds
time_between_recordings = 300  # presumably seconds as well -- TODO confirm
confidence_threshold = 0.92
conf_length = 20               # how many measurements to take
conf_enough = 10

# Placeholders that start out unset; nothing in this part of the notebook
# assigns them.
window = lock_vad = cur_feeling = None


def int2float(sound):
    """Convert integer PCM samples to peak-normalized float32.

    The samples are scaled so that the largest magnitude becomes 1.0. An
    all-zero (or empty) input is returned as zeros / an empty array without
    scaling.

    Args:
        sound: array-like of integer samples (e.g. int16 from
            ``np.frombuffer``).

    Returns:
        A float32 ``np.ndarray`` with singleton dimensions squeezed out.
    """
    # Convert first, then measure the peak: np.abs on an int16 array wraps
    # abs(-32768) back to -32768, which would under-report the peak and push
    # the normalized signal outside [-1, 1].
    sound = np.asarray(sound).astype('float32')
    if sound.size:
        peak = float(np.abs(sound).max())
        if peak > 0:
            sound *= 1.0 / peak
    return sound.squeeze()  # depends on the use case

def export_silero_vad():
    """Load Silero VAD, score one chunk of microphone audio and return the chunk.

    Relies on the module-level ``audio`` (PyAudio instance), ``FORMAT``,
    ``CHANNELS``, ``SAMPLE_RATE``, ``CHUNK`` and ``num_samples`` settings.

    NOTE: the actual ``torch.onnx`` export is commented out below, so this
    function currently does NOT write ``silero_vad.onnx``; it only runs the
    PyTorch model once and prints its confidence.

    Returns:
        The recorded chunk as a peak-normalized float32 ``np.ndarray``.
    """
    # model setup (force_reload=True re-downloads the repo on every call)
    model, _utils = hub.load(repo_or_dir='snakers4/silero-vad',
                             model='silero_vad',
                             force_reload=True)

    # List the capture-capable devices before opening the stream, so the ids
    # are visible even when the hard-coded input_device_index below is wrong.
    info = audio.get_host_api_info_by_index(0)
    for i in range(info.get('deviceCount')):
        device = audio.get_device_info_by_host_api_device_index(0, i)
        if device.get('maxInputChannels') > 0:
            print("Input Device id ", i, " - ", device.get('name'))

    stream = audio.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=SAMPLE_RATE,
                        input=True,
                        frames_per_buffer=CHUNK,
                        input_device_index=1
                        )
    try:
        audio_chunk = stream.read(num_samples, exception_on_overflow=False)
    finally:
        # Release the input device even if the read fails.
        stream.close()

    audio_int16 = np.frombuffer(audio_chunk, np.int16)
    audio_float32 = int2float(audio_int16)
    # get the confidence
    torch_input = (from_numpy(audio_float32), SAMPLE_RATE)
    new_confidence = model(*torch_input).item()

    # Disabled export attempts, kept for reference:
    # torch.onnx.export(model,               # model being run
    #              (from_numpy(audio_float32), 16000),  # model input (or a tuple for multiple inputs)
    #               "silero_vad.onnx",   # where to save the model (can be a file or file-like object)
    #               export_params=True,        # store the trained parameter weights inside the model file
    #               opset_version=10,          # the ONNX version to export the model to
    #               do_constant_folding=True,  # whether to execute constant folding for optimization
    #               input_names = ['input', 'freq'],   # the model's input names
    #               output_names = ['output'], # the model's output names
    #               dynamic_axes={'input' : {0 : 'batch_size'},    # variable length axes
    #                             'output' : {0 : 'batch_size'}})
    # export_output = torch.onnx.dynamo_export(model, torch_input)
    # export_output.save("my_image_classifier.onnx")

    print(new_confidence)
    print('silero_vad exported')

    return audio_float32


In [57]:
audio_float32 = export_silero_vad()

Downloading: "https://github.com/snakers4/silero-vad/zipball/master" to C:\Users\ioci/.cache\torch\hub\master.zip


Input Device id  0  -  Microsoft Sound Mapper - Input
Input Device id  1  -  Микрофон (2- Samson GoMic)
Input Device id  2  -  Микрофон (NVIDIA Broadcast)
0.21194827556610107
silero_vad exported


In [58]:
import onnx

onnx_model = onnx.load("silero_vad.onnx")
onnx.checker.check_model(onnx_model, full_check=True)

In [59]:
import onnxruntime

ort_session = onnxruntime.InferenceSession("silero_vad.onnx", providers=["CPUExecutionProvider"])

def to_numpy(tensor):
    """Return the tensor's data as a NumPy array on the CPU.

    Tensors that track gradients are detached first, since ``.numpy()``
    cannot be called on them directly.
    """
    if tensor.requires_grad:
        tensor = tensor.detach()
    return tensor.cpu().numpy()

inputs = ort_session.get_inputs()
for i in inputs:
    print(i.name)

input
sr
h
c


In [60]:
print(audio_float32)

[ 0.5378049   0.827439    0.5        ... -0.11280487  0.0554878
  0.31036586]


In [61]:
from typing import Callable, List
import warnings

languages = ['ru', 'en', 'de', 'es']


class OnnxWrapper():
    """Stateful callable around an ONNX Runtime session for the Silero VAD model.

    The session is fed four inputs named ``input``, ``h``, ``c`` and ``sr``.
    The ``h``/``c`` state arrays returned by one call are passed back in on
    the next call, so consecutive chunks of one stream share state. The state
    is reset automatically when the sampling rate or batch size changes.
    """

    def __init__(self, path, force_onnx_cpu=False):
        """Create the inference session.

        Args:
            path: location of the ``.onnx`` model file.
            force_onnx_cpu: when true (and the CPU provider is available),
                pin the session to ``CPUExecutionProvider``.
        """
        import onnxruntime

        opts = onnxruntime.SessionOptions()
        opts.inter_op_num_threads = 1
        opts.intra_op_num_threads = 1

        if force_onnx_cpu and 'CPUExecutionProvider' in onnxruntime.get_available_providers():
            self.session = onnxruntime.InferenceSession(path, providers=['CPUExecutionProvider'], sess_options=opts)
        else:
            self.session = onnxruntime.InferenceSession(path, sess_options=opts)

        self.reset_states()
        self.sample_rates = [8000, 16000]

    def _validate_input(self, x, sr: int):
        """Normalize an audio chunk to shape (batch, samples) and check it.

        A 1-D tensor gets a leading batch dimension. Sampling rates that are
        a multiple of 16000 are reduced to 16000 by keeping every
        ``sr // 16000``-th sample.

        Returns:
            The ``(x, sr)`` pair after normalization.

        Raises:
            ValueError: if ``x`` has more than two dimensions, the sampling
                rate is unsupported, or the chunk is empty / too short.
        """
        if x.dim() == 1:
            x = x.unsqueeze(0)
        if x.dim() > 2:
            raise ValueError(f"Too many dimensions for input audio chunk {x.dim()}")

        if sr != 16000 and (sr % 16000 == 0):
            step = sr // 16000
            x = x[:,::step]
            sr = 16000

        if sr not in self.sample_rates:
            raise ValueError(f"Supported sampling rates: {self.sample_rates} (or multiply of 16000)")

        # An empty chunk must be rejected explicitly; otherwise the ratio
        # below divides by zero instead of raising the intended ValueError.
        if x.shape[1] == 0 or sr / x.shape[1] > 31.25:
            raise ValueError("Input audio chunk is too short")

        return x, sr

    def reset_states(self, batch_size=1):
        """Zero the ``h``/``c`` state (shape ``(2, batch_size, 64)``) and forget the last call."""
        self._h = np.zeros((2, batch_size, 64)).astype('float32')
        self._c = np.zeros((2, batch_size, 64)).astype('float32')
        self._last_sr = 0
        self._last_batch_size = 0

    def __call__(self, x, sr: int):
        """Run the model on one chunk and return its output as a torch tensor.

        Args:
            x: torch tensor holding the chunk, 1-D or (batch, samples).
            sr: sampling rate of ``x``.

        Raises:
            ValueError: propagated from input validation.
        """
        x, sr = self._validate_input(x, sr)
        batch_size = x.shape[0]

        # Start from fresh state on the first call after a reset, or whenever
        # the sampling rate or the batch size differs from the previous call.
        needs_reset = (
            not self._last_batch_size
            or (self._last_sr and self._last_sr != sr)
            or self._last_batch_size != batch_size
        )
        if needs_reset:
            self.reset_states(batch_size)

        # _validate_input has already guaranteed sr is in self.sample_rates.
        ort_inputs = {'input': x.numpy(), 'h': self._h, 'c': self._c, 'sr': np.array(sr, dtype='int64')}
        out, self._h, self._c = self.session.run(None, ort_inputs)

        self._last_sr = sr
        self._last_batch_size = batch_size

        return torch.tensor(out)

    def audio_forward(self, x, sr: int, num_samples: int = 512):
        """Run the model over a whole recording in ``num_samples``-sized chunks.

        The audio is zero-padded on the right to a multiple of
        ``num_samples``, the state is reset, and the per-chunk outputs are
        concatenated along dimension 1.

        Returns:
            A CPU tensor with the concatenated per-chunk outputs.
        """
        outs = []
        x, sr = self._validate_input(x, sr)

        if x.shape[1] % num_samples:
            pad_num = num_samples - (x.shape[1] % num_samples)
            x = torch.nn.functional.pad(x, (0, pad_num), 'constant', value=0.0)

        self.reset_states(x.shape[0])
        for i in range(0, x.shape[1], num_samples):
            wavs_batch = x[:, i:i+num_samples]
            outs.append(self(wavs_batch, sr))

        stacked = torch.cat(outs, dim=1)
        return stacked.cpu()

In [62]:
model = OnnxWrapper('silero_vad.onnx', True)

In [63]:
sr = 16000
x = audio_float32

In [64]:
# Run the exported model directly through ONNX Runtime.
# (Input validation via _validate_input(from_numpy(audio_float32), sr) is
# skipped here; the feeds are assembled by hand.)

_state_shape = (2, 1, 64)
_h = np.zeros(_state_shape, dtype='float32')
_c = np.zeros(_state_shape, dtype='float32')
_sr = np.array(sr, dtype='int64')
_x = np.expand_dims(x, 0)  # add the leading batch dimension

ort_inputs = dict(input=_x, sr=_sr, h=_h, c=_c)
ort_outs = ort_session.run(None, ort_inputs)

# TODO: compare ONNX Runtime and PyTorch results, e.g.
# np.testing.assert_allclose(to_numpy(torch_out), ort_outs[0], rtol=1e-03, atol=1e-05)
# print("Exported model has been tested with ONNXRuntime, and the result looks good!")

In [65]:
ort_outs[0][0][0]

0.21194823

In [78]:
import os

# Build the Desktop path in OS-normalized form (the ~/Desktop location is valid on Windows after 7):
desktop = os.path.normpath(os.path.expanduser("~/Desktop"))
print(desktop)
p = os.path.normpath(os.path.join(desktop, 'records'))
print(p)

C:\Users\ioci\Desktop
C:\Users\ioci\Desktop\records


In [89]:
record_name = 'name'
result = 'result'
records_path = 'records'
records_txt = '_records.txt'

In [92]:
# Append one "<record_name>: <result>" line to the records file.
p = os.path.normpath(os.path.join(desktop, records_path))
print(p)
# Create the folder named by records_path (and any missing parents) up front,
# instead of retrying after a failed open with a hard-coded 'records' folder.
os.makedirs(p, exist_ok=True)
with open(os.path.join(p, records_txt), 'a') as f:
    f.write(record_name + ': ' + result + '\n')

C:\Users\ioci\Desktop\records


In [2]:
a = {}
a[1] = 'a'
a[2] = 'b'

for i in a:
    print(i)

1
2


In [6]:
next(iter(a.keys()))

1