2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -36,6 +36,8 @@

## BUG FIXES

* `dataflow/concatenate_h5mu` and `dataflow/concat`: Fix a `TypeError` when using mode 'move' and a column with conflicting metadata is missing from one or more samples (PR #631).

* `dataflow/concatenate_h5mu` and `dataflow/concat`: Fix an issue where joining columns with different datatypes caused a `TypeError` (PR #619).

# openpipelines 0.12.1
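For context on the first bugfix entry above (PR #631), a minimal, hypothetical sketch of the failure mode: the pre-fix code labelled conflict columns with the full positional `input_ids` tuple, which cannot work when the conflicting column is absent from some samples. All names and values below are illustrative.

```python
import pandas as pd

# Three samples, but only two carry the conflicting column "test"
# (toy frames; the real inputs are .var/.obs tables read from .h5mu files).
input_ids = ("mouse", "human", "sample_without_column")
matrices = {
    "mouse": pd.DataFrame({"test": ["2"]}, index=["gene1"]),
    "human": pd.DataFrame({"test": ["1"]}, index=["gene1"]),
    "sample_without_column": pd.DataFrame(index=["gene1"]),
}

# Only the samples that actually contain the column contribute to the concat,
# so the result has fewer columns than len(input_ids)...
columns = {sid: df["test"] for sid, df in matrices.items() if "test" in df}
concatenated = pd.concat(columns.values(), axis=1, join="outer")
assert concatenated.shape[1] == 2 and len(input_ids) == 3

# ...which is why relabelling with the full input_ids tuple raised, and why
# the fix takes the labels from the filtered dict's keys instead:
concatenated.columns = columns.keys()
print(concatenated)  # columns are now 'mouse' and 'human'
```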
34 changes: 17 additions & 17 deletions src/dataflow/concat/script.py
@@ -121,20 +121,19 @@ def any_row_contains_duplicate_values(n_processes: int, frame: pd.DataFrame) ->
is_duplicated = pool.map(nunique, iter(numpy_array))
return any(is_duplicated)

def concatenate_matrices(n_processes: int, input_ids: tuple[str], matrices: Iterable[pd.DataFrame]) \
def concatenate_matrices(n_processes: int, matrices: dict[str, pd.DataFrame]) \
-> tuple[dict[str, pd.DataFrame], pd.DataFrame | None, dict[str, pd.core.dtypes.dtypes.Dtype]]:
"""
Merge matrices by combining columns that have the same name.
Columns that contain conflicting values (i.e. the columns have different values)
are not merged, but instead moved to a new dataframe.
"""
column_names = set(column_name for var in matrices for column_name in var)
column_names = set(column_name for var in matrices.values() for column_name in var)
logger.debug('Trying to concatenate columns: %s.', ",".join(column_names))
if not column_names:
return {}, None
conflicts, concatenated_matrix = \
split_conflicts_and_concatenated_columns(n_processes,
input_ids,
matrices,
column_names)
concatenated_matrix = cast_to_writeable_dtype(concatenated_matrix)
@@ -150,8 +149,7 @@ def get_first_non_na_value_vector(df):
return pd.Series(numpy_arr.ravel()[flat_index], index=df.index, name=df.columns[0])

def split_conflicts_and_concatenated_columns(n_processes: int,
input_ids: tuple[str],
matrices: Iterable[pd.DataFrame],
matrices: dict[str, pd.DataFrame],
column_names: Iterable[str]) -> \
tuple[dict[str, pd.DataFrame], pd.DataFrame]:
"""
@@ -164,11 +162,13 @@ def split_conflicts_and_concatenated_columns(n_processes: int,
conflicts = {}
concatenated_matrix = []
for column_name in column_names:
columns = [var[column_name] for var in matrices if column_name in var]
columns = {input_id: var[column_name]
for input_id, var in matrices.items()
if column_name in var}
assert columns, "Some columns should have been found."
concatenated_columns = pd.concat(columns, axis=1, join="outer")
concatenated_columns = pd.concat(columns.values(), axis=1, join="outer")
if any_row_contains_duplicate_values(n_processes, concatenated_columns):
concatenated_columns.columns = input_ids
concatenated_columns.columns = columns.keys() # Use the sample ids as column names
conflicts[f'conflict_{column_name}'] = concatenated_columns
else:
unique_values = get_first_non_na_value_vector(concatenated_columns)
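For context on the hunk above: detecting a conflict amounts to asking, per row, whether the aligned per-sample columns hold more than one distinct non-NA value; rows without a conflict keep their first non-NA value. A minimal single-process sketch of that logic (the component itself spreads the `nunique` check over a process pool):

```python
import pandas as pd

def row_has_conflict(frame: pd.DataFrame) -> pd.Series:
    # More than one distinct non-NA value in a row means the samples disagree.
    return frame.nunique(axis=1, dropna=True) > 1

def first_non_na(frame: pd.DataFrame) -> pd.Series:
    # Back-fill along columns, then take the first column: per row this
    # yields the left-most non-NA value.
    return frame.bfill(axis=1).iloc[:, 0]

aligned = pd.DataFrame(
    {"mouse": ["2", None, "x"], "human": ["1", "3", "x"]},
    index=["gene1", "gene2", "gene3"],
)
print(row_has_conflict(aligned).tolist())  # [True, False, False]
print(first_non_na(aligned).tolist())      # ['2', '3', 'x']
```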
@@ -203,7 +203,7 @@ def cast_to_writeable_dtype(result: pd.DataFrame) -> pd.DataFrame:
result[obj_col] = result[obj_col].where(result[obj_col].isna(), result[obj_col].astype(str)).astype('category')
return result
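A quick illustration of the cast in `cast_to_writeable_dtype` above, assuming standard pandas semantics: non-NA entries of an object column are stringified and the column becomes categorical, while NA entries are preserved, since arbitrary Python objects generally cannot be written to the HDF5-backed .h5mu format.

```python
import numpy as np
import pandas as pd

s = pd.Series([1, "a", np.nan, 2.5], dtype="object")
converted = s.where(s.isna(), s.astype(str)).astype("category")
print(converted.dtype)                 # category
print(list(converted.cat.categories))  # ['1', '2.5', 'a']
print(converted.isna().tolist())       # [False, False, True, False]
```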

def split_conflicts_modalities(n_processes: int, input_ids: tuple[str], samples: Iterable[anndata.AnnData], output: anndata.AnnData) \
def split_conflicts_modalities(n_processes: int, samples: dict[str, anndata.AnnData], output: anndata.AnnData) \
-> anndata.AnnData:
"""
Merge .var and .obs matrices of the anndata objects. Columns are merged
@@ -213,8 +213,8 @@ def split_conflicts_modalities(n_processes: int, input_ids: tuple[str], samples:
"""
matrices_to_parse = ("var", "obs")
for matrix_name in matrices_to_parse:
matrices = [getattr(sample, matrix_name) for sample in samples]
conflicts, concatenated_matrix = concatenate_matrices(n_processes, input_ids, matrices)
matrices = {sample_id: getattr(sample, matrix_name) for sample_id, sample in samples.items()}
conflicts, concatenated_matrix = concatenate_matrices(n_processes, matrices)

# Write the conflicts to the output
matrix_index = getattr(output, matrix_name).index
@@ -235,20 +235,20 @@ def concatenate_modality(n_processes: int, mod: str, input_files: Iterable[str |
}
other_axis_mode_to_apply = concat_modes.get(other_axis_mode, other_axis_mode)

mod_data = []
for input_file in input_files:
mod_data = {}
for input_id, input_file in zip(input_ids, input_files):
try:
mod_data.append(mu.read_h5ad(input_file, mod=mod))
mod_data[input_id] = mu.read_h5ad(input_file, mod=mod)
except KeyError as e: # Modality does not exist for this sample, skip it
if f"Unable to open object '{mod}' doesn't exist" not in str(e):
raise e
pass
check_observations_unique(mod_data)
check_observations_unique(mod_data.values())

concatenated_data = anndata.concat(mod_data, join='outer', merge=other_axis_mode_to_apply)
concatenated_data = anndata.concat(mod_data.values(), join='outer', merge=other_axis_mode_to_apply)

if other_axis_mode == "move":
concatenated_data = split_conflicts_modalities(n_processes, input_ids, mod_data, concatenated_data)
concatenated_data = split_conflicts_modalities(n_processes, mod_data, concatenated_data)

return concatenated_data
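For context on `concatenate_modality` above: keying each sample's AnnData by its `input_id`, rather than appending to a positional list, keeps the id-to-data pairing intact when a modality is missing from some inputs. A stripped-down sketch with hypothetical stand-in values:

```python
input_ids = ("mouse", "human", "no_atac")
files = {"mouse": "atac-A", "human": "atac-B"}  # "no_atac" lacks the modality

mod_data = {}
for input_id in input_ids:
    try:
        mod_data[input_id] = files[input_id]  # stands in for mu.read_h5ad(...)
    except KeyError:
        pass  # modality absent for this sample: skip it, as the real code does

# The surviving keys name exactly the samples that contributed, so conflict
# columns can later be labelled per sample without positional guesswork.
print(list(mod_data))  # ['mouse', 'human']
```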

45 changes: 41 additions & 4 deletions src/dataflow/concat/test.py
@@ -1,12 +1,12 @@
import mudata as md
import anndata as ad
import subprocess
from pathlib import Path
import pandas as pd
import numpy as np
import pytest
import re
import sys
import uuid

## VIASH START
meta = {
@@ -45,13 +45,22 @@ def mudata_without_genome(tmp_path, request):
result.append(new_path)
return result


@pytest.fixture
def make_obs_names_unique():
def wrapper(mudata):
for mod_data in mudata.mod.values():
mod_data.obs.index = mod_data.obs.index.map(
lambda index_val: uuid.uuid4().hex + index_val
)
return wrapper

@pytest.fixture
def mudata_copy_with_unique_obs(request):
def mudata_copy_with_unique_obs(request, make_obs_names_unique):
mudata_to_copy = request.param
mudata_contents = md.read(mudata_to_copy)
copied_contents = mudata_contents.copy()
for mod_data in copied_contents.mod.values():
mod_data.obs.index = "make_unique_" + mod_data.obs.index
make_obs_names_unique(copied_contents)
return mudata_contents, copied_contents

@pytest.fixture
@@ -311,6 +320,34 @@ def test_concat_dtypes(run_component, copied_mudata_with_extra_annotation_column
concatenated_data = md.read("concat.h5mu")
assert concatenated_data.mod['atac'].obs['test'].dtype == expected

@pytest.mark.parametrize("extra_column_annotation_matrix", ["var"])
@pytest.mark.parametrize("extra_column_value_sample1,extra_column_value_sample2", [("2", "1")])
@pytest.mark.parametrize("mudata_copy_with_unique_obs",
[input_sample1_file],
indirect=["mudata_copy_with_unique_obs"])
def test_resolve_annotation_conflict_missing_column(run_component, copied_mudata_with_extra_annotation_column, make_obs_names_unique, tmp_path):
"""
Test using mode 'move' and resolving a conflict in metadata between the samples,
but the metadata column is missing in one of the samples.
"""
tempfile_input1, tempfile_input2 = copied_mudata_with_extra_annotation_column
original_data = md.read_h5mu(input_sample1_file)
make_obs_names_unique(original_data)
original_data_path = tmp_path / f"{uuid.uuid4().hex}.h5mu"
original_data.write_h5mu(original_data_path)
run_component([
"--input_id", "mouse,human,sample_without_column",
"--input", tempfile_input1,
"--input", tempfile_input2,
"--input", original_data_path,
"--output", "concat.h5mu",
"--other_axis_mode", "move"
])
concatenated_data = md.read("concat.h5mu")
assert 'test' not in concatenated_data.mod['atac'].var.columns
assert 'test' not in concatenated_data.mod['atac'].obs.columns
assert 'conflict_test' in concatenated_data.mod['atac'].varm

def test_mode_move(run_component, tmp_path):
tempfile_input1 = tmp_path / "input1.h5mu"
tempfile_input2 = tmp_path / "input2.h5mu"
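To make the assertions in `test_resolve_annotation_conflict_missing_column` concrete: with `--other_axis_mode move`, a conflicting `.var` column is dropped from `.var` and moved to `.varm['conflict_<column>']`, with one column per sample that actually carried it. A hedged sketch of the expected shape (illustrative, assuming the test above has run):

```python
import mudata as md

concatenated = md.read("concat.h5mu")
conflict = concatenated.mod["atac"].varm["conflict_test"]
# Only the two samples that carried the "test" column appear here; the third
# sample ("sample_without_column") contributes no column.
print(list(conflict.columns))  # e.g. ['mouse', 'human']
assert "test" not in concatenated.mod["atac"].var.columns
```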
34 changes: 17 additions & 17 deletions src/dataflow/concatenate_h5mu/script.py
@@ -121,20 +121,19 @@ def any_row_contains_duplicate_values(n_processes: int, frame: pd.DataFrame) ->
is_duplicated = pool.map(nunique, iter(numpy_array))
return any(is_duplicated)

def concatenate_matrices(n_processes: int, input_ids: tuple[str], matrices: Iterable[pd.DataFrame]) \
def concatenate_matrices(n_processes: int, matrices: dict[str, pd.DataFrame]) \
-> tuple[dict[str, pd.DataFrame], pd.DataFrame | None, dict[str, pd.core.dtypes.dtypes.Dtype]]:
"""
Merge matrices by combining columns that have the same name.
Columns that contain conflicting values (i.e. the columns have different values)
are not merged, but instead moved to a new dataframe.
"""
column_names = set(column_name for var in matrices for column_name in var)
column_names = set(column_name for var in matrices.values() for column_name in var)
logger.debug('Trying to concatenate columns: %s.', ",".join(column_names))
if not column_names:
return {}, None
conflicts, concatenated_matrix = \
split_conflicts_and_concatenated_columns(n_processes,
input_ids,
matrices,
column_names)
concatenated_matrix = cast_to_writeable_dtype(concatenated_matrix)
@@ -150,8 +149,7 @@ def get_first_non_na_value_vector(df):
return pd.Series(numpy_arr.ravel()[flat_index], index=df.index, name=df.columns[0])

def split_conflicts_and_concatenated_columns(n_processes: int,
input_ids: tuple[str],
matrices: Iterable[pd.DataFrame],
matrices: dict[str, pd.DataFrame],
column_names: Iterable[str]) -> \
tuple[dict[str, pd.DataFrame], pd.DataFrame]:
"""
@@ -164,11 +162,13 @@ def split_conflicts_and_concatenated_columns(n_processes: int,
conflicts = {}
concatenated_matrix = []
for column_name in column_names:
columns = [var[column_name] for var in matrices if column_name in var]
columns = {input_id: var[column_name]
for input_id, var in matrices.items()
if column_name in var}
assert columns, "Some columns should have been found."
concatenated_columns = pd.concat(columns, axis=1, join="outer")
concatenated_columns = pd.concat(columns.values(), axis=1, join="outer")
if any_row_contains_duplicate_values(n_processes, concatenated_columns):
concatenated_columns.columns = input_ids
concatenated_columns.columns = columns.keys() # Use the sample ids as column names
conflicts[f'conflict_{column_name}'] = concatenated_columns
else:
unique_values = get_first_non_na_value_vector(concatenated_columns)
@@ -203,7 +203,7 @@ def cast_to_writeable_dtype(result: pd.DataFrame) -> pd.DataFrame:
result[obj_col] = result[obj_col].where(result[obj_col].isna(), result[obj_col].astype(str)).astype('category')
return result

def split_conflicts_modalities(n_processes: int, input_ids: tuple[str], samples: Iterable[anndata.AnnData], output: anndata.AnnData) \
def split_conflicts_modalities(n_processes: int, samples: dict[str, anndata.AnnData], output: anndata.AnnData) \
-> anndata.AnnData:
"""
Merge .var and .obs matrices of the anndata objects. Columns are merged
@@ -213,8 +213,8 @@ def split_conflicts_modalities(n_processes: int, input_ids: tuple[str], samples:
"""
matrices_to_parse = ("var", "obs")
for matrix_name in matrices_to_parse:
matrices = [getattr(sample, matrix_name) for sample in samples]
conflicts, concatenated_matrix = concatenate_matrices(n_processes, input_ids, matrices)
matrices = {sample_id: getattr(sample, matrix_name) for sample_id, sample in samples.items()}
conflicts, concatenated_matrix = concatenate_matrices(n_processes, matrices)

# Write the conflicts to the output
matrix_index = getattr(output, matrix_name).index
@@ -235,20 +235,20 @@ def concatenate_modality(n_processes: int, mod: str, input_files: Iterable[str |
}
other_axis_mode_to_apply = concat_modes.get(other_axis_mode, other_axis_mode)

mod_data = []
for input_file in input_files:
mod_data = {}
for input_id, input_file in zip(input_ids, input_files):
try:
mod_data.append(mu.read_h5ad(input_file, mod=mod))
mod_data[input_id] = mu.read_h5ad(input_file, mod=mod)
except KeyError as e: # Modality does not exist for this sample, skip it
if f"Unable to open object '{mod}' doesn't exist" not in str(e):
raise e
pass
check_observations_unique(mod_data)
check_observations_unique(mod_data.values())

concatenated_data = anndata.concat(mod_data, join='outer', merge=other_axis_mode_to_apply)
concatenated_data = anndata.concat(mod_data.values(), join='outer', merge=other_axis_mode_to_apply)

if other_axis_mode == "move":
concatenated_data = split_conflicts_modalities(n_processes, input_ids, mod_data, concatenated_data)
concatenated_data = split_conflicts_modalities(n_processes, mod_data, concatenated_data)

return concatenated_data

43 changes: 40 additions & 3 deletions src/dataflow/concatenate_h5mu/test.py
@@ -6,6 +6,7 @@
import pytest
import re
import sys
import uuid

## VIASH START
meta = {
@@ -44,13 +45,22 @@ def mudata_without_genome(tmp_path, request):
result.append(new_path)
return result


@pytest.fixture
def make_obs_names_unique():
def wrapper(mudata):
for mod_data in mudata.mod.values():
mod_data.obs.index = mod_data.obs.index.map(
lambda index_val: uuid.uuid4().hex + index_val
)
return wrapper

@pytest.fixture
def mudata_copy_with_unique_obs(request):
def mudata_copy_with_unique_obs(request, make_obs_names_unique):
mudata_to_copy = request.param
mudata_contents = md.read(mudata_to_copy)
copied_contents = mudata_contents.copy()
for mod_data in copied_contents.mod.values():
mod_data.obs.index = "make_unique_" + mod_data.obs.index
make_obs_names_unique(copied_contents)
return mudata_contents, copied_contents

@pytest.fixture
@@ -310,6 +320,33 @@ def test_concat_dtypes(run_component, copied_mudata_with_extra_annotation_column
concatenated_data = md.read("concat.h5mu")
assert concatenated_data.mod['atac'].obs['test'].dtype == expected

@pytest.mark.parametrize("extra_column_annotation_matrix", ["var"])
@pytest.mark.parametrize("extra_column_value_sample1,extra_column_value_sample2", [("2", "1")])
@pytest.mark.parametrize("mudata_copy_with_unique_obs",
[input_sample1_file],
indirect=["mudata_copy_with_unique_obs"])
def test_resolve_annotation_conflict_missing_column(run_component, copied_mudata_with_extra_annotation_column, make_obs_names_unique, tmp_path):
"""
Test using mode 'move' and resolving a conflict in metadata between the samples,
but the metadata column is missing in one of the samples.
"""
tempfile_input1, tempfile_input2 = copied_mudata_with_extra_annotation_column
original_data = md.read_h5mu(input_sample1_file)
make_obs_names_unique(original_data)
original_data_path = tmp_path / f"{uuid.uuid4().hex}.h5mu"
original_data.write_h5mu(original_data_path)
run_component([
"--input_id", "mouse,human,sample_without_column",
"--input", tempfile_input1,
"--input", tempfile_input2,
"--input", original_data_path,
"--output", "concat.h5mu",
"--other_axis_mode", "move"
])
concatenated_data = md.read("concat.h5mu")
assert 'test' not in concatenated_data.mod['atac'].var.columns
assert 'test' not in concatenated_data.mod['atac'].obs.columns
assert 'conflict_test' in concatenated_data.mod['atac'].varm

def test_mode_move(run_component, tmp_path):
tempfile_input1 = tmp_path / "input1.h5mu"