-
Notifications
You must be signed in to change notification settings - Fork 155
/
mtx_to_h5ad.py
executable file
·160 lines (126 loc) · 5.63 KB
/
mtx_to_h5ad.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/env python
# Set numba chache dir to current working directory (which is a writable mount also in containers)
import os
os.environ["NUMBA_CACHE_DIR"] = "."
import scanpy as sc
import pandas as pd
import argparse
from scipy import io
from anndata import AnnData
def _10x_h5_to_adata(mtx_h5: str, sample: str):
adata = sc.read_10x_h5(mtx_h5)
adata.var["gene_symbols"] = adata.var_names
adata.var.set_index("gene_ids", inplace=True)
adata.obs["sample"] = sample
# reorder columns for 10x mtx files
adata.var = adata.var[["gene_symbols", "feature_types", "genome"]]
return adata
def _mtx_to_adata(
mtx_file: str,
barcode_file: str,
feature_file: str,
sample: str,
aligner: str,
):
adata = sc.read_mtx(mtx_file)
# for some reason star matrix comes transposed and doesn't fit when values are appended directly
# also true for cellranger files ( this is only used when running with the custom emptydrops_filtered files )
# otherwise, it uses the cellranger .h5 files
if aligner in [
"cellranger",
"cellrangermulti",
"star",
]:
adata = adata.transpose()
adata.obs_names = pd.read_csv(barcode_file, header=None, sep="\t")[0].values
adata.var_names = pd.read_csv(feature_file, header=None, sep="\t")[0].values
adata.obs["sample"] = sample
return adata
def input_to_adata(
input_data: str,
barcode_file: str,
feature_file: str,
sample: str,
aligner: str,
txp2gene: str,
star_index: str,
verbose: bool = True,
):
if verbose and (txp2gene or star_index):
print("Reading in {}".format(input_data))
#
# open main data
#
if aligner == "cellranger" and input_data.lower().endswith('.h5'):
adata = _10x_h5_to_adata(input_data, sample)
else:
adata = _mtx_to_adata(input_data, barcode_file, feature_file, sample, aligner)
#
# open gene information
#
if verbose and (txp2gene or star_index):
print("Reading in {}".format(txp2gene))
if aligner == "cellranger" and not input_data.lower().endswith('.h5'):
#
# for cellranger workflow, we do not have a txp2gene file, so, when using this normal/manual function for empty drops
# we need to provide this information coming directly from the features.tsv file
# by not using the .h5 file for conversion, we loose the two col information: feature_types and genome
#
t2g = pd.read_table(feature_file, header=None, names=["gene_id", "gene_symbol", "feature_types"], usecols=[0, 1, 2])
else:
if txp2gene:
t2g = pd.read_table(txp2gene, header=None, names=["gene_id", "gene_symbol"], usecols=[1, 2])
elif star_index:
t2g = pd.read_table(
f"{star_index}/geneInfo.tab", header=None, skiprows=1, names=["gene_id", "gene_symbol"], usecols=[0, 1]
)
if txp2gene or star_index or (aligner == "cellranger" and not input_data.lower().endswith('.h5')):
t2g = t2g.drop_duplicates(subset="gene_id").set_index("gene_id")
adata.var["gene_symbol"] = t2g["gene_symbol"]
return adata
def write_counts(
adata: AnnData,
out: str,
verbose: bool = False,
):
pd.DataFrame(adata.obs.index).to_csv(os.path.join(out, "barcodes.tsv"), sep="\t", index=False, header=None)
pd.DataFrame(adata.var).to_csv(os.path.join(out, "features.tsv"), sep="\t", index=True, header=None)
io.mmwrite(os.path.join(out, "matrix.mtx"), adata.X.T, field="integer")
if verbose:
print("Wrote features.tsv, barcodes.tsv, and matrix.mtx files to {}".format(args["out"]))
def dump_versions(task_process):
import pkg_resources
with open("versions.yml", "w") as f:
f.write(f"{task_process}:\n\t")
f.write("\n\t".join([f"{pkg.key}: {pkg.version}" for pkg in pkg_resources.working_set]))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Converts mtx output to h5ad.")
parser.add_argument("-i", "--input_data", dest="input_data", help="Path to either mtx or mtx h5 file.")
parser.add_argument("-v", "--verbose", dest="verbose", help="Toggle verbose messages", default=False)
parser.add_argument("-f", "--feature", dest="feature", help="Path to feature file.", nargs="?", const="")
parser.add_argument("-b", "--barcode", dest="barcode", help="Path to barcode file.", nargs="?", const="")
parser.add_argument("-s", "--sample", dest="sample", help="Sample name")
parser.add_argument("-o", "--out", dest="out", help="Output path.")
parser.add_argument("-a", "--aligner", dest="aligner", help="Which aligner has been used?")
parser.add_argument("--task_process", dest="task_process", help="Task process name.")
parser.add_argument("--txp2gene", dest="txp2gene", help="Transcript to gene (t2g) file.", nargs="?", const="")
parser.add_argument(
"--star_index", dest="star_index", help="Star index folder containing geneInfo.tab.", nargs="?", const=""
)
args = vars(parser.parse_args())
# create the directory with the sample name
os.makedirs(os.path.dirname(args["out"]), exist_ok=True)
adata = input_to_adata(
input_data=args["input_data"],
barcode_file=args["barcode"],
feature_file=args["feature"],
sample=args["sample"],
aligner=args["aligner"],
txp2gene=args["txp2gene"],
star_index=args["star_index"],
verbose=args["verbose"],
)
write_counts(adata=adata, out=args["sample"], verbose=args["verbose"])
adata.write_h5ad(args["out"], compression="gzip")
print("Wrote h5ad file to {}".format(args["out"]))
dump_versions(task_process=args["task_process"])