In [None]:
import tensorflow_datasets as tfds
import tensorflow as tf

from tensorflow.keras.layers import (
    Input,
    Dense,
    Flatten,
    Reshape,
    Conv2D,
    MaxPooling2D,
    UpSampling2D,
)

from tensorflow.keras import Sequential
from tensorflow.keras.models import Model
from tensorflow.keras.losses import MeanSquaredError

import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Record the library versions this notebook was run with, one per line.
print(
    f"Tensorflow datasets: {tfds.__version__}",
    f"Tensorflow: {tf.__version__}",
    f"Numpy: {np.__version__}",
    f"Pandas: {pd.__version__}",
    f"Matplotlib: {matplotlib.__version__}",
    sep="\n",
)

In [None]:
# NOTE(review): `pump` is not referenced by name below; presumably importing it
# registers the 'pump' dataset builder with tfds — confirm before moving or
# removing this import.
import pump

# Directory handed to tfds as its data directory (relative to the notebook).
data_dir = "../dataset"

# Load the train and test splits together with the dataset metadata.
(train, test), info = tfds.load(
    "pump",
    split=["train", "test"],
    data_dir=data_dir,
    with_info=True,
)

In [None]:
# Render a single training example as a DataFrame to inspect its features.
tfds.as_dataframe(train.take(1), ds_info=info)

In [None]:
# Materialise the first five training examples as a DataFrame and display them.
df = tfds.as_dataframe(train.take(5), ds_info=info)
df.head()

In [None]:
# Linear-frequency -> mel projection matrix used by `mel` / `mel_test`.
# 257 spectrogram bins (256 + 1) are projected onto 128 mel bins.
# NOTE(review): the lower/upper band edges are left at the library defaults —
# confirm that band is what is wanted for 16 kHz audio.
A = tf.signal.linear_to_mel_weight_matrix(
    num_mel_bins=128,
    num_spectrogram_bins=256 + 1,
    sample_rate=16000,
    dtype=tf.float32,
)

In [None]:
def mel(item):
    """Convert one example into a pair of identical log-mel spectrograms.

    Args:
        item: Mapping with an "audio" entry holding the waveform samples
            (presumably int16 PCM, given the 2**15 scaling — TODO confirm).

    Returns:
        A 2-tuple holding the same log10 mel-power spectrogram twice, so the
        result can be used as an (input, target) pair.
    """
    # Cast to float32 and scale by 1 / 2**15.
    waveform = tf.cast(item["audio"], tf.float32) / 2**15

    # Power spectrogram: squared magnitude of the short-time Fourier transform.
    power_spectrogram = tf.square(
        tf.abs(
            tf.signal.stft(
                waveform,
                frame_length=512,
                frame_step=256,
                pad_end=False,  # librosa test compatibility
            )
        )
    )

    # Project the linear-frequency bins onto the notebook-level mel matrix `A`.
    mel_power = tf.tensordot(power_spectrogram, A, axes=1)

    # log10(x) computed as ln(x) / ln(10); the small offset (10e-6 == 1e-5)
    # keeps the logarithm finite when a bin has zero energy.
    natural_log = tf.math.log(mel_power + 10e-6)
    ln_ten = tf.math.log(tf.constant(10, dtype=natural_log.dtype))
    log_mel = natural_log / ln_ten

    return log_mel, log_mel

In [None]:
def mel_test(item):
    """Replace an example's raw audio with its log-mel spectrogram.

    Unlike `mel`, the other entries of `item` are kept, so later stages can
    still read them next to the spectrogram.

    Args:
        item: Mapping with an "audio" entry holding the waveform samples
            (presumably int16 PCM, given the 2**15 scaling — TODO confirm).

    Returns:
        The same `item` mapping, with "audio" overwritten in place by the
        log10 mel-power spectrogram.
    """
    # Cast to float32 and scale by 1 / 2**15.
    waveform = tf.cast(item["audio"], tf.float32) / 2**15

    # Power spectrogram: squared magnitude of the short-time Fourier transform.
    power_spectrogram = tf.square(
        tf.abs(
            tf.signal.stft(
                waveform,
                frame_length=512,
                frame_step=256,
                pad_end=False,  # librosa test compatibility
            )
        )
    )

    # Project the linear-frequency bins onto the notebook-level mel matrix `A`.
    mel_power = tf.tensordot(power_spectrogram, A, axes=1)

    # log10(x) computed as ln(x) / ln(10); the small offset (10e-6 == 1e-5)
    # keeps the logarithm finite when a bin has zero energy.
    natural_log = tf.math.log(mel_power + 10e-6)
    ln_ten = tf.math.log(tf.constant(10, dtype=natural_log.dtype))

    item["audio"] = natural_log / ln_ten
    return item

In [None]:
BATCH_SIZE = 128
# NOTE(review): SHUFFLE_BUFFER_SIZE is defined but not used by any visible cell.
SHUFFLE_BUFFER_SIZE = 32

# Training pipeline: each example becomes a pair of log-mel spectrograms, batched.
audio_train = (
    train
    .map(mel)
    .batch(BATCH_SIZE)
)
# Labels of the training examples only (unbatched).
label_train = train.map(lambda example: example["label"])

# Test examples with label 0: raw audio replaced by its log-mel spectrogram, batched.
normal_test = (
    test
    .filter(lambda example: example["label"] == 0)
    .map(mel_test)
    .batch(BATCH_SIZE)
)
# Test examples with label 1, processed the same way.
anomaly_test = (
    test
    .filter(lambda example: example["label"] == 1)
    .map(mel_test)
    .batch(BATCH_SIZE)
)

In [None]:
# Convolutional autoencoder over (624, 128) inputs.
# The shape comments follow from 'same' padding in every convolution and the
# pooling / upsampling factors below.
autoencoder = Sequential(
    [
        # Add a channel axis: (624, 128) -> (624, 128, 1).
        Reshape(target_shape=(624, 128, 1), input_shape=(624, 128)),
        # --- encoder ---
        Conv2D(64, kernel_size=(2, 2), padding="same", activation="relu"),
        MaxPooling2D(pool_size=(4, 4)),  # -> (156, 32, 64)
        Conv2D(128, kernel_size=(3, 3), padding="same", activation="relu"),
        MaxPooling2D(pool_size=(2, 2)),  # -> (78, 16, 128)
        Conv2D(256, kernel_size=(3, 3), padding="same", activation="relu"),
        # Single-channel sigmoid bottleneck: (78, 16, 1).
        Conv2D(1, kernel_size=(3, 3), padding="same", activation="sigmoid"),
        # --- decoder ---
        Conv2D(128, kernel_size=(3, 3), padding="same", activation="relu"),
        UpSampling2D(size=(2, 2)),  # -> (156, 32, 128)
        Conv2D(4, kernel_size=(3, 3), padding="same", activation="relu"),
        UpSampling2D(size=(4, 4)),  # -> (624, 128, 4)
        # Linear output layer: (624, 128, 1).
        Conv2D(1, kernel_size=(3, 3), padding="same"),
    ]
)

autoencoder.summary()

In [None]:
# Train to minimise mean squared reconstruction error with the Adam optimizer.
autoencoder.compile(
    loss=MeanSquaredError(),
    optimizer="adam",
)

In [None]:
# Fit on `audio_train`, the batched dataset of (log-mel input, log-mel target)
# pairs built by `mel`. The raw `train` dataset yields unbatched feature dicts
# with no targets, so it cannot be fed to the model directly.
# `shuffle=True` is dropped because Keras ignores it for tf.data.Dataset input;
# any shuffling has to happen in the dataset pipeline itself.
history = autoencoder.fit(
    audio_train,
    epochs=10,
)

In [None]:
# Reconstruct every training spectrogram; the model output carries a trailing
# channel axis: (num_examples, 624, 128, 1).
reconstruction = autoencoder.predict(audio_train)

# Gather the matching model inputs in the same (unshuffled) order as
# (num_examples, 624, 128). Concatenating the batches keeps the example axis
# first, so inputs and reconstructions line up element by element.
train2 = np.concatenate(
    [features for features, _ in audio_train.as_numpy_iterator()],
    axis=0,
)

# One mean-absolute-error value per example: drop the channel axis from the
# reconstruction, then average over both spectrogram axes.
train_loss = tf.reduce_mean(
    tf.abs(train2 - tf.squeeze(reconstruction, axis=-1)),
    axis=[1, 2],
)

plt.hist(train_loss.numpy(), bins=20)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

In [None]:
# Summary statistics of the reconstruction error on the training set.
df = pd.DataFrame(data={"error": train_loss.numpy()})
df.describe()

In [None]:
# Keep only the spectrograms of the label-0 test batches; the model takes the
# spectrogram alone, not the whole feature dict.
normal_audio = normal_test.map(lambda item: item["audio"])

# Model output carries a trailing channel axis: (num_examples, 624, 128, 1).
reconstruction = autoencoder.predict(normal_audio)

# Matching inputs in the same order as (num_examples, 624, 128).
test2 = np.concatenate(list(normal_audio.as_numpy_iterator()), axis=0)

# One mean-absolute-error value per example: drop the channel axis from the
# reconstruction, then average over both spectrogram axes.
test_loss = tf.reduce_mean(
    tf.abs(test2 - tf.squeeze(reconstruction, axis=-1)),
    axis=[1, 2],
)

plt.hist(test_loss.numpy(), bins=10)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

In [None]:
# Summary statistics of the reconstruction error held in `test_loss`
# (the label-0 test examples when the cells are run in order).
df = pd.DataFrame(data={"error": test_loss.numpy()})
df.describe()

In [None]:
# Keep only the spectrograms of the label-1 test batches; the model takes the
# spectrogram alone, not the whole feature dict.
anomaly_audio = anomaly_test.map(lambda item: item["audio"])

# Model output carries a trailing channel axis: (num_examples, 624, 128, 1).
reconstruction = autoencoder.predict(anomaly_audio)

# Matching inputs in the same order as (num_examples, 624, 128).
test2 = np.concatenate(list(anomaly_audio.as_numpy_iterator()), axis=0)

# One mean-absolute-error value per example: drop the channel axis from the
# reconstruction, then average over both spectrogram axes.
test_loss = tf.reduce_mean(
    tf.abs(test2 - tf.squeeze(reconstruction, axis=-1)),
    axis=[1, 2],
)

plt.hist(test_loss.numpy(), bins=10)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

In [None]:
# Summary statistics of the reconstruction error held in `test_loss`
# (the label-1 test examples when the cells are run in order).
df = pd.DataFrame(data={"error": test_loss.numpy()})
df.describe()