In [1]:
import numpy as np
from numpy import argmax, asarray, copy, diff, log, mean
from numpy.fft import rfft
from scipy.signal import correlate, decimate
from scipy.signal.windows import kaiser
import math;
import wavio
import IPython
from IPython.display import Audio

### frequency estimation algorithms

In [2]:
class freq_estimate:
    
    @staticmethod
    def freq_from_crossings(signal, fs, interp='linear'):
        """
        Estimate frequency by counting zero crossings
    
        Works well for long low-noise sines, square, triangle, etc.
    
        Pros: Fast, accurate (increasing with signal length).
    
        Cons: Doesn't work if there are multiple zero crossings per cycle,
        low-frequency baseline shift, noise, inharmonicity, etc.
        """
        signal = asarray(signal) + 0.0
    
        # Find all indices right before a rising-edge zero crossing
        indices = find((signal[1:] >= 0) & (signal[:-1] < 0))
    
        if interp == 'linear':
            # More accurate, using linear interpolation to find intersample
            # zero-crossings (Measures 1000.000129 Hz for 1000 Hz, for instance)
            crossings = [i - signal[i] / (signal[i+1] - signal[i])
                         for i in indices]
        elif interp == 'none' or interp is None:
            # Naive (Measures 1000.185 Hz for 1000 Hz, for instance)
            crossings = indices
        else:
            raise ValueError('Interpolation method not understood')
    
            # TODO: Some other interpolation based on neighboring points might be
            # better.  Spline, cubic, whatever  Can pass it a function?
        return fs / mean(diff(crossings))
        
    @staticmethod
    def freq_from_fft(signal, fs):
        """
        Estimate frequency from peak of FFT
    
        Pros: Accurate, usually even more so than zero crossing counter
        (1000.000004 Hz for 1000 Hz, for instance).  Due to parabolic
        interpolation being a very good fit for windowed log FFT peaks?
        https://ccrma.stanford.edu/~jos/sasp/Quadratic_Interpolation_Spectral_Peaks.html
        Accuracy also increases with signal length
    
        Cons: Doesn't find the right value if harmonics are stronger than
        fundamental, which is common.
        """
        signal = asarray(signal)
    
        N = len(signal)
    
        # Compute Fourier transform of windowed signal
        windowed = signal * kaiser(N, 100)
        f = rfft(windowed)
    
        # Find the peak and interpolate to get a more accurate peak
        i_peak = argmax(abs(f))  # Just use this value for less-accurate result
        i_interp = parabolic(log(abs(f)), i_peak)[0]
    
        # Convert to equivalent frequency
        return fs * i_interp / N  # Hz
        
    @staticmethod
    def freq_from_autocorr(signal, fs):
        """
        Estimate frequency using autocorrelation
    
        Pros: Best method for finding the true fundamental of any repeating wave,
        even with strong harmonics or completely missing fundamental
    
        Cons: Not as accurate, doesn't find fundamental for inharmonic things like
        musical instruments, this implementation has trouble with finding the true
        peak
        """
        signal = asarray(signal) + 0.0
    
        # Calculate autocorrelation, and throw away the negative lags
        signal -= mean(signal)  # Remove DC offset
        corr = correlate(signal, signal, mode='full')
        corr = corr[len(corr)//2:]
    
        # Find the first valley in the autocorrelation
        d = diff(corr)
        start = find(d > 0)[0]
    
        # Find the next peak after the low point (other than 0 lag).  This bit is
        # not reliable for long signals, due to the desired peak occurring between
        # samples, and other peaks appearing higher.
        i_peak = argmax(corr[start:]) + start
        i_interp = parabolic(corr, i_peak)[0]
        return fs / i_interp
        
    @staticmethod
    def freq_from_hps(signal, fs):
        """
        Estimate frequency using harmonic product spectrum
    
        Low frequency noise piles up and overwhelms the desired peaks
    
        Doesn't work well if signal doesn't have harmonics
        """
        signal = asarray(signal) + 0.0
    
        N = len(signal)
        signal -= mean(signal)  # Remove DC offset
    
        # Compute Fourier transform of windowed signal
        windowed = signal * kaiser(N, 100)
    
        # Get spectrum
        X = log(abs(rfft(windowed)))
    
        # Remove mean of spectrum (so sum is not increasingly offset
        # only in overlap region)
        X -= mean(X)
    
        # Downsample sum logs of spectra instead of multiplying
        hps = copy(X)
        for h in range(2, 9):  # TODO: choose a smarter upper limit
            dec = decimate(X, h, zero_phase=True)
            hps[:len(dec)] += dec
    
        # Find the peak and interpolate to get a more accurate peak
        i_peak = argmax(hps[:len(dec)])
        i_interp = parabolic(hps, i_peak)[0]
    
        # Convert to equivalent frequency
        return fs * i_interp / N  # Hz

### calcuation functions for sound properties

In [3]:
def find(condition):
    "Return the indices where ravel(condition) is true"
    res, = np.nonzero(np.ravel(condition))
    return res
    
def rms_flat(a):
    """
    Return the root mean square of all the elements of *a*, flattened out.
    """
    return np.sqrt(np.mean(np.absolute(a)**2))

def dB(q):
    """
    Return the level of a field quantity in decibels.
    """
    return 20 * np.log10(q)

def parabolic(f, x):
    """
    Quadratic interpolation for estimating the true position of an
    inter-sample maximum when nearby samples are known.

    f is a vector and x is an index for that vector.

    Returns (vx, vy), the coordinates of the vertex of a parabola that goes
    through point x and its two neighbors.

    Example:
    Defining a vector f with a local maximum at index 3 (= 6), find local
    maximum if points 2, 3, and 4 actually defined a parabola.

    In [3]: f = [2, 3, 1, 6, 4, 2, 3, 1]

    In [4]: parabolic(f, argmax(f))
    Out[4]: (3.2142857142857144, 6.1607142857142856)
    """
    if int(x) != x:
        raise ValueError('x must be an integer sample index')
    else:
        x = int(x)
    xv = 1/2. * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x
    yv = f[x] - 1/4. * (f[x-1] - f[x+1]) * (xv - x)
    return (xv, yv)

def parabolic_polyfit(f, x, n):
    """
    Use the built-in polyfit() function to find the peak of a parabola

    f is a vector and x is an index for that vector.

    n is the number of samples of the curve used to fit the parabola.
    """
    a, b, c = np.polyfit(np.arange(x-n//2, x+n//2+1), f[x-n//2:x+n//2+1], 2)
    xv = -0.5 * b/a
    yv = a * xv**2 + b * xv + c
    return (xv, yv)

### endpoint detection algorithms

In [4]:
def endpoint_detection1(src_signal=None, test_seg_len=2000, threshod=500):
    src_signal_len = len(src_signal)
    current_volume_sum = float(0);
    loopnum = (src_signal_len // test_seg_len)-1; #consider drop out the remaining part
    detect_value_list = [];
    detected_pos = 0;
    for i in range(loopnum):
        start_pos = i*test_seg_len;
        end_pos = (i+1)*test_seg_len;
        test_seg = src_signal[start_pos:end_pos];
        # energy = int(-(np.sqrt(np.mean(test_seg^2))));##-(np.sqrt(np.mean(test_seg**2)));
        # print(type(energy))
        energy = -audioop.rms(test_seg, 2)
        # print(type(energy))
        energy_bytes = bytes([energy & 0xFF, (energy >> 8) & 0xFF]);# shift energy right 8 bits and put it in high positions
        print(f"len of src_sig is {len(test_seg)}, len of energy_bytes:{len(energy_bytes)}")
        debiased_energy = audioop.rms(audioop.add(test_seg, energy_bytes * len(test_seg), 2), 2)
        if debiased_energy > threshod:
            detected_pos = start_pos;
        detect_value_list.append(debiased_energy);
    return detect_value_list, detected_pos

### detection algorithms

In [5]:
def detectAudio(signal=None, sr=None, threshold=None, freq_func=None):
    wanted_freq = 1699;#calculated using autocorr #3357#calculate using fft;
    targetFreq = freq_func(signal, sr);#freq_estimate.freq_from_fft(signal, sr);
    print(f"targetFreq calculated:{targetFreq}")
    diff_freq = np.abs(int(wanted_freq-targetFreq));
    print(f"diff_freq calculated:{diff_freq}")
    if diff_freq < 50:
        sig_len = len(signal);
        window = [0] * sig_len;
        # signal = np.float64(signal);
        i = 0;
        for x in signal:
            window[i] = np.float64(x) * (0.54 - 0.46*math.cos(2*math.pi*np.float64(i)/np.float64(sig_len-1)));
            i += 1;
        window = np.asarray(window, dtype=np.float64);
        fftData = np.fft.fft(window);
        fftData_re = np.real(fftData);
        fftData_im = np.imag(fftData);
        targetIndex =  int(np.float64(len(fftData_re)) * np.float64(targetFreq) / np.float64(sr));
        # magnitude = np.sqrt(fftData_re^2 + fftData_im^2);
        magnitude = np.absolute(np.complex128(fftData_re[targetIndex]));
        
        if magnitude > threshold:
            print("Significant data detected {}Hz, magnitude: {}\n".format(targetFreq, magnitude));
        else:
            print("\rNo significant magnitude({}) at {}Hz\n".format(magnitude, targetFreq));
    else:
        print(f"wanted frequency:{wanted_freq} is not found, found frequency:{targetFreq}");

In [6]:
_threshold = 100;
_sr = 22050;
test_beep1 = "./dataset/raw/beep_clear_clip_01_22K.wav"
_beep_obj1 = wavio.read(test_beep1);
# print(f"rate of w:{w.rate}");
# print(f"rate of w:{w.sampwidth}");
# print(f"rate of w:{w.data.dtype}");
_beep_sig1 = _beep_obj1.data.T[0];
print(f"{test_beep1} loaded.");
################################################################
test_beep2 = "./dataset/raw/beep_with_whitenoise_clip_01_22K.wav";
_beep_obj2 = wavio.read(test_beep2);
_beep_sig2 = _beep_obj2.data.T[0];
print(f"{test_beep2} loaded.");
################################################################
n_wav = "./dataset/raw/neg_beep_clear_clip_01_22K.wav";
nw = wavio.read(n_wav);
n_sig = nw.data.T[0];
print(f"{n_wav} loaded.");
################################################################
n_wav2 = "./dataset/raw/neg_beep_clear_clip_02_22K.wav"
nw2 = wavio.read(n_wav2);
n_sig2 = nw2.data.T[0];
print(f"{n_wav2} loaded.");

./dataset/raw/beep_clear_clip_01_22K.wav loaded.
./dataset/raw/beep_with_whitenoise_clip_01_22K.wav loaded.
./dataset/raw/neg_beep_clear_clip_01_22K.wav loaded.
./dataset/raw/neg_beep_clear_clip_02_22K.wav loaded.


### detect alarm of specific frequency

In [7]:
_freq_func = freq_estimate.freq_from_autocorr;
print(f"testing {test_beep1}")
detectAudio(signal=_beep_sig1, sr=_sr, threshold=_threshold, freq_func=_freq_func);
IPython.display.display(Audio(_beep_sig1, rate=_sr))
print("\n\n");
print(f"testing {test_beep2}")
detectAudio(signal=_beep_sig2, sr=_sr, threshold=_threshold, freq_func=_freq_func);
IPython.display.display(Audio(_beep_sig2, rate=_sr))
print("\n\n");
print(f"testing {n_wav}");
detectAudio(signal=n_sig, sr=_sr, threshold=_threshold, freq_func=_freq_func);
IPython.display.display(Audio(n_sig, rate=_sr))
print("\n\n");
print(f"testing {n_wav2}");
detectAudio(signal=n_sig2, sr=_sr, threshold=_threshold, freq_func=_freq_func);
IPython.display.display(Audio(n_sig2, rate=_sr))

testing ./dataset/raw/beep_clear_clip_01_22K.wav
targetFreq calculated:1699.5103653506894
diff_freq calculated:0
Significant data detected 1699.5103653506894Hz, magnitude: 1105.5695630487983






testing ./dataset/raw/beep_with_whitenoise_clip_01_22K.wav
targetFreq calculated:1699.5749062729496
diff_freq calculated:0
Significant data detected 1699.5749062729496Hz, magnitude: 2495.7454200941165






testing ./dataset/raw/neg_beep_clear_clip_01_22K.wav
targetFreq calculated:2034.9128564811258
diff_freq calculated:335
wanted frequency:1699 is not found, found frequency:2034.9128564811258





testing ./dataset/raw/neg_beep_clear_clip_02_22K.wav
targetFreq calculated:1152.652873795395
diff_freq calculated:546
wanted frequency:1699 is not found, found frequency:1152.652873795395


In [None]:
"""
func processAudio(in []int32) {
	// Windowing function to the audio samples before performing the FFT.
	window := make([]float64, len(in))
	for i, x := range in {
		window[i] = float64(x) * (0.54 - 0.46*math.Cos(2*math.Pi*float64(i)/float64(len(in)-1)))
	}

	// Direct FFT
	// window := make([]float64, len(in))
	// for i, x := range in {
	// 	window[i] = float64(x)
	// }

	fftData := fft.FFTReal(window)

	// Find the magnitude of the target frequency bin
	targetIndex := int(float64(len(fftData)) * float64(*targetFreq) / float64(sampleRate))
	magnitude := cmplx.Abs(fftData[targetIndex])

	// Check if the magnitude is above the threshold
	if magnitude > threshold {
		go checkBeeps(true)
		// fmt.Printf("Significant data detected %dHz, magnitude: %.0f\n", targetFreq, magnitude)
	} else {
		go checkBeeps(false)
		// fmt.Printf("\rNo significant magnitude at %d Hz\n", targetFreq)
	}
}
"""