/
test_hll.py
131 lines (95 loc) · 3.8 KB
/
test_hll.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import gzip
from tempfile import NamedTemporaryFile
from screed.fasta import fasta_iter
import pytest
from sourmash.hll import HLL
import sourmash_tst_utils as utils
K = 21 # size of kmer
ERR_RATE = 0.01
N_UNIQUE = 3356
TRANSLATE = {'A': 'T', 'C': 'G', 'T': 'A', 'G': 'C'}
def test_hll_add_python():
# test python code to count unique kmers using HyperLogLog.
# use the lower level add() method, which accepts anything,
# and compare to an exact count using collections.Counter
filename = utils.get_test_data('ecoli.genes.fna')
hll = HLL(ERR_RATE, K)
counter = set()
with open(filename) as f:
for n, record in enumerate(fasta_iter(f)):
sequence = record['sequence']
seq_len = len(sequence)
for n in range(0, seq_len + 1 - K):
kmer = sequence[n:n + K]
rc = "".join(TRANSLATE[c] for c in kmer[::-1])
hll.add(kmer)
if rc in counter:
kmer = rc
counter.update([kmer])
n_unique = len(counter)
assert n_unique == N_UNIQUE
assert abs(1 - float(hll.cardinality()) / N_UNIQUE) < ERR_RATE
def test_hll_consume_string():
# test rust code to count unique kmers using HyperLogLog,
# using screed to feed each read to the counter.
filename = utils.get_test_data('ecoli.genes.fna')
hll = HLL(ERR_RATE, K)
n_consumed = n = 0
with open(filename) as f:
for n, record in enumerate(fasta_iter(f), 1):
hll.add_sequence(record['sequence'])
assert abs(1 - float(len(hll)) / N_UNIQUE) < ERR_RATE
def test_hll_similarity_containment():
N_UNIQUE_H1 = 500741
N_UNIQUE_H2 = 995845
N_UNIQUE_U = 995845
SIMILARITY = 0.502783
CONTAINMENT_H1 = 1.
CONTAINMENT_H2 = 0.502783
INTERSECTION = 500838
hll1 = HLL(ERR_RATE, K)
hll2 = HLL(ERR_RATE, K)
hllu = HLL(ERR_RATE, K)
filename = utils.get_test_data('genome-s10.fa.gz')
with gzip.GzipFile(filename) as f:
for n, record in enumerate(fasta_iter(f)):
sequence = record['sequence']
seq_len = len(sequence)
for n in range(0, seq_len + 1 - K):
kmer = sequence[n:n + K]
hll1.add(kmer)
hllu.add(kmer)
filename = utils.get_test_data('genome-s10+s11.fa.gz')
with gzip.GzipFile(filename) as f:
for n, record in enumerate(fasta_iter(f)):
sequence = record['sequence']
seq_len = len(sequence)
for n in range(0, seq_len + 1 - K):
kmer = sequence[n:n + K]
hll2.add(kmer)
hllu.add(kmer)
assert abs(1 - float(hll1.cardinality()) / N_UNIQUE_H1) < ERR_RATE
assert abs(1 - float(hll2.cardinality()) / N_UNIQUE_H2) < ERR_RATE
assert abs(1 - float(hll1.similarity(hll2)) / SIMILARITY) < ERR_RATE
assert abs(1 - float(hll1.containment(hll2)) / CONTAINMENT_H1) < ERR_RATE
assert abs(1 - float(hll2.containment(hll1)) / CONTAINMENT_H2) < ERR_RATE
assert abs(1 - float(hll1.intersection(hll2)) / INTERSECTION) < ERR_RATE
"""
hll1.merge(hll2)
assert abs(1 - float(hllu.similarity(hll1))) < ERR_RATE
assert abs(1 - float(hllu.containment(hll1))) < ERR_RATE
assert abs(1 - float(hllu.containment(hll2))) < ERR_RATE
assert abs(1 - float(hll1.intersection(hllu)) / N_UNIQUE_U) < ERR_RATE
"""
def test_hll_save_load():
filename = utils.get_test_data('ecoli.genes.fna')
hll = HLL(ERR_RATE, K)
n_consumed = n = 0
with open(filename) as f:
for n, record in enumerate(fasta_iter(f), 1):
hll.add_sequence(record['sequence'])
assert abs(1 - float(len(hll)) / N_UNIQUE) < ERR_RATE
with NamedTemporaryFile() as f:
hll.save(f.name)
new_hll = HLL.load(f.name)
assert len(hll) == len(new_hll)