# Training Trajectory Corrector

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
from glob import glob
from flow_predictor import flow_predictor
from tqdm import tqdm
from pathgen import training_twosteps

mps_device = torch.device("cpu")

plt.close('all')
from constants.filepath import PROJECT_PATH
import os

## Loading Dataset

In [2]:
location = os.path.join(PROJECT_PATH, 'data', 'sim_samples')
workspace_list = glob(os.path.join(location,'*.npz'))

residuals = []
analytical_data = []
command_data = []


# print loading data in red text
print("\033[91mLoading data...\033[0m")
for file in tqdm(workspace_list):
    saved_data = np.load(file)
    Q_out_sim, Q_com_sim, P_p_sim, t_sim = saved_data['Qo_x'], saved_data['Qcom_x'], saved_data['Pp_x'], saved_data['time']
    W_com_sim = np.ones(np.shape(t_sim)) * 0.0029
    t_vbn, W_com_vbn, Q_com_vbn, Q_out_vbn = flow_predictor(t_sim, Q_com_sim, W_com_sim)
    residuals.append(np.interp(t_vbn[t_vbn > 0.2], t_sim, Q_out_sim) - Q_out_vbn[t_vbn > 0.2])
    analytical_data.append(Q_out_vbn[t_vbn > 0.2])
    command_data.append(Q_com_vbn[t_vbn > 0.2])
residuals = np.concatenate(residuals) * 1e9
analytical_data = np.concatenate(analytical_data) * 1e9
command_data = np.concatenate(command_data) * 1e9
# print finished loading data in green text
print("\033[92mFinished loading data.\033[0m")

[91mLoading data...[0m


100%|██████████| 36/36 [04:04<00:00,  6.80s/it]

[92mFinished loading data.[0m





In [3]:
input_data = np.stack((command_data, analytical_data), axis=1)
# input_features = torch.tensor(analytical_data, dtype=torch.float32, device=mps_device).view(-1, 1)
input_features = torch.tensor(input_data, dtype=torch.float32, device=mps_device)
target_residuals = torch.tensor(residuals, dtype=torch.float32, device=mps_device).view(-1, 1)

In [4]:
class ResidualModel(torch.nn.Module):
    def __init__(self, fc_dim_list = [2, 64, 512, 1024, 512, 64, 1]):
        super(ResidualModel, self).__init__()
        # self.fc1 = torch.nn.Linear(2, 64)  # Input layer
        # self.fc2 = torch.nn.Linear(64, 256)  # Hidden layer
        # self.fc3 = torch.nn.Linear(256, 64)  # Output layer
        # self.fc4 = torch.nn.Linear(64, 1)  # Output layer
        
        self.sequence = []
        for i in range(1,len(fc_dim_list)):
            self.sequence.append(torch.nn.Linear(fc_dim_list[i-1], fc_dim_list[i]))
            if i < len(fc_dim_list)-1:
                self.sequence.append(torch.nn.ReLU())
        self.sequence = torch.nn.Sequential(*self.sequence)

        # self.fc1 = torch.nn.Linear(2, 64)  # Input layer
        # self.fc2 = torch.nn.Linear(64, 512)  # Hidden layer
        # self.fc3 = torch.nn.Linear(512, 1024)  # Hidden layer
        # self.fc4 = torch.nn.Linear(1024, 512)  # Output layer
        # self.fc5 = torch.nn.Linear(512, 64)  # Output layer
        # self.fc6 = torch.nn.Linear(64, 1)  # Output layer

    def forward(self, x):
        return self.sequence(x)

In [None]:
model = ResidualModel([2, 512, 1])
model.to(mps_device)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
epochs = 500
model.train()

print("Training model...")
for epoch in tqdm(range(epochs)):

    # Forward pass
    predictions = model(input_features)
    loss = criterion(predictions, target_residuals)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # print loss every 10 epochs
    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")

Training model...


  0%|          | 0/500 [00:00<?, ?it/s]

  2%|▏         | 10/500 [04:52<4:00:22, 29.43s/it]

Epoch [10/500], Loss: 1.2481


  4%|▍         | 20/500 [09:41<3:48:00, 28.50s/it]

Epoch [20/500], Loss: 1.3535


  6%|▌         | 30/500 [14:30<3:49:34, 29.31s/it]

Epoch [30/500], Loss: 1.0810


  6%|▋         | 32/500 [15:30<3:51:33, 29.69s/it]

In [None]:
# Example: Run model inference (optional, based on your needs)
model.eval()

#%%

test = training_twosteps(flowrate_magnitudes = [[0.75, 0.001], [0.2, 0.001], [0.48, 0.001]],
                         flowrate_times = [[0.0001, 8], [15, 20], [22, 33]],
                         beadwidth_magnitudes = [[]],
                         beadwidth_times = [[]],
                         dt = 0.001,
                         tmax=40)
test_t_input, test_Q_input, test_W_input = test.pathgen()
test_t, test_W, test_Q_com, test_Q_out = flow_predictor(test_t_input, test_Q_input, test_W_input)

test_analytical = test_Q_out * 1e9
test_command = test_Q_com * 1e9
test_combined = np.stack((test_command, test_analytical), axis=1)
test_input = torch.tensor(test_combined, dtype=torch.float32, device=mps_device)

with torch.no_grad():
    output = model(test_input) * 1e-9
    test_result = (output).cpu().numpy().reshape(-1)
    
plt.figure()
plt.plot(test_t, test_Q_com, label='Input', color='black', linestyle='--')
# plt.plot(test_accel.ts, test_accel.sim_Q_out, label = 'Simulated Data', color = 'red')
plt.plot(test_t, test_Q_out, label = 'Analytical Data', color = 'blue')
plt.plot(test_t, test_result, label='Residual', color='magenta')
plt.plot(test_t, test_Q_out + test_result, label='Total', color='green')

plt.legend()


In [None]:
torch.save(model.state_dict(), 'trajectory_correction_DNN_v0.pth')