In [None]:
!pip install gekko
!pip install hyperopt

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
#from ramp_input import make_ramp_adv
from gekko import GEKKO
# import optuna
# from optuna.trial import TrialState
from hyperopt import tpe, hp, fmin, STATUS_OK,Trials, space_eval
from hyperopt.pyll.base import scope


# to account for fixed parameters in objective function:
from functools import partial

# ---------------------------------------------------------------------------
The next cell defines the ramp input function inline (instead of importing it from `ramp_input`), so that this Kaggle notebook is self-contained.

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def make_ramp_adv(time_points, T, t_rise, t_fall, delay, amplitude, last, repetitions):
    '''
    Build a periodic trapezoidal ("ramp") input signal and its time axis.

    One period is assembled as: zeros for the delay, a linear rise up to
    `amplitude`, a hold at `amplitude`, a linear fall, and zeros for the rest
    of the period. The period is then repeated `repetitions` times.

    time_points = number of samples per time period
    T = time period
    t_rise & t_fall = RATE of rise per unit time and rate of fall respectively
    delay = input delay
    amplitude = maximum value of the ramp
    last = time within the period used to size the hold so that the fall ends around it
    repetitions = how many times you will repeat the input

    T, delay and last are used to build `range` counts, so they must be integers.

    Returns: output ramp function (y, a list) and timespace (n, a numpy array) to plot.
             Both always have repetitions * time_points + 1 entries.
    '''
    n = np.linspace(0, repetitions * T, repetitions * time_points + 1)
    resolution = int(time_points / T)   # samples per unit time
    rise = t_rise / resolution          # increment per sample while rising
    fall = t_fall / resolution          # decrement per sample while falling
    fall_steps = int(amplitude // fall)

    # delay (zero values)
    y = [0] * (resolution * delay)

    # up-ramp values: stop as soon as the next sample would exceed the amplitude
    rise_steps = 0
    for i in range((T - delay) * resolution):
        if rise * i > amplitude:
            break
        y.append(rise * i)
        rise_steps += 1

    # amplitude-hold values (a negative count simply yields no hold samples)
    hold_steps = last * resolution - rise_steps - fall_steps - delay * resolution
    y += [amplitude] * hold_steps

    # down-ramp values
    y += [amplitude - fall * i for i in range(fall_steps + 1)]

    # Force exactly `time_points` samples per period: cut a waveform that runs
    # past the period (e.g. last == T), otherwise pad with zeros. Without the
    # cut, y and n would end up with different lengths.
    y = y[:time_points]
    y += [0] * (time_points - len(y))

    y_repeated = y * repetitions
    y_repeated.append(0)  # one closing sample so that len(y_repeated) == len(n)

    return y_repeated, n

# ----------------------------------------------------------------------------------------

# Make a complete pipeline to show the model's flexibility:
### Objectives: 
1. Incorporate the initial conditions into the model.
2. Keep the ramp input function flexible by passing its parameters in through a dictionary.
3. Display the ground-truth values from GEKKO as part of the training process.

Path: 
Ramp input -> GEKKO -> Voltage curve -> Model_train -> Output

In [None]:
# Configuration dictionaries that are passed into the pipeline functions.
# These three hold fixed values; the training hyperparameters are defined
# separately as a range of values among which Hyperopt will choose.

# Dict 1: parameters used to BUILD the ramp input function (held CONSTANT)
ramp_dict = dict(
    T=40,             # time period in seconds
    time_points=400,  # samples per time period, e.g. 400 voltage values in 40 seconds; also sizes the linear timespace
    delay=0,          # input delay of the ramp in seconds
    t_rise=0.5,       # rate of rise of the ramp input
    t_fall=0.2,       # rate of fall of the ramp input
    amplitude=2,      # maximum amplitude of the ramp input
    last=35,          # point of termination of the ramp in seconds, after which Vin = 0
    repetitions=2,    # number of repetitions of the periodic input
)


# Dict 2: resistor and capacitor values of the series R-C circuit
lumped_elements = dict(R=5, C=1)


# Dict 3: initial conditions handed to the ODE solver (GEKKO)
initial_conditions = dict(Vc=5.0, dvdt=0.0)


# Dict 4: Hyperparameters of the training process: TO BE OPTIMISED USING Bayesian Optimisation (Hyperopt)

# PARAMETER SPACE:
train_dict = {

    # Network architecture.
    # hp.quniform samples floats, so scope.int casts the sampled value to an
    # integer before it reaches the objective function.
    "n_hidden": scope.int(hp.quniform("n_hidden", 20, 40, 1)),  # nodes per hidden layer
    "n_layers": scope.int(hp.quniform("n_layers", 4, 10, 1)),   # number of layers

    # Weights of the individual terms of the loss function
    "lambda_boundary": hp.uniform("lambda_boundary", 0.0, 1.0), # Contribution of the boundaries Vc[0] and Vc[-1] to the loss function

    "lambda_physics": hp.uniform("lambda_physics", 0.0, 1.0), # Contribution of the physics loss to the loss function

    "lambda_deriv": 0, # Contribution of the boundary dvdt[0] to the loss function (fixed, not searched)

    "lr": hp.uniform("lr", 0.0001, 0.01), # Learning rate

    "epochs": hp.choice("epochs", [20001, 25001, 30001, 35001, 40001]), # Number of epochs

    "physics_points": scope.int(hp.quniform("physics_points", 100, 200, 1)), # Number of points where the physics loss is evaluated

}

In [None]:
class FCN(nn.Module):
    """Fully-connected network with Tanh activations.

    Arguments: number of inputs, number of outputs, width of each hidden
    layer, and total number of layers. The network is an input layer (`fcs`),
    N_LAYERS - 1 hidden Linear+Tanh stages (`fch`) and a linear output
    layer (`fce`)."""

    def __init__(self, N_INPUT, N_OUTPUT, N_HIDDEN, N_LAYERS):
        super().__init__()
        act = nn.Tanh

        # Input stage: project the inputs onto the hidden width.
        self.fcs = nn.Sequential(nn.Linear(N_INPUT, N_HIDDEN), act())

        # Hidden stages: each one is its own Linear -> Tanh Sequential.
        hidden_stages = []
        for _ in range(N_LAYERS - 1):
            hidden_stages.append(nn.Sequential(nn.Linear(N_HIDDEN, N_HIDDEN), act()))
        self.fch = nn.Sequential(*hidden_stages)

        # Output stage: linear read-out without activation.
        self.fce = nn.Linear(N_HIDDEN, N_OUTPUT)

    def forward(self, x):
        hidden = self.fch(self.fcs(x))
        return self.fce(hidden)


In [None]:
def solve_ode(ramp_dict, initial_conditions, lumped_elements, plot = False):
    ''' Solve the series R-C circuit ODE for the capacitor voltage with GEKKO.

        The equation solved is  dVc/dt + Vc/(R*C) = Vin/(R*C),  with Vin the
        ramp produced by `make_ramp_adv`.

        Input: ramp function parameters (ramp_dict), initial conditions
               (keys "Vc" and "dvdt"), value of resistor and capacitor
               (keys "R" and "C"); plot=True draws Vin, Vc and dVc/dt.
        Returns: Voltage across capacitor (the GEKKO variable), derivative of
                 the same (its `.value`), ramp input, linear time space for plotting'''

    m = GEKKO()

    # Getting the ramp function; its time axis becomes the solver's time grid
    Vin_ramp, m.time = make_ramp_adv(ramp_dict["time_points"], ramp_dict["T"], ramp_dict["t_rise"], ramp_dict["t_fall"], 
                                     ramp_dict["delay"], ramp_dict["amplitude"], ramp_dict["last"], ramp_dict["repetitions"])

    # make it a parameter to gekko
    Vin = m.Param(value = Vin_ramp)

    # make variables here, and put their initial values
    Vc = m.Var(initial_conditions["Vc"]) 
    dvdt = m.Var(initial_conditions["dvdt"])

    R = lumped_elements["R"]
    C = lumped_elements["C"]

    # make equation
    # The right-hand side must be Vin/(R*C). Written as Vin/R*C it is evaluated
    # as (Vin/R)*C, which only coincides with the correct term when C == 1.
    m.Equation(dvdt + Vc/(R*C) == Vin/(R*C))
    m.Equation(Vc.dt()==dvdt)
    #solve the equation
    m.options.IMODE = 4
    m.solve(disp = False) # if true, then a lot of things will be displayed

    DVDT = dvdt.value
    time_space = m.time

    # plot the results
    if plot:
        plt.plot(m.time,Vin_ramp,'g:',label='Vin(t)')
        plt.plot(m.time,Vc,'b-',label='Vc(t)')
        plt.plot(m.time, DVDT, 'r--', label='d(Vc(t))/dt')
        plt.ylabel('Vc(t)')
        plt.xlabel('time')
        plt.legend(loc='best')
        plt.show()

    return Vc, DVDT, Vin_ramp, time_space

In [None]:
def extract_boundaries(Vc, DVDT):

    '''Collect the first and last samples of the solved ODE.

        Vc is the solved voltage sequence and DVDT its derivative sequence.
        Returned datatype: Dictionary with keys "u_last", "u_first",
        "du_first" and "du_last".
        These boundary values are used in the loss function of the PINN.'''

    u_last, u_first = Vc[-1], Vc[0]
    du_first, du_last = DVDT[0], DVDT[-1]

    return dict(u_last=u_last, u_first=u_first, du_first=du_first, du_last=du_last)

In [None]:
def train_model(lr, lumped_elements, boundaries, v_input, Vc, lambda_boundary, lambda_deriv, lambda_physics, epochs, physics_points, 
                n_hidden, n_layers, n_repetitions, time_period, time_points, plot_every=20000):

    '''
    Instantiate the PINN, build the weighted loss function and train the network.

    Inputs: lr --> learning rate
            lumped_elements --> dictionary with the resistor "R" and capacitor "C" values
            boundaries --> dictionary with "u_first", "u_last" (initial/final capacitor voltage)
                           and "du_first" (initial derivative)
            v_input --> Input ramp function
            Vc --> ground truth from GEKKO (only used for the comparison plots)
            lambda_boundary/deriv/physics --> contributory weights of the boundary loss, derivative boundary loss, and physics loss
            epochs --> number of epochs (must be at least 1)
            physics_points --> number of points for which physics loss is evaluated
            n_hidden + n_layers --> definition of number of nodes per layer and number of layers in PINN model
            n_repetitions --> number of repetitions of ramp input
            time_period + time_points --> time period of ramp input in seconds and number of time_points per period
            plot_every --> a comparison plot is drawn every `plot_every` epochs; 0 or None disables plotting

    Returns: (u_test, loss, pinn)
            u_test --> prediction of the trained network on the full time grid
            loss --> detached value of the weighted training loss of the last epoch
            pinn --> the trained network
    '''

    # Hyperparameter samplers may deliver whole numbers as floats.
    epochs = int(epochs)
    physics_points = int(physics_points)
    n_hidden = int(n_hidden)
    n_layers = int(n_layers)
    if epochs < 1:
        raise ValueError("epochs must be at least 1")

    #torch.manual_seed(123)  # uncomment for reproducible weight initialisation
    # Extracting standard values from dictionaries
    R = lumped_elements["R"]
    C = lumped_elements["C"]
    u_first = boundaries["u_first"]
    u_last = boundaries["u_last"]
    du_first = boundaries["du_first"]

    # The network input is time in seconds, so the domain is [0, t_end].
    t_end = time_period * n_repetitions

    # Define a fully connected network (FCN) for the PINN
    pinn = FCN(1, 1, n_hidden, n_layers)

    # Define boundary points for the boundary loss (time points, not voltage values)
    time_point_left = torch.tensor([0.0], dtype=torch.float32).view(-1, 1).requires_grad_(True)
    # The right boundary is the final TIME (t_end), not the number of samples
    # (time_points * n_repetitions), which lies far outside the trained domain.
    time_point_right = torch.tensor([t_end], dtype=torch.float32).view(-1, 1)

    # Defining linear timespace with number of points = 'physics_points'; TO CALCULATE PHYSICS LOSS
    t_physics = torch.linspace(0, t_end, physics_points, dtype=torch.float32).view(-1, 1).requires_grad_(True)

    # Making V_input smaller in size to match number of physics points.
    # It is a fixed input, so no gradient is tracked for it.
    v_input_interp = F.interpolate(torch.tensor(v_input, dtype=torch.float32).view(1, 1, -1), size=physics_points).view(-1, 1)

    # Initialize the optimizer
    optimiser = torch.optim.Adam(pinn.parameters(), lr= lr)

    # Full-resolution time grid used for plotting and for the returned prediction
    t_test = torch.linspace(0, t_end, time_points*n_repetitions+1).view(-1,1)

    # Converting the ground truth from the ODE solver into a tensor
    u_exact = torch.tensor(Vc).view(-1,1)  # exact solution from GEKKO

    # Training loop
    for i in range(epochs):
        optimiser.zero_grad()

        # Compute boundary loss: (predicted values from PINN - ground truth)^2
        u_left = pinn(time_point_left)  # at t = 0
        loss1_initial = (torch.squeeze(u_left) - u_first)**2

        u_right = pinn(time_point_right)  # at t = end
        loss1_final = (torch.squeeze(u_right) - u_last)**2

        # loss for time derivative at the initial condition
        dudt_left = torch.autograd.grad(u_left, time_point_left, torch.ones_like(u_left), create_graph=True)[0]
        loss_deriv = (torch.squeeze(dudt_left) - du_first)**2

        # Physics loss: residual of dVc/dt + Vc/(R*C) - Vin/(R*C) at the physics points
        u_physics = pinn(t_physics)  # Evaluate PINN at physics points
        dudt_physics = torch.autograd.grad(u_physics, t_physics, torch.ones_like(u_physics), create_graph=True)[0]
        loss_physics = torch.mean((dudt_physics + u_physics / (R * C) - v_input_interp / (R * C))**2)

        # Total loss
        loss = lambda_boundary * (loss1_initial + loss1_final) + lambda_deriv * loss_deriv + lambda_physics * loss_physics
        loss.backward()
        optimiser.step()

        # Plot the result during training
        if plot_every and i % plot_every == 0:
            u_plot = pinn(t_test).detach()
            plt.figure(figsize=(6, 2.5))
            plt.scatter(t_physics.detach()[:, 0], torch.zeros_like(t_physics)[:, 0], s=20, lw=0, color="tab:green", alpha=0.6)
            plt.scatter(time_point_left.detach()[:, 0], torch.zeros_like(time_point_left)[:, 0], s=20, lw=0, color="tab:red", alpha=0.6)
            plt.plot(t_test[:, 0], u_exact[:, 0], label="Exact solution", color="tab:grey", alpha=0.6)
            plt.plot(t_test[:, 0], u_plot[:, 0], label="PINN solution", color="tab:green")
            plt.title(f"Training step {i}")
            plt.legend()
            plt.show()

    # Evaluate the FINAL network; the prediction from the last plotting step
    # would be stale (or missing when no plot was drawn).
    with torch.no_grad():
        u_test = pinn(t_test)

    # Detach so that callers storing the loss do not keep the autograd graph alive.
    return u_test, loss.detach(), pinn


In [None]:
def save_model(str, model=None, output_dir='/kaggle/working'):

    '''Save a trained model with an annotation of your choice.

    str --> annotation appended to the file name (the name shadows the builtin
            `str` inside this function; it is kept so existing calls still work)
    model --> trained network whose weights (state_dict) are written to disk
    output_dir --> folder that receives the file RC_PINN_<str>.pt

    Returns: the path of the written file.

    When `model` is omitted the function falls back to its previous behaviour
    of pickling the FCN class, which contains NO trained weights, and warns
    about it.'''
    output_model_file = output_dir + '/RC_PINN_' + str + '.pt'

    if model is None:
        import warnings
        warnings.warn("save_model was called without a model: only the FCN class "
                      "is pickled and no trained weights are saved. Pass model=<trained network>.")
        torch.save(FCN, output_model_file)
    else:
        torch.save(model.state_dict(), output_model_file)

    return output_model_file

In [None]:
def RAW_solve_train_objective(train_dict, ramp_dict, initial_conditions, lumped_elements):

    '''An end-to-end objective function for the hyperparameter search:
    Inputs: train_dict --> hyperparameters for the training process (one sampled configuration)
            ramp_dict --> parameters to define the input ramp function
            initial_conditions --> initial conditions for solving the ODE equation
            lumped_elements --> resistor and capacitor values

    returns: dictionary with
            'loss' --> mean squared error between the PINN prediction and the GEKKO solution (float)
            'train_loss' --> weighted training loss returned by train_model (float)
            'status' --> STATUS_OK
            'model' --> the trained network
            'params' --> the hyperparameters that were used'''

    # solve_ode: retrieve Ground truth for voltage across capacitor, dvdt, and input ramp function (used later to interpolate the function in 'train_model')
    Vc, DVDT, Vin_ramp, _ = solve_ode(ramp_dict, initial_conditions, lumped_elements, plot = True)

    # extract boundaries to compute the boundary loss in 'train_model'
    boundaries = extract_boundaries(Vc, DVDT)

    # compute and retrieve the predicted values from the training process
    u_pred, train_loss, pinn = train_model(train_dict["lr"], lumped_elements, boundaries, Vin_ramp, Vc, train_dict["lambda_boundary"], train_dict["lambda_deriv"], 
                                           train_dict["lambda_physics"],train_dict["epochs"], int(train_dict["physics_points"]), int(train_dict["n_hidden"]), int(train_dict["n_layers"]), 
                                           ramp_dict["repetitions"], ramp_dict["T"], ramp_dict["time_points"])

    # Score the trial by its error against the ground truth. The weighted
    # training loss is not a usable objective here: the lambda weights are
    # themselves searched, so it can be made small just by choosing lambdas
    # close to 0, regardless of how well the network fits.
    u_exact = torch.tensor(Vc).view(-1, 1).to(u_pred.dtype)
    objective = torch.mean((u_pred.detach() - u_exact) ** 2).item()

    # Plain floats are returned so that no autograd graph is kept alive per trial.
    return {'loss': objective,
            'train_loss': float(train_loss),
            'status': STATUS_OK,
            'model': pinn,
            'params': train_dict}

In [None]:
# Freeze the fixed inputs so that the resulting callable takes only the
# sampled hyperparameter dictionary as its single argument.
save_train_objective = partial(
    RAW_solve_train_objective,
    ramp_dict=ramp_dict,
    initial_conditions=initial_conditions,
    lumped_elements=lumped_elements,
)

In [None]:
# Container that records every evaluated trial of the search.
trials = Trials()

# Run 50 evaluations of the objective with the TPE algorithm over the
# search space defined in train_dict.
best_params = fmin(
    fn=save_train_objective,
    space=train_dict,
    algo=tpe.suggest,
    max_evals=50,
    trials=trials,
)

In [None]:
# Persist the trained weights of the best trial. Every trial result stores its
# network under the 'model' key, so the best one is read back from `trials`.
best_model = trials.best_trial['result']['model']
torch.save(best_model.state_dict(), '/kaggle/working/RC_PINN_hyperOpt_attempt1.pt')

In [None]:
strin = 'attempt1'
try:
    # fmin reports hp.choice entries (here: "epochs") as the INDEX of the
    # chosen option; space_eval maps the result back to the actual values.
    best_params_resolved = space_eval(train_dict, best_params)
    # 'with' closes the file even if the write fails; the newline keeps
    # records from repeated runs apart, since the file is opened in append mode.
    with open('best_parameters'+strin+'.txt', 'a') as geeky_file:
        geeky_file.write(str(best_params_resolved) + '\n')

except (OSError, NameError) as err:
    print("Unable to append to file:", err)


In [None]:
# Only a missing variable (search cell not run) is expected here, so catch
# NameError specifically and let any other error surface.
try:
    print(best_params)
except NameError:
    print('best_params not found')