In [62]:
import torch
import torch.nn as nn



# Definition of the layers we need in the network :

class LinearL1(nn.Module):
    
    def __init__(self, in_features, out_features):
        super(LinearL1, self).__init__()
        self.linear = nn.Linear(in_features, out_features)
        self.reg_weight_decay = 1e-6
        self.reg2_weight_decay = 0.1
        self.register_buffer('weight_loss', torch.zeros(1))
        self.register_buffer('bias_loss', torch.zeros(1))

    def forward(self, x):
        output = self.linear(x)

        # Ajouter la régularisation L1 aux poids
        self.weight_loss = torch.norm(self.linear.weight, p=1)
        self.bias_loss = torch.norm(self.linear.bias, p=1)
        regularizer_loss = self.reg_weight_decay * self.weight_loss + self.bias_loss*self.reg2_weight_decay
        
        return output, regularizer_loss


# Lambda functions wrapped in a layer to be able to use them in the network
# We need them to apply exp function to an output of a layer in the network
class LambdaLayer(nn.Module):
    def __init__(self, lambda_func):
        super(LambdaLayer, self).__init__()
        self.lambda_func = lambda_func
        
    def forward(self, x):
        return self.lambda_func(x)
    

# Multiplication layer to multiply two layers :

class MultiplyLayer(nn.Module):
    def __init__(self):
        super(MultiplyLayer, self).__init__()
    
    def forward(self, inputs):
        assert len(inputs) == 2, "MultiplyLayer expects exactly 2 inputs"
        return torch.mul(inputs[0], inputs[1])

# Addition layer to add two layers :

class AddLayer(nn.Module):
    def __init__(self):
        super(AddLayer, self).__init__()
    
    def forward(self, inputs):
        assert len(inputs) == 2, "AddLayer expects exactly 2 inputs"
        return torch.add(inputs[0], inputs[1])
    
# Concatenation layer to concatenate two layers :

class ConcatLayer(nn.Module):
    def __init__(self):
        super(ConcatLayer, self).__init__()
        
    def forward(self, inputs):
        assert len(inputs) == 2, "ConcatLayer expects exactly 2 inputs"
        return torch.concat((inputs[0], inputs[1]), -1)
    
# Note that we don't need to define a constructor due to the fact there's no need to initialize any layers like in the lambda layer

class WNN(nn.Module):
    '''
    This is the WNN model written in PyTorch. It is a weight prediction model based on the Tensorflow version of the WNN.
    It follows the exact same architecture and layer name (that you can check on the WNN.summary() of Tensorflow).
    
    The model work with the function implemented in the ``WNN_PT/wnn_callback.py`` file which is the exact same function used in the 
    WeightForecasting callback class of the Tensorflow version.
    
    You can also check the architecture in the ``WNN_PT/Viz/model_pt.onnx.png`` and the ``WNN_PT/Viz/model_tf.onnx.png``which represents the two versions of the WNN
    written in Tensorflow and PyTorch (it is the exact same architecture but slightly different because of the framework)
    '''
    def __init__(self,input_size,output_size):
        
        super(WNN, self).__init__()
         
        # We're defining the differents layers of the network here :
        # In fact, there's only 12 differents layer throughout the network (6 dense, 4 calculus layers, the LeakyReLU layer and the ReLU one)
        
        
        # Init with torch.nn.init to implement the same initialization as in the Tensorflow version
        
        self.dense = LinearL1(input_size, 64) # input 1 (dense n°0)
        self.dense_6 = LinearL1(input_size-1, 64) # input 2 (dense n°6)
        
        self.dense_1 = nn.Linear(64, 8) 
        self.dense_2 = nn.Linear(8, 64) 
        self.dense_3 = nn.Linear(8, 64)
        self.dense_4 = nn.Linear(8, 64) 
        self.dense_5 = LinearL1(64,32) 

        self.dense_7 = nn.Linear(64, 8) 
        self.dense_8 = nn.Linear(8, 64) 
        self.dense_9 = nn.Linear(8, 64) 
        self.dense_10 = nn.Linear(8, 64) 
        self.dense_11 = LinearL1(64, 32)
        
        self.dense_12 = nn.Linear(64, output_size) 
        
        self.lambda_0 = LambdaLayer(lambda x: torch.exp(x)) 
        self.lambda_1 = LambdaLayer(lambda x: torch.exp(x)) 
        self.add = AddLayer() 
        self.add_1 = AddLayer() 
        self.multiply = MultiplyLayer() 
        self.multiply_1 = MultiplyLayer() 
        self.multiply_2 = MultiplyLayer() 
        self.multiply_3 = MultiplyLayer()
        
        self.leaky_re_lu = nn.LeakyReLU() 
        self.leaky_re_lu_1 = nn.LeakyReLU()
        
        self.concatenate = ConcatLayer() 
        
        
        # We have every layer we need for the forward propagation
        # Note : We directly hard coded the size of inputs and outputs of each layer --> We don't need to pay attention to
        #                                                                                the batch size pytorch will do it for us
        
        
    def forward(self, x):
        
        x1 = x[0]
        x2 = x[1]
        # x2 = x1[0:4] - x1[1:5]
        
        fc1 = self.dense(x1)[0] # dense n°0
        d1 = self.dense_1(fc1) 
        d1 = nn.ReLU()(d1) 
        d2 = self.dense_2(d1)
        d3 = self.dense_3(d1)
        d4 = self.dense_4(d1)
        
        lambd0 = self.lambda_0(self.multiply_1([fc1, d4])) 
        d5 = self.dense_5(self.add([self.multiply([d3, lambd0]), d2]))[0]
        leaky = self.leaky_re_lu(d5)
        
        fc2 = self.dense_6(x2)[0] # dense n°6
        d7 = self.dense_7(fc2)
        d7 = nn.ReLU()(d7)
        d8 = self.dense_8(d7)
        d9 = self.dense_9(d7)
        d10 = self.dense_10(d7)
        
        lambda1 = self.lambda_1(self.multiply_3([fc2, d10]))
        d11 = self.dense_11(self.add_1([self.multiply_2([d9, lambda1]), d8]))[0]
        leaky1 = self.leaky_re_lu_1(d11)
        
        conc = self.concatenate([leaky, leaky1])
        out = self.dense_12(conc)
        out = nn.Tanh()(out)
        
        return out 

In [63]:
import sys
sys.path.append('../')
import numpy as np


In [64]:
data = np.load("../dataset/Least_Squares_3d_GD/Sample_1/sample.npy")

In [65]:
data_tr = data[:100,:,:]
data_val = data[100:150,:,:]
data_te = data[150:,:,:]

In [122]:
tr_W1 = data_tr[:,:21,:]
tr_W2 = data_tr[:,40:41,:]
val_W1 = data_val[:,:21,:]
val_W2 = data_val[:,40:41,:]
te_W1 = data_te[:,:21,:]
te_W2 = data_te[:,40:41,:]

In [123]:
tr_W1, tr_W2  = np.transpose(tr_W1, (0,2,1)),np.transpose(tr_W2, (0,2,1))
val_W1, val_W2  = np.transpose(val_W1, (0,2,1)),np.transpose(val_W2, (0,2,1))
te_W1, te_W2  = np.transpose(te_W1, (0,2,1)),np.transpose(te_W2, (0,2,1))

In [105]:
tr_dX = tr_W1[:,:,:-1] - tr_W1[:,:,1:] / np.expand_dims((2 * (np.max(tr_W1, (1,2) )- np.min(tr_W1,(1,2) ))),axis=(1,2))
tr_X = (tr_W1- np.expand_dims(tr_W1[:,:,-1],axis=2)) / np.expand_dims((2 * (np.max(tr_W1, (1,2) )- np.min(tr_W1,(1,2) ))),axis=(1,2))
tr_y = (tr_W2- np.expand_dims(tr_W1[:,:,-1],axis=2)) / np.expand_dims((2 * (np.max(tr_W1, (1,2) )- np.min(tr_W1,(1,2) ))),axis=(1,2))

In [124]:
val_dX = val_W1[:,:,:-1] - val_W1[:,:,1:] / np.expand_dims((2 * (np.max(val_W1, (1,2) )- np.min(val_W1,(1,2) ))),axis=(1,2))
val_X = (val_W1- np.expand_dims(val_W1[:,:,-1],axis=2)) / np.expand_dims((2 * (np.max(val_W1, (1,2) )- np.min(val_W1,(1,2) ))),axis=(1,2))
val_y = (val_W2- np.expand_dims(val_W1[:,:,-1],axis=2)) / np.expand_dims((2 * (np.max(val_W1, (1,2) )- np.min(val_W1,(1,2) ))),axis=(1,2))

In [125]:
te_dX = te_W1[:,:,:-1] - te_W1[:,:,1:] / np.expand_dims((2 * (np.max(te_W1, (1,2) )- np.min(te_W1,(1,2) ))),axis=(1,2))
te_X = (te_W1- np.expand_dims(te_W1[:,:,-1],axis=2)) / np.expand_dims((2 * (np.max(te_W1, (1,2) )- np.min(te_W1,(1,2) ))),axis=(1,2))
te_y = (te_W2- np.expand_dims(te_W1[:,:,-1],axis=2)) / np.expand_dims((2 * (np.max(te_W1, (1,2) )- np.min(te_W1,(1,2) ))),axis=(1,2))

In [117]:

input_size = tr_W1.shape[-1]
output_size=tr_W2.shape[-1]
pt_WNN = WNN(input_size = tr_W1.shape[-1], output_size=tr_W2.shape[-1]) 

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

batch_size = 32


In [126]:
X_tensor = torch.from_numpy(tr_X).float()
dX_tensor = torch.from_numpy(tr_dX).float()
y_tensor = torch.from_numpy(tr_y).float()

val_X_tensor = torch.from_numpy(val_X).float()
val_dX_tensor = torch.from_numpy(val_dX).float()
val_y_tensor = torch.from_numpy(val_y).float()

te_X_tensor = torch.from_numpy(te_X).float()
te_dX_tensor = torch.from_numpy(te_dX).float()
#te_y_tensor = torch.from_numpy(te_y).float()



In [141]:
import torch.optim as optim
# Convert numpy arrays to PyTorch tensors
# X_tensor = torch.from_numpy(tr_W1).float()
# y_tensor = torch.from_numpy(tr_W2).float()

# ### need to check if this is true
# target = (y_tensor- X_tensor[:,:,0:1]) /torch.unsqueeze((2 * (torch.max(y_tensor,1).values - torch.min(y_tensor,1).values)).repeat(1,3),dim=2)

# maeloss = nn.L1Loss()

learn_rate = 0.0001
#batchsize
batch_size = 32

# Loss and optimizer
optimizer = optim.Adam(pt_WNN.parameters(), lr=learn_rate)

# Training the model
num_epochs = 1000


for epoch in range(num_epochs):
    # Forward pass
    indices = torch.randperm(len(X_tensor))

    for i in range(0, len(X_tensor), batch_size):
        batch_indices = indices[i:i+batch_size]
        outputs = pt_WNN( [X_tensor[batch_indices], dX_tensor[batch_indices]] )
    
        loss = torch.squeeze(torch.mean(torch.abs(outputs - y_tensor[batch_indices])))
        #print(loss)
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.10f}')
        

Epoch [10/1000], Loss: 0.0026305572
Epoch [20/1000], Loss: 0.0028059988
Epoch [30/1000], Loss: 0.0032102072
Epoch [40/1000], Loss: 0.0035660127
Epoch [50/1000], Loss: 0.0052570044
Epoch [60/1000], Loss: 0.0032333166
Epoch [70/1000], Loss: 0.0036472220
Epoch [80/1000], Loss: 0.0019792954
Epoch [90/1000], Loss: 0.0019802342
Epoch [100/1000], Loss: 0.0043692905
Epoch [110/1000], Loss: 0.0031993557
Epoch [120/1000], Loss: 0.0019463958
Epoch [130/1000], Loss: 0.0024841081
Epoch [140/1000], Loss: 0.0024112302
Epoch [150/1000], Loss: 0.0036436450
Epoch [160/1000], Loss: 0.0035383459
Epoch [170/1000], Loss: 0.0018966409
Epoch [180/1000], Loss: 0.0023068925
Epoch [190/1000], Loss: 0.0032135688
Epoch [200/1000], Loss: 0.0031546608
Epoch [210/1000], Loss: 0.0012166785
Epoch [220/1000], Loss: 0.0022658010
Epoch [230/1000], Loss: 0.0024332425
Epoch [240/1000], Loss: 0.0019080350
Epoch [250/1000], Loss: 0.0039159362
Epoch [260/1000], Loss: 0.0023111321
Epoch [270/1000], Loss: 0.0030870249
Epoch [280

In [142]:
predicted =  np.squeeze(pt_WNN([te_X_tensor, te_dX_tensor] ).detach().numpy())

In [143]:
pred_y = predicted * (np.expand_dims((2 * (np.max(te_W1, (1,2) )- np.min(te_W1,(1,2) ))),axis=1)) * 2 + te_W1[:,:,-1]

In [144]:
np.mean(np.square(pred_y - np.squeeze(te_W2)))

0.025676319986553158