In [None]:
from scipy.integrate import odeint 
import time
import math
import numpy as np
import pylab as py
from math import *
import numpy as np
from scipy.optimize import newton
from matplotlib import animation, rc
from matplotlib import pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
from torch.autograd import Variable
from sklearn.preprocessing import MinMaxScaler
from IPython.display import HTML

import warnings
import os

Solving the movement of the double pendulum

In [None]:
# Differential equations describing the system
def double_pendulum(u,t,m1,m2,L1,L2,g):
    # du = derivatives
    # u = variables
    # p = parameters
    # t = time variable

    du = np.zeros(4)
    c = np.cos(u[0]-u[2])  # intermediate variables
    s = np.sin(u[0]-u[2])  # intermediate variables

    du[0] = u[1]   # d(theta 1)
    du[1] = ( m2*g*np.sin(u[2])*c - m2*s*(L1*c*u[1]**2 + L2*u[3]**2) - (m1+m2)*g*np.sin(u[0]) ) /( L1 *(m1+m2*s**2) )
    du[2] = u[3]   # d(theta 2)
    du[3] = ((m1+m2)*(L1*u[1]**2*s - g*np.sin(u[2]) + g*np.sin(u[0])*c) + m2*L2*u[3]**2*s*c) / (L2 * (m1 + m2*s**2))

    return du

Splitting the Training Data into training batches and scaling the data

In [None]:
def create_inout_sequences(input_data, tw):
    inout_seq = []
    L = len(input_data)
    for i in range(L-tw):
        train_seq = input_data[i:i+tw]
        train_label = input_data[i+tw:i+tw+1]
        inout_seq.append((train_seq ,train_label))
    return inout_seq

def preprocess_sol(train_data_normalized, train_window):
    train_inout_seq= create_inout_sequences(train_data_normalized.reshape(len(train_data_normalized), 9), train_window)
    return train_inout_seq

Calculating the path of the double pendulum and saving the model parameters

In [None]:
def makepath_param(theta1, theta2, Nt, tfinal, m1,m2,l1,l2, g = 9.81):
    u0 = [np.pi*theta1/180, 0, np.pi*theta2/180, 0]

    #timesteps
    dt = tfinal / Nt
    
    t = np.linspace(0, tfinal, Nt)
    sol = odeint(double_pendulum, u0, t, args=(m1,m2,l1,l2,g))
    
    # create a parameter list
    params = np.array([m1, m2, l1, l2, dt])
    
    # giving the correct shape
    params_repeated = np.tile(params, (sol.shape[0], 1))
    
    # stack them on the simulation data
    sol_with_params = np.hstack((sol, params_repeated))
    
    return sol_with_params

Slop for multiple simulations

In [None]:
def multi_simulations(theta1, theta2, change_of_angle, amount_of_different_inits, Nt, tfinal, m1_params, m2_params, l1_params, l2_params, g):
    simulation_sets = []
    print(f'Amount of chosen timesteps is {Nt}')
    print(f'Total simulation time is {tfinal} s')
    #This slope is defined for different initial condition
    for i in range(0, amount_of_different_inits):
        theta1 += change_of_angle
        theta2 -= change_of_angle
        print(f"theta 1 ist {theta1}")
        print(f"theta 2 ist {theta2}")
        for m1 in m1_params:
            for m2 in m2_params:
                for l1 in l1_params:
                    for l2 in l2_params:
                        sol = makepath_param(theta1, theta2, Nt, tfinal, m1,m2,l1,l2,g)
                        print(f"m1 = {m1}; l1 = {l1}; m2 = {m2}; l2 = {l2}\n")
                        simulation_sets.append(sol)
    return simulation_sets

Splitting the Simulation Data into a Training set and Test set

In [None]:
def data_splitter_and_scaler(scaler, Simulation_datasets, test_size = 0.2):
    train_splits = []
    test_splits = []

    for dataset in Simulation_datasets:
        split_index = int(len(dataset) * (1 - test_size))
        train_split = dataset[:split_index]
        test_split = dataset[split_index:]
        train_splits.append(train_split)
        test_splits.append(test_split)
    
    for i in range(len(train_splits)):
        dataset = train_splits[i]
        train_window = 10
        
        #separating the params since those valuus shouldnt be scale
        params_and_dt = dataset[:, 4:]
        m1, m2, l1, l2, dt = params_and_dt[0]
        print(f"m1 = {m1}; l1 = {l1}; m2 = {m2}; l2 = {l2}; dt = {dt}\n")
        
        train_data_normalized = scaler.fit_transform(dataset[:, :4].reshape(-1, 4))
        train_data_normalized_with_params = np.concatenate((train_data_normalized, params_and_dt), axis=1)    
        train_data_normalized_with_params = torch.FloatTensor(train_data_normalized_with_params).reshape(len(dataset), 9)

        #A change of the reshape function inside the preprocess_sol might be required in case you change the amount of timesteps
        train_inout_seq = preprocess_sol(train_data_normalized_with_params, train_window)
        
        train_splits[i] = train_inout_seq
        
    return train_splits, test_splits

Function to test the model during the Trainingsequence

In [None]:
def training_tester(model, data, num_layers, device, hidden_layer_size = 100):
    
    train_window = 10
    highest = len(data)- train_window
    
    call_begin_position = np.random.randint(len(data) - train_window - 1)
    call_end_position = call_begin_position + train_window
    
    scaler = MinMaxScaler(feature_range=(-1, 1))
    
    data_normalized = scaler.fit_transform(data.reshape(-1, 4))
    
    test_batch = data_normalized[call_begin_position:call_end_position].reshape(train_window,4).tolist()
    test_batch = torch.tensor(test_batch).float()
    
    model.eval().cpu()
    
    model.c_h = (torch.zeros(num_layers, 1, hidden_layer_size).to(device),
                        torch.zeros(num_layers, 1, hidden_layer_size).to(device))
    prediction = model(test_batch).to(device)
    prediction = prediction.cpu().detach().numpy()
    
    test_comp_pos = call_end_position + 1
    test_data = data_normalized[test_comp_pos].reshape(1,4).tolist()
    
    test_error = np.abs(test_data - prediction)
    test_error = np.mean(test_error)
    
    return prediction, test_data, test_error

In [None]:
def physics_loss_1d(predicted_T, current_T, dx, dt, C):

    # Compute the 1D Laplacian (finite difference approximation)
    # laplacian = (current_T[2:] - 2*current_T[1:-1] + current_T[:-2]) / dx**2
    laplace = []
    xx_T = current_T*0
    for i in range(1, len(current_T)-1):
        xx_T[i] = (current_T[i+1] - 2*current_T[i] + current_T[i-1]) / dx**2
        laplace.append(xx_T)

    laplace_arr = np.array(laplace)

    # Compute the time derivative
    # time_derivative = (predicted_T[1:-1] - current_T[1:-1]) / dt
    time_derivative = (predicted_T - current_T) / dt
    # print(f"This is the ffirst nod {time_derivative[0]} ")
    # print(f"This is the derivative of the last nod {time_derivative[-1]}")
    loss_list = []

    for i in range(0, len(laplace[0])-2):
        physics_loss = time_derivative[i+1] - C * laplace[i]
        loss_list.append(physics_loss)

    return physics_loss

Conservation of Energy equation, as physical information

In [None]:
def energy_calculation_np(theta1, theta1_dot, theta2, theta2_dot, mass1, mass2, length1, length2, g=9.81):

    kinetic_energy = 0.5 * mass1 * length1**2 * theta1_dot**2 + 0.5 * mass2 * ((length1 * theta1_dot)**2 + (length2 * theta2_dot)**2 + 2 * length1 * length2 * theta1_dot * theta2_dot * np.cos(theta1 - theta2))

    potential_energy = -mass1 * g * length1 * np.cos(theta1) - mass2 * g * (length1 * np.cos(theta1) + length2 * np.cos(theta2))

    total_energy = abs(kinetic_energy + potential_energy)

    return total_energy

def conservation_of_energy(predicted_state, previous_state, target_state, mass1, mass2, length1, length2, g=9.81):

    #extracting the data from the arrays
    theta1, theta1_dot, theta2, theta2_dot = predicted_state[0, :]
    prev_theta1, prev_theta1_dot, prev_theta2, prev_theta2_dot = previous_state[0, :]
    target_theta1, target_theta1_dot, target_theta2, target_theta2_dot = target_state[0, :]

    #Calculating all the type specific energies
    predicted_energy = energy_calculation_np(theta1, theta1_dot, theta2, theta2_dot, mass1, mass2, length1, length2, g=9.81)
    previous_energy = energy_calculation_np(prev_theta1, prev_theta1_dot, prev_theta2, prev_theta2_dot, mass1, mass2, length1, length2, g=9.81)
    target_energy = energy_calculation_np(target_theta1, target_theta1_dot, target_theta2, target_theta2_dot, mass1, mass2, length1, length2, g=9.81)

    #calculating the difference = loss of the energies
    energy_loss = predicted_energy - previous_energy

    return energy_loss

Lagrange Equation as physical information

In [None]:
def differential_operator(current_pos, previous_pos, timestep):
    slope = (previous_pos - current_pos) / timestep
    return slope

def lagrange_loss_single_value(predicted_state, previous_state, target_state, timestep, m1, m2, l1, l2, g=9.81):
    #extracting the data from the arrays
    theta1, theta1_dot, theta2, theta2_dot = predicted_state[0, :]
    prev_theta1, prev_theta1_dot, prev_theta2, prev_theta2_dot = previous_state[0, :]
    target_theta1, target_theta1_dot, target_theta2, target_theta2_dot = target_state[0, :]
        
    theta1_ddot = differential_operator(theta1_dot, prev_theta1_dot, timestep)
    theta2_ddot = differential_operator(theta2_dot, prev_theta2_dot, timestep)
        
    lagrange_eq_theta1 = (
                            (m1 + m2) * theta1_ddot * l1**2
                            + m2 * l1**2 * theta2_ddot * l2**2 * np.cos(theta1 - theta2)
                            + m2 * l1**2 * theta2_dot**2 * l2**2 * np.sin(theta1 - theta2)
                            + (m1 + m2) * g * l1 * np.sin(theta1)
                        )
    lagrange_eq_theta2 = (
                            theta2_ddot * l2
                            + theta1_ddot * l1**2 * l2 * np.cos(theta1 - theta2)
                            - theta1_ddot * l1**2 * l2 * np.sin(theta1 - theta2)
                            + g * np.sin(theta2)
                        )
        
    lagrange_loss = np.abs(lagrange_eq_theta1) + np.abs(lagrange_eq_theta2)
    return lagrange_loss

Testing the trained model

In [None]:
#this is the full OneStep function.   it
def OneStep(model, model_layers, data, filename, filepath_excel, test_nr, steps = 100, g = 9.81):
    print('data set length =', len(data))
    train_window = 10
    initial_predict_data = data[:, :4]
    scaler = MinMaxScaler(feature_range=(-1, 1))
    train_data_normalized = scaler.fit_transform(initial_predict_data.reshape(-1, 4))
    fut_pred = len(data) -train_window
    test_inputs = train_data_normalized[0:train_window].reshape(train_window,4).tolist()
    model.eval().cpu()
    preds = test_inputs.copy()
    t2 = test_inputs
    hidden_layer_size = 100
    x = 0
    for i in range(fut_pred):
        seq = torch.FloatTensor(t2[i:]).cpu()
        model.c_h = (torch.zeros(model_layers, 1, hidden_layer_size),
                        torch.zeros(model_layers, 1, hidden_layer_size))
        x = model(seq).cpu()
        x = x.cpu()
        preds.append(x.detach().numpy())
        t2.append(x.detach().numpy())
    actual_predictions = scaler.inverse_transform(np.array(preds ).reshape(-1,4))
    print(len(actual_predictions))

    # the following will plot the lower mass path for steps using the actual ODE sover
    # and the predicitons
    plt.figure( figsize=(10,5))
    u0 = data[:,0]     # theta_1
    u1 = data[:,1]     # omega 1
    u2 = data[:,2]     # theta_2
    u3 = data[:,3]     # omega_2
    up0 = actual_predictions[:,0]     # theta_1
    up1 = actual_predictions[:,1]     # omega 1
    up2 = actual_predictions[:,2]     # theta_2
    up3 = actual_predictions[:,3]     # omega_2
    m1 = data[:, 4][0]
    m2 = data[:, 5][0]
    l1 = data[:, 6][0]
    l2 = data[:, 7][0]
    x1 = l1*np.sin(u0);          # First Pendulum
    y1 = -l1*np.cos(u0);
    x2 = x1 + l2*np.sin(u2);     # Second Pendulum
    y2 = y1 - l2*np.cos(u2);
    xp1 = l1*np.sin(up0);          # First Pendulum
    yp1 = -l1*np.cos(up0);
    xp2 = xp1 + l2*np.sin(up2);     # Second Pendulum
    yp2 = yp1 - l2*np.cos(up2); 
    print(x2[0], y2[0])
    plt.plot(x2[0:steps], y2[0:steps], color='r')
    plt.plot(xp2[0:steps],yp2[0:steps] , color='g')

    ###############################################################################
    dt = data[:, 8][0]
    Nt = len(data[:, 8])
    tfinal = dt * Nt
    t = np.linspace(0, tfinal, Nt)

    energy_data = energy_calculation_np(u0, u1, u2, u3, m1, m2, l1, l2, g)
    energy_predicted = energy_calculation_np(up0, up1, up2, up3, m1, m2, l1, l2, g)

    ###############################################################################
    plt.figure(figsize=(10, 6))
    ## Only necessary when addapting the lookback
    # lookback += 200
    plt.suptitle('Simulation vs Prediction - ' + filename, fontsize=16)

    lookback = 0
    plt.subplot(3, 2, 1)
    plt.plot(t[lookback:], data[lookback:, 0], label='Simulation')
    plt.plot(t[lookback:], actual_predictions[:, 0], label='Prediction')
    plt.ylabel('Theta 1')
    plt.legend()

    plt.subplot(3, 2, 2)
    plt.plot(t[lookback:], data[lookback:, 1], label='Simulation')
    plt.plot(t[lookback:], actual_predictions[:, 1], label='Prediction')
    plt.ylabel('Omega 1')
    plt.legend()

    plt.subplot(3, 2, 3)
    plt.plot(t[lookback:], data[lookback:, 2], label='Simulation')
    plt.plot(t[lookback:], actual_predictions[:, 2], label='Prediction')
    plt.ylabel('Theta 2')
    plt.xlabel('Time in s')
    plt.legend()

    plt.subplot(3, 2, 4)
    plt.plot(t[lookback:], data[lookback:, 3], label='Simulation')
    plt.plot(t[lookback:], actual_predictions[:, 3], label='Prediction')
    plt.ylabel('Omega 2')
    plt.xlabel('Time in s')
    plt.legend()

    plt.subplot(3, 2, 5)
    plt.plot(t[lookback:], energy_data, label='Simulation')
    plt.plot(t[lookback:], energy_predicted, label='Prediction')
    plt.ylabel('Omega 2')
    plt.xlabel('Time in s')
    plt.legend()

    ##############################################################################
    
    # Create dataframe
    df = pd.DataFrame({ 'time': t,
        f'theta1_actual': data[:, 0], 'omega1_actual': data[:, 1],
        'theta2_actual': data[:, 2], 'omega2_actual': data[:, 3],
        'energy_actual': energy_data,
        'm1': data[:, 4], 'm2': data[:, 5], 'l1': data[:, 6], 'l2': data[:, 7], 'dt': data[:, 8],
        'theta1_predicted': actual_predictions[:, 0], 'omega1_predicted': actual_predictions[:, 1],
        'theta2_predicted': actual_predictions[:, 2], 'omega2_predicted': actual_predictions[:, 3],
        'energy_predicted': energy_predicted,
    })

    filename = f'Test_nr_{test_nr}_{filename}'
    #Combination filepath and filename
    full_path = os.path.join(filepath_excel, filename)

    # safe csv
    df.to_csv(f'{full_path}.csv', index=False)
    
    return actual_predictions

In [None]:
def model_trainer(model_freeze, scaler, train_datasets, test_datasets, epochs, subcycle, training_type, data_handling, freeze, filename, filepath_excel, device, num_layers, hidden_layer_size):    
    loss_function = nn.MSELoss()
    loss_function2 = nn.L1Loss()
    
    #Choosing the Optimizer
    optimizer_freeze = torch.optim.Adam(model_freeze.parameters(), lr=0.0005)
    
    # necessary for the documentation of the Trainingevolution
    total_loss_list = []
    physical_loss_list = []
    red_physical_loss_list = []
    data_loss_list = []
    test_error_mean_list = []
    test_error_MAE_list = []
    test_error_MSE_list = []
    trainingsequences = []
    counter = 0

    for j in range(0, len(train_datasets)):      
        if freeze == True:
            num_layers_to_freeze = j
            model_freeze.freeze_layers(model_freeze, j)
            if num_layers_to_freeze == 0:
                print(f'First training cycle is conducted without freezed layers: {num_layers_to_freeze}')
            else:
                print(f'Amount of freezed layers: {num_layers_to_freeze}')

        print(f"Cycle {j+1} of {len(train_datasets)} cycles has started")
        
        if data_handling == "structured":
            train_inout_seq = train_datasets[j]
        
        for i in range(epochs):
            counter += 1
            for _ in range(subcycle):
                
                if data_handling == "random":
                    #random call off the trainingdataset
                    random_call = np.random.randint(len(train_datasets))
                    train_inout_seq = train_datasets[random_call]
                
                #random call of the training batch
                num = np.random.randint(len(train_inout_seq))
                X_train = train_inout_seq[num][0][:,:4].to(device)
                y_target = train_inout_seq[num][1][:,:4].to(device)
                
                #get the last 4 pendulum-parameter
                params = train_inout_seq[num][0][:,4:].to(device)
                m1, m2, L1, L2, timestep = params[0]
                
                optimizer_freeze.zero_grad()

                model_freeze.c_h = (torch.zeros(len(train_datasets), 1, hidden_layer_size).to(device),
                                torch.zeros(len(train_datasets), 1, hidden_layer_size).to(device))

                y_pred_freeze = model_freeze(X_train)
                
                single_loss = loss_function(y_pred_freeze.view(1,4), y_target)
                
                #preparing the data for the physical loss
                #getting the previous timestep
                X_train_3D = torch.tensor(X_train, dtype=torch.float32).unsqueeze(0)

                prev_val = X_train_3D[0, -1, :]
                prev_val = prev_val.detach().cpu().numpy()
                prev_val_real = scaler.inverse_transform(prev_val.reshape(1, -1))

                #rescaling the actual prediction
                prediction_real = y_pred_freeze.detach().cpu().numpy()
                prediction_real = scaler.inverse_transform(prediction_real.reshape(1, -1))

                #for comparison the data target
                y_target_real = y_target.detach().cpu().numpy()
                y_target_real = scaler.inverse_transform(y_target_real.reshape(1, -1))

                if training_type == "Data_driven":
                    total_loss = single_loss
                elif training_type == "Energy":
                    energy_loss = conservation_of_energy(prediction_real, prev_val_real, y_target_real, m1, m2, L1, L2, g=9.81)
                    energy_loss = abs(energy_loss)
                    alpha = 0.1 + 0.9 * (i / epochs)
                    total_loss = single_loss + alpha * energy_loss
                elif training_type == "Lagrange":
                    lagrange_loss = lagrange_loss_single_value(prediction_real, prev_val_real, y_target_real, timestep, m1, m2, L1, L2)
                    alpha = 0.1 + 0.9 * (i / epochs)
                    total_loss = single_loss + alpha * lagrange_loss
                else:
                    raise ValueError("Ungültiger training_type. Er muss 'Lagrange', 'Energy' oder 'Datadriven' sein.")
                    
                total_loss.backward()
                optimizer_freeze.step()

                test_dataset_tensor = torch.tensor(test_datasets[j][:,:4])
                prediction, test_data, test_error_mean = training_tester(model_freeze, test_dataset_tensor, num_layers, device, hidden_layer_size)

                test_loss_MSE = loss_function(torch.tensor(test_data), torch.tensor(prediction))
                test_loss_MAE = loss_function2(torch.tensor(test_data), torch.tensor(prediction))

            if training_type == "Data_driven":
                not_considered = 0
                physical_loss_list.append(not_considered)
            elif training_type == "Energy":
                physical_loss_list.append(energy_loss.item())
            elif training_type == "Lagrange":
                physical_loss_list.append(lagrange_loss.item())    
            else:
                raise ValueError("Ungültiger training_type. Er muss 'Lagrange', 'Energy' oder 'Datadriven' sein.")      
                       
            total_loss_list.append(total_loss.item())
            data_loss_list.append(single_loss.item())
            test_error_mean_list.append(test_error_mean.item())
            test_error_MAE_list.append(test_loss_MAE.item())
            test_error_MSE_list.append(test_loss_MSE.item())

            trainingsequences.append(counter)

            if i%25 == 1:

                print(f'epoch: {i:3} data loss of freezed model: {single_loss.item():10.8f}')
                print(f'epoch: {i:3} total loss of freezed model: {total_loss.item():10.8f}')
                if training_type == "Energy":
                    print(f'epoch: {i:3} Energy loss of freezed model: {energy_loss.item():10.8f}')
                if training_type == "Lagrange":
                    print(f'epoch: {i:3} Lagrange loss of freezed model: {lagrange_loss.item():10.8f}')
                print(f'the test error ist {test_error_mean.item()}')
                # print(f'the test error ist {test_loss.item()}')

        df_losses = pd.DataFrame({
            "Epoch": trainingsequences,
            "Total Loss": total_loss_list,
            "Physical Loss": physical_loss_list,
            "Data Loss": data_loss_list,
            "Test Error Mean": test_error_mean_list,
            "Test Error Loss Function": test_error_MAE_list,
            "Test Error Loss Function MSE": test_error_MSE_list
        })

        # Save csv
        df_losses.to_csv(os.path.join(filepath_excel, f"trainingloss_cycle{j+1}_{filename}.csv"), index=False)
        
        print(f"CSV-Datei gespeichert als 'trainingloss_cycle{j+1}_{filename}.csv'")

        # Create plot
        plt.figure(figsize=(10, 6))

        plt.plot(df_losses["Epoch"], df_losses["Total Loss"], label="Total Loss", color="blue")      
        plt.plot(df_losses["Epoch"], df_losses["Physical Loss"], label="Physical Loss", color="red")
        plt.plot(df_losses["Epoch"], df_losses["Data Loss"], label="Data Loss", color="orange")
        plt.plot(df_losses["Epoch"], df_losses["Test Error Mean"], label="Test Error Mean", color="green")  
        plt.plot(df_losses["Epoch"], df_losses["Test Error Loss Function"], label="Test Error Loss Function", color="purple") 
        plt.plot(df_losses["Epoch"], df_losses["Test Error Loss Function MSE"], label="Test Error Loss Function MSE", color="black")  

        #titles
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.title("Training Loss über die Epochen")
        plt.legend()
        plt.grid(True)

        plt.show()

        print(f"Period {j+1} was completed and the model states have been saved at this moment")
        torch.save(model_freeze.state_dict(), filename)
    print("training finished")
    
    torch.save(model_freeze.state_dict(), filename)
    
    return model_freeze