In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torch.autograd import grad
import torch.nn.functional as F
import pandas as pd
import torch.nn.utils.parametrize as parametrize
from torch.nn.functional import normalize
from torch.optim.lr_scheduler import StepLR
from sklearn.metrics import r2_score, mean_squared_error
from scipy.signal import medfilt

In [55]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [None]:
# Rows before this index are dropped from every series.
# NOTE(review): presumably this skips a start-up transient — confirm.
start = 400
data = pd.read_csv("Two_Train_15s_25_50.csv")


def _lagged_columns(temp, heat, offset):
    """Cut one temperature/heater pair into five equally long column vectors.

    Returns ``(target, temp_prev, heat_prev, heat_now, temp_now)``. Relative to
    ``temp_prev`` (index ``k``), ``temp_now`` and ``heat_prev`` sit at ``k + 1``
    and ``target`` and ``heat_now`` sit at ``k + 2``. The final raw sample is
    never used by any of the slices.
    """
    target = temp[offset + 2:-1].reshape(-1, 1)
    temp_prev = temp[offset:-3].reshape(-1, 1)
    heat_prev = heat[offset + 1:-2].reshape(-1, 1)
    heat_now = heat[offset + 2:-1].reshape(-1, 1)
    temp_now = temp[offset + 1:-2].reshape(-1, 1)
    return target, temp_prev, heat_prev, heat_now, temp_now


# Raw channels, each divided by 100.
temp_1_tr = np.array(data["Temp_1_Train"]) / 100
heat_1_tr = np.array(data["Heat_1_Train"]) / 100
temp_2_tr = np.array(data["Temp_2_Train"]) / 100
heat_2_tr = np.array(data["Heat_2_Train"]) / 100

# The right-hand sides are evaluated on the raw series before the names
# temp_*_tr / heat_*_tr are rebound to their sliced versions.
out_1_tr, temp_1_tr_pr, heat_1_tr_pr, heat_1_tr, temp_1_tr = _lagged_columns(temp_1_tr, heat_1_tr, start)
out_2_tr, temp_2_tr_pr, heat_2_tr_pr, heat_2_tr, temp_2_tr = _lagged_columns(temp_2_tr, heat_2_tr, start)

# Per-channel increments: [temperature change, heater change].
delta_1_tr = np.concatenate((temp_1_tr - temp_1_tr_pr, heat_1_tr - heat_1_tr_pr), axis=1)
delta_2_tr = np.concatenate((temp_2_tr - temp_2_tr_pr, heat_2_tr - heat_2_tr_pr), axis=1)

# Final design matrices; column order is channel 1 first, then channel 2.
delta_tr = np.concatenate((delta_1_tr, delta_2_tr), axis=1)    # (n, 4)
output_tr = np.concatenate((out_1_tr, out_2_tr), axis=1)       # (n, 2)
input_tr = np.concatenate(
    (temp_1_tr_pr, heat_1_tr_pr, temp_2_tr_pr, heat_2_tr_pr), axis=1
)                                                              # (n, 4)
curr_tr = np.concatenate((temp_1_tr, temp_2_tr), axis=1)       # (n, 2)

In [59]:
def convert_Tensor(curr_tr, input_tr, delta_tr, output_tr):
    """Turn the four training arrays into float32 tensors on ``device``.

    ``curr_tr`` and ``output_tr`` are reshaped to two columns, ``input_tr`` and
    ``delta_tr`` to four. Gradient tracking is switched on for the three model
    inputs and left off for the target ``output_tr``.

    Returns:
        Tuple ``(curr_tr, input_tr, delta_tr, output_tr)`` of tensors.
    """
    def _as_columns(values, width, track_grad):
        tensor = torch.tensor(values).float().to(device).reshape(-1, width)
        if track_grad:
            tensor = tensor.requires_grad_(True)
        return tensor

    return (
        _as_columns(curr_tr, 2, True),
        _as_columns(input_tr, 4, True),
        _as_columns(delta_tr, 4, True),
        _as_columns(output_tr, 2, False),
    )


# Rebind the module-level arrays to their tensor counterparts.
curr_tr, input_tr, delta_tr, output_tr = convert_Tensor(curr_tr, input_tr, delta_tr, output_tr)

In [None]:
class Taylor_Net(nn.Module):
    """Two-output model that adds a learned, input-dependent linear increment.

    For each of the two outputs a small network maps the 4-column ``input_tr``
    to four coefficients. The coefficients are multiplied element-wise with
    ``delta_tr``, summed per row, and the result is added to the matching
    column of ``curr_tr``.
    NOTE(review): the name suggests a first-order Taylor expansion — confirm.

    ``fc2_4`` and ``fc3_2`` are created but never used in a forward pass; the
    corresponding coefficient is fixed to zero instead. They stay registered,
    in the original order, so the parameter set and state_dict keys are
    unchanged.
    """

    def __init__(self):
        super().__init__()
        # Coefficient network for output column 0.
        self.fc1 = nn.Linear(4, 4)
        self.fc2_1 = nn.Linear(1, 1)
        self.fc2_2 = nn.Linear(1, 1)
        self.fc2_3 = nn.Linear(1, 1)
        self.fc2_4 = nn.Linear(1, 1)

        # Coefficient network for output column 1.
        self.fc3 = nn.Linear(4, 4)
        self.fc3_1 = nn.Linear(1, 1)
        self.fc3_2 = nn.Linear(1, 1)
        self.fc3_3 = nn.Linear(1, 1)
        self.fc3_4 = nn.Linear(1, 1)

    def forward_1(self, x):
        """Coefficients for output 0.

        Returns ``(coeffs, second, third)`` where ``coeffs`` has shape (n, 4)
        with its last column fixed to zero, and ``second`` / ``third`` are
        columns 1 and 2 of ``coeffs`` as (n, 1) tensors.
        """
        hidden = F.relu(self.fc1(x))
        first = self.fc2_1(hidden[:, 0:1])
        second = self.fc2_2(hidden[:, 1:2])
        third = self.fc2_3(hidden[:, 2:3])
        fourth = torch.zeros_like(hidden[:, 3:4])

        coeffs = torch.cat((first, second, third, fourth), dim=1)
        return coeffs, second, third

    def forward_2(self, x):
        """Coefficients for output 1.

        Returns ``(coeffs, second, third)`` where ``coeffs`` has shape (n, 4)
        with its second column fixed to zero. ``second`` is therefore an
        all-zero (n, 1) tensor and ``third`` is column 2 of ``coeffs``.
        NOTE(review): because ``second`` is constant zero, any penalty the
        caller applies to it is always zero — confirm that this, rather than
        the fc3_4 output, is the intended auxiliary return.
        """
        hidden = F.relu(self.fc3(x))
        first = self.fc3_1(hidden[:, 0:1])
        second = torch.zeros_like(hidden[:, 1:2])
        third = self.fc3_3(hidden[:, 2:3])
        fourth = self.fc3_4(hidden[:, 3:4])

        coeffs = torch.cat((first, second, third, fourth), dim=1)
        return coeffs, second, third

    def calculate(self, curr_tr, input_tr, delta_tr):
        """Return ``curr_tr`` plus the two increments, and the four auxiliary outputs.

        Returns:
            ``(prediction, y_1, z_1, y_2, z_2)`` with ``prediction`` of shape
            (n, 2); ``y_1, z_1`` come from ``forward_1`` and ``y_2, z_2`` from
            ``forward_2``.
        """
        coeffs_1, y_1, z_1 = self.forward_1(input_tr)
        increment_1 = (coeffs_1 * delta_tr).sum(dim=1, keepdim=True)

        coeffs_2, y_2, z_2 = self.forward_2(input_tr)
        increment_2 = (coeffs_2 * delta_tr).sum(dim=1, keepdim=True)

        prediction = curr_tr + torch.cat((increment_1, increment_2), dim=1)
        return prediction, y_1, z_1, y_2, z_2

    def forward(self, curr_tr, input_tr, delta_tr):
        """Delegate to :meth:`calculate`."""
        return self.calculate(curr_tr, input_tr, delta_tr)

In [None]:
def _primary_output(model_output):
    """Return the prediction tensor from a model call that may yield a tuple."""
    if isinstance(model_output, (tuple, list)):
        return model_output[0]
    return model_output


def predict_steps(model, curr_tr, input_tr, delta_tr, output_tr, num_predict=4):
    """Roll the model forward ``num_predict`` steps and score every step with R².

    Step 0 is an ordinary one-step prediction from the recorded data. For each
    later step ``k`` the model is fed its own previous prediction as the
    current state. The state increments (columns 0 and 2 of the delta) are the
    difference between the two most recent states. Columns 1 and 3 of the
    delta and the whole ``input_tr`` row are taken from the recorded data at
    rows ``k .. k + n - 1``.
    NOTE(review): from step 2 onwards ``input_tr`` rows contain recorded values
    that lie after the roll-out start — confirm this is intended.

    Args:
        model: callable ``model(curr, inp, delta)`` returning either the
            prediction tensor or a tuple whose first element is the prediction
            (as ``Taylor_Net`` does).
        curr_tr: (N, 2) tensor of current states.
        input_tr: (N, 4) tensor of model inputs.
        delta_tr: (N, 4) tensor of increments.
        output_tr: (N, 2) tensor of targets.
        num_predict: number of roll-out steps (>= 1). The last ``num_predict``
            rows are reserved so every step has a target to compare with.

    Returns:
        List of ``num_predict`` R² scores, one per roll-out step.

    Raises:
        ValueError: if ``num_predict < 1`` or the data has no rows left after
            reserving ``num_predict`` of them.
    """
    if num_predict < 1:
        raise ValueError("num_predict must be at least 1")
    n_rows = curr_tr.shape[0] - num_predict
    if n_rows < 1:
        raise ValueError("not enough samples for the requested number of prediction steps")

    def _score(window, prediction):
        return r2_score(
            output_tr[window].detach().cpu().numpy(),
            prediction.detach().cpu().numpy(),
        )

    r2_loop = []
    # Only scores are returned, so no autograd graph is needed.
    with torch.no_grad():
        window = slice(0, n_rows)
        previous_state = curr_tr[window, :]
        prediction = _primary_output(
            model(previous_state, input_tr[window, :], delta_tr[window, :])
        )
        r2_loop.append(_score(window, prediction))

        for step in range(1, num_predict):
            window = slice(step, step + n_rows)
            state_change = prediction - previous_state
            # Columns 0 and 2 come from the roll-out; 1 and 3 from recorded data.
            delta_step = torch.stack(
                (
                    state_change[:, 0],
                    delta_tr[window, 1],
                    state_change[:, 1],
                    delta_tr[window, 3],
                ),
                dim=1,
            )
            previous_state, prediction = prediction, _primary_output(
                model(prediction, input_tr[window, :], delta_step)
            )
            r2_loop.append(_score(window, prediction))

    return r2_loop

In [None]:
def train_model(model, optimizer, criterion, curr_tr, input_tr, delta_tr, output_tr, scheduler=None, epochs=20000, patience=1000, threshold=1e-10, print_interval=5000, alpha=0.05, min_loss=3.08385432e-10):
    """Full-batch training loop with a sign penalty and two early-stop rules.

    Each epoch runs one forward/backward pass over the whole data set. The
    loss is ``criterion(prediction, output_tr)`` plus ``alpha`` times the sum
    of the mean negative parts of the model's four auxiliary outputs, so
    negative auxiliary values are penalised and non-negative ones are not.

    Training stops early when either
      * the loss changed by less than ``threshold`` for ``patience``
        consecutive epochs, or
      * the loss drops below ``min_loss``.

    Args:
        model: module called as ``model(curr_tr, input_tr, delta_tr)`` and
            returning ``(prediction, y_1, z_1, y_2, z_2)``.
        optimizer: optimizer over the model parameters.
        criterion: loss callable ``criterion(prediction, target)``.
        curr_tr, input_tr, delta_tr: model inputs.
        output_tr: training targets.
        scheduler: optional learning-rate scheduler, stepped once per epoch.
        epochs: maximum number of epochs.
        patience: number of consecutive stalled epochs tolerated.
        threshold: absolute loss change below which an epoch counts as stalled.
        print_interval: print the loss every this many epochs; a falsy value
            disables progress printing.
        alpha: weight of the sign penalty.
        min_loss: loss value below which training stops immediately.

    Returns:
        The loss of the last executed epoch as a float, or ``float('inf')``
        when no epoch was run (``epochs <= 0``).
    """
    model.train()
    prev_loss = float('inf')
    best_loss = float('inf')
    loss_value = float('inf')
    no_improvement_counter = 0

    for epoch in range(epochs):
        optimizer.zero_grad()

        out, y_1, z_1, y_2, z_2 = model(curr_tr, input_tr, delta_tr)

        # Hinge on the negative part: zero for non-negative values.
        sign_penalty = (
            F.relu(-y_1).mean()
            + F.relu(-y_2).mean()
            + F.relu(-z_1).mean()
            + F.relu(-z_2).mean()
        )
        loss = criterion(out, output_tr) + alpha * sign_penalty

        # The graph is rebuilt by every forward pass, so it need not be retained.
        loss.backward()
        optimizer.step()
        if scheduler is not None:
            scheduler.step()

        # Read the scalar once; each .item() call forces a device sync.
        loss_value = loss.item()
        best_loss = min(best_loss, loss_value)

        if print_interval and epoch % print_interval == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss_value:.9f}')

        if abs(prev_loss - loss_value) < threshold:
            no_improvement_counter += 1
        else:
            no_improvement_counter = 0
        prev_loss = loss_value

        if no_improvement_counter >= patience:
            print(f'Early stopping at epoch {epoch+1} due to no significant improvement.')
            break
        if loss_value < min_loss:
            print(f'Early stopping at epoch {epoch+1} due to very low loss.')
            print("min of loss function is", best_loss)
            break

    return loss_value

In [None]:
# Training budget and early-stopping settings passed to train_model.
epochs = 12_000
patience = 2_000
threshold = 1e-15

# Initial learning rate; the sweep cell rebinds `lr` on every iteration.
lr = 1e-3
num_test = 1  # not referenced in any of the visible cells

# StepLR settings: a gamma of 1.0 leaves the learning rate unchanged.
decay_factor = 1.0
decay_step_size = 1_000

In [None]:
# History of best training MSEs. The first entry is a hard-coded bar that a run
# must beat before any checkpoint is written.
# NOTE(review): presumably the MSE of an earlier baseline — confirm its origin.
mse = [1.2362453e-02]
learning_rates = np.arange(0.001, 0.1, 0.002)
checkpoint_path = 'Two_Taylor_1st_loss_TC_test_15s_non_25_50_400.pth'

for lr in learning_rates:
    # Fresh model, optimizer and scheduler for every learning rate.
    model_Taylor = Taylor_Net().to(device)
    optimizer = optim.Adam(model_Taylor.parameters(), lr=lr)
    scheduler = StepLR(optimizer, step_size=decay_step_size, gamma=decay_factor)

    train_model(model=model_Taylor,
                optimizer=optimizer,
                criterion=nn.MSELoss(),
                curr_tr=curr_tr,
                input_tr=input_tr,
                delta_tr=delta_tr,
                output_tr=output_tr,
                scheduler=scheduler,
                epochs=epochs,
                patience=patience,
                threshold=threshold,
                print_interval=5000)

    # Evaluation only: the inputs require grad, so without no_grad this pass
    # would build an autograd graph that is never used.
    with torch.no_grad():
        t_next_test, _, _, _, _ = model_Taylor(curr_tr, input_tr, delta_tr)
    current_mse = mean_squared_error(output_tr.to("cpu").detach().numpy(), t_next_test.to("cpu").detach().numpy())

    # Ignore diverged runs (NaN/inf) and keep only strict improvements.
    if np.isfinite(current_mse) and current_mse < min(mse):
        mse.append(current_mse)
        torch.save(model_Taylor.state_dict(), checkpoint_path)
        print(current_mse)

Epoch [1/12000], Loss: 0.106505089
Epoch [5001/12000], Loss: 0.000011538
Epoch [10001/12000], Loss: 0.000009538
9.562442e-06
Epoch [1/12000], Loss: 0.011210811
Epoch [5001/12000], Loss: 0.000008146
Epoch [10001/12000], Loss: 0.000008238
8.162831e-06
Epoch [1/12000], Loss: 0.040222429
Epoch [5001/12000], Loss: 0.000009656
Epoch [10001/12000], Loss: 0.000008227
Epoch [1/12000], Loss: 0.000437165
Epoch [5001/12000], Loss: 0.000008622
Epoch [10001/12000], Loss: 0.000008628
Epoch [1/12000], Loss: 0.025787162
Epoch [5001/12000], Loss: 0.000008059
Epoch [10001/12000], Loss: 0.000008227
Epoch [1/12000], Loss: 0.000161716
Epoch [5001/12000], Loss: 0.000008240
Epoch [10001/12000], Loss: 0.000008136
Epoch [1/12000], Loss: 0.001450838
Epoch [5001/12000], Loss: 0.000009250
Epoch [10001/12000], Loss: 0.000009065
Epoch [1/12000], Loss: 0.033112850
Epoch [5001/12000], Loss: 0.000008335
Epoch [10001/12000], Loss: 0.000009785
Epoch [1/12000], Loss: 0.015919808
Epoch [5001/12000], Loss: 0.000008294
Epoch

In [None]:
# Reload the best checkpoint written by the learning-rate sweep.
model_path = 'Two_Taylor_1st_loss_TC_test_15s_non_25_50_400.pth'
model_Taylor = Taylor_Net().to(device)
# map_location lets a checkpoint saved from a CUDA run load on a CPU-only machine.
model_Taylor.load_state_dict(torch.load(model_path, map_location=device))

# R² of the multi-step roll-out, one score per prediction step.
r2_results = predict_steps(model_Taylor, curr_tr, input_tr, delta_tr, output_tr, num_predict=4)
print(r2_results)