# Libraries and Data Information

## Installations

### Mount Google Drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Create Virtual Environment.

In [2]:
# You might be required to create a virtual environment to install the packages
# depending on your Colab.
#!python -m venv /content/drive/My Drive/my_env  # Store in your Drive

### Activate Environment

In [3]:
# Make sure to activate that environment
!source /content/drive/My Drive/my_env/bin/activate

/bin/bash: line 1: /content/drive/My: No such file or directory


### Install Packages.

In [4]:
# The following packages shown below will be reinstalled for compatibility
!pip install numpy
#!pip install numpy==1.24.3
!pip install scipy==1.10.0
!pip install scikit-learn
!pip install tensorflow
!pip install torch



In [5]:
#!pip install keras-spiking==0.1.1

In [6]:
# Other Installations. After this please Restart and Run All cells
# (Runtime -> Restart session and run all)
#!pip install numpy
!pip install numpy.rec # Install numpy.rec for compatibility.
#!pip install scipy==1.10.0
#!pip install keras_spiking --upgrade
!pip install keras_spiking
!pip install nengo==3.2.0

[31mERROR: Could not find a version that satisfies the requirement numpy.rec (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for numpy.rec[0m[31m


Testing if they work.

In [7]:
import numpy as np
import scipy as sp
import sklearn
import tensorflow as tf
import nengo
import nengo.spa as spa
import torch
import keras_spiking

print("NumPy version:", np.__version__)
print("SciPy version:", sp.__version__)
#print("scikit-learn version:", sklearn.__version__)
print("TensorFlow version:", tf.__version__)
print("PyTorch version:", torch.__version__)

NumPy version: 1.26.4
SciPy version: 1.10.0
TensorFlow version: 2.18.0
PyTorch version: 2.5.1+cu124


## Libraries

In [8]:
#matplotlib imports
import matplotlib.pyplot as plt
import matplotlib.pyplot as scatter
import matplotlib.cm as cm


# sklearn imports
import sklearn
print(f"Scikit-learn version: {sklearn.__version__}")
from sklearn.preprocessing import MinMaxScaler, StandardScaler, MaxAbsScaler
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.metrics import mean_absolute_percentage_error
print(f"Scikit-learn version: {sklearn.__version__}")

# keras/tensorflow imports
import tensorflow as tf
from keras.layers import Dense, Dropout, SimpleRNN, LSTM, GRU, Flatten, Conv1D
from keras.layers import MaxPooling1D
from keras.models import Sequential
import keras
from sklearn.model_selection import GridSearchCV


# warnings imports
import warnings
warnings.filterwarnings("ignore")

# torch imports
import torch
import torch.nn as nn
from torchvision import transforms
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"{device}" " is available.")

# spikingjelly imports
#from spikingjelly.clock_driven import neuron, surrogate
#from spikingjelly.clock_driven import accelerating
#import spikingjelly.clock_driven.neuron as sj


# Other imports
from urllib.request import urlretrieve
import collections
import pandas as pd
from math import sqrt
import os
import pickle
from datetime import datetime, timedelta
from google.colab import drive

Scikit-learn version: 1.6.1
Scikit-learn version: 1.6.1
cpu is available.


# Load Data

In [9]:
# Please obtain pems_time_series_traffic_Q.csv
from google.colab import files, runtime
data_to_load = files.upload()

Saving Sunspots.csv to Sunspots.csv


# Read and Check Data

## Preprocessing stage.

In [10]:
# List all keys.
filename_list = list(data_to_load.keys())
filename = filename_list[0]

In [11]:
# Turn csv to dataframe (df).
df = pd.read_csv(filename)
df.head()

Unnamed: 0.1,Unnamed: 0,Date,Monthly Mean Total Sunspot Number
0,0,1749-01-31,96.7
1,1,1749-02-28,104.3
2,2,1749-03-31,116.7
3,3,1749-04-30,92.8
4,4,1749-05-31,141.7


In [12]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3265 entries, 0 to 3264
Data columns (total 3 columns):
 #   Column                             Non-Null Count  Dtype  
---  ------                             --------------  -----  
 0   Unnamed: 0                         3265 non-null   int64  
 1   Date                               3265 non-null   object 
 2   Monthly Mean Total Sunspot Number  3265 non-null   float64
dtypes: float64(1), int64(1), object(1)
memory usage: 76.7+ KB


In [13]:
title_name = 'CalTrans PeMS Data from 03-06-2023 to 03-06-2024'

In [14]:
df['VMT (Veh-Miles)'] = [float(str(i).replace(",", "")) for i in df['VMT (Veh-Miles)']]
df['VHT (Veh-Hours)'] = [float(str(i).replace(",", "")) for i in df['VHT (Veh-Hours)']]

KeyError: 'VMT (Veh-Miles)'

## Other Pre-Processing Information

In [None]:
# Convert Datetime column to Datetime format.
df['Hour'] = pd.to_datetime(df['Hour'])

# Index the Datetime column after transformation.
df.set_index('Hour', inplace=True)
df.head()

In [None]:
df.isna().sum()   # Check missing data.

* As can be seen, there are no missing values in our data
* Now let's observe our data set on the graph

In [None]:
# Let's look at the years in the data set.
years = df.index.year.unique()
years

In [None]:
# Print % Observed per year.
df_yr_obs = df['% Observed'].resample('Y').mean()
df_yr_obs.to_frame()

In [None]:
# Print # Lane Points.
df_yr_lane_pts = df['# Lane Points'].resample('Y').mean()
df_yr_lane_pts.to_frame()

In [None]:
# Obtain Vehicle Miles Traveled (VMT) for each year.
df_yr_avg_vmt = df['VMT (Veh-Miles)'].resample('Y').mean()
df_yr_avg_vmt.to_frame()

In [None]:
# Obtain Vehicle Hours Traveled (VMT) for each year.
df_yr_avg_vht = df['VHT (Veh-Hours)'].resample('Y').mean()
df_yr_avg_vht.to_frame()

In [None]:
# Obtain Q or efficiency Measurement (VMT/VHT) for each year.
df_yr_avg_vmt = df['Q (VMT/VHT)'].resample('Y').mean()
df_yr_avg_vmt.to_frame()

## Plot Vehicle-Miles Traveled, Vehicle-Hours Traveled, Q based on Trends and Seasonality.

Note: Seasonality Plots will be daily, weekly, and monthly.

In [None]:
# Plotting Vehicle Miles and Vehicle Hours Travelled in One Plot.
def plot_veh_mi_hrs(select_df, start, end, seasonality, title_name):
  fig1, ax11 = plt.subplots(figsize=(16, 16))
  ax12 = ax11.twinx()
  ax11.plot(select_df['VMT (Veh-Miles)'][start:end], color='blue',
            label='Vehicle Miles')
  ax11.set_ylabel("Vehicle Miles", fontsize = 20)
  ax11.set_xlabel("Hour", fontsize = 20)
  ax12.plot(select_df['VHT (Veh-Hours)'][start:end], alpha=0.7, color='orange',
            label='Vehicle Hours')
  ax12.set_ylabel("Vehicle Hours", fontsize = 20)
  plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
  plt.title("Vehicle Miles and Hours for {0} ({1})".
            format(title_name, seasonality), fontsize = 20)
  plt.show()

# Plot VMT, VHT, or Q in seperate plots.
def plot_veh_mi_hrs_Q(select_df, start, end, seasonality, title_name, feature):
    plt.figure(figsize=(16, 16))
    plt.plot(select_df[start:end], label=feature)
    plt.locator_params(axis='x', nbins=10)
    plt.locator_params(axis='y', nbins=10)
    plt.rc('xtick', labelsize=20)
    plt.rc('ytick', labelsize=20)
    plt.ylabel(feature, fontsize = 20)
    plt.xlabel("Hour", fontsize = 20)
    plt.title("{0} for {1} ({2})".format(feature, title_name, seasonality),
              fontsize = 20)
    plt.show()

### Yearly Plots (Trend)

In [None]:
plot_veh_mi_hrs(df, 0, df.shape[0], "Yearly", title_name)
plot_veh_mi_hrs_Q(df['VMT (Veh-Miles)'], 0, df.shape[0], "Yearly", title_name,
                  'Vehicle Miles')
plot_veh_mi_hrs_Q(df['VHT (Veh-Hours)'], 0, df.shape[0], "Yearly", title_name,
                  'Vehicle Hours')
plot_veh_mi_hrs_Q(df['Q (VMT/VHT)'], 0, df.shape[0], "Yearly", title_name,
                  'Q (VMT/VHT)')

### Daily Plots (Seasonality)

In [None]:
plot_veh_mi_hrs(df, 0, 24, "Daily", title_name)
plot_veh_mi_hrs_Q(df['VMT (Veh-Miles)'], 0, 24, "Daily", title_name,
                  'Vehicle Miles')
plot_veh_mi_hrs_Q(df['VHT (Veh-Hours)'], 0, 24, "Daily", title_name,
                  'Vehicle Hours')
plot_veh_mi_hrs_Q(df['Q (VMT/VHT)'], 0, 24, "Daily", title_name, 'Q (VMT/VHT)')

### Weekly Plots (Seasonality)

In [None]:
weekly_df = df.query("Hour >= '2023-04-09' \
                       and Hour <= '2023-04-16'")
weekly_df

In [None]:
plot_veh_mi_hrs(weekly_df, 0, weekly_df.shape[0], "Weekly", title_name)
plot_veh_mi_hrs_Q(weekly_df['VMT (Veh-Miles)'], 0, weekly_df.shape[0], "Weekly",
                  title_name, 'Vehicle Miles')
plot_veh_mi_hrs_Q(weekly_df['VHT (Veh-Hours)'], 0, weekly_df.shape[0], "Weekly",
                  title_name, 'Vehicle Hours')
plot_veh_mi_hrs_Q(weekly_df['Q (VMT/VHT)'], 0, weekly_df.shape[0], "Weekly",
                  title_name, 'Q (VMT/VHT)')

### Monthly Plots (Seasonality)

In [None]:
monthly_df = df.query("Hour >= '2023-05-01' \
                       and Hour <= '2023-06-01'")

monthly_df

In [None]:
plot_veh_mi_hrs(monthly_df, 0, monthly_df.shape[0], "Monthly", title_name)
plot_veh_mi_hrs_Q(monthly_df['VMT (Veh-Miles)'], 0, monthly_df.shape[0],
                  "Monthly", title_name, 'Vehicle Miles')
plot_veh_mi_hrs_Q(monthly_df['VHT (Veh-Hours)'], 0, monthly_df.shape[0],
                  "Monthly", title_name, 'Vehicle Hours')
plot_veh_mi_hrs_Q(monthly_df['Q (VMT/VHT)'], 0, monthly_df.shape[0], "Monthly",
                  title_name, 'Q (VMT/VHT)')

### Combined Plots
Trend and Seaonality Plots are also plotted in combination.

In [None]:
fig, axes = plt.subplots(2, 2, figsize=(12, 12)) # 5 rows, 1 column
axes[0, 0].plot(df.index, df['Q (VMT/VHT)'])
axes[0, 0].set_title('Q(VMT/VHT) Trend', fontsize = 20)
axes[0, 0].set_ylabel('Q', fontsize = 15)
axes[0, 0].set_xlabel('Date', fontsize = 15)
axes[0, 0].tick_params(axis='y', labelsize=10)   # Adjust y-axis tick label size
axes[0, 0].tick_params(axis='x', labelsize=10)  # Adjust x-axis tick label s
axes[0, 1].plot(df.index[:24], df['Q (VMT/VHT)'][:24])
axes[0, 1].set_title('Q(VMT/VHT) Seasonality (Daily)', fontsize = 20)
axes[0, 1].set_ylabel('Q', fontsize = 15)
axes[0, 1].set_xlabel('Date', fontsize = 15)
axes[0, 1].tick_params(axis='x', labelsize=10)  # Adjust x-axis tick label size
axes[0, 1].tick_params(axis='y', labelsize=10)   # Adjust y-axis tick label size
axes[0, 1].set_xlabel('Date', fontsize = 15)
#axes[0, 1].locator_params(axis='x', nbins=2)  # Reduce x-ticks for this subplot
axes[1, 0].plot(weekly_df.index, weekly_df['Q (VMT/VHT)'])
axes[1, 0].set_title('Q(VMT/VHT) Seasonality (Weekly)', fontsize = 20)
axes[1, 0].set_ylabel('Q', fontsize = 15)
axes[1, 0].tick_params(axis='y', labelsize=10)   # Adjust y-axis tick label size
axes[1, 0].tick_params(axis='x', labelsize=10)  # Adjust x-axis tick label size
axes[1, 0].set_xlabel('Date', fontsize = 15)
axes[1, 0].locator_params(axis='x', nbins=1)  # Reduce x-ticks for this subplot
axes[1, 1].plot(monthly_df.index, monthly_df['Q (VMT/VHT)'])
axes[1, 1].set_title('Q(VMT/VHT) Seasonality (Monthly)', fontsize = 20)
axes[1, 1].set_ylabel('Q', fontsize = 15)
axes[1, 1].set_xlabel('Date', fontsize = 15)
axes[1, 1].tick_params(axis='x', labelsize=10)  # Adjust x-axis tick label size
axes[1, 1].tick_params(axis='y', labelsize=10)   # Adjust y-axis tick label size
axes[1, 1].set_xlabel('Date', fontsize = 15)
axes[1, 1].locator_params(axis='x', nbins=1)  # Reduce x-ticks for this subplot
fig.autofmt_xdate()
fig.tight_layout() # Adjusts subplot parameters for a tight layout

# Normalization Process

Normalize data based on MinMaxScaler, StandardScaler, MaxAbsScaler or RobustScaler.

In [None]:
def get_scaler(scaler):
    scalers = {
        "minmax": MinMaxScaler,
        "standard": StandardScaler,
        "maxabs": MaxAbsScaler,
        "robust": RobustScaler,
    }
    return scalers.get(scaler.lower())()

def normalize_data(df):
    scaler_type = 'minmax'
    #scaler_type = input("Enter a scaler type (minmax,standard, maxabs, robust): ")
    if (scaler_type != 'minmax') and (scaler_type != 'standard') and (scaler_type != 'maxabs') and (scaler_type != 'robust'):
      runtime.unassign()

    scaler = get_scaler(scaler_type)
    normalized_data_vmt = scaler.fit_transform(df['VMT (Veh-Miles)'].
                                               values.reshape(-1,1))
    df['vmt_norm'] = normalized_data_vmt
    normalized_data_vht = scaler.fit_transform(df['VHT (Veh-Hours)'].
                                               values.reshape(-1,1))
    df['vht_norm'] = normalized_data_vht
    normalized_data_Q = scaler.fit_transform(df['Q (VMT/VHT)'].
                                               values.reshape(-1,1))
    df['Q_norm'] = normalized_data_Q
    return df, scaler

df_norm, scaler = normalize_data(df)
df_norm_mon, scaler = normalize_data(monthly_df)
df_norm_wk, scaler = normalize_data(weekly_df)
df_norm_day, scaler = normalize_data(monthly_df)


In [None]:
df_norm # Display normalized data.

## Plot VMT, VHT, Q Normalized based on Trend and Seasonality.

Now after normalization we can observe that the data range on y-axis is 0.0 - 1.0. Same Trend and Seasonality Plots are shown for VMT, VHT, and Q.

### Yearly Plots (Trend)

In [None]:
plot_veh_mi_hrs(df_norm, 0, df_norm.shape[0], 'Yearly Normalized', title_name)
plot_veh_mi_hrs_Q(df_norm['vmt_norm'], 0, df_norm.shape[0], 'Trend Normalized',
                  title_name, 'Vehicle Miles Normalized')
plot_veh_mi_hrs_Q(df_norm['vht_norm'], 0, df_norm.shape[0], 'Trend Normalized',
                  title_name, 'Vehicle Hours Normalized')
plot_veh_mi_hrs_Q(df_norm['Q_norm'], 0, df_norm.shape[0], 'Trend Normalized',
                  title_name, 'Q Normalized')

### Daily Plots (Seasonality)

In [None]:
plot_veh_mi_hrs(df_norm, 0, 25, 'Daily Normalized', title_name)
plot_veh_mi_hrs_Q(df_norm['vmt_norm'], 0, 25, 'Seasonality Daily Normalized',
                  title_name, 'Vehicle Miles Normalized')
plot_veh_mi_hrs_Q(df_norm['vht_norm'], 0, 25, 'Seasonality Daily Normalized',
                  title_name, 'Vehicle Hours Normalized')
plot_veh_mi_hrs_Q(df_norm['Q_norm'], 0, 25, 'Seasonality Daily Normalized',
                  title_name, 'Q Normalized')

### Weekly Plots (Seasonality)

In [None]:
plot_veh_mi_hrs(df_norm_wk, 0, df_norm_wk.shape[0], 'Weekly Normalized',
                title_name)
plot_veh_mi_hrs_Q(df_norm_wk['vmt_norm'], 0, df_norm_wk.shape[0],
                  'Seasonality Weekly Normalized', title_name,
                  'Vehicle Miles Normalized')
plot_veh_mi_hrs_Q(df_norm_wk['vht_norm'], 0, df_norm_wk.shape[0],
                  'Seasonality Weekly Normalized', title_name,
                  'Vehicle Hours Normalized')
plot_veh_mi_hrs_Q(df_norm_wk['Q_norm'], 0, df_norm_wk.shape[0],
                  'Seasonality Weekly Normalized', title_name, 'Q Normalized')

### Monthly Plots (Seasonality)

In [None]:
plot_veh_mi_hrs(df_norm_mon, 0, df_norm_mon.shape[0], 'Monthly Normalized',
                title_name)
plot_veh_mi_hrs_Q(df_norm_mon['vmt_norm'], 0, df_norm_mon.shape[0],
                  'Seasonality Nonthly Normalized', title_name,
                  'Vehicle Miles Normalized')
plot_veh_mi_hrs_Q(df_norm_mon['vht_norm'], 0, df_norm_mon.shape[0],
                  'Seasonality Monthly Normalized', title_name,
                  'Vehicle Hours Normalized')
plot_veh_mi_hrs_Q(df_norm_mon['Q_norm'], 0, df_norm_mon.shape[0],
                  'Seasonality Monthly Normalized', title_name,
                  'Q Normalized')

# Let's observe train and test data on the graph

## Split into Train/Test Data.

In [None]:
test_size = 0.2 # Set Split rate to 4:1 (80 % Training 20 % Test).
split_pos = int(df.shape[0]*(1 - test_size))
df = df.reset_index()
df

In [None]:
# 2019-02-06 will be the date seperation. Dates past this will be the test set.
split_date = str(df.Hour[split_pos])
#split_date = split_date[:10]
#DOM_train = df_norm.loc[df_norm.index <= split_date].copy()
DOM_train = df.loc[df_norm.index <= split_date].copy()
#DOM_test = df_norm.loc[df_norm.index > split_date].copy()
DOM_test = df.loc[df_norm.index > split_date].copy()
split_date

## Plot Train/Test Split Graphs.

For VMT, VHT, and Q.

In [None]:
# Vehicle Miles Travelled (VMT)
plt.figure(figsize=(16,6))
plt.plot(DOM_train['Hour'], DOM_train['VMT (Veh-Miles)'],
         label='Training Set (Vehicle Miles)')
plt.plot(DOM_test['Hour'], DOM_test['VMT (Veh-Miles)'],
         label='Testing Set (Vehicle Miles)')
plt.axvline(x = df.Hour[split_pos], color='black', ls='--')
plt.legend(['Training Set (Vehicle Miles)', 'Test Set (Vehicle Miles)'])
plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
plt.xlabel("Hour", fontsize = 20)
plt.ylabel("Vehicle Miles (VMT)", fontsize = 20)
plt.title("Train Test Split for Vehicle Miles", fontsize = 20)
plt.show()

In [None]:
# Vehicle Hours Travelled (VHT)
plt.figure(figsize=(16,6))
plt.plot(DOM_train['Hour'], DOM_train['VHT (Veh-Hours)'],
         label='Training Set (Vehicle Hours)')
plt.plot(DOM_test['Hour'], DOM_test['VHT (Veh-Hours)'],
         label='Testing Set (Vehicle Hours)')
plt.axvline(x = df.Hour[split_pos], color='black', ls='--')
plt.legend(['Training Set (Vehicle Hours)', 'Test Set (Vehicle Hours)'])
plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
plt.xlabel("Hour")
plt.ylabel("Vehicle Hours (VHT)")
plt.title("Train Test Split for Vehicle Hours", fontsize = 20)
plt.show()

In [None]:
# Q is the sum of VMT divided by the sum of VHT or space mean
# speeds/harmonic mean of speeds on the freeway. It's also the efficiency
# measurement.
plt.figure(figsize=(16,6))
plt.plot(DOM_train['Hour'], DOM_train['Q (VMT/VHT)'],
         label='Training Set (Vehicle Hours)')
plt.plot(DOM_test['Hour'], DOM_test['Q (VMT/VHT)'],
         label='Testing Set (Vehicle Hours)')
plt.axvline(x = df.Hour[split_pos], color='black', ls='--')
plt.legend(['Training Set (Q)', 'Test Set (Q)'])
plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
plt.xlabel("Hour", fontsize = 20)
plt.ylabel("Q (VMT/VHT)", fontsize = 20)
plt.ylim([40, 70])
plt.title("Train Test Split for Q", fontsize = 20)
plt.show()

In [None]:
# Datetime column is indexed after transformation
df.set_index('Hour', inplace=True)
df.head()

# Prepare Data for Training the Models.

* Data is set as training and test data set in a model appropriate way.

In [None]:
def load_data(data, seq_len):
    X_train = []
    y_train = []

    for i in range(seq_len, len(data)):
        X_train.append(data.iloc[i-seq_len : i, 0])
        y_train.append(data.iloc[i, 0])

    # last days are going to be used in test
    X_test = X_train[split_pos:]
    y_test = y_train[split_pos:]

    # first  days are going to be used in training
    X_train = X_train[:split_pos]
    y_train = y_train[:split_pos]

    # convert to numpy array
    X_train = np.array(X_train)
    y_train = np.array(y_train)
    X_test = np.array(X_test)
    y_test = np.array(y_test)

    # reshape data to input into RNN, LSTM, and GRU models
    X_train = np.reshape(X_train, (split_pos, seq_len, 1))
    X_test = np.reshape(X_test, (X_test.shape[0], seq_len, 1))

    return [X_train, y_train, X_test, y_test]

* **The seq_len** parameter determines how far back the model will look at historical data, helping the model to capture time dependencies in a memory-aware way.
* We should note that if "seq_len" is too large, the model can become complex and prone to overlearning.

In [None]:
# Delete columns to create individual VHT, VMT, and Q dataframes.
df_vht = df_norm.copy()
df_vht = df_vht.drop(columns = ['VMT (Veh-Miles)', 'VHT (Veh-Hours)',
                                'Q (VMT/VHT)', '# Lane Points', '% Observed',
                                'vmt_norm', 'Q_norm'])
#df_vht = df.copy()
#df_vht = df_vht.drop(columns = ['VMT (Veh-Miles)', '# Lane Points',
#                                '% Observed', 'vmt_norm', 'vht_norm'])

df_vmt = df_norm.copy()
df_vmt = df_vmt.drop(columns = ['VHT (Veh-Hours)', 'VMT (Veh-Miles)',
                                'Q (VMT/VHT)', '# Lane Points', '% Observed',
                                'vht_norm', 'Q_norm'])
#df_vmt = df.copy()
#df_vmt = df_vmt.drop(columns = ['VHT (Veh-Hours)','# Lane Points',
#                                '% Observed', 'vmt_norm', 'vht_norm'])

df_Q = df_norm.copy()
df_Q = df_Q.drop(columns = ['VHT (Veh-Hours)', 'VMT (Veh-Miles)',
                            'Q (VMT/VHT)', '# Lane Points', '% Observed',
                            'vmt_norm', 'vht_norm'])

#df_Q = df_norm.copy()
#df_Q = df_Q.drop(columns = ['VHT (Veh-Hours)', 'VMT (Veh-Miles)',
#                            '# Lane Points', '% Observed', 'vmt_norm',
#                            'vht_norm', 'Q_norm'])

In [None]:
# Also, normalized individual dataframes for VHT, VMT, and Q can be created.
df_norm2 = df_norm.drop(columns = ['VHT (Veh-Hours)', 'VMT (Veh-Miles)',
                                   '# Lane Points', '% Observed',])
df_norm2

In [None]:
df_vht

In [None]:
df_vmt

In [None]:
df_Q

In [None]:
# Enter a number based on the instructions provided below whether to use the
# df_vht, df_vmt, or df_Q dataframes. The number selected will utilize that
# particular dataframe.
a = int(input("Enter a number (Odd Number for Vehicle Hours,\
Even Number for Vehicle Miles\Odd Number divisible by 3 for Q (Space Mean/\
Harmonic Speed)): "))

feature = ""
selected_df = ""
feature_norm = ""
feature_field = ""

if a % 2 == 0:
    feature = 'Vehicle Miles'
    selected_df = df_vmt
    feature_norm = 'vmt_norm'
    feature_field = 'VMT (Veh-Miles)'

elif a % 3 == 0:
    feature = 'Space Mean/Harmonic Speed'
    selected_df = df_Q
    feature_norm = 'Q_norm'
    feature_field = 'Q (VMT/VHT)'

else:
    feature = 'Vehicle Hours'
    selected_df = df_vht
    feature_norm = 'vht_norm'
    feature_field = 'VHT (Veh-Hours)'

print("You've chosen: {0}/{1}/{2}".format(feature, feature_norm, feature_field))

In [None]:
# Setting up seq_len. Default to 20 for ease of use.
seq_len = 20
#seq_len = 6*24*30

# Let's create train, test data
#X_train, y_train, X_test, y_test = load_data(df_vht, seq_len)
#X_train, y_train, X_test, y_test = load_data(df_vmt, seq_len)
X_train, y_train, X_test, y_test = load_data(selected_df, seq_len)
#X_train, y_train, X_test, y_test = load_data(df_norm2, seq_len)

# Print shapes of training and testing data.
print('X_train.shape = ',X_train.shape)
print('y_train.shape = ', y_train.shape)
print('X_test.shape = ', X_test.shape)
print('y_test.shape = ',y_test.shape)

In [None]:
X_train

In [None]:
X_test

In [None]:
y_train

In [None]:
y_test

## Essential Variables.
Will be used when training all of the models.

In [None]:
epochs = 50
batch_size = 1000
#loss = 'MAE'
loss = 'MSE'

## Training Loss vs. Epochs Plot

Plotting Accuracy and Validation Accuracy vs Epochs. Code taken from https://machinelearningmastery.com/display-deep-learning-model-training-history-in-keras/.

In [None]:
def plot_training_loss(model, model_name):
    model.history.keys()
    plt.plot(model.history['loss'])
    plt.title('Training Loss vs. Epochs for {0}'.format(model_name))
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.show()

# Non-Spiking Model

## Create Non-Spiking Model

In [None]:
# This involves reshaping and using a keras spiking dense layer.
non_spiking_model = tf.keras.Sequential(
    [
        tf.keras.layers.Reshape((-1, X_train.shape[1]),
                                input_shape=(None, X_train.shape[1])),
        tf.keras.layers.Dense(1, activation="relu"),
    ]
)


## Train Non-Spiking Network Model.

In [None]:
non_spiking_model.compile(optimizer="adam",loss=loss)
non_spiking_model_epochs = non_spiking_model.fit(X_train,y_train,epochs=epochs,
                                                 batch_size=batch_size)
plot_training_loss(non_spiking_model_epochs, "SNN")

## Evaluate Non-Spiking Network Model using MAE, RMSE, and MAPE.

In [None]:
print("Pure Dense Layer Predictions for Training Set")
non_snn_pred_train = non_spiking_model.predict(X_train)
non_snn_pred_train = np.ndarray.flatten(non_snn_pred_train)
print("Pure Dense Layer Predictions for Testing Set")
non_snn_pred_test = non_spiking_model.predict(X_test)
non_snn_pred_test = np.ndarray.flatten(non_snn_pred_test)
non_snn_mae = mean_absolute_error(y_test, non_snn_pred_test)
print("MAE of Pure Dense Layer model = ",non_snn_mae)
non_snn_rmse =  mean_squared_error(y_test, non_snn_pred_test) ** 0.5
print("RMSE of Pure Dense Layer model = ", non_snn_rmse)
#rnn_r2_score = r2_score(y_test,rnn_predictions_test)
#print("R2 Score of RNN model = ",rnn_r2_score)
non_snn_mape_score = 100*mean_absolute_percentage_error(y_test,
                                                        non_snn_pred_test)
print("MAPE Score of Pure Dense Layer model = {0} %".format(non_snn_mape_score))

## Reverse Transform Scaler to Convert to Real Values.

In [None]:
# First perform inverse transofrm and reshape.
y_train_inverse = scaler.inverse_transform(y_train.reshape(-1, 1))
non_snn_pred_train = non_snn_pred_train.reshape(-1, 1)
nsnn_pred_inv_train = scaler.inverse_transform(non_snn_pred_train)

y_test_inverse = scaler.inverse_transform(y_test.reshape(-1, 1))
non_snn_pred_test = non_snn_pred_test.reshape(-1, 1)
nsnn_pred_inv_test = scaler.inverse_transform(non_snn_pred_test)

# Get values after inverse transformation
y_train_inverse = y_train_inverse.flatten()
nsnn_pred_inv_train = nsnn_pred_inv_train.flatten()

y_test_inverse = y_test_inverse.flatten()
nsnn_pred_inv_test = nsnn_pred_inv_test.flatten()

## Show Actual and Predicted y values as Dataframes.

### With Training Dates.

In [None]:
first_6169_index_dates = df.index[:X_train.shape[0]]
results_nSNN_train = pd.DataFrame({"Date": first_6169_index_dates,
                                  'Actual': y_train_inverse,
                                  'Predicted': nsnn_pred_inv_train})
results_nSNN_train

### With Testing Dates.

In [None]:
last_6169_index_dates = df.index[-X_test.shape[0]:]
results_nSNN_test = pd.DataFrame({"Date":last_6169_index_dates,
                                 'Actual': y_test_inverse,
                                 'Predicted': nsnn_pred_inv_test})
results_nSNN_test

## Plotting Predictions for Transmitted/Received Bytes made by Non-Spiking Model.
Used for Comparing Training and Testing Data Visually.

### Only Testing Data.

In [None]:
plt.figure(figsize=(16,6))
plt.plot(y_test, color='blue',label='Actual {0}'.format(feature))
plt.plot(non_snn_pred_test, alpha=0.7, color='yellow',
         label='Predicted {0}'.format(feature))
plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
plt.title("{0} Predictions made by SNN model for {1}".
          format(feature, title_name), fontsize = 20)
plt.rc('xtick', labelsize=20)
plt.rc('ytick', labelsize=20)
plt.locator_params(axis='x', nbins=10)
plt.locator_params(axis='y', nbins=10)
plt.xlabel('Time', fontsize = 20)
plt.ylabel(feature_norm, fontsize = 20)
plt.legend()
plt.show()

### Training and Testing Data combined.

In [None]:
plt.figure(figsize=(16,6))
plt.plot(results_nSNN_train['Date'], results_nSNN_train['Actual'], color='red',
         label='Actual {0} (Training Case)'.format(feature))
plt.plot(results_nSNN_train['Date'],  results_nSNN_train['Predicted'], alpha=0.7,
         color='green', label='Predicted {0} (Training Case)'.format(feature))
plt.plot(results_nSNN_test['Date'], results_nSNN_test['Actual'], color='blue',
         label='Actual {0} (Test Case)'.format(feature))
plt.plot(results_nSNN_test['Date'], results_nSNN_test['Predicted'], alpha=0.7,
         color='yellow', label='Predicted {0} (Test Case)'.format(feature))
plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
plt.title("{0} Predictions made by RNN model for {1}".
          format(feature, title_name), fontsize = 20)
plt.xlabel('Time', fontsize = 10)
plt.rc('xtick', labelsize=10)
plt.rc('ytick', labelsize=10)
plt.locator_params(axis='x', nbins=10)
plt.locator_params(axis='y', nbins=10)
#plt.ylim([40, 70])
plt.ylabel(feature_norm, fontsize = 20)
plt.legend()
plt.show()

## Combine Actual and Predicted Results for Train and Test Cases for Non Spiking Neural Network.

In [None]:
results_nSNN = [results_nSNN_train, results_nSNN_test]
results_nSNN = pd.concat(results_nSNN)
display(results_nSNN)

## Plotting Residuals for Actual and Predicted Results for Non-Spiking Model.

This is to check for whether the model has adequately captured the information of the data.

In [None]:
actual_arr_nSNN = np.array(results_nSNN['Actual'])
predicted_arr_nSNN = np.array(results_nSNN['Predicted'])
residuals_nSNN = predicted_arr_nSNN - actual_arr_nSNN
plt.scatter(results_nSNN['Date'], residuals_nSNN)
plt.axhline(y=0, color = 'black')
plt.title("Differences between Acutal and Predicted Values for Non-Spiking Model.",
          fontsize = 15)
#plt.locator_params(axis='x', nbins=10)
#plt.locator_params(axis='y', nbins=10)
#plt.ylim([40, 70])
plt.xlabel("Time", fontsize = 20)
plt.ylabel("Residual", fontsize = 20)
plt.show()

# Spiking Network model

## Create SNN Model.

### Spiking Activation Layer

In [None]:
class SpikingActivationCell(tf.keras.layers.Layer):
    def __init__(self, activation='tanh', threshold=1.0, dt=1.0, **kwargs):
        super(SpikingActivationCell, self).__init__(**kwargs)
        self.activation = activation
        self.threshold = threshold
        self.dt = dt  # Add dt as an attribute
        self.state_size = 1  # State to store membrane potential

    def build(self, input_shape):
        self.membrane_potential = self.add_weight(
            name='membrane_potential',
            shape=(input_shape[-1],),  # Shape is now based on input_shape
            initializer='zeros',
            trainable=False
        )
        super(SpikingActivationCell, self).build(input_shape)

    def call(self, inputs, states):
        prev_membrane_potential = states[0]

        # Apply activation function
        activated_input = tf.keras.activations.get(self.activation)(inputs)

        # Update membrane potential
        membrane_potential = prev_membrane_potential + activated_input*self.dt

        # Generate spikes based on threshold
        spikes = tf.cast(membrane_potential >= self.threshold, dtype=tf.float32)

        # Reset membrane potential after spike
        membrane_potential = membrane_potential - spikes * self.threshold

        # Return the spikes and the updated membrane potential as the new state
        return spikes, [membrane_potential]

class SpikingActivationLayer(tf.keras.layers.RNN):
    def __init__(self, activation='tanh', threshold=1.0, dt=1.0, **kwargs):
        cell = SpikingActivationCell(activation=activation, threshold=threshold,
                                     dt=dt)
        super(SpikingActivationLayer, self).__init__(cell, **kwargs)


    def call(self, inputs, initial_state=None, **kwargs):
        # Get the input shape
        input_shape = tf.shape(inputs)
        batch_size = input_shape[0]
        num_features = input_shape[-1]

        # Initialize the state if it's not provided
        if initial_state is None:
            initial_state = tf.zeros((batch_size, num_features), dtype=tf.float32)

        # Call the RNN with the correct initial state
        return super(SpikingActivationLayer, self).call(inputs,
                                                        initial_state=initial_state,
                                                        **kwargs)

    # This method is usually called internally by Keras to determine the shape \
    # of the output tensor. It should align with the arguments received in the
    # `call()` method. Based on the error, the argument `sequences_shape` should
    # be aligned with the `inputs` argument of `call()`.
    def compute_output_shape(self, input_shape):
        return input_shape

    # Override the build method to accept and ignore the sequences_shape argument
    def build(self, input_shape):
        # Call the parent class's build method, passing only the input_shape
        super(SpikingActivationLayer, self).build(input_shape)

### Spiking Model

In [None]:
spiking_model = tf.keras.Sequential(
    [
        # Add temporal dimension to model. For this dataset, the input_shape is
        # passed into the model.
        tf.keras.layers.Reshape((-1, X_train.shape[1]),
                                input_shape=(None, X_train.shape[1])),
        # Keras' TimeDistributed wrapper allows Dense layer to operate on
        # temporal data.
        tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(128)),
        # SpikingActivation: Function replaces any activation function with it's
        # Spiking equivalent. In addition to activation functions, the timestep
        # (dt) can be changed to run the spiking neurons for longer periods of
        # time. spiking_aware_training is set to True as it helps in the
        # regularization.
        #keras_spiking.SpikingActivation("tanh", dt = 100,
        #                                spiking_aware_training=True),
        # SpikingActivation: Function replaces any activation function with it's
        # Spiking equivalent. In addition to activation functions, the timestep
        # (dt) can be changed to run the spiking neurons for longer periods of
        # time. spiking_aware_training is set to True as it helps in the
        # regularization.
        # Instead of using the keras_spiking.SpikingActivation, we directly
        # apply the activation function using tf.keras.activations.tanh.
        # This change avoids the internal use of tf.keras.layers.RNN
        # within the SpikingActivation layer, which was causing the issue.
        #tf.keras.layers.Activation(tf.keras.activations.tanh),
        SpikingActivationLayer(activation='tanh', threshold = 0.7, dt=100,
                               return_sequences=True),
        # GlobalAveragePooling1D: Averages spiking output over time. Used for
        # determining the spiking rate of neuron.
        #tf.keras.layers.Lambda(lambda x: x[0]),
        tf.keras.layers.GlobalAveragePooling1D(),
        tf.keras.layers.Dense(1),
    ]
)

spiking_model.summary()

## Train Spiking Network Model.

In [None]:
spiking_model.compile(optimizer="adam",loss=loss)
spiking_model_epochs = spiking_model.fit(X_train, y_train, epochs=epochs,
                           batch_size=batch_size)
plot_training_loss(spiking_model_epochs, "SNN")

## Evaluate Rate-Based SNN Model using MAE, RMSE, and R2.

In [None]:
print("Rate-Based SNN Predictions for Training Set")
snn_pred_train = spiking_model.predict(X_train)
print("Dense SNN Predictions for Testing Set")
snn_pred_test = spiking_model.predict(X_test)
snn_mae = mean_absolute_error(y_test, snn_pred_test)
print("MAE of Dense model = ",snn_mae)
snn_rmse =  mean_squared_error(y_test, snn_pred_test) ** 0.5
print("RMSE of Dense model = ",snn_rmse)
#rnn_r2_score = r2_score(y_test,rnn_predictions_test)
#print("R2 Score of RNN model = ",rnn_r2_score)
snn_mape_score = 100*mean_absolute_percentage_error(y_test, snn_pred_test)
print("MAPE Score of Dense model = {0} %".format(snn_mape_score))

## Reverse Transform Scaler to Convert to Real Values.

In [None]:
y_train_inverse = scaler.inverse_transform(y_train.reshape(-1, 1))
snn_pred_inv_train = scaler.inverse_transform(snn_pred_train)

y_test_inverse = scaler.inverse_transform(y_test.reshape(-1, 1))
snn_pred_inv_test = scaler.inverse_transform(snn_pred_test)

# Get values after inverse transformation
y_train_inverse = y_train_inverse.flatten()
snn_pred_inv_train = snn_pred_inv_train.flatten()

y_test_inverse = y_test_inverse.flatten()
snn_pred_inv_test = snn_pred_inv_test.flatten()

## Show Actual and Predicted y values as Dataframes.

### With Training Dates.

In [None]:
first_6169_index_dates = df.index[:X_train.shape[0]]
results_SNN_train = pd.DataFrame({"Date": first_6169_index_dates,
                                  'Actual': y_train_inverse,
                                  'Predicted': snn_pred_inv_train})
results_SNN_train

### With Testing Dates.

In [None]:
last_6169_index_dates = df.index[-X_test.shape[0]:]
results_SNN_test = pd.DataFrame({"Date":last_6169_index_dates,
                                 'Actual': y_test_inverse,
                                 'Predicted': snn_pred_inv_test})
results_SNN_test

## Plotting Predictions for PeMS Dataset Output made by Rate-Based SNN Model.

### Only Testing Data.

In [None]:
plt.figure(figsize=(16,6))
plt.plot(y_test, color='blue',label='Actual {0}'.format(feature))
plt.plot(snn_pred_test, alpha=0.7, color='yellow',
         label='Predicted {0}'.format(feature))
plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
plt.title("{0} Predictions made by Rate-Based SNN model for {1}".
          format(feature, title_name), fontsize = 20)
plt.rc('xtick', labelsize=20)
plt.rc('ytick', labelsize=20)
plt.locator_params(axis='x', nbins=10)
plt.locator_params(axis='y', nbins=10)
plt.xlabel('Time', fontsize = 20)
plt.ylabel(feature_norm, fontsize = 20)
plt.legend()
plt.show()

### Training and Testing Data combined.

In [None]:
plt.plot(results_SNN_train['Date'], results_SNN_train['Actual'], color='red',
         label='Actual {0} (Training Case)'.format(feature))
plt.plot(results_SNN_train['Date'],  results_SNN_train['Predicted'], alpha=0.7,
         color='green', label='Predicted {0} (Training Case)'.format(feature))
plt.plot(results_SNN_test['Date'], results_SNN_test['Actual'], color='blue',
         label='Actual {0} (Test Case)'.format(feature))
plt.plot(results_SNN_test['Date'], results_SNN_test['Predicted'], alpha=0.7,
         color='yellow', label='Predicted {0} (Test Case)'.format(feature))
plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
plt.title("{0} Predictions made by Rate-Based SNN model for {1}".
          format(feature, title_name), fontsize = 20)
plt.xlabel('Time', fontsize = 10)
plt.rc('xtick', labelsize=10)
plt.rc('ytick', labelsize=10)
plt.locator_params(axis='x', nbins=10)
plt.locator_params(axis='y', nbins=10)
plt.ylim([40, 70])
plt.ylabel(feature_norm, fontsize = 20)
plt.legend()
plt.show()

## Combine Actual and Predicted Results for Train and Test Cases for Rate-Based SNN.

In [None]:
results_SNN = [results_SNN_train, results_SNN_test]
results_SNN = pd.concat(results_SNN)
display(results_SNN)

## Plot Output Spikes
Code taken from https://pythontic.com/visualization/charts/spikerasterplot#:~:text=A%20Spike%20raster%20plot%20is%20a%20plot%20used,potential%20from%20a%20neuron%20at%20a%20specified%20position.

In [None]:
def check_output(seq_model, X_train, X_test, y_train, y_test, modify_dt=0.1):
    """
    This code is used to show the plot of the output spikes for the
    SpikingActivation Layer.
    """

    # Repeat the outputs for n_steps.
    n_steps = 10
    train_sequences = np.tile(X_train[:, None], (1, n_steps, 1, 1))
    test_sequences = np.tile(X_test[:, None], (1, n_steps, 1, 1))

    # Rebuild the model with the functional API to access the output of
    # intermediate layers.
    inp = x = tf.keras.Input(batch_shape=seq_model.layers[0].input_shape)
    has_global_average_pooling = False
    # Obtain list of layers to create a spike matrix.
    spikes_dict = {}
    layer_names = []
    print("length of seq_model.layers: ", len(seq_model.layers))
    for layer in seq_model.layers:
        print("layer type: ", type(layer))
        print("layer name: ", layer.name)
        if "_1" in layer.name:
          layer_new_name = str(layer.name).replace("_1", "")
          print("layer new name: ", layer_new_name)
          layer_names.append(layer_new_name)
        else:
          layer_names.append(str(layer.name))
        if isinstance(layer, tf.keras.layers.GlobalAveragePooling1D):
            # Remove the pooling so that we can see the model's output over
            # time.
            has_global_average_pooling = True
            continue

        if isinstance(layer, (keras_spiking.SpikingActivation,
                              keras_spiking.Lowpass)):

            cfg = layer.get_config()
            # Update dt, if specified
            if modify_dt is not None:
                cfg["dt"] = modify_dt
            # Return the full time series for visualization
            cfg["return_sequences"] = True

            layer = type(layer)(**cfg)

        if isinstance(layer, keras_spiking.SpikingActivation):
            # save this layer so we can access it later
            spike_layer = layer

        x = layer(x)

    # Initialize spikes_dict with layer names as keys
    for layer_name in layer_names:
        spikes_dict[layer_name] = np.array([]) # Initialize as empty array

    layer_names.append("Output")
    spikes_dict["Output"] = np.array([])

    # Take a look at Spiking Activation Layer.
    func_model = tf.keras.Model(inp, [x, spike_layer.output])

    # Copy weights to new model
    func_model.set_weights(seq_model.get_weights())

    # Run model
    output, spikes = func_model.predict(test_sequences)
    spikes = np.transpose(spikes, (0, 2, 1))
    print("spikes.shape: ", spikes.shape)

    # Iterate through layer list
    for i in range(spikes.shape[0]):
      for j in range(len(layer_names)):
        # Append spike data for each layer, handling potential empty arrays
        if spikes_dict[layer_names[j]].size == 0:
            spikes_dict[layer_names[j]] = spikes[i][j]
        else:
            spikes_dict[layer_names[j]] = np.concatenate((spikes_dict[layer_names[j]],
                                                    spikes[i][j]), axis = 0)

    print("spikes_dict: ", spikes_dict)

    if has_global_average_pooling:
        # Check test accuracy using average output over all timesteps
        predictions = np.argmax(output.mean(axis=1), axis=-1)
    else:
        # Check test accuracy using output from only the last timestep
        predictions = np.argmax(output[:, -1], axis=-1)

    print("spike_layer.dt: ", spike_layer.dt)
    # Evaluate Test Data
    seq_model_mae = mean_absolute_error(y_test, snn_pred_test)
    print("MAE of seq_model = ", seq_model_mae)
    seq_model_rmse = mean_absolute_error(y_test, snn_pred_test)**0.5
    print("RMSE of seq_model = ", seq_model_rmse)
    seq_model_mape = 100*mean_absolute_percentage_error(y_test, snn_pred_test)
    print("MAPE of seq_model = ", seq_model_mape)

    # Start Output Spike Plot.
    num_neurons = 10
    print("spikes for spiking_activation: ", spikes_dict['spiking_activation'])
    spikes_matrix = spikes_dict['spiking_activation']*20*modify_dt
    print("spikes_matrix.shape: ", spikes_matrix.shape)
    spikes_matrix = spikes_matrix.reshape(spikes.shape[0], n_steps)
    print("spikes_matrix.shape: ", spikes_matrix.shape)
    neurons = np.random.choice(spikes_matrix.shape[1], num_neurons, replace=True)
    print("neurons: ", neurons)
    print("neurons.shape: ", neurons.shape)

    fig, ax = plt.subplots(figsize=(14, 12), facecolor="#00FFFF")
    color = cm.get_cmap('tab10')(0)
    timesteps = np.arange(n_steps)
    print("timesteps shape: ", timesteps.shape)
    print("timesteps: {0}".format(timesteps))
    for i in range(num_neurons):
      print("i: ", i)
      indices = np.where(spikes_matrix[:, i] != 0)
      list_of_indices = [list(row) for row in zip(*indices)]
      print("list of indices: ", list_of_indices)
      flat_list = []
      for row in list_of_indices:
          flat_list.extend(row)
      print("flat_list: ", flat_list)
      #print("spike matrix where shape: ", len(np.where(spikes_matrix[:, i] != 0)))
      #output_spikes_list = list((np.where(spikes_matrix[:, i] != 0)))
      #print("output_spikes_list: ", output_spikes_list)
      #print(timesteps[np.where(spikes_matrix[:, i] != 0)])
      for t in flat_list:
        #if (t < n_steps):
        ax.plot([t, t], [i+0.5, i+1.5], color=color)
          #ax.eventplot([t,], [i + 0.5, i + 0.5])
        #ax.eventplot([t, t], lineoffsets=[i+0.5, i+1.5], linelengths=1, colors=color)

    ax.set_ylim(0.5, num_neurons+0.5)
    #ax.set_yticks(list(range(1, num_neurons+1, int(np.ceil(num_neurons/50)))))
    ax.set_yticks(list(range(1, num_neurons + 1)))  # Adjust y-axis ticks
    #ax.set_xticks(list(range(1, n_steps+1, 10)))
    ax.set_ylabel("Neuron Index", fontsize = 15)
    ax.set_xlabel("Time in $ms$", fontsize = 15)
    ax.set_title("Dense Spiking Activation Layer Spikes Plot", fontsize = 15)

In [None]:
#check_output(spiking_model, X_train, X_test, y_train, y_test)

In [None]:
#check_output(spiking_model, X_train, X_test, y_train, y_test, modify_dt = 100)

## Plotting Residuals for Actual and Predicted Results for Rate-Based SNN.

In [None]:
actual_arr_SNN = np.array(results_SNN['Actual'])
predicted_arr_SNN = np.array(results_SNN['Predicted'])
residuals_SNN = predicted_arr_SNN - actual_arr_SNN
plt.scatter(results_SNN['Date'], residuals_SNN)
plt.axhline(y=0, color = 'black')
plt.title("Residual Plots for SNN in PeMS {} Dataset.".format(feature_field))
plt.xlabel("Time")
plt.ylabel("Residual")
plt.show()

From the residual plots for the RNN model, it can be concluded that the RNN model is a good fit.

# Spiking Network Model with LIF.

## Create SNN with LIF Class.

### Creating LIF Neuron Class.

In [None]:
class LIFNeuron(tf.keras.layers.Layer):
    def __init__(self, threshold=0.5, leak_rate=0.001, **kwargs):
        super(LIFNeuron, self).__init__(**kwargs)
        self.threshold = threshold
        self.leak_rate = leak_rate

    def build(self, input_shape):
        self.membrane_potential = self.add_weight(
            name='membrane_potential',
            shape=(input_shape[-1],),  # Shape based on input features
            initializer='zeros',
            trainable=False
        )

    def call(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        num_features = input_shape[2]

        # Initialize batch of membrane potentials
        batch_membrane_potential = tf.tile(
            tf.expand_dims(self.membrane_potential, axis=0),
            [batch_size, 1]
        )

        # Initialize output tensor
        outputs = tf.TensorArray(dtype=tf.float32, size=seq_len)

        # Iterate over time steps using tf.while_loop
        t = tf.constant(0)
        while_condition = lambda t, outputs, batch_membrane_potential: tf.less(t, seq_len)

        def while_body(t, outputs, batch_membrane_potential):
            current_input = inputs[:, t, :]
            batch_membrane_potential = batch_membrane_potential * (1 - self.leak_rate) + current_input
            spikes = tf.cast(batch_membrane_potential >= self.threshold, dtype=tf.float32)
            batch_membrane_potential = batch_membrane_potential * (1 - spikes)
            outputs = outputs.write(t, spikes)
            t = tf.add(t, 1)
            return t, outputs, batch_membrane_potential

        _, outputs, _ = tf.while_loop(while_condition, while_body, [t, outputs, batch_membrane_potential])

        outputs = outputs.stack()  # Stack the outputs from the TensorArray
        outputs = tf.transpose(outputs, [1, 0, 2])  # Reshape to (batch_size, seq_len, num_features)

        return outputs

### Create SpikingActivation Class.

Due to the depreciation of keras_spiking.SpikingActivation, a custom SpikingActivation class is provided below prior to creating the model.

In [None]:
class SpikingActivationCell(tf.keras.layers.Layer):
    def __init__(self, activation='tanh', threshold=1.0, **kwargs):
        super(SpikingActivationCell, self).__init__(**kwargs)
        self.activation = activation
        self.threshold = threshold
        self.state_size = 1  # State to store membrane potential

    def build(self, input_shape):
        # The membrane_potential should have the shape of the input features.
        self.membrane_potential = self.add_weight(
            name='membrane_potential',
            shape=(input_shape[-1],),  # Match the input feature shape
            initializer='zeros',
            trainable=False
        )
        super(SpikingActivationCell, self).build(input_shape)

    def call(self, inputs, states):
        prev_membrane_potential = states[0]

        # Apply activation function
        activated_input = tf.keras.activations.get(self.activation)(inputs)

        # Update membrane potential
        membrane_potential = prev_membrane_potential + activated_input

        # Generate spikes based on threshold
        spikes = tf.cast(membrane_potential >= self.threshold, dtype=tf.float32)

        # Reset membrane potential after spike
        membrane_potential = membrane_potential - spikes * self.threshold

        # Return the spikes and the updated membrane potential as the new state
        return spikes, [membrane_potential]

class SpikingActivationLayer(tf.keras.layers.RNN):
    def __init__(self, activation='tanh', threshold=1.0, **kwargs):
        cell = SpikingActivationCell(activation=activation, threshold=threshold)
        super(SpikingActivationLayer, self).__init__(cell, **kwargs)

### Create Model

In [None]:
spiking_LIF_model = tf.keras.Sequential(
    [
        # Add temporal dimension to model. For this dataset, the input_shape is
        # passed into the model.
        tf.keras.layers.Reshape((-1, X_train.shape[1]),
                                input_shape=(None, X_train.shape[1])),
        # Keras' TimeDistributed wrapper allows Dense layer to operate on
        # temporal data.
        tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(128)),
        # SpikingActivation: Function replaces any activation function with it's
        # Spiking equivalent. In addition to activation functions, the timestep
        # (dt) can be changed to run the spiking neurons for longer periods of
        # time. spiking_aware_training is set to True as it helps in the
        # regularization.
        #keras_spiking.SpikingActivation("tanh", dt = 100,
        #                                spiking_aware_training=True),
        #tf.keras.layers.Activation('tanh'),  # Non-linearity before LIF
        #LIFNeuron(threshold=0.5, leak_rate=0.0001),
        #LIFNeuron(threshold=0.5, leak_rate=0.001),
        #tf.keras.layers.Dropout(0.1),  # Added dropout
        #LIFNeuron(threshold=0.7, leak_rate=0.02),
        LIFNeuron(threshold=0.5, leak_rate=0.01),
        #tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(64)),
        #tf.keras.layers.Activation('tanh'),  # Non-linearity before LIF
        #LIFNeuron(threshold=0.5, leak_rate=0.01),
        #tf.keras.layers.Dropout(0.009),  # Added dropout
        #LIFNeuron(threshold=0.5, leak_rate=0.01),
        #tf.keras.layers.Dropout(0.1),  # Added dropout
        #LIFNeuron(threshold=1.0, leak_rate=0.01),
        #tf.keras.layers.Dropout(0.1),  # Added dropout
        #RateBasedSoftmax(time_window=10),
        # GlobalAveragePooling1D: Averages spiking output over time. Used for
        # determining the spiking rate of neuron.
        #keras_spiking.SpikingActivation("tanh", dt = 100,
        #                                spiking_aware_training=True),
        #tf.keras.layers.Activation(tf.keras.activations.tanh),
        #SpikingActivation(activation=tf.keras.activations.tanh, dt = 100,
        #                  spiking_aware_training=True),
        #spiking_LIFv3_model.add(SpikingActivation(activation=tf.keras.activations.tanh, dt=1.0, spiking_aware_training=True)),
        tf.keras.layers.GlobalAveragePooling1D(),
        #tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(1),
    ]
)

spiking_LIF_model.summary()

Number of Neurons for Dense Layer

In [None]:
dense_layer = spiking_LIF_model.layers[1].layer  # Access the Dense layer within TimeDistributed
num_dense_neurons = dense_layer.units
print(f"Number of neurons in Dense layer: {num_dense_neurons}")

Number of Neurons for LIF Layer

In [None]:
#lif_layer = spiking_LIF_model.layers[2].layer  # Access the LIFNeuron layer
#num_lif_neurons = lif_layer.units
#print(f"Number of neurons in LIFNeuron layer: {num_lif_neurons}")

## Training SNN LIF Model.

In [None]:
# Compile the model
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
spiking_LIF_model.compile(optimizer='adam', loss=loss)
#spiking_LIF_model.compile(optimizer=optimizer, loss=loss)
spiking_model_LIF_epochs = spiking_LIF_model.fit(X_train,y_train,epochs=epochs,
                                                 batch_size=batch_size)
plot_training_loss(spiking_model_LIF_epochs, "SNN")

## Evaluate SNN LIF Model using MAE, RMSE, and R2.

In [None]:
print("Dense LIF SNN Predictions for Training Set")
snn_LIF_pred_train = spiking_LIF_model.predict(X_train)
print("Dense LIF SNN Predictions for Testing Set")
snn_LIF_pred_test = spiking_LIF_model.predict(X_test)
print(snn_LIF_pred_test.shape)
snn_LIF_mae = mean_absolute_error(y_test, snn_LIF_pred_test)
print("MAE of LIF SNN  model = ",snn_LIF_mae)
snn_LIF_rmse =  mean_squared_error(y_test, snn_LIF_pred_test) ** 0.5
print("RMSE of LIF SNN model = ",snn_LIF_rmse)
#rnn_r2_score = r2_score(y_test,rnn_predictions_test)
#print("R2 Score of RNN model = ",rnn_r2_score)
snn_LIF_mape_score = 100*mean_absolute_percentage_error(y_test, snn_LIF_pred_test)
print("MAPE Score of LIF SNN model = {0} %".format(snn_LIF_mape_score))

## Reverse Transform Scaler to Convert to Real Values.

In [None]:
y_train_inverse = scaler.inverse_transform(y_train.reshape(-1, 1))
snn_LIF_pred_inv_train = scaler.inverse_transform(snn_LIF_pred_train)

y_test_inverse = scaler.inverse_transform(y_test.reshape(-1, 1))
snn_LIF_pred_inv_test = scaler.inverse_transform(snn_LIF_pred_test)

# Get values after inverse transformation
y_train_inverse = y_train_inverse.flatten()
snn_LIF_pred_inv_train = snn_LIF_pred_inv_train.flatten()

y_test_inverse = y_test_inverse.flatten()
snn_LIF_pred_inv_test = snn_LIF_pred_inv_test.flatten()

## Show Actual and Predicted y values as Dataframes.

### With Training Dates.

In [None]:
first_6169_index_dates = df.index[:X_train.shape[0]]
results_SNN_LIF_train = pd.DataFrame({"Date": first_6169_index_dates,
                                  'Actual': y_train_inverse,
                                  'Predicted': snn_LIF_pred_inv_train})
results_SNN_LIF_train

### With Testing Dates.

In [None]:
last_6169_index_dates = df.index[-X_test.shape[0]:]
results_SNN_LIF_test = pd.DataFrame({"Date":last_6169_index_dates,
                                 'Actual': y_test_inverse,
                                 'Predicted': snn_LIF_pred_inv_test})
results_SNN_LIF_test

## Plotting Predictions for PeMS Dataset Output made by SNN LIF Model.

### Only Testing Data.

In [None]:
plt.figure(figsize=(16,6))
plt.plot(y_test, color='blue',label='Actual {0}'.format(feature))
plt.plot(snn_LIF_pred_test, alpha=0.7, color='yellow',
         label='Predicted {0}'.format(feature))
plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
plt.title("{0} Predictions made by SNN LIF model for {1}".
          format(feature, title_name), fontsize = 20)
plt.rc('xtick', labelsize=20)
plt.rc('ytick', labelsize=20)
plt.locator_params(axis='x', nbins=10)
plt.locator_params(axis='y', nbins=10)
plt.xlabel('Time', fontsize = 20)
plt.ylabel(feature_norm, fontsize = 20)
plt.legend()
plt.show()

### Training and Testing Data combined.

In [None]:
plt.plot(results_SNN_LIF_train['Date'], results_SNN_LIF_train['Actual'], color='red',
         label='Actual {0} (Training Case)'.format(feature))
plt.plot(results_SNN_LIF_train['Date'],  results_SNN_LIF_train['Predicted'], alpha=0.7,
         color='green', label='Predicted {0} (Training Case)'.format(feature))
plt.plot(results_SNN_LIF_test['Date'], results_SNN_LIF_test['Actual'], color='blue',
         label='Actual {0} (Test Case)'.format(feature))
plt.plot(results_SNN_LIF_test['Date'], results_SNN_LIF_test['Predicted'], alpha=0.7,
         color='yellow', label='Predicted {0} (Test Case)'.format(feature))
plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
plt.title("{0} Predictions made by SNN LIF model for {1}".
          format(feature, title_name), fontsize = 20)
plt.xlabel('Time', fontsize = 10)
plt.rc('xtick', labelsize=10)
plt.rc('ytick', labelsize=10)
plt.locator_params(axis='x', nbins=10)
plt.locator_params(axis='y', nbins=10)
plt.ylim([40, 70])
plt.ylabel(feature_norm, fontsize = 20)
plt.legend()
plt.show()

## Combine Actual and Predicted Results for Train and Test Cases for SNN LIF Model.

In [None]:
results_SNN_LIF = [results_SNN_LIF_train, results_SNN_LIF_test]
results_SNN_LIF = pd.concat(results_SNN_LIF)
display(results_SNN_LIF)

## Plot Output Spikes
Code taken from https://pythontic.com/visualization/charts/spikerasterplot#:~:text=A%20Spike%20raster%20plot%20is%20a%20plot%20used,potential%20from%20a%20neuron%20at%20a%20specified%20position.

In [None]:
#check_output(spiking_LIF_model, X_train, X_test, y_train, y_test)

## Plotting Residuals for Actual and Predicted Results for LIF SNN.

In [None]:
actual_arr_SNN_LIF = np.array(results_SNN_LIF['Actual'])
pred_arr_SNN_LIF = np.array(results_SNN_LIF['Predicted'])
residuals_SNN_LIF = pred_arr_SNN_LIF - actual_arr_SNN_LIF
plt.scatter(results_SNN_LIF['Date'], residuals_SNN_LIF)
plt.axhline(y=0, color = 'black')
plt.title("Residual Plots for LIF SNN in PeMS {} Dataset.".format(feature_field))
plt.xlabel("Time")
plt.ylabel("Residual")
plt.show()

# Spiking Network Model with LIF (Version 4)

## Create New SNN and LIF Class

### New LIF Class using Refractory Period

In [None]:
class LIFNeuronV3(tf.keras.layers.Layer):
  def __init__(self, threshold=1.0, leak_rate=0.05, refractory_period=5, **kwargs):
      super(LIFNeuronV3 , self).__init__(**kwargs)
      self.threshold = threshold
      self.leak_rate = leak_rate
      self.refractory_period = refractory_period
      self.membrane_potential = None
      self.return_sequences = True # Flag to return output sequence

  def build(self, input_shape):
    self.num_neurons = input_shape[-1]  # Store the number of neurons
    self.membrane_potential = self.add_weight(
          name='membrane_potential',
          #shape=(input_shape[0], input_shape[-1]),
          #shape=(None, self.num_neurons),
          #shape=(input_shape[1], input_shape[-1]),
          #shape=(seq_len, num_features),  # Use Python integers
          shape=(1, self.num_neurons),
          initializer='zeros',
          trainable=False

      )

    self.refractory_counter = self.add_weight(
            name='refractory_counter',
            shape = (),
            #shape=(self.num_neurons,),
            #shape=(1, self.num_neurons),
            initializer='zeros',
            trainable=False,
            dtype=tf.int32  # Ensure refractory_counter is an integer type
        )


  def call(self, inputs):
      input_shape = tf.shape(inputs)
      inputs = tf.reshape(inputs, (input_shape[0], input_shape[1], input_shape[2]))

      # Replace for loop with tf.while_loop
      def while_body(t, sample_outputs_ta, membrane_potential, refractory_counter, sample):
          current_input = tf.gather(sample, t, axis=0)  # Get input at time step t
          membrane_potential = membrane_potential * (1 - self.leak_rate) + current_input

          # Check refractory period
          spikes = tf.cond(
              refractory_counter > 0,
              lambda: tf.zeros_like(membrane_potential),
              lambda: tf.cast(membrane_potential >= self.threshold, dtype=tf.float32)
          )

          # Reset potential and update refractory counter
          reset_condition = tf.reduce_any(spikes > 0)
          membrane_potential = tf.cond(reset_condition,
                                      lambda: membrane_potential * (1 - spikes),
                                      lambda: membrane_potential)

          refractory_counter = tf.cond(reset_condition,
                                      #lambda: self.refractory_period,
                                      lambda: tf.constant(self.refractory_period, dtype=tf.int32),
                                      lambda: tf.maximum(0, refractory_counter - 1))

          # Reshape spikes before writing to TensorArray
          spikes = tf.reshape(spikes, (1, self.num_neurons))  # Shape as (1, num_neurons)
          sample_outputs_ta = sample_outputs_ta.write(t, spikes)

          return tf.add(t, 1), sample_outputs_ta, membrane_potential, refractory_counter, sample

      # Initialize loop variables
      t = tf.constant(0)
      max_time_steps = input_shape[1]

      # Execute the loop for each sample in the batch using tf.map_fn
      def process_sample(sample):
          # Initialize TensorArray inside process_sample
          sample_outputs_ta = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)

          _, sample_outputs_ta, _, _, _ = tf.while_loop(
              lambda t, *_: tf.less(t, max_time_steps),
              while_body,
              [t, sample_outputs_ta, self.membrane_potential, self.refractory_counter, sample]
          )
          sample_outputs = sample_outputs_ta.stack()
          return sample_outputs

      # Use tf.map_fn to process each sample in the batch
      outputs = tf.map_fn(process_sample, inputs, dtype=tf.float32)

      return outputs

### Create Model

In [None]:
spiking_LIFv3_model = tf.keras.Sequential(
    [
        # Add temporal dimension to model. For this dataset, the input_shape is
        # passed into the model.
        tf.keras.layers.Reshape((-1, X_train.shape[1]),
                                input_shape=(None, X_train.shape[1])),
        # Keras' TimeDistributed wrapper allows Dense layer to operate on
        # temporal data.
        tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(128)),
        #tf.keras.layers.Dense(64, activation='tanh'),
        #LIFNeuronV3(threshold=1.25, leak_rate=0.06, refractory_period=5),
        #tf.keras.layers.Activation('tanh'),  # Non-linearity before LIF
        LIFNeuronV3(threshold=0.5, leak_rate=0.01, refractory_period=275),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Reshape((-1, 128)),  # Reshape to (batch_size, time_steps, features)

        #tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(64)),
        #tf.keras.layers.Activation('tanh'),  # Non-linearity before LIF
        #LIFNeuronV3(threshold=0.5, leak_rate=0.01, refractory_period=5),


        #tf.keras.layers.Dropout(0.01),  # Added dropout
        # Use tf.keras.layers.Permute to swap the time_steps and num_neurons dimensions
        #tf.keras.layers.Permute((2, 1)),  # Now shape is (batch_size, num_neurons, time_steps)
        #tf.keras.layers.Reshape((-1, 64)),  # Reshape to (batch_size, time_steps, num_neurons)
        # GlobalAveragePooling1D: Averages spiking output over time. Used for
        # determining the spiking rate of neuron.
        #keras_spiking.SpikingActivation("tanh", dt = 100,
        #                                spiking_aware_training=True),
        #SpikingActivation(activation=tf.keras.activations.tanh, dt = 100,
        #                  spiking_aware_training=True),
        tf.keras.layers.GlobalAveragePooling1D(),
        #tf.keras.layers.GlobalAveragePooling1D(data_format='channels_last'),
        #tf.keras.layers.Dense(future),
        #tf.keras.layers.Flatten(),  # Add Flatten layer
        #tf.keras.layers.Reshape((batch_size, 1)),  # Reshape to (batch_size, 1)
        #tf.keras.layers.Lambda(lambda x: x[:, -1]),
        tf.keras.layers.Dense(1),
        #tf.keras.layers.Dense(1, input_shape=(X_train.shape[1] * 64, )),

    ]

)

spiking_LIFv3_model.summary()

## Training SNN V4 Model.

In [None]:
# Compile the model
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
spiking_LIFv3_model.compile(optimizer='adam', loss=loss)
spiking_LIFv3_model.compile(optimizer=optimizer, loss=loss)
spiking_LIFv3_model_epochs = spiking_LIFv3_model.fit(X_train,y_train,epochs=epochs,
                                                 batch_size=batch_size)
plot_training_loss(spiking_LIFv3_model_epochs, "SNN LIF with Refractory Period and Output Time Steps")

## Evaluating SNN V4 Model using MAE, RMSE

In [None]:
print("SNN v4 Predictions for Training Set")
spiking_LIFv3_model_pred_train = spiking_LIFv3_model.predict(X_train)
print("SNN v4 Predictions for Training Set")
spiking_LIFv3_model_pred_test = spiking_LIFv3_model.predict(X_test)
print(spiking_LIFv3_model_pred_test.shape)

# Reshape the model predictions if necessary
#spiking_LIFv3_model_pred_test = spiking_LIFv3_model_pred_test.reshape(y_test.shape)

spiking_LIFv3_model_mae = mean_absolute_error(y_test, spiking_LIFv3_model_pred_test)
print("MAE of SNN v4 model = ",spiking_LIFv3_model_mae)
spiking_LIFv3_model_rmse =  mean_squared_error(y_test, spiking_LIFv3_model_pred_test) ** 0.5
print("RMSE of SNN v4  model = ", spiking_LIFv3_model_rmse)
spiking_LIFv3_model_r2_score = r2_score(y_test, spiking_LIFv3_model_pred_test)
print("R2 Score of SNN v4 model = ", spiking_LIFv3_model_r2_score)
spiking_LIFv3_model_mape_score = 100*mean_absolute_percentage_error(y_test,
                                                                    spiking_LIFv3_model_pred_test)
print("MAPE Score of SNN v4 model = {0} %".format(spiking_LIFv3_model_mape_score))

# Compare Predictions

* For both models, we see that the predicted values are close to the actual values, which means that the models perform well in predicting the sequence.

## Training

In [None]:
#plt.figure(figsize=(15,8))

#plt.plot(y_train, c="black", linewidth=3, label="Original values")
#plt.plot(rnn_predictions_train, c="red", linewidth=3, label="LSTM predictions")
#plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
#plt.legend()
#plt.xlabel('Hour')
#plt.ylabel('Q')
#plt.title("Predictions(SNN) vs Actual Data (Training)",
#          fontsize=20)
#plt.show()

## Testing

In [None]:
#plt.figure(figsize=(15,8))

#plt.plot(y_test, c="black", linewidth=3, label="Original values")
#plt.plot(rnn_predictions_test, c="red", linewidth=3, label="LSTM predictions")
#plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
#plt.legend()
#plt.xlabel('Hour', fontsize = 20)
#plt.ylabel('Q', fontsize = 20)
#plt.title("Predictions(SNN) vs Actual Data (Test)",
#          fontsize=20)
#plt.show()

# Forecasting Future Data for all models

## Essential Functions.

In [None]:
# this section for unknown future.
future=seq_len
forecast_SNN = []
Xin = X_test[:,:,:]
Xin

In [None]:
def insert_end(Xin, new_input, timestep):
    for i in range(timestep-1):
        Xin[:,i,:] = Xin[:,i+1,:]
    Xin[:,timestep-1,:] = new_input
    return Xin

def forecast_data(Xin, model_type, forecast, future_num):
    time=[]
    data_pts = 24*future_num
    for j in range(data_pts):
      out = model_type.predict(Xin, batch_size=1000)
      forecast.append(out[j, 0])
      Xin = insert_end(Xin, out[j, 0], future_num)
      time_current = pd.to_datetime(df_norm.index[-1])+timedelta(hours=j)
      if (j % 100 == 0):
        print("j = {0}".format(j))
        print("forecast: {0}".format(forecast))
        print("time_current: ",  time_current)
      time.append(time_current)


    forcasted_output=np.asanyarray(forecast)
    forcasted_output=forcasted_output.reshape(-1,1)
    forcasted_output = scaler.inverse_transform(forcasted_output)
    forcasted_output = pd.DataFrame(forcasted_output)
    date = pd.DataFrame(time)
    forecast_result = pd.concat([date, forcasted_output],axis=1)
    forecast_result.columns = "Date","Forecasted"

    return forecast_result

In [None]:
# Plotting Predictions for Transmitted/Received Bytes made by Vanilla RNN Model.
def plot_forecast(forecast_set, model_name, mode, min=30, max=70):
    plt.figure(figsize=(16,6))
    plt.plot(forecast_set['Date'], forecast_set['Forecasted'], alpha=0.7,
             color='teal', label='Forecasted {0}'.format(feature))
    plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
    plt.title('{0} Forecast for {1} using {2}'.
              format(feature_field, mode, model_name), fontsize = 15)
    plt.locator_params(axis='x', nbins=10)
    plt.locator_params(axis='y', nbins=10)
    plt.ylim([min, max])
    plt.xlabel('Time', fontsize = 20)
    plt.ylabel(feature_field, fontsize = 20)
    plt.rc('xtick', labelsize=10)
    plt.rc('ytick', labelsize=10)
    plt.legend()
    plt.show()

In [None]:
# Plotting Predictions for VHT/VMT/Q.
def plot_with_forecast(train_set, test_set, forecast_set, model_name, mode,
                       min=40, max=70):
    plt.figure(figsize=(16,6))
    plt.plot(train_set['Date'], train_set['Actual'], color='red',
             label='Actual {0} (Training Case)'.format(feature))
    plt.plot(train_set['Date'], train_set['Predicted'], alpha=0.7,
             color='green',
             label='Predicted {0} (Training Case)'.format(feature))
    plt.plot(test_set['Date'], test_set['Actual'], color='blue',
            label='Actual {0} (Test Case)'.format(feature))
    plt.plot(test_set['Date'], test_set['Predicted'], alpha=0.7, color='yellow',
             label='Predicted {0} (Test Case)'.format(feature))
    plt.plot(forecast_set['Date'], forecast_set['Forecasted'], alpha=0.7,
             color='teal', label='Forecasted {0}'.format(feature))
    plt.axhspan(0, 1, facecolor='gray', alpha=0.3)
    plt.title("{0} Predictions and Forecasts made by {1} model for {2}".
              format(feature_field, model_name, title_name), fontsize = 15)
    plt.locator_params(axis='x', nbins=10)
    plt.locator_params(axis='y', nbins=10)
    plt.rc('xtick', labelsize=10)
    plt.rc('ytick', labelsize=10)
    plt.ylim([min, max])
    plt.xlabel('Time', fontsize = 20)
    plt.ylabel('Normalized {0} Scale'.format(feature_field), fontsize = 20)
    plt.legend()
    plt.show()

## Forecasting Future Data for SNN.

In [None]:
forecast_result_snn = forecast_data(Xin, spiking_model, forecast_SNN, future)

In [None]:
forecast_result_snn

In [None]:
# Plot Forecast combined with Training and Testing Data.
plot_with_forecast(results_SNN_train, results_SNN_test, forecast_result_snn,
                   'SNN', title_name, min=20, max=80)

In [None]:
# Only Forecast Data for closer look.
plot_forecast( forecast_result_snn,
              'SNN', title_name, min=20, max=80)

# Creating a pickle file for all classifier models.

## Mount model_pkl folder.

In [None]:
#os.chdir('/content/drive/My Drive/VT/Research/TSC_datasets/model_pkl')

## Send model to pkl file.

In [None]:
#filename_rnn = 'TSC_apdatasets-prediction-spiking-model_vehicle_hours.pkl'
#pickle.dump(spiking_model, open(filename_rnn, 'wb'))

# Convert to PDF.

In [None]:
#!pip install nbconvert
#!apt-get-qq install texlive texlive-xetex texlive-latex-extra pandoc
#!pip install--quiet pypandoc

In [None]:
#!jupyter nbconvert --to latex TSC_pems_models_Spiking_v2.ipynb
#!jupyter nbconvert--to PDF "/content/drive/My Drive/VT/Research/TSC_datasets/TSC_Program_Implementation/PeMS Dataset/TSC_pems_models_Spiking_v2.ipynb"

In [None]:
#files.download('your_notebook_name.tex')