-
Notifications
You must be signed in to change notification settings - Fork 12
/
vad.py
97 lines (76 loc) · 2.59 KB
/
vad.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#! /usr/bin/env python
# encoding: utf-8
# Simple energy-based voice activity detection (VAD) for WAV files.
# Usage: vad.py <input.wav> <output_prefix>
import numpy
import scipy.io.wavfile as wf
import sys
# Echo the command line for debugging.
print(sys.argv)
class VoiceActivityDetection:
    """Frame-based voice activity detector with an adaptive energy threshold.

    Incoming samples accumulate in an internal buffer, are split into
    fixed-size frames, and every frame classified as speech is appended to
    an output buffer retrievable via get_voice_samples().
    """

    def __init__(self):
        self.__step = 160              # samples consumed per processed frame
        self.__buffer_size = 160       # frame length in samples
        self.__buffer = numpy.array([], dtype=numpy.int16)
        self.__out_buffer = numpy.array([], dtype=numpy.int16)
        self.__n = 0
        self.__VADthd = 0.             # running (averaged) adaptive threshold
        self.__VADn = 0.               # number of frames folded into the threshold
        self.__silence_counter = 0     # consecutive frames judged below threshold

    # Voice Activity Detection with an adaptive threshold.
    def vad(self, _frame):
        """Classify one frame; returns False after more than 20 consecutive
        sub-threshold (silent) frames, True otherwise."""
        energy = numpy.array(_frame) ** 2.
        # Per-frame candidate threshold: 10% of the energy spread above the minimum.
        candidate = numpy.min(energy) + numpy.ptp(energy) * 0.1
        # Fold the candidate into a running average over all frames seen so far.
        self.__VADthd = (self.__VADn * self.__VADthd + candidate) / float(self.__VADn + 1.)
        self.__VADn += 1.
        if numpy.mean(energy) <= self.__VADthd:
            self.__silence_counter += 1
        else:
            self.__silence_counter = 0
        return self.__silence_counter <= 20

    # Push new audio samples into the buffer.
    def add_samples(self, data):
        """Append samples; report whether at least one full frame is buffered."""
        self.__buffer = numpy.append(self.__buffer, data)
        return len(self.__buffer) >= self.__buffer_size

    # Pull one frame from the buffer; pulled samples are dropped afterwards.
    def get_frame(self):
        """Return the next frame and advance the buffer by one step."""
        frame = self.__buffer[:self.__buffer_size]
        self.__buffer = self.__buffer[self.__step:]
        return frame

    # Add new audio samples to the internal buffer and process them.
    def process(self, data):
        """Buffer the samples and run VAD over every complete frame available."""
        if not self.add_samples(data):
            return
        while len(self.__buffer) >= self.__buffer_size:
            frame = self.get_frame()
            if self.vad(frame):  # keep speech frames only
                self.__out_buffer = numpy.append(self.__out_buffer, frame)

    def get_voice_samples(self):
        """Return all samples classified as speech so far."""
        return self.__out_buffer
# usage: vad.py <input.wav> <output_prefix>
# Reads a WAV file, runs VAD on up to two channels, and writes the voiced
# samples of channel k to '<output_prefix>.<k>.wav'.
if len(sys.argv) < 3:
    sys.exit('usage: %s <input.wav> <output_prefix>' % sys.argv[0])
sr, samples = wf.read(sys.argv[1])
# scipy returns a 1-D array for mono files; normalize to shape (N, channels)
# so the channel-count check below cannot raise IndexError on mono input.
if samples.ndim == 1:
    samples = samples.reshape(-1, 1)
ch = samples.shape[1]
c0 = samples[:, 0]
print('c0 %i' % c0.size)
vad = VoiceActivityDetection()
vad.process(c0)
voice_samples = vad.get_voice_samples()
wf.write('%s.1.wav' % sys.argv[2], sr, voice_samples)
if ch == 1:
    sys.exit()
# Second channel is sliced only after we know it exists.
c1 = samples[:, 1]
vad = VoiceActivityDetection()
vad.process(c1)
voice_samples = vad.get_voice_samples()
wf.write('%s.2.wav' % sys.argv[2], sr, voice_samples)