In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ActivationSigma(nn.Module):
    def forward(self, x):
        return F.elu(x) + 1

class ProbabilisticNN(nn.Module):
    def __init__(self, num_features=6):
        super().__init__()

        self.selective_neurons = nn.ModuleList([
            nn.Linear(1, 1) for _ in range(num_features)
        ])

        self.feature_shared = nn.Linear(num_features, 10)

        self.mu_S_layer = nn.Linear(1, 5)
        self.mu_R_layer = nn.Linear(1, 5)

        self.sigma_S_layer = nn.Linear(1, 5)
        self.sigma_R_layer = nn.Linear(1, 5)

        self.mu_final = nn.Linear(5, 1)
        self.sigma_final = nn.Linear(5, 1)

        self.sigma_activation = ActivationSigma()

    def forward(self, inputs, masks, S, R):
        selective_outputs = [
            torch.tanh(layer(inp)) * mask
            for inp, mask, layer in zip(inputs, masks, self.selective_neurons)
        ]
        x_feat = torch.cat(selective_outputs, dim=1)  # shape: [batch, 6]

        shared_feat = torch.tanh(self.feature_shared(x_feat))  # [batch, 10]
        mu_feat = shared_feat[:, :5]
        sigma_feat = shared_feat[:, 5:]

        mu_feat = torch.sum(mu_feat, dim=1, keepdim=True)
        sigma_feat = torch.sum(sigma_feat, dim=1, keepdim=True)

        mu_S = torch.tanh(self.mu_S_layer(S))  # [batch, 5]
        mu_R = torch.tanh(self.mu_R_layer(R))  # [batch, 5]
        mu_contrib = self.mu_final(mu_S + mu_R)  # [batch, 1]

        sigma_S = torch.tanh(self.sigma_S_layer(S))
        sigma_R = torch.tanh(self.sigma_R_layer(R))
        sigma_contrib = self.sigma_final(sigma_S + sigma_R)

        mu = mu_feat + mu_contrib
        sigma = self.sigma_activation(sigma_feat + sigma_contrib)

        return mu, sigma

    def constrain_sigma(self):
        with torch.no_grad():
            sigma_S_weights = self.sigma_S_layer.weight.data.view(-1)    # shape: [5]
            sigma_final_weights = self.sigma_final.weight.data.view(-1)  # shape: [5]
            
            # Enforce opposite sign
            constrained = -sigma_final_weights.abs() * sigma_S_weights.sign()
            self.sigma_final.weight.data.copy_(constrained.view(1, -1))




In [11]:
def mdn_cost(mu, sigma, y, delta):
    sigma = torch.clamp(sigma, min=1e-6)  # Ensures validity
    dist = torch.distributions.Normal(loc=mu, scale=sigma)
    log_prob = dist.log_prob(y)
    log_sf = torch.log(1 - dist.cdf(y) + 1e-7)  # Survival function
    loss = -delta * log_prob - (1 - delta) * log_sf
    return torch.mean(loss)


In [None]:
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd

from scipy.stats import norm
import json
import time
import os
from sklearn.preprocessing import MinMaxScaler
import torch
start = time.time()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Mean_Log_Score_all = []
Mean_Log_Score_complete = []
AM = pd.read_csv("SLM.csv")
for complete_number in range(1,7):
    ##Using one of these complete numbers for testing
    print(complete_number)
    ##Training data is from numbers 1 to 12 except the number chosen for testing
    Train_data=AM[(AM['Complete No.']>=1)&(AM['Complete No.']<=12)&(AM['Complete No.']!=complete_number)]
    ##Defining test  data
    Test_data=AM[AM['Complete No.']==complete_number]
    #Train and test Smax
    Train_S=Train_data['Smax']
    Test_S=Test_data['Smax']
    
    #Train and test stress Ratio
    Train_R=Train_data['Ratio']
    Test_R=Test_data['Ratio']
    #Train and test Speed
    Train_V=Train_data['V']
    Test_V=Test_data['V']
    #Train and test Laser power
    Train_P=Train_data['Power']
    Test_P=Test_data['Power']
    #Train and test Hatch distance
    Train_H=Train_data['Hatch']
    Test_H=Test_data['Hatch']

    #Train and test Thickness
    Train_thick=Train_data['Thickness']
    Test_thick=Test_data['Thickness']
    #Train and test Heat Temperature
    Train_temp=Train_data['Temperature']
    Test_temp=Test_data['Temperature']
    #Train and test Heat Time
    Train_time=Train_data['Time']
    Test_time=Test_data['Time']
    #Train and test cycles
    Train_N=Train_data['N']
    Test_N=Test_data['N']
    #Train and test Delta (defining failure or runnout)
    Train_delta=Train_data['Delta']
    Test_delta=Test_data['Delta']

    ############## Extraction of missing mask for features
    V_missing_mask = ~Train_V.isnull() * 1.0
    P_missing_mask = ~Train_P.isnull() * 1.0
    H_missing_mask = ~Train_H.isnull() * 1.0
    Thick_missing_mask = ~Train_thick.isnull() * 1.0
    Temp_missing_mask = ~Train_temp.isnull() * 1.0
    Time_missing_mask = ~Train_time.isnull() * 1.0
    
    #### Fill Nans with zero
    Train_V = Train_data['V'].fillna(0.0)
    Train_P = Train_data['Power'].fillna(0.0)
    Train_H = Train_data['Hatch'].fillna(0.0)
    Train_thick = Train_data['Thickness'].fillna(0.0)
    Train_temp = Train_data['Temperature'].fillna(0.0)
    Train_time = Train_data['Time'].fillna(0.0)
    ##########Scaling by Minmax
    scaler=MinMaxScaler()
    Train_S_scaled=scaler.fit_transform(Train_S.values.reshape(-1,1))
    Test_S_scaled=scaler.fit_transform(Test_S.values.reshape(-1,1))

    Train_R_scaled=scaler.fit_transform(Train_R.values.reshape(-1,1))
    Test_R_scaled=scaler.fit_transform(Test_R.values.reshape(-1,1))

    Train_V_scaled=scaler.fit_transform(Train_V.values.reshape(-1,1))
    Test_V_scaled=scaler.fit_transform(Test_V.values.reshape(-1,1))

    Train_P_scaled=scaler.fit_transform(Train_P.values.reshape(-1,1))
    Test_P_scaled=scaler.fit_transform(Test_P.values.reshape(-1,1))

    Train_H_scaled=scaler.fit_transform(Train_H.values.reshape(-1,1))
    Test_H_scaled=scaler.fit_transform(Test_H.values.reshape(-1,1))

    Train_thick_scaled=scaler.fit_transform(Train_thick.values.reshape(-1,1))
    Test_thick_scaled=scaler.fit_transform(Test_thick.values.reshape(-1,1))

    Train_temp_scaled=scaler.fit_transform(Train_temp.values.reshape(-1,1))
    Test_temp_scaled=scaler.fit_transform(Test_temp.values.reshape(-1,1))

    Train_time_scaled=scaler.fit_transform(Train_time.values.reshape(-1,1))
    Test_time_scaled=scaler.fit_transform(Test_time.values.reshape(-1,1))

    Train_N_scaled=scaler.fit_transform(Train_N.values.reshape(-1,1))
    Test_N_scaled=scaler.fit_transform(Test_N.values.reshape(-1,1))

    Train_delta_scaled=scaler.fit_transform(Train_delta.values.reshape(-1,1))
    Test_delta_scaled=scaler.fit_transform(Test_delta.values.reshape(-1,1))

    


    # Features as torch tensors
    X_train = [
        torch.tensor(Train_V_scaled, dtype=torch.float32).to(device),
        torch.tensor(Train_P_scaled, dtype=torch.float32).to(device),
        torch.tensor(Train_H_scaled, dtype=torch.float32).to(device),
        torch.tensor(Train_thick_scaled, dtype=torch.float32).to(device),
        torch.tensor(Train_time_scaled, dtype=torch.float32).to(device),
        torch.tensor(Train_temp_scaled, dtype=torch.float32).to(device)
    ]

    masks_train = [
        torch.tensor(V_missing_mask.values.reshape(-1, 1), dtype=torch.float32).to(device),
        torch.tensor(P_missing_mask.values.reshape(-1, 1), dtype=torch.float32).to(device),
        torch.tensor(H_missing_mask.values.reshape(-1, 1), dtype=torch.float32).to(device),
        torch.tensor(Thick_missing_mask.values.reshape(-1, 1), dtype=torch.float32).to(device),
        torch.tensor(Time_missing_mask.values.reshape(-1, 1), dtype=torch.float32).to(device),
        torch.tensor(Temp_missing_mask.values.reshape(-1, 1), dtype=torch.float32).to(device)
    ]

    S_train = torch.tensor(Train_S_scaled, dtype=torch.float32).to(device)
    R_train = torch.tensor(Train_R_scaled, dtype=torch.float32).to(device)
    y_train = torch.tensor(np.log(Train_N.values).reshape(-1, 1), dtype=torch.float32).to(device)
    delta_train = torch.tensor(Train_delta.values.reshape(-1, 1), dtype=torch.float32).to(device)
    

    ###Training
    model = ProbabilisticNN(num_features=6).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    epochs = 1000
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        model.constrain_sigma()
        mu, sigma = model(X_train, masks_train, S_train, R_train)
        loss = mdn_cost(mu, sigma, y_train, delta_train)

        loss.backward()
        optimizer.step()

 
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

      

1
Epoch 0, Loss: 21.9515
Epoch 1, Loss: 21.4700
Epoch 2, Loss: 21.0082
Epoch 3, Loss: 20.5656
Epoch 4, Loss: 20.1413
Epoch 5, Loss: 19.7347
Epoch 6, Loss: 19.3452
Epoch 7, Loss: 18.9719
Epoch 8, Loss: 18.6143
Epoch 9, Loss: 18.2717
Epoch 10, Loss: 17.9435
Epoch 11, Loss: 17.6291
Epoch 12, Loss: 17.3277
Epoch 13, Loss: 17.0389
Epoch 14, Loss: 16.7622
Epoch 15, Loss: 16.4968
Epoch 16, Loss: 16.2424
Epoch 17, Loss: 15.9983
Epoch 18, Loss: 15.7642
Epoch 19, Loss: 15.5395
Epoch 20, Loss: 15.3238
Epoch 21, Loss: 15.1166
Epoch 22, Loss: 14.9176
Epoch 23, Loss: 14.7264
Epoch 24, Loss: 14.5425
Epoch 25, Loss: 14.3657
Epoch 26, Loss: 14.1955
Epoch 27, Loss: 14.0317
Epoch 28, Loss: 13.8739
Epoch 29, Loss: 13.7218
Epoch 30, Loss: 13.5752
Epoch 31, Loss: 13.4337
Epoch 32, Loss: 13.2971
Epoch 33, Loss: 13.1653
Epoch 34, Loss: 13.0378
Epoch 35, Loss: 12.9145
Epoch 36, Loss: 12.7951
Epoch 37, Loss: 12.6680
Epoch 38, Loss: 12.5560
Epoch 39, Loss: 12.4396
Epoch 40, Loss: 12.3343
Epoch 41, Loss: 12.2320


In [3]:
import torch
import torch.nn as nn

class GreaterThanMu0(nn.Module):
    def forward(self, w):
        # Create an empty list for the constrained rows
        constrained_rows = []
        for i in range(10):
            row = w[i, :]
            mask = (row >= 0).float()  # 1 where row >= 0, else 0
            constrained_row = row * mask
            constrained_rows.append(constrained_row.unsqueeze(0))  # shape: [1, n]

        # Stack the constrained rows: shape [10, n]
        constrained_part = torch.cat(constrained_rows, dim=0)

        # Leave rows 10:15 unchanged
        remaining_part = w[10:15, :]  # shape [5, n]

        # Concatenate both parts: shape [15, n]
        return torch.cat([constrained_part, remaining_part], dim=0)
    
class GreaterThanSigma(nn.Module):
    def forward(self, w):
        # Apply non-negativity constraint to rows 0 to 4
        constrained_rows = []
        for i in range(5):
            row = w[i, :]
            mask = (row >= 0).float()  # 1 where value >= 0, else 0
            constrained_row = row * mask
            constrained_rows.append(constrained_row.unsqueeze(0))  # shape [1, n]

        # Stack constrained rows: shape [5, n]
        constrained_part = torch.cat(constrained_rows, dim=0)

        # Keep rows 5 to 14 unchanged: shape [10, n]
        remaining_part = w[5:15, :]

        # Concatenate constrained and unchanged parts: shape [15, n]
        return torch.cat([constrained_part, remaining_part], dim=0)