In [None]:
import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model

In [None]:
# Read the credit-card CSV (path is relative to this notebook) into a DataFrame
filepath = "../Data/creditcard.csv"
df = pd.read_csv(filepath_or_buffer=filepath)

In [None]:
# Separate the target column ('Class') from the feature columns.
# The same data was used in the creditCard EDA notebook.
y = df['Class']
X = df.drop(columns=['Class'])

# Hold out 20% of the rows for testing; the fixed seed keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=43)

In [None]:
# Normalize data: scale every feature to [0, 1].
# The autoencoders in this notebook end in a sigmoid layer, whose outputs lie in
# (0, 1), so the inputs they must reconstruct have to live in that range too.
# The scaler is fitted on the training split only and then reused for the test
# split: both splits share one transformation and no test-set statistics leak
# into preprocessing. Test values beyond the training min/max can therefore
# fall slightly outside [0, 1].
scaler = MinMaxScaler()
train_data = scaler.fit_transform(X_train)
test_data = scaler.transform(X_test)
train_labels = y_train
test_labels = y_test

In [None]:
# Turn the labels into boolean masks and split each scaled array by label.
# NOTE(review): the `normal_*` arrays hold the rows whose label is True
# (Class != 0) and the `anomalous_*` arrays hold the rows whose label is False.
# If Class == 1 marks fraud in creditcard.csv (TODO confirm), these names are
# inverted: the autoencoders are then fitted on the fraud rows, and `predict`
# returns True for rows that resemble them.

# Training split
train_labels = train_labels.astype(bool)
normal_train_data = train_data[train_labels]
anomalous_train_data = train_data[~train_labels]

# Test split
test_labels = test_labels.astype(bool)
normal_test_data = test_data[test_labels]
anomalous_test_data = test_data[~test_labels]

In [None]:
# Autoencoder built from Keras Dense layers
class AnomalyDetector(Model):
  """Dense autoencoder: the input is compressed to 8 units and rebuilt.

  The last decoder layer uses a sigmoid, so every reconstructed value lies in
  (0, 1); inputs are expected to be scaled to that range.

  Args:
    n_features: width of the input rows and of the reconstruction. Defaults to
      30, the value that was previously hard-coded.
  """

  def __init__(self, n_features=30):
    super().__init__()
    # Encoder: progressively narrower layers, 32 -> 16 -> 8 units.
    self.encoder = tf.keras.Sequential([
      layers.Dense(32, activation="relu"),
      layers.Dense(16, activation="relu"),
      layers.Dense(8, activation="relu")])

    # Decoder: mirrors the encoder and ends at the input width.
    self.decoder = tf.keras.Sequential([
      layers.Dense(16, activation="relu"),
      layers.Dense(32, activation="relu"),
      layers.Dense(n_features, activation="sigmoid")])

  def call(self, x):
    """Encode `x`, decode the code, and return the reconstruction."""
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

# Instantiate the Dense autoencoder; it is compiled and fitted in later cells.
autoencoder = AnomalyDetector()

In [None]:
# Second model: a Keras Sequential autoencoder with a Conv2D stage in the
# encoder and a Conv2DTranspose stage in the decoder.
N_FEATURES = 30  # row width; matches the Dense(30) output of the Dense autoencoder

encoder = tf.keras.models.Sequential([
      layers.Input(shape=(N_FEATURES,)),
      # Conv2D expects (height, width, channels): view each row as a
      # 30 x 1 single-channel image.
      layers.Reshape((N_FEATURES, 1, 1)),
      layers.Conv2D(32, kernel_size=(3,3), padding='same', activation='relu'),
      # Flatten so the Dense stack produces one code vector per sample rather
      # than one per spatial position.
      layers.Flatten(),
      layers.Dense(32, activation="relu"),
      layers.Dense(16, activation="relu"),
      layers.Dense(8, activation="relu")])

decoder = tf.keras.models.Sequential([
      layers.Dense(16, activation="relu"),
      layers.Dense(32, activation="relu"),
      # Give the 32-unit vector spatial dimensions for the transposed convolution.
      layers.Reshape((32, 1, 1)),
      layers.Conv2DTranspose(32, kernel_size=(3,3), padding='same', activation='relu'),
      # Back to a flat vector so the output has shape (batch, N_FEATURES),
      # the same shape as the training targets.
      layers.Flatten(),
      layers.Dense(N_FEATURES, activation="sigmoid")])

autoencoder2 = tf.keras.models.Sequential([encoder, decoder])

In [None]:
# Both models use the same objective (mean absolute error) and the Adam optimizer.
for net in (autoencoder, autoencoder2):
  net.compile(optimizer='adam', loss='mae')

In [None]:
# Fit the Dense autoencoder to reconstruct its own input.
# NOTE(review): the import below binds the name `callbacks` to a list from the
# `gc` module. Nothing in this cell reads it (`callbacks=` further down is a
# keyword argument), so it looks like an accidental auto-import - confirm and drop.
from gc import callbacks


# One timestamped TensorBoard log directory per run
log_dir = "logs/fit1/" + format(datetime.datetime.now(), "%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(histogram_freq=1, log_dir=log_dir)

# Input and target are the same array; the whole test split is the validation set.
history = autoencoder.fit(
    x=normal_train_data,
    y=normal_train_data,
    batch_size=512,
    epochs=20,
    shuffle=True,
    validation_data=(test_data, test_data),
    callbacks=[tensorboard_callback],
)

In [None]:
# Fit the second autoencoder with the same settings, logging to its own
# timestamped TensorBoard directory.
log_dir = "logs/fit2/" + format(datetime.datetime.now(), "%Y%m%d-%H%M%S")
tensorboard_callback2 = tf.keras.callbacks.TensorBoard(histogram_freq=1, log_dir=log_dir)

# Input and target are the same array; the whole test split is the validation set.
history2 = autoencoder2.fit(
    x=normal_train_data,
    y=normal_train_data,
    batch_size=512,
    epochs=20,
    shuffle=True,
    validation_data=(test_data, test_data),
    callbacks=[tensorboard_callback2],
)

In [None]:
# Training vs. validation loss per epoch, one figure per model.
# Each figure reads its own model's history: `history` for the Dense model and
# `history2` for the Conv2D model.
for run_title, run_history in (
    ('Dense layer training result', history),
    ('Conv2D layer training result', history2),
):
  print(run_title)
  plt.plot(run_history.history["loss"], label="Training Loss")
  plt.plot(run_history.history["val_loss"], label="Validation Loss")
  plt.title(run_title)
  plt.xlabel("Epoch")
  plt.ylabel("Loss (MAE)")
  plt.legend()
  plt.show()

In [None]:
# Reconstruction error (per-row mean absolute error) of each model on the
# data it was trained on.
reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

reconstructions2 = autoencoder2.predict(normal_train_data)
train_loss2 = tf.keras.losses.mae(reconstructions2, normal_train_data)

# Each loss is flattened to a 1-D array before plotting. A 2-D (1, N) input can
# be read by plt.hist as N separate one-value datasets instead of one dataset.
for model_title, model_loss in (
    ("Dense autoencoder", train_loss),
    ("Conv2D autoencoder", train_loss2),
):
  plt.hist(np.ravel(model_loss), bins=50)
  plt.title(model_title)
  plt.xlabel("Train loss")
  plt.ylabel("No of examples")
  plt.show()

In [None]:
# Decision threshold per model: mean plus one standard deviation of that
# model's training reconstruction error.
threshold, threshold2 = (
    np.mean(errors) + np.std(errors) for errors in (train_loss, train_loss2)
)
print("Threshold: ", threshold)
print("Threshold: ", threshold2)

In [None]:
def predict(model, data, threshold):
  """Flag the rows of `data` that `model` reconstructs with low error.

  Args:
    model: callable that maps `data` to its reconstruction.
    data: rows to score.
    threshold: error cut-off.

  Returns:
    Boolean tensor, True where the row's mean absolute reconstruction error is
    strictly below `threshold`.
  """
  per_row_error = tf.keras.losses.mae(model(data), data)
  return tf.math.less(per_row_error, threshold)

def print_stats(predictions, labels):
  """Print accuracy, precision and recall of `predictions` against `labels`."""
  report = (
      ("Accuracy = {}", accuracy_score),
      ("Precision = {}", precision_score),
      ("Recall = {}", recall_score),
  )
  for template, metric in report:
    # The scikit-learn metrics take the true labels first, then the predictions.
    print(template.format(metric(labels, predictions)))

In [None]:
# Score each model on the test split against its own threshold:
# `threshold` comes from the Dense model's training error, `threshold2` from
# the Conv2D model's.
preds = predict(autoencoder, test_data, threshold)
print_stats(preds, test_labels)

preds2 = predict(autoencoder2, test_data, threshold2)
print_stats(preds2, test_labels)

In [None]:
# Confusion matrix of the Dense model's test predictions. The options the
# original call spelled out were all the function's defaults, so only the two
# data arguments are passed here.
print("Dense layer only confusion matrix")
tf.math.confusion_matrix(labels=test_labels, predictions=preds)

In [None]:
# Confusion matrix of the Conv2D model's test predictions (`preds2`), so the
# output matches the printed title.
print("Conv2D layer only confusion matrix")
tf.math.confusion_matrix(
    test_labels,
    preds2,
    num_classes=None,
    weights=None,
    dtype=tf.dtypes.int32,
    name=None
)

In [None]:
# Load the TensorBoard notebook extension; the training cells above write their
# logs under logs/fit1/ and logs/fit2/.
%load_ext tensorboard