Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query engine 1/N #1848

Merged
merged 16 commits into from
Apr 25, 2024
8 changes: 4 additions & 4 deletions src/lsm/k_way_merge.zig
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub fn KWayMergeIteratorType(
/// The number of streams remaining in the iterator.
k: u32 = 0,

previous_key_popped: ?Key = null,
key_popped: ?Key = null,

pub fn init(
context: *Context,
Expand Down Expand Up @@ -82,7 +82,7 @@ pub fn KWayMergeIteratorType(
.streams_count = it.streams_count,
.direction = it.direction,
.state = .loading,
.previous_key_popped = it.previous_key_popped,
.key_popped = it.key_popped,
};
}

Expand Down Expand Up @@ -114,15 +114,15 @@ pub fn KWayMergeIteratorType(

while (try it.pop_heap()) |value| {
const key = key_from_value(&value);
if (it.previous_key_popped) |previous| {
if (it.key_popped) |previous| {
switch (std.math.order(previous, key)) {
.lt => assert(it.direction == .ascending),
// Discard this value and pop the next one.
.eq => continue,
.gt => assert(it.direction == .descending),
}
}
it.previous_key_popped = key;
it.key_popped = key;
return value;
}

Expand Down
6 changes: 3 additions & 3 deletions src/lsm/scan_merge.zig
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ fn ScanMergeType(
if (self.merge_iterator) |*merge_iterator| {
// It's not expected to probe a scan that already produced a key equals
// or ahead the probe.
assert(merge_iterator.previous_key_popped == null or
assert(merge_iterator.key_popped == null or
switch (self.direction) {
.ascending => merge_iterator.previous_key_popped.? < timestamp,
.descending => merge_iterator.previous_key_popped.? > timestamp,
.ascending => merge_iterator.key_popped.? < timestamp,
.descending => merge_iterator.key_popped.? > timestamp,
});

// Once the underlying streams have been changed, the merge iterator needs
Expand Down
6 changes: 3 additions & 3 deletions src/lsm/scan_tree.zig
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,10 @@ pub fn ScanTreeType(

// It's not expected to probe a scan that already produced a key equals
// or ahead the probe.
assert(self.merge_iterator.?.previous_key_popped == null or
assert(self.merge_iterator.?.key_popped == null or
switch (self.direction) {
.ascending => self.merge_iterator.?.previous_key_popped.? < probe_key,
.descending => self.merge_iterator.?.previous_key_popped.? > probe_key,
.ascending => self.merge_iterator.?.key_popped.? < probe_key,
.descending => self.merge_iterator.?.key_popped.? > probe_key,
});

// Once the underlying streams have been changed, the merge iterator needs
Expand Down
50 changes: 31 additions & 19 deletions src/lsm/zig_zag_merge.zig
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ pub fn ZigZagMergeIteratorType(
context: *Context,
streams_count: u32,
direction: Direction,
previous_key_popped: ?Key = null,
probe_key_previous: ?Key = null,
key_popped: ?Key = null,

/// At least two scans are required for zig-zag merge.
pub fn init(
Expand Down Expand Up @@ -84,15 +85,15 @@ pub fn ZigZagMergeIteratorType(
}
}

if (it.previous_key_popped) |previous| {
if (it.key_popped) |previous| {
switch (std.math.order(previous, key)) {
.lt => assert(it.direction == .ascending),
// Duplicate values are not expected.
.eq => unreachable,
.gt => assert(it.direction == .descending),
}
}
it.previous_key_popped = key;
it.key_popped = key;

return value;
}
Expand Down Expand Up @@ -135,10 +136,15 @@ pub fn ZigZagMergeIteratorType(
}
};

if (switch (it.direction) {
.ascending => key > probe_key,
.descending => key < probe_key,
}) {
// The stream cannot regress.
assert(it.probe_key_previous == null or
key == it.probe_key_previous.? or
it.key_ahead(key, it.probe_key_previous.?));

// The keys matches, continuing to the next stream.
if (key == probe_key) continue;

if (it.key_ahead(key, probe_key)) {
// The stream is ahead, it will be the probe key,
// meaning all streams before must be probed.
probe_key = key;
Expand All @@ -147,15 +153,9 @@ pub fn ZigZagMergeIteratorType(
probing.setRangeValue(.{ .start = 0, .end = stream_index }, true);
probing.setIntersection(drained.complement());
batiati marked this conversation as resolved.
Show resolved Hide resolved
assert(!probing.isSet(stream_index));
} else if (switch (it.direction) {
.ascending => key < probe_key,
.descending => key > probe_key,
}) {
// The stream is behind.
probing.set(stream_index);
} else {
// The key matches.
assert(key == probe_key);
// The stream is behind and needs to be probed.
probing.set(stream_index);
}
}

Expand All @@ -175,13 +175,11 @@ pub fn ZigZagMergeIteratorType(
}
};

// After probed, the stream must either match the key or be ahead.
if (key == probe_key) {
probing.unset(stream_index);
} else {
assert(switch (it.direction) {
.ascending => key > probe_key,
.descending => key < probe_key,
});
assert(it.key_ahead(key, probe_key));
}
}
}
Expand All @@ -207,8 +205,22 @@ pub fn ZigZagMergeIteratorType(
}
}

// The iterator cannot regress.
assert(it.probe_key_previous == null or
probe_key == it.probe_key_previous.? or
it.key_ahead(probe_key, it.probe_key_previous.?));

it.probe_key_previous = probe_key;
return if (drained.count() == 0) probe_key else error.Drained;
}

/// Depending on the direction, returns true if key `a` is ahead of key `b`.
inline fn key_ahead(it: *const ZigZagMergeIterator, a: Key, b: Key) bool {
batiati marked this conversation as resolved.
Show resolved Hide resolved
return switch (it.direction) {
.ascending => a > b,
.descending => a < b,
};
}
};
}

Expand Down