Skip to content

Commit

Permalink
Merge pull request #695 from rhpvorderman/fixkmerfinder
Browse files Browse the repository at this point in the history
Fix exponential runtime for longer adapters
  • Loading branch information
marcelm committed Apr 12, 2023
2 parents 1551fbd + e3212c8 commit 27ab483
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 59 deletions.
106 changes: 60 additions & 46 deletions src/cutadapt/kmer_heuristic.py
@@ -1,39 +1,29 @@
import io
import itertools
import sys
from typing import List, Optional, Set, Tuple
from collections import defaultdict


def kmer_possibilities(sequence: str, chunks: int) -> List[Set[str]]:
def kmer_chunks(sequence: str, chunks: int) -> Set[str]:
"""
Partition a sequence in almost equal sized chunks. Return all possibilities.
Example sequence ABCDEFGH with 3 chunks. Possibilities:
["ABC", "DEF", "GH"]; ["ABC", "DE", "FGH"]; ["AB", "CDE", "FGH"]
Partition a sequence in almost equal sized chunks. Returns the shortest
possibility. AABCABCABC, 3 returns {"AABC", "ABC"}
"""
chunk_size = len(sequence) // (chunks)
remainder = len(sequence) % (chunks)
chunk_sizes: List[int] = remainder * [chunk_size + 1] + (chunks - remainder) * [
chunk_size
]
possible_orderings = set(itertools.permutations(chunk_sizes))
kmer_sets = []
for chunk_list in possible_orderings:
offset = 0
chunk_set = set()
for size in chunk_list:
chunk_set.add(sequence[offset : offset + size])
offset += size
kmer_sets.append(chunk_set)
return kmer_sets
offset = 0
chunk_set = set()
for size in chunk_sizes:
chunk_set.add(sequence[offset : offset + size])
offset += size
return chunk_set


# A SearchSet is a start and stop combined with a list of possible kmer sets
# which should appear between this start and stop. Start and stop follow python
# indexing rules. (Negative start is a position relative to the end. None end
# is to the end of the sequence)
SearchSet = Tuple[int, Optional[int], List[Set[str]]]
# A SearchSet is a start and stop combined with a set of strings to search
# for at that position
SearchSet = Tuple[int, Optional[int], Set[str]]


def minimize_kmer_search_list(
Expand Down Expand Up @@ -74,24 +64,22 @@ def minimize_kmer_search_list(
return kmers_and_positions


def find_optimal_kmers(
def remove_redundant_kmers(
search_sets: List[SearchSet],
) -> List[Tuple[int, Optional[int], List[str]]]:
minimal_score = sys.maxsize
best_combination = []
positions = [(start, stop) for start, stop, kmer_set_list in search_sets]
kmer_set_lists = [kmer_set_list for start, stop, kmer_set_list in search_sets]
for kmer_sets in itertools.product(*kmer_set_lists):
kmer_search_list = []
for kmer_set, (start, stop) in zip(kmer_sets, positions):
for kmer in kmer_set:
kmer_search_list.append((kmer, start, stop))
minimized_search_list = minimize_kmer_search_list(kmer_search_list)
if len(minimized_search_list) < minimal_score:
best_combination = minimized_search_list
minimal_score = len(minimized_search_list)
"""
This removes kmers that are searched in multiple search sets and makes
sure they are only searched in the larger search set. This reduces the
amount of searched patterns and therefore the number of false positives.
"""

kmer_search_list = []
for start, stop, kmer_set in search_sets:
for kmer in kmer_set:
kmer_search_list.append((kmer, start, stop))
minimized_search_list = minimize_kmer_search_list(kmer_search_list)
result_dict = defaultdict(list)
for kmer, start, stop in best_combination:
for kmer, start, stop in minimized_search_list:
result_dict[(start, stop)].append(kmer)
return [(start, stop, kmers) for (start, stop), kmers in result_dict.items()]

Expand Down Expand Up @@ -120,10 +108,10 @@ def create_back_overlap_searchsets(
min_overlap_kmer_length = 5
if minimum_length < min_overlap_kmer_length:
for i in range(minimum_length, min_overlap_kmer_length):
search_set = (-i, None, [{adapter[:i]}])
search_set = (-i, None, {adapter[:i]})
search_sets.append(search_set)
minimum_length = min_overlap_kmer_length
kmer_sets = kmer_possibilities(adapter[:minimum_length], max_errors + 1)
kmer_sets = kmer_chunks(adapter[:minimum_length], max_errors + 1)
search_sets.append((-length, None, kmer_sets))
minimum_length = length + 1
return search_sets
Expand Down Expand Up @@ -166,16 +154,14 @@ def create_positions_and_kmers(
adapter[::-1], min_overlap, error_rate
)
front_search_sets = []
for start, stop, kmer_sets in reversed_back_search_sets:
new_kmer_sets = [
{kmer[::-1] for kmer in kmer_set} for kmer_set in kmer_sets
]
front_search_sets.append((0, -start, new_kmer_sets))
for start, stop, kmer_set in reversed_back_search_sets:
new_kmer_set = {kmer[::-1] for kmer in kmer_set}
front_search_sets.append((0, -start, new_kmer_set))
search_sets.extend(front_search_sets)
if internal:
kmer_sets = kmer_possibilities(adapter, max_errors + 1)
kmer_sets = kmer_chunks(adapter, max_errors + 1)
search_sets.append((0, None, kmer_sets))
return find_optimal_kmers(search_sets)
return remove_redundant_kmers(search_sets)


def kmer_probability_analysis(
Expand Down Expand Up @@ -215,3 +201,31 @@ def kmer_probability_analysis(
f"Chance for profile hit by random sequence: {(1 - accumulated_not_hit_chance) * 100:.2f}%\n"
)
return out.getvalue()


if __name__ == "__main__":
# This allows for easy debugging and benchmarking of the kmer heuristic code.
import argparse
from ._kmer_finder import KmerFinder
import dnaio

parser = argparse.ArgumentParser()
parser.add_argument("--adapter")
parser.add_argument("--anywhere", action="store_true")
parser.add_argument("fastq")
args = parser.parse_args()
kmers_and_offsets = create_positions_and_kmers(
args.adapter, 3, 0.1, back_adapter=True, front_adapter=args.anywhere
)
kmer_finder = KmerFinder(kmers_and_offsets)
print(kmer_probability_analysis(kmers_and_offsets))
with dnaio.open(args.fastq, mode="r", open_threads=0) as reader: # type: ignore
number_of_records = 0
possible_adapters_found = 0
for number_of_records, record in enumerate(reader, start=1):
if kmer_finder.kmers_present(record.sequence):
possible_adapters_found += 1
print(
f"Percentage possible adapters: "
f"{possible_adapters_found * 100 / number_of_records:.2f}%"
)
37 changes: 24 additions & 13 deletions tests/test_kmer_heuristic.py
@@ -1,7 +1,7 @@
import pytest

from cutadapt.kmer_heuristic import (
kmer_possibilities,
kmer_chunks,
minimize_kmer_search_list,
create_back_overlap_searchsets,
create_positions_and_kmers,
Expand All @@ -11,15 +11,12 @@
@pytest.mark.parametrize(
["sequence", "chunks", "expected"],
[
("ABC", 3, [{"A", "B", "C"}]),
("ABCD", 3, [{"A", "B", "CD"}, {"A", "BC", "D"}, {"AB", "C", "D"}]),
("ABC", 3, {"A", "B", "C"}),
("ABCD", 3, {"AB", "C", "D"}),
],
)
def test_kmer_possibilities(sequence, chunks, expected):
frozen_expected = set(frozenset(s) for s in expected)
result = kmer_possibilities(sequence, chunks)
frozen_result = set(frozenset(s) for s in result)
assert frozen_expected == frozen_result
def test_kmer_chunks(sequence, chunks, expected):
assert kmer_chunks(sequence, chunks) == expected


@pytest.mark.parametrize(
Expand All @@ -45,11 +42,11 @@ def test_create_back_overlap_searchsets():
adapter = "ABCDEFGHIJ0123456789"
searchsets = create_back_overlap_searchsets(adapter, 3, 0.1)
assert len(searchsets) == 5
assert (-3, None, [{"ABC"}]) in searchsets
assert (-4, None, [{"ABCD"}]) in searchsets
assert (-9, None, [{"ABCDE"}]) in searchsets
assert (-19, None, kmer_possibilities(adapter[:10], 2)) in searchsets
assert (-20, None, kmer_possibilities(adapter, 3)) in searchsets
assert (-3, None, {"ABC"}) in searchsets
assert (-4, None, {"ABCD"}) in searchsets
assert (-9, None, {"ABCDE"}) in searchsets
assert (-19, None, kmer_chunks(adapter[:10], 2)) in searchsets
assert (-20, None, kmer_chunks(adapter, 3)) in searchsets


@pytest.mark.parametrize(
Expand Down Expand Up @@ -106,3 +103,17 @@ def test_create_kmers_and_positions(kwargs, expected):
assert {(start, stop): frozenset(kmers) for start, stop, kmers in result} == {
(start, stop): frozenset(kmers) for start, stop, kmers in expected
}


@pytest.mark.timeout(0.5)
def test_create_positions_and_kmers_slow():
create_positions_and_kmers(
# Ridiculous size to check if there aren't any quadratic or exponential
# algorithms in the code.
"A" * 1000,
min_overlap=3,
error_rate=0.1,
back_adapter=True,
front_adapter=False,
internal=True,
)

0 comments on commit 27ab483

Please sign in to comment.