In [10]:
import torch
import torch.nn as nn
import numpy as np
from typing import Tuple, List
from dataclasses import dataclass


In [11]:
@dataclass
class ReservoirConfig:
    """Configuration for the pendulum reservoir"""
    sampling_rate: int = 10  # κ in the paper
    cycles_per_sample: int = 20  # N in the paper
    memory_length: int = 100  # m in the paper, for temporal tasks
    damping: float = 0.05  # k in the paper
    length: float = 1.0  # l in the paper
    dt: float = 0.01  # integration time step
    force_min: float = 1.0  # minimum force amplitude
    force_max: float = 2.0  # maximum force amplitude
    base_frequency: float = 1.0  # ω in the paper

In [12]:

class PendulumReservoir:
    def __init__(self, config: ReservoirConfig):
        self.config = config
        self.g = 9.81  # gravitational acceleration
        
    def _integrate_pendulum(self, 
                           x0: torch.Tensor, 
                           v0: torch.Tensor, 
                           force_amp: torch.Tensor, 
                           force_freq: torch.Tensor,
                           n_steps: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Integrate pendulum equations using RK4 method"""
        dt = self.config.dt
        x, v = x0, v0
        
        for _ in range(n_steps):
            # RK4 integration
            k1x = v
            k1v = (-self.g/self.config.length * torch.sin(x) 
                   - self.config.damping * v 
                   + force_amp * torch.sign(torch.sin(force_freq * _ * dt)))
            
            k2x = v + 0.5 * dt * k1v
            k2v = (-self.g/self.config.length * torch.sin(x + 0.5 * dt * k1x)
                   - self.config.damping * (v + 0.5 * dt * k1v)
                   + force_amp * torch.sign(torch.sin(force_freq * (_ + 0.5) * dt)))
            
            k3x = v + 0.5 * dt * k2v
            k3v = (-self.g/self.config.length * torch.sin(x + 0.5 * dt * k2x)
                   - self.config.damping * (v + 0.5 * dt * k2v)
                   + force_amp * torch.sign(torch.sin(force_freq * (_ + 0.5) * dt)))
            
            k4x = v + dt * k3v
            k4v = (-self.g/self.config.length * torch.sin(x + dt * k3x)
                   - self.config.damping * (v + dt * k3v)
                   + force_amp * torch.sign(torch.sin(force_freq * (_ + 1) * dt)))
            
            x = x + (dt/6) * (k1x + 2*k2x + 2*k3x + k4x)
            v = v + (dt/6) * (k1v + 2*k2v + 2*k3v + k4v)
            
        return x, v

    def get_reservoir_state(self, 
                           input_value: torch.Tensor, 
                           encoding: str = 'amplitude') -> torch.Tensor:
        """Get reservoir state for a single input value"""
        # Initialize pendulum state
        x0 = torch.zeros_like(input_value)
        v0 = torch.zeros_like(input_value)
        
        # Set force parameters based on encoding
        if encoding == 'amplitude':
            force_amp = self.config.force_min + (self.config.force_max - self.config.force_min) * input_value
            force_freq = torch.full_like(input_value, self.config.base_frequency)
        else:  # frequency encoding
            force_amp = torch.full_like(input_value, self.config.force_max)
            force_freq = self.config.force_min + (self.config.force_max - self.config.force_min) * input_value
            
        # Calculate number of steps needed
        period = 2 * np.pi / self.config.base_frequency
        steps_per_cycle = int(period / self.config.dt)
        total_steps = steps_per_cycle * self.config.cycles_per_sample
        
        # Get sampled states
        states = []
        for i in range(self.config.cycles_per_sample):
            x, v = self._integrate_pendulum(x0, v0, force_amp, force_freq, steps_per_cycle)
            for _ in range(self.config.sampling_rate):
                states.append(x)
            x0, v0 = x, v
            
        return torch.stack(states)


In [13]:
class ReservoirComputer:
    def __init__(self, config: ReservoirConfig, temporal: bool = False):
        self.config = config
        self.reservoir = PendulumReservoir(config)
        self.temporal = temporal
        self.W = None  # Output weights
        
    def _get_state_matrix(self, 
                         inputs: torch.Tensor, 
                         encoding: str = 'amplitude') -> torch.Tensor:
        """Convert input sequence to state matrix"""
        states = []
        for input_value in inputs:
            state = self.reservoir.get_reservoir_state(input_value, encoding)
            if self.temporal:
                states.append(state)
                if len(states) > self.config.memory_length:
                    states.pop(0)
            else:
                states = [state]
                
        if self.temporal:
            # Weight states by recency
            weights = torch.linspace(0, 1, len(states))
            state_matrix = torch.stack([w * s for w, s in zip(weights, states)])
        else:
            state_matrix = states[0]
            
        return state_matrix.flatten(1)
    
    def fit(self, 
            inputs: torch.Tensor, 
            targets: torch.Tensor, 
            encoding: str = 'amplitude') -> None:
        """Train the reservoir computer"""
        state_matrix = self._get_state_matrix(inputs, encoding)
        # Use pseudo-inverse for regression
        self.W = torch.linalg.pinv(state_matrix) @ targets
        
    def predict(self, 
                inputs: torch.Tensor, 
                encoding: str = 'amplitude') -> torch.Tensor:
        """Make predictions using trained weights"""
        if self.W is None:
            raise ValueError("Model must be trained first")
        state_matrix = self._get_state_matrix(inputs, encoding)
        return state_matrix @ self.W


In [14]:

# Example usage for Task I: Learning a polynomial
def task_1():
    # Configure reservoir
    config = ReservoirConfig()
    rc = ReservoirComputer(config, temporal=False)
    
    # Generate training data
    x = torch.linspace(-3, 3, 50)
    y = (x - 3) * (x - 2) * (x - 1) * x * (x + 1) * (x + 2) * (x + 3)
    
    # Normalize inputs to [0, 1]
    x_norm = (x - x.min()) / (x.max() - x.min())
    
    # Train reservoir
    rc.fit(x_norm, y)
    
    # Generate test data
    x_test = torch.linspace(-3, 3, 10)
    x_test_norm = (x_test - x.min()) / (x.max() - x.min())
    y_pred = rc.predict(x_test_norm)
    
    return x_test, y_pred


In [15]:

# Example usage for Task II: Lorenz attractor reconstruction
def task_2():
    def lorenz(state, dt, sigma=10, rho=28, beta=8/3):
        x, y, z = state
        dx = sigma * (y - x)
        dy = x * (rho - z) - y
        dz = x * y - beta * z
        return torch.tensor([dx, dy, dz]) * dt
    
    # Generate Lorenz data
    dt = 0.01
    n_steps = 5000
    state = torch.tensor([1.0, 1.0, 1.0])
    x_series = []
    z_series = []
    
    for _ in range(n_steps):
        state = state + lorenz(state, dt)
        x_series.append(state[0].item())
        z_series.append(state[2].item())
    
    x_series = torch.tensor(x_series)
    z_series = torch.tensor(z_series)
    
    # Normalize data
    x_norm = (x_series - x_series.min()) / (x_series.max() - x_series.min())
    z_norm = (z_series - z_series.min()) / (z_series.max() - z_series.min())
    
    # Configure and train reservoir
    config = ReservoirConfig()
    rc = ReservoirComputer(config, temporal=True)
    
    # Use first 4000 points for training
    train_len = 4000
    rc.fit(x_norm[:train_len], z_norm[:train_len])
    
    # Predict remaining points
    z_pred = rc.predict(x_norm[train_len:])
    
    return z_series[train_len:], z_pred


In [16]:
if __name__ == "__main__":
    # Run Task I
    x_test, y_pred = task_1()
    print("Task I RMSE:", torch.sqrt(torch.mean((y_pred - (x_test - 3) * (x_test - 2) * (x_test - 1) * 
                                                x_test * (x_test + 1) * (x_test + 2) * (x_test + 3))**2)))
    
    # # Run Task II  
    # z_true, z_pred = task_2()
    # print("Task II RMSE:", torch.sqrt(torch.mean((z_pred - z_true)**2)))

IndexError: Dimension out of range (expected to be in range of [-1, 0], but got 1)

In [9]:
import matplotlib.pyplot as plt

# Visualize results from Task I
plt.figure(figsize=(6,4))
plt.plot(x_test, (x_test - 3)*(x_test - 2)*(x_test - 1)*x_test*(x_test + 1)*(x_test + 2)*(x_test + 3), label='True')
plt.plot(x_test, y_pred, label='Predicted')
plt.title("Task I Results")
plt.legend()
plt.show()

# Visualize results from Task II
plt.figure(figsize=(6,4))
plt.plot(z_true, label='True')
plt.plot(z_pred, label='Predicted')
plt.title("Task II Results")
plt.legend()
plt.show()

NameError: name 'x_test' is not defined

<Figure size 600x400 with 0 Axes>