-
-
Notifications
You must be signed in to change notification settings - Fork 626
/
fast_field_range_query.rs
179 lines (163 loc) · 5.51 KB
/
fast_field_range_query.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
use core::fmt::Debug;
use std::ops::RangeInclusive;
use columnar::Column;
use crate::{DocId, DocSet, TERMINATED};
/// Helper to have a cursor over a vec of docids
#[derive(Debug)]
struct VecCursor {
docs: Vec<u32>,
current_pos: usize,
}
impl VecCursor {
fn new() -> Self {
Self {
docs: Vec::with_capacity(32),
current_pos: 0,
}
}
fn next(&mut self) -> Option<u32> {
self.current_pos += 1;
self.current()
}
#[inline]
fn current(&self) -> Option<u32> {
self.docs.get(self.current_pos).copied()
}
fn get_cleared_data(&mut self) -> &mut Vec<u32> {
self.docs.clear();
self.current_pos = 0;
&mut self.docs
}
fn last_value(&self) -> Option<u32> {
self.docs.iter().last().cloned()
}
fn is_empty(&self) -> bool {
self.current().is_none()
}
}
pub(crate) struct RangeDocSet<T> {
/// The range filter on the values.
value_range: RangeInclusive<T>,
column: Column<T>,
/// The next docid start range to fetch (inclusive).
next_fetch_start: u32,
/// Number of docs range checked in a batch.
///
/// There are two patterns.
/// - We do a full scan. => We can load large chunks. We don't know in advance if seek call
/// will come, so we start with small chunks
/// - We load docs, interspersed with seek calls. When there are big jumps in the seek, we
/// should load small chunks. When the seeks are small, we can employ the same strategy as on a
/// full scan.
fetch_horizon: u32,
/// Current batch of loaded docs.
loaded_docs: VecCursor,
last_seek_pos_opt: Option<u32>,
}
const DEFAULT_FETCH_HORIZON: u32 = 128;
impl<T: Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSet<T> {
pub(crate) fn new(value_range: RangeInclusive<T>, column: Column<T>) -> Self {
let mut range_docset = Self {
value_range,
column,
loaded_docs: VecCursor::new(),
next_fetch_start: 0,
fetch_horizon: DEFAULT_FETCH_HORIZON,
last_seek_pos_opt: None,
};
range_docset.reset_fetch_range();
range_docset.fetch_block();
range_docset
}
fn reset_fetch_range(&mut self) {
self.fetch_horizon = DEFAULT_FETCH_HORIZON;
}
/// Returns true if more data could be fetched
fn fetch_block(&mut self) {
const MAX_HORIZON: u32 = 100_000;
while self.loaded_docs.is_empty() {
let finished_to_end = self.fetch_horizon(self.fetch_horizon);
if finished_to_end {
break;
}
// Fetch more data, increase horizon. Horizon only gets reset when doing a seek.
self.fetch_horizon = (self.fetch_horizon * 2).min(MAX_HORIZON);
}
}
/// check if the distance between the seek calls is large
fn is_last_seek_distance_large(&self, new_seek: DocId) -> bool {
if let Some(last_seek_pos) = self.last_seek_pos_opt {
(new_seek - last_seek_pos) >= 128
} else {
true
}
}
/// Fetches a block for docid range [next_fetch_start .. next_fetch_start + HORIZON]
fn fetch_horizon(&mut self, horizon: u32) -> bool {
let mut finished_to_end = false;
let limit = self.column.num_docs();
let mut end = self.next_fetch_start + horizon;
if end >= limit {
end = limit;
finished_to_end = true;
}
let last_value = self.loaded_docs.last_value();
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
self.column.get_docids_for_value_range(
self.value_range.clone(),
self.next_fetch_start..end,
doc_buffer,
);
if let Some(last_value) = last_value {
while self.loaded_docs.current() == Some(last_value) {
self.loaded_docs.next();
}
}
self.next_fetch_start = end;
finished_to_end
}
}
impl<T: Send + Sync + PartialOrd + Copy + Debug + 'static> DocSet for RangeDocSet<T> {
#[inline]
fn advance(&mut self) -> DocId {
if let Some(docid) = self.loaded_docs.next() {
return docid;
}
if self.next_fetch_start >= self.column.values.num_vals() {
return TERMINATED;
}
self.fetch_block();
self.loaded_docs.current().unwrap_or(TERMINATED)
}
#[inline]
fn doc(&self) -> DocId {
self.loaded_docs.current().unwrap_or(TERMINATED)
}
/// Advances the `DocSet` forward until reaching the target, or going to the
/// lowest [`DocId`] greater than the target.
///
/// If the end of the `DocSet` is reached, [`TERMINATED`] is returned.
///
/// Calling `.seek(target)` on a terminated `DocSet` is legal. Implementation
/// of `DocSet` should support it.
///
/// Calling `seek(TERMINATED)` is also legal and is the normal way to consume a `DocSet`.
fn seek(&mut self, target: DocId) -> DocId {
if self.is_last_seek_distance_large(target) {
self.reset_fetch_range();
}
if target > self.next_fetch_start {
self.next_fetch_start = target;
}
let mut doc = self.doc();
debug_assert!(doc <= target);
while doc < target {
doc = self.advance();
}
self.last_seek_pos_opt = Some(target);
doc
}
fn size_hint(&self) -> u32 {
0 // heuristic possible by checking number of hits when fetching a block
}
}