<a href="https://colab.research.google.com/github/uzl/DySys-ML/blob/main/KoopmanNetwork.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
class KoopmanNetwork(nn.Module):
    """
    A simple feedforward neural network to estimate Koopman operator embeddings.
    """
    def __init__(self, input_dim, embedding_dim):
        super(KoopmanNetwork, self).__init__()
        # Define the network layers
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, embedding_dim)
        )
        self.decoder = nn.Sequential(
            nn.Linear(embedding_dim, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim)
        )
        self.koopman_layer = nn.Linear(embedding_dim, embedding_dim, bias=False)

    def forward(self, x):
        # Encode the input to an embedding
        embedding = self.encoder(x)
        # Apply the Koopman layer
        koopman_embedding = self.koopman_layer(embedding)
        # Decode the embedding back to the original space
        reconstruction = self.decoder(koopman_embedding)
        return embedding, koopman_embedding, reconstruction


In [3]:
def train_koopman_network(network, data, lag_time, epochs=100, learning_rate=1e-3):
    optimizer = optim.Adam(network.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        total_loss = 0
        for i in range(data.size(0) - lag_time):
            # Prepare the inputs and targets
            x = data[i]
            y = data[i + lag_time]

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            _, koopman_embedding, _ = network(x)

            # Compute loss using the Koopman embedding and the target
            loss = criterion(koopman_embedding, network.encoder(y))
            total_loss += loss.item()

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

        # Print the average loss every 10 epochs
        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch+1}/{epochs}, Loss: {total_loss / (data.size(0) - lag_time)}')

    print('Training complete')


In [4]:
# Example usage:

# Synthetic example data: 100 time steps, 3 features (like the Lorenz system)
trajectory_data = torch.randn(100, 3)

# Define the input dimension (number of features) and the desired embedding dimension
input_dim = trajectory_data.size(1)
embedding_dim = 64  # This can be chosen based on the problem

# Create the Koopman network instance
koopman_net = KoopmanNetwork(input_dim=input_dim, embedding_dim=embedding_dim)

# Train the network on the trajectory data
lag_time = 1  # Set the lag time for the Koopman operator estimation
train_koopman_network(koopman_net, trajectory_data, lag_time, epochs=100, learning_rate=1e-3)


Epoch 10/100, Loss: 0.0005426348098541958
Epoch 20/100, Loss: 0.0008521540117781666
Epoch 30/100, Loss: 0.0003028472215513407
Epoch 40/100, Loss: 0.0003063631932426954
Epoch 50/100, Loss: 0.00016120568761834875
Epoch 60/100, Loss: 0.00015346130675599777
Epoch 70/100, Loss: 0.00010848695729701013
Epoch 80/100, Loss: 7.259551481790065e-05
Epoch 90/100, Loss: 5.8873127631104026e-05
Epoch 100/100, Loss: 3.089531320816751e-05
Training complete
