In [None]:
!pip install tensorflow pandas numpy sklearn matplotlib

In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score

In [None]:
# Location of the ECG dataset. HTTPS is used so the training data cannot be
# tampered with in transit.
ECG_CSV_URL = 'https://storage.googleapis.com/download.tensorflow.org/data/ecg.csv'

# header = None: the first row of the file is data, so columns get integer names.
ecg_data = pd.read_csv(ECG_CSV_URL, header = None)
ecg_data.head()

In [None]:
# Fixed seed for the train/test partition so that "Restart & Run All" reproduces
# the same split. (Model weight initialisation is not seeded by this.)
SPLIT_SEED = 21

data = ecg_data.values

# Every column except the last is a feature; the last column is the label.
features = data[:, 0:-1]
labels = data[:, -1]

# Hold out 20% of the rows for testing.
features_train, features_test, labels_train, labels_test = train_test_split(
    features,
    labels,
    test_size = 0.2,
    random_state = SPLIT_SEED
)

In [None]:
# Min-max scaling. The minimum and range come from the training split only and
# are reused unchanged for the test split.
value_min, value_max = tf.reduce_min(features_train), tf.reduce_max(features_train)
value_dif = value_max - value_min

# Rescale each split, then convert the result to float32.
features_train = tf.cast((features_train - value_min) / value_dif, tf.float32)
features_test = tf.cast((features_test - value_min) / value_dif, tf.float32)

In [None]:
# Convert the label columns to boolean masks.
labels_train, labels_test = labels_train.astype(bool), labels_test.astype(bool)

# Training split: rows whose label is True are kept as "normal",
# the remaining rows as "anomalous".
features_train_normal = features_train[labels_train]
features_train_anomalous = features_train[~labels_train]

# Test split, partitioned the same way.
features_test_normal = features_test[labels_test]
features_test_anomalous = features_test[~labels_test]

In [None]:
# Plot the first normal training example against its sample index.
fig, ax = plt.subplots()
ax.grid()
ax.plot(np.arange(140), features_train_normal[0])
ax.set_title('A Normal ECG')
plt.show()

In [None]:
# Plot the first anomalous training example against its sample index.
fig, ax = plt.subplots()
ax.grid()
ax.plot(np.arange(140), features_train_anomalous[0])
ax.set_title('An Anomalous ECG')
plt.show()

In [None]:
class AnomalyDetector(tf.keras.models.Model):
    """Fully connected autoencoder used to reconstruct its own input.

    The encoder narrows the input through Dense layers of 32, 16 and 8 units
    (ReLU); the decoder widens it again through 16 and 32 units (ReLU) and a
    final sigmoid layer with ``n_features`` units.

    Parameters
    ----------
    n_features : int, default 140
        Width of the reconstructed output, i.e. the number of values per
        example. Must match the last dimension of the data fed to the model
        when the model is trained to reproduce its input.
    **kwargs
        Forwarded to ``tf.keras.models.Model`` (for example ``name``).
    """

    def __init__(self, n_features = 140, **kwargs):

        super().__init__(**kwargs)

        self.n_features = n_features

        # Compresses each example down to an 8-value code.
        self.encoder = tf.keras.Sequential([
            tf.keras.layers.Dense(32, activation = 'relu'),
            tf.keras.layers.Dense(16, activation = 'relu'),
            tf.keras.layers.Dense(8, activation = 'relu')
        ])

        # Expands the code back to n_features values; the sigmoid keeps every
        # output in (0, 1).
        self.decoder = tf.keras.Sequential([
            tf.keras.layers.Dense(16, activation = 'relu'),
            tf.keras.layers.Dense(32, activation = 'relu'),
            tf.keras.layers.Dense(n_features, activation = 'sigmoid'),
        ])

    def call(self, data):
        """Encode ``data`` and return the decoder's reconstruction of it."""

        encoded = self.encoder(data)
        decoded = self.decoder(encoded)

        return decoded

In [None]:
# Build the autoencoder and configure training: Adam optimiser with
# mean absolute error as the reconstruction loss.
anomalyDetector = AnomalyDetector()
anomalyDetector.compile(optimizer = 'adam', loss = 'mae')

In [None]:
# The model is fitted on normal examples only, with the input doubling as the
# target. Validation loss is computed on the whole test split.
training_options = dict(epochs = 20, batch_size = 512, shuffle = True)

history = anomalyDetector.fit(
    x = features_train_normal,
    y = features_train_normal,
    validation_data = (features_test, features_test),
    **training_options
)

In [None]:
# Learning curves: one point per epoch for the training and validation loss.
plt.plot(history.history['loss'], label = 'Training Loss')
plt.plot(history.history['val_loss'], label = 'Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
# Render the figure explicitly so the cell does not end on a Legend repr,
# matching the other plotting cells.
plt.show()

In [None]:
# Pass the normal test examples through the encoder and then the decoder.
encoded_data = anomalyDetector.encoder(features_test_normal).numpy()
decoded_data = anomalyDetector.decoder(encoded_data).numpy()

# Compare the first example with its reconstruction; the shaded band between
# the two curves is the reconstruction error.
sample_input, sample_output = features_test_normal[0], decoded_data[0]

fig, ax = plt.subplots()
ax.plot(sample_input, 'b')
ax.plot(sample_output, 'r')
ax.fill_between(np.arange(140), sample_output, sample_input, color = 'lightcoral')
ax.legend(labels = ['Input', 'Reconstruction', 'Error'])
plt.show()

In [None]:
# Pass the anomalous test examples through the encoder and then the decoder.
encoded_data = anomalyDetector.encoder(features_test_anomalous).numpy()
decoded_data = anomalyDetector.decoder(encoded_data).numpy()

# Compare the first example with its reconstruction; the shaded band between
# the two curves is the reconstruction error.
sample_input, sample_output = features_test_anomalous[0], decoded_data[0]

fig, ax = plt.subplots()
ax.plot(sample_input, 'b')
ax.plot(sample_output, 'r')
ax.fill_between(np.arange(140), sample_output, sample_input, color = 'lightcoral')
ax.legend(labels = ['Input', 'Reconstruction', 'Error'])
plt.show()

In [None]:
# Per-example mean absolute reconstruction error on the normal training data.
reconstructions = anomalyDetector.predict(features_train_normal)
train_loss = tf.keras.losses.mae(reconstructions, features_train_normal)

# Pass a flat 1-D array to hist: a (1, N) array can be read by matplotlib as
# N separate one-sample datasets (each column of a 2D array is a dataset)
# rather than one dataset of N values.
plt.hist(np.asarray(train_loss).ravel(), bins = 50)
plt.xlabel('Train loss')
plt.ylabel('No of examples')
plt.show()

In [None]:
# Decision threshold: one standard deviation above the mean training loss.
loss_mean = np.mean(train_loss)
loss_std = np.std(train_loss)

threshold = loss_mean + loss_std
print('Threshold: ', threshold)

In [None]:
# Per-example mean absolute reconstruction error on the anomalous test data.
reconstructions = anomalyDetector.predict(features_test_anomalous)
test_loss = tf.keras.losses.mae(reconstructions, features_test_anomalous)

# Pass a flat 1-D array to hist: a (1, N) array can be read by matplotlib as
# N separate one-sample datasets (each column of a 2D array is a dataset)
# rather than one dataset of N values.
plt.hist(np.asarray(test_loss).ravel(), bins = 50)
plt.xlabel('Test loss')
plt.ylabel('No of examples')
plt.show()

In [None]:
def predict(model, data, threshold):
    """Flag which examples the model reconstructs with an error below ``threshold``.

    Args:
        model: Callable that maps ``data`` to its reconstruction.
        data: Batch of examples to reconstruct.
        threshold: Upper bound (exclusive) on the mean absolute error.

    Returns:
        Boolean tensor with one entry per example: True where the mean
        absolute error between the reconstruction and the input is strictly
        less than ``threshold``.
    """
    reconstruction_error = tf.keras.losses.mae(model(data), data)
    return tf.math.less(reconstruction_error, threshold)

def print_stats(predictions, labels):
    """Print accuracy, precision and recall of ``predictions`` against ``labels``.

    Args:
        predictions: Predicted class for each example.
        labels: Ground-truth class for each example.
    """
    # Each metric is computed and printed in turn, in this order.
    report = (
        ('Accuracy: {}', accuracy_score),
        ('Precision: {}', precision_score),
        ('Recall: {}', recall_score),
    )

    for template, metric in report:
        print(template.format(metric(labels, predictions)))

In [None]:
# Classify every test example using the threshold and score the result
# against the boolean test labels.
predictions = predict(model = anomalyDetector, data = features_test, threshold = threshold)
print_stats(predictions = predictions, labels = labels_test)