# In [3]:
class Data_Pulse:
    """Load pulse files from disk and split them into training/evaluation sets.

    Every file in ``PATH`` must have a name starting with its channel
    (``"C1"`` or ``"C2"``). Each pulse is stored as a ``(2, n)`` array whose
    first row is time and whose second row is the amplitude (voltage).
    """

    # Directory that holds the pulse files.
    PATH = "Data_Pulse/"
    # Placeholder; assign the dict returned by create_dataset() before
    # calling create_training_data().
    Dataset = []
    # Class label assigned to the pulses of each channel.
    _LABELS = {"C1": 1, "C2": 0}

    @staticmethod
    def _stack_pulses(pulses):
        """Return *pulses* as one array, tolerating pulses of unequal length.

        Equal-length pulses give a regular numeric array. When NumPy refuses
        to stack them (ragged input), a 1-D object array holding one pulse
        per element is returned instead, so fancy indexing still works.
        """
        try:
            return np.array(pulses)
        except ValueError:
            ragged = np.empty(len(pulses), dtype=object)
            # Element-wise assignment avoids NumPy trying to broadcast.
            for position, pulse in enumerate(pulses):
                ragged[position] = pulse
            return ragged

    @staticmethod
    def read_archive(name):
        """Read one pulse file.

        The first four rows are skipped, the following row is consumed as the
        CSV header, and the first column of every remaining row is expected
        to hold ``"<time> <amplitude>"`` separated by a single space.

        Args:
            name: File name inside ``PATH``; its first two characters are the
                channel.

        Returns:
            Tuple ``(pulse, channel)`` where ``pulse`` is a float array of
            shape ``(2, n)`` (row 0 time, row 1 amplitude) and ``channel`` is
            ``"C1"`` or ``"C2"``.

        Raises:
            ValueError: If the channel prefix is neither ``"C1"`` nor ``"C2"``.
        """
        channel = name[0:2]
        if channel not in ("C1", "C2"):
            raise ValueError("Channel different from C1 and C2:%s" % channel)

        frame = pd.read_csv(os.path.join(Data_Pulse.PATH, name), skiprows=4)
        time = []
        amplitude = []
        for row in frame.iloc[:, 0]:
            fields = row.split(" ")  # split once, reuse both fields
            time.append(float(fields[0]))
            amplitude.append(float(fields[1]))
        return np.array([time, amplitude]), channel

    @staticmethod
    def create_dataset():
        """Read every file in ``PATH`` and group the pulses by channel.

        Files are visited in sorted name order so the result is reproducible
        (``os.listdir`` gives no ordering guarantee).

        Returns:
            Dict ``{"C1": pulses, "C2": pulses}`` where each value is an array
            with one pulse per entry along the first axis.

        Raises:
            ValueError: Propagated from ``read_archive`` when a file in
                ``PATH`` does not start with ``"C1"`` or ``"C2"``.
        """
        dataset = {"C1": [], "C2": []}
        for archive in sorted(os.listdir(Data_Pulse.PATH)):
            variables, channel = Data_Pulse.read_archive(archive)
            dataset[channel].append(variables)
        return {
            channel: Data_Pulse._stack_pulses(pulses)
            for channel, pulses in dataset.items()
        }

    def create_training_data(self, sample_size):
        """Randomly split ``self.Dataset`` into training and evaluation sets.

        Pulses from ``"C1"`` are labelled 1 and pulses from ``"C2"`` are
        labelled 0.

        Args:
            sample_size: Number of pulses drawn (without replacement) for
                training; all remaining pulses form the evaluation set.

        Returns:
            Tuple ``(train_data, train_labels, eval_data, eval_label)``.

        Raises:
            TypeError: If ``Dataset`` is still the empty-list placeholder.
            ValueError: Raised by ``np.random.choice`` when ``sample_size``
                exceeds the number of available pulses.
        """
        if isinstance(self.Dataset, list):
            raise TypeError(
                "Dataset is not loaded: assign the result of create_dataset() "
                "to Dataset before calling create_training_data()"
            )

        all_pulses = []
        all_labels = []
        for channel, label in self._LABELS.items():
            for pulse in self.Dataset[channel]:
                all_pulses.append(pulse)
                all_labels.append(label)
        all_pulses = self._stack_pulses(all_pulses)
        all_labels = np.array(all_labels)

        total = len(all_labels)
        # Training indices are sampled at random; evaluation gets the rest.
        train_indices = np.random.choice(total, sample_size, replace=False)
        eval_indices = np.setdiff1d(np.arange(total), train_indices)

        return (
            all_pulses[train_indices],
            all_labels[train_indices],
            all_pulses[eval_indices],
            all_labels[eval_indices],
        )
####################################################################################
####################################################################################
####################################################################################

## class for Neural Network
class PulseLSTM(nn.Module):
    """LSTM-based binary classifier for voltage pulses.

    The network is an ``nn.LSTM`` followed by a linear layer applied to the
    output of the last time step. It emits one logit per pulse; label 1
    corresponds to channel C1 and label 0 to channel C2.
    """

    # Class-level defaults; __init__ stores the values actually used.
    input_size = 1    # number of input features (only the voltage)
    hidden_size = 64  # number of hidden features of the LSTM
    output_size = 1   # number of output features (a single logit)
    num_layers = 2    # number of stacked LSTM layers
    # Training pulses (each a (time_array, voltage_array) pair) and labels.
    train_data = []
    train_label = []
    # Evaluation pulses and labels.
    eval_data = []
    eval_label = []

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        """Build the LSTM and the final linear layer.

        Args:
            input_size: Number of features per time step.
            hidden_size: Number of hidden features of the LSTM.
            num_layers: Number of stacked LSTM layers.
            output_size: Number of output features.
        """
        super().__init__()
        # Keep the real hyper-parameters on the instance so they always match
        # the layers built below rather than the class-level defaults.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        # The LSTM layer processes the input sequences.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # The final layer maps LSTM outputs to the desired output.
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run the forward pass.

        Args:
            x: Tensor of shape ``(batch, sequence_length, input_size)``.

        Returns:
            Tensor of shape ``(batch, output_size)`` holding raw logits.
        """
        # nn.LSTM initialises the hidden and cell state to zeros when none is
        # given, so no explicit h_0 / c_0 tensors are required.
        out, _ = self.lstm(x)
        return self.fc(out[:, -1, :])  # use the last time step's output

    def run_neural_netwrok(self, num_epochs=100):
        """Train this model on ``self.train_data`` / ``self.train_label``.

        All pulses are fed as a single batch on every epoch. The loss is
        printed every 10 epochs.

        Args:
            num_epochs: Number of optimisation steps.

        Raises:
            ValueError: If ``self.train_data`` is empty.
        """
        if len(self.train_data) == 0:
            raise ValueError("train_data is empty; nothing to train on")

        device = next(self.parameters()).device
        criterion = nn.BCEWithLogitsLoss()  # binary classification on logits
        optimizer = optim.Adam(self.parameters(), lr=0.001)

        # Only the voltage row of every pulse is used as input.
        voltages = [
            torch.as_tensor(voltage_array, dtype=torch.float32)
            for _, voltage_array in self.train_data
        ]
        # Zero-pad to the longest pulse and add the feature axis:
        # (batch, sequence_length) -> (batch, sequence_length, 1).
        # NOTE(review): shorter pulses end in padding, so the last time step
        # read by forward() is padding for them; consider packed sequences.
        train_data = pad_sequence(voltages, batch_first=True).unsqueeze(-1).to(device)
        train_label = torch.as_tensor(self.train_label, dtype=torch.float32).to(device)

        self.train()  # training mode
        for epoch in range(num_epochs):
            optimizer.zero_grad()  # clear gradients from the previous step
            outputs = self(train_data)
            # squeeze(-1) keeps the batch axis even for a single pulse.
            loss = criterion(outputs.squeeze(-1), train_label)
            loss.backward()   # compute gradients of the loss
            optimizer.step()  # update the parameters

            # Progress report.
            if (epoch + 1) % 10 == 0:
                print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}')

        print("Training complete!")

    def pulse_evaluation(self, new_pulse, pulse_label, plot=False):
        """Classify one pulse with this model and compare with its label.

        Args:
            new_pulse: Array of shape ``(2, n)``; row 0 is time, row 1 voltage.
            pulse_label: Expected label (1 for channel C1, 0 for channel C2).
            plot: When true, plot the pulse before classifying it.

        Returns:
            ``True`` when the predicted label equals ``pulse_label``.
        """
        self.eval()  # evaluation mode

        if plot:
            plt.plot(new_pulse[0, :], new_pulse[1, :])
            plt.xlabel("time [s]", fontsize=18)
            plt.ylabel("Voltage [V]", fontsize=18)
            # Label 0 is channel 2, anything else is channel 1.
            channel = 2 if pulse_label == 0 else 1
            plt.title("Pulse from Channel %d" % channel, fontsize=22)
            plt.grid()
            plt.show()

        device = next(self.parameters()).device
        # Shape the voltage row as (1, sequence_length, 1), as in training.
        voltage_array = (
            torch.as_tensor(new_pulse[1, :], dtype=torch.float32)
            .unsqueeze(0)
            .unsqueeze(-1)
            .to(device)
        )
        with torch.no_grad():  # no gradients needed for inference
            prediction = self(voltage_array)
        # Sigmoid turns the logit into a probability; round thresholds at 0.5.
        predicted_label = torch.round(torch.sigmoid(prediction)).item()
        return bool(predicted_label == pulse_label)