<a href="https://colab.research.google.com/github/wolfisberg/zhaw-ba-online/blob/main/my_crepe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [None]:
import datetime
import os
import numpy as np
import tensorflow as tf
import scipy.interpolate
import matplotlib.pyplot as plt
import shutil


%load_ext tensorboard


from google.colab import drive
drive.mount('/content/drive')

# Config

In [None]:
# Audio
SNR_RANGE = (-5.0,20.0) #dB
FRAME_LENGTH = 1024
FRAME_STEP = 256
MIN_RAND_GAIN = 0.05
MAX_RAND_GAIN = 1.1
SAMPLE_LENGTH = 3 #shorter than shortest noise/speech sample
FS = 16000
PITCH_SAMPLING_TIME = 0.01 # s
PITCH_FRAME_LENGTH = 0.032 # s


# Data
BATCH_SIZE = 32
# NUM_FRAMES = 1 + (FS * SAMPLE_LENGTH - FRAME_LENGTH) // FRAME_STEP
NUM_FRAMES = 1

# Training
STEPS_PER_EPOCH = 500
EPOCHS = 2
VALIDATION_STEPS = 5


# Directories
_DATA_DIR = os.path.join('/content/drive/MyDrive/BA_2021/')
_TFRECORDS_DIR = os.path.join(_DATA_DIR, 'tfrecords')

SPEECH_DATA_TR_DIR = os.path.join(_TFRECORDS_DIR, 'speech', 'tr')
NOISE_DATA_TR_DIR = os.path.join(_TFRECORDS_DIR, 'noise', 'tr')
SPEECH_DATA_CV_DIR = os.path.join(_TFRECORDS_DIR, 'speech', 'cv')
NOISE_DATA_CV_DIR = os.path.join(_TFRECORDS_DIR, 'noise', 'cv')
SPEECH_DATA_TT_DIR = os.path.join(_TFRECORDS_DIR, 'speech', 'tt')
NOISE_DATA_TT_DIR = os.path.join(_TFRECORDS_DIR, 'noise', 'tt')

TIMESTAMP = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")


# Misc
SEED = 2


# Parsing
PARSING_CONFIG_NOISE = {
    'data': tf.io.VarLenFeature(tf.string),
    'data_sampling_rate': tf.io.VarLenFeature(tf.int64),
    'data_num_channels': tf.io.VarLenFeature(tf.int64),
    'data_width': tf.io.VarLenFeature(tf.int64),
}

PARSING_CONFIG_SPEECH = {
    'data': tf.io.VarLenFeature(tf.string),
    'data_sampling_rate': tf.io.VarLenFeature(tf.int64),
    'data_num_channels': tf.io.VarLenFeature(tf.int64),
    'data_width': tf.io.VarLenFeature(tf.int64),
    'pitch': tf.io.VarLenFeature(tf.float32),
    'pitch_confidence': tf.io.VarLenFeature(tf.float32),
}



In [None]:
print(NOISE_DATA_TR_DIR)

# Data

## Copy Data to Runtime

In [None]:
DATA_DIR_LOCAL = '/content/data'

if not os.path.exists(DATA_DIR_LOCAL):
    os.mkdir(DATA_DIR_LOCAL)
    
    RECORD_DIR_LOCAL = os.path.join(DATA_DIR_LOCAL, 'tfrecords')
    shutil.copytree(_TFRECORDS_DIR, RECORD_DIR_LOCAL)


_TFRECORDS_DIR = os.path.join(DATA_DIR_LOCAL, 'tfrecords')

In [None]:
print(_TFRECORDS_DIR)

## Process Data

In [None]:
def _parse_noise_record(serialized_example):
    parsed_features = tf.io.parse_single_example(serialized_example, features=PARSING_CONFIG_NOISE)
    decoded_features = {
        "data_num_channels": tf.cast(parsed_features["data_num_channels"].values[0], tf.int32),
        "data_sampling_rate": tf.cast(parsed_features["data_sampling_rate"].values[0], tf.int32),
        "data_width": tf.cast(parsed_features["data_width"].values[0], tf.int32),
    }
    data = tf.io.decode_raw(parsed_features['data'].values[0], tf.int16)
    decoded_features.update({"data": data})
    return decoded_features


def _parse_speech_record(serialized_example):
    parsed_features = tf.io.parse_single_example(serialized_example, features=PARSING_CONFIG_SPEECH)
    decoded_features = {
        "data_num_channels": tf.cast(parsed_features["data_num_channels"].values[0], tf.int32),
        "data_sampling_rate": tf.cast(parsed_features["data_sampling_rate"].values[0], tf.int32),
        "data_width": tf.cast(parsed_features["data_width"].values[0], tf.int32),
        "pitch": tf.cast(parsed_features['pitch'].values, tf.float32),
        "pitch_confidence": tf.cast(parsed_features['pitch_confidence'].values, tf.float32),
    }
    data = tf.io.decode_raw(parsed_features['data'].values[0], tf.int16)
    decoded_features.update({"data": data})
    return decoded_features


def _mix_noisy_speech(speech, noise):
    speech_pow = tf.math.reduce_euclidean_norm(speech)
    noise_pow = tf.math.reduce_euclidean_norm(noise)

    min_SNR = SNR_RANGE[0]
    max_SNR = SNR_RANGE[1]
    snr_current = 20.0*tf.math.log(speech_pow/noise_pow)/tf.math.log(10.0)
    snr_target = tf.random.uniform((),minval=min_SNR,maxval=max_SNR)

    noise = noise * tf.math.pow(10.0,(snr_current-snr_target)/20.0)
    noisy_speech = speech+noise

    return speech, noise, noisy_speech


def _interpolate_pitch(pitch,t):
    pitch = pitch.numpy()
    t = t.numpy()
    t_pitch = np.arange(0, len(pitch)) * PITCH_SAMPLING_TIME + PITCH_FRAME_LENGTH / 2
    f = scipy.interpolate.interp1d(t_pitch, pitch, 'nearest')
    return f(t).astype(np.float32)


@tf.function
def _interpolate_pitch_tf(pitch,t):
    y = tf.py_function(_interpolate_pitch,[pitch,t], Tout=tf.float32)
    return tf.squeeze(y)


def _calc_features(speech_data, noise_data):
    speech = tf.squeeze(tf.cast(speech_data["data"], tf.float32))
    noise = tf.squeeze(tf.cast(noise_data["data"], tf.float32))
    speech = speech / tf.int16.max
    noise = noise / tf.int16.max

    # random_start_idx = int(tf.round(tf.random.uniform([], maxval=(
    #         tf.cast(len(noise), tf.float32) - SAMPLE_LENGTH * FS - PITCH_SAMPLING_TIME))))
    # noise = noise[random_start_idx:random_start_idx + SAMPLE_LENGTH * FS]

    # random_start_idx = int(tf.round(tf.random.uniform([], minval=161, maxval=(
    #         tf.cast(len(speech), tf.float32) - SAMPLE_LENGTH * FS - 161))))
    # speech = speech[random_start_idx:random_start_idx + SAMPLE_LENGTH * FS]
    
    random_start_idx = int(tf.round(tf.random.uniform([], maxval=(
            tf.cast(len(noise), tf.float32) - SAMPLE_LENGTH * FS - PITCH_SAMPLING_TIME))))
    noise = noise[random_start_idx:random_start_idx + FRAME_LENGTH]

    random_start_idx = int(tf.round(tf.random.uniform([], minval=161, maxval=(
            tf.cast(len(speech), tf.float32) - SAMPLE_LENGTH * FS - 161))))
    speech = speech[random_start_idx: random_start_idx + FRAME_LENGTH]

    #SNR_range = SNR_RANGE
    frame_length = FRAME_LENGTH
    frame_step = FRAME_STEP
    speech, noise, noisy = _mix_noisy_speech(speech, noise)

    random_gain = tf.math.exp(
        tf.random.uniform([], minval=tf.math.log(MIN_RAND_GAIN), maxval=tf.math.log(MAX_RAND_GAIN)))
    noisy = random_gain * noisy

    noisy_frames = tf.signal.frame(noisy, frame_length, frame_step)
    #noisy_stft = tf.signal.stft(noisy,frame_length,frame_step)
    frame_times = random_start_idx / FS + tf.range(0, NUM_FRAMES) * frame_step / FS + frame_length / FS

    print(frame_times)

    pitch = tf.squeeze(speech_data["pitch"])
    pitch_confidence = tf.squeeze(speech_data["pitch_confidence"])
    #pitch = tf.where(pitch_confidence>config['pitch_confidence_threshold'],pitch,0)
    pitch_interpolated = _interpolate_pitch_tf(pitch, frame_times)
    return noisy_frames, pitch_interpolated

## Provide Data

In [None]:
def get_training_data():
    speech_ds = tf.data.TFRecordDataset([os.path.join(SPEECH_DATA_TR_DIR, file) for file in os.listdir(SPEECH_DATA_TR_DIR)])
    speech_ds = speech_ds.map(_parse_speech_record).repeat(None).shuffle(buffer_size=1000, seed=SEED)

    noise_ds = tf.data.TFRecordDataset([os.path.join(NOISE_DATA_TR_DIR, file) for file in os.listdir(NOISE_DATA_TR_DIR)])
    noise_ds = noise_ds.map(_parse_noise_record).repeat(None).shuffle(buffer_size=1000, seed=SEED)

    dataset_combined = tf.data.Dataset.zip((speech_ds, noise_ds))
    dataset_features = dataset_combined.map(_calc_features, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset_features = dataset_features.batch(BATCH_SIZE, drop_remainder=True).prefetch(tf.data.experimental.AUTOTUNE)

    return dataset_features


def get_validation_data():
    speech_ds = tf.data.TFRecordDataset([os.path.join(SPEECH_DATA_CV_DIR, file) for file in os.listdir(SPEECH_DATA_CV_DIR)])
    speech_ds = speech_ds.map(_parse_speech_record).repeat(None).shuffle(buffer_size=1000, seed=SEED)

    noise_ds = tf.data.TFRecordDataset([os.path.join(NOISE_DATA_CV_DIR, file) for file in os.listdir(NOISE_DATA_CV_DIR)])
    noise_ds = noise_ds.map(_parse_noise_record).repeat(None).shuffle(buffer_size=1000, seed=SEED)

    dataset_combined = tf.data.Dataset.zip((speech_ds, noise_ds))
    dataset_features = dataset_combined.map(_calc_features, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset_features = dataset_features.batch(BATCH_SIZE, drop_remainder=True).prefetch(tf.data.experimental.AUTOTUNE)

    return dataset_features


def get_test_data():
    speech_ds = tf.data.TFRecordDataset([os.path.join(SPEECH_DATA_TT_DIR, file) for file in os.listdir(SPEECH_DATA_TT_DIR)])
    speech_ds = speech_ds.map(_parse_speech_record).repeat(None).shuffle(buffer_size=1000, seed=SEED)

    noise_ds = tf.data.TFRecordDataset([os.path.join(NOISE_DATA_TT_DIR, file) for file in os.listdir(NOISE_DATA_TT_DIR)])
    noise_ds = noise_ds.map(_parse_noise_record).repeat(None).shuffle(buffer_size=1000, seed=SEED)

    dataset_combined = tf.data.Dataset.zip((speech_ds, noise_ds))
    dataset_features = dataset_combined.map(_calc_features, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset_features = dataset_features.batch(BATCH_SIZE, drop_remainder=True).prefetch(tf.data.experimental.AUTOTUNE)

    return dataset_features

# Models

## CREPE

In [None]:
from tensorflow.keras.layers import Input, Reshape, Conv2D, BatchNormalization
from tensorflow.keras.layers import MaxPool2D, Dropout, Permute, Flatten, Dense
from tensorflow.keras.models import Model


MODEL_USED = 'crepe'
LOG_DIR = os.path.join(_DATA_DIR, MODEL_USED, 'logs', TIMESTAMP)
if not os.path.exists(LOG_DIR):
    os.makedirs(LOG_DIR)
CHECKPOINT_DIR = os.path.join(_DATA_DIR, MODEL_USED, 'checkpoints', TIMESTAMP)
if not os.path.exists(CHECKPOINT_DIR):
    os.makedirs(CHECKPOINT_DIR)


def get_model_crepe():
    layers = [1, 2, 3, 4, 5, 6]
    filters = [n * 32 for n in [32, 4, 4, 4, 8, 16]]
    widths = [512, 64, 64, 64, 64, 64]
    strides = [(4, 1), (1, 1), (1, 1), (1, 1), (1, 1), (1, 1)]

    x = Input(shape=(1024,), name='input', dtype='float32')
    y = Reshape(target_shape=(1024, 1, 1), name='input-reshape')(x)

    for l, f, w, s in zip(layers, filters, widths, strides):
        y = Conv2D(f, (w, 1), strides=s, padding='same',
                   activation='relu', name="conv%d" % l)(y)
        y = BatchNormalization(name="conv%d-BN" % l)(y)
        y = MaxPool2D(pool_size=(2, 1), strides=None, padding='valid',
                      name="conv%d-maxpool" % l)(y)
        y = Dropout(0.25, name="conv%d-dropout" % l)(y)

    y = Permute((2, 1, 3), name="transpose")(y)
    y = Flatten(name="flatten")(y)
    y = Dense(1, activation='sigmoid', name="classifier")(y)

    model = Model(inputs=x, outputs=y)
    model.compile('adam', 'binary_crossentropy')

    return model


In [None]:
print(LOG_DIR)

# Training

## Load Data

In [None]:
dataset_training = get_training_data()
dataset_validation = get_validation_data()
dataset_test = get_test_data()

## Debug

In [None]:
print(next(iter(dataset_training)))

## Load Model

In [None]:
model = get_model_crepe()

In [None]:
model.summary()

## Fit Model

In [None]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(LOG_DIR, histogram_freq=1)
checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(CHECKPOINT_DIR,'{epoch:02d}-{val_loss:.2f}.hdf5'))

callbacks = [checkpoint, tensorboard_callback]

history = model.fit(
    dataset_training,
    steps_per_epoch=STEPS_PER_EPOCH,
    epochs=EPOCHS,
    verbose = 1,
    validation_data = dataset_validation,
    validation_steps=VALIDATION_STEPS,
    callbacks = callbacks)
    
loss = model.evaluate(dataset_test, steps=1)

In [None]:
%tensorboard --logdir /content/drive/MyDrive/BA_2021/crepe/logs

In [None]:
prediction_sample = next(iter(dataset_test))[0][0]
print(prediction_sample)
prediction = model.predict(prediction_sample)
print(prediction)

# Testing

In [None]:
plt.plot(test_prediciton[0], 'g')
plt.plot(test_prediciton[1], 'r')
plt.show()