# **Credit Card Fraud**

## **Loading Libraries**

In [None]:
import tensorflow.compat.v2 as tf
import tensorflow_probability as tfp

# TensorFlow is imported through the compat.v2 namespace, so v2 behaviour is
# switched on explicitly before anything else uses it.
tf.enable_v2_behavior()

# Record the library versions in the notebook output.
print(tf.__version__)
print(tfp.__version__)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score, roc_auc_score, roc_curve
from sklearn.preprocessing import MinMaxScaler
from sklearn.manifold import TSNE
import os
import numpy as np

# Short aliases for the Keras and TensorFlow Probability namespaces used below.
tfk = tf.keras
tfkl = tf.keras.layers
tfpl = tfp.layers
tfd = tfp.distributions

import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score, roc_auc_score, roc_curve
from sklearn.preprocessing import MinMaxScaler
from sklearn.manifold import TSNE
import os

%matplotlib inline
# Seed the NumPy and TensorFlow global random generators.
np.random.seed(0)
tf.random.set_seed(0)

## **Loading Data**

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Location of the transactions CSV.
file_path= '/content/drive/MyDrive/Data/creditcard.csv'

raw_data = pd.read_csv(file_path)
# Hold out 25% of the rows as `data_test`. No random_state or stratify argument
# is passed, so scikit-learn's defaults apply to the split.
data, data_test = train_test_split(raw_data, test_size=0.25)

## **Exploratory Analysis**

In [None]:
# Apply the same column transforms to the training frame and the held-out frame:
#   "Time"   -> value / 3600, wrapped modulo 24
#   "Amount" -> log(amount + 1)
for frame in (data, data_test):
    frame.loc[:, "Time"] = frame["Time"] / 3600 % 24
    frame.loc[:, 'Amount'] = np.log(frame['Amount'] + 1)

print(data.shape)
data.head()

In [None]:
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

def tsne_plot(X, Y, filename="tsne.png"):
    """Project X to 2-D with t-SNE and draw a scatter plot coloured by label.

    Args:
        X: 2-D array of feature rows.
        Y: 1-D array-like of class labels, one per row of X.
        filename: Path the figure is saved to before it is shown.
    """
    tsne = TSNE(n_components=2, random_state=0)
    X_2d = tsne.fit_transform(X)

    Y = np.asarray(Y)
    # Sorted distinct labels. Each label is used both to select its points and
    # as its legend entry, so colour, points and legend text always agree
    # (a set has no guaranteed order and labels need not be 0..k-1).
    labels = np.unique(Y)
    colors = 'r', 'g'  # Adjust colors as needed

    plt.figure(figsize=(6, 5))
    for idx, label in enumerate(labels):
        mask = Y == label
        # Cycle through the palette so classes beyond the second are still drawn.
        plt.scatter(X_2d[mask, 0], X_2d[mask, 1],
                    c=colors[idx % len(colors)], label=label)
    plt.legend()
    plt.savefig(filename)
    plt.show()



# Evaluation subset: 1000 randomly sampled rows with Class == 0 plus every row
# with Class == 1.
non_fraud = data[data['Class'] == 0].sample(1000)
fraud = data[data['Class'] == 1]

# Concatenate, shuffle the rows (sample(frac=1)) and renumber the index.
df = pd.concat([non_fraud, fraud]).sample(frac=1).reset_index(drop=True)
# Feature matrix and label vector as NumPy arrays.
X = df.drop(['Class'], axis = 1).values
Y = df["Class"].values

tsne_plot(X, Y, "original.png")

## **Train a VAE**

We train a VAE on 100k in-sample non-fraud transactions. As this is for exploratory and illustrative purposes, the hidden-layer design was arrived at by trial and error.

The prior for the latent variables is set to a standard multivariate normal distribution (zero mean, unit scale) of the latent dimension. The latent dimension is set to 2 so that it gives some intuitive illustrations, as you will see soon.

The output of the encoder, the latent distribution parameters, was deliberately chosen to be multivariate normal with non-zero covariance because I noticed it had a subsequent impact on the separation of normal transactions from fraud transactions, suggesting that the covariance of fraud transactions may have patterns. As a result, there are 5 distribution parameters to be learnt (2 mean values + 3 covariance values from the lower triangle of the 2-by-2 covariance matrix).

The output of the decoder, the data distribution parameters, follows feature-independent normal distributions. This choice is important. Most of the examples I could find online were applied to binary images such as the MNIST dataset, where the output would follow independent Bernoulli distributions. Here the data are real-valued and generally follow normal distributions, hence it only makes sense to model the output with normal distributions or the like. Another important implication of having the right distribution is that it gives the corresponding log-probability loss during training. It would not make sense to train a real-valued normal distribution using binary cross-entropy, for example.

In [None]:
from tensorflow.keras.utils import plot_model
tfd = tfp.distributions


def dense_layers(sizes):
    """Return a Sequential stack with one leaky-ReLU Dense layer per entry of `sizes`."""
    stack = tfk.Sequential()
    for width in sizes:
        stack.add(tfkl.Dense(width, activation=tf.nn.leaky_relu))
    return stack

# Feature count and per-row input shape, both read off the subset matrix X.
original_dim = X.shape[1]
input_shape = X[0].shape
# Hidden-layer widths for the encoder; the decoder receives them reversed.
intermediary_dims = [20, 10, 8]
latent_dim = 2
batch_size = 128
max_epochs = 1000

# prior = tfd.Independent(tfd.Normal(loc=tf.zeros(latent_dim), scale=1),
#                         reinterpreted_batch_ndims=1)

# Prior over the latent vector: zero location, scale arguments left at their defaults.
prior = tfd.MultivariateNormalDiag(
        loc=tf.zeros([latent_dim]))
        #scale_identity_multiplier=1.0)

# Encoder: dense stack, then a linear layer sized to parameterise a
# MultivariateNormalTriL over the latent space. The KL regulariser on the
# distribution layer is computed against `prior`.
encoder = tfk.Sequential([
    tfkl.InputLayer(input_shape=input_shape, name='encoder_input'),
    dense_layers(intermediary_dims),
    tfkl.Dense(tfpl.MultivariateNormalTriL.params_size(latent_dim), activation=None),
    tfpl.MultivariateNormalTriL(latent_dim,
                           activity_regularizer=tfpl.KLDivergenceRegularizer(prior)),
], name='encoder')

encoder.summary()
plot_model(encoder, to_file='vae_mlp_encoder.png', show_shapes=True)

# Decoder: mirrored dense stack, then a linear layer sized to parameterise an
# IndependentNormal over the original features.
decoder = tfk.Sequential([
    tfkl.InputLayer(input_shape=[latent_dim]),
    dense_layers(reversed(intermediary_dims)),
    tfkl.Dense(tfpl.IndependentNormal.params_size(original_dim), activation=None),
    tfpl.IndependentNormal(original_dim),
], name='decoder')

decoder.summary()
plot_model(decoder, to_file='vae_mlp_decoder.png', show_shapes=True)

# End-to-end model: the encoder's first output is fed into the decoder.
vae = tfk.Model(inputs=encoder.inputs,
                outputs=decoder(encoder.outputs[0]),
                name='vae_mlp')

# Loss: negative log-probability of the target x under the model's output rv_x.
negloglik = lambda x, rv_x: -rv_x.log_prob(x)

vae.compile(optimizer=tf.keras.optimizers.Nadam(),
            loss=negloglik)

vae.summary()
plot_model(vae,
           to_file='vae_mlp.png',
           show_shapes=True)

In [None]:
# Separate features from labels for the training frame.
x = data.drop(["Class"], axis=1)
y = data["Class"].values

# Split the feature rows by class label.
x_norm, x_fraud = x.values[y == 0], x.values[y == 1]

# Draw up to 100k DISTINCT Class-0 rows. Sampling without replacement matters:
# with replacement the same transaction can end up in both the training and the
# validation part of the split below, which makes the validation loss optimistic.
n_norm_sample = min(100000, x_norm.shape[0])
x_norm_sample = x_norm[np.random.choice(x_norm.shape[0], size=n_norm_sample, replace=False), :]
x_norm_train_sample, x_norm_val_sample = train_test_split(x_norm_sample, test_size=0.2)

In [None]:
# Input pipelines for the autoencoder: every element is (features, features)
# because the model is fitted to reconstruct its own input.
# Order of the stages matters: shuffle individual rows first (a buffer as large
# as the data set gives a full shuffle), then batch, and prefetch last so the
# next batch is prepared while the current one is consumed. Shuffling after
# batching would only permute fixed batches.
tf_train = (
    tf.data.Dataset.from_tensor_slices((x_norm_train_sample, x_norm_train_sample))
    .shuffle(len(x_norm_train_sample))
    .batch(batch_size)
    .prefetch(tf.data.experimental.AUTOTUNE)
)
# Validation data is only evaluated, so it is not shuffled.
tf_val = (
    tf.data.Dataset.from_tensor_slices((x_norm_val_sample, x_norm_val_sample))
    .batch(batch_size)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Write the model to 'bestmodel.h5', keeping only the best one seen so far
# (save_best_only=True).
checkpointer = ModelCheckpoint(filepath='bestmodel.h5', verbose=0, save_best_only=True)
# Stop when val_loss has not improved by at least min_delta for `patience`
# epochs, then restore the best weights.
earlystopper = EarlyStopping(monitor='val_loss', mode='min', min_delta=0.005, patience=20, verbose=0, restore_best_weights=True)

# Fit on the training pipeline, validating on tf_val after every epoch; the
# returned History object is kept for plotting.
hist = vae.fit(tf_train,
               epochs=max_epochs,
               shuffle=True,
               verbose=0,
               validation_data=tf_val,
               callbacks=[checkpointer, earlystopper])



**The training stops when the validation loss fails to improve by at least 0.005 for 20 consecutive epochs.**

In [None]:

# Plot Keras training history
def plot_loss(hist):
    """Plot training and validation loss per epoch on a log-scaled y axis.

    Args:
        hist: Object whose `history` mapping has 'loss' and 'val_loss' series,
            such as the value returned by `fit`.
    """
    # 'val_loss' is measured on the validation data passed to fit(), not on the
    # held-out test frame, so the legend calls it "validation".
    for key, label in (('loss', 'train'), ('val_loss', 'validation')):
        plt.plot(hist.history[key], label=label)
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(loc='upper left')
    plt.yscale('log',base=10)
    plt.show()

plot_loss(hist)


In [None]:
reconstruct_samples_n = 100

def reconstruction_log_prob(eval_samples, reconstruct_samples_n):
    """Average decoder log-probability of `eval_samples` over sampled latent codes.

    Encodes the samples, takes `reconstruct_samples_n` draws from the encoder's
    output distribution, decodes them, and averages the log-probabilities of
    `eval_samples` over the leading axis.
    """
    posterior = encoder(eval_samples)
    latent_draws = posterior.sample(reconstruct_samples_n)
    reconstruction = decoder(latent_draws)
    per_draw_log_prob = reconstruction.log_prob(eval_samples)
    return np.mean(per_draw_log_prob, axis=0)

In [None]:
# Scatter of the encoder distribution's mean for every row of X, coloured by Y.
latent_x_mean = encoder(X).mean()
mean_0, mean_1 = latent_x_mean[:, 0], latent_x_mean[:, 1]
plt.scatter(mean_0, mean_1, c=Y, cmap='RdYlGn_r', s=2)
plt.xlabel('mean[0]')
plt.ylabel('mean[1]')
plt.title('latent means')
plt.show()

In [None]:
# Scatter of the encoder distribution's standard deviation for every row of X,
# coloured by Y.
latent_x_std = encoder(X).stddev()
std_0, std_1 = latent_x_std[:, 0], latent_x_std[:, 1]
plt.scatter(std_0, std_1, c=Y, cmap='RdYlGn_r', s=2)
plt.xlabel('stddev[0]')
plt.ylabel('stddev[1]')
plt.title('latent standard deviations')
plt.show()

In [None]:
# Scatter of one draw from the encoder distribution for every row of X,
# coloured by Y.
latent_x = encoder(X).sample()
z_0, z_1 = latent_x[:, 0], latent_x[:, 1]
plt.scatter(z_0, z_1, c=Y, cmap='RdYlGn_r', s=2)
plt.xlabel('z[0]')
plt.ylabel('z[1]')
plt.title('latent vector samples')
plt.show()

In [None]:
def dense_layers(sizes):
    """Build a Sequential of leaky-ReLU Dense layers, one per width in `sizes`."""
    def make_layer(width):
        return tfkl.Dense(width, activation=tf.nn.leaky_relu)

    return tfk.Sequential(list(map(make_layer, sizes)))

# Rebuilding the networks below creates freshly initialised layers. If a VAE
# already exists in this session (normally the one fitted above), remember its
# weights so they can be copied into the rebuilt model; otherwise everything
# evaluated after this cell would run on an untrained network.
try:
    _trained_vae_weights = vae.get_weights()
except NameError:
    _trained_vae_weights = None

# Model parameters
original_dim = X.shape[1]
input_shape = X[0].shape
intermediary_dims = [20, 10, 8]
latent_dim = 2
batch_size = 128
max_epochs = 1000

# Prior distribution
prior = tfd.MultivariateNormalDiag(
    loc=tf.zeros([latent_dim]),
    scale_diag=tf.ones([latent_dim])
)

# Encoder model
encoder = tfk.Sequential([
    tfkl.InputLayer(input_shape=input_shape, name='encoder_input'),
    dense_layers(intermediary_dims),
    tfkl.Dense(tfpl.MultivariateNormalTriL.params_size(latent_dim), activation=None),
    tfpl.MultivariateNormalTriL(latent_dim, activity_regularizer=tfpl.KLDivergenceRegularizer(prior)),
], name='encoder')

encoder.summary()
plot_model(encoder, to_file='vae_mlp_encoder.png', show_shapes=True)

# Decoder model
decoder = tfk.Sequential([
    tfkl.InputLayer(input_shape=[latent_dim], name='decoder_input'),
    dense_layers(reversed(intermediary_dims)),
    tfkl.Dense(tfpl.IndependentNormal.params_size(original_dim), activation=None),
    tfpl.IndependentNormal(original_dim),
], name='decoder')

decoder.summary()
plot_model(decoder, to_file='vae_mlp_decoder.png', show_shapes=True)

# VAE model
vae = tfk.Model(inputs=encoder.inputs, outputs=decoder(encoder.outputs[0]), name='vae_mlp')

negloglik = lambda x, rv_x: -rv_x.log_prob(x)

vae.compile(optimizer=tf.keras.optimizers.Nadam(), loss=negloglik)

# Copy the previously learned weights into the rebuilt model. `encoder` and
# `decoder` are part of `vae`, so they receive the weights too. The copy is
# skipped when the weight shapes differ (architecture settings were changed).
if _trained_vae_weights is not None:
    _new_shapes = [w.shape for w in vae.get_weights()]
    _old_shapes = [w.shape for w in _trained_vae_weights]
    if _new_shapes == _old_shapes:
        vae.set_weights(_trained_vae_weights)
    else:
        print('Model architecture changed: rebuilt VAE is untrained and must be fitted again.')

vae.summary()
plot_model(vae, to_file='vae_mlp.png', show_shapes=True)



In [None]:
def reconstruction_log_prob(eval_samples, reconstruct_samples_n):
    """Monte-Carlo estimate of the negative reconstruction log-probability.

    For every row of `eval_samples` the encoder distribution is sampled
    `reconstruct_samples_n` times, each latent draw is decoded, and the
    negative log-probability of the row under the decoded distribution is
    averaged over the draws.

    Args:
        eval_samples: 2-D array-like of feature rows (rows x features).
        reconstruct_samples_n: Number of latent draws per row.

    Returns:
        1-D NumPy array with one value per input row: the mean NEGATIVE
        log-probability of that row across its draws.
    """
    # Convert eval_samples to tf.float32 to match model dtype
    eval_samples = tf.convert_to_tensor(eval_samples, dtype=tf.float32)
    n_rows = tf.shape(eval_samples)[0]

    # Latent draws; the sample axis comes first: (draws, rows, latent_dim).
    encoder_samples = encoder(eval_samples).sample(reconstruct_samples_n)

    # Flatten draw-major: flat row k holds draw k // n_rows of input row k % n_rows.
    encoder_samples_flat = tf.reshape(encoder_samples, [-1, latent_dim])
    decoder_out = decoder(encoder_samples_flat)

    # Expand the inputs in the SAME draw-major layout. tf.tile stacks whole
    # copies of the batch ([x0..xN, x0..xN, ...]); tf.repeat would produce
    # [x0, x0, ..., x1, x1, ...] and score each latent draw against an
    # unrelated row.
    eval_samples_tiled = tf.tile(eval_samples, [reconstruct_samples_n, 1])

    neg_log_prob = -decoder_out.log_prob(eval_samples_tiled)

    # Back to (draws, rows), then average over the draw axis.
    neg_log_prob = tf.reshape(neg_log_prob, [reconstruct_samples_n, n_rows])
    return tf.reduce_mean(neg_log_prob, axis=0).numpy()

# Number of latent draws averaged per row when scoring.
reconstruct_samples_n = 100  # Example value, adjust as needed
# Score every row of the evaluation subset X.
x_log_prob = reconstruction_log_prob(X, reconstruct_samples_n)


In [None]:
# Show the first ten scores for rows with Y == 0, then for rows with Y == 1.
[x_log_prob[Y == 0][0:10], x_log_prob[Y == 1][0:10]]

In [None]:

# Histogram of the per-row scores, one series per class. Every row is used:
# slicing to the first ten values per class would leave a 60-bin histogram
# almost empty and say nothing about the distributions.
# x_log_prob holds the NEGATIVE reconstruction log-probability returned by
# reconstruction_log_prob, so the title and axis label say so.
plt.hist([x_log_prob[Y == 0], x_log_prob[Y == 1]], bins=60, label=['Class 0', 'Class 1'])
plt.title('Negative Reconstruction Log Probability')
plt.ylabel('Frequency')
plt.xlabel("-log p(x|x')")
plt.legend()
plt.show()

Now, let's take the negative reconstruction log probability, and draw a ROC curve across the range to see how it would perform if we were to build a threshold-based fraud detector on it.

In [None]:
# ROC analysis of the VAE score on the evaluation subset.
auc = roc_auc_score(Y, x_log_prob)
fpr, tpr, thresh = roc_curve(Y, x_log_prob)

roc_label = "linear in-sample, auc=" + str(auc)
plt.plot(fpr, tpr, label=roc_label)
plt.xlabel("False Positive Rate")
plt.ylabel('True Positive Rate')
plt.title('VAE roc curve - training')
plt.legend(loc='best')
plt.show()

## **VAE vs SVM**

In [None]:
from sklearn import svm

# Baseline: a support-vector classifier fitted on the same evaluation subset.
clf = svm.SVC(gamma='scale')
clf.fit(X, Y)

# Rank with the continuous decision values rather than hard 0/1 predictions,
# and compute the SVM's own ROC points. Plotting the existing fpr/tpr here would
# redraw the previously computed curve under an SVM title.
svm_scores = clf.decision_function(X)
svm_fpr, svm_tpr, svm_thresh = roc_curve(Y, svm_scores)
auc = roc_auc_score(Y, svm_scores)

plt.plot(svm_fpr, svm_tpr, label="linear in-sample, auc="+str(auc))
plt.title('SVM roc curve - training')
plt.ylabel('True Positive Rate')
plt.xlabel("False Positive Rate")
plt.legend(loc='best')
plt.show()

## **Evaluation on Test**

In [None]:
# Score the held-out frame with the same function and draw its ROC curve.
test_features = data_test.drop(['Class'], axis = 1).values
test_y = data_test["Class"].values
x_test_log_prob = reconstruction_log_prob(test_features, reconstruct_samples_n)

auc = roc_auc_score(test_y, x_test_log_prob)
fpr, tpr, thresh = roc_curve(test_y, x_test_log_prob)

roc_label = "linear in-sample, auc=" + str(auc)
plt.plot(fpr, tpr, label=roc_label)
plt.xlabel("False Positive Rate")
plt.ylabel('True Positive Rate')
plt.title('VAE roc curve - test')
plt.legend(loc='best')
plt.show()

In [None]:
# Evaluate the fitted SVM on the held-out frame. The ROC points are computed
# from the SVM's own continuous decision values; reusing the existing fpr/tpr
# would redraw the previously computed curve under an SVM title, and hard 0/1
# predictions would collapse the ranking to a single operating point.
# NOTE(review): the legend text still says "in-sample" although this is the
# held-out data; confirm the intended wording.
svm_test_scores = clf.decision_function(data_test.drop(['Class'], axis = 1).values)
svm_test_fpr, svm_test_tpr, svm_test_thresh = roc_curve(test_y, svm_test_scores)
auc = roc_auc_score(test_y, svm_test_scores)

plt.plot(svm_test_fpr, svm_test_tpr, label="linear in-sample, auc="+str(auc))
plt.title('SVM roc curve - test')
plt.ylabel('True Positive Rate')
plt.xlabel("False Positive Rate")
plt.legend(loc='best')
plt.show()

- The VAE hidden layer design has room for improvement.
- Hyperparameter tuning can be added.