In [None]:
from matplotlib import pyplot as plt
import numpy as np
import os
from scipy.io import wavfile
from scipy.fftpack import fft, ifft, dct, idct
from scipy.signal import fftconvolve
import IPython

In [None]:
plt.rcParams["figure.dpi"] = 400             # Fix plot quality.
plt.rcParams["figure.figsize"] = (12,3)      # Change plot size / aspect (you may adjust this).

In [None]:
def load_data(directory='raw_data'):
    """
    Function to load sound data from the donateacry dataset.
    Also normalizes each to have the same length.
    
    Parameters
    ----------
    directory : str
                directory to search for data
    scale     : bool
                indicates whether to scale data or not
    
    Returns
    -------
    wavforms : ndarray
               array of data points (each waveform)
    labels : ndarray
             array of labels corresponding to each waveform
    """
    subdirs = os.listdir(directory)
    wavforms = []
    labels = []
    for dirc in subdirs:
        filenames = os.listdir(directory+'/'+dirc)
        for filename in filenames:
            # Get wavform from each file 
            wav = wavfile.read(directory+'/'+dirc+'/'+filename)[1]
                
            # Store wavform and appropriate label
            wavforms.append(wav)
            labels.append(dirc)
    
    # Find the minimum wavform length.
    lens = set()
    for d in wavforms:
        lens.add(len(d))
    min_len = min(lens)
    
    # Cut the end off each wavform so they're the same lengths.
    for i,wav in enumerate(wavforms):
        wavforms[i] = wav[:min_len]

    return np.array(wavforms), np.array(labels)

def duplicate_data(data, labels, alpha=0.3333):
    """
    Duplicates data and adds random noise to the duplicates.
    
    Parameters
    ----------
    data (ndarray): array of waveform values.
    labels (ndarray): array of labels for each row of data.
    alpha (float): a parameter that changes how large the noise may be for 
            all waveforms relative to each of their sample values.
        
    Returns
    -------
    new_data (ndarray): all the same rows as data with noisy rows as well
    new_labels (ndarray): the labels corresponding to the rows of new_data
    """
    new_data = []
    new_labels = []
    for wav, label in zip(data, labels):
        # Sample noise from the normal distribution.
        noise = np.array([np.random.normal(scale=max(abs(x)*alpha, 0)) for x in wav])
        
        new_wav = wav.astype(np.int64) + noise.astype(np.int64)
        
        # Rescale the new_wav to make sure no parts go above 32767.
        new_wav = np.array(32767*new_wav/np.max(np.abs(new_wav))).astype(np.int16)
        
        new_data.append(new_wav)
        new_labels.append(label)
    
    # Concatenate the old and new data/labels.
    new_data = np.vstack([data, np.array(new_data)])
    new_labels = np.hstack([labels, np.array(new_labels)])
    
    return new_data, new_labels


def cut_data(data, labels, n):
    """
    Takes data and cuts each sample into parts of specified length
    
    Parameters
    ----------
    data   : ndarray
             array of waveform values of constant length
    labels : ndarray
             array of labels for each row of data
    n.     : int
             desired length of new samples
             
    Returns
    -------
    new_data   : ndarray
                 array of new cut samples
    new_labels : ndarray
                 array of corresponding labels
    """
    new_data = []
    new_labels = []
    # Get number of possible subsamples from current samples
    subs = length//n

    # Cut each sample
    for i, wav in enumerate(data):
        # Get as many new samples from old sample as possible
        for x in range(subs):
            # Extract subsamples of length n
            new_data.append(wav[x*n:(x+1)*n])
            # Store corresponding label
            new_labels.append(labels[i])
            
    return new_data, new_labels


def get_fft(data, rate=8000):
    """
    Function to transform wavelength data into frequency data using the FFT
    
    Parameters
    ----------
    data : ndarray
           Array where each row is the wavelength samples of a single cry instance
           
    Returns
    -------
    freqs       : ndarray
                  Array whose rows are the sampled frequencies
    freq_domain : domain of frequency values
    """
    freqs = []
    freq_domain = np.arange(0, len(data[0])) * rate / len(data[0])
    freq_domain = freq_domain[:len(freq_domain)//2]
    
    for i, wav in enumerate(data):
        # Get frequencies using Fourier Transform
        f = np.abs(fft(wav))[:len(freq_domain)]
        # Get real part of frequencies, normalize, and sample every 100
        tmp = (np.real(f) / np.real(max(f)))[::100]
        freqs.append(list(tmp))
    
    return np.array(freqs), freq_domain[::100]

def labels_to_integers(labels):
    """
    Convert the array of all labels (strings) to integer values.
    
    Parameters
    ----------
    labels (ndarray): array of strings containing the label name for
        each data point
    
    Returns
    -------
    new_labels (ndarray): array where labels are replaced with ints
    mapping (dict): a mapping from int -> string showing which integer
        corresponds to which label
    """
    new_labels = np.zeros(labels.shape, dtype=int)
    mapping = {}
    
    for idx, lab in enumerate(np.unique(labels)):
        new_labels[labels==lab] = idx
        mapping[idx] = lab
    
    return new_labels, mapping
    
def cut_data(data, labels, n):
    """
    Takes data and cuts each sample into parts of specified length
    
    Parameters
    ----------
    data   : ndarray
             array of waveform values of constant length
    labels : ndarray
             array of labels for each row of data
    n.     : int
             desired length of new samples
             
    Returns
    -------
    new_data   : ndarray
                 array of new cut samples
    new_labels : ndarray
                 array of corresponding labels
    """
    new_data = []
    new_labels = []
    # Get number of possible subsamples from current samples
    subs = data.shape[1]//n

    # Cut each sample
    for i, wav in enumerate(data):
        # Get as many new samples from old sample as possible
        for x in range(subs):
            # Extract subsamples of length n
            new_data.append(wav[x*n:(x+1)*n])
            # Store corresponding label
            new_labels.append(labels[i])
            
    return np.array(new_data), np.array(new_labels)

def data_to_df(data, labels, freqs):
    """
    Function to convert numpy array of data to a pandas DataFrame
    
    Parameters
    ----------
    data  : ndarray
            Array whose rows are of the form [[frequencies], 'label']
    label : list or ndarray of shape (n,)
            list of labels corresponding to each row of data
    freqs : list or ndarray of shape (n,)
            Frequency domain of the passed data (used to make column names)
    
    Returns
    -------
    df : pd.DataFrame
         DataFrame where each row is a cry instance and each column is a frequency
         with a final 'label' column
    """
    # Data to df
    df = pd.DataFrame(data,columns=freqs)
    # Labels column
    df['labels'] = labels
    return df

In [None]:
# Get the data.
X, y = load_data()

# Duplicate everything except the hungry data (of which there is already a lot).
X_dup, y_dup = duplicate_data(X[y != 'hungry'], y[y != 'hungry'])

In [None]:
# Put the data together.
X = np.vstack([X_dup, X[y == 'hungry'][::3]])
y = np.hstack([y_dup, y[y == 'hungry'][::3]])

In [None]:
# Make y be integer-valued.
y, mapping = labels_to_integers(y)

In [None]:
# Get the data in frequency space
Xf, freqs = get_fft(X)

In [None]:
print(X.shape)
print(y.shape)
print(Xf.shape)

# Try some ML methods

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis, LinearDiscriminantAnalysis
from sklearn.linear_model import LinearRegression
from sklearn.mixture import GaussianMixture
from sklearn.cluster import KMeans

In [None]:
Xf_train, Xf_test, X_train, X_test, y_train, y_test = train_test_split(
    Xf, X, y, test_size=0.33, random_state=42)

## Regression

### OLS

In [247]:
OLSf = LinearRegression().fit(Xf_train,y_train)

In [248]:
pct_correct = np.mean(np.round(OLSf.predict(Xf_test)) != y_test)
print('Percent correct (rounded to nearest class):', round(1 - pct_fail, 4))

Percent correct (rounded to nearest class): 0.2391


In [249]:
np.mean(np.round(OLSf.predict(Xf_test)) == y_test)

0.2391304347826087

In [250]:
print('R^2 value:', OLSf.score(Xf_test, y_test))

R^2 value: -6.185488616398643


In [251]:
OLS = LinearRegression().fit(X_train,y_train)

In [252]:
pct_correct = np.mean(np.round(OLS.predict(X_test)) != y_test)
print('Percent correct (rounded to nearest class):', round(1 - pct_fail, 4))

Percent correct (rounded to nearest class): 0.2391


In [253]:
np.mean(np.round(OLS.predict(X_test)) == y_test)

0.32608695652173914

In [254]:
print('R^2 value:', OLS.score(X_test, y_test))

R^2 value: -1.4382597217286501


## Gaussian Discriminant Analysis

### QDA

In [255]:
qdaf = QuadraticDiscriminantAnalysis().fit(Xf_train, y_train)



In [256]:
np.mean(qdaf.predict(Xf_test) == y_test)

0.34782608695652173

In [257]:
qda = QuadraticDiscriminantAnalysis().fit(X_train, y_train)



In [258]:
np.mean(qda.predict(X_test) == y_test)

0.2717391304347826

### LDA

In [259]:
ldaf = LinearDiscriminantAnalysis().fit(Xf_train, y_train)



In [260]:
np.mean(ldaf.predict(Xf_test) == y_test)

0.44565217391304346

In [261]:
lda = LinearDiscriminantAnalysis().fit(X_train, y_train)



In [262]:
np.mean(lda.predict(X_test) == y_test)

0.7065217391304348

## Clustering

### K-Means

In [192]:
km = KMeans(n_clusters=5).fit(Xf)
km.p

### GMM

In [264]:
gmm = GaussianMixture(n_components=5).fit(Xf_train)

In [266]:
gmm.predict(Xf_test)

array([4, 1, 1, 4, 1, 1, 1, 0, 4, 3, 1, 2, 4, 2, 0, 1, 0, 0, 4, 4, 2, 2,
       1, 3, 4, 4, 3, 1, 3, 0, 1, 1, 4, 1, 0, 3, 4, 1, 1, 4, 3, 4, 4, 1,
       3, 1, 1, 0, 4, 0, 1, 1, 2, 1, 4, 4, 1, 3, 4, 1, 4, 4, 2, 1, 3, 2,
       1, 4, 3, 4, 1, 1, 1, 2, 2, 0, 2, 2, 3, 3, 3, 3, 4, 1, 1, 1, 4, 3,
       4, 4, 2, 1])

In [267]:
y_test

array([2., 4., 3., 4., 3., 3., 2., 3., 3., 4., 0., 3., 3., 2., 3., 2., 3.,
       4., 3., 3., 1., 4., 3., 3., 3., 3., 0., 2., 0., 1., 2., 3., 2., 3.,
       3., 0., 3., 3., 0., 4., 1., 3., 4., 2., 1., 3., 2., 0., 3., 3., 0.,
       3., 3., 0., 3., 0., 3., 1., 2., 0., 3., 3., 2., 4., 2., 3., 3., 4.,
       4., 3., 2., 4., 3., 0., 2., 3., 3., 2., 1., 2., 3., 4., 3., 3., 2.,
       3., 4., 4., 3., 3., 3., 2.])

# EXTRA STUFF -- PLAYING AROUND

In [None]:
class SoundWave(object):
    """A class for working with digital audio signals."""


    def __init__(self, rate, samples):
        """Set the SoundWave class attributes.

        Parameters:
            rate (int): The sample rate of the sound.
            samples ((n,) ndarray): NumPy array of samples.
        """
        # Set the attributes
        self.rate = rate
        self.samples = samples
        self.num_samples = samples.size


    def plot(self, useDFT=False):
        """Plot the graph of the sound wave (time versus amplitude)."""
        plt.figure()
        if useDFT:
            plt.subplot(122)
            plt.title("Fourier Transform")
            plt.xlabel("Frequency (Hz)")
            plt.ylabel("Magnitude")
            # Correctly set the x axis to be in terms of frequency
            x = np.arange(self.num_samples)*self.rate/self.num_samples
            y = np.abs(fft(self.samples))
            x_end = self.num_samples*self.rate/self.num_samples/2
            #plt.xlim(-.02*x_end,x_end)
            plt.ylim(0,1.075*np.max(y))
            plt.plot(x,y)
            plt.subplot(121)
        
        # Set the y axis limits
        plt.ylim(-32768, 32767)
        plt.title("Sound Wave")
        # Correctly label the x axis in terms of seconds
        x = np.linspace(0,self.num_samples/self.rate,self.num_samples)
        plt.xlabel("Time (s)")
        plt.ylabel("Samples")
        plt.plot(x,self.samples)


    def export(self, filename, force=False):
        """Generate a wav file from the sample rate and samples. 
        If the array of samples is not of type np.int16, scale it before exporting.

        Parameters:
            filename (str): The name of the wav file to export the sound to.
        """
        # Note: we convert to int64 so that we can multiply by 
        #  32767, then we convert the result back to int16
        if force:
            samples = np.array(self.samples,dtype=np.int64)
            samples = np.array(32767*samples/np.max(np.abs(samples)),
                               dtype=np.int16)
        elif type(self.samples) is not np.int16:
            samples = np.array(32767*samples/np.max(np.abs(samples)),
                               dtype=np.int16)
        else:
            samples = self.samples

        # Write the samples to a file
        wavfile.write(filename, self.rate, samples)
        

    def __len__(self):
        """Returns the length of the sound file in seconds."""
        return int(round(self.num_samples/self.rate))

    
    def __add__(self, other):
        """Combine the samples from two SoundWave objects.

        Parameters:
            other (SoundWave): An object containing the samples to add
                to the samples contained in this object.
        
        Returns:
            (SoundWave): A new SoundWave instance with the combined samples.

        Raises:
            ValueError: if the two sample arrays are not the same length.
        """
        # Make sure they are the same length
        if other.num_samples != self.num_samples:
            raise ValueError("The two sample sounds are not the same length.")
        
        # Get the element-wise sum of the two samples.
        sample1 = np.array(other.samples,dtype=np.int32)
        sample2 = np.array(self.samples,dtype=np.int32)
        summed_wave = sample1 + sample2
        # Scale the sample if necessary
        if np.max(np.abs(summed_wave)) > 32767:
            summed_wave = np.array(summed_wave*32767
                    /np.max(np.abs(summed_wave)),dtype=np.int16)
            
        return SoundWave(self.rate, summed_wave)
        

    # Problem 1.4
    def __rshift__(self, other):
        """Concatentate the samples from two SoundWave objects.

        Parameters:
            other (SoundWave): An object containing the samples to concatenate
                to the samples contained in this object.

        Raises:
            ValueError: if the two sample rates are not equal.
        """
        # Error checking
        if other.rate != self.rate:
            raise ValueError("The two samples' rates are not equal.")
        
        return SoundWave(self.rate, np.concatenate((self.samples,other.samples)))
                                   
    
    def __mul__(self, other):
        """Convolve the samples from two SoundWave objects using circular convolution.
        
        Parameters:
            other (SoundWave): An object containing the samples to convolve
                with the samples contained in this object.
        
        Returns:
            (SoundWave): A new SoundWave instance with the convolved samples.

        Raises:
            ValueError: if the two sample rates are not equal.
        """
        if self.rate != other.rate:
            raise ValueError("The sample rates are not equal.")
        
        f = self.samples
        g = other.samples
        if len(f) < len(g):
            f = np.concatenate([f,np.zeros(len(g)-len(f))])
        if len(g) < len(f):
            g = np.concatenate([g,np.zeros(len(f)-len(g))])
            
        f_conv_g = ifft(fft(f)*fft(g)).real
        return SoundWave(self.rate, f_conv_g)

    
    def __pow__(self, other):
        """Convolve the samples from two SoundWave objects using linear convolution.
        
        Parameters:
            other (SoundWave): An object containing the samples to convolve
                with the samples contained in this object.
        
        Returns:
            (SoundWave): A new SoundWave instance with the convolved samples.

        Raises:
            ValueError: if the two sample rates are not equal.
        """
        if self.rate != other.rate:
            raise ValueError("The sample rates are not equal.")
            
        n,m = self.num_samples, other.num_samples
        
        # Find the smallest 2**a such that 2**a >= n + m - 1
        to_compare = n + m - 1
        a = 0
        while 2**a < to_compare:
            a += 1
        
        # Append zeros to f,g so that both are length 2**a
        f, g = self.samples, other.samples
        f = np.concatenate([f,np.zeros(2**a - n)])
        g = np.concatenate([g,np.zeros(2**a - m)])
        
        out = ifft(fft(f)*fft(g))[:(n+m-1)].real
        return SoundWave(self.rate, out)

    
    def clean(self, low_freq, high_freq):
        """Remove a range of frequencies from the samples using the DFT. 

        Parameters:
            low_freq (float): Lower bound of the frequency range to zero out.
            high_freq (float): Higher boound of the frequency range to zero out.
        """
        
        a, b = ( int(low_freq/self.rate*self.num_samples), 
                 int(high_freq/self.rate*self.num_samples) )
        
        # Get the frequencies
        freq = fft(self.samples)
        
        # zero them out between the low and high frequency
        freq[a:b] = 0 
        freq[(freq.size-b):(freq.size-a)] = 0
        
        # Convert the frequencies back to the sound wave
        self.samples = ifft(freq).real

In [None]:
# Some of the test files I can mess around with.
file_ti1 = 'raw_data/tired/03ADDCFB-354E-416D-BF32-260CF47F7060-1433658024-1.1-f-04-ti.wav'
file_ti2 = 'raw_data/tired/06c4cfa2-7fa6-4fda-91a1-ea186a4acc64-1430029221058-1.7-f-26-ti.wav'
file_hu1 = 'raw_data/hungry/02c3b725-26e4-4a2c-9336-04ddc58836d9-1430726196216-1.7-m-04-hu.wav'
file_hu2 = 'raw_data/hungry/02ead89b-aa02-453e-8b83-6ebde9fe7551-1430233132879-1.7-m-26-hu.wav'

In [None]:
SW = SoundWave(*wavfile.read(file_ti2))
SW.plot(useDFT=True)
IPython.display.Audio(filename=file_ti2)

In [None]:
SW = SoundWave(*wavfile.read(file_hu1))
SW.plot(useDFT=True)
IPython.display.Audio(filename=file_hu1)

In [None]:
SW2 = SoundWave(*wavfile.read(file_hu2))
SW2.plot(useDFT=True)
IPython.display.Audio(filename=file_hu2)

In [None]:
SW = SoundWave(*wavfile.read(file_ti1))
SW.plot(useDFT=True)
IPython.display.Audio(filename=file_ti1)

In [None]:
bare_y = y[::10]
ifftbare_y = ifft(bare_y).real
new_samples = 32767*ifftbare_y/np.max(np.abs(ifftbare_y))
bare_SW = SoundWave(SW.rate//10, new_samples)
bare_SW.plot(useDFT=True)
bare_SW.export('reduced_sound.wav', force=True)

In [None]:
IPython.display.Audio(filename='reduced_sound.wav')

In [None]:
x = np.arange(SW.num_samples)*SW.rate/SW.num_samples
y = np.abs(fft(SW.samples))
plt.figure()
plt.plot(x[:len(x)//2], y[:len(x)//2])
plt.title(f'Using {len(x)//2} frequency measurements')
plt.figure()
plt.plot(x[:len(x)//2:10], y[:len(x)//2:10])
plt.title(f'Using {len(x[:len(x)//2:10])} frequency measurements')
plt.figure()
plt.plot(x[:len(x)//2:100], y[:len(x)//2:100])
plt.title(f'Using {len(x[:len(x)//2:100])} frequency measurements')

In [None]:
# Section to get all the data files.
all_data = []
for folder in ['belly_pain', 'burping', 'discomfort', 'hungry', 'tired']:
    all_data.append('raw_data/'+folder)

In [None]:
data, labels = load_data()
data

In [None]:
labels

In [None]:
dup, lab = duplicate_data(data[:2], labels[:2])
dup

In [None]:
plt.figure()
plt.plot(dup[0][0])

plt.figure()
plt.plot(dup[2][0])

In [None]:
X = []
lens = set()
for d, _ in data:
    X.append(d)
X = np.array(X)
X

In [None]:
np.max(X, axis=1)