In [None]:
import pandas as pd
import numpy as np
import os
import random
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM, TrainingArguments,Trainer
from datasets import Dataset
from peft import LoraConfig, IA3Config, get_peft_model

from utils import *
from models import *

## Setting the random seed for reproducible results

In [None]:
# One seed value shared by every random number generator used in this notebook
SEED = 42

# Python's built-in RNG, NumPy's global RNG and PyTorch's default generator
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Seed the CUDA generators too, but only when a GPU is actually present
if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)  # covers the multi-GPU case

# Request deterministic cuDNN behaviour and switch off cuDNN benchmark mode
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# SpeciesLM

In [None]:
# Hugging Face repository and revision that hold the pretrained SpeciesLM checkpoint
species_lm_repo = "gagneurlab/SpeciesLM"
species_lm_revision = "downstream_species_lm"

# Tokenizer and masked-LM weights must come from the same repo/revision
tokenizer = AutoTokenizer.from_pretrained(species_lm_repo, revision=species_lm_revision)
lm = AutoModelForMaskedLM.from_pretrained(species_lm_repo, revision=species_lm_revision)

lm.eval()
# Use the GPU when one is available and fall back to the CPU otherwise, so this
# cell does not crash on a machine without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"
lm.to(device)
print("Done")

# Loading and Tokenizing the Dataset

In [None]:
# Read the tab-separated MPRA table, drop rows containing any missing value and
# renumber the rows; the second reset_index() keeps the new row number as an
# "index" column.
mpra_df = (
    pd.read_csv("./data/segal_2015.tsv", sep="\t")
    .dropna()
    .reset_index(drop=True)
    .reset_index()
)

# Model input: the species token followed by the space-joined output of get_kmers
species_prefix = "candida_glabrata "
mpra_df["text"] = [
    species_prefix + " ".join(get_kmers(oligo))
    for oligo in mpra_df["Oligo Sequence"]
]

# Regression target: log2 of the "Expression" column
mpra_df["label"] = np.log2(mpra_df["Expression"])

In [None]:
# Keep only the model input ("text") and the regression target ("label")
df = mpra_df.loc[:, ["text", "label"]]

In [None]:
# Preview the first rows of the text/label table
df.head()

In [None]:
# 80 % of the rows go to training; the remaining 20 % is halved into a test
# split and a validation split. Both splits use the same fixed random_state.
train_df, test_val_df = train_test_split(df, random_state=42, test_size=0.2)
test_df, val_df = train_test_split(test_val_df, random_state=42, test_size=0.5)

# Wrap each pandas split in a Hugging Face Dataset (order: train, test, val)
train_dataset, test_dataset, val_dataset = (
    Dataset.from_pandas(split_df) for split_df in (train_df, test_df, val_df)
)

In [None]:
def tokenize_function(examples):
    """Tokenize the "text" entry of a batch with the notebook-level tokenizer.

    Args:
        examples: Batch handed over by ``Dataset.map``; only ``examples["text"]``
            is read.

    Returns:
        The value returned by ``tokenizer`` for those texts.
    """
    batch_texts = examples["text"]
    return tokenizer(batch_texts)


# Tokenize the three splits batch-wise (order: train, test, val)
train_dataset, test_dataset, val_dataset = (
    split.map(tokenize_function, batched=True)
    for split in (train_dataset, test_dataset, val_dataset)
)

In [None]:
# Drop the raw text and the "__index_level_0__" column from every split
# (order: train, test, val)
train_dataset, test_dataset, val_dataset = (
    split.remove_columns(["text", "__index_level_0__"])
    for split in (train_dataset, test_dataset, val_dataset)
)

In [None]:
# Display the training split after tokenization and column removal
train_dataset

In [None]:
# Display the test split after tokenization and column removal
test_dataset

In [None]:
# Display the validation split after tokenization and column removal
val_dataset

# Training and Testing the Fine-tuned Model

In [None]:
# Turn off gradient tracking for every parameter currently in the language model
for frozen_weight in lm.parameters():
    frozen_weight.requires_grad_(False)

In [None]:
# Select the fine-tuning strategy by name instead of hand-editing `if True` /
# `elif False` conditions. Supported values:
#   "DoRA"     - LoraConfig with use_dora=True
#   "LoRA"     - LoraConfig with use_dora=False
#   "baseline" - no adapter is attached; `lm` is used as it is
#   "IA3"      - IA3Config
finetune_method = "DoRA"

lr = 1e-4 # base: 2e-3
batch_size = 16 # base: 32

# NOTE: the adapter branches rebind `lm` to the PEFT-wrapped model, so running
# this cell twice without reloading `lm` wraps the model a second time.
if finetune_method in ("DoRA", "LoRA"):
    rank = 32 #2, 4, 8, 16, 32
    use_dora = finetune_method == "DoRA"
    method = finetune_method

    config = LoraConfig(
        use_dora=use_dora,
        r=rank,
        lora_alpha=32,
        lora_dropout=0.01,
        bias="none",
    )
    lm = get_peft_model(lm, config)
elif finetune_method == "baseline":
    method = "baseline"
    rank = 0
    use_dora = False
elif finetune_method == "IA3":
    method = "IA3"
    rank = 0
    # NOTE(review): this is the string "IA3" rather than a bool; it ends up in the
    # 'use_Dora' column of the results table. Left unchanged so rows stay
    # comparable with earlier runs; confirm whether that is intended.
    use_dora = "IA3"
    config = IA3Config(
        peft_type="IA3",
    )
    lm = get_peft_model(lm, config)
else:
    # Fail loudly on a typo instead of silently falling into some default branch
    raise ValueError(
        f"Unknown finetune_method {finetune_method!r}; "
        "expected 'DoRA', 'LoRA', 'baseline' or 'IA3'"
    )

In [None]:
# Wrap the language model in the regression model, then move it to the chosen device
regression_model = EncoderForRegression(lm)
model = regression_model.to(device)

# Report trainable vs. total parameter counts
print_trainable_parameters(model)

In [None]:
# Settings handed to the Hugging Face Trainer; evaluation and logging both
# happen once per epoch, and `lr` / `batch_size` come from the method-selection cell.
training_config = dict(
    output_dir="./results",
    num_train_epochs=5,
    learning_rate=lr,
    weight_decay=0.01,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    eval_strategy="epoch",
    logging_strategy="epoch",
)
training_args = TrainingArguments(**training_config)

In [None]:
# The trainer fits on the training split and evaluates on the validation split
trainer = Trainer(
    args=training_args,
    model=model,
    eval_dataset=val_dataset,
    train_dataset=train_dataset,
)

# Fine-tune
trainer.train()

# Final evaluation on eval_dataset (the validation split); the returned metrics
# dict is read again when the results row is written.
training_perform = trainer.evaluate()

# Results

In [None]:
# Run inference on the validation split; despite the variable name, the
# held-out test split is not touched here.
test_result = trainer.predict(test_dataset=val_dataset)

In [None]:
# Unpack the prediction output in field order: predictions, label_ids, metrics
predictions, labels, metrics = test_result

In [None]:
# Display the metrics dictionary returned by trainer.predict
metrics

In [None]:
# Flatten once and reuse, so every metric compares two 1-D arrays
flat_labels = np.asarray(labels).flatten()
flat_predictions = predictions.flatten()

# Mean Absolute Error
mae = mean_absolute_error(flat_labels, flat_predictions)
print(f"Mean Absolute Error (MAE): {mae}")

# Mean Squared Error
mse = mean_squared_error(flat_labels, flat_predictions)
print(f"Mean Squared Error (MSE): {mse}")

# Root Mean Squared Error
rmse = np.sqrt(mse)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# R-squared Score
r2 = r2_score(flat_labels, flat_predictions)
print(f"R-squared (R2) Score: {r2}")

# Mean Absolute Percentage Error
# The labels are log2 values and can be exactly 0, so dividing by them directly
# can produce inf/nan. Samples with a zero label are left out of the average;
# if every label is zero the metric is undefined and reported as NaN.
nonzero_labels = flat_labels != 0
if nonzero_labels.any():
    relative_errors = (
        flat_labels[nonzero_labels] - flat_predictions[nonzero_labels]
    ) / flat_labels[nonzero_labels]
    mape = np.mean(np.abs(relative_errors)) * 100
else:
    mape = float("nan")
print(f"Mean Absolute Percentage Error (MAPE): {mape}%")

# Save results

In [None]:
# Path of the CSV file that accumulates one row per run
csv_file_path = './data/final_results.csv'

def load_dataframe(csv_file_path):
    """Load the accumulated results table, or start a new empty one.

    Args:
        csv_file_path: Path of the results CSV.

    Returns:
        The DataFrame read from ``csv_file_path`` when the file exists and has
        content. When the file is missing, or exists but is completely empty,
        an empty DataFrame with the expected result columns is returned.
    """
    columns = ['method', 'use_Dora','rank','batch_size','learning_rate','MAE', 'MSE', 'RMSE', 'R2','pearson', 'MAPE', 'Trainable Params', 'All Params', 'Trainable Percentage', 'Runtime']
    if os.path.exists(csv_file_path):
        try:
            return pd.read_csv(csv_file_path)
        except pd.errors.EmptyDataError:
            # A zero-byte file has no header to parse; treat it like a missing
            # file instead of crashing the notebook.
            pass
    return pd.DataFrame(columns=columns)

# NOTE: this rebinds `df`, which earlier in the notebook held the text/label table
df = load_dataframe(csv_file_path)

In [None]:
# Collect (element count, requires_grad) for every named parameter of the model
param_counts = [
    (weight.numel(), weight.requires_grad)
    for _name, weight in model.named_parameters()
]

# Total number of elements, and the subset that requires gradients
all_param = sum(count for count, _is_trainable in param_counts)
trainable_params = sum(count for count, is_trainable in param_counts if is_trainable)

print(
    f"trainable params: {trainable_params} || all params: {all_param} || trainable%: {100 * trainable_params / all_param}"
)

In [None]:
# One record describing this run: configuration, metrics computed above and
# parameter counts. 'Runtime' stores the eval_runtime reported by trainer.evaluate().
run_record = {
    'method' : method,
    'use_Dora' : use_dora,
    'rank' : rank,
    'batch_size' : batch_size,
    'learning_rate' : lr,
    'MAE': mae,
    'MSE': mse,
    'RMSE': rmse,
    'R2': r2,
    'pearson': pearson_r2_metric(labels, predictions.flatten()),
    'MAPE': mape,
    'Trainable Params': trainable_params,
    'All Params': all_param,
    'Trainable Percentage': 100 * trainable_params / all_param,
    'Runtime': training_perform['eval_runtime']
}
data = pd.DataFrame([run_record])

# Append the new row to the accumulated results table ...
df = pd.concat([df, data], ignore_index=True)

# ... and write the whole table back to the CSV file (no index column)
df.to_csv(csv_file_path, index=False)

## Test Set

In [None]:
# # Make prediction
# test_result = trainer.predict(test_dataset) 

In [None]:
# predictions = test_result.predictions
# labels = test_result.label_ids
# metrics = test_result.metrics

In [None]:
# metrics

In [None]:
# # Mean Absolute Error
# mae = mean_absolute_error(labels, predictions.flatten())
# print(f"Mean Absolute Error (MAE): {mae}")

# # Mean Squared Error
# mse = mean_squared_error(labels, predictions.flatten())
# print(f"Mean Squared Error (MSE): {mse}")

# # Root Mean Squared Error
# rmse = np.sqrt(mse)
# print(f"Root Mean Squared Error (RMSE): {rmse}")

# # R-squared Score
# r2 = r2_score(labels, predictions.flatten())
# print(f"R-squared (R2) Score: {r2}")

# # Mean Absolute Percentage Error
# mape = np.mean(np.abs((labels - predictions.flatten()) / labels)) * 100
# print(f"Mean Absolute Percentage Error (MAPE): {mape}%")

In [None]:
# # Define the path to your CSV file
# csv_file_path = './data/final_results.csv'

# # Function to load existing DataFrame or create a new one if it doesn't exist
# def load_dataframe(csv_file_path):
#     if os.path.exists(csv_file_path):
#         return pd.read_csv(csv_file_path)
#     else:
#         # Create an empty DataFrame with the required columns
#         columns = ['method', 'use_Dora','rank','batch_size','learning_rate','MAE', 'MSE', 'RMSE', 'R2','pearson', 'MAPE', 'Trainable Params', 'All Params', 'Trainable Percentage', 'Runtime']
#         return pd.DataFrame(columns=columns)
# df = load_dataframe(csv_file_path)

In [None]:
# trainable_params = 0
# all_param = 0
# for _, param in model.named_parameters():
#     all_param += param.numel()
#     if param.requires_grad:
#         trainable_params += param.numel()
# print(
#     f"trainable params: {trainable_params} || all params: {all_param} || trainable%: {100 * trainable_params / all_param}"
# )

In [None]:
# data = pd.DataFrame([{
#     'method' : method + '_test',
#     'use_Dora' : use_dora,
#     'rank' : rank,
#     'batch_size' : batch_size,
#     'learning_rate' : lr,
#     'MAE': mae,
#     'MSE': mse,
#     'RMSE': rmse,
#     'R2': r2,
#     'pearson': pearson_r2_metric(labels, predictions.flatten()),
#     'MAPE': mape,
#     'Trainable Params': trainable_params,
#     'All Params': all_param,
#     'Trainable Percentage': 100 * trainable_params / all_param,
#     'Runtime': training_perform['eval_runtime']
# }])

# df = pd.concat([df, data], ignore_index=True)

# # Save DataFrame to the CSV file
# df.to_csv(csv_file_path, index=False)