/
vassemble.py
244 lines (212 loc) · 9.47 KB
/
vassemble.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import subprocess
import os
from glob import glob
import vutils
class VAssemble:
"""Class for managing assembly wrapper calls and runtime processes.
Attributes:
finput (list): List of str containing input read file path(s).
paired_end_reads (bool): Boolean value indicating read type.
threads (str): Number of threads to be used by the assembly program.
qc (bool): Boolean value indicating if reads should be trimmed for quality control.
contigs (str): File path to the contigs file generated by the assembler.
qc_dir (str): Path to directory used to contain trimmed output generated by the
QC process.
assembler (str): Name of the underlying program to be used for assembly.
asm_dir (str): Path to the directory used to contain assembler output.
"""
def __init__(self, finput, paired_end_reads, threads):
self.finput = finput
self.paired_end_reads = paired_end_reads
self.threads = str(threads)
self.qc = False
self.contigs = None
self.qc_dir = None
self.assembler = None
self.asm_dir = None
def start_logger(self, log_dir):
"""Boiler plate method to start a new logger.
Arguments:
log_dir(str): Dirctory to be used for logger output.
_logger (Logger): Logger object for this class.
"""
self._logger = vutils.Logger('vassemble', log_dir)
def run_qc(self, qc_dir):
"""Public method for invoking the qc process.
Arguments:
qc_dir(str): Path to directory that will be used for QC output.
"""
self.qc_dir = qc_dir
self._logger.log('run_qc', 'Running sickle.\n'
'Output in: %s' % self.qc_dir
)
self._run_sickle()
self.qc = True
def run_assembly(self, assembler, asm_dir):
"""Public method for initiating the assembly process.
Arguments:
assembler(str): Name of the selected assembly program.
asm_dir (str): Path that the directory that will be used
for assembler output.
"""
self.assembler = assembler
self.asm_dir = asm_dir
formatted_input = self._to_formatted_input()
self._logger.log('run_assembly', 'Starting assembler: %s' % assembler)
self._run_assembler(formatted_input)
self._logger.log('run_assembly', 'Finished assembly.\n'
'Output: %s' % self.contigs
)
def _run_assembler(self, formatted_input):
"""Wrapper function for calling the assembly program with formatted arguments.
Arguments:
formatted_input (list): List of arguments as prooperly formatted strings to be passed
to the relevant assembler.
Output:
Output generated by the specific assembly utility, located in the assembly subdirectory
of the base output path.
Raises:
ValueError: Exception raised if assembler choice is not recognized.
"""
if self.assembler == 'spades':
print('Running spades..')
p = subprocess.check_call(['spades.py',
'--meta --threads', self.threads,
'-o', self.asm_dir
]
+ formatted_input)
self.contigs = os.path.join(self.asm_dir, 'contigs.fasta')
elif self.assembler == 'velvet':
print('Running velevth...')
p = subprocess.check_call(['velveth', self.asm_dir,
'31', '-fastq', '-shortPaired'
]
+ formatted_input)
print('Running velvetg...')
p = subprocess.check_call(['velvetg', self.asm_dir,
'-exp_cov', 'auto'
])
self.contigs = os.path.join(self.asm_dir, 'contigs.fa')
elif self.assembler == 'megahit':
temp_out = os.path.join(self.asm_dir, 'megahit')
print('Running megahit...')
p = subprocess.check_call(['megahit',
'-t', self.threads,
]
+ formatted_input
+ ['-o', temp_out]
)
vutils.copy_and_remove(temp_out, self.asm_dir)
self.contigs = os.path.join(self.asm_dir, 'final.contigs.fa')
else:
raise ValueError('Improper assembler: %s' % self.assembler)
def _run_sickle(self):
"""Wrapper method for running sickle for read quality control.
Output:
Output produced by sickle located in the trimmed subdirectory of
the base output path.
"""
print('Running sickle...')
s_type = 'sanger'
s_length = '50'
foutput = []
if not self.paired_end_reads:
fname = 'trimmed-'+os.path.basename(self.finput[0])
foutput.append(os.path.join(self.qc_dir, fname))
p = subprocess.check_call([
'sickle',
'se',
'-f', self.finput[0],
'-t', s_type,
'-l', s_length,
'-o', foutput[0]
])
else:
fname = 'trimmed-' + os.path.basename(self.finput[0])
rname = 'trimmed-' + os.path.basename(self.finput[1])
foutput.append(os.path.join(self.qc_dir, fname))
foutput.append(os.path.join(self.qc_dir, rname))
p = subprocess.check_call([
'sickle',
'pe',
'-f', self.finput[0],
'-r', self.finput[1],
'-t', s_type,
'-l', s_length,
'-o', os.path.join(self.qc_dir, fname),
'-p', os.path.join(self.qc_dir, rname),
'-s', os.path.join(self.qc_dir, 'singletons.fastq')
])
def _to_formatted_input(self):
"""Method for generating lists of the formatted arguments required by
each particular assembler.
Returns:
(list): List of formatted strings.
Raises:
ValueError: Exception raised if assembler choice is not recognized.
"""
if self.qc is True:
return self._get_trimmed_input()
else:
if self.paired_end_reads is True:
if self.assembler == 'velvet':
return self.finput
elif self.assembler == 'spades':
return [
'--pe1-1', self.finput[0],
'--pe1-2', self.finput[1],
'--meta', '--only-assembler',
'-k', '33,55,77,99,129'
]
elif self.assembler == 'megahit':
return [
'-1', self.finput[0],
'-2', self.finput[1],
]
else:
raise ValueError('Improper assembler: %s' % self.assembler)
else: # Else if single_end_reads
if self.assembler == 'velvet':
return self.finput
elif self.assembler == 'spades':
return ['-s', self.finput[0]]
elif self.assembler == 'megahit':
return ['-r', self.finput[0]]
else:
raise ValueError('Improper assembler: %s' % self.assembler)
def _get_trimmed_input(self):
"""Same functionality as to_formatted_input, but uses trimmed read files
if quality control has been selected.
Returns:
(list): List of formatted strings using the files found in the trimmed subdirectory.
Raises:
ValueError: Exception raised if assembler choice is not recognized.
"""
trimmed = glob(os.path.join(self.qc_dir, 'trimmed*'))
if self.paired_end_reads is True:
if self.assembler == 'velvet':
return glob(os.path.join(self.qc_dir, '*'))
elif self.assembler == 'spades':
return [
'--pe1-1', trimmed[0],
'--pe1-2', trimmed[1],
'--pe1-s', os.path.join(self.qc_dir, 'singletons.fastq'),
'--meta', '--only-assembler',
'-k', '33,55,77,99,127'
]
elif self.assembler == 'megahit':
return [
'-1', trimmed[0],
'-2', trimmed[1],
]
else:
raise ValueError('Improper assembler: %s' % self.assembler)
else: # Else if single_end_reads
if self.assembler == 'velvet':
return trimmed
elif self.assembler == 'spades':
return ['-s', trimmed[0]]
elif self.assembler == 'megahit':
return ['-r', trimmed[0]]
else:
raise ValueError('Improper assembler: %s' % self.assembler)