In [1]:
import numpy as np
import torch
from scipy.integrate import odeint
import torch.nn as nn
import matplotlib.pyplot as plt
from PIL import Image
import torch
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import odeint
from torch import nn, autograd
import random
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer

import math

In [2]:
# Define the device globally
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)
device = torch.device('cpu')

Using device: cuda


In [3]:
 def duffing_generator_batch( num_batches, x):
        params_list = []  # To store parameters tensors for each batch
        y_physics_list = []  # To store the y_physics tensors for each batch

        for _ in range(num_batches):
            # Randomly generate parameters
            a = random.uniform(-2, 2)
            b = random.uniform(0, 3)
            d = random.uniform(0, 0.5)
            gamma = random.uniform(0, 1.5)
            w = random.uniform(0, 2.5)
            y0 = [0,random.uniform(0,1)]
            #print('this is y0=',y0)
            # Duffing differential equation solver setup
            def duffing(y, t):
                y0, y1 = y
                dydt = [y1, -d * y1 - a * y0 - b * y0**3 + gamma * np.cos(w * t)]
                return dydt

            # Initial conditions and solving the ODE
           # y0 = [0, 0]
            sol = odeint(duffing, y0, x.cpu().squeeze().numpy())  # Ensure x is compatible with odeint
            y = torch.tensor(sol[:, 0], dtype=torch.float32).view(-1, 1)  # y_physics for one batch

            y_physics_list.append(y)

            # Handling parameters similarly if needed
            params = torch.tensor([d, a, b, gamma, w], dtype=torch.float32).view(1, -1).repeat(x.size(0), 1)
            params_list.append(params)
    
           # params_tensor = torch.tensor(params_list)
            #y_tensor = torch.tensor(y_physics_list)
        # Option 1: Return lists directly
        #return params_tensor, y_tensor

            # Option 2: Stack tensors to create a batch dimension explicitly
            params_tensor = torch.stack(params_list, dim=0)  # Shape: [num_batches, x.size(0), 5]
            y_physics_tensor = torch.stack(y_physics_list, dim=0)  # Shape: [num_batches, x.size(0), 1]
    
            return params_tensor, y_physics_tensor

In [4]:
class AdaptedRNN(nn.Module):
    def __init__(self, n_time_features, n_param_features, n_hidden, n_layers):
        super(AdaptedRNN, self).__init__()
        
        # Number of input features for the RNN is the sum of time features and parameter features
        self.rnn_input_features = n_time_features + n_param_features
        
        # RNN Layer
        self.rnn = nn.RNN(input_size=self.rnn_input_features, hidden_size=n_hidden, num_layers=n_layers, batch_first=True)
        
        # Output layer
        self.output_layer = nn.Linear(n_hidden, 1)
        
        # Activation function
        self.activation = nn.Tanh()

    def forward(self, x, params):
        # Reshape x to add a sequence length dimension of 1
        # If x is [500, 1], we reshape it to [500, 1, 1] to match [batch_size, seq_len, n_time_features]
        x = x.unsqueeze(1)  # Adds seq_len dimension
        
        # Since params is [500, 5], we also need to adjust it for concatenation
        # We want params to be [500, 1, 5] to match x's new shape for concatenation
        params = params.unsqueeze(1)  # Adds seq_len dimension
        
        # Now x_combined shape will be [500, 1, 6] after concatenation
        x_combined = torch.cat((x, params), dim=2)
        
        # Process with RNN as before
        rnn_out, _ = self.rnn(x_combined)
        final_output = rnn_out[:, -1, :]  # Taking the last timestep output
        final_output = self.activation(final_output)
        final_output = self.output_layer(final_output)
        return final_output

In [5]:
class AdaptedFCN(nn.Module):
    def __init__(self, n_time_features, n_param_features, n_hidden, n_layers):
        super(AdaptedFCN, self).__init__()
        
        # Number of input features is the sum of time features and parameter features
        self.input_features = n_time_features + n_param_features
        
        # Input layer takes combined time and parameters
        self.input_layer = nn.Linear(self.input_features, n_hidden)
        
        # Hidden layers
        self.hidden_layers = nn.ModuleList()
        for _ in range(n_layers - 1):
            self.hidden_layers.append(nn.Linear(n_hidden, n_hidden))
        
        # Output layer
        self.output_layer = nn.Linear(n_hidden, 1)
        
        # Activation function
        self.activation = nn.Tanh()  # Using Tanh as an example; adjust as needed

    def forward(self, x, params):
        # Concatenate time and parameter features
        x_combined = torch.cat((x, params), dim=1)
        
        # Input layer with activation
        x = self.activation(self.input_layer(x_combined))
        
        # Hidden layers with activation
        for layer in self.hidden_layers:
            x = self.activation(layer(x))
        
        # Output layer without activation (assuming a regression task)
        x = self.output_layer(x)
        return x

In [6]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

class AdaptedTransformer(nn.Module):
    def __init__(self, n_time_features, n_param_features, n_hidden, n_layers, nhead, dropout=0.5):
        super(AdaptedTransformer, self).__init__()
        
        self.n_time_features = n_time_features
        self.n_param_features = n_param_features
        self.input_features = n_time_features + n_param_features
        
        self.embedding = nn.Linear(self.input_features, n_hidden)
        self.pos_encoder = PositionalEncoding(n_hidden, dropout)
        
        encoder_layers = TransformerEncoderLayer(n_hidden, nhead, n_hidden * 2, dropout, batch_first=True)
        self.transformer_encoder = TransformerEncoder(encoder_layers, n_layers)
        
        self.output_layer = nn.Linear(n_hidden, 1)
        
    def forward(self, x, params):
        # Example adjustment: Ensure `params` is repeated or expanded to match `x`'s first dimension
        if x.size(0) != params.size(0):
            # Assuming `x` has more entries than `params` and you want to repeat `params` to match
            repeat_factor = x.size(0) // params.size(0)
            params = params.repeat(repeat_factor, 1)
    
        # Now concatenate
        x_combined = torch.cat((x, params), dim=-1)  # Ensure correct shape
        
        x_embedded = self.embedding(x_combined)
        x_pos_encoded = self.pos_encoder(x_embedded)
        
        x_transformed = self.transformer_encoder(x_pos_encoded)
        
        output = self.output_layer(x_transformed.squeeze(1))
        return output

In [7]:
x = torch.linspace(0, 10, 5000).view(-1,1)  
# Set up the physics loss training locations
x_physics = torch.linspace(0, 10, 300).view(-1, 1).requires_grad_(True)
num_batches =1
# Set up the random seed and initialize the model and optimizer
torch.manual_seed(123)
model = AdaptedRNN(1,5,64,12)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
n_time_features = 1
n_param_features = 5
n_hidden =  4 # Size of the hidden layer
n_layers = 2  # Number of transformer layers
nhead = 2  # Number of heads in the multi-head attention mechanism
dropout = 0.1  # Dropout rate

model = AdaptedTransformer(n_time_features, n_param_features, n_hidden, n_layers, nhead, dropout)
model = model.to(device)
# Train the model for 60000 steps
for i in range(6000):
    for batch in range(num_batches):
        #optimizer.zero_grad()
        params_tensor,y_tensor= duffing_generator_batch( num_batches, x)
        selected_params = params_tensor[0, :1, :].repeat(x.size(0), 1)  # Adjusted to have shape [500, 5]
        y_data = y_tensor[:, 0:2000:20] 
        y_data = y_data.to(device)
        x_data = x[0:2000:20] 
        d, a, b, gamma, w = params_tensor[:, 0], params_tensor[:, 1], params_tensor[:, 2], params_tensor[:, 3], params_tensor[:, 4]
        selected_params = selected_params[0:2000:20]
        selected_params = selected_params[0:1].repeat(x_data.size(0), 1)
        x_data = x_data.to(device)
        selected_params = selected_params.to(device)
        yh = model(x_data, selected_params)
        
        # Compute the data loss by comparing the model output with the training data
        # Assuming your linear layer is named `layer`
        loss1 = torch.mean((yh - y_data)**2)
        physics_params = params_tensor[0, :1, :].repeat(x_physics.size(0), 1) 
        x_physics = x_physics.to(device)
        physics_params = physics_params.to(device)
        # Compute the physics loss by enforcing the differential equation
        yhp = model(x_physics,physics_params)
        dy_pred = autograd.grad(yhp, x_physics, torch.ones_like(yhp), create_graph=True)[0]
        d2y_pred = autograd.grad(dy_pred, x_physics, torch.ones_like(dy_pred), create_graph=True)[0]
            
        physics = d2y_pred + d * dy_pred + a * yhp + b * torch.pow(yhp, 3) - gamma * torch.cos(w * x_physics)
        loss_physics = (1e-4) * torch.mean(physics**2)
            
        # Compute the total loss as the sum of the data loss, the physics loss, and the boundary loss
        total_loss = loss1 + loss_physics 
            
        # Update the model parameters using backpropagation and gradient descent
        total_loss.backward()
        optimizer.step()
        y_tensor_squeezed = y_tensor.squeeze()

    # Plot the results every 150 steps
    if (i+1) % 10 == 0:   
        # Print the loss value after each step
        print("Loss at Step", i+1, ":", total_loss.item())
        yh = model(x,selected_params).detach().numpy()
        plt.figure(figsize=(10, 5))
        plt.plot(x.numpy(), y_tensor_squeezed.numpy(), label='Ground Truth')
        plt.plot(x.numpy(), yh, label='Neural Network Output')
        plt.scatter(x_data.numpy(), y_data.numpy(), color='red', label='Training points')
        plt.legend()
        plt.show()

Loss at Step 10 : 0.7708511352539062


RuntimeError: [enforce fail at alloc_cpu.cpp:117] err == 0. DefaultCPUAllocator: can't allocate memory: you tried to allocate 1000000000000 bytes. Error code 12 (Cannot allocate memory)