In [1]:
import numpy as np
import pandas as pd
import jax.numpy as jnp
import jax
import matplotlib.pyplot as plt

from utils.oversampling import fit_resample
from utils.data_processer import *

In [2]:
# Load the transactions; the 'Class' column is the label (1 is counted as fraud below)
data = pd.read_csv("../creditcard_2021.csv")

# Dataset size and how rare the positive class is
print(f"Number of samples: {data.shape[0]}")
print(f"Number of fraudolent transaction: {data['Class'].eq(1).sum()}")
print(f"Ratio of fraudolent transaction: {data['Class'].mean()}")

Number of samples: 284807
Number of fraudolent transaction: 492
Ratio of fraudolent transaction: 0.001727485630620034


# Artificial Neural Network
## Definition

In [3]:
class ANN:
    def __init__(self, layers_size=None, act_func=jnp.tanh, out_act_fun = jax.nn.sigmoid):
        self.layers_size = layers_size
        self.act_func = act_func 
        self.out_act_fun = out_act_fun

    def initialize_parameters(self, layers_size=None):
        """
            Returns the parameters of the artificial neural network given the number of
            neurons of its layers, namely it sets the matrix of weights and the bias
            vector for each layer, initialized randomly
          
            Parameters:
            layers_size: list - ordered sizes of the layers of the artificial neural network

            Returns:
            params: list - parameters of the artificial neural network, namely weights and biases

            Raises:
            exception: if layers_size is not provided
        """

        if layers_size is None:
            raise Exception("Size of layers not provided")
        
        layers_size = jnp.array(layers_size)
        
        np.random.seed(0) # For reproducibility
        self.layers_size = layers_size
        params = list()
        
        for i in range(len(self.layers_size) - 1):
            W = np.random.randn(self.layers_size[i+1], self.layers_size[i])
            b = np.zeros((self.layers_size[i+1], 1))
            params.append(W)
            params.append(b)
        
        return params
    
    def MSW(self, params):
        """
        Computes the sum of the squared values of the weights of the artificial neural
        network
    
        Parameters:
        params: list - parameters of the artificial neural network, namely weights and biases
        
        Returns:
        float - sum of the squared values of the weights of the artificial neural network
        """
        
        # Extract weights
        weights = params[::2]
        
        # Calculate MSW
        partial_sum = 0.0
        n_weights = 0
        for W in weights:
            partial_sum = partial_sum + jnp.sum(W * W)
            n_weights = n_weights + W.size
            
        return partial_sum / n_weights
    
    # Metrics
    def confusion_matrix(self, true_labels, pred_labels):
        """
            Computes the confusion matrix
        
            Parameters:
            true_labels: ndarray - correct values of the samples' class'
            pred_labels: ndarray - predicted values of the samples' class'
    
            Returns:
            TP: float - true positives - attacks classified accurately as attacks
            TN: float - true negatives - normal transactions accurately classified as normal
            FP: float - false positives - normal traffic incorrectly classified as attacks
            FN: float - false negatives - attacks incorrectly classified as normal
        """
    
        TP = np.sum(np.logical_and(pred_labels == 1., true_labels == 1.))
        TN = np.sum(np.logical_and(pred_labels == 0., true_labels == 0.)) 
        FP = np.sum(np.logical_and(pred_labels == 1., true_labels == 0.))
        FN = np.sum(np.logical_and(pred_labels == 0., true_labels == 1.))
        
        return TP, TN, FP, FN
    
    def accuracy(self, true_labels, pred_labels):
        """
            Computes the accuracy of the predictions
          
            Parameters:
            true_labels: ndarray - correct values of the samples' class'
            pred_labels: ndarray - predicted values of the samples' class'
          
            Returns:
            float - accuracy of the artificial neural network, namely the number of samples
                    correctly classified divided by the total number of samples
        """
        TP, TN, _, _ = self.confusion_matrix(true_labels, pred_labels)
        AC = ((TN + TP) / len(pred_labels)) * 100 # accuracy
        return round(float(AC), 2)
    
    def recall(self, true_labels, pred_labels):
        """
            Computes the recall (or sensitivity) of the predictions
          
            Parameters:
            true_labels: ndarray - correct values of the samples' class'
            pred_labels: ndarray - predicted values of the samples' class'
          
            Returns:
            float - recall of the artificial neural network, 
                    namely the percentage of positive predictions (true positive rate),
                    out of the total positive
        """
        TP, _, _, FN = self.confusion_matrix(true_labels, pred_labels)
        RC = (TP / (TP + FN)) * 100 # recall
        return round(float(RC), 2)
    
    def precision(self, true_labels, pred_labels):
        """
            Computes the precision of the predictions
          
            Parameters:
            true_labels: ndarray - correct values of the samples' class'
            pred_labels: ndarray - predicted values of the samples' class'
          
            Returns:
            float - precision of the artificial neural network, namely the percentage of truly positive,
                    out of all positive predicted    
        """
        TP, _, FP, _ = self.confusion_matrix(true_labels, pred_labels)
        PR = (TP / (TP + FP)) * 100 # precision
        return round(float(PR), 2)
    
    def f1_score(self, true_labels, pred_labels):
        """
            Computes the F1 Score of the predictions
          
            Parameters:
            true_labels: ndarray - correct values of the samples' class'
            pred_labels: ndarray - predicted values of the samples' class'
          
            Returns:
            float - f1 score of the artificial neural network, namely the harmonic mean of precision and recall. 
                    It takes both false positive and false negatives into account
        """
        RC = self.recall(true_labels, pred_labels)
        PR = self.precision(true_labels, pred_labels)
        F1 = 2 * PR * RC / (PR + RC) # f1 score
        return round(float(F1), 2)
    
    def metrics(self, true_labels, pred_labels, metrics_df=None, dataset_label=''):
        """
            Computes and print metrics TP, TN, FP, FN, AC, RC, PC, F1
        
            Parameters:
            predictions: ndarray - predictions of samples obtained with a model
            true_labels: ndarray - true labels of the samples
            metrics_df: DataFrame - DataFrame to which the computed statistics have to be put
            dataset_label: str - label identifying the belonging of the statistics to its dataset
        
            Returns:
            DataFrame - DataFrame containing the statistics contained in the parameter metrics_df
                        plus the statistics computed on the new predictions
        """
        
        TP, TN, FP, FN = self.confusion_matrix(true_labels, pred_labels) 
        
        AC = self.accuracy(true_labels, pred_labels)
        RC = self.recall(true_labels, pred_labels)
        PR = self.precision(true_labels, pred_labels)
        F1 = self.f1_score(true_labels, pred_labels)
        
        if metrics_df is None:
            columns = ['Set of features', 'TP', 'TN', 'FP', 'FN', 'accuracy', 'recall', 'precision', 'F1-score']
            metrics_df = pd.DataFrame([[dataset_label, TP, TN, FP, FN, AC, RC, PR, F1]], columns=columns)
        else:
            columns = ['Set of features', 'TP', 'TN', 'FP', 'FN', 'accuracy', 'recall', 'precision', 'F1-score']
            metrics_df = pd.concat([metrics_df, pd.DataFrame([[dataset_label, TP, TN, FP, FN, AC, RC, PR, F1]], columns=columns)], ignore_index=True)
          
        return metrics_df
    
    # Loss functions
    def cross_entropy(self):
        @jax.jit
        def callable(x=None, y=None, params=None):
            """
                Computes the Cross Entropy Cost function
                
                Parameters:
                x: ndarray - input of the artificial neural network
                y: ndarray - correct value of the output, one-hot representation
                
                Returns:
                float - Cross Entropy Cost between the predictions of the artificial neural network and the correct values
                
                Raises:
                exception: if x is not provided
                exception: if y is not provided
                exception: if params are not provided
            """
            
            if x is None:
                raise Exception("x is not provided")
            if y is None:
                raise Exception("y is not provided")
            if params is None:
                raise Exception("params are not provided")
        
            y_pred = self.predict(x, params)
            return -jnp.mean(y * jnp.log(y_pred) + (1 - y) * jnp.log(1 - y_pred))
        return callable
    
    def mean_squared_error(self):
        @jax.jit
        def callable(x, y, params):
            """
                Computes the Mean Squared Error
                
                Parameters:
                x: ndarray - input of the artificial neural network
                y: ndarray - correct value of the output, one-hot representation
                
                Returns:
                float - Mean Squared Error between the predictions of the artificial neural network and the correct values
                
                Raises:
                exception: if x is not provided
                exception: if y is not provided
                exception: if params are not provided
            """
            
            if x is None:
                raise Exception("x is not provided")
            if y is None:
                raise Exception("y is not provided")
            if params is None:
                raise Exception("params are not provided")
            
            y_pred = self.predict(x, params)
            return jnp.mean((y_pred - y) ** 2)
        return callable
    
    def regularized_loss(self, loss_function, penalization):
        @jax.jit
        def callable(x=None, y=None, params=None):
            """
                Computes the loss function applying regularization to the given loss function with penalization 
            
                Parameters:
                x: ndarray - input of the artificial neural network
                y: ndarray - correct value of the output
                params: list - parameters of the artificial neural network, namely weights and biases
                penalization: float - weight to which the MSW is multiplied and 
                                    that makes possible to modify the impact of the regularization term
                Returns:
                float - loss function value between the predictions of the artificial neural network
                        and the correct values with regularization term
              
                Raises:
                exception: if x is not provided
                exception: if y is not provided
                exception: if params are not provided
            """
                
            if x is None:
                raise Exception("x is not provided")
            if y is None:
                raise Exception("y is not provided")
            if params is None:
                raise Exception("params are not provided")
            
            return loss_function(x, y, params) + penalization/(2 * x.shape[0]) * self.MSW(params)
        return callable
    
    # Optimisation algorithms
    def SGD(
            self, 
            loss_function, 
            epochs=1000, 
            batch_size=128, 
            learning_rate_min=1e-3, 
            learning_rate_max=1e-1, 
            learning_rate_decay=1000,
    ):
        """
           Trains the artificial neural network with Stochastic Gradient Descent method using mini-batches and
           learning rate decay
        
            Parameters:
            loss_function: callable - loss function that it used in order to evaluate the cost 
                                        between the predictions and the correct values
            epochs: int - number of epochs to perform
            batch_size: int, optional - size of the batches to be used for computing the gradient
            learning_rate_min: float - minimum learning rate used in the training phase
            learning_rate_max: float - maximum learning rate used in the training phase
            learning_rate_decay: float - learning rate decay used in the training phase
        
            Returns:
            params: list - trained parameters of the artificial neural network, 
                            namely weights and biases optimized for fitting the training set
            history: list - history of the loss function optimisation
        """
        def callable(x_train, y_train, params):
            # Number of samples
            num_samples = x_train.shape[0]
        
            # Loss and it's gradient functions
            loss = jax.jit(loss_function)
            grad_loss = jax.jit(jax.grad(loss_function, argnums=2))
        
            # History
            history = list()
            history.append(loss(x_train, y_train, params))
            
            for epoch in range(epochs):
                # Get learning rate
                learning_rate = max(learning_rate_min, learning_rate_max * (1 - epoch/learning_rate_decay))
        
                # Select batch_size indices randomly
                idxs = np.random.choice(num_samples, batch_size)
                
                # Calculate gradient
                grad_val = grad_loss(x_train[idxs,:], y_train[idxs,:], params)
        
                # Update params
                for i in range(len(params)):
                    params[i] = params[i] - learning_rate * grad_val[i]
                
                # Update history
                history.append(loss(x_train, y_train, params))
            return params, history
        return callable
    
    def SGD_momentum(
            self, 
            loss_function, 
            epochs=1000, 
            batch_size=128, 
            learning_rate_min=1e-3, 
            learning_rate_max=1e-1, 
            learning_rate_decay=1000,
            momentum=0.9,
    ):
        """
            Trains the artificial neural network with Stochastic Gradient Descent method with Momentum 
            using mini-batches and learning rate decay
        
            Parameters:
            loss_function: callable - loss function that it used in order to evaluate the cost 
                                    between the predictions and the correct values
            epochs: int - number of epochs to perform
            batch_size: int, optional - size of the batches to be used for computing the gradient
            learning_rate_min: float - minimum learning rate used in the training phase
            learning_rate_max: float - maximum learning rate used in the training phase
            learning_rate_decay: float - learning rate decay used in the training phase
            momentum: float - momentum used in the training phase
        
            Returns:
            params: list - trained parameters of the artificial neural network, 
                            namely weights and biases optimized for fitting the training set
            history: list - history of the loss function optimisation
        """
        def callable(x_train, y_train, params):
            # Number of samples
            num_samples = x_train.shape[0]
        
            # Loss and it's gradient functions
            loss = jax.jit(loss_function)
            grad_loss = jax.jit(jax.grad(loss_function, argnums=2))
            
            # History
            history = list()
            history.append(loss(x_train, y_train, params))
            
            # Initialize velocity
            velocity = list()
            for i in range(len(params)):
                velocity.append(np.zeros_like(params[i]))
                
            for epoch in range(epochs):
                # Get learning rate
                learning_rate = max(learning_rate_min, learning_rate_max * (1 - epoch/learning_rate_decay))
        
                # Select batch_size indices randomly
                idxs = np.random.choice(num_samples, batch_size)
                
                # Calculate gradient
                grad_val = grad_loss(x_train[idxs,:], y_train[idxs,:], params)
                    
                for i in range(len(params)):
                    # Compute velocity[i]
                    velocity[i] = momentum * velocity[i] - learning_rate * grad_val[i]

                    # Update params[i]
                    params[i] = params[i] + velocity[i]
                
                # Update history
                history.append(loss(x_train, y_train, params))
            return params, history
        return callable
    
    def NAG(
            self, 
            loss_function, 
            epochs=1000, 
            batch_size=128, 
            learning_rate_min=1e-3, 
            learning_rate_max=1e-1, 
            learning_rate_decay=1000,
            momentum=0.9,
    ):
        def callable(x_train, y_train, params):
            """
                Trains the artificial neural network with Nesterov Accelerated Gradient method
            
                Parameters:
                loss_function: callable - loss function that it used in order to evaluate the cost 
                                        between the predictions and the correct values
                epochs: int - number of epochs to perform
                batch_size: int, optional - size of the batches to be used for computing the gradient
                learning_rate_min: float - minimum learning rate used in the training phase
                learning_rate_max: float - maximum learning rate used in the training phase
                learning_rate_decay: float - learning rate decay used in the training phase
                momentum: float - momentum used in the training phase
            
                Returns:
                params: list - trained parameters of the artificial neural network, 
                            namely weights and biases optimized for fitting the training set
                history: list - history of the loss function optimisation
            """
    
            # Number of samples
            num_samples = x_train.shape[0]
        
            # Loss and it's gradient functions
            loss = jax.jit(loss_function)
            grad_loss = jax.jit(jax.grad(loss_function, argnums=2))
            
            # History
            history = list()
            history.append(loss(x_train, y_train, params))
            
            # Initialize velocity
            velocity = list()
            for i in range(len(params)):
                velocity.append(np.zeros_like(params[i]))
                
            for epoch in range(epochs):
                # Get learning rate
                learning_rate = max(learning_rate_min, learning_rate_max * (1 - epoch/learning_rate_decay))
        
                # Select batch_size indices randomly
                idxs = np.random.choice(num_samples, batch_size)
                
                
                # Calculate gradient: 
                # here it's necessary to calculate the arguments that will substitute 'params' on gradient evaluation
                grad_args = list()
                for i in range(len(params)):
                    grad_args.append(np.zeros_like(params[i]))
                for i in range(len(params)):
                    grad_args[i] = params[i] - momentum * velocity[i]
                
                grad_val = grad_loss(x_train[idxs,:], y_train[idxs,:], grad_args)
                    
                for i in range(len(params)):
                    # Compute velocity[i]
                    velocity[i] = momentum * velocity[i] + learning_rate * grad_val[i]

                    # Update params[i]
                    params[i] = params[i] - velocity[i]
                
                # Update history
                history.append(loss(x_train, y_train, params))
            return params, history
        return callable
    
    def RMSprop(
            self, 
            loss_function, 
            epochs=1000, 
            batch_size=128, 
            learning_rate=0.1,
            decay_rate = 0.9,
            epsilon = 1e-8
    ):
        """
            Trains the artificial neural network with Root Mean Square Propagation method
        
            Parameters:
            loss_function: callable - loss function that it used in order to evaluate the cost 
                                    between the predictions and the correct values
            epochs: int - number of epochs to perform
            batch_size: int, optional - size of the batches to be used for computing the gradient
            learning_rate: float - learning rate used in the training phase
            decay_rate: float - learning rate decay
            epsilon: float - small constant to prevent division by zero (~1e-8)
        
            Returns:
            params: list - trained parameters of the artificial neural network, 
                            namely weights and biases optimized for fitting the training set
            history: list - history of the loss function optimisation
        """
        def callable(x_train, y_train, params):
            # Number of samples
            num_samples = x_train.shape[0]
        
            # Loss and it's gradient functions
            loss = jax.jit(loss_function)
            grad_loss = jax.jit(jax.grad(loss_function, argnums=2))
            
            # History
            history = list()
            history.append(loss(x_train, y_train, params))
            
            # Initialize cumulated square gradient
            cumulated_square_grad = list()
            for i in range(len(params)):
                cumulated_square_grad.append(np.zeros_like(params[i]))
                
            for epoch in range(epochs):        
                # Select batch_size indices randomly
                idxs = np.random.choice(num_samples, batch_size)
                
                # Calculate gradient
                grad_val = grad_loss(x_train[idxs,:], y_train[idxs,:], params)
                    
                for i in range(len(params)):
                    # Update cumulated square gradient
                    cumulated_square_grad[i] = decay_rate * cumulated_square_grad[i] + (1 - decay_rate) * grad_val[i] * grad_val[i]
            
                    # Update params[i]
                    params[i] = params[i] - learning_rate * grad_val[i] / (epsilon + np.sqrt(cumulated_square_grad[i]))
                
                # Update history
                history.append(loss(x_train, y_train, params))
            return params, history
        return callable
    
    def train(self, x_train, y_train, params, optimizer):
        """
            Trains the artificial neural network using one the optimization algorithms
        
            Parameters:
            x_train: ndarray - training set of the dataset to fit
            y_train: ndarray - training set's sample's labels
            params: ndarray - parameters of the artificial neural network, namely weights and biases
            optimizer: callable - optimization algorithm to be used in the training phase

            Returns:
            ndarray - updated weights and bias
            ndarray - history of the loss function
        """
        
        return optimizer(x_train, y_train, params)
    
    def predict(self, x=None, params=None):
        """
            Computes the value of the output of the artificial neural network given an input
        
            Parameters:
            x: ndarray - input of the artificial neural network
            params: ndarray - parameters of the artificial neural network, namely weights and biases
        
            Returns: 
            ndarray - output value of the artificial neural network
                
            Raises:
            Exception - if x is not provided
            Exception - if params were not initialized
        """
        
        if x is None:
            raise Exception("x is not provided")
        if params is None:
            raise Exception("Parameters were not initialized")
        
        # Number of ANN layers
        num_layers = int(len(self.layers_size)) + 1
        
        # Algorithm
        layer = x.T
        weights = params[0::2]
        biases = params[1::2]
        for i in range(num_layers - 2):
            # Update layer values
            layer = weights[i] @ layer + biases[i]
            
            # Apply activation function
            layer = self.act_func(layer)
                  
        # On the output layer it is applied the sigmoid function 
        # since the output is needed to be between 0 and 1
        layer = self.out_act_fun(layer)
        layer = layer.T
        
        return layer

# Datasets

In [4]:
# Candidate sets of features; every explicit list ends with the 'Class' label column
v1 = ['V1', 'V5', 'V7', 'V8', 'V11', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'Amount', 'Class']
v2 = ['V1', 'V6', 'V13', 'V16', 'V17', 'V22', 'V23', 'V28', 'Amount', 'Class']
v3 = ['V2', 'V11', 'V12', 'V13', 'V15', 'V16', 'V17', 'V18', 'V20', 'V21', 'V24', 'V26', 'Amount', 'Class']
v4 = ['V2', 'V7', 'V10', 'V13', 'V15', 'V17', 'V19', 'V28', 'Amount', 'Class']
v5 = ['Time', 'V1', 'V7', 'V8', 'V9', 'V11', 'V12', 'V14', 'V15', 'V22', 'V27', 'V28', 'Amount', 'Class']
v6 = data.columns  # all the columns of the original data
v7 = ['V2', 'V4', 'V5', 'V6', 'V11', 'V12', 'V13', 'V16', 'V17', 'V18', 'V20', 'V21', 'V22', 'V23', 'V25', 'V26', 'V28', 'Amount', 'Class']

# One DataFrame per set of features, in the order v1, ..., v7
datasets = [data[features] for features in (v1, v2, v3, v4, v5, v6, v7)]

In [5]:
# ANN with tanh as activation function and sigmoid as output activation function.
# The sizes of the layers are given later, when the parameters are initialized.
network = ANN(act_func=jnp.tanh, out_act_fun=jax.nn.sigmoid)

# Training and evaluation

In [6]:
# The metrics tables are rebuilt from scratch every time the cell is run
metrics_train_df = None
metrics_test_df = None
for i, dataset in enumerate(datasets):
    # Get dataset (based on the feature vectors defined before).
    # The array is not called `input`, so the builtin input() stays usable in the notebook
    dataset_array = dataset.to_numpy()

    # Data splitting (the third and fourth returned values, presumably the
    # validation split, are not used)
    x_train, y_train, _, _, x_test, y_test = data_split(data_input=dataset_array, train_size=0.8)
    
    # SMOTE: oversampling
    n_samples = 6000
    x_minority = x_train[y_train[:, 0] == 1] # minority class samples (label 1)
    x_train_synthetic = fit_resample(x_minority, n_samples=n_samples) # generate synthetic data for training
    y_train_synthetic = np.ones((n_samples,1)) # the synthetic samples are all labelled 1
    
    # Add synthetic data to the original one (not normalized yet)
    x_train_augmented = np.concatenate((x_train, x_train_synthetic), axis=0)
    y_train = np.concatenate((y_train, y_train_synthetic), axis=0)
    
    # Training set normalisation
    x_train_normalized, data_train_min, data_train_max = min_max(data=x_train_augmented)
    
    # Validation set normalisation
    # ...
    
    # Testing set normalisation (reusing the min and max returned for the training set)
    x_test_normalized, _, _ = min_max(x_test, data_train_min, data_train_max)
    
    # Initialize weights and biases: two hidden layers (30 and 20 neurons), one output neuron
    params = network.initialize_parameters([x_train_normalized.shape[1], 30, 20, 1])
    
    # Train ann
    updated_params, history = network.train(
        x_train = x_train_normalized, 
        y_train = y_train, 
        params = params,    
        optimizer = network.RMSprop(
            loss_function=network.regularized_loss(network.cross_entropy(), penalization=0.5),
            epochs=2000,
            batch_size=256,
            learning_rate=0.001,
            decay_rate=0.9,
            epsilon=1e-8,
        )
    )
    
    # Get training predicted labels (outputs of at least 0.5 are labelled as positive)
    train_pred_labels = network.predict(x_train_normalized, updated_params)
    train_pred_labels = train_pred_labels >= 0.5
    
    # Get validation predicted labels
    # ...
    
    # Get testing predicted labels
    test_pred_labels = network.predict(x_test_normalized, updated_params)
    test_pred_labels = test_pred_labels >= 0.5
    
    # Collect metrics (the training ones include the synthetic samples)
    metrics_train_df = network.metrics(
        true_labels=y_train, 
        pred_labels=train_pred_labels, 
        metrics_df=metrics_train_df,
        dataset_label='v' + str(i+1) + ' training'
    )
    metrics_test_df = network.metrics(
        true_labels=y_test, 
        pred_labels=test_pred_labels, 
        metrics_df=metrics_test_df,
        dataset_label='v' + str(i+1) + ' testing'
    )

In [7]:
# Metrics of every set of features on the training data (synthetic samples included)
metrics_train_df

Unnamed: 0,Set of features,TP,TN,FP,FN,accuracy,recall,precision,F1-score
0,v1 training,4996,227317,137,1395,99.34,78.17,97.33,86.7
1,v2 training,4465,227382,72,1926,99.15,69.86,98.41,81.71
2,v3 training,4935,227348,106,1456,99.33,77.22,97.9,86.34
3,v4 training,4770,227384,70,1621,99.28,74.64,98.55,84.94
4,v5 training,5369,227285,169,1022,99.49,84.01,96.95,90.02
5,v6 training,5790,227211,243,601,99.64,90.6,95.97,93.21
6,v7 training,4928,227389,65,1463,99.35,77.11,98.7,86.58


In [8]:
# Metrics of every set of features on the testing data
metrics_test_df

Unnamed: 0,Set of features,TP,TN,FP,FN,accuracy,recall,precision,F1-score
0,v1 testing,79,56824,37,22,99.9,78.22,68.1,72.81
1,v2 testing,68,56843,18,33,99.91,67.33,79.07,72.73
2,v3 testing,78,56838,23,23,99.92,77.23,77.23,77.23
3,v4 testing,76,56844,17,25,99.93,75.25,81.72,78.35
4,v5 testing,86,56818,43,15,99.9,85.15,66.67,74.79
5,v6 testing,84,56795,66,17,99.85,83.17,56.0,66.93
6,v7 testing,75,56848,13,26,99.93,74.26,85.23,79.37
