Skip to content

Commit

Permalink
Merge pull request #11 from klarman-cell-observatory/boli
Browse files Browse the repository at this point in the history
Pegasusio passed test
  • Loading branch information
bli25 committed Jun 2, 2020
2 parents 618ef8d + aa39dd5 commit d214030
Show file tree
Hide file tree
Showing 22 changed files with 1,238 additions and 548 deletions.
6 changes: 5 additions & 1 deletion pegasusio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

modalities = ['rna', 'citeseq', 'hashing', 'tcr', 'bcr', 'crispr', 'atac']

from .decorators import timer, run_gc

from .unimodal_data import UnimodalData
from .vdj_data import VDJData
from .citeseq_data import CITESeqData
from .cyto_data import CytoData
from .qc_utils import apply_qc_filters
from .multimodal_data import MultimodalData
from .aggr_data import AggrData
from .readwrite import infer_file_type, read_input, write_output
from .preprocessing import qc_metrics, filter_data
from .data_aggregation import aggregate_matrices

from ._version import get_versions

Expand Down
44 changes: 44 additions & 0 deletions pegasusio/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""
Pegasusio is the IO component of Pegasus and a multi-modality single-cell genomics IO package. It is a command line tool, a python package and a base for Cloud-based analysis workflows.
Usage:
pegasus <command> [<args>...]
pegasus -h | --help
pegasus -v | --version
Sub-commands:
aggregate_matrix Aggregate multi-modality single-cell genomics data into one data object. It also enables users to import metadata into the aggregated data.
Options:
-h, --help Show help information.
-v, --version Show version.
Description:
This is Pegasusio, the IO component of Pegasus.
"""

from docopt import docopt
from docopt import DocoptExit
from . import __version__ as VERSION
from . import commands


def main():
args = docopt(__doc__, version=VERSION, options_first=True)

command_name = args["<command>"]
command_args = args["<args>"]
command_args.insert(0, command_name)

try:
command_class = getattr(commands, command_name)
except AttributeError:
print("Unknown command {cmd}!".format(cmd=command_name))
raise DocoptExit()

command = command_class(command_args)
command.execute()


if __name__ == "__main__":
main()
197 changes: 197 additions & 0 deletions pegasusio/aggr_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, vstack, coo_matrix
from typing import List, Dict, Union
from collections import defaultdict

import logging
logger = logging.getLogger(__name__)

from pegasusio import timer, run_gc
from pegasusio import UnimodalData, CITESeqData, CytoData, VDJData, MultimodalData


class AggrData:
def __init__(self):
self.aggr = defaultdict(list)


def add_data(self, data: MultimodalData) -> None:
for key in data.list_data():
self.aggr[key].append(data.get_data(key))


def _get_fillna_dict(self, df: pd.DataFrame) -> dict:
""" Generate a fillna dict for columns in a df """
fillna_dict = {}
for column in df:
fillna_dict[column] = "" if df[column].dtype.kind in {"O", "S"} else 0
return fillna_dict


@run_gc
def _merge_matrices(self, feature_metadata: pd.DataFrame, unilist: List[UnimodalData], modality: str) -> Dict[str, csr_matrix]:
""" Merge all matrices together """
f2idx = pd.Series(data=range(feature_metadata.shape[0]), index=feature_metadata.index)

mat_keys = set()
no_reorg = True
for unidata in unilist:
mat_keys.update(unidata.matrices)
if no_reorg and (feature_metadata.shape[0] > unidata.feature_metadata.shape[0] or (feature_metadata.index != unidata.feature_metadata.index).sum() > 0):
no_reorg = False

if modality == "bcr" or modality == "tcr":
for key in VDJData._uns_keywords:
mat_keys.discard(key[1:])


matrices = {}
if no_reorg:
for mat_key in mat_keys:
mat_list = []
for unidata in unilist:
mat = unidata.matrices.pop(mat_key, None)
if mat is not None:
mat_list.append(mat)
matrices[mat_key] = vstack(mat_list) if modality != "cyto" else np.vstack(mat_list)
else:
colmap = []
for unidata in unilist:
colmap.append(f2idx[unidata.feature_metadata.index].values)

if modality == "cyto":
# matrices are dense.
for mat_key in mat_keys:
mat_list = []
for i, unidata in enumerate(unilist):
mat = unidata.matrices.pop(mat_key, None)
if mat is not None:
newmat = np.zeros((mat.shape[0], feature_metadata.shape[0]), dtype = mat.dtype)
newmat[:, colmap[i]] = mat
mat_list.append(newmat)
matrices[mat_key] = np.vstack(mat_list)
else:
for mat_key in mat_keys:
data_list = []
row_list = []
col_list = []
row_base = 0
for i, unidata in enumerate(unilist):
mat = unidata.matrices.pop(mat_key, None)
if mat is not None:
mat = mat.tocoo(copy = False) # convert to coo format
data_list.append(mat.data)
row_list.append(mat.row + row_base)
col_list.append(colmap[i][mat.col])
row_base += mat.shape[0]
data = np.concatenate(data_list)
row = np.concatenate(row_list)
col = np.concatenate(col_list)
matrices[mat_key] = coo_matrix((data, (row, col))).tocsr(copy = False)

return matrices


@run_gc
def _vdj_update_metadata_matrices(self, metadata: dict, matrices: dict, unilist: List[UnimodalData]) -> None:
metadata.pop("uns_dict", None)
for key in VDJData._uns_keywords:
values = set()
for unidata in unilist:
values.update(unidata.metadata[key])
values.discard("None") # None must be 0 for sparse matrix
metadata[key] = np.array(["None"] + list(values), dtype = np.object)
val2num = dict(zip(metadata[key], range(metadata[key].size)))

mat_key = key[1:]
mat_list = []
for unidata in unilist:
mat = unidata.matrices.pop(mat_key)
old2new = np.array([val2num[val] for val in unidata.metadata[key]], dtype = np.int32)
mat.data = old2new[mat.data]
mat_list.append(mat)
matrices[mat_key] = vstack(mat_list)


@run_gc
def _aggregate_unidata(self, unilist: List[UnimodalData]) -> UnimodalData:
if len(unilist) == 1:
del unilist[0].metadata["_sample"]
return unilist[0]

modality = unilist[0].get_modality()

barcode_metadata_dfs = [unidata.barcode_metadata for unidata in unilist]
barcode_metadata = pd.concat(barcode_metadata_dfs, axis=0, sort=False, copy=False)
fillna_dict = self._get_fillna_dict(barcode_metadata)
barcode_metadata.fillna(value=fillna_dict, inplace=True)


var_dict = {}
for unidata in unilist:
if modality == "citeseq":
unidata.feature_metadata.drop(columns = CITESeqData._var_keywords, inplace = True)
elif modality == "cyto":
unidata.feature_metadata.drop(columns = CytoData._var_keywords, inplace = True)

idx = unidata.feature_metadata.columns.difference(["featureid"])
if idx.size > 0:
var_dict[unidata.metadata["_sample"]] = unidata.feature_metadata[idx]
unidata.feature_metadata.drop(columns = idx, inplace = True)

feature_metadata = unilist[0].feature_metadata
for other in unilist[1:]:
keys = ["featurekey"] + feature_metadata.columns.intersection(other.feature_metadata.columns).values.tolist()
feature_metadata = feature_metadata.merge(other.feature_metadata, on=keys, how="outer", sort=False, copy=False) # If sort is True, feature keys will be changed even if all channels share the same feature keys.
fillna_dict = self._get_fillna_dict(feature_metadata)
feature_metadata.fillna(value=fillna_dict, inplace=True)


matrices = self._merge_matrices(feature_metadata, unilist, modality)


uns_dict = {}
metadata = {"genome": unilist[0].metadata["genome"], "modality": unilist[0].metadata["modality"]}
for unidata in unilist:
assert unidata.metadata.pop("genome") == metadata["genome"]
assert unidata.metadata.pop("modality") == metadata["modality"]
if modality == "citeseq":
for key in CITESeqData._uns_keywords:
unidata.metadata.pop(key, None)
del unidata.metadata["_obs_keys"]
elif modality == "cyto":
for key in CytoData._uns_keywords:
unidata.metadata.pop(key, None)
sample_name = unidata.metadata.pop("_sample")
if len(unidata.metadata) > 0:
uns_dict[sample_name] = unidata.metadata.mapping

if len(var_dict) > 0:
metadata["var_dict"] = var_dict
if len(uns_dict) > 0:
metadata["uns_dict"] = uns_dict


unidata = None
if isinstance(unilist[0], CITESeqData):
unidata = CITESeqData(barcode_metadata, feature_metadata, matrices, metadata)
elif isinstance(unilist[0], CytoData):
unidata = CytoData(barcode_metadata, feature_metadata, matrices, metadata)
elif isinstance(unilist[0], VDJData):
self._vdj_update_metadata_matrices(metadata, matrices, unilist)
unidata = VDJData(barcode_metadata, feature_metadata, matrices, metadata)
else:
unidata = UnimodalData(barcode_metadata, feature_metadata, matrices, metadata)

return unidata


@timer(logger=logger)
def aggregate(self) -> MultimodalData:
""" Aggregate all data together """
data = MultimodalData()
for key in list(self.aggr):
unidata = self._aggregate_unidata(self.aggr.pop(key))
data.add_data(unidata)
return data
30 changes: 18 additions & 12 deletions pegasusio/citeseq_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from scipy.sparse import csr_matrix, hstack
from typing import List, Dict, Union

import logging
logger = logging.getLogger(__name__)

import anndata
from pegasusio import UnimodalData
from .views import INDEX, _parse_index, UnimodalDataView
Expand All @@ -21,15 +24,14 @@ def __init__(
metadata: dict,
barcode_multiarrays: Dict[str, np.ndarray] = None,
feature_multiarrays: Dict[str, np.ndarray] = None,
cur_matrix: str = None,
cur_matrix: str = "raw.count",
) -> None:
assert metadata["modality"] == "citeseq"
super().__init__(barcode_metadata, feature_metadata, matrices, metadata, barcode_multiarrays, feature_multiarrays, cur_matrix)
assert len(self.matrices) == 1
self._cur_matrix = list(self.matrices)[0]

if self._cur_matrix == "raw.count":
# Prepare for the control antibody list.

# Prepare for the control antibody list if not loaded from Zarr
if "_control_names" not in self.metadata:
assert len(self.matrices) == 1 and "raw.count" in self.matrices
self.metadata["_control_names"] = np.array(["None"], dtype = object)
self.metadata["_control_counts"] = csr_matrix((self._shape[0], 1), dtype = np.int32)
self.metadata["_obs_keys"] = ["_control_counts"]
Expand Down Expand Up @@ -77,20 +79,24 @@ def load_control_list(self, control_csv: str) -> None:
ctrls = {"None": 0}
series = pd.read_csv(control_csv, header=0, index_col=0, squeeze=True)
for antibody, control in series.iteritems():
if antibody not in self.feature_metadata.index:
continue

pos = ctrls.get(control, None)
if pos is None:
if control not in self.feature_metadata.index:
raise ValueError(f"Detected unknown control antibody '{control}'!")
pos = len(ctrls)
ctrls[control] = pos
if control in self.feature_metadata.index:
pos = len(ctrls)
ctrls[control] = pos
else:
logger.warning(f"Detected and ignored unknown control antibody '{control}'!")
pos = 0

if antibody not in self.feature_metadata.index:
raise ValueError(f"Detected unknown CITE-Seq antibody '{antibody}'!")
self.feature_metadata.loc[antibody, "_control_id"] = pos

ctrl_names = np.empty(len(ctrls), dtype = object)
for ctrl_name, pos in ctrls.items():
ctrl_names[pos] = ctrl_name

locs = self.feature_metadata.index.get_indexer(pd.Index(ctrl_names[1:], copy = False))
idx = np.ones(self._shape[1], dtype = bool)
idx[locs] = False
Expand Down
54 changes: 54 additions & 0 deletions pegasusio/commands/AggregateMatrix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from .Base import Base
from pegasusio import aggregate_matrices, write_output


class AggregateMatrix(Base):
"""
Aggregate multiple single-modality or multi-modality data into one big MultimodalData object and write it back to disk as a zipped Zarr file.
Usage:
pegasusio aggregate_matrix <csv_file> <output_name> [--restriction <restriction>... options]
pegasusio aggregate_matrix -h
Arguments:
csv_file This function takes as input a csv_file, which contains at least 2 columns — Sample, sample name; Location, file that contains the count matrices (e.g. filtered_gene_bc_matrices_h5.h5), and merges matrices from the same genome together. If multi-modality exists, a third Modality column might be required. If Reference column presents, you can replace genome names by using "hg19:GRCh38,mm9:mm10", which will replace all hg19 to GRCh38 and all mm9 to mm10. However, in this case, the genome passed to read_input will be None.
output_name The output file name.
Options:
--restriction <restriction>... Select data that satisfy all restrictions. Each restriction takes the format of name:value,...,value or name:~value,..,value, where ~ refers to not. You can specifiy multiple restrictions by setting this option multiple times.
--attributes <attributes> Specify a comma-separated list of outputted attributes. These attributes should be column names in the csv file.
--default-reference <reference> If sample count matrix is in either DGE, mtx, csv, tsv or loom format and there is no Reference column in the csv_file, use <reference> as the reference.
--select-only-singlets If we have demultiplexed data, turning on this option will make pegasusio only include barcodes that are predicted as singlets.
--min-genes <number> Only keep cells with at least <number> of genes.
--max-genes <number> Only keep cells with less than <number> of genes.
--min-umis <number> Only keep cells with at least <number> of UMIs.
--max-umis <number> Only keep cells with less than <number> of UMIs.
--mito-prefix <prefix> Prefix for mitochondrial genes. If multiple prefixes are provided, separate them by comma (e.g. "MT-,mt-").
--percent-mito <percent> Only keep cells with mitochondrial percent less than <percent>%. Only when both mito_prefix and percent_mito set, the mitochondrial filter will be triggered.
--no-append-sample-name Turn this option on if you do not want to append sample name in front of each sample's barcode (concatenated using '-').
-h, --help Print out help information.
Outputs:
output_name.zarr.zip A zipped Zarr file containing aggregated data.
Examples:
pegasusio aggregate_matrix --restriction Source:BM,CB --restriction Individual:1-8 --attributes Source,Platform count_matrix.csv aggr_data
"""

def execute(self):
data = aggregate_matrices(
self.args["<csv_file>"],
restrictions=self.args["--restriction"],
attributes=self.split_string(self.args["--attributes"]),
default_ref=self.args["--default-reference"],
append_sample_name=not self.args["--no-append-sample-name"],
select_singlets=self.args["--select-only-singlets"],
min_genes=self.convert_to_int(self.args["--min-genes"]),
max_genes=self.convert_to_int(self.args["--max-genes"]),
min_umis=self.convert_to_int(self.args["--min-umis"]),
max_umis=self.convert_to_int(self.args["--max-umis"]),
mito_prefix=self.args["--mito-prefix"],
percent_mito=self.convert_to_float(self.args["--percent-mito"])
)
write_output(data, self.args["<output_name>"] + ".zarr.zip")

0 comments on commit d214030

Please sign in to comment.