In [36]:
import time
import argparse

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import seaborn as sns

import librosa
import librosa.display
import IPython.display as ipd
from scipy.signal import wiener

import tensorflow as tf

import warnings
warnings.filterwarnings("ignore")

filename_1 = 'audio_files/videoplayback.wav'
filename_2 = 'audio_files/videoplayback2.m4a'

In [18]:
def ArgParser():
    parser = argparse.ArgumentParser()
    
    parser.add_argument("--sample_rate", dest="sample_rate", type=int, default=16000)
    parser.add_argument("--n_fft", dest="n_fft", type=int, default=2048)
    parser.add_argument("--window_size", dest="window_size", type=int, default=400) # 25ms
    parser.add_argument("--hop_length", dest="hop_length", type=int, default=160) # 10ms
    parser.add_argument("--n_mels", dest="n_mels", type=int, default=64)
    parser.add_argument("--n_mfcc", dest="n_mfcc", type=int, default=13)
    parser.add_argument("--max_samples", dest="max_samples", type=int, default=64000)
    parser.add_argument("--delta_width", dest="delta_width", type=int, default=3)
    parser.add_argument("--learning_rate", dest="learning_rate", type=float, default=1e-3)
    parser.add_argument("--epochs", dest="epochs", type=int, default=50)
    parser.add_argument("--dropout", dest="dropout", type=float, default=0.5)
    parser.add_argument("--type", dest="type", type=str, default="mfcc")
    parser.add_argument("--main_dir", dest='main_dir', type=str, default="Datasets/TIMIT-dataset/tfrec_data")
    
    args = parser.parse_known_args()[0]
    seq_len = int(np.ceil(args.max_samples / args.hop_length))
    if args.type == "mel":
        input_shape = (seq_len, args.n_mels, 1)
    else:
        input_shape = (seq_len, (args.n_mfcc * 3) + 2, 1)
    parser.add_argument("--input_shape", type=tuple, default=input_shape)
    parser.add_argument("--seq_len", type=int, default=seq_len)
    return parser.parse_known_args()[0]

args = ArgParser()
args

Namespace(delta_width=3, dropout=0.5, epochs=50, hop_length=160, input_shape=(400, 41, 1), learning_rate=0.001, main_dir='Datasets/TIMIT-dataset/tfrec_data', max_samples=64000, n_fft=2048, n_mels=64, n_mfcc=13, sample_rate=16000, seq_len=400, type='mfcc', window_size=400)

# Helper functions

In [None]:
class Algorithm:
    def __init__(self):
        self.args = args
        self.data = 

    def AudioProcessing(self, wav_path):


        

In [45]:
def stream(block_size, data, sr, duration):
    block_start = np.arange(0, int(duration), step=block_size) * sr
    block_end = block_start[1::]
    for i, j in zip(block_start, block_end):
        y_block = data[i:j]
        yield y_block

def extract_phonemes(voiced_flag):
    word, words = [], []
    for i, j in enumerate(voiced_flag):
        if j == True:
            word.append(i)
        else:
            if word != []:
                words.append(word)
            word = []
    return words

def extract_stats(phonemes, times, f0):
    stats = {}
    idx, mean, std = [], [], []
    for i, phoneme in enumerate(phonemes):
        mag = [f0[idx] for idx in phoneme]
        mean.append(np.mean(mag))
        std.append(np.std(mag))
        idx.append(i)
    stats['index'] = idx
    stats['mean'] = mean
    stats['std'] = std
    return stats

def frames_to_samples(phoneme, hop_length):
    start_idx = librosa.frames_to_samples(
        phoneme[0], hop_length=hop_length, n_fft=2048)
    end_idx = librosa.frames_to_samples(
        phoneme[-1], hop_length=hop_length, n_fft=2048)
    return start_idx, end_idx

def Segmentor(args):
    spectrogram = Input(shape=args.input_shape, dtype=tf.float32, name='audio')
    mask = Input(shape=args.input_shape[0], dtype=tf.bool, name='mask')

    x = TimeDistributed(Conv1D(128, 3))(spectrogram)
    x = TimeDistributed(ReLU())(x)
    x = TimeDistributed(BatchNormalization())(x)
    x = TimeDistributed(MaxPool1D(2))(x)

    x = TimeDistributed(Conv1D(64, 3))(x)
    x = TimeDistributed(ReLU())(x)
    x = TimeDistributed(BatchNormalization())(x)
    x = TimeDistributed(MaxPool1D(2))(x)

    x = TimeDistributed(Conv1D(32, 3))(x)
    x = TimeDistributed(ReLU())(x)
    x = TimeDistributed(BatchNormalization())(x)
    x = TimeDistributed(MaxPool1D(2))(x)
    
    x = TimeDistributed(Flatten())(x)
    x = Bidirectional(LSTM(100, dropout=args.dropout, return_sequences=True))(x, mask=mask)
    x = Bidirectional(LSTM(25, dropout=args.dropout, return_sequences=True))(x, mask=mask)
    x = TimeDistributed(Dense(2, activation='softmax'))(x, mask=mask)
    model = Model(inputs=[spectrogram, mask], outputs=x, name='Segmentor')
    model.load_weights("model_weights\segmentor.h5")
    return model

def Inference(y, sr):
    # Get attention mask
    n_frames = len(y) // args.hop_length
    pad_length = args.seq_len - n_frames 
    mask = tf.concat([tf.ones([n_frames]), tf.zeros([pad_length])], axis=-1)
    mask = tf.cast(mask, dtype=tf.bool)
    mask = tf.expand_dims(mask, axis=0)

    # Get mfcc
    y = librosa.util.fix_length(y, args.max_samples)
    mfcc = librosa.feature.mfcc(
        y=y, sr=sr, n_mfcc=args.n_mfcc, hop_length=args.hop_length,
        win_length=args.window_size, n_mels=args.n_mels,
        n_fft=args.n_fft, fmin=0, fmax=8000)[:, :args.seq_len]
    mfcc = np.transpose(mfcc)
    delta = librosa.feature.delta(
        mfcc, width=args.delta_width, order=1, axis=0)
    delta2 = librosa.feature.delta(
        mfcc, width=args.delta_width, order=2, axis=0)
    zcr = librosa.feature.zero_crossing_rate(
        y=y, frame_length=args.window_size, hop_length=args.hop_length)
    zcr =  np.transpose(zcr)[:-1, :]
    f0 = librosa.yin(y=y, sr=sr, fmin=50, fmax=2000, win_length=args.window_size,
        hop_length=args.hop_length, frame_length=2048)
    f0 = np.expand_dims(np.diff(f0, axis=-1), axis=-1)
    mfcc = np.concatenate((mfcc, delta, delta2, zcr, f0), axis=-1)
    mfcc = tf.convert_to_tensor(mfcc, dtype=tf.float32)
    mfcc = tf.expand_dims(mfcc, axis=0)

    # Infer
    inputs = {"audio": mfcc, "mask": mask}
    model = Segmentor(args)
    y_pred = model.predict(inputs)[0]
    y_pred = tf.argmax(y_pred, axis=-1)
    y_pred = tf.squeeze(np.where(y_pred==1))
    y_pred = [(y_pred[i].numpy(), y_pred[i+1].numpy()) for i, b in enumerate(y_pred) if i < len(y_pred) - 1]

    # Post-processing
    y_pred = [frames for frames in y_pred if (frames[1] - frames[0]) >= 2]   
    return y_pred

def Plotter(filename, block_size, block):
    
    
    

class Algorithm():
    def __init__(self, filename, block_size):
        self.data, self.sr = librosa.load(
            filename, sr=librosa.get_samplerate(filename))
        self.y_pred
        self.duration = librosa.get_duration(self.data, sr=self.sr)
        self.fmin = librosa.note_to_hz('G2')
        self.fmax = librosa.note_to_hz('A4')
        self.frame_length = 2048
        self.hop_length = self.frame_length // 4
        self.block_size = block_size
        self.n_blocks = int(self.duration // self.block_size)
        self.residual = 0
        self.benchmark = 0
        self.forward = 0
        self.mag_rate = 0.1
        self.grad_limit = 1.0
        self.mono_block = False
        self.model = Segmentor(args)
    
    def train(self):
        false_counter, true_counter = 0, 0
        blocks = []
        for i, y_block in enumerate(
                stream(block_size=self.block_size, data=self.data,
                       sr=self.sr, duration=self.duration)):          

            total = 0       
            start_time = time.time()
            f0, voiced_flag, voiced_prob = librosa.pyin(
                    y_block, sr=self.sr, fmin=self.fmin, fmax=self.fmax,
                    frame_length=self.frame_length, switch_prob=1e-10,
                    no_trough_prob=1e-2, resolution=1e-1)
                
            zcr = librosa.feature.zero_crossing_rate(y_block, frame_length=self.frame_length*2)[0] 
            zcr = np.abs(np.gradient(zcr))
            
            rms = librosa.feature.rms(y_block)[0] 

            f0 = np.where(
                (voiced_flag==True) & (rms<rms.mean()) & (zcr<0.2), 
                np.nan, f0)

            voiced_flag = np.where(
                (voiced_flag==True) & (rms<rms.mean()) & (zcr<0.2), 
                False, voiced_flag)

            voiced_sum = voiced_flag.sum()

            if voiced_sum == 0:
                print("{}/{} - No speech detected.".format(i+1, self.n_blocks))
                false_counter += 1
                pass
            else:
                phonemes = extract_phonemes(voiced_flag)
                for j, phoneme in enumerate(phonemes):
                    start_idx, end_idx = phoneme[0], phoneme[-1]
                    length = end_idx - start_idx
                    mean = np.mean(f0[start_idx:end_idx])
                    delta = np.abs(mean - self.residual)
                    if length > 1:
                        grad = np.abs(np.mean(np.gradient(f0[start_idx:end_idx], edge_order=1)))
                    else:
                        grad = 1
                    if self.forward == 0:
                        self.buffer = len(phonemes) // 2
                    if ((self.forward == 0) & (j < self.buffer)) | (delta > self.benchmark * 0.3):
                        self.benchmark += (mean / self.buffer)
                    else:
                        if (self.benchmark * self.mag_rate > delta) & (grad < self.grad_limit):
                            total += length
                    self.residual = mean

                self.forward = mean
                voiced_ratio = voiced_sum / len(voiced_flag)
                elapsed_time = time.time() - start_time
                monotonic_ratio = total / voiced_sum
                
                if monotonic_ratio >= 0.4:
                    if true_counter == 0:
                        block_start = i * self.block_size
                    else:
                        false_counter = 0
                    mono_block = True
                    true_counter += 1
                else:
                    mono_block = False
                    false_counter += 1
                    if (false_counter >= 3) & (true_counter > 0): # Reset counters
                        true_counter = 0 
                        false_counter = 0
                        blocks.append([block_start, i * self.block_size])

                print("{}/{} - voiced_ratio: {:.2f}% - phonemes: {} - monotonic_frames: {} - monotonic_ratio: {:.2f}% - elapsed_time: {:.2f}s".format(
                    i+1, self.n_blocks, voiced_ratio*100, len(phonemes), total, monotonic_ratio*100, elapsed_time)) 
        
        print("Mononotic blocks:\n", blocks)          

In [48]:
y, sr = librosa.load(filename_1, sr=None)
if sr != args.sample_rate:
    y = librosa.resample(y, orig_sr=sr, target_sr=args.sample_rate)
    sr = args.sample_rate

print(len(y))

40724457


In [None]:
def Stream(block_size, data, sr, duration):
    block_start = np.arange(0, int(duration), step=block_size) * sr
    block_end = block_start[1::]
    for i, j in zip(block_start, block_end):
        y_block = data[i:j]
        yield y_block

In [None]:
def Stream(frame_size, y, sr):

# Data Analysis
## Monotone 1

In [46]:
outputs = Plotter(filename=filename_1, block_size=6, block=0)

40724457


NameError: name 'data' is not defined

In [None]:
y, sr, phonemes, times, hop_length, f0 = plotter(filename=filename_1, block_size=6, block=10)

### Block sample

In [None]:
ipd.Audio(data=y, rate=sr)

### Phoneme sample

In [None]:
if phonemes != []:
    start_idx, end_idx = frames_to_samples(phonemes[0], hop_length)
    ipd.display(ipd.Audio(y[start_idx:end_idx]*2, rate=sr, autoplay=True))
else:
    print("No phonemes detected.")

In [None]:
stats = extract_stats(phonemes, times, f0)
sns.lineplot(x=stats['index'], y=stats['mean'], label='mean')
sns.lineplot(x=stats['index'], y=stats['std'], label='std')
plt.show()

## Normal

In [None]:
y, sr, phonemes, times, hop_length, f0 = plotter(filename=filename_2, block_size=6, block=2)

### Block sample

In [None]:
ipd.Audio(data=y, rate=sr)

### Phoneme sample

In [None]:
if phonemes != []:
    start_idx, end_idx = frames_to_samples(phonemes[2], hop_length)
    ipd.display(ipd.Audio(y[start_idx:end_idx]*2, rate=sr, autoplay=True))
else:
    print("No phonemes detected.")

In [None]:
stats = extract_stats(phonemes, times, f0)
sns.lineplot(x=stats['index'], y=stats['mean'], label='mean')
sns.lineplot(x=stats['index'], y=stats['std'], label='std')
plt.show()

# Training

$\frac{voiced frames}{duration}$ = voiced ratio

$\frac{monotonic frames}{voiced frames}$ = monotonic ratio


## Monotone (Monologue)

In [None]:
Algorithm(filename=filename_1, block_size=6).train()

## Normal (Clip)

In [None]:
Algorithm(filename=filename_2, block_size=6).train()