-
Notifications
You must be signed in to change notification settings - Fork 10
/
_shogun.py
103 lines (85 loc) · 3.86 KB
/
_shogun.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# ----------------------------------------------------------------------------
# Copyright (c) 2018-2020, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import os
import subprocess
import tempfile
import shutil
import yaml
import biom
import pandas as pd
from qiime2.util import duplicate
from q2_types.feature_data import DNAFASTAFormat
from q2_types.bowtie2 import Bowtie2IndexDirFmt
def _run_command(cmd, verbose=True):
if verbose:
print("Running external command line application. This may print "
"messages to stdout and/or stderr.")
print("The command being run is below. This command cannot "
"be manually re-run as it will depend on temporary files that "
"no longer exist.")
print("\nCommand:", end=' ')
print(" ".join(cmd), end='\n\n')
subprocess.run(cmd, check=True)
def setup_database_dir(tmpdir, database, refseqs, reftaxa):
BOWTIE_PATH = 'bowtie2'
duplicate(str(refseqs), os.path.join(tmpdir, 'refseqs.fna'))
reftaxa.to_csv(os.path.join(tmpdir, 'taxa.tsv'), sep='\t')
shutil.copytree(str(database), os.path.join(tmpdir, BOWTIE_PATH),
copy_function=duplicate)
params = {
'general': {
'taxonomy': 'taxa.tsv',
'fasta': 'refseqs.fna'
},
'bowtie2': os.path.join(BOWTIE_PATH, database.get_basename())
}
with open(os.path.join(tmpdir, 'metadata.yaml'), 'w') as fh:
yaml.dump(params, fh, default_flow_style=False)
def load_table(tab_fp):
'''Convert classic OTU table to biom feature table'''
with open(tab_fp, 'r') as tab:
return biom.table.Table.from_tsv(tab, None, None, None)
def nobunaga(query: DNAFASTAFormat, reference_reads: DNAFASTAFormat,
reference_taxonomy: pd.Series, database: Bowtie2IndexDirFmt,
taxacut: float = 0.8,
threads: int = 1, percent_id: float = 0.98) -> biom.Table:
with tempfile.TemporaryDirectory() as tmpdir:
setup_database_dir(tmpdir,
database, reference_reads, reference_taxonomy)
# run aligner
cmd = ['shogun', 'align', '-i', str(query), '-d', tmpdir,
'-o', tmpdir, '-a', 'bowtie2', '-x', str(taxacut),
'-t', str(threads), '-p', str(percent_id)]
_run_command(cmd)
# assign taxonomy
taxatable = os.path.join(tmpdir, 'taxatable.tsv')
cmd = ['shogun', 'assign_taxonomy', '-i',
os.path.join(tmpdir, 'alignment.bowtie2.sam'), '-d', tmpdir,
'-o', taxatable, '-a', 'bowtie2']
_run_command(cmd)
# output taxatable as feature table
return load_table(taxatable)
def minipipe(query: DNAFASTAFormat, reference_reads: DNAFASTAFormat,
reference_taxonomy: pd.Series, database: Bowtie2IndexDirFmt,
taxacut: float = 0.8,
threads: int = 1, percent_id: float = 0.98) -> (
biom.Table, biom.Table, biom.Table, biom.Table):
with tempfile.TemporaryDirectory() as tmpdir:
setup_database_dir(tmpdir,
database, reference_reads, reference_taxonomy)
# run pipeline
cmd = ['shogun', 'pipeline', '-i', str(query), '-d', tmpdir,
'-o', tmpdir, '-a', 'bowtie2', '-x', str(taxacut),
'-t', str(threads), '-p', str(percent_id)]
_run_command(cmd)
# output selected results as feature tables
tables = ['taxatable.strain.txt',
'taxatable.strain.kegg.txt',
'taxatable.strain.kegg.modules.txt',
'taxatable.strain.kegg.pathways.txt']
return tuple(load_table(os.path.join(tmpdir, t)) for t in tables)