-
Notifications
You must be signed in to change notification settings - Fork 7
/
subtyper.py
520 lines (433 loc) · 21.4 KB
/
subtyper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
# -*- coding: utf-8 -*-
"""
Functions for subtyping of reads (e.g. FASTQ) and contigs (e.g. FASTA) using bio_hansel-compatible subtyping schemes.
"""
import logging
from typing import Optional, List, Dict, Union, Tuple
import pandas as pd
import re
from .aho_corasick import check_total_kmers, init_automaton, find_in_fasta, find_in_fastqs
from .const import COLUMNS_TO_REMOVE
from .qc import perform_quality_check, QC
from .subtype import Subtype
from .subtype_stats import SubtypeCounts
from .subtype_stats import subtype_counts
from .subtyping_params import SubtypingParams
from .utils import find_inconsistent_subtypes, get_scheme_fasta, get_scheme_version, init_subtyping_params
def subtype_reads_samples(reads: List[Tuple[List[str], str]],
scheme: str,
scheme_name: Optional[str] = None,
subtyping_params: Optional[SubtypingParams] = None,
scheme_subtype_counts: Optional[Dict[str, SubtypeCounts]] = None,
n_threads: int = 1) -> List[Tuple[Subtype, pd.DataFrame]]:
"""Subtype input genomes using a scheme.
Args:
reads: input genomes; tuple of list of FASTQ file paths and genome name
scheme: bio_hansel scheme FASTA path
scheme_name: optional scheme name
subtyping_params: scheme specific subtyping parameters
scheme_subtype_counts: summary information about scheme
n_threads: number of threads to use for subtyping analysis
Returns:
List of tuple of Subtype and detailed subtyping results for each sample
"""
if n_threads == 1:
logging.info('Serial single threaded run mode on %s input genomes', len(reads))
outputs = [subtype_reads(reads=fastq_files,
genome_name=genome_name,
scheme=scheme,
scheme_name=scheme_name,
subtyping_params=subtyping_params,
scheme_subtype_counts=scheme_subtype_counts)
for fastq_files, genome_name in reads]
else:
outputs = parallel_query_reads(reads=reads,
scheme=scheme,
scheme_name=scheme_name,
subtyping_params=subtyping_params,
scheme_subtype_counts=scheme_subtype_counts,
n_threads=n_threads)
return outputs
def subtype_contigs_samples(input_genomes: List[Tuple[str, str]],
scheme: str,
scheme_name: Optional[str] = None,
subtyping_params: Optional[SubtypingParams] = None,
scheme_subtype_counts: Optional[Dict[str, SubtypeCounts]] = None,
n_threads: int = 1) -> List[Tuple[Subtype, pd.DataFrame]]:
"""Subtype input genomes using a scheme.
Args:
input_genomes: input genomes; tuple of FASTA file path and genome name
scheme: bio_hansel scheme FASTA path
scheme_name: optional scheme name
subtyping_params: scheme specific subtyping parameters
scheme_subtype_counts: summary information about scheme
n_threads: number of threads to use for subtyping analysis
Returns:
List of tuple of Subtype and detailed subtyping results for each sample
"""
if n_threads == 1:
logging.info('Serial single threaded run mode on %s input genomes', len(input_genomes))
outputs = [subtype_contigs(fasta_path=input_fasta,
genome_name=genome_name,
scheme=scheme,
scheme_name=scheme_name,
subtyping_params=subtyping_params,
scheme_subtype_counts=scheme_subtype_counts)
for input_fasta, genome_name in input_genomes]
else:
outputs = parallel_query_contigs(input_genomes, scheme, scheme_name, subtyping_params, scheme_subtype_counts,
n_threads)
return outputs
def subtype_contigs(fasta_path: str,
genome_name: str,
scheme: str,
subtyping_params: Optional[SubtypingParams] = None,
scheme_name: Optional[str] = None,
scheme_subtype_counts: Optional[Dict[str, SubtypeCounts]] = None) -> Tuple[
Subtype, pd.DataFrame]:
"""Subtype input contigs using a particular scheme.
Args:
fasta_path: Input FASTA file path
genome_name: Input genome name
scheme: bio_hansel scheme FASTA path
subtyping_params: scheme specific subtyping parameters
scheme_name: optional scheme name
scheme_subtype_counts: summary information about scheme
Returns:
- Subtype result
- pd.DataFrame of detailed subtyping results
"""
scheme_fasta = get_scheme_fasta(scheme)
if scheme_subtype_counts is None:
scheme_subtype_counts = subtype_counts(scheme_fasta)
if subtyping_params is None:
subtyping_params = init_subtyping_params(scheme=scheme)
scheme_version = get_scheme_version(scheme)
st = Subtype(sample=genome_name,
file_path=fasta_path,
scheme=scheme_name or scheme,
scheme_version=scheme_version,
scheme_subtype_counts=scheme_subtype_counts)
check_total_kmers(scheme_fasta, subtyping_params)
automaton = init_automaton(scheme_fasta)
df = find_in_fasta(automaton, fasta_path)
if df is None or df.shape[0] == 0:
logging.warning('No subtyping kmer matches for input "%s" for scheme "%s"', fasta_path, scheme)
st.qc_status = QC.FAIL
st.qc_message = QC.NO_TARGETS_FOUND
st.are_subtypes_consistent = False
return st, empty_results(st)
refpositions = [x for x, y in df.kmername.str.split('-')]
subtypes = [y for x, y in df.kmername.str.split('-')]
df['refposition'] = [int(x.replace('negative', '')) for x in refpositions]
df['subtype'] = subtypes
df['is_pos_kmer'] = ~df.kmername.str.contains('negative')
process_subtyping_results(st, df, scheme_subtype_counts)
st.qc_status, st.qc_message = perform_quality_check(st, df, subtyping_params)
logging.info(st)
df['sample'] = genome_name
df['file_path'] = fasta_path
df['scheme'] = scheme_name or scheme
df['scheme_version'] = scheme_version
df['qc_status'] = st.qc_status
df['qc_message'] = st.qc_message
df = df[df.columns[~df.columns.isin(COLUMNS_TO_REMOVE)]]
return st, df
def parallel_query_contigs(input_genomes: List[Tuple[str, str]],
scheme: str,
scheme_name: Optional[str] = None,
subtyping_params: Optional[SubtypingParams] = None,
scheme_subtype_counts: Optional[Dict[str, SubtypeCounts]] = None,
n_threads: int = 1):
"""Parallel subtyping of input contigs
Subtype and analyse each input in parallel using a multiprocessing thread pool.
Args:
input_genomes: Input genome FASTA paths
scheme: bio_hansel scheme FASTA path to subtype with
scheme_name: optional scheme name
subtyping_params: scheme specific subtyping parameters
scheme_subtype_counts: scheme summary information
n_threads: Number of threads to use
Returns:
A list of tuples of Subtype results and a pd.DataFrame of detailed subtyping results for each input
"""
from multiprocessing import Pool
logging.info('Initializing thread pool with %s threads', n_threads)
pool = Pool(processes=n_threads)
logging.info('Running analysis asynchronously on %s input genomes', len(input_genomes))
res = [pool.apply_async(subtype_contigs, (input_fasta,
genome_name,
scheme,
subtyping_params,
scheme_name,
scheme_subtype_counts))
for input_fasta, genome_name in input_genomes]
logging.info('Parallel analysis complete! Retrieving analysis results')
outputs = [x.get() for x in res]
return outputs
def parallel_query_reads(reads: List[Tuple[List[str], str]],
scheme: str,
scheme_name: Optional[str] = None,
subtyping_params: Optional[SubtypingParams] = None,
scheme_subtype_counts: Optional[Dict[str, SubtypeCounts]] = None,
n_threads: int = 1) -> List[Tuple[Subtype, pd.DataFrame]]:
"""Parallel subtyping of input reads
Subtype and analyse each input in parallel using a multiprocessing thread pool.
Args:
reads: Input reads; list of tuples of FASTQ file paths and genome names
scheme: bio_hansel scheme FASTA path
scheme_name: optional scheme name
subtyping_params: scheme specific subtyping parameters
scheme_subtype_counts: scheme summary information
n_threads: number of threads to use
Returns:
A list of tuples of Subtype results and a pd.DataFrame of detailed subtyping results for each input
"""
from multiprocessing import Pool
logging.info('Initializing thread pool with %s threads', n_threads)
pool = Pool(processes=n_threads)
logging.info('Running analysis asynchronously on %s input genomes', len(reads))
res = [pool.apply_async(subtype_reads, (fastqs,
genome_name,
scheme,
scheme_name,
subtyping_params,
scheme_subtype_counts))
for fastqs, genome_name in reads]
logging.info('Parallel analysis complete! Retrieving analysis results')
outputs = [x.get() for x in res]
return outputs
def subtype_reads(reads: Union[str, List[str]],
genome_name: str,
scheme: str,
scheme_name: Optional[str] = None,
subtyping_params: Optional[SubtypingParams] = None,
scheme_subtype_counts: Optional[Dict[str, SubtypeCounts]] = None) -> Tuple[
Subtype, Optional[pd.DataFrame]]:
"""Subtype input reads using a particular scheme.
Args:
reads: Input FASTQ file path(s)
genome_name: Input genome name
scheme: bio_hansel scheme FASTA path
scheme_name: optional scheme name
subtyping_params: scheme specific subtyping parameters
scheme_subtype_counts: summary information about scheme
Returns:
- Subtype result
- pd.DataFrame of detailed subtyping results
"""
scheme_fasta = get_scheme_fasta(scheme)
if scheme_subtype_counts is None:
scheme_subtype_counts = subtype_counts(scheme_fasta)
if subtyping_params is None:
subtyping_params = init_subtyping_params(scheme=scheme)
scheme_version = get_scheme_version(scheme)
st = Subtype(sample=genome_name,
file_path=reads,
scheme=scheme_name or scheme,
scheme_version=scheme_version,
scheme_subtype_counts=scheme_subtype_counts)
check_total_kmers(scheme_fasta, subtyping_params)
automaton = init_automaton(scheme_fasta)
if isinstance(reads, str):
df = find_in_fastqs(automaton, reads)
elif isinstance(reads, list):
df = find_in_fastqs(automaton, *reads)
else:
raise ValueError('Unexpected type "{}" for "reads": {}'.format(type(reads), reads))
if df is None or df.shape[0] == 0:
logging.warning('No subtyping kmer matches for input "%s" for scheme "%s"', reads, scheme)
st.are_subtypes_consistent = False
st.qc_status = QC.FAIL
st.qc_message = QC.NO_TARGETS_FOUND
return st, empty_results(st)
refpositions = [x for x, y in df.kmername.str.split('-')]
subtypes = [y for x, y in df.kmername.str.split('-')]
df['refposition'] = [int(x.replace('negative', '')) for x in refpositions]
df['subtype'] = subtypes
df['is_pos_kmer'] = ~df.kmername.str.contains('negative')
df['is_kmer_freq_okay'] = (df.freq >= subtyping_params.min_kmer_freq) & (df.freq <= subtyping_params.max_kmer_freq)
st.avg_kmer_coverage = df['freq'].mean()
st, df = process_subtyping_results(st, df[df.is_kmer_freq_okay], scheme_subtype_counts)
st.qc_status, st.qc_message = perform_quality_check(st, df, subtyping_params)
df['file_path'] = str(st.file_path)
df['sample'] = genome_name
df['scheme'] = scheme_name or scheme
df['scheme_version'] = scheme_version
df['qc_status'] = st.qc_status
df['qc_message'] = st.qc_message
df = df[df.columns[~df.columns.isin(COLUMNS_TO_REMOVE)]]
return st, df
def process_subtyping_results(st: Subtype, df: pd.DataFrame, scheme_subtype_counts: Dict[str, SubtypeCounts]) -> Tuple[
Subtype, pd.DataFrame]:
"""Process the subtyping results to get the final subtype result and summary stats
Args:
st: Subtype result
df: Subtyping results
scheme_subtype_counts: Subtyping scheme summary info
Returns:
Tuple of `st` and `df`
"""
dfpos = df[df.is_pos_kmer]
dfpos_highest_res = highest_resolution_subtype_results(dfpos)
subtype_list = [x for x in dfpos_highest_res.subtype.unique()]
st = set_subtype_results(st, dfpos, subtype_list)
st = set_inconsistent_subtypes(st, find_inconsistent_subtypes(sorted_subtype_ints(dfpos.subtype)))
st = set_subtyping_stats(st, df, dfpos, dfpos_highest_res, subtype_list, scheme_subtype_counts)
st.non_present_subtypes = absent_downstream_subtypes(st.subtype, df.subtype, list(scheme_subtype_counts.keys()))
st.missing_nested_subtypes = missing_nested_subtypes(st.subtype, dfpos)
return st, df
def set_subtype_results(st: Subtype, df_positive: pd.DataFrame, subtype_list: List[str]) -> Subtype:
"""Set subtype results
Args:
st: Subtype result
df_positive: Positive kmers subtyping results
subtype_list: List of subtypes found
"""
st.subtype = '; '.join(subtype_list)
st.kmers_matching_subtype = '; '.join(subtype_list)
pos_subtypes_str = [x for x in df_positive.subtype.unique()]
pos_subtypes_str.sort(key=lambda x: len(x))
st.all_subtypes = '; '.join(pos_subtypes_str)
return st
def set_subtyping_stats(st: Subtype,
df: pd.DataFrame,
dfpos: pd.DataFrame,
dfpos_highest_res: pd.DataFrame,
subtype_list: List[str],
scheme_subtype_counts: Dict[str, SubtypeCounts]) -> Subtype:
"""Set subtyping result stats
- `n_kmers_matching_subtype`: # kmers matching final subtype
- `n_kmers_matching_all`: # kmers found
- `n_kmers_matching_positive`: # positive kmers found
- `n_kmers_matching_negative`: # negative kmers found
- `n_kmers_matching_all_expected`: expected # of all kmers found for each subtype
- `n_kmers_matching_positive_expected`: expected # of positive kmers found for each subtype
- `n_kmers_matching_subtype_expected`: expected # of subtype-specific kmers found for each subtype
Args:
st: Subtype result
df: Subtyping results
dfpos: Positive kmer subtyping results
dfpos_highest_res: Final subtype specific subtyping results
subtype_list: List of subtypes found
scheme_subtype_counts: Subtyping scheme summary info
"""
st.n_kmers_matching_subtype = dfpos_highest_res.shape[0]
st.n_kmers_matching_all = df.kmername.unique().size
st.n_kmers_matching_positive = dfpos.kmername.unique().size
st.n_kmers_matching_negative = df[~df.is_pos_kmer].shape[0]
st.n_kmers_matching_all_expected = ';'.join([str(scheme_subtype_counts[x].all_kmer_count) for x in subtype_list])
st.n_kmers_matching_positive_expected = ';'.join(
[str(scheme_subtype_counts[x].positive_kmer_count) for x in subtype_list])
st.n_kmers_matching_subtype_expected = ';'.join(
[str(scheme_subtype_counts[x].subtype_kmer_count) for x in subtype_list])
return st
def count_periods(s: str) -> int:
"""Count the number of periods in a string.
Examples:
count_periods("2.1.1") => 2
count_periods("1") => 0
Args:
s: Some string with periods
Returns:
The number of periods found in the input string.
"""
return sum((1 for c in list(s) if c == '.'))
def highest_resolution_subtype_results(df: pd.DataFrame) -> pd.DataFrame:
"""Get the highest resolution subtype results
Where the highest resolution result has the most periods ('.') in its designation.
Args:
df: positive subtyping results
Returns:
Highest resolution (most periods in subtype) subtyping results
"""
subtype_lens = df.subtype.apply(count_periods)
max_subtype_strlen = subtype_lens.max()
return df[subtype_lens == max_subtype_strlen]
def sorted_subtype_ints(subtypes: pd.Series) -> List[List[int]]:
"""Get the list of subtypes as lists of integers sorted by subtype resolution
Where the subtype resolution is determined by the number of periods ('.') in the subtype.
Args:
subtypes: pd.Series of subtype strings
Return:
list of subtypes as lists of integers sorted by subtype resolution
"""
subtypes_ints = [[int(y) for y in x.split('.')] for x in subtypes.unique()]
subtypes_ints.sort(key=lambda a: len(a))
return subtypes_ints
def absent_downstream_subtypes(subtype: str, subtypes: pd.Series, scheme_subtypes: List[str]) -> Optional[List[str]]:
"""Find the downstream subtypes that are not present in the results
Args:
subtype: Final subtype result
subtypes: Subtypes found
scheme_subtypes: Possible subtyping scheme subtypes
Returns:
List of downstream subtypes that are not present in the results or `None` if all immediately downstream
subtypes are present.
"""
escaped_subtype = re.escape(subtype)
re_subtype = re.compile(r'^{}\.\d+$'.format(escaped_subtype))
downstream_subtypes = [s for s in scheme_subtypes if re_subtype.search(s)]
absentees = [x for x in downstream_subtypes if not (subtypes == x).any()]
return absentees if len(absentees) > 0 else None
def subtype_sets(st_vals: list, pos_subtypes_set: set, primary_subtypes_set: set) -> Optional[set]:
"""Compare nested subtypes from the final subtype call to positive subtypes found
Args:
st_vals: List of integers making up subtype split by the "."
pos_subtypes_set: Set of positive subtypes
primary_subtypes_set: Set of missing nested subtypes
Returns:
Set of missing nested hierarchical subtypes or `None` if there are no missing nested hierarchical subtypes
"""
for i in range(len(st_vals)):
sub_subtype = '.'.join(st_vals[0 : i+1])
if sub_subtype not in pos_subtypes_set:
primary_subtypes_set.add(sub_subtype)
return primary_subtypes_set
def missing_nested_subtypes(subtype: str, df_positive: pd.DataFrame) -> Optional[str]:
"""Find nested subtypes that are missing from the final subtype call
Args:
subtype: Final subtype result
positive_subtypes: List of unique positive subtypes found
Returns:
String of missing hierarchical subtypes or `None` if there are no missing nested hierarchical subtypes.
"""
subtype = subtype.split(';')
pos_subtypes_set = set(df_positive.subtype.unique())
primary_subtypes_set = set()
for i in subtype:
st_vals = i.split('.')
missing_subtypes_set = subtype_sets(st_vals, pos_subtypes_set, primary_subtypes_set)
return '; '.join(missing_subtypes_set)
def set_inconsistent_subtypes(st: Subtype, inconsistent_subtypes: List[str]) -> Subtype:
"""Save the list of inconsistent subtypes, if any, to the Subtype result
Args:
st: Subtype result
inconsistent_subtypes: List of inconsistent subtypes
Returns:
"""
if len(inconsistent_subtypes) > 0:
st.are_subtypes_consistent = False
st.inconsistent_subtypes = inconsistent_subtypes
else:
st.are_subtypes_consistent = True
st.inconsistent_subtypes = None
return st
def empty_results(st: Subtype) -> pd.DataFrame:
"""Get a results DataFrame with 1 entry when no results are retrieved.
When the output DataFrame for subtyping is empty, generate a "dummy" DataFrame with one row showing that subtyping
was performed on the sample, but no results were generated because nothing was found.
Args:
st: Subtype results; should be a null result
Returns:
pd.DataFrame with 1 row with null result
"""
return pd.DataFrame({0: dict(sample=st.sample,
file_path=st.file_path,
subtype=st.subtype,
refposition=None,
is_pos_kmer=None,
scheme=st.scheme,
scheme_version=st.scheme_version,
qc_status=st.qc_status,
qc_message=st.qc_message)}).transpose()