From cf851ba1eb38d5ee30facf34d1739aa1a8c8ecac Mon Sep 17 00:00:00 2001 From: dorien-er Date: Wed, 2 Jul 2025 10:58:50 +0200 Subject: [PATCH 01/13] wip --- .../create_pseudobulk/config.vsh.yaml | 168 ++++++++++++++++++ .../create_pseudobulk/script.py | 164 +++++++++++++++++ 2 files changed, 332 insertions(+) create mode 100644 src/differential_expression/create_pseudobulk/config.vsh.yaml create mode 100644 src/differential_expression/create_pseudobulk/script.py diff --git a/src/differential_expression/create_pseudobulk/config.vsh.yaml b/src/differential_expression/create_pseudobulk/config.vsh.yaml new file mode 100644 index 00000000000..55c960eaa0c --- /dev/null +++ b/src/differential_expression/create_pseudobulk/config.vsh.yaml @@ -0,0 +1,168 @@ +name: scanvi +namespace: "annotate" +scope: "public" +description: | + scANVI () is a semi-supervised model for single-cell transcriptomics data. scANVI is an scVI extension that can leverage the cell type knowledge for a subset of the cells present in the data sets to infer the states of the rest of the cells. + This component will instantiate a scANVI model from a pre-trained scVI model, integrate the data and perform label prediction. + +authors: + - __merge__: /src/authors/dorien_roosen.yaml + roles: [ maintainer ] + - __merge__: /src/authors/dries_de_mayer.yaml + roles: [ contributor ] + - __merge__: /src/authors/weiwei_schultz.yaml + roles: [ contributor ] +argument_groups: + - name: Inputs + arguments: + - name: "--input" + alternatives: ["-i"] + type: file + description: Input h5mu file. + direction: input + required: true + - name: "--modality" + description: | + Which modality from the input MuData file to process. + type: string + default: "rna" + required: false + - name: "--input_layer" + type: string + required: false + description: "Input layer to use. If None, X is used. This layer must contain raw counts." + - name: "--var_input" + type: string + required: false + description: ".var column containing highly variable genes that were used to train the scVi model. By default, do not subset genes." + - name: "--var_gene_names" + type: string + required: false + description: ".var column containing gene names. By default, use the index." + - name: "--obs_labels" + type: string + required: true + description: ".obs field containing the labels" + - name: "--unlabeled_category" + type: string + default: "Unknown" + description: | + Value in the --obs_labels field that indicates unlabeled observations + - name: --random_state + type: integer + description: | + The random seed for sampling. + default: 0 + required: false + - name: scVI Model + arguments: + - name: "--scvi_model" + type: file + description: "Pretrained SCVI reference model to initialize the SCANVI model with." + example: scvi_model.pt + direction: input + required: true + + - name: Outputs + arguments: + - name: "--output" + alternatives: ["-o"] + type: file + description: Output h5mu file. + direction: output + required: true + - name: "--output_model" + type: file + description: Folder where the state of the trained model will be saved to. + required: false + direction: output + - name: "--obsm_output" + type: string + default: "X_scanvi_integrated" + required: false + description: "In which .obsm slot to store the resulting integrated embedding." + - name: "--obs_output_predictions" + type: string + default: scanvi_pred + description: "In which .obs slot to store the predicted labels." + - name: "--obs_output_probabilities" + type: string + default: scanvi_proba + description: "In which. obs slot to store the probabilities of the predicted labels." + __merge__: [., /src/base/h5_compression_argument.yaml] + + - name: "scANVI training arguments" + arguments: + - name: "--early_stopping" + required: false + type: boolean + description: "Whether to perform early stopping with respect to the validation set." + - name: "--early_stopping_monitor" + choices: ["elbo_validation", "reconstruction_loss_validation", "kl_local_validation"] + default: "elbo_validation" + type: string + description: "Metric logged during validation set epoch." + - name: "--early_stopping_patience" + type: integer + min: 1 + default: 45 + description: "Number of validation epochs with no improvement after which training will be stopped." + - name: "--early_stopping_min_delta" + min: 0 + type: double + default: 0.0 + description: "Minimum change in the monitored quantity to qualify as an improvement, + i.e. an absolute change of less than min_delta, will count as no improvement." + - name: "--max_epochs" + type: integer + description: "Number of passes through the dataset, defaults to (20000 / number of cells) * 400 or 400; whichever is smallest." + required: false + - name: "--reduce_lr_on_plateau" + description: "Whether to monitor validation loss and reduce learning rate when validation set `lr_scheduler_metric` plateaus." + type: boolean + default: True + - name: "--lr_factor" + description: "Factor to reduce learning rate." + type: double + default: 0.6 + min: 0 + - name: "--lr_patience" + description: "Number of epochs with no improvement after which learning rate will be reduced." + type: double + default: 30 + min: 0 + +resources: + - type: python_script + path: script.py + - path: /src/utils/subset_vars.py + - path: /src/utils/compress_h5mu.py + - path: /src/utils/set_var_index.py + - path: /src/utils/setup_logger.py +test_resources: + - type: python_script + path: test.py + - path: /resources_test/annotation_test_data/scvi_model/ + - path: /resources_test/annotation_test_data/TS_Blood_filtered.h5mu + - path: /resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu + +engines: +- type: docker + image: nvcr.io/nvidia/pytorch:25.05-py3 + setup: + - type: python + __merge__: [/src/base/requirements/anndata_mudata.yaml, /src/base/requirements/scanpy.yaml, .] + - type: python + packages: + - jax[cuda] + - scvi-tools~=1.3.1 + test_setup: + - type: python + __merge__: [ /src/base/requirements/viashpy.yaml, .] + +runners: +- type: executable + # docker_run_args: ["--gpus all"] +- type: nextflow + directives: + label: [midcpu, midmem, gpu, highdisk] diff --git a/src/differential_expression/create_pseudobulk/script.py b/src/differential_expression/create_pseudobulk/script.py new file mode 100644 index 00000000000..8489574a08d --- /dev/null +++ b/src/differential_expression/create_pseudobulk/script.py @@ -0,0 +1,164 @@ +import random +import numpy as np +import anndata as ad +import mudata as mu +import logging +import sys +import scipy.sparse as sp + +## VIASH START +# adata = mu.read_h5ad("resources_test/annotation_test_data/TS_Blood_filtered.h5mu", mod="rna") +# np.random.seed(42) # Set seed for reproducibility +# n_cells = adata.n_obs +# treatment = np.random.choice(['ctrl', 'stim'], size=n_cells, p=[0.5, 0.5]) +# disease = np.random.choice(['healthy', 'diseased'], size=n_cells, p=[0.5, 0.5]) +# adata.obs['treatment'] = treatment +# adata.obs['disease'] = disease +# mdata = mu.MuData({"rna": adata}) +# mdata.write_h5mu("resources_test/annotation_test_data/TS_Blood_filtered_annotated.h5mu") + +par = { + "input": "resources_test/annotation_test_data/TS_Blood_filtered_annotated.h5mu", + "modality": "rna", + "input_layer": None, + "obs_grouping": "cell_type", + "obs_sample_conditions": ["donor_id", "treatment", "disease"], + "obs_pseudo_bulk_samples": "pb_sample", + "aggregation_method": "sum", + "min_num_cells_per_donor": 5, + "pseudo_replicates": 2, + "random_seed": 0, + "output": "resources_test/annotation_test_data/TS_Blood_filtered_annotated_pseudobulk.h5mu", +} +meta = {"resources_dir": "src/utils"} +## VIASH END + +sys.path.append(meta["resources_dir"]) +from setup_logger import setup_logger +logger = setup_logger() + + +def aggregate_and_filter( + adata, + grouping_value, + group_key, + sample_key, + obs_to_keep, + aggregation_method="sum", + num_cells_per_donor=30, + pseudo_replicates=1): + + # subset adata to the given grouping (cell/cluster) value + adata_cell_pop = adata[adata.obs[group_key] == grouping_value] + + pb_aggregated_data = [] + for i, pbs in enumerate(pb_samples := adata_cell_pop.obs[sample_key].cat.categories): + logger.info(f"Processing pseudobulk sample {i+1} out of {len(pb_samples)}...") + + pb_adata = adata_cell_pop[adata_cell_pop.obs[sample_key] == pbs] + + # check which pseudobulk sample to keep according to the number of cells + if pb_adata.shape[0] < num_cells_per_donor: + logging.info(f"Number of cells for {pbs} is less than {num_cells_per_donor}, skipping pseudobulk aggregation...") + continue + + # Create pseudo-replicates + logging.info(f"Creating {pseudo_replicates} pseudo-replicates for pseudobulk sample {pbs}...") + indices = list(pb_adata.obs_names) + np.random.seed(par["random_seed"]) # optional seed for reproducibility + random.shuffle(indices) + indices = np.array_split(np.array(indices), pseudo_replicates) + + # Aggregate data for each pseudo-replicate + for i, rep_idx in enumerate(indices): + # Aggregate gene expression values + if aggregation_method == "sum": + pb_adata_rep = ad.AnnData( + X=pb_adata[rep_idx].X.sum(axis=0), + var=pb_adata[rep_idx].var[[]] + ) + elif aggregation_method == "mean": + pb_adata_rep = ad.AnnData( + X=pb_adata[rep_idx].X.mean(axis=0), + var=pb_adata[rep_idx].var[[]] + ) + + # Store metadata + pb_adata_rep.obs_names = [f"{grouping_value}_{pbs}_{str(i)}"] + pb_adata_rep.obs[group_key] = grouping_value + for obs in obs_to_keep: + pb_adata_rep.obs[obs] = pb_adata[rep_idx].obs[obs][0] + pb_adata_rep.obs["n_cells"] = len(pb_adata[rep_idx]) + pb_aggregated_data.append(pb_adata_rep) + + adata_pb = ad.concat(pb_aggregated_data) + + return adata_pb + + +def is_normalized(layer): + + if sp.issparse(layer): + row_sums = np.array(layer.sum(axis=1)).flatten() + else: + row_sums = layer.sum(axis=1) + + return np.allclose(row_sums, 1) + + +def main(): + + # Read in data + logger.info(f"Reading input data {par['input']}...") + adata = mu.read_h5ad(par["input"], mod=par["modality"]).copy() + + # Make sure .X contains raw counts + if par["input_layer"]: + adata.X = adata.layers[par["input_layer"]] + if is_normalized(adata.X): + raise ValueError("Input layer must contain raw counts.") + + # Ensure al columns required for pseudobulk aggregation exist and are categorical + required_categorical_obs = par["obs_sample_conditions"] + [par["obs_grouping"]] + for col in required_categorical_obs: + if col not in adata.obs.columns: + raise ValueError(f"Required column '{col}' not found in .obs.") + adata.obs[col] = adata.obs[col].astype(str).replace(" ", "_").replace("+", "").astype("category") + + # Create pseudobulk groups + logger.info("Creating pseudobulk groups...") + adata.obs[par["obs_pseudo_bulk_samples"]] = [ + "_".join(values) for values in zip(*[adata.obs[col] for col in par["obs_sample_conditions"]]) + ] + adata.obs[par["obs_pseudo_bulk_samples"]] = adata.obs[par["obs_pseudo_bulk_samples"]].astype("category") + logger.info(f"Pseudobulk groups created: {adata.obs[par['obs_pseudo_bulk_samples']].unique().tolist()}") + + # Aggregate pseudobulk data per cell group + pb_datasets = [] + cell_groups = adata.obs[par["obs_grouping"]].unique() + + for group in cell_groups: + logger.info(f"Processing cell group {group}") + pb_data = aggregate_and_filter( + adata, + group, + par["obs_grouping"], + par["obs_pseudo_bulk_samples"], + par["obs_sample_conditions"], + aggregation_method=par["aggregation_method"], + num_cells_per_donor=par["min_num_cells_per_donor"], + pseudo_replicates=par["pseudo_replicates"] + ) + pb_datasets.append(pb_data) + + # Combine pseudobulk datasets + logger.info("Combining pseudobulk datasets...") + adata_pb = ad.concat(pb_datasets) + logger.info(f"Final dataset: {adata_pb.n_obs} pseudobulk samples, {adata_pb.n_vars} genes") + logger.info("Writing output data...") + mdata_pb = mu.MuData({"rna": adata_pb}) + mdata_pb.write_h5mu(par["output"]) + + +if __name__ == "__main__": + main() From 0c148781c41caf29daad35ebe8224f173bb42cb6 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Wed, 2 Jul 2025 11:00:36 +0200 Subject: [PATCH 02/13] wip --- .../create_pseudobulk/script.py | 68 ++++++++++++------- .../edger/config.vsh.yaml | 0 src/differential_expression/edger/script.R | 0 src/differential_expression/edger/test.py | 0 4 files changed, 42 insertions(+), 26 deletions(-) create mode 100644 src/differential_expression/edger/config.vsh.yaml create mode 100644 src/differential_expression/edger/script.R create mode 100644 src/differential_expression/edger/test.py diff --git a/src/differential_expression/create_pseudobulk/script.py b/src/differential_expression/create_pseudobulk/script.py index 8489574a08d..f71ab9c0630 100644 --- a/src/differential_expression/create_pseudobulk/script.py +++ b/src/differential_expression/create_pseudobulk/script.py @@ -35,37 +35,44 @@ sys.path.append(meta["resources_dir"]) from setup_logger import setup_logger + logger = setup_logger() def aggregate_and_filter( - adata, - grouping_value, - group_key, - sample_key, - obs_to_keep, - aggregation_method="sum", - num_cells_per_donor=30, - pseudo_replicates=1): - + adata, + grouping_value, + group_key, + sample_key, + obs_to_keep, + aggregation_method="sum", + num_cells_per_donor=30, + pseudo_replicates=1, +): # subset adata to the given grouping (cell/cluster) value adata_cell_pop = adata[adata.obs[group_key] == grouping_value] pb_aggregated_data = [] - for i, pbs in enumerate(pb_samples := adata_cell_pop.obs[sample_key].cat.categories): - logger.info(f"Processing pseudobulk sample {i+1} out of {len(pb_samples)}...") + for i, pbs in enumerate( + pb_samples := adata_cell_pop.obs[sample_key].cat.categories + ): + logger.info(f"Processing pseudobulk sample {i + 1} out of {len(pb_samples)}...") pb_adata = adata_cell_pop[adata_cell_pop.obs[sample_key] == pbs] # check which pseudobulk sample to keep according to the number of cells if pb_adata.shape[0] < num_cells_per_donor: - logging.info(f"Number of cells for {pbs} is less than {num_cells_per_donor}, skipping pseudobulk aggregation...") + logging.info( + f"Number of cells for {pbs} is less than {num_cells_per_donor}, skipping pseudobulk aggregation..." + ) continue # Create pseudo-replicates - logging.info(f"Creating {pseudo_replicates} pseudo-replicates for pseudobulk sample {pbs}...") + logging.info( + f"Creating {pseudo_replicates} pseudo-replicates for pseudobulk sample {pbs}..." + ) indices = list(pb_adata.obs_names) - np.random.seed(par["random_seed"]) # optional seed for reproducibility + np.random.seed(par["random_seed"]) # optional seed for reproducibility random.shuffle(indices) indices = np.array_split(np.array(indices), pseudo_replicates) @@ -74,13 +81,11 @@ def aggregate_and_filter( # Aggregate gene expression values if aggregation_method == "sum": pb_adata_rep = ad.AnnData( - X=pb_adata[rep_idx].X.sum(axis=0), - var=pb_adata[rep_idx].var[[]] + X=pb_adata[rep_idx].X.sum(axis=0), var=pb_adata[rep_idx].var[[]] ) elif aggregation_method == "mean": pb_adata_rep = ad.AnnData( - X=pb_adata[rep_idx].X.mean(axis=0), - var=pb_adata[rep_idx].var[[]] + X=pb_adata[rep_idx].X.mean(axis=0), var=pb_adata[rep_idx].var[[]] ) # Store metadata @@ -97,7 +102,6 @@ def aggregate_and_filter( def is_normalized(layer): - if sp.issparse(layer): row_sums = np.array(layer.sum(axis=1)).flatten() else: @@ -107,7 +111,6 @@ def is_normalized(layer): def main(): - # Read in data logger.info(f"Reading input data {par['input']}...") adata = mu.read_h5ad(par["input"], mod=par["modality"]).copy() @@ -123,15 +126,26 @@ def main(): for col in required_categorical_obs: if col not in adata.obs.columns: raise ValueError(f"Required column '{col}' not found in .obs.") - adata.obs[col] = adata.obs[col].astype(str).replace(" ", "_").replace("+", "").astype("category") + adata.obs[col] = ( + adata.obs[col] + .astype(str) + .replace(" ", "_") + .replace("+", "") + .astype("category") + ) # Create pseudobulk groups logger.info("Creating pseudobulk groups...") adata.obs[par["obs_pseudo_bulk_samples"]] = [ - "_".join(values) for values in zip(*[adata.obs[col] for col in par["obs_sample_conditions"]]) + "_".join(values) + for values in zip(*[adata.obs[col] for col in par["obs_sample_conditions"]]) ] - adata.obs[par["obs_pseudo_bulk_samples"]] = adata.obs[par["obs_pseudo_bulk_samples"]].astype("category") - logger.info(f"Pseudobulk groups created: {adata.obs[par['obs_pseudo_bulk_samples']].unique().tolist()}") + adata.obs[par["obs_pseudo_bulk_samples"]] = adata.obs[ + par["obs_pseudo_bulk_samples"] + ].astype("category") + logger.info( + f"Pseudobulk groups created: {adata.obs[par['obs_pseudo_bulk_samples']].unique().tolist()}" + ) # Aggregate pseudobulk data per cell group pb_datasets = [] @@ -147,14 +161,16 @@ def main(): par["obs_sample_conditions"], aggregation_method=par["aggregation_method"], num_cells_per_donor=par["min_num_cells_per_donor"], - pseudo_replicates=par["pseudo_replicates"] + pseudo_replicates=par["pseudo_replicates"], ) pb_datasets.append(pb_data) # Combine pseudobulk datasets logger.info("Combining pseudobulk datasets...") adata_pb = ad.concat(pb_datasets) - logger.info(f"Final dataset: {adata_pb.n_obs} pseudobulk samples, {adata_pb.n_vars} genes") + logger.info( + f"Final dataset: {adata_pb.n_obs} pseudobulk samples, {adata_pb.n_vars} genes" + ) logger.info("Writing output data...") mdata_pb = mu.MuData({"rna": adata_pb}) mdata_pb.write_h5mu(par["output"]) diff --git a/src/differential_expression/edger/config.vsh.yaml b/src/differential_expression/edger/config.vsh.yaml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/differential_expression/edger/script.R b/src/differential_expression/edger/script.R new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/differential_expression/edger/test.py b/src/differential_expression/edger/test.py new file mode 100644 index 00000000000..e69de29bb2d From ca2d3816bf201a60bc6e6c5afd1beefba43d3762 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Wed, 2 Jul 2025 11:01:01 +0200 Subject: [PATCH 03/13] wip --- src/differential_expression/edger/config.vsh.yaml | 0 src/differential_expression/edger/script.R | 0 src/differential_expression/edger/test.py | 0 3 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 src/differential_expression/edger/config.vsh.yaml delete mode 100644 src/differential_expression/edger/script.R delete mode 100644 src/differential_expression/edger/test.py diff --git a/src/differential_expression/edger/config.vsh.yaml b/src/differential_expression/edger/config.vsh.yaml deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/differential_expression/edger/script.R b/src/differential_expression/edger/script.R deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/differential_expression/edger/test.py b/src/differential_expression/edger/test.py deleted file mode 100644 index e69de29bb2d..00000000000 From 3c6a5ca7a3b6a23abce026c5c139e9811e34b666 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Wed, 2 Jul 2025 11:30:12 +0200 Subject: [PATCH 04/13] wip --- .../create_pseudobulk/config.vsh.yaml | 152 ++++++------------ 1 file changed, 45 insertions(+), 107 deletions(-) diff --git a/src/differential_expression/create_pseudobulk/config.vsh.yaml b/src/differential_expression/create_pseudobulk/config.vsh.yaml index 55c960eaa0c..5012c6d885d 100644 --- a/src/differential_expression/create_pseudobulk/config.vsh.yaml +++ b/src/differential_expression/create_pseudobulk/config.vsh.yaml @@ -1,17 +1,22 @@ -name: scanvi -namespace: "annotate" +name: differential_expression +namespace: "create_pseudobulk" scope: "public" description: | - scANVI () is a semi-supervised model for single-cell transcriptomics data. scANVI is an scVI extension that can leverage the cell type knowledge for a subset of the cells present in the data sets to infer the states of the rest of the cells. - This component will instantiate a scANVI model from a pre-trained scVI model, integrate the data and perform label prediction. + Generation of pseudobulk samples from single-cell transcriptomics data, + by aggregating raw gene expression counts from individual cells to create + bulk-like expression profiles suitable for differential expression analysis + with methods designed for bulk differential expression analysis. authors: - __merge__: /src/authors/dorien_roosen.yaml - roles: [ maintainer ] - - __merge__: /src/authors/dries_de_mayer.yaml + roles: [ author ] + - __merge__: /src/authors/jakub_majercik.yaml + roles: [ author ] + - __merge__: /src/authors/dries_de_maeyer.yaml roles: [ contributor ] - __merge__: /src/authors/weiwei_schultz.yaml roles: [ contributor ] + argument_groups: - name: Inputs arguments: @@ -31,138 +36,71 @@ argument_groups: type: string required: false description: "Input layer to use. If None, X is used. This layer must contain raw counts." - - name: "--var_input" - type: string - required: false - description: ".var column containing highly variable genes that were used to train the scVi model. By default, do not subset genes." - - name: "--var_gene_names" - type: string - required: false - description: ".var column containing gene names. By default, use the index." - - name: "--obs_labels" + - name: "--obs_grouping" type: string required: true - description: ".obs field containing the labels" - - name: "--unlabeled_category" + description: ".obs field containing the grouping variable. Typically this field contains cell groups such as annotated cell types or clusters." + - name: obs_sample_conditions type: string - default: "Unknown" - description: | - Value in the --obs_labels field that indicates unlabeled observations - - name: --random_state - type: integer - description: | - The random seed for sampling. - default: 0 - required: false - - name: scVI Model + multiple: true + description: ".obs fields containing the experimental condition to create pseudobulk sampels for." + + - name: Arguments arguments: - - name: "--scvi_model" - type: file - description: "Pretrained SCVI reference model to initialize the SCANVI model with." - example: scvi_model.pt - direction: input - required: true + - name: "--aggregation_method" + type: string + choices: ["sum", "mean"] + default: "sum" + description: "Method to aggregate the raw counts for pseudoreplicates. Either sum or mean." + - name: "--min_num_cells_per_sample" + type: integer + default: 30 + description: "Minimum number of cells per pseudobulk sample." + - name: "--pseudo_replicates" + type: integer + default: 1 + description: "Number of pseudoreplicates to create per pseudobulk sample." - name: Outputs arguments: - name: "--output" - alternatives: ["-o"] type: file - description: Output h5mu file. + description: Output h5mu file containing the aggreagted pseudobulk samples. direction: output required: true - - name: "--output_model" - type: file - description: Folder where the state of the trained model will be saved to. - required: false - direction: output - - name: "--obsm_output" + - name: "--obs_pseudo_bulk_samples" type: string - default: "X_scanvi_integrated" - required: false - description: "In which .obsm slot to store the resulting integrated embedding." - - name: "--obs_output_predictions" - type: string - default: scanvi_pred - description: "In which .obs slot to store the predicted labels." - - name: "--obs_output_probabilities" + default: "pb_sample" + description: "In which .obs slot to store the pseudobulk sample names." + - name: "--output_compression" type: string - default: scanvi_proba - description: "In which. obs slot to store the probabilities of the predicted labels." - __merge__: [., /src/base/h5_compression_argument.yaml] - - - name: "scANVI training arguments" - arguments: - - name: "--early_stopping" - required: false - type: boolean - description: "Whether to perform early stopping with respect to the validation set." - - name: "--early_stopping_monitor" - choices: ["elbo_validation", "reconstruction_loss_validation", "kl_local_validation"] - default: "elbo_validation" - type: string - description: "Metric logged during validation set epoch." - - name: "--early_stopping_patience" - type: integer - min: 1 - default: 45 - description: "Number of validation epochs with no improvement after which training will be stopped." - - name: "--early_stopping_min_delta" - min: 0 - type: double - default: 0.0 - description: "Minimum change in the monitored quantity to qualify as an improvement, - i.e. an absolute change of less than min_delta, will count as no improvement." - - name: "--max_epochs" - type: integer - description: "Number of passes through the dataset, defaults to (20000 / number of cells) * 400 or 400; whichever is smallest." + description: | + The compression format to be used on the output h5mu object. + choices: ["gzip", "lzf"] required: false - - name: "--reduce_lr_on_plateau" - description: "Whether to monitor validation loss and reduce learning rate when validation set `lr_scheduler_metric` plateaus." - type: boolean - default: True - - name: "--lr_factor" - description: "Factor to reduce learning rate." - type: double - default: 0.6 - min: 0 - - name: "--lr_patience" - description: "Number of epochs with no improvement after which learning rate will be reduced." - type: double - default: 30 - min: 0 + example: "gzip" resources: - type: python_script path: script.py - - path: /src/utils/subset_vars.py - - path: /src/utils/compress_h5mu.py - - path: /src/utils/set_var_index.py - path: /src/utils/setup_logger.py test_resources: - - type: python_script - path: test.py - - path: /resources_test/annotation_test_data/scvi_model/ + # - type: python_script + # path: test.py - path: /resources_test/annotation_test_data/TS_Blood_filtered.h5mu - - path: /resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu engines: - type: docker - image: nvcr.io/nvidia/pytorch:25.05-py3 + image: python:3.12 setup: - type: python - __merge__: [/src/base/requirements/anndata_mudata.yaml, /src/base/requirements/scanpy.yaml, .] - - type: python - packages: - - jax[cuda] - - scvi-tools~=1.3.1 + __merge__: [/src/base/requirements/anndata_mudata.yaml] test_setup: - type: python __merge__: [ /src/base/requirements/viashpy.yaml, .] runners: - type: executable - # docker_run_args: ["--gpus all"] - type: nextflow directives: - label: [midcpu, midmem, gpu, highdisk] + label: [lowcpu, lowmem] From c044b25b18ec7af0f0f73125d4d51e4715865df9 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Wed, 2 Jul 2025 14:40:04 +0200 Subject: [PATCH 05/13] update_tests --- .../create_pseudobulk/config.vsh.yaml | 24 ++- .../create_pseudobulk/script.py | 33 ++-- .../create_pseudobulk/test.py | 169 ++++++++++++++++++ .../edger/config.vsh.yaml | 0 src/differential_expression/edger/script.R | 0 src/differential_expression/edger/test.py | 0 6 files changed, 202 insertions(+), 24 deletions(-) create mode 100644 src/differential_expression/create_pseudobulk/test.py create mode 100644 src/differential_expression/edger/config.vsh.yaml create mode 100644 src/differential_expression/edger/script.R create mode 100644 src/differential_expression/edger/test.py diff --git a/src/differential_expression/create_pseudobulk/config.vsh.yaml b/src/differential_expression/create_pseudobulk/config.vsh.yaml index 5012c6d885d..7c989443682 100644 --- a/src/differential_expression/create_pseudobulk/config.vsh.yaml +++ b/src/differential_expression/create_pseudobulk/config.vsh.yaml @@ -40,10 +40,10 @@ argument_groups: type: string required: true description: ".obs field containing the grouping variable. Typically this field contains cell groups such as annotated cell types or clusters." - - name: obs_sample_conditions + - name: "--obs_sample_conditions" type: string multiple: true - description: ".obs fields containing the experimental condition to create pseudobulk sampels for." + description: ".obs fields containing the experimental condition(s) to create pseudobulk sampels for." - name: Arguments arguments: @@ -54,13 +54,22 @@ argument_groups: description: "Method to aggregate the raw counts for pseudoreplicates. Either sum or mean." - name: "--min_num_cells_per_sample" type: integer + min: 1 default: 30 description: "Minimum number of cells per pseudobulk sample." - name: "--pseudo_replicates" type: integer + min: 1 default: 1 description: "Number of pseudoreplicates to create per pseudobulk sample." - + - name: --random_state + type: integer + description: | + The random seed for sampling. + min: 0 + default: 0 + required: false + - name: Outputs arguments: - name: "--output" @@ -85,8 +94,8 @@ resources: path: script.py - path: /src/utils/setup_logger.py test_resources: - # - type: python_script - # path: test.py + - type: python_script + path: test.py - path: /resources_test/annotation_test_data/TS_Blood_filtered.h5mu engines: @@ -95,9 +104,8 @@ engines: setup: - type: python __merge__: [/src/base/requirements/anndata_mudata.yaml] - test_setup: - - type: python - __merge__: [ /src/base/requirements/viashpy.yaml, .] + __merge__: [ /src/base/requirements/python_test_setup.yaml, .] + runners: - type: executable diff --git a/src/differential_expression/create_pseudobulk/script.py b/src/differential_expression/create_pseudobulk/script.py index f71ab9c0630..73731537e31 100644 --- a/src/differential_expression/create_pseudobulk/script.py +++ b/src/differential_expression/create_pseudobulk/script.py @@ -7,27 +7,28 @@ import scipy.sparse as sp ## VIASH START -# adata = mu.read_h5ad("resources_test/annotation_test_data/TS_Blood_filtered.h5mu", mod="rna") -# np.random.seed(42) # Set seed for reproducibility -# n_cells = adata.n_obs -# treatment = np.random.choice(['ctrl', 'stim'], size=n_cells, p=[0.5, 0.5]) -# disease = np.random.choice(['healthy', 'diseased'], size=n_cells, p=[0.5, 0.5]) -# adata.obs['treatment'] = treatment -# adata.obs['disease'] = disease -# mdata = mu.MuData({"rna": adata}) -# mdata.write_h5mu("resources_test/annotation_test_data/TS_Blood_filtered_annotated.h5mu") +adata = mu.read_h5ad("resources_test/annotation_test_data/TS_Blood_filtered.h5mu", mod="rna") +np.random.seed(0) +n_cells = adata.n_obs +treatment = np.random.choice(['ctrl', 'stim'], size=n_cells, p=[0.5, 0.5]) +disease = np.random.choice(['healthy', 'diseased'], size=n_cells, p=[0.5, 0.5]) +adata.obs['treatment'] = treatment +adata.obs['disease'] = disease +mdata = mu.MuData({"rna": adata}) +mdata.write_h5mu("resources_test/annotation_test_data/TS_Blood_filtered_annotated.h5mu") par = { "input": "resources_test/annotation_test_data/TS_Blood_filtered_annotated.h5mu", "modality": "rna", "input_layer": None, "obs_grouping": "cell_type", - "obs_sample_conditions": ["donor_id", "treatment", "disease"], + "obs_sample_conditions": ["treatment", "donor_id", "disease"], + # "obs_sample_conditions": ["donor_id", "treatment", "disease"], "obs_pseudo_bulk_samples": "pb_sample", "aggregation_method": "sum", - "min_num_cells_per_donor": 5, - "pseudo_replicates": 2, - "random_seed": 0, + "min_num_cells_per_sample": 5, + "pseudo_replicates": 1, + "random_state": 0, "output": "resources_test/annotation_test_data/TS_Blood_filtered_annotated_pseudobulk.h5mu", } meta = {"resources_dir": "src/utils"} @@ -72,7 +73,7 @@ def aggregate_and_filter( f"Creating {pseudo_replicates} pseudo-replicates for pseudobulk sample {pbs}..." ) indices = list(pb_adata.obs_names) - np.random.seed(par["random_seed"]) # optional seed for reproducibility + np.random.seed(par["random_state"]) # optional seed for reproducibility random.shuffle(indices) indices = np.array_split(np.array(indices), pseudo_replicates) @@ -160,7 +161,7 @@ def main(): par["obs_pseudo_bulk_samples"], par["obs_sample_conditions"], aggregation_method=par["aggregation_method"], - num_cells_per_donor=par["min_num_cells_per_donor"], + num_cells_per_donor=par["min_num_cells_per_sample"], pseudo_replicates=par["pseudo_replicates"], ) pb_datasets.append(pb_data) @@ -173,7 +174,7 @@ def main(): ) logger.info("Writing output data...") mdata_pb = mu.MuData({"rna": adata_pb}) - mdata_pb.write_h5mu(par["output"]) + mdata_pb.write_h5mu(par["output"], compression=par["output_compression"]) if __name__ == "__main__": diff --git a/src/differential_expression/create_pseudobulk/test.py b/src/differential_expression/create_pseudobulk/test.py new file mode 100644 index 00000000000..749a63d60a2 --- /dev/null +++ b/src/differential_expression/create_pseudobulk/test.py @@ -0,0 +1,169 @@ +import os +import subprocess +import mudata as mu +import sys +import pytest +import re +import numpy as np +import pandas as pd + + +## VIASH START +meta = { + "resources_dir": "resources_test/" +} +## VIASH END + +sys.path.append(meta["resources_dir"]) + + +@pytest.fixture +def input_path(): + return f"{meta['resources_dir']}/TS_Blood_filtered.h5mu" + + +@pytest.fixture +def input_data(input_path): + mu_in = mu.read_h5mu(input_path) + return mu_in + + +@pytest.fixture +def annotated_test_data(input_data): + rna_in = input_data.mod["rna"] + np.random.seed(0) + n_cells = rna_in.n_obs + treatment = np.random.choice(['ctrl', 'stim'], size=n_cells, p=[0.5, 0.5]) + disease = np.random.choice(['healthy', 'diseased'], size=n_cells, p=[0.5, 0.5]) + rna_in.obs['treatment'] = treatment + rna_in.obs['disease'] = disease + return input_data + + +@pytest.fixture +def annotated_test_data_path(tmp_path, annotated_test_data): + temp_h5mu = tmp_path / "de_test_data.h5mu" + annotated_test_data.write_h5mu(temp_h5mu) + return temp_h5mu + + +def test_simple_execution(run_component, random_h5mu_path, annotated_test_data_path): + output_path = random_h5mu_path() + + run_component( + [ + "--input", + annotated_test_data_path, + "--output", + output_path, + "--obs_grouping", + "cell_type", + "--obs_sample_conditions", + "treatment", + "--output_compression", + "gzip", + ] + ) + + assert os.path.exists(output_path), "Output query file does not exist" + mdata = mu.read_h5mu(output_path) + adata = mdata.mod["rna"] + + expected_obs = ["treatment", "cell_type", "n_cells"] + assert all(col in adata.obs for col in expected_obs), f"Expected columns {expected_obs} not found in .obs" + assert adata.shape[0] == 8, "Expected a total of 8 pseudobulk samples in the output" + + +def test_multiple_factors(run_component, random_h5mu_path, annotated_test_data_path): + output_path = random_h5mu_path() + + run_component( + [ + "--input", + annotated_test_data_path, + "--output", + output_path, + "--obs_grouping", + "cell_type", + "--obs_sample_conditions", + "disease", + "--obs_sample_conditions", + "donor_id", + "--obs_sample_conditions", + "treatment", + "--min_num_cells_per_sample", + "5", + "--output_compression", + "gzip", + ] + ) + + assert os.path.exists(output_path), "Output query file does not exist" + mdata = mu.read_h5mu(output_path) + adata = mdata.mod["rna"] + + expected_obs = ["treatment", "cell_type", "n_cells"] + assert all(col in adata.obs for col in expected_obs), f"Expected columns {expected_obs} not found in .obs" + assert adata.shape[0] == 16, "Expected a total of 16 pseudobulk samples in the output" + + +def test_pseudo_replicates(run_component, random_h5mu_path, annotated_test_data_path): + output_path = random_h5mu_path() + + run_component( + [ + "--input", + annotated_test_data_path, + "--output", + output_path, + "--obs_grouping", + "cell_type", + "--obs_sample_conditions", + "treatment", + "--pseudo_replicates", + "2", + "--output_compression", + "gzip", + ] + ) + + assert os.path.exists(output_path), "Output query file does not exist" + mdata = mu.read_h5mu(output_path) + adata = mdata.mod["rna"] + + expected_obs = ["treatment", "cell_type", "n_cells"] + assert all(col in adata.obs for col in expected_obs), f"Expected columns {expected_obs} not found in .obs" + assert adata.shape[0] == 16, "Expected a total of 8 pseudobulk samples in the output" + + +def test_filtering(run_component, random_h5mu_path, annotated_test_data_path): + output_path = random_h5mu_path() + + run_component( + [ + "--input", + annotated_test_data_path, + "--output", + output_path, + "--obs_grouping", + "cell_type", + "--obs_sample_conditions", + "treatment", + "--min_num_cells_per_sample", + "50", + "--output_compression", + "gzip", + ] + ) + + assert os.path.exists(output_path), "Output query file does not exist" + mdata = mu.read_h5mu(output_path) + adata = mdata.mod["rna"] + + expected_obs = ["treatment", "cell_type", "n_cells"] + assert all(col in adata.obs for col in expected_obs), f"Expected columns {expected_obs} not found in .obs" + assert adata.shape[0] == 4, "Expected a total of 8 pseudobulk samples in the output" + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__])) diff --git a/src/differential_expression/edger/config.vsh.yaml b/src/differential_expression/edger/config.vsh.yaml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/differential_expression/edger/script.R b/src/differential_expression/edger/script.R new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/differential_expression/edger/test.py b/src/differential_expression/edger/test.py new file mode 100644 index 00000000000..e69de29bb2d From 73a24122e8dd76e6ccad518f07be4d05234a1208 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 4 Jul 2025 09:48:24 +0200 Subject: [PATCH 06/13] typos --- .../create_pseudobulk/config.vsh.yaml | 2 +- .../create_pseudobulk/test.py | 40 +++++++++++-------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/differential_expression/create_pseudobulk/config.vsh.yaml b/src/differential_expression/create_pseudobulk/config.vsh.yaml index 7c989443682..548bbd86923 100644 --- a/src/differential_expression/create_pseudobulk/config.vsh.yaml +++ b/src/differential_expression/create_pseudobulk/config.vsh.yaml @@ -93,6 +93,7 @@ resources: - type: python_script path: script.py - path: /src/utils/setup_logger.py + test_resources: - type: python_script path: test.py @@ -106,7 +107,6 @@ engines: __merge__: [/src/base/requirements/anndata_mudata.yaml] __merge__: [ /src/base/requirements/python_test_setup.yaml, .] - runners: - type: executable - type: nextflow diff --git a/src/differential_expression/create_pseudobulk/test.py b/src/differential_expression/create_pseudobulk/test.py index 749a63d60a2..37662df5d9a 100644 --- a/src/differential_expression/create_pseudobulk/test.py +++ b/src/differential_expression/create_pseudobulk/test.py @@ -1,17 +1,11 @@ import os -import subprocess import mudata as mu import sys import pytest -import re import numpy as np -import pandas as pd - ## VIASH START -meta = { - "resources_dir": "resources_test/" -} +meta = {"resources_dir": "resources_test/"} ## VIASH END sys.path.append(meta["resources_dir"]) @@ -33,10 +27,10 @@ def annotated_test_data(input_data): rna_in = input_data.mod["rna"] np.random.seed(0) n_cells = rna_in.n_obs - treatment = np.random.choice(['ctrl', 'stim'], size=n_cells, p=[0.5, 0.5]) - disease = np.random.choice(['healthy', 'diseased'], size=n_cells, p=[0.5, 0.5]) - rna_in.obs['treatment'] = treatment - rna_in.obs['disease'] = disease + treatment = np.random.choice(["ctrl", "stim"], size=n_cells, p=[0.5, 0.5]) + disease = np.random.choice(["healthy", "diseased"], size=n_cells, p=[0.5, 0.5]) + rna_in.obs["treatment"] = treatment + rna_in.obs["disease"] = disease return input_data @@ -70,7 +64,9 @@ def test_simple_execution(run_component, random_h5mu_path, annotated_test_data_p adata = mdata.mod["rna"] expected_obs = ["treatment", "cell_type", "n_cells"] - assert all(col in adata.obs for col in expected_obs), f"Expected columns {expected_obs} not found in .obs" + assert all(col in adata.obs for col in expected_obs), ( + f"Expected columns {expected_obs} not found in .obs" + ) assert adata.shape[0] == 8, "Expected a total of 8 pseudobulk samples in the output" @@ -103,8 +99,12 @@ def test_multiple_factors(run_component, random_h5mu_path, annotated_test_data_p adata = mdata.mod["rna"] expected_obs = ["treatment", "cell_type", "n_cells"] - assert all(col in adata.obs for col in expected_obs), f"Expected columns {expected_obs} not found in .obs" - assert adata.shape[0] == 16, "Expected a total of 16 pseudobulk samples in the output" + assert all(col in adata.obs for col in expected_obs), ( + f"Expected columns {expected_obs} not found in .obs" + ) + assert adata.shape[0] == 16, ( + "Expected a total of 16 pseudobulk samples in the output" + ) def test_pseudo_replicates(run_component, random_h5mu_path, annotated_test_data_path): @@ -132,8 +132,12 @@ def test_pseudo_replicates(run_component, random_h5mu_path, annotated_test_data_ adata = mdata.mod["rna"] expected_obs = ["treatment", "cell_type", "n_cells"] - assert all(col in adata.obs for col in expected_obs), f"Expected columns {expected_obs} not found in .obs" - assert adata.shape[0] == 16, "Expected a total of 8 pseudobulk samples in the output" + assert all(col in adata.obs for col in expected_obs), ( + f"Expected columns {expected_obs} not found in .obs" + ) + assert adata.shape[0] == 16, ( + "Expected a total of 8 pseudobulk samples in the output" + ) def test_filtering(run_component, random_h5mu_path, annotated_test_data_path): @@ -161,7 +165,9 @@ def test_filtering(run_component, random_h5mu_path, annotated_test_data_path): adata = mdata.mod["rna"] expected_obs = ["treatment", "cell_type", "n_cells"] - assert all(col in adata.obs for col in expected_obs), f"Expected columns {expected_obs} not found in .obs" + assert all(col in adata.obs for col in expected_obs), ( + f"Expected columns {expected_obs} not found in .obs" + ) assert adata.shape[0] == 4, "Expected a total of 8 pseudobulk samples in the output" From 8868be12df7b1abcc78a8953be8083ed24cbf719 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Wed, 16 Jul 2025 14:25:17 +0200 Subject: [PATCH 07/13] remove edger --- src/differential_expression/edger/config.vsh.yaml | 0 src/differential_expression/edger/script.R | 0 src/differential_expression/edger/test.py | 0 3 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 src/differential_expression/edger/config.vsh.yaml delete mode 100644 src/differential_expression/edger/script.R delete mode 100644 src/differential_expression/edger/test.py diff --git a/src/differential_expression/edger/config.vsh.yaml b/src/differential_expression/edger/config.vsh.yaml deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/differential_expression/edger/script.R b/src/differential_expression/edger/script.R deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/differential_expression/edger/test.py b/src/differential_expression/edger/test.py deleted file mode 100644 index e69de29bb2d..00000000000 From 9a5ccf9a8975810709666355d162cb445f7d52a7 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Wed, 16 Jul 2025 15:45:17 +0200 Subject: [PATCH 08/13] linting --- .../create_pseudobulk/script.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/differential_expression/create_pseudobulk/script.py b/src/differential_expression/create_pseudobulk/script.py index 73731537e31..44fd639035f 100644 --- a/src/differential_expression/create_pseudobulk/script.py +++ b/src/differential_expression/create_pseudobulk/script.py @@ -7,13 +7,15 @@ import scipy.sparse as sp ## VIASH START -adata = mu.read_h5ad("resources_test/annotation_test_data/TS_Blood_filtered.h5mu", mod="rna") +adata = mu.read_h5ad( + "resources_test/annotation_test_data/TS_Blood_filtered.h5mu", mod="rna" +) np.random.seed(0) n_cells = adata.n_obs -treatment = np.random.choice(['ctrl', 'stim'], size=n_cells, p=[0.5, 0.5]) -disease = np.random.choice(['healthy', 'diseased'], size=n_cells, p=[0.5, 0.5]) -adata.obs['treatment'] = treatment -adata.obs['disease'] = disease +treatment = np.random.choice(["ctrl", "stim"], size=n_cells, p=[0.5, 0.5]) +disease = np.random.choice(["healthy", "diseased"], size=n_cells, p=[0.5, 0.5]) +adata.obs["treatment"] = treatment +adata.obs["disease"] = disease mdata = mu.MuData({"rna": adata}) mdata.write_h5mu("resources_test/annotation_test_data/TS_Blood_filtered_annotated.h5mu") From eac9f99e5f41936910bf4082a6ea7dc406645657 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Thu, 17 Jul 2025 13:36:37 +0200 Subject: [PATCH 09/13] update test resources --- .../annotation_test_data.sh | 28 ++++++++++- .../create_pseudobulk/script.py | 17 ++----- .../create_pseudobulk/test.py | 47 ++++--------------- 3 files changed, 38 insertions(+), 54 deletions(-) mode change 100644 => 100755 resources_test_scripts/annotation_test_data.sh diff --git a/resources_test_scripts/annotation_test_data.sh b/resources_test_scripts/annotation_test_data.sh old mode 100644 new mode 100755 index 1aea9ec3291..b9db9855e11 --- a/resources_test_scripts/annotation_test_data.sh +++ b/resources_test_scripts/annotation_test_data.sh @@ -34,15 +34,20 @@ wget "https://zenodo.org/record/7580707/files/pretrained_models_Blood_ts.tar.gz? # Process Tabula Sapiens Blood reference h5ad # (Select one individual and 100 cells per cell type) # normalize and log1p transform data +# Add treatment and disease columns python <=n].groupby('cell_ontology_class').head(n).index] -# assert sub_ref_adata_final.shape == (500, 58870) + +# Normalize and log1p transform data data_for_scanpy = ad.AnnData(X=sub_ref_adata_final.X) sc.pp.normalize_total(data_for_scanpy, target_sum=10000) sc.pp.log1p( @@ -50,8 +55,17 @@ sc.pp.log1p( base=None, layer=None, copy=False, -) +) sub_ref_adata_final.layers["log_normalized"] = data_for_scanpy.X + +# Add treatment and disease columns +n_cells = sub_ref_adata_final.n_obs +treatment = np.random.choice(["ctrl", "stim"], size=n_cells, p=[0.5, 0.5]) +disease = np.random.choice(["healthy", "diseased"], size=n_cells, p=[0.5, 0.5]) +sub_ref_adata_final.obs["treatment"] = treatment +sub_ref_adata_final.obs["disease"] = disease + +# Write out data sub_ref_adata_final.write("${OUT}/TS_Blood_filtered.h5ad", compression='gzip') HEREDOC @@ -115,3 +129,13 @@ viash run src/integrate/scanvi/config.vsh.yaml --engine docker -- \ rm "${OUT}/scanvi_output.h5mu" rm "${OUT}/scvi_output.h5mu" rm -r "${OUT}/Pretrained_model/" + +echo "> Creating Pseudobulk Data for DGEA" +viash run src/differential_expression/create_pseudobulk/config.vsh.yaml --engine docker -- \ + --input "${OUT}/TS_Blood_filtered.h5mu" \ + --obs_grouping "cell_type" \ + --obs_sample_conditions "donor_id" \ + --obs_sample_conditions "treatment" \ + --obs_sample_conditions "disease" \ + --min_num_cells_per_sample 5 \ + --output "${OUT}/TS_Blood_filtered_pseudobulk.h5mu" \ No newline at end of file diff --git a/src/differential_expression/create_pseudobulk/script.py b/src/differential_expression/create_pseudobulk/script.py index 44fd639035f..8cf27f231f8 100644 --- a/src/differential_expression/create_pseudobulk/script.py +++ b/src/differential_expression/create_pseudobulk/script.py @@ -7,20 +7,8 @@ import scipy.sparse as sp ## VIASH START -adata = mu.read_h5ad( - "resources_test/annotation_test_data/TS_Blood_filtered.h5mu", mod="rna" -) -np.random.seed(0) -n_cells = adata.n_obs -treatment = np.random.choice(["ctrl", "stim"], size=n_cells, p=[0.5, 0.5]) -disease = np.random.choice(["healthy", "diseased"], size=n_cells, p=[0.5, 0.5]) -adata.obs["treatment"] = treatment -adata.obs["disease"] = disease -mdata = mu.MuData({"rna": adata}) -mdata.write_h5mu("resources_test/annotation_test_data/TS_Blood_filtered_annotated.h5mu") - par = { - "input": "resources_test/annotation_test_data/TS_Blood_filtered_annotated.h5mu", + "input": "resources_test/annotation_test_data/TS_Blood_filtered.h5mu", "modality": "rna", "input_layer": None, "obs_grouping": "cell_type", @@ -31,7 +19,8 @@ "min_num_cells_per_sample": 5, "pseudo_replicates": 1, "random_state": 0, - "output": "resources_test/annotation_test_data/TS_Blood_filtered_annotated_pseudobulk.h5mu", + "output": "test.h5mu", + "output_compression": "gzip", } meta = {"resources_dir": "src/utils"} ## VIASH END diff --git a/src/differential_expression/create_pseudobulk/test.py b/src/differential_expression/create_pseudobulk/test.py index 37662df5d9a..90b3824b294 100644 --- a/src/differential_expression/create_pseudobulk/test.py +++ b/src/differential_expression/create_pseudobulk/test.py @@ -2,7 +2,6 @@ import mudata as mu import sys import pytest -import numpy as np ## VIASH START meta = {"resources_dir": "resources_test/"} @@ -10,44 +9,16 @@ sys.path.append(meta["resources_dir"]) +input_path = meta["resources_dir"] + "/TS_Blood_filtered.h5mu" -@pytest.fixture -def input_path(): - return f"{meta['resources_dir']}/TS_Blood_filtered.h5mu" - -@pytest.fixture -def input_data(input_path): - mu_in = mu.read_h5mu(input_path) - return mu_in - - -@pytest.fixture -def annotated_test_data(input_data): - rna_in = input_data.mod["rna"] - np.random.seed(0) - n_cells = rna_in.n_obs - treatment = np.random.choice(["ctrl", "stim"], size=n_cells, p=[0.5, 0.5]) - disease = np.random.choice(["healthy", "diseased"], size=n_cells, p=[0.5, 0.5]) - rna_in.obs["treatment"] = treatment - rna_in.obs["disease"] = disease - return input_data - - -@pytest.fixture -def annotated_test_data_path(tmp_path, annotated_test_data): - temp_h5mu = tmp_path / "de_test_data.h5mu" - annotated_test_data.write_h5mu(temp_h5mu) - return temp_h5mu - - -def test_simple_execution(run_component, random_h5mu_path, annotated_test_data_path): +def test_simple_execution(run_component, random_h5mu_path): output_path = random_h5mu_path() run_component( [ "--input", - annotated_test_data_path, + input_path, "--output", output_path, "--obs_grouping", @@ -70,13 +41,13 @@ def test_simple_execution(run_component, random_h5mu_path, annotated_test_data_p assert adata.shape[0] == 8, "Expected a total of 8 pseudobulk samples in the output" -def test_multiple_factors(run_component, random_h5mu_path, annotated_test_data_path): +def test_multiple_factors(run_component, random_h5mu_path): output_path = random_h5mu_path() run_component( [ "--input", - annotated_test_data_path, + input_path, "--output", output_path, "--obs_grouping", @@ -107,13 +78,13 @@ def test_multiple_factors(run_component, random_h5mu_path, annotated_test_data_p ) -def test_pseudo_replicates(run_component, random_h5mu_path, annotated_test_data_path): +def test_pseudo_replicates(run_component, random_h5mu_path): output_path = random_h5mu_path() run_component( [ "--input", - annotated_test_data_path, + input_path, "--output", output_path, "--obs_grouping", @@ -140,13 +111,13 @@ def test_pseudo_replicates(run_component, random_h5mu_path, annotated_test_data_ ) -def test_filtering(run_component, random_h5mu_path, annotated_test_data_path): +def test_filtering(run_component, random_h5mu_path): output_path = random_h5mu_path() run_component( [ "--input", - annotated_test_data_path, + input_path, "--output", output_path, "--obs_grouping", From 3b1bf7b70c97781a8d3637b1cf36fa98c50541e9 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 18 Jul 2025 14:52:22 +0200 Subject: [PATCH 10/13] address pr comments --- .../create_pseudobulk/config.vsh.yaml | 27 ++-- .../create_pseudobulk/script.py | 148 +++++------------- .../create_pseudobulk/test.py | 53 ++----- 3 files changed, 63 insertions(+), 165 deletions(-) diff --git a/src/differential_expression/create_pseudobulk/config.vsh.yaml b/src/differential_expression/create_pseudobulk/config.vsh.yaml index 548bbd86923..ce3694eef56 100644 --- a/src/differential_expression/create_pseudobulk/config.vsh.yaml +++ b/src/differential_expression/create_pseudobulk/config.vsh.yaml @@ -36,14 +36,14 @@ argument_groups: type: string required: false description: "Input layer to use. If None, X is used. This layer must contain raw counts." - - name: "--obs_grouping" + - name: "--obs_label" type: string required: true - description: ".obs field containing the grouping variable. Typically this field contains cell groups such as annotated cell types or clusters." - - name: "--obs_sample_conditions" + description: ".obs field containing the variable to group on. Typically this field contains cell groups such as annotated cell types or clusters." + - name: "--obs_groups" type: string multiple: true - description: ".obs fields containing the experimental condition(s) to create pseudobulk sampels for." + description: ".obs fields containing the experimental condition(s) to create pseudobulk samples for." - name: Arguments arguments: @@ -52,16 +52,11 @@ argument_groups: choices: ["sum", "mean"] default: "sum" description: "Method to aggregate the raw counts for pseudoreplicates. Either sum or mean." - - name: "--min_num_cells_per_sample" + - name: "--min_obs_per_sample" type: integer min: 1 default: 30 description: "Minimum number of cells per pseudobulk sample." - - name: "--pseudo_replicates" - type: integer - min: 1 - default: 1 - description: "Number of pseudoreplicates to create per pseudobulk sample." - name: --random_state type: integer description: | @@ -69,18 +64,18 @@ argument_groups: min: 0 default: 0 required: false + - name: "--obs_cell_count" + type: string + default: "n_cells" + description: "The .obs field to store the number of observations per pseudobulk sample." - name: Outputs arguments: - name: "--output" type: file - description: Output h5mu file containing the aggreagted pseudobulk samples. + description: Output h5mu file containing the aggregated pseudobulk samples. direction: output required: true - - name: "--obs_pseudo_bulk_samples" - type: string - default: "pb_sample" - description: "In which .obs slot to store the pseudobulk sample names." - name: "--output_compression" type: string description: | @@ -104,7 +99,7 @@ engines: image: python:3.12 setup: - type: python - __merge__: [/src/base/requirements/anndata_mudata.yaml] + __merge__: [/src/base/requirements/anndata_mudata.yaml, /src/base/requirements/scanpy.yaml ] __merge__: [ /src/base/requirements/python_test_setup.yaml, .] runners: diff --git a/src/differential_expression/create_pseudobulk/script.py b/src/differential_expression/create_pseudobulk/script.py index 8cf27f231f8..cb70218ee6e 100644 --- a/src/differential_expression/create_pseudobulk/script.py +++ b/src/differential_expression/create_pseudobulk/script.py @@ -1,9 +1,8 @@ -import random import numpy as np -import anndata as ad import mudata as mu -import logging +import pandas as pd import sys +import scanpy as sc import scipy.sparse as sp ## VIASH START @@ -11,13 +10,11 @@ "input": "resources_test/annotation_test_data/TS_Blood_filtered.h5mu", "modality": "rna", "input_layer": None, - "obs_grouping": "cell_type", - "obs_sample_conditions": ["treatment", "donor_id", "disease"], - # "obs_sample_conditions": ["donor_id", "treatment", "disease"], + "obs_label": "cell_type", + "obs_groups": ["treatment", "donor_id", "disease"], + "obs_cell_count": "n_cells", "obs_pseudo_bulk_samples": "pb_sample", - "aggregation_method": "sum", - "min_num_cells_per_sample": 5, - "pseudo_replicates": 1, + "min_obs_per_sample": 5, "random_state": 0, "output": "test.h5mu", "output_compression": "gzip", @@ -31,68 +28,6 @@ logger = setup_logger() -def aggregate_and_filter( - adata, - grouping_value, - group_key, - sample_key, - obs_to_keep, - aggregation_method="sum", - num_cells_per_donor=30, - pseudo_replicates=1, -): - # subset adata to the given grouping (cell/cluster) value - adata_cell_pop = adata[adata.obs[group_key] == grouping_value] - - pb_aggregated_data = [] - for i, pbs in enumerate( - pb_samples := adata_cell_pop.obs[sample_key].cat.categories - ): - logger.info(f"Processing pseudobulk sample {i + 1} out of {len(pb_samples)}...") - - pb_adata = adata_cell_pop[adata_cell_pop.obs[sample_key] == pbs] - - # check which pseudobulk sample to keep according to the number of cells - if pb_adata.shape[0] < num_cells_per_donor: - logging.info( - f"Number of cells for {pbs} is less than {num_cells_per_donor}, skipping pseudobulk aggregation..." - ) - continue - - # Create pseudo-replicates - logging.info( - f"Creating {pseudo_replicates} pseudo-replicates for pseudobulk sample {pbs}..." - ) - indices = list(pb_adata.obs_names) - np.random.seed(par["random_state"]) # optional seed for reproducibility - random.shuffle(indices) - indices = np.array_split(np.array(indices), pseudo_replicates) - - # Aggregate data for each pseudo-replicate - for i, rep_idx in enumerate(indices): - # Aggregate gene expression values - if aggregation_method == "sum": - pb_adata_rep = ad.AnnData( - X=pb_adata[rep_idx].X.sum(axis=0), var=pb_adata[rep_idx].var[[]] - ) - elif aggregation_method == "mean": - pb_adata_rep = ad.AnnData( - X=pb_adata[rep_idx].X.mean(axis=0), var=pb_adata[rep_idx].var[[]] - ) - - # Store metadata - pb_adata_rep.obs_names = [f"{grouping_value}_{pbs}_{str(i)}"] - pb_adata_rep.obs[group_key] = grouping_value - for obs in obs_to_keep: - pb_adata_rep.obs[obs] = pb_adata[rep_idx].obs[obs][0] - pb_adata_rep.obs["n_cells"] = len(pb_adata[rep_idx]) - pb_aggregated_data.append(pb_adata_rep) - - adata_pb = ad.concat(pb_aggregated_data) - - return adata_pb - - def is_normalized(layer): if sp.issparse(layer): row_sums = np.array(layer.sum(axis=1)).flatten() @@ -102,6 +37,21 @@ def is_normalized(layer): return np.allclose(row_sums, 1) +def count_obs(adata, pb_adata, obs_cols): + counts = [] + for i in range(pb_adata.n_obs): + values = pb_adata.obs.iloc[i][obs_cols] + + mask = pd.Series([True] * adata.n_obs) + for col in obs_cols: + mask &= (adata.obs[col] == values[col]).values + + count = mask.sum() + counts.append(count) + + return counts + + def main(): # Read in data logger.info(f"Reading input data {par['input']}...") @@ -113,56 +63,42 @@ def main(): if is_normalized(adata.X): raise ValueError("Input layer must contain raw counts.") - # Ensure al columns required for pseudobulk aggregation exist and are categorical - required_categorical_obs = par["obs_sample_conditions"] + [par["obs_grouping"]] - for col in required_categorical_obs: + # Sanitize pseudobulk aggregation fields + pseudobulk_cols = [par["obs_label"]] + if par["obs_groups"]: + pseudobulk_cols += par["obs_groups"] + + for col in pseudobulk_cols: if col not in adata.obs.columns: raise ValueError(f"Required column '{col}' not found in .obs.") adata.obs[col] = ( adata.obs[col] .astype(str) - .replace(" ", "_") - .replace("+", "") + .str.replace(" ", "_") + .str.replace("+", "") .astype("category") ) - # Create pseudobulk groups - logger.info("Creating pseudobulk groups...") - adata.obs[par["obs_pseudo_bulk_samples"]] = [ + # Aggregate pseudobulk data per cell group + logger.info("Creating pseudobulk samples...") + adata_pb = sc.get.aggregate(adata, by=pseudobulk_cols, func="sum", axis=0) + adata_pb.X = adata_pb.layers["sum"] + del adata_pb.layers["sum"] + + adata_pb.obs_names = [ "_".join(values) - for values in zip(*[adata.obs[col] for col in par["obs_sample_conditions"]]) + for values in zip(*[adata_pb.obs[col] for col in pseudobulk_cols]) ] - adata.obs[par["obs_pseudo_bulk_samples"]] = adata.obs[ - par["obs_pseudo_bulk_samples"] - ].astype("category") - logger.info( - f"Pseudobulk groups created: {adata.obs[par['obs_pseudo_bulk_samples']].unique().tolist()}" - ) - # Aggregate pseudobulk data per cell group - pb_datasets = [] - cell_groups = adata.obs[par["obs_grouping"]].unique() - - for group in cell_groups: - logger.info(f"Processing cell group {group}") - pb_data = aggregate_and_filter( - adata, - group, - par["obs_grouping"], - par["obs_pseudo_bulk_samples"], - par["obs_sample_conditions"], - aggregation_method=par["aggregation_method"], - num_cells_per_donor=par["min_num_cells_per_sample"], - pseudo_replicates=par["pseudo_replicates"], - ) - pb_datasets.append(pb_data) + # Filter pseudobulk samples based on minimum observation count + logger.info("Filtering pseudobulk samples based on minimum observation count...") + adata_pb.obs[par["obs_cell_count"]] = count_obs(adata, adata_pb, pseudobulk_cols) + adata_pb = adata_pb[adata_pb.obs[par["obs_cell_count"]] > par["min_obs_per_sample"]] - # Combine pseudobulk datasets - logger.info("Combining pseudobulk datasets...") - adata_pb = ad.concat(pb_datasets) logger.info( f"Final dataset: {adata_pb.n_obs} pseudobulk samples, {adata_pb.n_vars} genes" ) + logger.info("Writing output data...") mdata_pb = mu.MuData({"rna": adata_pb}) mdata_pb.write_h5mu(par["output"], compression=par["output_compression"]) diff --git a/src/differential_expression/create_pseudobulk/test.py b/src/differential_expression/create_pseudobulk/test.py index 90b3824b294..f40963ee553 100644 --- a/src/differential_expression/create_pseudobulk/test.py +++ b/src/differential_expression/create_pseudobulk/test.py @@ -21,9 +21,9 @@ def test_simple_execution(run_component, random_h5mu_path): input_path, "--output", output_path, - "--obs_grouping", + "--obs_label", "cell_type", - "--obs_sample_conditions", + "--obs_groups", "treatment", "--output_compression", "gzip", @@ -50,15 +50,15 @@ def test_multiple_factors(run_component, random_h5mu_path): input_path, "--output", output_path, - "--obs_grouping", + "--obs_label", "cell_type", - "--obs_sample_conditions", + "--obs_groups", "disease", - "--obs_sample_conditions", + "--obs_groups", "donor_id", - "--obs_sample_conditions", + "--obs_groups", "treatment", - "--min_num_cells_per_sample", + "--min_obs_per_sample", "5", "--output_compression", "gzip", @@ -78,39 +78,6 @@ def test_multiple_factors(run_component, random_h5mu_path): ) -def test_pseudo_replicates(run_component, random_h5mu_path): - output_path = random_h5mu_path() - - run_component( - [ - "--input", - input_path, - "--output", - output_path, - "--obs_grouping", - "cell_type", - "--obs_sample_conditions", - "treatment", - "--pseudo_replicates", - "2", - "--output_compression", - "gzip", - ] - ) - - assert os.path.exists(output_path), "Output query file does not exist" - mdata = mu.read_h5mu(output_path) - adata = mdata.mod["rna"] - - expected_obs = ["treatment", "cell_type", "n_cells"] - assert all(col in adata.obs for col in expected_obs), ( - f"Expected columns {expected_obs} not found in .obs" - ) - assert adata.shape[0] == 16, ( - "Expected a total of 8 pseudobulk samples in the output" - ) - - def test_filtering(run_component, random_h5mu_path): output_path = random_h5mu_path() @@ -120,11 +87,11 @@ def test_filtering(run_component, random_h5mu_path): input_path, "--output", output_path, - "--obs_grouping", + "--obs_label", "cell_type", - "--obs_sample_conditions", + "--obs_groups", "treatment", - "--min_num_cells_per_sample", + "--min_obs_per_sample", "50", "--output_compression", "gzip", From e1b38bc0a0e0974daa878442398b2aef9e92f51d Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 18 Jul 2025 14:53:30 +0200 Subject: [PATCH 11/13] address pr comments --- src/differential_expression/create_pseudobulk/script.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/differential_expression/create_pseudobulk/script.py b/src/differential_expression/create_pseudobulk/script.py index cb70218ee6e..a9af50f6d34 100644 --- a/src/differential_expression/create_pseudobulk/script.py +++ b/src/differential_expression/create_pseudobulk/script.py @@ -13,7 +13,6 @@ "obs_label": "cell_type", "obs_groups": ["treatment", "donor_id", "disease"], "obs_cell_count": "n_cells", - "obs_pseudo_bulk_samples": "pb_sample", "min_obs_per_sample": 5, "random_state": 0, "output": "test.h5mu", From 737910b488026531c425ed5c20f51f006de5b34f Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 18 Jul 2025 15:00:44 +0200 Subject: [PATCH 12/13] add changelog entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c103557f235..487f20e6704 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,10 @@ * `liana`: enabled jobs to be run in parallel and added two new arguments: `consensus_opts`, `de_method` (PR #1039) +* (Experimental) `differential_expression/create_pseudobulk`: Added a component to generate pseudobulk samples from single-cell transcriptomics data, + to create bulk-like expression profiles suitable for differential expression analysis with methods designed for bulk differential expression analysis. + Warning: the functionality in this component is experimental and its behavior may change in future releases (PR #1042). + ## MAJOR CHANGES * `mapping/cellranger_*`: Upgrade CellRanger to v9.0 (PR #992 and #1006). From cb3f7a37aa1f01312ee059c9c5fd80982864bf54 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Thu, 24 Jul 2025 11:15:52 +0200 Subject: [PATCH 13/13] update description --- src/differential_expression/create_pseudobulk/config.vsh.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/differential_expression/create_pseudobulk/config.vsh.yaml b/src/differential_expression/create_pseudobulk/config.vsh.yaml index ce3694eef56..aa90b260be1 100644 --- a/src/differential_expression/create_pseudobulk/config.vsh.yaml +++ b/src/differential_expression/create_pseudobulk/config.vsh.yaml @@ -6,6 +6,8 @@ description: | by aggregating raw gene expression counts from individual cells to create bulk-like expression profiles suitable for differential expression analysis with methods designed for bulk differential expression analysis. + Note that this componentonly considers factors as explanatory variables, + and excludes covariates from the analysis. authors: - __merge__: /src/authors/dorien_roosen.yaml