In [None]:
import numpy as np
import sys
import os
import torch
from torch import Tensor
import pandas as pd
import torch.nn as nn
from torch.utils.data import random_split, DataLoader, TensorDataset
import torch.nn.functional as F
from torch.optim import Adam
from scipy.stats import pearsonr
from tqdm import tqdm
from sklearn.metrics import mean_squared_error, r2_score

# Notebooks have no __file__, so the working directory is used as the anchor.
# NOTE(review): this assumes the kernel was started in the notebook's own folder.
current_dir = os.getcwd()  # Gets the current working directory
parent_dir = os.path.abspath(os.path.join(current_dir, '..'))  # Moves one level up

# Put the parent directory first on the import path so that the
# `preprocessing` module imported below is resolved from there.
sys.path.insert(0, parent_dir)

# NOTE(review): star import hides where names come from; presumably this is
# what provides `preprocessed_data` used in the data-loading cell -- confirm
# and consider importing that name explicitly.
from preprocessing import *

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [None]:
# Root folder that holds the `data/` directory: the parent of the working directory.
data_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))

# 1. Choose the phenotype

In [None]:
# Phenotype to model; it selects the input CSV files and names the saved checkpoint.
phenotype = "YPD_doublingtime"
# Alternative phenotype (swap with the line above to use it): "YPDCUSO410MM_40h"

# 2. Preprocess the data

In [None]:
print("Loading the data...")

# Feature matrix and target table for the selected phenotype.
X_file = os.path.join(data_dir, f"data/X_matrix_{phenotype}.csv")
Y_file = os.path.join(data_dir, f"data/y_{phenotype}.csv")

# Read both CSV files, then hand them to the project's preprocessing step.
x_df, y_df = (pd.read_csv(csv_path) for csv_path in (X_file, Y_file))
x_data_f, y_data_f = preprocessed_data(x_df, y_df)

In [None]:
print("Preparing the data...")

# Features: drop the strain identifier column and treat missing entries as 0.
x = x_data_f.drop(columns=["Yeast_ID"]).fillna(0)

# Target: use the phenotype selected above instead of a hard-coded column name,
# and impute missing measurements with the column mean.
# NOTE(review): assumes the target column is named after `phenotype`
# (true for the default "YPD_doublingtime") -- confirm for other phenotypes.
y = y_data_f[phenotype].fillna(y_data_f[phenotype].mean())

x = x.values.astype('float32')
# This is a regression target: keep it floating point. Casting to int64
# silently truncated every value to its integer part.
y = y.values.flatten().astype('float32')

# Convert to PyTorch tensors.
# NOTE: `input` shadows the Python builtin; the name is kept because later
# cells reference it.
input = torch.tensor(x)
print('\nInput format: ', input.shape, input.dtype)

output = torch.tensor(y)
print('Output format: ', output.shape, output.dtype)

# Combine input and output tensors into a single dataset object
data = TensorDataset(input, output)


Préparation des données...

Input format:  torch.Size([792, 341957]) torch.float32
Output format:  torch.Size([792]) torch.int64


In [None]:
# Split to Train, Validate and Test sets using random_split
train_batch_size = 8
number_rows = len(input)
test_split = int(number_rows * 0.3)
validate_split = int(number_rows * 0.2)
# The training set takes whatever is left so the three sizes always sum to number_rows.
train_split = number_rows - test_split - validate_split

# Seed the split so the same samples land in each subset on every run;
# otherwise a checkpoint reloaded later could be "tested" on samples it was
# trained on, and results are not reproducible.
split_generator = torch.Generator().manual_seed(42)
train_set, validate_set, test_set = random_split(
    data,
    [train_split, validate_split, test_split],
    generator=split_generator,
)

In [None]:
# Batch iterators over the three subsets. Only the training data is shuffled;
# validation and test samples are served one at a time, in order.
train_loader = DataLoader(
    train_set,
    batch_size=train_batch_size,
    shuffle=True,
)
validate_loader, test_loader = (
    DataLoader(subset, batch_size=1) for subset in (validate_set, test_set)
)

# 3. Run the model

In [None]:
# Network dimensions: one input unit per feature column, one output value.
input_size = input.shape[1]
output_size = 1

# NOTE(review): `learning_rate` is not referenced by the optimizer cell,
# which hard-codes lr=0.0005.
learning_rate = 0.025

print(input_size, output_size)

# Define neural network

class Network(nn.Module):
    """
    A feedforward neural network with configurable layer sizes and dropout.

    Each pair of consecutive entries in ``layer_sizes`` becomes one Linear
    layer. Every Linear layer except the last is followed by a Dropout layer.
    ReLU is applied after every hidden Linear layer; the final Linear layer is
    left linear so the network can output any real value (a ReLU there would
    clamp predictions to >= 0 and kill the gradient whenever the raw output is
    negative).

    Parameters:
        layer_sizes (list): A list of integers specifying the sizes of each layer,
                            including input, hidden, and output layers.
        dropout (float): Dropout probability used after hidden layers
                         (default 0.5).

    Attributes:
        layers (nn.ModuleList): A list containing Linear layers and Dropout layers.

    Methods:
        forward(x): Performs the forward pass through the network.
    """
    def __init__(self, layer_sizes, dropout=0.5):
        super().__init__()
        self.layers = nn.ModuleList()
        num_linear = len(layer_sizes) - 1
        for i in range(num_linear):
            self.layers.append(nn.Linear(layer_sizes[i], layer_sizes[i + 1]))
            if i < num_linear - 1:  # Add dropout to intermediate layers only
                self.layers.append(nn.Dropout(dropout))

    def forward(self, x):
        """
        Forward pass of the neural network.

        Parameters:
            x (torch.Tensor): Input tensor of shape [batch_size, input_size].

        Returns:
            torch.Tensor: Output tensor after passing through all layers.
        """
        # The last module is always the output Linear layer (no dropout follows it).
        last_index = len(self.layers) - 1
        for index, layer in enumerate(self.layers):
            x = layer(x)
            # Activation after hidden Linear layers only; the output stays linear.
            if isinstance(layer, nn.Linear) and index != last_index:
                x = F.relu(x)
        return x

# Architecture: input layer, four wide hidden layers, then the output layer.
layer_sizes = [input_size] + [7500, 7000, 6000, 5000] + [output_size]
model = Network(layer_sizes)

# Use the first GPU when one is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print("The model will be running on", device, "device\n")
model = model.to(device)

def saveModel():
    """
    Write the global model's state dictionary to disk.

    The file name embeds the global 'phenotype' variable, so each phenotype
    gets its own checkpoint in the current working directory.

    File format: "./NN_regmat_PA_<phenotype>.pth"
    """
    torch.save(model.state_dict(), f"./NN_regmat_PA_{phenotype}.pth")

341957 1
The model will be running on cpu device



In [None]:
# Mean squared error as the regression loss.
loss_fn = nn.MSELoss()

# Adam with L2 weight decay over every trainable parameter of the model.
optimizer = Adam(
    model.parameters(),
    lr=0.0005,
    weight_decay=0.1,
)

In [None]:
def train(num_epochs):
    """
    Train the global neural network model using the training and validation loaders.

    Every ``val_epoch`` epochs the model is evaluated on the whole validation
    set: the mean MSE loss is reported and a single Pearson correlation is
    computed between all validation targets and all validation predictions.
    The checkpoint is saved (via ``saveModel``) whenever that correlation
    improves on the best one seen so far.

    Relies on the globals ``model``, ``optimizer``, ``loss_fn``, ``device``,
    ``train_loader`` and ``validate_loader``.

    Parameters:
        num_epochs (int): Number of epochs to train the model.

    Returns:
        predicted_outputs (torch.Tensor): Predictions on the validation set from the
            most recent validation pass (shape [n_val, 1]). If no validation pass
            ran, the predictions for the last training batch.
        outputs (torch.Tensor): The matching target values (same shape).
        running_accuracy (float): Pearson correlation coefficient from the most
            recent validation pass (NaN if it was undefined, 0.0 if no validation
            pass ran).
    """
    # Pearson r can be negative, so start below any reachable value; starting at
    # 0.0 meant a model with r <= 0 was never checkpointed.
    best_accuracy = float("-inf")
    val_epoch = 5

    last_train_predictions = None
    last_train_targets = None
    val_predictions = None
    val_targets = None
    val_pearson = 0.0

    print("Begin training...")
    for epoch in tqdm(range(1, num_epochs + 1)):
        # Re-enable dropout: a previous validation pass switched the model to eval mode.
        model.train()
        running_train_loss = 0.0

        # Training Loop
        # (autograd anomaly detection was removed from this loop: it is a
        # debugging aid that slows every backward pass.)
        for inputs, targets in train_loader:
            inputs = inputs.float().to(device)
            targets = targets.float().to(device).view(-1, 1)  # match the [batch, 1] predictions

            optimizer.zero_grad()               # Zero the parameter gradients
            predictions = model(inputs)         # Predict output from the model
            train_loss = loss_fn(predictions, targets)
            train_loss.backward()               # Backpropagate the loss
            optimizer.step()                    # Adjust parameters based on the calculated gradients

            running_train_loss += train_loss.item()
            last_train_predictions = predictions.detach()
            last_train_targets = targets

        train_loss_value = running_train_loss / max(len(train_loader), 1)

        if epoch % val_epoch == 0:
            # Validation Loop
            model.eval()
            running_val_loss = 0.0
            pred_batches = []
            true_batches = []
            with torch.no_grad():
                for val_inputs, val_true in validate_loader:
                    val_inputs = val_inputs.float().to(device)
                    val_true = val_true.float().to(device).view(-1, 1)
                    val_pred = model(val_inputs)
                    running_val_loss += loss_fn(val_pred, val_true).item()
                    pred_batches.append(val_pred.cpu())
                    true_batches.append(val_true.cpu())

            # Calculate validation loss value
            val_loss_value = running_val_loss / max(len(validate_loader), 1)

            # Pearson correlation needs the whole validation set at once; it is
            # undefined for a single sample or for constant vectors.
            val_pearson = float("nan")
            if pred_batches:
                val_predictions = torch.cat(pred_batches)
                val_targets = torch.cat(true_batches)
                true_np = val_targets.numpy().ravel()
                pred_np = val_predictions.numpy().ravel()
                if true_np.size >= 2 and true_np.std() > 0 and pred_np.std() > 0:
                    val_pearson = float(pearsonr(true_np, pred_np)[0])

            # Report the correlation as a percentage.
            accuracy = 100 * val_pearson

            # Save the model if the accuracy is the best (a NaN never compares greater).
            if accuracy > best_accuracy:
                saveModel()
                best_accuracy = accuracy

            # Print the statistics of the epoch
            print('Completed training batch', epoch, 'Training Loss is: %.4f' % train_loss_value,
                  'Validation Loss is: %.4f' % val_loss_value, 'Accuracy is %.2f %%' % accuracy)

    if val_predictions is None:
        # No validation pass ran (num_epochs < val_epoch): fall back to the last training batch.
        return last_train_predictions, last_train_targets, 0.0
    return val_predictions, val_targets, val_pearson

In [None]:
def test():
    """
    Test the trained model on the test dataset and calculate its performance.

    Steps:
        1. Rebuild the network and load the checkpoint written by ``saveModel``.
        2. Run inference on the test data (dropout disabled).
        3. Compute one Pearson correlation over all test samples.
        4. Return the ground truth outputs and predicted outputs.

    Relies on the globals ``layer_sizes``, ``phenotype``, ``device``,
    ``test_loader`` and ``test_split``.

    Returns:
        output_matrix (list): List of true output tensors, one per test batch.
        predicted_output_matrix (list): List of predicted output tensors (on CPU),
            one per test batch.
    """
    # Load the model that we saved during the training loop. The constructor
    # takes the full list of layer sizes, and the path must be the one
    # saveModel() writes to.
    # NOTE: this builds a second copy of the network next to the global `model`.
    model = Network(layer_sizes)
    path = f"./NN_regmat_PA_{phenotype}.pth"
    # map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine.
    model.load_state_dict(torch.load(path, map_location=device))
    model.to(device)
    model.eval()  # disable dropout for inference

    predicted_output_matrix = []
    output_matrix = []

    with torch.no_grad():
        for inputs, outputs in test_loader:
            inputs = inputs.float().to(device)
            outputs = outputs.float()
            predicted_outputs = model(inputs).cpu()
            output_matrix.append(outputs)
            predicted_output_matrix.append(predicted_outputs)

    # Pearson correlation is only defined over several samples, so compute it
    # once over the whole test set rather than per single-sample batch.
    correlation = float("nan")
    if output_matrix:
        true_np = torch.cat([t.reshape(-1) for t in output_matrix]).numpy()
        pred_np = torch.cat([p.reshape(-1) for p in predicted_output_matrix]).numpy()
        if true_np.size >= 2 and true_np.std() > 0 and pred_np.std() > 0:
            correlation = float(pearsonr(true_np, pred_np)[0])

    print('Accuracy of the model based on the test set of', test_split,
          'inputs is: %.2f %%' % (100 * correlation))

    return output_matrix, predicted_output_matrix

In [None]:
# Number of passes over the training set for this run.
num_epochs = 5

# Launch training and keep whatever train() returns for later inspection.
train_output = train(num_epochs=num_epochs)
print('Finished Training\n')

Begin training...


  0%|          | 0/5 [00:00<?, ?it/s]




NameError: name 'train_loader' is not defined

In [None]:
# Evaluate the saved model; test() returns (true tensors, predicted tensors).
test_output = test()

# Replace each per-batch tensor, in place, by the first element of its
# list form. This is applied to the true outputs first, then the predictions.
for batches in test_output:
    batches[:] = [batch.tolist()[0] for batch in batches]

In [None]:
# Arrange true and predicted values as (test_split, output_size) and transpose,
# so each output dimension becomes a row with one column per test sample.
matrix_shape = (test_split, output_size)
test_matrix = np.asarray(test_output[0]).reshape(matrix_shape).T
test_matrix_predicted = np.asarray(test_output[1]).reshape(matrix_shape).T

# Write both matrices to CSV without the index column.
true_frame = pd.DataFrame(test_matrix)
true_frame.to_csv('Test_matrix_true_using_PA.csv', index=False)
predicted_frame = pd.DataFrame(test_matrix_predicted)
predicted_frame.to_csv('Test_matrix_predicted_using_PA.csv', index=False)

In [None]:
def predict_on_full_data(model, x_data, device):
    """
    Run the model over an entire feature table in one forward pass.

    The model is switched to evaluation mode and gradients are not tracked.

    Parameters:
        model (torch.nn.Module): Trained neural network model.
        x_data (pd.DataFrame): Input feature data for prediction.
        device (torch.device): Device the input tensor is moved to (CPU or GPU).

    Returns:
        np.ndarray: Array of predictions.
    """
    model.eval()
    with torch.no_grad():
        # Convert the frame's values to a float32 tensor on the target device.
        features = torch.tensor(x_data.values, dtype=torch.float32).to(device)
        return model(features).cpu().numpy()


In [None]:
def evaluate_model(predictions, targets):
    """
    Score predictions against ground truth with MSE and R-squared.

    Parameters:
        predictions (array-like): Predicted values from the model.
        targets (array-like): Ground truth target values.

    Returns:
        mse (float): Mean Squared Error between predictions and targets.
        r2 (float): R-squared score representing the goodness of fit.
    """
    # Both scikit-learn metrics take the ground truth first, then the predictions.
    return (
        mean_squared_error(targets, predictions),
        r2_score(targets, predictions),
    )

# Compute metrics
# `predictions` and `y_train` were never defined in this notebook, so this cell
# failed on a fresh kernel. Predict on the full feature table with the in-memory
# model and score against the prepared target vector `y`.
# NOTE(review): this evaluates on all samples, including the training ones --
# confirm this is the intended evaluation set.
full_features = x_data_f.drop(columns=["Yeast_ID"]).fillna(0)
predictions = predict_on_full_data(model, full_features, device)
# The model returns shape [n, 1]; flatten it to line up with the 1-D targets.
mse, r2 = evaluate_model(predictions.ravel(), y)
print(f"MSE: {mse}, R²: {r2}")