In [None]:
import numpy as np
import math
import os
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib import ticker as ticker
import time
from typing import List, Tuple, Callable

In [None]:
# Import dependencies
from sklearn.linear_model import LinearRegression, RidgeCV, LassoCV
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.preprocessing import PolynomialFeatures, StandardScaler
from sklearn.model_selection import train_test_split, KFold, cross_val_score, cross_validate
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score
from sklearn.utils import shuffle
import joblib
from sklearn.neural_network import MLPRegressor

In [None]:
def print_scores(scores):
    """Print best, worst, and average cross-validation metrics.

    Parameters
    ----------
    scores : mapping
        Must provide the keys 'test_neg_mean_squared_error' and 'test_r2',
        each holding one value per fold. The MSE entry is passed straight to
        abs(), so it has to support that (e.g. a numpy array).

    Returns
    -------
    None
        The summary is written to stdout only.
    """
    # Negated MSE scores are flipped back to positive errors once, up front.
    fold_mse = abs(scores['test_neg_mean_squared_error'])
    fold_r2 = scores['test_r2']

    lowest_mse = min(fold_mse)
    highest_mse = max(fold_mse)
    mean_mse = np.average(fold_mse)

    # Each RMSE is the square root of the matching MSE figure; in particular
    # the "average" RMSE is sqrt(mean MSE), not the mean of per-fold RMSEs.
    report = [
        ("Best MSE CV: ", lowest_mse),
        ("Best RMSE CV: ", np.sqrt(lowest_mse)),
        ("Best R2 CV: ", max(fold_r2)),
        ("Worst MSE CV: ", highest_mse),
        ("Worst RMSE CV: ", np.sqrt(highest_mse)),
        ("Worst R2 CV: ", min(fold_r2)),
        ('Average MSE CV: ', mean_mse),
        ('Average RMSE CV: ', np.sqrt(mean_mse)),
        ("Average R2 CV: ", np.average(fold_r2)),
    ]

    # Emit the cross validation summary between its banner lines.
    print("~~~~ Cross Validation Results ~~~~")
    for label, value in report:
        print(label, value)
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")


def ANN_Train(X, y, test_size, train_size, k, epochs, hidden_nodes, hidden_layers, batch_size, learning_rate):
    """Split the data, cross-validate a scaler + MLP pipeline, then fit it.

    Parameters
    ----------
    X, y
        Inputs and targets, forwarded to train_test_split.
    test_size, train_size
        Forwarded to train_test_split (the split uses random_state=42).
    k
        Passed as ``cv`` to cross_validate.
    epochs
        Passed as ``max_iter`` to MLPRegressor.
    hidden_nodes, hidden_layers
        The network gets ``hidden_layers`` hidden layers of ``hidden_nodes``
        units each.
    batch_size, learning_rate
        Passed as ``batch_size`` and ``learning_rate_init`` to MLPRegressor.

    Returns
    -------
    model
        The Pipeline (StandardScaler -> MLPRegressor) fitted on the training split.
    X_test, y_test
        The held-out split, untouched by cross validation and fitting.
    CrossValData : list
        [best MSE, best RMSE, best R2, worst MSE, worst RMSE, worst R2,
        average MSE, average RMSE, average R2] over the CV test folds.
    """
    hidden_layer_sizes = tuple(np.full(hidden_layers, hidden_nodes))
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, train_size=train_size, random_state=42
    )

    # NOTE(review): MLPRegressor is given no random_state, so weight
    # initialisation (and therefore the scores) will vary between runs.
    model = Pipeline([
        ('scaler', StandardScaler(with_mean=True)),
        ('ANN', MLPRegressor(hidden_layer_sizes=hidden_layer_sizes, activation='relu', solver='adam',
                             batch_size=batch_size, max_iter=epochs, learning_rate_init=learning_rate)),
    ])

    print("Cross validating...")
    scores = cross_validate(model, X_train, y_train, cv=k,
                            scoring=('r2', 'neg_mean_squared_error'), return_train_score=True)

    print_scores(scores)

    print("Fitting model...")

    # Fit model on the whole training split (cross_validate only fits clones).
    model.fit(X_train, y_train)

    # Output cross validation data. The summary is computed here because the
    # values printed by print_scores are local to that function.
    CrossValData = _summarize_cv_scores(scores)

    return model, X_test, y_test, CrossValData


def _summarize_cv_scores(scores):
    """Reduce per-fold CV scores to a best/worst/average list of MSE, RMSE and R2."""
    fold_mse = abs(scores['test_neg_mean_squared_error'])
    fold_r2 = scores['test_r2']

    best_mse = min(fold_mse)
    worst_mse = max(fold_mse)
    ave_mse = np.average(fold_mse)

    # RMSE values are sqrt of the matching MSE, mirroring what print_scores reports.
    return [
        best_mse, np.sqrt(best_mse), max(fold_r2),
        worst_mse, np.sqrt(worst_mse), min(fold_r2),
        ave_mse, np.sqrt(ave_mse), np.average(fold_r2),
    ]

def ANN_Test(model, X_test, y_test, DataEfficiencyToggle):
    """Evaluate a fitted pipeline on held-out data, print and plot the results.

    Parameters
    ----------
    model
        Fitted pipeline whose step at index 1 is the MLPRegressor (its
        n_layers_, n_features_in_, n_outputs_, n_iter_ and loss_curve_
        attributes are read).
    X_test, y_test
        Held-out inputs and targets.
    DataEfficiencyToggle
        Accepted for call compatibility; not used inside this function.

    Returns
    -------
    tuple
        (R2, RMSE, MSE) of the predictions on the test data, in that order.

    Notes
    -----
    Creates a two-panel matplotlib figure (loss history and parity plot) but
    does not call plt.show(); the caller is expected to display it.
    """
    print("Testing model...")

    # Make predictions
    y_pred = model.predict(X_test)

    # Calculate R2, RMSE, and MSE metrics for test data.
    # mean_squared_error returns the MSE by default; the old `squared`
    # keyword is not passed because newer scikit-learn releases reject it.
    Test_R2 = r2_score(y_test, y_pred)
    Test_MSE = mean_squared_error(y_test, y_pred)
    Test_RMSE = np.sqrt(Test_MSE)

    ann = model[1]  # the 'ANN' step of the pipeline
    loss_history = ann.loss_curve_

    # Print model info after CV
    print("~~~~~~~~~ Final Model Structure Info ~~~~~~~~~")
    print("Number of Layers: ", ann.n_layers_)
    print("Number of input features: ", ann.n_features_in_)
    print("Number of outputs: ", ann.n_outputs_)
    print("Number of iterations ran: ", ann.n_iter_)
    print("~~~~~~~~~ Final Model Error Info ~~~~~~~~~")
    print("Test MSE: ", Test_MSE)
    print("Test RMSE: ", Test_RMSE)
    print("Test R2: ", Test_R2)
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")

    # Plot ANN results.
    fig, ax = plt.subplots(1, 2)
    fig.suptitle("ANN Results")

    # Training loss per epoch, with epochs numbered from 1.
    ax[0].plot(np.arange(1, len(loss_history) + 1), loss_history)
    ax[0].set_xlabel("Epoch")
    ax[0].set_ylabel("Loss (Mean Squared Error)")
    ax[0].set_title('Epoch History, MSE: {:.2f}, RMSE: {:.2f}, R2: {:.2f}'.format(Test_MSE, Test_RMSE, Test_R2), fontsize=7)

    # Parity Plot
    ax[1].plot(y_test, y_pred, 'r*')
    ax[1].set_xlabel("y_test")
    ax[1].set_ylabel("y_pred")
    ax[1].set_title('Parity Plot, MSE: {:.2f}, RMSE: {:.2f}, R2: {:.2f}'.format(Test_MSE, Test_RMSE, Test_R2), fontsize=7)
    plt.tight_layout()

    return Test_R2, Test_RMSE, Test_MSE


In [None]:



# Import dataset
# TODO: load the dataset here; this template does not include the loading code.


# Define ANN model hidden layer structure and training parameters
hidden_layers = 3
hidden_nodes = 8
test_size = 0.2
train_size = 0.8
k = 5
batch_size = 100
epochs = 500
learning_rate = 0.01

# Preprocess and parse data.
# For non-data efficiency modeling, set desired statistical features and set DataEfficiencyToggle = 0
# For data efficiency modeling, set number of packets, layer numbers, and set DataEfficiencyToggle = 1

DataEfficiencyToggle = 1

if DataEfficiencyToggle == 0:
    StatisticalFeatures = ['StDev', 'Skew', 'Kurtosis']

    # Parse dataset

if DataEfficiencyToggle == 1:
    # Placeholder values so the cell parses; replace with the desired settings.
    PacketNumber = 5  # any integer value between 1 and 10
    LayerNumbers = 10  # any integer value between 1 and 20

    # Packetize data

    # Iterate through data stream types (ie current, CTWD, etc.)

    # Define input and output variables (X and y)
    # TODO: assign X and y here; the ANN_Train call below raises NameError
    # until both are defined.

    # Train ANN
    model, X_test, y_test, CrossValData = ANN_Train(X, y, test_size, train_size, k, epochs, hidden_nodes, hidden_layers, batch_size, learning_rate)

    # Test ANN
    SparseDataTest_R2, SparseDataTest_RMSE, SparseDataTest_MSE = ANN_Test(model, X_test, y_test, DataEfficiencyToggle)

# Show plots
plt.show()
