In [1]:
# Based on https://viso.ai/deep-learning/what-are-liquid-neural-networks/

import numpy as np
import tensorflow as tf
from tensorflow import keras

In [2]:
def initialize_weights(input_dim, reservoir_dim, output_dim, spectral_radius):
    # initialize input weights randomly
    reservoir_weights = np.random.randn(reservoir_dim, reservoir_dim)
    # scale reservoir weights to achieve desired spectral radius
    reservoir_weights *= spectral_radius / np.max(np.abs(np.linalg.eigvals(reservoir_weights)))
    # initialize input-to-reservoir weights randomly
    input_weights = np.random.randn(reservoir_dim, input_dim)
    # initialize output weights to zero
    output_weights = np.zeros((reservoir_dim, output_dim))
    
    return reservoir_weights, input_weights, output_weights

In [3]:
def train_lnn(input_data, labels, reservoir_weights, input_weights, output_weights, leak_rate, num_epochs):
    num_samples = input_data.shape[0]
    reservoir_dim = reservoir_weights.shape[0]
    reservoir_states = np.zeros((num_samples, reservoir_dim))
    
    for epoch in range(num_epochs):
        for i in range(num_samples):
            # update reservoir state
            if i > 0:
                reservoir_states[i, :] = (1 - leak_rate) * reservoir_states[i - 1, :]
            reservoir_states[i, :] += leak_rate * np.tanh(
                np.dot(reservoir_weights, reservoir_states[i, :]) + 
                np.dot(input_weights, input_data[i, :]))
            
        # train output weights
        output_weights = np.dot(np.linalg.pinv(reservoir_states), labels)
        # compute training accuracy
        train_prediction = np.dot(reservoir_states, output_weights)
        train_accuracy = np.mean(np.argmax(train_prediction, axis=1) == np.argmax(labels, axis=1))
        print(f'Epoch {epoch + 1}/{num_epochs}, training accuracy: {train_accuracy:.4f}')
        
    return output_weights

In [4]:
def predict_lnn(input_data, reservoir_weights, input_weights, output_weights, leak_rate):
    num_samples = input_data.shape[0]
    reservoir_dim = reservoir_weights.shape[0]
    reservoir_states = np.zeros((num_samples, reservoir_dim))
    
    for i in range(num_samples):
        # update reservoir state
        if i > 0:
            reservoir_states[i, :] = (1 - leak_rate) * reservoir_states[i - 1, :]
        reservoir_states[i, :] += leak_rate * np.tanh(
            np.dot(reservoir_weights, reservoir_states[i, :]) + 
            np.dot(input_weights, input_data[i, :]))
        
    # compute predictions using output weights
    predictions = np.dot(reservoir_states, output_weights)
    return predictions

In [5]:
# set LNN hyperparameters
input_dim = 784
reservoir_dim = 1_000
output_dim = 10
leak_rate = 0.1
spectral_radius = 0.9
num_epochs = 10
input_scale = 1.0 / 255.0

In [6]:
# load and preprocess MNIST dataset
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
y_train = keras.utils.to_categorical(y_train)
y_test = keras.utils.to_categorical(y_test)
x_train = x_train.reshape(x_train.shape[0], input_dim) * input_scale
x_test = x_test.reshape(x_test.shape[0], input_dim) * input_scale

In [None]:
# initialize LNN weights
reservoir_weights, input_weights, output_weights = initialize_weights(
    input_dim, reservoir_dim, output_dim, spectral_radius)

In [None]:
# train LNN
output_weights = train_lnn(
    x_train, y_train, reservoir_weights, input_weights, output_weights, leak_rate, num_epochs)

In [None]:
# evaluate the LNN on test data
predictions = predict_lnn(
    x_test, reservoir_weights, input_weights, output_weights, leak_rate)
test_accuracy = np.mean(np.argmax(predictions, axis=1) == np.argmax(y_test, axis=1))
print(f'Test accuracy: {test_accuracy:.4f}')