2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -35,6 +35,8 @@ Warning: These experimental features are subject to change in future releases.

* Added `annotate/singler`: Cell type annotation using SingleR (PR #1051).

* Added `tiledb/move_mudata_obsm_to_tiledb` (PR #1065).

## MAJOR CHANGES

* `mapping/cellranger_*`: Upgrade CellRanger to v9.0 (PR #992 and #1006).
81 changes: 81 additions & 0 deletions src/tiledb/move_mudata_obsm_to_tiledb/config.vsh.yaml
@@ -0,0 +1,81 @@
name: move_mudata_obsm_to_tiledb
namespace: tiledb
scope: "private"
description: |
  Move .obsm items from a MuData modality to an existing TileDB database.
  The .obsm keys should not yet exist in the database, and the observations from the modality
  and their order should match what is already present in the TileDB database.
authors:
- __merge__: /src/authors/dries_schaumont.yaml
roles: [ author, maintainer ]
argument_groups:
- name: Input database
description: "Open a tileDB-SOMA database by URI."
arguments:
- name: "--input_uri"
type: string
description: "A URI pointing to a TileDB-SOMA database."
required: true
example: "s3://bucket/path"
- name: "--s3_region"
description: |
Region where the TileDB-SOMA database is hosted.
type: string
required: false
- name: "--endpoint"
type: string
description: |
          Custom endpoint to use for connecting to S3.
required: false
- name: "--output_modality"
type: string
description: |
TileDB-SOMA measurement to add the output to.
- name: "MuData input"
arguments:
- name: "--input_mudata"
type: file
description: |
          MuData object to take the .obsm data from. The observations and their order should
          match between the database and the input modality.
- name: "--modality"
type: string
description: |
          Modality to take the .obsm items from.
- name: "--obsm_input"
type: string
multiple: true
description: |
          Keys from .obsm to copy. The keys should not yet be present in the database.

resources:
- type: python_script
path: script.py
- path: /src/utils/setup_logger.py
test_resources:
- type: python_script
path: test.py
- path: /resources_test/tiledb/pbmc_1k_protein_v3_mms
dest: tiledb/pbmc_1k_protein_v3_mms
- path: /resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu
dest: pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu
engines:
- type: docker
image: python:3.12
setup:
- type: python
packages:
- tiledbsoma
__merge__: /src/base/requirements/anndata_mudata.yaml
test_setup:
- type: python
packages:
- moto[server]
- boto3
__merge__: [ /src/base/requirements/python_test_setup.yaml, .]
runners:
- type: executable
docker_run_args: ["--env", "AWS_ACCESS_KEY_ID", "--env", "AWS_SECRET_ACCESS_KEY"]
- type: nextflow
directives:
label: [highmem, midcpu]
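
The config above requires that DataFrame-valued .obsm entries use the modality's obs_names (in the same order as in the TileDB database) as their index, and that they carry explicit column names if those labels should be recoverable later. A minimal sketch of how such an input could be prepared, mirroring the test fixture further down; the file paths and the "new_embedding" key are hypothetical, not part of the component:

import mudata
import numpy as np
import pandas as pd

# Hypothetical input; any MuData file with an "rna" modality would do.
mdata = mudata.read_h5mu("input.h5mu")
rna = mdata["rna"]

# Index by the modality's obs_names (same order as in the TileDB database)
# and give the columns explicit names so they can be stored as metadata.
rna.obsm["new_embedding"] = pd.DataFrame(
    np.random.rand(rna.n_obs, 5),
    index=rna.obs_names,
    columns=pd.Index(["a", "b", "c", "d", "e"]),
)
mdata.write("input_with_embedding.h5mu")
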
103 changes: 103 additions & 0 deletions src/tiledb/move_mudata_obsm_to_tiledb/script.py
@@ -0,0 +1,103 @@
import sys
import mudata
import tiledbsoma
import tiledbsoma.io
import pandas as pd
import json

## VIASH START
par = {}
meta = {"resources_dir": "src/utils"}
## VIASH END

sys.path.append(meta["resources_dir"])
from setup_logger import setup_logger

logger = setup_logger()

# Enable tiledbsoma's built-in INFO-level logging.
tiledbsoma.logging.info()


def main(par):
logger.info(f"Component {meta['name']} started.")
par["input_uri"] = par["input_uri"].rstrip("/")
if not par["obsm_input"]:
raise ValueError("Please provide at least one .obsm key.")
logger.info(
"Opening mudata file '%s', modality '%s'.", par["input_mudata"], par["modality"]
)
modality_data = mudata.read_h5ad(par["input_mudata"], mod=par["modality"])
logger.info(
"Done reading modality. Looking at .obsm for keys: '%s'",
",".join(par["obsm_input"]),
)
try:
keys_to_transfer = {
obsm_key: modality_data.obsm[obsm_key] for obsm_key in par["obsm_input"]
}
except KeyError as e:
raise KeyError("Not all .obsm keys were found in the input!") from e

logger.info("Done getting .obsm keys.")
tiledb_config = {
"vfs.s3.no_sign_request": "false",
}
optional_config = {
"vfs.s3.region": par["s3_region"],
"vfs.s3.endpoint_override": par["endpoint"],
}
for config_setting, config_val in optional_config.items():
if config_val is not None:
tiledb_config[config_setting] = config_val
logger.info("Using the following config to connect: %s", tiledb_config)
context = tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config)

logger.info(
"Trying to access '%s' in region '%s'", par["input_uri"], par["s3_region"]
)
with tiledbsoma.open(
par["input_uri"], mode="w", context=context
) as open_experiment:
logger.info("Connection established.")
logger.info("Looking for measurement %s", par["output_modality"])
measurement = open_experiment.ms[par["output_modality"]]
logger.info("Checking if keys do not already exist.")
existing_keys = measurement.obsm.keys()
overlap = set(existing_keys).intersection(set(keys_to_transfer))
if overlap:
raise ValueError(
f"The following keys already exist in the database: {','.join(overlap)}."
)
logger.info("Adding keys to database.")
for key, obsm_val in keys_to_transfer.items():
logger.info("Adding .obsm key '%s', of class '%s'", key, type(obsm_val))
index_as_json = None
            if isinstance(obsm_val, pd.DataFrame):
                # TileDB does not allow column indices to be saved directly,
                # so store them as JSON metadata instead. The default RangeIndex
                # is skipped because it carries no meaningful column names.
                if not isinstance(obsm_val.columns, pd.RangeIndex):
                    index_as_json = json.dumps(obsm_val.columns.to_list())
                obsm_val = obsm_val.to_numpy()

tiledbsoma.io.add_matrix_to_collection(
open_experiment,
measurement_name=par["output_modality"],
ingest_mode="write",
collection_name="obsm",
matrix_name=key,
matrix_data=obsm_val,
context=context,
)
if index_as_json:
uri = f"{par['input_uri']}/ms/{par['output_modality']}/obsm/{key}"
with tiledbsoma.open(
uri=uri, mode="w", context=context
) as open_obsm_array:
open_obsm_array.metadata["column_index"] = index_as_json

logger.info("Finished!")


if __name__ == "__main__":
main(par)
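
For reference, the ingested matrix can be read back the same way the test below does: open the array under <input_uri>/ms/<output_modality>/obsm/<key>, materialize the COO data, and restore the column labels from the "column_index" metadata. A minimal sketch, assuming a hypothetical bucket path and the same TileDB config options used in the script:

import json
import numpy as np
import pandas as pd
import tiledbsoma

# Hypothetical URI; the component writes to <input_uri>/ms/<output_modality>/obsm/<key>.
uri = "s3://bucket/path/ms/rna/obsm/new_embedding"
context = tiledbsoma.SOMATileDBContext(
    tiledb_config={"vfs.s3.no_sign_request": "false", "vfs.s3.region": "us-east-1"}
)
with tiledbsoma.open(uri=uri, mode="r", context=context) as open_array:
    dense = np.asarray(open_array.read().coos().concat().to_scipy().todense())
    columns = json.loads(open_array.metadata["column_index"])

# Reassemble the DataFrame-style .obsm entry with its original column labels.
df = pd.DataFrame(dense, columns=columns)
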
196 changes: 196 additions & 0 deletions src/tiledb/move_mudata_obsm_to_tiledb/test.py
@@ -0,0 +1,196 @@
import sys
import pytest
import boto3
import os
from moto.server import ThreadedMotoServer
import socket
import tiledbsoma
import subprocess
import re
import pandas as pd
import numpy as np
import mudata
import json


## VIASH START
meta = {
"executable": "target/executable/tiledb/move_mudata_obsm_to_tiledb/move_mudata_obsm_to_tiledb",
"resources_dir": "./resources_test",
"cpus": 2,
"config": "./src/tiledb/move_mudata_obsm_to_tiledb/config.vsh.yaml",
}
sys.path.append("src/utils")
## VIASH END

sys.path.append(meta["resources_dir"])
from setup_logger import setup_logger

logger = setup_logger()

input_dir = f"{meta['resources_dir']}/tiledb/pbmc_1k_protein_v3_mms"


@pytest.fixture
def input_mudata():
return mudata.read_h5mu(
f"{meta['resources_dir']}/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu"
)


@pytest.fixture
def input_mudata_extra_output_slot(input_mudata):
new_obsm_key = pd.DataFrame(
np.random.rand(input_mudata["rna"].n_obs, 5),
index=input_mudata["rna"].obs_names,
columns=pd.Index(["a", "b", "c", "d", "e"]),
)
input_mudata["rna"].obsm["test_input_slot"] = new_obsm_key
return input_mudata


@pytest.fixture
def input_mudata_path(random_h5mu_path, input_mudata):
output_path = random_h5mu_path()
input_mudata.write(output_path)
return output_path


@pytest.fixture(scope="module")
def aws_credentials():
"""Mocked AWS Credentials for moto."""
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"


@pytest.fixture(scope="function")
def moto_server(aws_credentials):
"""Fixture to run a mocked AWS server for testing."""
ip_addr = socket.gethostbyname(socket.gethostname())
# Note: pass `port=0` to get a random free port.
server = ThreadedMotoServer(ip_address=ip_addr, port=0)
server.start()
host, port = server.get_host_and_port()
yield f"http://{host}:{port}"
server.stop()


@pytest.fixture
def initiated_database(moto_server):
client = boto3.client("s3", endpoint_url=moto_server, region_name="us-east-1")
client.create_bucket(Bucket="test")

def raise_(ex):
raise ex

for root, _, files in os.walk(input_dir, onerror=raise_):
for filename in files:
local_path = os.path.join(root, filename)
relative_path = os.path.relpath(local_path, input_dir)
client.upload_file(local_path, "test", relative_path)
return moto_server


def test_key_already_exists_raises(
run_component, input_mudata_path, initiated_database
):
with pytest.raises(subprocess.CalledProcessError) as err:
run_component(
[
"--input_uri",
"s3://test",
"--endpoint",
initiated_database,
"--s3_region",
"us-east-1",
"--output_modality",
"rna",
"--input_mudata",
str(input_mudata_path),
"--modality",
"rna",
"--obsm_input",
"X_leiden_harmony_umap",
]
)
assert re.search(
r"ValueError: The following keys already exist in the database: X_leiden_harmony_umap",
err.value.stdout.decode("utf-8"),
)


def test_missing_obsm_key_raises(run_component, initiated_database, input_mudata_path):
with pytest.raises(subprocess.CalledProcessError) as err:
run_component(
[
"--input_uri",
"s3://test",
"--endpoint",
initiated_database,
"--s3_region",
"us-east-1",
"--output_modality",
"rna",
"--input_mudata",
str(input_mudata_path),
"--modality",
"rna",
"--obsm_input",
"doesnotexist",
]
)
assert re.search(
r"Not all \.obsm keys were found in the input!",
err.value.stdout.decode("utf-8"),
)


def test_add(
run_component, initiated_database, input_mudata_extra_output_slot, random_h5mu_path
):
input_path = random_h5mu_path()
input_mudata_extra_output_slot.write(input_path)
run_component(
[
"--input_uri",
"s3://test",
"--endpoint",
initiated_database,
"--s3_region",
"us-east-1",
"--output_modality",
"rna",
"--input_mudata",
str(input_path),
"--modality",
"rna",
"--obsm_input",
"test_input_slot",
]
)
obsm_key_uri = "s3://test/ms/rna/obsm/test_input_slot"
tiledb_config = {
"vfs.s3.no_sign_request": "false",
"vfs.s3.region": "us-east-1",
"vfs.s3.endpoint_override": initiated_database,
}
context = tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config)
with tiledbsoma.open(uri=obsm_key_uri, mode="r", context=context) as open_array:
obsm_data = open_array.read().coos().concat().to_scipy().todense()
assert obsm_data.shape == (713, 5)
original_data = (
input_mudata_extra_output_slot["rna"].obsm["test_input_slot"].to_numpy()
)
np.testing.assert_allclose(original_data, obsm_data)
assert json.loads(open_array.metadata["column_index"]) == [
"a",
"b",
"c",
"d",
"e",
]


if __name__ == "__main__":
sys.exit(pytest.main(["-s", __file__]))