In [3]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from scipy.io import wavfile
import sunau
from scipy import signal
from scipy.io import wavfile
import pandas as pd
import pywt
import librosa
import python_speech_features
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
import pywt # Python wavelet transform implementation
from sklearn.base import BaseEstimator, TransformerMixin # Interfaces and base classes for pipeline components

In [4]:
class Normalizer(TransformerMixin, BaseEstimator):
    """
    Normalizes sample data to the interval (-1, 1)
    
    Parameters
    ----------
    
    """
    def __init__(self, nbits: int, signed: bool):
        self.signed = signed
        self.nbits = nbits
        if self.signed:
            self.nbits = self.nbits-1
    
    def transform(self, samples: np.array) -> np.array:
        normalized_samples = samples/(2**self.nbits) 
        normalized_samples = np.clip(normalized_samples, -1, 1)
        return normalized_samples
    
    def fit(self, X):
        return self

In [5]:
class NormFFT(TransformerMixin, BaseEstimator):
    
    def __init__(self, n: int = 5000):
        self.n=n
        
    def transform(self, normalized_samples: np.array) -> (np.array, np.array, np.array):
        transformed_samples = fft(normalized_samples, n=self.n) # calculate fourier transform (complex numbers list)
        length = len(transformed_samples)/2  # you only need half of the fft list (real signal symmetry)
        halved_transform = transformed_samples[0:int(length-1)]
        return np.abs(halved_transform), np.real(halved_transform), np.imag(halved_transform) 
    
    def fit(self, X):
        return self

In [6]:
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
import pywt # Python wavelet transform implementation
from sklearn.base import BaseEstimator, TransformerMixin # Interfaces and base classes for pipeline components
class WaveletTransformer(TransformerMixin, BaseEstimator):
    """Compute approximation coefficients of a selected wavelet.
    
    Parameters
    ----------
    wavelet_name : str, default='db1'
        Wavelet to use in transformation.
        Must be a wavelet name defiend in PyWavelets library
        See http://wavelets.pybytes.com/
    mode : str, default='symmetric'
        Extrapolation mode for transform.
        See https://pywavelets.readthedocs.io/en/latest/ref/signal-extension-modes.html#ref-modes
    Attributes
    ----------
    n_features_ : int
        The number of features of the data passed to :meth:`fit`.
    wavelet_name : str, default='db1'
        Wavelet to use in transformation.
        See http://wavelets.pybytes.com/
    mode : str, default='symmetric'
        Extrapolation mode for transform.
        See https://pywavelets.readthedocs.io/en/latest/ref/signal-extension-modes.html#ref-modes
    
    """
    def __init__(self,
                 wavelet_name: str = 'db1',
                 mode: str = 'symmetric'):
        self.wavelet_name = wavelet_name
        self.mode = mode

    def fit(self, X, y=None):
        """A reference implementation of a fitting function for a transformer.
        Parameters
        ----------
        X : {array-like, sparse matrix}, shape (n_samples, n_features)
            The training input samples.
        y : None
            There is no need of a target in a transformer, yet the pipeline API
            requires this parameter.
        Returns
        -------
        self : object
            Returns self.
        """
        X = check_array(X, accept_sparse=True)

        # Each row of X must have the same length
        # In other words, signals need to be truncated or padded to a fixed length
        # prior to passing to this transformer.
        self.n_features_ = X.shape[1]

        # Other checks go here
        
        # Return the transformer
        return self

    def transform(self, X):
        """ Compute wavelet transform on input data X
        
        Parameters
        ----------
        X : {array-like, sparse-matrix}, shape (n_samples, n_features)
            The input samples.
        Returns
        -------
        X_transformed : array, shape (n_samples, n_features)
            The array containing the wavelet transform approximation coefficients from each row of X
            in ``X``.
        """
        # Check is fit had been called
        check_is_fitted(self, 'n_features_')

        # Input validation
        X = check_array(X, accept_sparse=True)

        # Check that the input is of the same shape as the one passed
        # during fit.
        if X.shape[1] != self.n_features_:
            raise ValueError('Shape of input is different from what was seen'
                             'in `fit`')
            
        (cA, cD) = pywt.dwt(X, self.wavelet_name, self.mode)
        return cA

In [7]:
def loadAudio(audioPath):
    sample_rate, samples = wavfile.read(audioPath)
    frequencies, times, spectrogram = signal.spectrogram(samples, sample_rate)
    spectrogram = np.log(spectrogram)
    transposed_spec = spectrogram.transpose()
    freq_list = list(frequencies)
    #freq_list = [str(f) for f in freq_list]
    #freq_list = [f + " Hz" for f in freq_list]
    audio_df = pd.DataFrame(transposed_spec, index = times, columns = freq_list )
    audio_df.index = times
    audio_df
    return audio_df, samples, sample_rate

In [None]:
def constructSpectrogramFigure(audio_df):
    fig, ax = plt.subplots(figsize=(16, 4))
    im = plt.pcolormesh(audio_df.index, audio_df.columns, audio_df.transpose(), shading='auto')

In [110]:
"""
- A function to normalize a wave
- A function that computes an FFT given a normalized wave, then takes the first half, then returns the magnitude or real or complex part
    - Either accept a normalized wave or call the normalize function on raw wave
- A function to plot the FFT
    - Either pass the FFT as a argument or normalized wave, or call FFT on raw wave, etc
"""
def normalize(samples: np.array) -> np.array:
    normalized_samples = samples/(2**15) #b is now normalized on [-1,1) [we were going to normalize between -1,1 like they had, so we divide by the maximum value permitted by this number of bits (8, 16, 32, ..) minus 1 (for the signed bit)]
    normalized_samples = np.clip(normalized_samples, -1, 1)
    return normalized_samples

def normFFT(normalized_samples: np.array, n: int =5000) -> (np.array, np.array, np.array):
    """This function takes a normalized numpy array of amplitude measurements, applies a FFT, and returns the magnitudes,
        the real parts, and the imaginary parts of the FFT
    
    Inputs:
        normalized_samples (np.array): an array of floats bounded between -1 and 1
        n: (int): output length for FFT; the length of the resulting arrays will be n/2
    Returns:
        tuple: numpy arrays of the magnitudes of the FFT, the real parts of the FFT, the imaginary parts of the FFT
    """
    transformed_samples = fft(normalized_samples, n=n) # calculate fourier transform (complex numbers list)
    length = len(transformed_samples)/2  # you only need half of the fft list (real signal symmetry)
    #halved_transform = transformed_samples[0:int(length-1)]
    #return np.abs(halved_transform), np.real(halved_transform), np.imag(halved_transform)
    return np.abs(transformed_samples), np.real(transformed_samples), np.imag(transformed_samples)

In [111]:
from scipy.fftpack import fft
def constructFFTFigure(abs_fft):
    plt.plot(abs_fft) 
    plt.xlim((0,1000))
    plt.show()

In [112]:
from scipy.signal import butter,filtfilt
def butter_highpass_filter(data, cutoff, sample_rate, order):
    normal_cutoff = cutoff / (0.5 * sample_rate)
    # Get the filter coefficients 
    b, a = butter(order, normal_cutoff, btype='high', analog=False)
    y = filtfilt(b, a, data)
    return y

In [113]:
def butter_lowpass_filter(data, cutoff, sample_rate, order):
    normal_cutoff = cutoff / (0.5 * sample_rate)
    # Get the filter coefficients 
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    y = filtfilt(b, a, data)
    return y

In [114]:
#delet above
import glob

In [115]:
glob.glob("C:\\Users\\kbk17\\EspionageDomainExploration\\*.wav")

['C:\\Users\\kbk17\\EspionageDomainExploration\\backgroundnoisewithcar.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap1.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap2.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap3.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap4.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap5.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap6.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap7.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap8.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_taps.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\key_press1.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\key_press2.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\key_press3.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\key_press4.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\key_press5.wav',
 'C:\\Users\\kbk17\\Es

In [116]:
desk_path_list = ['C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap1.wav',
                     'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap2.wav',
                     'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap3.wav',
                     'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap4.wav',
                     'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap5.wav',
                     'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap6.wav',
                     'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap7.wav',
                     'C:\\Users\\kbk17\\EspionageDomainExploration\\desk_tap8.wav']
key_path_list = ['C:\\Users\\kbk17\\EspionageDomainExploration\\key_press1.wav',
                 'C:\\Users\\kbk17\\EspionageDomainExploration\\key_press2.wav',
                 'C:\\Users\\kbk17\\EspionageDomainExploration\\key_press3.wav',
                 'C:\\Users\\kbk17\\EspionageDomainExploration\\key_press4.wav',
                 'C:\\Users\\kbk17\\EspionageDomainExploration\\key_press5.wav']

In [117]:
desk_read_output = [loadAudio(x) for x in desk_path_list] #a list of tuples of spectrogram (a dataframe whose columns are frequencies, rows are points in time, values are magnitudes of freqs), numpy array (raw audio data), and an int
key_read_output = [loadAudio(x) for x in key_path_list]

In [118]:
"""
Housing Price Estimator

Y ~ X1 + X2 + X3

Y  | X1 | X2 | X3
----------------
600k| 3  | 40 | 53
300k| 2  | 20 | 70
   
Sound Classifier

Y | Hz_1_1 | Hz_2_1 | ... | Hz_1_2 | Hz_2_2 | .... 
-----------------
D |
K |
"""

'\nHousing Price Estimator\n\nY ~ X1 + X2 + X3\n\nY  | X1 | X2 | X3\n----------------\n600k| 3  | 40 | 53\n300k| 2  | 20 | 70\n   \nSound Classifier\n\nY | Hz_1_1 | Hz_2_1 | ... | Hz_1_2 | Hz_2_2 | .... \n-----------------\nD |\nK |\n'

# This is for the real part of the FFT only

In [119]:
labels = []
waveforms = []
for df, samples, sample_rate in key_read_output:
    waveforms.append(samples)
    labels.append(0)
    
for df, samples, sample_rate in desk_read_output:
    waveforms.append(samples)
    labels.append(1)

In [120]:
longest = 0
for w in waveforms:
    if len(w) > longest:
        longest=len(w)
longest

14262

In [121]:
normalized_samples = [normalize(sample) for sample in waveforms]

In [122]:
#each component in this list is a tuple composed of magnitudes, real parts, and imaginary parts
ffts = [normFFT(w, longest) for w in normalized_samples]

NameError: name 'halved_transform' is not defined

In [123]:
[len(x[0]) for x in ffts]

[7130, 7130, 7130, 7130, 7130, 7130, 7130, 7130, 7130, 7130, 7130, 7130, 7130]

In [124]:
# Tuple components
magnitudes = 0
real_part = 1
imag_part = 2

X_train = np.vstack([t[real_part] for t in ffts])
# 0 corresponds to keypress, 1 to desktap
y_train = labels
X_train.shape

(13, 7130)

In [125]:
X_train

array([[ 5.38963135e+02, -8.99767698e+02,  1.73396474e+03, ...,
         4.21634589e-02,  1.07224755e-01, -1.90448170e-01],
       [-5.23357330e+02, -6.63200249e+02,  7.19952731e+02, ...,
        -6.67591015e-02,  2.30031218e-02,  1.62695051e-01],
       [-1.41549744e+02, -3.16605433e+02,  3.24055513e+02, ...,
         9.41471700e-02, -1.75952324e-01, -2.09322989e-01],
       ...,
       [-2.39297485e+00,  1.20429282e+00, -2.47457103e+01, ...,
         3.82669127e-02, -5.99555191e-03,  2.12888799e-02],
       [-3.56314087e+00,  3.67538406e+01,  3.99749760e+01, ...,
         6.17863378e-03, -2.58381612e-02, -1.89590677e-02],
       [-2.75354004e+00,  2.72658809e+01, -1.82402099e+01, ...,
         1.36899091e-02, -3.65458315e-02, -7.84401260e-04]])

In [126]:
import numpy as np
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import MinMaxScaler
from sklearn.svm import SVC
clf = make_pipeline(MinMaxScaler(), SVC(gamma='auto', kernel='linear'))
clf.fit(X_train, y_train)

Pipeline(steps=[('minmaxscaler', MinMaxScaler()),
                ('svc', SVC(gamma='auto', kernel='linear'))])

In [127]:
clf.predict(X_train)

array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1])

In [128]:
from sklearn.linear_model import LogisticRegression
lr_clf = make_pipeline(MinMaxScaler(), LogisticRegression())
lr_clf.fit(X_train, y_train)

Pipeline(steps=[('minmaxscaler', MinMaxScaler()),
                ('logisticregression', LogisticRegression())])

In [129]:
lr_clf.predict(X_train)

array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1])

In [130]:
test_paths = glob.glob("C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\*.wav")
test_paths

['C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-10.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-11.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-12.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-13.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-14.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-15.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-16.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-17.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-18.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-19.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-2.wav',
 'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-20.wav',
 'C:\\Users\\kbk17\\Espionage

In [131]:
test_paths =[
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-2.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-3.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-4.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-5.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-6.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-7.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-8.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-9.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-10.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-11.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-12.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-13.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-14.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-15.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-16.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-17.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-18.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-19.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-20.wav',
    'C:\\Users\\kbk17\\EspionageDomainExploration\\Test_Cases\\Audio Track-21.wav']

In [132]:
test_audio = [loadAudio(path) for path in test_paths]

  sample_rate, samples = wavfile.read(audioPath)


In [133]:
test_labels = []
test_waveforms = []
    
for _, samples, _ in test_audio[0:6]:
    test_waveforms.append(samples)
    test_labels.append(0)

for _, samples, _ in test_audio[6:12]:
    test_waveforms.append(samples)
    test_labels.append(1)

len(test_waveforms)

12

In [134]:
normalized_test_waveforms = [normalize(tw) for tw in test_waveforms]
normFFT_output = [normFFT(ntw, longest) for ntw in normalized_test_waveforms]

NameError: name 'halved_transform' is not defined

In [135]:
X_test = np.vstack([ t[real_part] for t in normFFT_output])

In [136]:
lr_clf.predict(X_test)

array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1])

In [137]:
clf.predict(X_test)

array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1])

In [138]:
lr_clf.predict_proba(X_test)

array([[0.02006929, 0.97993071],
       [0.02088374, 0.97911626],
       [0.02092747, 0.97907253],
       [0.02093996, 0.97906004],
       [0.02048695, 0.97951305],
       [0.02084149, 0.97915851],
       [0.02178763, 0.97821237],
       [0.0222506 , 0.9777494 ],
       [0.0198462 , 0.9801538 ],
       [0.01699877, 0.98300123],
       [0.00825823, 0.99174177],
       [0.06059414, 0.93940586]])

# Experimenting with Real and Imaginary Parts

In [139]:
X_train = np.vstack([np.hstack([t[real_part], t[imag_part]]) for t in ffts])
clf.fit(X_train, y_train)

Pipeline(steps=[('minmaxscaler', MinMaxScaler()),
                ('svc', SVC(gamma='auto', kernel='linear'))])

In [140]:
clf.predict(X_train)

array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1])

In [141]:
X_test = np.vstack([np.hstack([t[real_part], t[imag_part]]) for t in normFFT_output])

In [142]:
clf.predict(X_test)

array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1])

# PCA of Fourier Domain Frequencies

In [143]:
from sklearn.decomposition import PCA

In [144]:
pca = PCA(n_components = 5)
pca.fit(X_train)

PCA(n_components=5)

In [145]:
print(pca.explained_variance_ratio_)

[0.35345174 0.19366198 0.16024072 0.0706165  0.05051442]


In [146]:
#this adds the steps we did above to the pipeline
lr_clf = make_pipeline(MinMaxScaler(), PCA(n_components=8), LogisticRegression())
lr_clf.fit(X_train, y_train)

Pipeline(steps=[('minmaxscaler', MinMaxScaler()), ('pca', PCA(n_components=8)),
                ('logisticregression', LogisticRegression())])

In [147]:
lr_clf.named_steps['pca'].explained_variance_ratio_.sum()

0.9384313530472355

In [148]:
lr_clf.predict(X_test)

array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1])

# Quick sanity check using Decision Tree

In [149]:
from sklearn import tree

In [150]:
tree_clf = make_pipeline(MinMaxScaler(), PCA(n_components=8), tree.DecisionTreeClassifier())
tree_clf.fit(X_train, y_train)

Pipeline(steps=[('minmaxscaler', MinMaxScaler()), ('pca', PCA(n_components=8)),
                ('decisiontreeclassifier', DecisionTreeClassifier())])

In [151]:
tree_clf.predict(X_train)

array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1])

In [152]:
tree_clf.predict(X_test)

array([1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0])

# Retraining using all but two test data above

In [155]:
tree_clf.fit(X_train[1:9], y_train[1:9])

Pipeline(steps=[('minmaxscaler', MinMaxScaler()), ('pca', PCA(n_components=8)),
                ('decisiontreeclassifier', DecisionTreeClassifier())])

In [156]:
tree_clf.predict(X_train)

array([1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1])