In [1]:
import sys, os
import torch
import torch.nn as nn
import math
import numpy as np

In [2]:
class LayerPerceptron(torch.nn.Module):
    """
        Fully connected layer followed by an optional activation.

        Args:
            in_dim:  number of input features of the linear layer.
            out_dim: number of output features of the linear layer.
            w_init:  if truthy, every weight is filled with this constant;
                     otherwise the weights are drawn with kaiming_normal_(a=sqrt(5)).
            b_init:  if truthy, the bias is drawn from U(-1/sqrt(fan_in), 1/sqrt(fan_in));
                     otherwise the bias is set to zero.
            act:     activation module applied to the linear output, or None for a
                     purely linear layer.
    """
    def __init__(
        self,
        in_dim     = None,
        out_dim    = None,
        w_init     = False,
        b_init     = False,
        act        = nn.Tanh(),  # default instance is created once and shared between layers
    ):
        """
            Initialize LayerPerceptron
        """
        super(LayerPerceptron,self).__init__()
        self.dense      = nn.Linear(in_features=in_dim,
                                    out_features=out_dim,
                                    dtype=torch.float)

        self.activation = act
        # Initialize parameters
        self.init_param(w_init, b_init)

    def init_param(self, w_init, b_init):
        """
            Initialize parameters

            The weight is initialised first and the bias second; the bias rule
            does not depend on which weight rule was used.
        """
        if w_init:
            nn.init.constant_(self.dense.weight, w_init)
        else:
            nn.init.kaiming_normal_(self.dense.weight,a=math.sqrt(5))

        if b_init:
            fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.dense.weight)
            bound = 1 / math.sqrt(fan_in) if fan_in > 0 else 0
            # The bias belongs to the wrapped nn.Linear; this module has no
            # `bias` attribute of its own.
            nn.init.uniform_(self.dense.bias, -bound, bound)
        else:
            nn.init.zeros_(self.dense.bias)

    def forward(self,x):
        """
            Forward propagate

            Returns activation(dense(x)), or dense(x) when no activation is set.
        """
        out = self.dense(x)
        if self.activation is not None:
            out = self.activation(out)
        return out

In [3]:
class ResidualLayerPerceptron(torch.nn.Module):
    """
        Fully connected layer with a skip connection: activation(dense(x) + x).

        Args:
            in_dim:  number of input features; must equal out_dim so the input
                     can be added to the linear output.
            out_dim: number of output features.
            w_init:  if truthy, every weight is filled with this constant;
                     otherwise the weights are drawn with kaiming_normal_(a=sqrt(5)).
            b_init:  if truthy, the bias is drawn from U(-1/sqrt(fan_in), 1/sqrt(fan_in));
                     otherwise the bias is set to zero.
            act:     activation module applied after the residual sum, or None to
                     return the raw sum.

        Raises:
            ValueError: if in_dim != out_dim.
    """
    def __init__(
        self,
        in_dim     = None,
        out_dim    = None,
        w_init     = False,
        b_init     = False,
        act        = nn.Tanh(),  # default instance is created once and shared between layers
    ):
        """
            Initialize ResidualLayerPerceptron
        """
        super(ResidualLayerPerceptron,self).__init__()

        if in_dim != out_dim:
            raise ValueError("in_dim of ResBlock should be equal of out_dim, but got in_dim: {}, "
                             "out_dim: {}".format(in_dim, out_dim))

        self.dense      = nn.Linear(in_features=in_dim,
                                    out_features=out_dim,
                                    dtype=torch.float)

        self.activation = act
        # Initialize parameters
        self.init_param(w_init, b_init)

    def init_param(self, w_init, b_init):
        """
            Initialize parameters

            The weight is initialised first and the bias second; the bias rule
            does not depend on which weight rule was used.
        """
        if w_init:
            nn.init.constant_(self.dense.weight, w_init)
        else:
            nn.init.kaiming_normal_(self.dense.weight,a=math.sqrt(5))

        if b_init:
            fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.dense.weight)
            bound = 1 / math.sqrt(fan_in) if fan_in > 0 else 0
            # The bias belongs to the wrapped nn.Linear; this module has no
            # `bias` attribute of its own.
            nn.init.uniform_(self.dense.bias, -bound, bound)
        else:
            nn.init.zeros_(self.dense.bias)

    def forward(self,x):
        """
            Forward propagate

            Returns activation(dense(x) + x), or dense(x) + x when no activation
            is set (same convention as LayerPerceptron).
        """
        out = self.dense(x) + x
        if self.activation is not None:
            out = self.activation(out)
        return out

In [4]:
class MLPSequential(torch.nn.Module):
    """
        Stack of perceptron layers: one input layer, (layers - 2) hidden layers
        and one linear output layer, all held in an nn.Sequential named `net`.

        Hidden layers are ResidualLayerPerceptron ("res_XX") when `residual`
        is truthy and LayerPerceptron ("mlp_XX") otherwise. The output layer is
        built with act=None. `w_init` and `b_init` are forwarded to every layer.
    """
    def __init__(
        self,
        in_dim     = None,
        out_dim    = None,
        layers     = None,
        neurons    = None,
        residual   = False,
        w_init     = False,
        b_init     = False,
        act        = nn.Tanh(),
    ):
        """
            Build the layer stack; raises ValueError when layers < 3.
        """
        super(MLPSequential,self).__init__()
        if layers < 3:
            raise ValueError("MLPSequential have at least 3 layers, but got layers: {}".format(layers))

        # Initialisation options shared by every layer in the stack.
        init_opts = dict(w_init=w_init, b_init=b_init)

        # Pick the hidden-layer type and its name prefix once, up front.
        if residual:
            hidden_cls, prefix = ResidualLayerPerceptron, "res"
        else:
            hidden_cls, prefix = LayerPerceptron, "mlp"

        stack = nn.Sequential()
        stack.add_module(
            "input",
            LayerPerceptron(in_dim=in_dim, out_dim=neurons, act=act, **init_opts),
        )
        for number in range(1, layers - 1):
            stack.add_module(
                f"{prefix}_{number:02d}",
                hidden_cls(in_dim=neurons, out_dim=neurons, act=act, **init_opts),
            )
        stack.add_module(
            "output",
            LayerPerceptron(in_dim=neurons, out_dim=out_dim, act=None, **init_opts),
        )
        self.net = stack

    def forward(self,x):
        """
            Run x through the whole layer stack.
        """
        return self.net(x)

In [5]:
class InputScaleNet(torch.nn.Module):
    """
        Affine input normalisation: forward(x) = (x - centers) * scales.

        Args:
            scales:  multiplicative factors, converted to a float32 tensor.
            centers: offsets subtracted before scaling, converted to a float32
                     tensor; zeros with the shape of `scales` when None.

        `scales` and `centers` are registered as non-persistent buffers, so they
        follow the module through .to() / .cuda() / .double() while the
        state_dict stays free of them.
    """
    def __init__(
        self,
        scales     = [],   # never mutated here, so the shared default list is harmless
        centers    = None,
    ):
        """
            Initialize InputScaleNet
        """
        super(InputScaleNet,self).__init__()
        scales_t = torch.from_numpy(np.array(scales)).type(torch.float)
        if centers is None:
            centers_t = torch.zeros_like(scales_t, dtype=torch.float)
        else:
            centers_t = torch.from_numpy(np.array(centers)).type(torch.float)

        # Plain tensor attributes are ignored by Module.to(); buffers are moved
        # and cast together with the rest of the model.
        self.register_buffer("scales", scales_t, persistent=False)
        self.register_buffer("centers", centers_t, persistent=False)


    def forward(self,x):
        """
            Forward propagate

            Shifts x by `centers`, then multiplies element-wise by `scales`.
        """
        out = torch.mul(x - self.centers, self.scales)
        return out

In [6]:
class MultiScaleMLPSequential(torch.nn.Module):
    """
        Sum of `subnets` MLPSequential networks, each fed the same input
        multiplied by its own coefficient amp * base_scale**i (i = 0..subnets-1).

        Optional pre-processing in forward():
          * `in_scale` / `in_center`: the input goes through InputScaleNet first.
          * `latent_vec`: a (num_scenarios, latent_size) tensor whose rows are
            appended to the input features. The batch is split into
            num_scenarios consecutive groups of equal size and group k receives
            row k. The per-subnet coefficient is applied after concatenation,
            so it multiplies the latent features as well.
    """
    def __init__(
        self,
        in_dim     = None,
        out_dim    = None,
        layers     = None,
        neurons    = None,
        residual   = False, # use residual | [True, False]
        w_init     = False, # constant std | (ex 0.1, 0.01)
        b_init     = False, # use bias | [True, False]
        act        = nn.Tanh(),
        subnets    = None,  # subnet(multi scale) number
        amp        = 1.0,   # amplification factor of input
        base_scale = 2.0,   # base scale factor
        in_scale   = None,  # scale factor of input (ex [x,y,t])
        in_center  = None,  # Center position of coordinate translation (ex [x,y,t])
        latent_vec = None,  # latent vector (ex Tensor[shape=(4,16)] )
    ):
        """
            Initialize MultiScaleMLPSequential
        """
        super(MultiScaleMLPSequential,self).__init__()
        self.subnets = subnets
        self.scale_coef = [amp * (base_scale**i) for i in range(self.subnets)]

        if isinstance(latent_vec, torch.Tensor) and not isinstance(latent_vec, nn.Parameter):
            # A plain tensor attribute would be left behind by .to()/.cuda();
            # a non-persistent buffer moves with the module and stays out of
            # the state_dict, so existing checkpoints keep loading.
            self.register_buffer("latent_vec", latent_vec, persistent=False)
        else:
            # None, or an nn.Parameter (which nn.Module registers as trainable).
            self.latent_vec = latent_vec

        if self.latent_vec is not None:
            self.num_scenarios = latent_vec.shape[0]
            self.latent_size = latent_vec.shape[1]
            in_dim += self.latent_size
        else:
            self.num_scenarios = 1
            self.latent_size = 0

        # Define MultiScaleMLP
        # nn.Sequential is used only as a named container here; the subnets are
        # evaluated in parallel in forward(), never chained.
        self.msnet = nn.Sequential()
        for i in range(self.subnets):
            self.msnet.add_module(f"Scale_{i+1}_Net",MLPSequential(in_dim=in_dim,
                                                               out_dim=out_dim,
                                                               layers=layers,
                                                               neurons=neurons,
                                                               residual=residual,
                                                               w_init=w_init,
                                                               b_init=b_init,
                                                               act=act,
                                                               ))
        try:
            use_in_scale = bool(in_scale)
        except (ValueError, RuntimeError):
            # Multi-element numpy arrays (ValueError) and tensors (RuntimeError)
            # have no single truth value; they are non-empty, so scaling is on.
            use_in_scale = True

        if use_in_scale:
            self.in_scale = InputScaleNet(in_scale, in_center)
        else:
            self.in_scale = torch.nn.Identity()


    def forward(self,x):
        """
            Forward propagate

            Returns the sum over all subnets of subnet_i(x * scale_coef[i]).

            Raises:
                RuntimeError: if a latent table is set and the batch size is
                    not a multiple of the number of latent rows.
        """
        x = self.in_scale(x)
        if self.latent_vec is not None:
            batch_size = x.shape[0]
            if batch_size % self.num_scenarios != 0:
                raise RuntimeError(
                    "batch size ({}) must be a multiple of the number of latent "
                    "scenarios ({})".format(batch_size, self.num_scenarios)
                )
            # Row k of the latent table is repeated for the k-th consecutive
            # group of batch_size // num_scenarios samples.
            latent_vectors = self.latent_vec.repeat_interleave(
                batch_size // self.num_scenarios, dim=0
            )
            x = torch.cat([x, latent_vectors], dim=1)

        out = 0
        for coef, subnet in zip(self.scale_coef, self.msnet):
            out = out + subnet(x * coef)
        return out

In [7]:
# Smoke-test instances: a plain 3 -> 64 layer and a residual 64 -> 64 layer.
L1 = LayerPerceptron(in_dim=3, out_dim=64)
L2 = ResidualLayerPerceptron(in_dim=64, out_dim=64)

In [8]:
# One 3-feature sample as a float32 tensor; the bare name echoes it as cell output.
input_sample = torch.tensor([0.0, 0.5, 0.3], dtype=torch.float)
input_sample

tensor([0.0000, 0.5000, 0.3000])

In [9]:
# Chain the two demo layers and print both intermediate activations.
L1_result = L1(input_sample)
L2_result = L2(L1_result)

print(L1_result, L2_result, sep="\n")

tensor([-0.1224,  0.0877, -0.0410,  0.2476, -0.0109,  0.2787,  0.3345,  0.1550,
         0.1737,  0.3972, -0.0446, -0.0885,  0.2446, -0.2994,  0.2292, -0.0351,
        -0.0681,  0.2415,  0.0074,  0.0759,  0.1249, -0.3604, -0.1365, -0.1692,
         0.1675,  0.0378, -0.1753,  0.1410, -0.0211,  0.1218,  0.0643, -0.0088,
         0.1628,  0.2645, -0.1146, -0.1899, -0.1624,  0.4539,  0.0374,  0.0473,
         0.0303, -0.0399,  0.1481, -0.0997,  0.0590, -0.1565,  0.1017,  0.1456,
         0.4167, -0.0896,  0.2215, -0.1190, -0.0468, -0.1906, -0.1803, -0.2644,
        -0.0810, -0.1746, -0.4509,  0.1829, -0.0760,  0.3074, -0.0942, -0.1054],
       grad_fn=<TanhBackward0>)
tensor([-0.2390, -0.0173, -0.2266,  0.1401,  0.1874,  0.4809,  0.4530, -0.0850,
         0.2399,  0.5614,  0.1005, -0.0180,  0.3894, -0.2739,  0.2445,  0.0274,
        -0.2036,  0.1456,  0.0085,  0.1595,  0.2854, -0.4788, -0.1331, -0.3290,
        -0.0808,  0.0808, -0.0932,  0.2495, -0.0165,  0.2511, -0.0500, -0.1102,
       

In [10]:
# Plain (non-residual) MLP with 5 layers: 3 inputs, 64 neurons per hidden layer, 1 output.
net = MLPSequential(
    in_dim=3, out_dim=1,
    layers=5, neurons=64,
    residual=False,
    w_init=False, b_init=False,
    act=nn.Tanh(),
)

In [11]:
# Echo the module tree to check the layer names and shapes.
net

MLPSequential(
  (net): Sequential(
    (input): LayerPerceptron(
      (dense): Linear(in_features=3, out_features=64, bias=True)
      (activation): Tanh()
    )
    (mlp_01): LayerPerceptron(
      (dense): Linear(in_features=64, out_features=64, bias=True)
      (activation): Tanh()
    )
    (mlp_02): LayerPerceptron(
      (dense): Linear(in_features=64, out_features=64, bias=True)
      (activation): Tanh()
    )
    (mlp_03): LayerPerceptron(
      (dense): Linear(in_features=64, out_features=64, bias=True)
      (activation): Tanh()
    )
    (output): LayerPerceptron(
      (dense): Linear(in_features=64, out_features=1, bias=True)
    )
  )
)

In [12]:
# Forward the current sample through the MLP; the bare call shows the result as cell output.
net(input_sample)

tensor([-0.0006], grad_fn=<AddBackward0>)

In [13]:
# Input normaliser computing (x - 1.0) * 2.0; the single-element lists broadcast over all features.
scale_norm = InputScaleNet(scales=[2.0], centers=[1.0])

In [14]:
# Apply the normaliser to the current sample and show the result as cell output.
scale_norm(input_sample)

tensor([-2.0000, -1.0000, -1.4000])

In [15]:
# Three-subnet residual model with per-feature input scaling and a (4, 12) latent table.
# NOTE: this rebinds `net`, replacing the plain MLP built in an earlier cell.
net = MultiScaleMLPSequential(
    in_dim=3, out_dim=1,
    layers=7, neurons=32,
    residual=True,
    subnets=3,
    in_scale=[1/5.0, 1/20.0, 1/100.0],
    in_center=[3.0, 10.0, -15.0],
    # Random latent table scaled by 1/sqrt(12); no seed is set, so values change on every run.
    latent_vec=torch.from_numpy(np.random.randn(4, 12) / np.sqrt(12)).float(),
)

In [19]:
# Echo the container that holds one MLPSequential per scale.
net.msnet

Sequential(
  (Scale_1_Net): MLPSequential(
    (net): Sequential(
      (input): LayerPerceptron(
        (dense): Linear(in_features=15, out_features=32, bias=True)
        (activation): Tanh()
      )
      (res_01): ResidualLayerPerceptron(
        (dense): Linear(in_features=32, out_features=32, bias=True)
        (activation): Tanh()
      )
      (res_02): ResidualLayerPerceptron(
        (dense): Linear(in_features=32, out_features=32, bias=True)
        (activation): Tanh()
      )
      (res_03): ResidualLayerPerceptron(
        (dense): Linear(in_features=32, out_features=32, bias=True)
        (activation): Tanh()
      )
      (res_04): ResidualLayerPerceptron(
        (dense): Linear(in_features=32, out_features=32, bias=True)
        (activation): Tanh()
      )
      (res_05): ResidualLayerPerceptron(
        (dense): Linear(in_features=32, out_features=32, bias=True)
        (activation): Tanh()
      )
      (output): LayerPerceptron(
        (dense): Linear(in_featur

In [17]:
# Batch of 16 uniform-random samples with 3 features each (rebinds `input_sample`);
# the trailing expression shows the shape as cell output.
input_sample = torch.rand(16, 3, dtype=torch.float)
input_sample.shape

torch.Size([16, 3])

In [20]:
# Forward the 16-sample batch through the multi-scale model; the bare call shows the result as cell output.
net(input_sample)

tensor([[-0.0119],
        [ 0.0034],
        [-0.0125],
        [-0.0151],
        [-0.5644],
        [-0.5729],
        [-0.5750],
        [-0.5479],
        [-0.1439],
        [-0.1499],
        [-0.1491],
        [-0.1408],
        [-0.4426],
        [-0.3898],
        [-0.4611],
        [-0.4404]], grad_fn=<AddBackward0>)