# Sound classification with YAMNet

YAMNet is a deep net that predicts 521 audio event [classes](https://github.com/tensorflow/models/blob/master/research/audioset/yamnet/yamnet_class_map.csv) from the [AudioSet-YouTube corpus](http://g.co/audioset) it was trained on. It employs the
[Mobilenet_v1](https://arxiv.org/pdf/1704.04861.pdf) depthwise-separable
convolution architecture.

#### Install the dependencies

In [None]:
# Install TensorFlow and TensorFlow Hub, which the next cell imports as `tf` and `hub`.
!python3 -m pip install tensorflow
!pip install tensorflow-hub

In [25]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import csv

import matplotlib.pyplot as plt
from IPython.display import Audio
from scipy.io import wavfile

Load the model from TensorFlow Hub.

Note: the model's documentation is available on its [TensorFlow Hub page](https://tfhub.dev/google/yamnet/1).

In [26]:
# Load the YAMNet model from TensorFlow Hub; `model` is called on waveforms in later cells.
model = hub.load('https://tfhub.dev/google/yamnet/1')

In [27]:
# Verify that loaded audio is at the sample rate the model expects (16 kHz) and resample it if not; a mismatched rate would affect the model's results.
def ensure_sample_rate(original_sample_rate, waveform,
                       desired_sample_rate=16000):
  """Resample `waveform` to `desired_sample_rate` if it is at another rate.

  Args:
    original_sample_rate: Sample rate of `waveform`, in Hz.
    waveform: Array of audio samples; its length along the first axis is
      used to compute the resampled length.
    desired_sample_rate: Target sample rate in Hz (default 16000).

  Returns:
    A `(desired_sample_rate, waveform)` tuple. The waveform is returned
    unchanged when the two rates already match.
  """
  if original_sample_rate != desired_sample_rate:
    # Imported here because the file only imports `scipy.io.wavfile`, which
    # does not bind the bare name `scipy` needed for `scipy.signal`.
    from scipy import signal

    desired_length = int(round(float(len(waveform)) /
                               original_sample_rate * desired_sample_rate))
    waveform = signal.resample(waveform, desired_length)
  return desired_sample_rate, waveform

In [28]:
from scipy.io import wavfile

import wave

def convert_pcm_to_wave(pcm_path="Fall1.pcm", *, channels=1, sample_width=2,
                        frame_rate=16000):
  """Wrap a headerless PCM file in a WAV container.

  The raw bytes of `pcm_path` are written unchanged as the frames of a new
  WAV file whose header is built from the keyword arguments.

  Args:
    pcm_path: Path of the raw PCM file to read.
    channels: Number of channels declared in the WAV header (default 1).
    sample_width: Bytes per sample declared in the header (default 2,
      i.e. 16-bit audio).
    frame_rate: Sample rate in Hz declared in the header (default 16000).

  Returns:
    The path of the WAV file that was written: `pcm_path` with ".pcm"
    replaced by ".wav".
  """
  # Same derivation that get_embedding uses to locate the converted file.
  wav_path = pcm_path.replace(".pcm", ".wav")

  # Read raw PCM data.
  with open(pcm_path, 'rb') as pcm_file:
    pcm_data = pcm_file.read()

  # Write the WAV file. The handle is not called `wavfile` so that it cannot
  # be confused with the `scipy.io.wavfile` module imported at file level.
  with wave.open(wav_path, 'wb') as wav_out:
    wav_out.setnchannels(channels)
    wav_out.setsampwidth(sample_width)
    wav_out.setframerate(frame_rate)
    wav_out.writeframes(pcm_data)

  return wav_path

def get_embedding(file_path):
  """Return a single YAMNet embedding vector for an audio file.

  A path ending in ".pcm" is first converted with `convert_pcm_to_wave` and
  the resulting ".wav" file is read instead. The audio is passed through
  `ensure_sample_rate`, scaled by the int16 maximum, and fed to the model;
  the model's `embeddings` output is averaged over its first axis.

  Args:
    file_path: Path to a ".wav" file or a raw ".pcm" file.

  Returns:
    A NumPy array holding the mean of the model's embeddings along axis 0.
  """
  if file_path.endswith(".pcm"):
    convert_pcm_to_wave(file_path)
    file_path = file_path.replace(".pcm", ".wav")

  # Load the audio. The second positional parameter of `wavfile.read` is the
  # boolean `mmap`, not a file mode, so no mode string is passed.
  sample_rate, wav_data = wavfile.read(file_path)
  sample_rate, wav_data = ensure_sample_rate(sample_rate, wav_data)

  # The `wav_data` needs to be normalized to values in `[-1.0, 1.0]` (as
  # stated in the model's documentation: https://tfhub.dev/google/yamnet/1).
  # NOTE(review): dividing by the int16 maximum assumes 16-bit samples.
  waveform = wav_data / tf.int16.max

  # Run the model; only the embeddings output is used here.
  _scores, embeddings, _spectrogram = model(waveform)
  return np.array(tf.reduce_mean(embeddings, axis=0))

### Use Fall1,2,3,4,5.pcm and NoFall1,2,3,4,5.pcm as calibration data

Ensure that a folder named `Sounds` containing these files exists in the notebook's working directory.

In [29]:
import glob
import re
import os

AUDIO_DIR = "Sounds"


def _load_calibration_embeddings(glob_pattern, name_pattern):
    """Embed every file in AUDIO_DIR whose name matches both patterns.

    Args:
        glob_pattern: Shell-style pattern used to list candidate files.
        name_pattern: Compiled regex that a file's base name must fully
            match, filtering out names the glob matches too loosely.

    Returns:
        A NumPy array with one `get_embedding` result per accepted file.
    """
    # glob.glob returns matches in arbitrary order; sorting keeps the row
    # order of the resulting array reproducible across runs.
    filenames = sorted(
        os.path.basename(path)
        for path in glob.glob(f"{AUDIO_DIR}/{glob_pattern}")
    )
    return np.array([
        get_embedding(f"{AUDIO_DIR}/{filename}")
        for filename in filenames
        if name_pattern.fullmatch(filename)
    ])


# Calibration clips for the "fall" class.
fall_pattern = re.compile(r"Fall\d+\.pcm")
embeddings_fall = _load_calibration_embeddings("Fall*.pcm", fall_pattern)

# Calibration clips for the "no fall" class.
nofall_pattern = re.compile(r"NoFall\d+\.pcm")
embeddings_nofall = _load_calibration_embeddings("NoFall*.pcm", nofall_pattern)

In [30]:
from scipy.spatial import distance
def detect_fall(file_path):
  """Label an audio file "fall" or "no fall" from its calibration distances.

  The file's embedding is compared, by cosine distance, with every embedding
  in `embeddings_fall` and in `embeddings_nofall`. The class whose mean
  distance is smaller wins; a tie yields "no fall".

  Args:
    file_path: Path of the audio file to classify, passed to `get_embedding`.

  Returns:
    The string "fall" or "no fall".

  Raises:
    ValueError: If either calibration set is empty.
  """
  if len(embeddings_fall) == 0 or len(embeddings_nofall) == 0:
    raise ValueError(
        "detect_fall needs at least one fall and one no-fall calibration "
        "embedding")

  query = get_embedding(file_path)

  # Average over every calibration embedding rather than a fixed count, so
  # all calibration files contribute and smaller sets do not fail.
  fall_distances = [distance.cosine(query, ref) for ref in embeddings_fall]
  nofall_distances = [distance.cosine(query, ref) for ref in embeddings_nofall]
  mean_fall_distance = sum(fall_distances) / len(fall_distances)
  mean_nofall_distance = sum(nofall_distances) / len(nofall_distances)

  if mean_fall_distance < mean_nofall_distance:
    return "fall"
  return "no fall"

In [43]:
# Classify a sample recording; the returned label is displayed as the cell output.
detect_fall("output.wav")

'fall'