diff --git a/CHANGELOG.md b/CHANGELOG.md index 50ed84c5066..a7dfc3e2600 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,8 @@ ## BUG FIXES +* `dataflow/concatenate_h5mu` and `dataflow/concat`: Fix `TypeError` when using mode 'move' and a column with conflicting metadata does not exist across all samples (PR #631). + * `dataflow/concatenate_h5mu` and `dataflow/concat`: Fix an issue where joining columns with different datatypes caused `TypeError` (PR #619). # openpipelines 0.12.1 diff --git a/src/dataflow/concat/script.py b/src/dataflow/concat/script.py index 152e692bb36..ca978ed96d4 100644 --- a/src/dataflow/concat/script.py +++ b/src/dataflow/concat/script.py @@ -121,20 +121,19 @@ def any_row_contains_duplicate_values(n_processes: int, frame: pd.DataFrame) -> is_duplicated = pool.map(nunique, iter(numpy_array)) return any(is_duplicated) -def concatenate_matrices(n_processes: int, input_ids: tuple[str], matrices: Iterable[pd.DataFrame]) \ +def concatenate_matrices(n_processes: int, matrices: dict[str, pd.DataFrame]) \ -> tuple[dict[str, pd.DataFrame], pd.DataFrame | None, dict[str, pd.core.dtypes.dtypes.Dtype]]: """ Merge matrices by combining columns that have the same name. Columns that contain conflicting values (e.i. the columns have different values), are not merged, but instead moved to a new dataframe. """ - column_names = set(column_name for var in matrices for column_name in var) + column_names = set(column_name for var in matrices.values() for column_name in var) logger.debug('Trying to concatenate columns: %s.', ",".join(column_names)) if not column_names: return {}, None conflicts, concatenated_matrix = \ split_conflicts_and_concatenated_columns(n_processes, - input_ids, matrices, column_names) concatenated_matrix = cast_to_writeable_dtype(concatenated_matrix) @@ -150,8 +149,7 @@ def get_first_non_na_value_vector(df): return pd.Series(numpy_arr.ravel()[flat_index], index=df.index, name=df.columns[0]) def split_conflicts_and_concatenated_columns(n_processes: int, - input_ids: tuple[str], - matrices: Iterable[pd.DataFrame], + matrices: dict[str, pd.DataFrame], column_names: Iterable[str]) -> \ tuple[dict[str, pd.DataFrame], pd.DataFrame]: """ @@ -164,11 +162,13 @@ def split_conflicts_and_concatenated_columns(n_processes: int, conflicts = {} concatenated_matrix = [] for column_name in column_names: - columns = [var[column_name] for var in matrices if column_name in var] + columns = {input_id: var[column_name] + for input_id, var in matrices.items() + if column_name in var} assert columns, "Some columns should have been found." - concatenated_columns = pd.concat(columns, axis=1, join="outer") + concatenated_columns = pd.concat(columns.values(), axis=1, join="outer") if any_row_contains_duplicate_values(n_processes, concatenated_columns): - concatenated_columns.columns = input_ids + concatenated_columns.columns = columns.keys() # Use the sample id as column name conflicts[f'conflict_{column_name}'] = concatenated_columns else: unique_values = get_first_non_na_value_vector(concatenated_columns) @@ -203,7 +203,7 @@ def cast_to_writeable_dtype(result: pd.DataFrame) -> pd.DataFrame: result[obj_col] = result[obj_col].where(result[obj_col].isna(), result[obj_col].astype(str)).astype('category') return result -def split_conflicts_modalities(n_processes: int, input_ids: tuple[str], samples: Iterable[anndata.AnnData], output: anndata.AnnData) \ +def split_conflicts_modalities(n_processes: int, samples: dict[str, anndata.AnnData], output: anndata.AnnData) \ -> anndata.AnnData: """ Merge .var and .obs matrices of the anndata objects. Columns are merged @@ -213,8 +213,8 @@ def split_conflicts_modalities(n_processes: int, input_ids: tuple[str], samples: """ matrices_to_parse = ("var", "obs") for matrix_name in matrices_to_parse: - matrices = [getattr(sample, matrix_name) for sample in samples] - conflicts, concatenated_matrix = concatenate_matrices(n_processes, input_ids, matrices) + matrices = {sample_id: getattr(sample, matrix_name) for sample_id, sample in samples.items()} + conflicts, concatenated_matrix = concatenate_matrices(n_processes, matrices) # Write the conflicts to the output matrix_index = getattr(output, matrix_name).index @@ -235,20 +235,20 @@ def concatenate_modality(n_processes: int, mod: str, input_files: Iterable[str | } other_axis_mode_to_apply = concat_modes.get(other_axis_mode, other_axis_mode) - mod_data = [] - for input_file in input_files: + mod_data = {} + for input_id, input_file in zip(input_ids, input_files): try: - mod_data.append(mu.read_h5ad(input_file, mod=mod)) + mod_data[input_id] = mu.read_h5ad(input_file, mod=mod) except KeyError as e: # Modality does not exist for this sample, skip it if f"Unable to open object '{mod}' doesn't exist" not in str(e): raise e pass - check_observations_unique(mod_data) + check_observations_unique(mod_data.values()) - concatenated_data = anndata.concat(mod_data, join='outer', merge=other_axis_mode_to_apply) + concatenated_data = anndata.concat(mod_data.values(), join='outer', merge=other_axis_mode_to_apply) if other_axis_mode == "move": - concatenated_data = split_conflicts_modalities(n_processes, input_ids, mod_data, concatenated_data) + concatenated_data = split_conflicts_modalities(n_processes, mod_data, concatenated_data) return concatenated_data diff --git a/src/dataflow/concat/test.py b/src/dataflow/concat/test.py index fbe3d1f708f..175aed83eae 100644 --- a/src/dataflow/concat/test.py +++ b/src/dataflow/concat/test.py @@ -1,5 +1,4 @@ import mudata as md -import anndata as ad import subprocess from pathlib import Path import pandas as pd @@ -7,6 +6,7 @@ import pytest import re import sys +import uuid ## VIASH START meta = { @@ -45,13 +45,22 @@ def mudata_without_genome(tmp_path, request): result.append(new_path) return result + +@pytest.fixture +def make_obs_names_unique(): + def wrapper(mudata): + for mod_data in mudata.mod.values(): + mod_data.obs.index = mod_data.obs.index.map( + lambda index_val: uuid.uuid4().hex + index_val + ) + return wrapper + @pytest.fixture -def mudata_copy_with_unique_obs(request): +def mudata_copy_with_unique_obs(request, make_obs_names_unique): mudata_to_copy = request.param mudata_contents = md.read(mudata_to_copy) copied_contents = mudata_contents.copy() - for mod_data in copied_contents.mod.values(): - mod_data.obs.index = "make_unique_" + mod_data.obs.index + make_obs_names_unique(copied_contents) return mudata_contents, copied_contents @pytest.fixture @@ -311,6 +320,34 @@ def test_concat_dtypes(run_component, copied_mudata_with_extra_annotation_column concatenated_data = md.read("concat.h5mu") concatenated_data.mod['atac'].obs['test'].dtype == expected +@pytest.mark.parametrize("extra_column_annotation_matrix", ["var"]) +@pytest.mark.parametrize("extra_column_value_sample1,extra_column_value_sample2", [("2", "1")]) +@pytest.mark.parametrize("mudata_copy_with_unique_obs", + [input_sample1_file], + indirect=["mudata_copy_with_unique_obs"]) +def test_resolve_annotation_conflict_missing_column(run_component, copied_mudata_with_extra_annotation_column, make_obs_names_unique, tmp_path): + """ + Test using mode 'move' and resolving a conflict in metadata between the samples, + but the metadata column is missing in one of the samples. + """ + tempfile_input1, tempfile_input2 = copied_mudata_with_extra_annotation_column + original_data = md.read_h5mu(input_sample1_file) + make_obs_names_unique(original_data) + original_data_path = tmp_path / f"{uuid.uuid4().hex}.h5mu" + original_data.write_h5mu(original_data_path) + run_component([ + "--input_id", "mouse,human,sample_without_column", + "--input", tempfile_input1, + "--input", tempfile_input2, + "--input", original_data_path, + "--output", "concat.h5mu", + "--other_axis_mode", "move" + ]) + concatenated_data = md.read("concat.h5mu") + assert 'test' not in concatenated_data.mod['atac'].var.columns + assert 'test' not in concatenated_data.mod['atac'].obs.columns + assert 'conflict_test' in concatenated_data.mod['atac'].varm + def test_mode_move(run_component, tmp_path): tempfile_input1 = tmp_path / "input1.h5mu" tempfile_input2 = tmp_path / "input2.h5mu" diff --git a/src/dataflow/concatenate_h5mu/script.py b/src/dataflow/concatenate_h5mu/script.py index 152e692bb36..ca978ed96d4 100644 --- a/src/dataflow/concatenate_h5mu/script.py +++ b/src/dataflow/concatenate_h5mu/script.py @@ -121,20 +121,19 @@ def any_row_contains_duplicate_values(n_processes: int, frame: pd.DataFrame) -> is_duplicated = pool.map(nunique, iter(numpy_array)) return any(is_duplicated) -def concatenate_matrices(n_processes: int, input_ids: tuple[str], matrices: Iterable[pd.DataFrame]) \ +def concatenate_matrices(n_processes: int, matrices: dict[str, pd.DataFrame]) \ -> tuple[dict[str, pd.DataFrame], pd.DataFrame | None, dict[str, pd.core.dtypes.dtypes.Dtype]]: """ Merge matrices by combining columns that have the same name. Columns that contain conflicting values (e.i. the columns have different values), are not merged, but instead moved to a new dataframe. """ - column_names = set(column_name for var in matrices for column_name in var) + column_names = set(column_name for var in matrices.values() for column_name in var) logger.debug('Trying to concatenate columns: %s.', ",".join(column_names)) if not column_names: return {}, None conflicts, concatenated_matrix = \ split_conflicts_and_concatenated_columns(n_processes, - input_ids, matrices, column_names) concatenated_matrix = cast_to_writeable_dtype(concatenated_matrix) @@ -150,8 +149,7 @@ def get_first_non_na_value_vector(df): return pd.Series(numpy_arr.ravel()[flat_index], index=df.index, name=df.columns[0]) def split_conflicts_and_concatenated_columns(n_processes: int, - input_ids: tuple[str], - matrices: Iterable[pd.DataFrame], + matrices: dict[str, pd.DataFrame], column_names: Iterable[str]) -> \ tuple[dict[str, pd.DataFrame], pd.DataFrame]: """ @@ -164,11 +162,13 @@ def split_conflicts_and_concatenated_columns(n_processes: int, conflicts = {} concatenated_matrix = [] for column_name in column_names: - columns = [var[column_name] for var in matrices if column_name in var] + columns = {input_id: var[column_name] + for input_id, var in matrices.items() + if column_name in var} assert columns, "Some columns should have been found." - concatenated_columns = pd.concat(columns, axis=1, join="outer") + concatenated_columns = pd.concat(columns.values(), axis=1, join="outer") if any_row_contains_duplicate_values(n_processes, concatenated_columns): - concatenated_columns.columns = input_ids + concatenated_columns.columns = columns.keys() # Use the sample id as column name conflicts[f'conflict_{column_name}'] = concatenated_columns else: unique_values = get_first_non_na_value_vector(concatenated_columns) @@ -203,7 +203,7 @@ def cast_to_writeable_dtype(result: pd.DataFrame) -> pd.DataFrame: result[obj_col] = result[obj_col].where(result[obj_col].isna(), result[obj_col].astype(str)).astype('category') return result -def split_conflicts_modalities(n_processes: int, input_ids: tuple[str], samples: Iterable[anndata.AnnData], output: anndata.AnnData) \ +def split_conflicts_modalities(n_processes: int, samples: dict[str, anndata.AnnData], output: anndata.AnnData) \ -> anndata.AnnData: """ Merge .var and .obs matrices of the anndata objects. Columns are merged @@ -213,8 +213,8 @@ def split_conflicts_modalities(n_processes: int, input_ids: tuple[str], samples: """ matrices_to_parse = ("var", "obs") for matrix_name in matrices_to_parse: - matrices = [getattr(sample, matrix_name) for sample in samples] - conflicts, concatenated_matrix = concatenate_matrices(n_processes, input_ids, matrices) + matrices = {sample_id: getattr(sample, matrix_name) for sample_id, sample in samples.items()} + conflicts, concatenated_matrix = concatenate_matrices(n_processes, matrices) # Write the conflicts to the output matrix_index = getattr(output, matrix_name).index @@ -235,20 +235,20 @@ def concatenate_modality(n_processes: int, mod: str, input_files: Iterable[str | } other_axis_mode_to_apply = concat_modes.get(other_axis_mode, other_axis_mode) - mod_data = [] - for input_file in input_files: + mod_data = {} + for input_id, input_file in zip(input_ids, input_files): try: - mod_data.append(mu.read_h5ad(input_file, mod=mod)) + mod_data[input_id] = mu.read_h5ad(input_file, mod=mod) except KeyError as e: # Modality does not exist for this sample, skip it if f"Unable to open object '{mod}' doesn't exist" not in str(e): raise e pass - check_observations_unique(mod_data) + check_observations_unique(mod_data.values()) - concatenated_data = anndata.concat(mod_data, join='outer', merge=other_axis_mode_to_apply) + concatenated_data = anndata.concat(mod_data.values(), join='outer', merge=other_axis_mode_to_apply) if other_axis_mode == "move": - concatenated_data = split_conflicts_modalities(n_processes, input_ids, mod_data, concatenated_data) + concatenated_data = split_conflicts_modalities(n_processes, mod_data, concatenated_data) return concatenated_data diff --git a/src/dataflow/concatenate_h5mu/test.py b/src/dataflow/concatenate_h5mu/test.py index eb07be329a8..17cef8e575c 100644 --- a/src/dataflow/concatenate_h5mu/test.py +++ b/src/dataflow/concatenate_h5mu/test.py @@ -6,6 +6,7 @@ import pytest import re import sys +import uuid ## VIASH START meta = { @@ -44,13 +45,22 @@ def mudata_without_genome(tmp_path, request): result.append(new_path) return result + +@pytest.fixture +def make_obs_names_unique(): + def wrapper(mudata): + for mod_data in mudata.mod.values(): + mod_data.obs.index = mod_data.obs.index.map( + lambda index_val: uuid.uuid4().hex + index_val + ) + return wrapper + @pytest.fixture -def mudata_copy_with_unique_obs(request): +def mudata_copy_with_unique_obs(request, make_obs_names_unique): mudata_to_copy = request.param mudata_contents = md.read(mudata_to_copy) copied_contents = mudata_contents.copy() - for mod_data in copied_contents.mod.values(): - mod_data.obs.index = "make_unique_" + mod_data.obs.index + make_obs_names_unique(copied_contents) return mudata_contents, copied_contents @pytest.fixture @@ -310,6 +320,33 @@ def test_concat_dtypes(run_component, copied_mudata_with_extra_annotation_column concatenated_data = md.read("concat.h5mu") concatenated_data.mod['atac'].obs['test'].dtype == expected +@pytest.mark.parametrize("extra_column_annotation_matrix", ["var"]) +@pytest.mark.parametrize("extra_column_value_sample1,extra_column_value_sample2", [("2", "1")]) +@pytest.mark.parametrize("mudata_copy_with_unique_obs", + [input_sample1_file], + indirect=["mudata_copy_with_unique_obs"]) +def test_resolve_annotation_conflict_missing_column(run_component, copied_mudata_with_extra_annotation_column, make_obs_names_unique, tmp_path): + """ + Test using mode 'move' and resolving a conflict in metadata between the samples, + but the metadata column is missing in one of the samples. + """ + tempfile_input1, tempfile_input2 = copied_mudata_with_extra_annotation_column + original_data = md.read_h5mu(input_sample1_file) + make_obs_names_unique(original_data) + original_data_path = tmp_path / f"{uuid.uuid4().hex}.h5mu" + original_data.write_h5mu(original_data_path) + run_component([ + "--input_id", "mouse,human,sample_without_column", + "--input", tempfile_input1, + "--input", tempfile_input2, + "--input", original_data_path, + "--output", "concat.h5mu", + "--other_axis_mode", "move" + ]) + concatenated_data = md.read("concat.h5mu") + assert 'test' not in concatenated_data.mod['atac'].var.columns + assert 'test' not in concatenated_data.mod['atac'].obs.columns + assert 'conflict_test' in concatenated_data.mod['atac'].varm def test_mode_move(run_component, tmp_path): tempfile_input1 = tmp_path / "input1.h5mu"