In [None]:
import json
import math
import os
import random
from typing import (Any, Dict, List, Optional, Tuple)

import IPython
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import scipy
import scipy.io.wavfile
import scipy.signal
import sklearn
import sklearn.tree


# Record the library versions so the outputs below can be traced back to the
# environment that produced them.
print('IPython.__version__ = %s' % IPython.__version__)
print('numpy.__version__ = %s' % np.__version__)
print('matplotlib.__version__ = %s' % matplotlib.__version__)
print('scipy.__version__ = %s' % scipy.__version__)
print('sklearn.__version__ = %s' % sklearn.__version__)

# Seed Python's `random` module once, from a single named constant, so the
# value that is printed is guaranteed to be the value that is used.
SEED = int("4ec4215f92cf3e", 16)
print('\nseeding random with %d\n' % SEED)
random.seed(SEED)

# /proc/<pid>/status only exists on Linux. Skip the report elsewhere instead of
# raising FileNotFoundError and aborting a "Run All".
status_path = '/proc/%d/status' % os.getpid()
if os.path.exists(status_path):
    with open(status_path, 'rb') as f:
        print(f.read().decode('utf-8'))
else:
    print('%s is not available on this platform' % status_path)

In [None]:
# Shell escape: list the contents of ../audios, the directory the .wav files are read from below.
!ls ../audios

In [None]:
# Identifiers of the two clips used in this notebook. They are interpolated
# into the '../audios/<name>.wav' path below.
kelly_wife_or_dog = 'wr2sVPTacTE'
dhoom_taana = 'TjUXr560Gu0'

# Clip under analysis; swap the two assignments to analyse the other one.
video_name = kelly_wife_or_dog
# video_name = dhoom_taana

# `scipy.io.wavfile.read` hands back the sample rate and the raw samples.
rate, data = scipy.io.wavfile.read('../audios/%s.wav' % video_name)
print('data.shape = %s' % (data.shape,))
print('data.dtype = %s' % (data.dtype,))

In [None]:
# `scipy.io.wavfile.read` returns a 1-D array for mono files and a 2-D
# (samples, channels) array otherwise, so `data.shape[1]` must not be read
# unconditionally: it raises IndexError for mono input, and also when this cell
# is re-run after channel 0 has already been selected.
num_channels = data.shape[1] if data.ndim > 1 else 1
length = data.shape[0] / rate
print('number of channels = %d' % num_channels)
print('length = %f seconds' % length)

if data.ndim > 1:
    print('selecting channel 0')
    data = data[:, 0]

# Cap the analysed audio at `limit` seconds.
limit = 120
if length > limit:
    print('shortening to %d seconds' % limit)
    data = data[:limit * rate].copy()
    length = float(limit)

print('final shape = %s' % str(data.shape))

In [None]:
# I decided to use `stft` instead of `spectrogram` because it seemed simpler to understand its
# "window" and "overlap" arguments.
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.stft.html vs
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.spectrogram.html .
# The `spectrogram` documentation has a nice overview, but does a poor job at explaining
# windows with respect to the `window` argument. I tried also looking at
# https://homes.cs.washington.edu/~thickstn/spectrograms.html but ultimately settled on
# `stft`. Note that the result of `stft` is in the complex domain, so we have to use the
# magnitude to get a real value.
window_size = int(rate) // 100
noverlap = window_size // 2
# `stft` advances by `nperseg - noverlap` samples per column. When `window_size` is odd
# (e.g. 441 at 44.1 kHz) that is one sample more than `window_size // 2`, so the step is
# derived from the exact values handed to `stft`. The columns-per-second ratio is kept as a
# float: truncating it makes every time <-> column conversion drift as the clip gets longer.
step_size = window_size - noverlap
windows_per_second = rate / step_size
freqs, times, spectro = scipy.signal.stft(
    data,
    rate,
    window='hann', # default, as specified by the documentation (listed above)
    nperseg=window_size,
    noverlap=noverlap
)
# Note that my interpretation of spectro is more like spectro.T. In that, each row in
# spectro.T represents the frequency strengths at a particular time. Specifically,
# spectro.T[k] belongs to the window at `times[k]`, and consecutive rows are
# `step_size / rate` seconds apart.
print('data.shape = %s' % str(data.shape))
print('spectro.shape = %s' % str(spectro.shape))
print('(spectro.shape[0] - 1) * (spectro.shape[1] - 1) = %d' % ((spectro.shape[0] - 1) * (spectro.shape[1] - 1)))
print('spectro.dtype = %s' % spectro.dtype)

In [None]:
# Waveform of the selected channel, one x value per sample.
time_range = np.linspace(0, length, data.shape[0])
plt.plot(time_range, data, label='channel 0')

# Some interesting markers, per clip (times in seconds).
marker_times = {
    dhoom_taana: (
        34,  # The start of the chorus
        43,  # The start of the solo singer, "Kaise, ..."
    ),
    kelly_wife_or_dog: (
        11.8,  # The long, slow "Sooooo, Kelly..."
    ),
}
for marker_time in marker_times.get(video_name, ()):
    plt.axvline(x=marker_time, color='#d62728')

plt.legend()
plt.xlabel('Time (seconds)')
plt.ylabel('Amplitude')
plt.gcf().set_size_inches([15, 4]) # default is 6 x 4
plt.show()

In [None]:
# Use the column times reported by `stft` for the x axis. Spreading the columns evenly over
# [0, limit] puts every point (and the markers below) at the wrong time whenever the clip is
# shorter than `limit` seconds.
time_range = times

# Summed STFT magnitude of the five lowest frequency bins and of the next five,
# one value per STFT column.
magnitudes = np.abs(spectro).T
signal1 = magnitudes[:, :5].sum(axis=1)
signal2 = magnitudes[:, 5:10].sum(axis=1)
plt.plot(time_range, signal1, label='sum(freqs[:5])')
plt.plot(time_range, signal2, label='sum(freqs[5:10])')

# Some interesting markers
if video_name == dhoom_taana:
    # The start of the chorus
    plt.axvline(x=34, color='#d62728')
    # The start of the solo singer, "Kaise, ..."
    plt.axvline(x=43, color='#d62728')
elif video_name == kelly_wife_or_dog:
    # The long, slow "Sooooo, Kelly..."
    plt.axvline(x=11.8, color='#d62728')

plt.legend()
plt.xlabel('Time (seconds)')
plt.ylabel('Amplitude')
plt.gcf().set_size_inches([15, 4]) # default is 6 x 4
plt.show()

In [None]:
# Heat map of the STFT magnitude for the 60 lowest frequency bins.
# The y axis is the bin index (`np.arange(freqs.shape[0])`) instead of `freqs`
# directly because that's just how I think about frequencies...
bin_indices = np.arange(freqs.shape[0])[:60]
low_bin_magnitudes = np.abs(spectro)[:60, :]
plt.pcolormesh(times, bin_indices, low_bin_magnitudes)

# Some interesting markers, per clip (times in seconds).
marker_times = {
    dhoom_taana: (
        34,  # The start of the chorus
        43,  # The start of the solo singer, "Kaise, ..."
    ),
    kelly_wife_or_dog: (
        11.8,  # The long, slow "Sooooo, Kelly..."
    ),
}
for marker_time in marker_times.get(video_name, ()):
    plt.axvline(x=marker_time, color='#d62728')

plt.ylabel('Frequency')
plt.xlabel('Time (seconds)')
plt.gcf().set_size_inches([15, 4]) # default is 6 x 4
plt.show()

In [None]:
# Audio player for the samples analysed above. The transpose (`data.T`) is
# passed because IPython expects a different shape than what scipy returns:
# https://stackoverflow.com/questions/57137050/i-am-facing-problems-displaying-audio-file-using-python
# This must stay the last expression of the cell so the widget is displayed.
IPython.display.Audio(data.T, rate=rate)

In [None]:
# Starts at about ~34 seconds
dhoom_taana_lyrics = """
Dhoom Taana Ta Dum Ta Na Na Na
Dhoom Taana Ta Dum Ta Na Na Na
Dhoom Taana Dhir Na Dhir Na...

Kaise, Naino Se Nain Milao Sajna
Kaise, Mein Aise Na Ghabrao Sajna
Kaise, Aaye Na Aise Mohe Laaj Sajna
Choona Na Dehko Mohe Aaj Sajna
"""
# Ends half way through "mohe" (doesn't get to finish "aaj sajna").
# Per line, there are 8, 8, and 6 sounds and then 6, 6, 7, and finally
# 3.5 words that are uttered in the first 60 seconds of the song.

# Hand-written utterances for the dhoom_taana clip (times in seconds).
dhoom_taana_utterances = [
    dict(start=34, end=34.2, duration=0.2, content='dhoom'),
    dict(start=43, end=43.9, duration=0.9, content='kaise'),
    dict(start=47.5, end=48.1, duration=0.6, content='kaise'),
    dict(start=51.5, end=52.1, duration=0.6, content='kaise'),
]

# Utterances for the kelly_wife_or_dog clip come from a JSON file under ../outputs.
kelly_wife_or_dog_utterances = []
with open('../outputs/%s.json' % kelly_wife_or_dog, 'rb') as f:
    raw_output = json.loads(f.read().decode('utf-8'))

result_items = raw_output['results']['items']
# Entries without a 'start_time' carry no timing and are ignored.
for item in (entry for entry in result_items if 'start_time' in entry):
    start_time = float(item['start_time'])
    # Drop anything that begins after the portion of audio that was kept.
    if start_time > limit:
        continue
    end_time = float(item['end_time'])
    utterance = {
        'start': start_time,
        'end': end_time,
        'duration': end_time - start_time,
        'content': item['alternatives'][0]['content'],
    }
    kelly_wife_or_dog_utterances.append(utterance)
    print(utterance)

# Pick the utterance list matching the selected clip (empty for anything else).
utterances_by_video = {
    dhoom_taana: dhoom_taana_utterances,
    kelly_wife_or_dog: kelly_wife_or_dog_utterances,
}
utterances = utterances_by_video.get(video_name, [])

print('num utterances = %d' % len(utterances))

In [None]:
def labelsFromUtterances(
    utterances: List[Dict[str, Any]],
    n_rows: int,
    rows_per_second: Optional[float] = None,
) -> Tuple[int, np.ndarray]:
    """Build a per-row 0/1 label vector from timed utterances.

    utterances: dicts with 'start' and 'end' times in seconds.
    n_rows: number of rows (STFT columns) to label.
    rows_per_second: rows per second of audio, used to convert times to row
        indices. Defaults to the notebook-level `windows_per_second`.

    return: (positive_examples, labels) where `labels` is
        np.ndarray[dtype=float64, shape=[n_rows]] holding 1 for rows covered by
        an utterance and 0 elsewhere, and `positive_examples` is the number of
        rows set to 1.
    """
    if rows_per_second is None:
        rows_per_second = windows_per_second

    labels = np.zeros(n_rows)
    for item in utterances:
        # math.ceil rounds up to the next row boundary for labeling. Both ends
        # are clamped to the array: a negative start would otherwise wrap
        # around and label rows at the end of the array.
        start_i = max(int(math.ceil(item['start'] * rows_per_second)), 0)
        end_i = min(int(math.ceil(item['end'] * rows_per_second)), n_rows)
        if start_i < end_i:
            labels[start_i:end_i] = 1

    # Count labelled rows rather than assignments, so rows shared by
    # overlapping utterances are only counted once.
    positive_examples = int(labels.sum())
    return positive_examples, labels


def trainFromSpectro(spectro: np.ndarray, utterances: List[Dict[str, Any]]) -> sklearn.tree.DecisionTreeClassifier:
    """Fit a depth-5 decision tree that labels each row of `spectro`.

    spectro: np.ndarray[ndtype=float64, shape=[Nwindows, Nfreqs]]
    utterances: timed utterances, turned into per-row labels by
        `labelsFromUtterances`.

    return: the fitted classifier. The top of the tree is also drawn on the
        current matplotlib figure.
    """
    # TODO replace Dict with a dataclass
    n_windows = spectro.shape[0]
    print('spectro.shape = %s' % str(spectro.shape))

    n_positive, targets = labelsFromUtterances(utterances, n_windows)
    print('got %d positive examples for training' % n_positive)
    print('out of %d leaves %d negative examples (i.e. not talking)' % (n_windows, n_windows - n_positive))

    # The tree's seed is drawn from Python's `random` module.
    tree_seed = random.randint(0, 2 ** 32 - 1)
    fitted = sklearn.tree.DecisionTreeClassifier(random_state=tree_seed, max_depth=5).fit(spectro, targets)

    # Widen the figure before drawing the tree (default is 6 x 4).
    figure = plt.gcf()
    figure.set_size_inches([24, 8])
    sklearn.tree.plot_tree(fitted, max_depth=2, node_ids=True)
    return fitted


# Train on the STFT magnitudes, one row per STFT column, restricted to the 60 lowest frequency bins.
model = trainFromSpectro(spectro=np.abs(spectro).T[:, :60], utterances=utterances)

In [None]:
def utterancesFromPredictions(
    min_word_width: int,
    predictions: np.ndarray,
    rows_per_second: Optional[float] = None,
) -> List[Dict[str, float]]:
    """Turn per-row 0/1 predictions into a list of timed utterances.

    Every maximal run of consecutive non-zero predictions that is at least
    `min_word_width` rows long becomes one utterance; shorter runs are treated
    as noise and dropped. `predictions` is not modified.

    min_word_width: minimum run length, in rows, for a run to count.
    predictions: 1-D array of per-row predictions, 0 meaning ~"no words are
        spoken".
    rows_per_second: rows per second of audio, used to convert row indices to
        seconds. Defaults to the notebook-level `windows_per_second`.

    return: list of dicts with 'start', 'end' and 'duration' in seconds, where
        'start' and 'end' are the times of the first and last row of the run.
    """
    if rows_per_second is None:
        rows_per_second = windows_per_second

    is_speech = np.asarray(predictions) != 0
    n_rows = is_speech.shape[0]

    predicted_utterances = []
    run_start = None
    # One extra iteration past the end acts as a terminating "silent" row, so a
    # run that reaches the end of the array is closed like any other.
    for row in range(n_rows + 1):
        speaking = row < n_rows and bool(is_speech[row])
        if speaking:
            if run_start is None:
                run_start = row
            continue
        if run_start is None:
            continue
        last = row - 1
        if row - run_start >= min_word_width:
            predicted_utterances.append({
                'start': run_start / rows_per_second,
                'end': last / rows_per_second,
                'duration': (last - run_start) / rows_per_second,
                # content is TBD!
            })
        run_start = None
    return predicted_utterances

# Classify every STFT column, using the same 60 lowest frequency bins the model was trained on.
predictions = model.predict(np.abs(spectro).T[:, :60])

# Any prediction shorter than this will be considered noise
min_word_width = 8
#predictions[:min_word_width] = np.zeros(min_word_width)
predicted_utterances = utterancesFromPredictions(min_word_width, predictions)
print('model found %d possible utterances' % len(predicted_utterances))

# The y axis is the bin index (`np.arange(freqs.shape[0])`) instead of `freqs`
# directly because that's just how I think about frequencies...
bin_indices = np.arange(freqs.shape[0])[:60]
low_bin_magnitudes = np.abs(spectro)[:60, :]
plt.pcolormesh(times, bin_indices, low_bin_magnitudes)

# One vertical line at the start of each predicted utterance.
for onset in [item['start'] for item in predicted_utterances]:
    plt.axvline(x=onset, color='#d62728')

plt.ylabel('Frequency')
plt.xlabel('Time (seconds)')
plt.gcf().set_size_inches([15, 4]) # default is 6 x 4
plt.show()

In [None]:
# Same heat map as above, this time marked with the labelled utterances.
# The y axis is the bin index (`np.arange(freqs.shape[0])`) instead of `freqs`
# directly because that's just how I think about frequencies...
bin_indices = np.arange(freqs.shape[0])[:60]
low_bin_magnitudes = np.abs(spectro)[:60, :]
plt.pcolormesh(times, bin_indices, low_bin_magnitudes)

# One vertical line at the start of each labelled utterance.
for onset in [item['start'] for item in utterances]:
    plt.axvline(x=onset, color='#d62728')

plt.ylabel('Frequency')
plt.xlabel('Time (seconds)')
plt.gcf().set_size_inches([15, 4]) # default is 6 x 4
plt.show()

In [None]:
def readAndSpectro(video, limit: int = 10) -> Tuple[float, np.ndarray, np.ndarray, np.ndarray]:
    """Read '../audios/<video>.wav' and compute its STFT magnitude.

    Only the first channel and at most the first `limit` seconds are used.

    video: base name of the .wav file under ../audios.
    limit: maximum number of seconds of audio to analyse.

    return: (windows_per_second, freqs, times, magnitudes) where
        windows_per_second is the number of STFT columns per second of audio,
        freqs and times are the arrays returned by `scipy.signal.stft`, and
        magnitudes is np.ndarray[dtype=float64, shape=[Nrows, Nfreqs]] with one
        row per entry of `times`.
    """
    rate, data = scipy.io.wavfile.read('../audios/%s.wav' % video)

    # Mono files come back 1-D, so test the rank instead of indexing shape[1].
    if data.ndim > 1:
        data = data[:, 0]
    length = data.shape[0] / rate
    if length > limit:
        data = data[:int(limit * rate)].copy()

    window_size = int(rate) // 100
    noverlap = window_size // 2
    # `stft` advances by `nperseg - noverlap` samples per column, which differs
    # from `window_size // 2` when `window_size` is odd. Keep the ratio as a
    # float so it matches the spacing of `times`.
    step_size = window_size - noverlap
    windows_per_second = rate / step_size
    freqs, times, spectro = scipy.signal.stft(
        data,
        rate,
        window='hann', # default, as specified by the scipy.signal.stft documentation
        nperseg=window_size,
        noverlap=noverlap
    )
    return windows_per_second, freqs, times, np.abs(spectro).T

# Apply the trained model to the other clip. The axes get their own names so the
# notebook-level `freqs` / `times` (which belong to `spectro`) are not overwritten;
# otherwise re-running any of the earlier plotting cells fails on mismatched shapes.
_, dhoom_taana_freqs, dhoom_taana_times, dhoom_taana_spectro = readAndSpectro(dhoom_taana)

new_predictions = model.predict(dhoom_taana_spectro[:, :60])
# NOTE(review): utterancesFromPredictions converts rows to seconds with the notebook-level
# `windows_per_second`, so this presumably assumes both clips share a sample rate - confirm.
new_predicted_utterances = utterancesFromPredictions(min_word_width, new_predictions)

print('model found %d possible utterances' % len(new_predicted_utterances))

# I'm using `np.arange(dhoom_taana_freqs.shape[0])` instead of the frequencies directly
# because that's just how I think about frequencies...
plt.pcolormesh(
    dhoom_taana_times,
    np.arange(dhoom_taana_freqs.shape[0])[:60],
    dhoom_taana_spectro.T[:60, :]
)

# One vertical line at the start of each predicted utterance.
for item in new_predicted_utterances:
    plt.axvline(x=item['start'], color='#d62728')

plt.ylabel('Frequency')
plt.xlabel('Time (seconds)')
plt.gcf().set_size_inches([15, 4]) # default is 6 x 4
plt.show()

In [None]:
# Print this process's /proc status report again at the end of the notebook.
# /proc/<pid>/status only exists on Linux, so skip it elsewhere instead of
# raising FileNotFoundError.
status_path = '/proc/%d/status' % os.getpid()
if os.path.exists(status_path):
    with open(status_path, 'rb') as f:
        print(f.read().decode('utf-8'))
else:
    print('%s is not available on this platform' % status_path)