In [1]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torchvision
from tqdm import tqdm

In [2]:
class Module:
    def __init__(self):
        self.with_weights = False
        pass
    def forward(self):
        pass
    def backward(self):
        pass
    def __call__(self, *inputs, **kwargs):
        return self.forward(*inputs, **kwargs)

In [3]:
class Linear(Module):
    def __init__(self, input_dim, output_dim):
        super(Linear, self).__init__()
        # Initialize weights
        self.W = np.random.randn(input_dim, output_dim) * 1e-2
        self.b = np.zeros((1, output_dim))
        # The linear layer's parameters should be updated
        self.with_weights = True
        
    def forward(self, input_array):
        res = np.zeros((input_array.shape[0], self.W.shape[1]))
        res = np.dot(input_array, self.W) + self.b

        return res

    def backward(self, input_array, output_gradient, lr=0.05):
        res = np.zeros_like(input_array)
        
        # 1. Compute the gradient to be passed to the previous layer (dL/dX)
        res = np.dot(output_gradient, self.W.T)
        
        # 2. Compute the gradients for W and b, then update W and b
        dW = np.dot(input_array.T, output_gradient)
        db = np.sum(output_gradient, axis=0, keepdims=True)
        
        # Update the weights and biases
        self.W -= lr * dW
        self.b -= lr * db
        
        return res



In [4]:
class ReLU(Module):
    def __init__(self):
        super(ReLU, self).__init__()
        pass
    
    def forward(self, input_array):
        res = np.zeros_like(input_array)        
        res = np.maximum(0, input_array)       
        return res

    def backward(self, input_array, output_gradient):
        res = np.zeros_like(input_array)        
        res = output_gradient * (input_array > 0).astype(float)       
        return res


In [5]:
class Sigmoid(Module):
    def __init__(self):
        super(Sigmoid, self).__init__()

    def forward(self, input_array):
        return 1 / (1 + np.exp(-input_array))

    def backward(self, input_array, output_gradient):
        return output_gradient * self.forward(input_array) * (1 - self.forward(input_array))

In [6]:
class MSELoss(Module):
    def __init__(self):
        super(MSELoss, self).__init__()   
        
    def forward(self, predicted_y, y):
        return np.mean((predicted_y - y) ** 2)

    def backward(self, predicted_y, y):
        return (predicted_y - y) * 2 / len(y)

In [7]:
class SimpleNN(Module):
    def __init__(self, layers, loss, lr = 0.05):
        super(SimpleNN, self).__init__()    
        self.layers = layers
        self.loss = loss
        self.inputs = [None for _ in range(len(self.layers))]
        self.output = None
        self.loss_value = 1e5
        self.lr = lr
        pass

    def forward(self, input_array):
        current_input = input_array
        for i in range(len(self.layers)):
            self.inputs[i] = current_input
            current_input = self.layers[i](current_input)
        self.output = current_input
        return self.output
        
    def backward(self, y):
        if self.inputs[-1] is None:
            print("call forward first.")
            return
        self.loss_value = self.loss(self.output, y)
        output_gradient = self.loss.backward(self.output, y)
        for i in range(len(self.layers)-1, -1, -1):
            # here we check wheather a layer should be updated in the backpropagation by judging its `.with_weights` value.
            # whether we update the weight is determined by whether the layer has trainable parameters. 
            # for instance, a layer such as x^2 or ||x||^2 does not have a parameter, so we set `.with_weight`` to false.
            if not isinstance(self.inputs[i], np.ndarray):
                self.inputs[i]=self.inputs[i].numpy()
            if self.layers[i].with_weights:
                output_gradient = self.layers[i].backward(self.inputs[i], output_gradient, self.lr)
            else:
                output_gradient = self.layers[i].backward(self.inputs[i], output_gradient)
        self.output = None

In [29]:
layers = [
    Linear(2, 20),
    Sigmoid(),
    Linear(20, 2)
]

loss = MSELoss()
model = SimpleNN(layers, loss, lr=0.01)


In [30]:
import pandas as pd
from sklearn.metrics import mean_absolute_error

train_data = pd.read_csv('train_data.csv')

x1 = (train_data['H1'] + train_data['H2'] + train_data['H3']) / 3
x2 = train_data['Pdeficit']

y1 = train_data['f_drop_real']
y2 = train_data['t_nadir_real']

# Combine data into X and Y for training
X_train = np.column_stack((x1, x2))
Y_train = np.column_stack((y1, y2))

epoch = 0
max_iteration = 50000
loss_values = []  
plt.figure(figsize=(15, 8))

pred_y = model(X_train)

convergence_threshold = 1e-4
convergence_epoch = None  


while epoch < max_iteration and model.loss_value > convergence_threshold:
    pred_y = model(X_train)
    model.backward(Y_train)
    loss_values.append(model.loss_value)  
    if epoch % 500 == 0:
        print(f"epoch {epoch}/{max_iteration}, loss: {model.loss_value:.6f}")
    epoch += 1

epoch 0/50000, loss: 16.598175
epoch 500/50000, loss: 0.089535
epoch 1000/50000, loss: 0.041829
epoch 1500/50000, loss: 0.033773
epoch 2000/50000, loss: 0.032144
epoch 2500/50000, loss: 0.030796
epoch 3000/50000, loss: 0.029570
epoch 3500/50000, loss: 0.028425
epoch 4000/50000, loss: 0.027342
epoch 4500/50000, loss: 0.026306
epoch 5000/50000, loss: 0.025309
epoch 5500/50000, loss: 0.024346
epoch 6000/50000, loss: 0.023411
epoch 6500/50000, loss: 0.022737
epoch 7000/50000, loss: 0.024539
epoch 7500/50000, loss: 0.024311
epoch 8000/50000, loss: 0.024040
epoch 8500/50000, loss: 0.023742
epoch 9000/50000, loss: 0.023430
epoch 9500/50000, loss: 0.023111
epoch 10000/50000, loss: 0.022792
epoch 10500/50000, loss: 0.022476
epoch 11000/50000, loss: 0.022167
epoch 11500/50000, loss: 0.021866
epoch 12000/50000, loss: 0.021573
epoch 12500/50000, loss: 0.021291
epoch 13000/50000, loss: 0.021017
epoch 13500/50000, loss: 0.020754
epoch 14000/50000, loss: 0.020499
epoch 14500/50000, loss: 0.020253
epo

<Figure size 1500x800 with 0 Axes>

In [31]:
test_data = pd.read_csv('test_data.csv')

x1_test = (test_data['H1'] + test_data['H2'] + test_data['H3']) / 3
x2_test = test_data['Pdeficit']

y1_test = test_data['f_drop_real']
y2_test = test_data['t_nadir_real']

X_test = np.column_stack((x1_test, x2_test))
Y_test = np.column_stack((y1_test, y2_test))

pred_test = model(X_test)

mae_y1 = mean_absolute_error(Y_test[:, 0], pred_test[:, 0])
mae_y2 = mean_absolute_error(Y_test[:, 1], pred_test[:, 1])

print(f"MAE for y1 (f_drop_real): {mae_y1:.6f}")
print(f"MAE for y2 (t_nadir_real): {mae_y2:.6f}")

MAE for y1 (f_drop_real): 0.039366
MAE for y2 (t_nadir_real): 0.067993
