Skip to content

Commit

Permalink
Merge pull request #1848 from tigerbeetle/batiati-query-engine
Browse files Browse the repository at this point in the history
Query engine 1/N
  • Loading branch information
batiati committed Apr 25, 2024
2 parents 78ef338 + 6a5ec42 commit 7512b69
Show file tree
Hide file tree
Showing 13 changed files with 1,669 additions and 492 deletions.
2 changes: 1 addition & 1 deletion build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub fn build(b: *std.Build) !void {
.optimize = mode,
});
if (mode == .ReleaseSafe) {
tigerbeetle.strip = true;
tigerbeetle.strip = tracer_backend == .none;
}
if (emit_llvm_ir) {
_ = tigerbeetle.getEmittedLlvmIr();
Expand Down
158 changes: 94 additions & 64 deletions src/lsm/forest_fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const StateMachine = @import("../state_machine.zig").StateMachineType(Storage, c
const Reservation = @import("../vsr/free_set.zig").Reservation;
const GridType = @import("../vsr/grid.zig").GridType;
const GrooveType = @import("groove.zig").GrooveType;
const ScanBuffer = @import("../lsm/scan_buffer.zig").ScanBuffer;
const ScanRangeType = @import("../lsm/scan_range.zig").ScanRangeType;
const EvaluateNext = @import("../lsm/scan_range.zig").EvaluateNext;
const ScanLookupType = @import("../lsm/scan_lookup.zig").ScanLookupType;
const TimestampRange = @import("timestamp_range.zig").TimestampRange;
const Direction = @import("../direction.zig").Direction;
Expand Down Expand Up @@ -123,7 +126,7 @@ const Environment = struct {
forest: Forest,
checkpoint_op: ?u64,
ticks_remaining: usize,
scan_account_buffer: []tb.Account,
scan_lookup_buffer: []tb.Account,

fn init(env: *Environment, storage: *Storage) !void {
env.storage = storage;
Expand All @@ -139,7 +142,7 @@ const Environment = struct {
.missing_tables_max = 0,
});

env.scan_account_buffer = try allocator.alloc(
env.scan_lookup_buffer = try allocator.alloc(
tb.Account,
StateMachine.constants.batch_max.create_accounts,
);
Expand All @@ -152,7 +155,7 @@ const Environment = struct {
fn deinit(env: *Environment) void {
env.superblock.deinit(allocator);
env.grid.deinit(allocator);
allocator.free(env.scan_account_buffer);
allocator.free(env.scan_lookup_buffer);
}

pub fn run(storage: *Storage, fuzz_ops: []const FuzzOp) !void {
Expand Down Expand Up @@ -368,73 +371,99 @@ const Environment = struct {
return account;
}

const Scanner = struct {
const AccountsScanLookup = ScanLookupType(
fn ScannerIndexType(comptime index: std.meta.FieldEnum(GrooveAccounts.IndexTrees)) type {
const Tree = std.meta.fieldInfo(GrooveAccounts.IndexTrees, index).type;
const Value = Tree.Table.Value;
const Prefix = std.meta.fieldInfo(Value, .field).type;

const ScanRange = ScanRangeType(
Tree,
Storage,
void,
struct {
inline fn value_next(_: void, _: *const Value) EvaluateNext {
return .include_and_continue;
}
}.value_next,
struct {
inline fn timestamp_from_value(_: void, value: *const Value) u64 {
return value.timestamp;
}
}.timestamp_from_value,
);

const ScanLookup = ScanLookupType(
GrooveAccounts,
GrooveAccounts.ScanBuilder.Scan,
ScanRange,
Storage,
);
scan_lookup: AccountsScanLookup = undefined,
result: ?[]const tb.Account = null,

fn scan_lookup_callback(scan_lookup: *AccountsScanLookup) void {
const scanner = @fieldParentPtr(Scanner, "scan_lookup", scan_lookup);
assert(scanner.result == null);
scanner.result = scan_lookup.slice();
}
return struct {
const Self = @This();

lookup: ScanLookup = undefined,
result: ?[]const tb.Account = null,

fn scan(
self: *Self,
env: *Environment,
params: ScanParams,
) ![]const tb.Account {
var min: Prefix = @intCast(params.min);
var max: Prefix = @intCast(params.max);
assert(min <= max);

const scan_buffer_pool = &env.forest.scan_buffer_pool;
const groove_accounts = &env.forest.grooves.accounts;
defer scan_buffer_pool.reset();

// It's not expected to exceed `lsm_scans_max` here.
const scan_buffer = scan_buffer_pool.acquire() catch unreachable;

var scan_range = ScanRange.init(
{},
&@field(groove_accounts.indexes, @tagName(index)),
scan_buffer,
lsm.snapshot_latest,
Value.key_from_value(&.{
.field = min,
.timestamp = TimestampRange.timestamp_min,
}),
Value.key_from_value(&.{
.field = max,
.timestamp = TimestampRange.timestamp_max,
}),
params.direction,
);

self.lookup = ScanLookup.init(groove_accounts, &scan_range);
self.lookup.read(env.scan_lookup_buffer, &scan_lookup_callback);

while (self.result == null) {
if (env.ticks_remaining == 0) return error.OutOfTicks;
env.ticks_remaining -= 1;
env.storage.tick();
}

fn scan(
self: *Scanner,
comptime index: std.meta.FieldEnum(GrooveAccounts.IndexTrees),
env: *Environment,
params: ScanParams,
) !void {
const Tree = std.meta.fieldInfo(GrooveAccounts.IndexTrees, index).type;
const Prefix = std.meta.fieldInfo(Tree.Table.Value, .field).type;

var min: Prefix = @intCast(params.min);
var max: Prefix = @intCast(params.max);
assert(min <= max);

const scan_buffer_pool = &env.forest.scan_buffer_pool;
const groove_accounts = &env.forest.grooves.accounts;
const scan_builder = &groove_accounts.scan_builder;
defer {
scan_buffer_pool.reset();
scan_builder.reset();
return self.result.?;
}

// It's not expected to exceed `lsm_scans_max` here.
const scan_buffer = scan_buffer_pool.acquire() catch unreachable;

// TODO: add support for timestamp ranges.
const scan_range = scan_builder.scan_range(
index,
scan_buffer,
lsm.snapshot_latest,
.{ .field = min, .timestamp = 0 },
.{ .field = max, .timestamp = std.math.maxInt(u63) },
params.direction,
);

self.scan_lookup = AccountsScanLookup.init(groove_accounts, scan_range);
self.scan_lookup.read(env.scan_account_buffer, &scan_lookup_callback);

while (self.result == null) {
if (env.ticks_remaining == 0) return error.OutOfTicks;
env.ticks_remaining -= 1;
env.storage.tick();
fn scan_lookup_callback(lookup: *ScanLookup, result: []const tb.Account) void {
const self = @fieldParentPtr(Self, "lookup", lookup);
assert(self.result == null);
self.result = result;
}
}
};

fn scan_accounts(env: *Environment, params: ScanParams) ![]const tb.Account {
var scanner: Scanner = .{};
try switch (params.index) {
inline else => |field| scanner.scan(field, env, params),
};
}

return scanner.result.?;
fn scan_accounts(env: *Environment, params: ScanParams) ![]const tb.Account {
switch (params.index) {
inline else => |index| {
const Scanner = ScannerIndexType(index);
var scanner = Scanner{};
return try scanner.scan(env, params);
},
}
}

// The forest should behave like a simple key-value data-structure.
Expand Down Expand Up @@ -629,6 +658,11 @@ const Environment = struct {
// Asserting the positive space:
// all objects found by the scan must exist in our model.
for (accounts) |account| {
const prefix_current: u128 = switch (params.index) {
inline else => |field| @field(account, @tagName(field)),
};
assert(prefix_current >= params.min and prefix_current <= params.max);

const model_account = model.get_account(account.id).?;
assert(model_account.id == account.id);
assert(model_account.user_data_128 == account.user_data_128);
Expand All @@ -650,10 +684,6 @@ const Environment = struct {
timestamp_last = account.timestamp;
} else {
// If not exact, it's expected to be sorted by prefix and then timestamp.
const prefix_current: u128 = switch (params.index) {
inline else => |field| @field(account, @tagName(field)),
};

if (prefix_last) |prefix| {
// If range (between min .. max), it's expected to be sorted by prefix.
switch (params.direction) {
Expand Down
Loading

0 comments on commit 7512b69

Please sign in to comment.