In [None]:
%pip install numpy
%pip install pandas
%pip install matplotlib
%pip install itertools
%pip install sklearn

In [None]:
import pandas as pd
import numpy as np
from itertools import chain
from sklearn.neural_network import MLPRegressor
import matplotlib.pyplot as plt

In [None]:
# import filterwarnings
from warnings import filterwarnings

filterwarnings('ignore')

# Train Test Split Implementation

In [None]:
# take an np array x and return values at indices
def get_val_at_indices(x, indices):
    return x[indices]

def train_test_split(*arrays, test_size=0.25, shufffle=True, random_state=1):
    # get length of first array
    length = len(arrays[0])

    # split lengths
    len_test = int(np.ceil(length*test_size))
    len_train = length - len_test

    if shufffle:
        perm = np.random.RandomState(random_state).permutation(length)
        test_indices = perm[:len_test]
        train_indices = perm[len_test:]
    else:
        train_indices = np.arange(len_train)
        test_indices = np.arange(len_train, length)

    return list(chain.from_iterable((get_val_at_indices(x, train_indices), get_val_at_indices(x, test_indices)) for x in arrays))

# Custom Class Implementation

In [None]:
class CNN:
    def __init__(self, hidden_layer_sizes=(100,), learning_rate=0.1, epochs=200, batch_size=32, random_state=1, tol=1e-4, hidden_layer_activation='logistic', output_layer_activation='linear', optim_algo='SGD', loss_function='MSE', early_stopping=False):
        # learning rate should be casted to np.float64
        self.learning_rate = np.float64(learning_rate)
        self.epochs = epochs
        self.batch_size = batch_size
        self.random_state = random_state
        self.tol = tol
        self.hidden_layer_sizes = hidden_layer_sizes
        self.is_first_pass = True
        if hidden_layer_activation == 'logistic':
            self.hidden_layer_activation = self.sigmoid
            self.hidden_layer_activation_derivative = self.sigmoid_derivative
        elif hidden_layer_activation == 'ReLU':
            self.hidden_layer_activation = self.ReLU
            self.hidden_layer_activation_derivative = self.ReLU_derivative
        elif hidden_layer_activation == 'linear':
            self.hidden_layer_activation = self.linear
            self.hidden_layer_activation_derivative = self.linear_derivative
        else:
            # default to sigmoid
            self.hidden_layer_activation = self.sigmoid
            self.hidden_layer_activation_derivative = self.sigmoid_derivative

        if output_layer_activation == 'logistic':
            self.output_layer_activation = self.sigmoid
            self.output_layer_activation_derivative = self.sigmoid_derivative
        elif output_layer_activation == 'ReLU':
            self.output_layer_activation = self.ReLU
            self.output_layer_activation_derivative = self.ReLU_derivative
        elif output_layer_activation == 'linear':
            self.output_layer_activation = self.linear
            self.output_layer_activation_derivative = self.linear_derivative
        else:
            # default to sigmoid
            self.output_layer_activation = self.sigmoid
            self.output_layer_activation_derivative = self.sigmoid_derivative

        if optim_algo == 'SGD':
            self.optim_algo = self.msgd
    
        if loss_function == 'MSE':
            self.loss_function = self.MSE

        self.early_stopping = early_stopping

    # train test split to store the train and test data
    def train_test_split(self, *arrays, test_size=0.25, shufffle=True, random_state=1):
        # get length of first array
        length = len(arrays[0])

        # split lengths
        len_test = int(np.ceil(length*test_size))
        len_train = length - len_test

        if shufffle:
            perm = np.random.RandomState(random_state).permutation(length)
            test_indices = perm[:len_test]
            train_indices = perm[len_test:]
        else:
            train_indices = np.arange(len_train)
            test_indices = np.arange(len_train, length)

        self.X_train, self.X_test, self.y_train, self.y_test = list(chain.from_iterable((get_val_at_indices(x, train_indices), get_val_at_indices(x, test_indices)) for x in arrays))

    def sigmoid(self, X):
        return 1 / (1 + np.exp(-X))
    
    def sigmoid_derivative(self, X):
        return X * (1-X)
    
    def ReLU(self, X):
        return np.maximum(0, X)
    
    def ReLU_derivative(self, X):
        return 1 * (X > 0)
    
    def linear(self, X):
        return X
    
    def linear_derivative(self, X):
        return 1

    def load_data(self, X, y):
        mini_batches = []

        num_batches = int(np.ceil(X.shape[0] / self.batch_size))

        for i in range(num_batches):
            start = i * self.batch_size
            end = start + self.batch_size

            if end > X.shape[0]:
                end = X.shape[0]

            mini_batches.append((X[start:end], y[start:end]))

        return mini_batches

    # MSE loss
    def MSE(self, y, y_pred):
        return np.mean((y - y_pred)**2)

    def msgd(self, X, y):
        activations = [X] + [None]*(self.num_layers-1)
        deltas = [None] * (self.num_layers-1)

        mini_batches = self.load_data(X, y)

        train_losses = []
        test_losses = []

        for i in range(self.epochs):
            epoch_loss_train = 0
            for mini_batch in mini_batches:
                X_mini, y_mini = mini_batch

                activations[0] = X_mini

                self.forward(activations)

                epoch_loss += self.backward(activations, deltas, y_mini, len(X_mini)) * len(X_mini)

            if i%10 == 9:
                train_losses.append(epoch_loss / X.shape[0])
                y_pred = self.predict(self.X_test)
                test_losses.append(self.loss_function(self.y_test, y_pred) / self.X_test.shape[0])

            # If early stopping is enabled, stop if loss is less than tolerance
            if self.early_stopping:
                if epoch_loss < self.tol:
                    break

        self.train_losses = train_losses
        self.test_losses = test_losses

    def weight_initialization(self, layer_sizes):
        np.random.seed(self.random_state)

        self.weights = []
        self.biases = []

        for i in range(self.num_layers - 1):
            weight, bias = self.add_layer(layer_sizes[i], layer_sizes[i+1])

        self.weights.append(weight)
        self.biases.append(bias)
    
    def add_layer(self, input_size, output_size):
        weight = np.random.RandomState.uniform(-1, 1, (input_size, output_size))
        bias = np.random.RandomState.uniform(-1, 1, (1, output_size))

        weight = np.array(weight, dtype=np.float64)
        bias = np.array(bias, dtype=np.float64)

        return weight, bias

    def first_pass(self, y, layer_sizes):
        self.num_layers = len(layer_sizes)

        self.weight_initialization(layer_sizes)

    def forward(self, activations):
        activations[0] = self.hidden_layer_activation(activations[0])

        for i in range(self.num_layers-1):
            activations[i+1] = np.dot(activations[i], self.weights[i]) + self.biases[i]

            # if not output layer, apply activation function
            if i != self.num_layers-2:
                activations[i+1] = self.hidden_layer_activation(activations[i+1])
            else:
                activations[i+1] = self.output_layer_activation(activations[i+1])

        return activations

    def backward(self, activations, deltas, y, current_batch_size):
        loss = self.loss_function(y, activations[-1])

        deltas[self.num_layers-2] = (activations[-1] - y) * self.output_layer_activation_derivative(self.activations[-1])

        weights_gradient = np.dot(activations[self.num_layers-2].T, deltas[self.num_layers-2]) / current_batch_size
        biases_gradient = np.sum(deltas[self.num_layers-2], axis=0) / current_batch_size

        self.weights[self.num_layers-2] += self.learning_rate * weights_gradient
        self.biases[self.num_layers-2] += self.learning_rate * biases_gradient

        for i in range(self.num_layers-2, 0, -1):
            deltas[i-1] = np.dot(deltas[i], self.weights[i].T) * self.hidden_layer_activation_derivative(activations[i])

            weights_gradient = np.dot(activations[i-1].T, deltas[i-1]) / current_batch_size
            biases_gradient = np.sum(deltas[i-1], axis=0) / current_batch_size

            self.weights[i-1] += self.learning_rate * weights_gradient
            self.biases[i-1] += self.learning_rate * biases_gradient

        return loss

    def train(self, plot_test_losses=False):
        self.plot_test_losses = plot_test_losses

        X = self.X_train
        y = self.y_train
        # if y is 1D, convert to 2D
        if len(y.shape) == 1:
            y = y.reshape(-1, 1)

        if len(y_test.shape) == 1:
            y_test = y_test.reshape(-1, 1)

        # Initialize weights and biases if it's the first pass
        if self.is_first_pass:
            self.first_pass(y, [X.shape[1]] + list(self.hidden_layer_sizes) + [y.shape[1]])
            self.is_first_pass = False

        # Train using the optimization algorithm set while initializing the model
        self.optim_algo(X, y)

        if plot_test_losses:
            plt.plot(self.train_losses, label='Train Loss')
            plt.plot(self.test_losses, label='Test Loss')
            plt.xlabel('Epochs')
            plt.ylabel('Loss')
            plt.legend()
            plt.show()

    def predict(self, X):
        activation = X

        activation = self.hidden_layer_activation(activation) 
        
        for i in range(self.num_layers-1):
            activation = np.dot(activation, self.weights[i]) + self.biases[i]

            if i != self.num_layers-2:
                activation = self.hidden_layer_activation(activation)
            else:
                activation = self.output_layer_activation(activation)

        return activation
    
    def score(self, X, y):
        # Get the prediction of the model on the test data
        y_pred = self.predict(X)

        # The score is the loss function value between the true and predicted values
        return self.loss_function(y, y_pred) / X.shape[0]
    

# Main

## Reading data

In [None]:
data = pd.read_csv('./taxi.csv')

# print column names
print(data.columns)

## Pre Processing

In [None]:
# from the df remove first 2 columns
data = data.iloc[:, 2:]

# convert to numpy array
data = data.values

print(data.shape)

print(data[0])

# for each data point, the second column is a date time object of the format 'YYYY-MM-DD HH:MM:SS UTC', convert this to a unix timestamp 
data[:, 1] = pd.to_datetime(data[:, 1]).astype('int64') / 10**9

print(data[0])

In [None]:
# fares is the first column values
y = data[:, 0]

# features are the rest of the columns
X = data[:, 1:]

print(f"X: {X.shape}, y: {y.shape}")

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Reshape y_train and y_test to be 2D arrays so that np computations can be done without issues
y_train = y_train.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)

print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")

In [73]:
# smaller example of size 64
inputs = [[x for x in range(64)]]
outputs = [[x for x in range(64)]]

input_train, input_test, output_train, output_test = train_test_split(inputs, outputs, test_size=0.25, random_state=42)

print(f"input_train: {input_train.shape}, output_train: {output_train.shape}")

# Create a CNN model with one hidden layer of 3 neurons
model = CNN(hidden_layer_sizes=(3,), learning_rate=0.1, epochs=200, batch_size=32, random_state=1, tol=1e-4, hidden_layer_activation='logistic', output_layer_activation='linear', optim_algo='SGD', loss_function='MSE', early_stopping=False)

# Train the model on the smaller example
model.train(input_train, output_train)

# Get the score of the model on the test data
score = model.score(input_test, output_test)

print(f"Score: {score}")

input_train: (3, 2), output_train: (3, 1)


## Case 1

* No of hidden layers: 1
* No. of neurons in hidden layer: 32
* Activation function in the hidden layer: Sigmoid
* 1 neuron in the output layer.
* Activation function in the output layer: Linear
* Optimisation algorithm: Mini Batch Stochastic Gradient Descent (SGD)
* Loss function: Mean Squared Error (MSE)
* Learning rate: 0.01
* No. of epochs = 200

### Custom Class Implementation

In [None]:
CustomNN = CNN(
        hidden_layer_sizes=(32,), 
        hidden_layer_activation='logistic', 
        output_layer_activation='linear',
        optim_algo='SGD',
        loss_function='MSE',
        learning_rate=0.01,
        epochs=200,
        random_state=42,
    )

Training the model using X_train and y_train

In [None]:
CustomNN.train(X_train, y_train)

print(f"train score for the custom model: {CustomNN.score(X_train, y_train)}")

Testing the model using X_test

In [None]:
test_score = CustomNN.score(X_test, y_test)

print(f"test score for the custom model: {test_score}")

### Sklearn Implementation

In [None]:
MLP = MLPRegressor(
    hidden_layer_sizes=(32,),
    activation='logistic',
    solver='sgd',
    learning_rate_init=0.01,
    max_iter=200,
    random_state=42
)

Training the model using X_train and y_train

In [None]:
MLP.fit(X_train, y_train)

print(f"train score for the MLP model: {MLP.score(X_train, y_train)}")

Testing the model using X_test

In [None]:
MLP_test_score = MLP.score(X_test, y_test)

print(f"test score for the MLP model: {MLP_test_score}")

## Case 2
* No of hidden layers: 2
* No. of neurons in the 1st hidden layer: 64
* No. of neurons in the 2nd hidden layer: 32
* Activation function in both the hidden layers: ReLU
* 1 neuron in the output layer.
* Activation function in the output layer: Linear
* Optimisation algorithm: Mini Batch Stochastic Gradient Descent (SGD)
* Loss function: Mean Squared Error (MSE)
* Learning rate: 0.01
* No. of epochs = 200

### Custom Class Implementation

In [None]:
CustomNN = CNN(
                hidden_layer_sizes=(64, 32,),
                hidden_layer_activation='ReLU',
                output_layer_activation='linear',
                optim_algo='SGD',
                loss_function='MSE',
                learning_rate=0.01,
                epochs=200,
                random_state=42,
        )

Training the model using X_train and y_train

In [None]:
CustomNN.train(X_train, y_train)

print(f"train score for the custom model: {CustomNN.score(X_train, y_train)}")

Testing the model using X_test

In [None]:
CustomNN_test_score = CustomNN.score(X_test, y_test)

print(f"test score for the custom model: {CustomNN_test_score}")