In [26]:
import torch
from torch import nn, Tensor
import torch.nn.functional as F

import numpy as np


class FeedForwardNN(nn.Module):
    '''
        A standard two-hidden-layer Feed Forward Neural Network
    '''
    def __init__(self, in_dim: int, hidden_dim: int, out_dim: int, custom_init_behavior: bool = False, output_bias: Tensor = None) -> None:
        '''
            Initialize the network and set up the layers.

            Parameters:
                in_dim - input dimensions as an int
                hidden_dim - hidden layer dimensions as an int
                out_dim - output dimensions as an int
                custom_init_behavior - boolean to reduce weights of last layer by 100 and add custom bias if given
                output_bias - replace the last layer's bias with this value if custom_init_behavior is true

            Return:
                None
        '''
        super(FeedForwardNN, self).__init__()

        self.fc1 = nn.Linear(in_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, out_dim)

        if custom_init_behavior:
            self.fc4.weight.data = self.fc4.weight.data / 100
            if output_bias is not None:
                self.fc4.bias.data = output_bias

    def forward(self, obs: Tensor) -> Tensor:
        '''
            Runs a forward pass on the neural network

            Parameters:
                obs - observation to pass as input (either Tensor or converts to Tensor)

            Return:
                output - the output of the forward pass
        '''
        if not isinstance(obs, Tensor):
            obs = torch.tensor(obs, dtype=torch.float)

        activation1 = F.tanh(self.fc1(obs))
        activation2 = F.tanh(self.fc2(activation1))
        activation3 = F.tanh(self.fc3(activation2))
        output = self.fc4(activation3)

        return output
    
    
class Standardizer:
    def __init__(self, min_maxes):
        self.means = np.array([np.mean(x) for x in min_maxes])
        self.stds = np.array([abs(x[1] - x[0]) / 2.0 for x in min_maxes])
        self.diag_stds = np.diag(self.stds)
        self.inv_stds = np.array([2.0 / abs(x[1] - x[0]) for x in min_maxes])
        self.diag_inv_stds = np.diag(self.inv_stds)

    def standardize(self, values):
        # Parse input to tensor if not already and get device
        if not isinstance(values, torch.Tensor):
            values = torch.tensor(values, dtype=torch.float)
        device = torch.device("cuda" if values.get_device() >= 0 else "cpu")
        # Set the means and diagonal inverse standard deviations to tensors on given device
        means = torch.tensor(self.means, dtype=torch.float, device=device)
        diag_inv_stds = torch.tensor(self.diag_inv_stds, dtype=torch.float, device=device)
        if values.ndim > 1:
            means = torch.tile(means, (values.shape[0],1))
        # Return the standardized values
        return (values - means) @ diag_inv_stds
    
    def destandardize(self, values):
        # Parse input to tensor if not already and get device
        if not isinstance(values, torch.Tensor):
            values = torch.tensor(values, dtype=torch.float)
        device = torch.device("cuda" if values.get_device() >= 0 else "cpu")
        # Set the means and diagonal standard deviations to tensors on given device
        means = torch.tensor(self.means, dtype=torch.float, device=device)
        diag_std = torch.tensor(self.diag_stds, dtype=torch.float, device=device)
        if values.ndim > 1:
            means = torch.tile(means, (values.shape[0],1))
        # Return the de-standardized values
        return (values @ diag_std) + means

In [27]:
SAFETY_PARAMS = {
    'Feed,Feed initial;MassFlow': [0, 90.71437571],
    'Make up,Feed initial;Temperature': [20.55555556, 26.66666667],
    'Steam;MassFlow': [0, 136.0715636],
    'Condenser Recycle;MassFlow': [13.60715636, 45.35718785],
    'Condenser Recycle;Pressure': [0, 128.905],
    'Condenser Recycle;Temperature': [-17.77777778, 65.55555556],
    'Reboiler Recycle;MassFlow': [0, 45.35718785],
    'Reboiler Recycle;Pressure': [0, 963.2],
    'Reboiler Recycle;Temperature': [-17.77777778, 176.6666667],
    'Feed initial;Pressure': [0, 204.75],
    'Vapour Recycle;MassFlow': [0, 9.071437571],
    'Vapour Recycle;Pressure': [0, 273.7],
    'Vapour Recycle;Temperature': [-17.77777778, 204.4444444],
    'Feed;Pressure': [0, 204.75],
    'Feed;Temperature': [-17.77777778, 204.4444444],
    'Water Total;MassFlow': [0, 453.5718785],
    'Water Total;Pressure': [0, 515.025],
    'Distillation Tower;Pressures:0': [0, 446.0908],
    'Distillation Tower;Pressures:-1': [0, 446.0908],
    'Tee-2;Splits': [0, 1],
    'Tee-3;Splits': [0, 1],
    'Tee-4;Splits': [0, 1],
}
min_maxes = list(SAFETY_PARAMS.values())

initial_state = [45.35922921968971, 22.666666666666664, 22.861051526723546, 22.520690109625438, 128.47428972294057, 
                 33.80356163168125, 23.012775154629644, 137.42956072400227, 145.53321062799398, 137.8951817355074, 
                 0.0, 137.8951817355074, 106.5500703866208, 137.8806735391037, 84.45374160437058, 372.85286418584946, 
                 144.14872822721267, 128.9319949226994, 137.8951817355074, 0.675, 0.5, 0.4]

standardizer = Standardizer(min_maxes)

standard_init_state = standardizer.standardize(initial_state)

print(standard_init_state)

tensor([ 4.4995e-05, -3.0909e-01, -6.6398e-01, -4.3852e-01,  9.9332e-01,
         2.3795e-01,  1.4736e-02, -7.1464e-01,  6.7977e-01,  3.4696e-01,
        -1.0000e+00,  7.6374e-03,  1.1895e-01,  3.4682e-01, -7.9916e-02,
         6.4407e-01, -4.4023e-01, -4.2195e-01, -3.8176e-01,  3.5000e-01,
         0.0000e+00, -2.0000e-01])


In [34]:
model = FeedForwardNN(22, 64, 22, custom_init_behavior=True, output_bias=standard_init_state)

print("model.fc4.bias:", model.fc4.bias.data)

outputs = []
with torch.no_grad():
    for _ in range(10000):
        outputs.append(model((torch.rand((22)) * 2) - 1))

outputs = torch.tensor(np.array(outputs))
print("Means:",outputs.mean(dim=0))
print("Std:", outputs.std(dim=0))

model.fc4.bias: tensor([ 4.4995e-05, -3.0909e-01, -6.6398e-01, -4.3852e-01,  9.9332e-01,
         2.3795e-01,  1.4736e-02, -7.1464e-01,  6.7977e-01,  3.4696e-01,
        -1.0000e+00,  7.6374e-03,  1.1895e-01,  3.4682e-01, -7.9916e-02,
         6.4407e-01, -4.4023e-01, -4.2195e-01, -3.8176e-01,  3.5000e-01,
         0.0000e+00, -2.0000e-01])
Means: tensor([-6.6373e-04, -3.0900e-01, -6.6441e-01, -4.3833e-01,  9.9313e-01,
         2.3898e-01,  1.4606e-02, -7.1541e-01,  6.7953e-01,  3.4690e-01,
        -9.9988e-01,  7.5665e-03,  1.1917e-01,  3.4643e-01, -8.0424e-02,
         6.4421e-01, -4.4025e-01, -4.2172e-01, -3.8172e-01,  3.4971e-01,
        -2.8585e-05, -1.9981e-01])
Std: tensor([0.0008, 0.0007, 0.0007, 0.0007, 0.0005, 0.0007, 0.0006, 0.0008, 0.0004,
        0.0006, 0.0007, 0.0003, 0.0005, 0.0006, 0.0005, 0.0005, 0.0004, 0.0007,
        0.0007, 0.0007, 0.0006, 0.0006])
