## Parallelization demo

You can accelerate RVINN training by parallelizing the computation across multiple CPU cores.

If you're using an HPC environment, consider running array jobs on CPU nodes for better scalability.

We’ve also provided a simple demo below for parallel execution on a local machine, but please be aware that compatibility in various user’s environment cannot be guaranteed.

**Note:** Before running parallel computation, it’s recommended to check how many CPU cores are available and ensure you have sufficient disk space for your output directory.

In [None]:
# for running the code in the notebook
#!pip install git+https://github.com/omuto/RVINN
#!git clone https://github.com/omuto/RVINN # clone the repo to get the data
#%cd RVINN/demo

In [None]:
import os
import numpy as np
import pandas as pd
import torch
from multiprocessing import Pool

import rvinn.model
import rvinn.ut

total_cpus = os.cpu_count()
print(f"total number of cpus: {total_cpus}")

In [None]:
genes_arr = np.loadtxt(
    "./HALLMARK_ESTROGEN_RESPONSE_EARLY.txt",
    dtype=str)

known_E2_responsive_genes_list = genes_arr.tolist()  
known_E2_responsive_genes_list

In [None]:
Sp_df, Un_df, observed_timepoints = rvinn.ut.load_mcf7_data('E2') 

# a subset of known E2-responsive genes for testing
genes_list = known_E2_responsive_genes_list[:20]


#filtering out genes not in the datasets
genes_list = sorted(list(set(Sp_df.index.to_list()) & set(Un_df.index.to_list()) & set(genes_list)))
num_genes = len(genes_list)
print(f"Included {num_genes} genes in datasets: {genes_list}")

Included 12 genes in datasets: ['ABAT', 'ABCA3', 'ADCY1', 'ADCY9', 'AFF1', 'AKAP1', 'ALDH3B1', 'AMFR', 'ANXA9', 'ARL3', 'ASB13', 'BAG1']


In [None]:
def init_worker(sp_df, un_df, t_star_arg, t_f_arg, n_repl_arg, res_dir_arg, random_seed_arg):
    global df_Sp, df_Un, t_star, t_f, n_replicates, results_dir, seed
    df_Sp        = sp_df
    df_Un        = un_df
    t_star       = t_star_arg
    t_f          = t_f_arg
    n_replicates = n_repl_arg
    results_dir  = res_dir_arg
    seed         = random_seed_arg
    os.environ["OMP_NUM_THREADS"]     = "1"
    os.environ["MKL_NUM_THREADS"]     = "1"
    os.environ["OPENBLAS_NUM_THREADS"]= "1"

def rvinn_run_cpus(gene: str):
    """
    global variables:
      - df_Sp, df_Un: pandas.DataFrame
      - t_star, t_f: list or np.ndarray of timepoints
      - n_replicates: int
      - results_dir: str
    """
    print(f"Process ID [{os.getpid()}] Running for {gene}\n")

    device = torch.device('cpu')
    torch.set_num_threads(1)
    torch.manual_seed(seed)

    save_dir = rvinn.ut.create_save_directory(gene, output_dir_name=results_dir)

    Sp_star, Un_star = rvinn.ut.load_target(gene, df_Sp, df_Un, n_replicates)

    Sp_e = np.empty((len(t_f), 0))
    Un_e = np.empty((len(t_f), 0))
    k1_e = np.empty((len(t_f), 0))
    k2_e = np.empty((len(t_f), 0))
    k3_e = np.empty((len(t_f), 0))

    for i in range(1, 11):
        model = rvinn.model.Model(
            device=device,
            t_train=t_star,
            spliced_train=Sp_star,
            unspliced_train=Un_star,
            t_f=t_f,
            init_steady=True,
            init_data=True,
            loss_verbose=False,
        )
        model.train()
        Sp, Un, k1, k2, k3, *_ = model.predict(t_f)

        # append along axis=1
        Sp_e = np.append(Sp_e, Sp, axis=1) 
        Un_e = np.append(Un_e, Un, axis=1)
        k1_e = np.append(k1_e, k1, axis=1) # k1:transcription rate
        k2_e = np.append(k2_e, k2, axis=1) # k2:splicing rate
        k3_e = np.append(k3_e, k3, axis=1) # k3:degradation rate

        del model
        
    print(f"PID [{os.getpid()}] Finished: {gene}")
    total, used, free = map(int, os.popen('free -t -m').readlines()[-1].split()[1:])
    print(f"RAM used: {round((used/total)*100,2)}%\n")

    rvinn.ut.save_mean_std_results(
        gene,
        save_dir,
        Sp_e, Un_e,
        k1_e, k2_e, k3_e
    )



def parallel_compute(
    genes_list: list,
    df_Sp_arg: pd.DataFrame,
    df_Un_arg: pd.DataFrame,
    observed_timepoints: list,
    n_replicates_arg: int,
    results_dir_arg: str,
    random_seed_arg: int,
    max_cpus: int
):
    assert isinstance(genes_list, list)
    assert isinstance(df_Sp_arg, pd.DataFrame)
    assert isinstance(df_Un_arg, pd.DataFrame)
    assert isinstance(observed_timepoints, list)
    assert isinstance(n_replicates_arg, int) and n_replicates_arg > 0
    assert isinstance(results_dir_arg, str)
    assert isinstance(random_seed_arg, int) and random_seed_arg >= 0
    assert isinstance(max_cpus, int) and 1 <= max_cpus <= (os.cpu_count() - 1), f"Does not have enough CPU cores to allow {max_cpus} parallel processes" 

    # t_star, t_f
    t_star_arg, t_f_arg = rvinn.ut.load_observation_period(observed_timepoints)

    # make results directory if it doesn't exist
    os.makedirs(results_dir_arg, exist_ok=True)

    # initialize worker function
    initargs = (
        df_Sp_arg,
        df_Un_arg,
        t_star_arg,
        t_f_arg,
        n_replicates_arg,
        results_dir_arg,
        random_seed_arg
    )

    # parallel processing
    with Pool(
        processes=max_cpus,
        initializer=init_worker,
        initargs=initargs
    ) as pool:
        pool.map(rvinn_run_cpus, genes_list)

    print("All genes processed in the genes_list.")


In [None]:
parallel_compute(genes_list=genes_list,
                 df_Sp_arg=Sp_df,
                 df_Un_arg=Un_df,
                 observed_timepoints=observed_timepoints,
                 n_replicates_arg=3,
                 results_dir_arg="parallel_results",
                 random_seed_arg=1,
                 max_cpus=6) # set your number of cpus here, e.g. 6

Process ID [1640] Running RVINN for ABAT
Process ID [1641] Running RVINN for ABCA3
Process ID [1642] Running RVINN for ADCY1
Process ID [1643] Running RVINN for ADCY9
Process ID [1644] Running RVINN for AFF1
Process ID [1645] Running RVINN for AKAP1






