Skip to content

Commit

Permalink
Multiversion: backport execing and hardcode 0.15.3 as having 0.15.4 available
Browse files Browse the repository at this point in the history

0.15.4 will be the first version with the ability to read the multiversion
metadata embedded in TigerBeetle. This presents a bit of a bootstrapping
problem:

* Operator replaces 0.15.3 with 0.15.4,
* 0.15.4 starts up, re-execs into 0.15.3,
* 0.15.3 knows nothing about 0.15.4 or how to check it's available, so we
  just hang.

Work around this by hardcoding that if 0.15.3 is present in a pack, 0.15.4
must be too.
  • Loading branch information
cb22 committed Jun 5, 2024
1 parent 73bbc1a commit 70598cc
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 29 deletions.
3 changes: 3 additions & 0 deletions src/io/darwin.zig
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,9 @@ pub const IO = struct {
const fd = try os.socket(family, sock_type | os.SOCK.NONBLOCK, protocol);
errdefer os.closeSocket(fd);

// darwin doesn't support SOCK_CLOEXEC.
_ = try os.fcntl(fd, os.F.SETFD, os.FD_CLOEXEC);

// darwin doesn't support os.MSG_NOSIGNAL, but instead a socket option to avoid SIGPIPE.
try os.setsockopt(fd, os.SOL.SOCKET, os.SO.NOSIGPIPE, &mem.toBytes(@as(c_int, 1)));
return fd;
Expand Down
2 changes: 1 addition & 1 deletion src/io/linux.zig
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ pub const IO = struct {
/// Creates a socket that can be used for async operations with the IO instance.
/// The IO instance itself is not consulted (`self` is discarded); the call is forwarded to
/// `os.socket` and its result, or error, is returned to the caller.
pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !os.socket_t {
_ = self;
// NOTE(review): this hunk is listed as "1 addition & 1 deletion", so the two `return` lines
// below appear to be the before/after pair of a single-line change rather than two live
// statements: the first looks like the removed line, the second its replacement, which ORs
// `os.SOCK.CLOEXEC` into the socket type (presumably so the descriptor is not inherited when
// the process re-execs into another release - confirm against the applied file).
return os.socket(family, sock_type, protocol);
return os.socket(family, sock_type | os.SOCK.CLOEXEC, protocol);
}

/// Opens a directory with read only access.
Expand Down
61 changes: 38 additions & 23 deletions src/io/windows.zig
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ pub const IO = struct {

pub const INVALID_FILE = os.windows.INVALID_HANDLE_VALUE;

fn open_file_handle(relative_path: []const u8, method: enum { create, open }) !os.fd_t {
fn open_file_handle(relative_path: []const u8, method: enum { create, open, open_wait }) !os.fd_t {
const path_w = try os.windows.sliceToPrefixedFileW(relative_path);

// FILE_CREATE = O_CREAT | O_EXCL
Expand All @@ -970,6 +970,10 @@ pub const IO = struct {
creation_disposition = os.windows.OPEN_EXISTING;
log.info("opening \"{s}\"...", .{relative_path});
},
.open_wait => {
creation_disposition = os.windows.OPEN_EXISTING;
log.info("opening \"{s}\", retrying if locked...", .{relative_path});
},
}

// O_EXCL
Expand All @@ -991,29 +995,39 @@ pub const IO = struct {
// TODO: Add ReadFileEx/WriteFileEx support.
// Not currently needed for O_DIRECT disk IO.
// attributes |= os.windows.FILE_FLAG_OVERLAPPED;
while (true) {
const handle = os.windows.kernel32.CreateFileW(
path_w.span(),
access_mask,
shared_mode,
null, // no security attributes required
creation_disposition,
attributes,
null, // no existing template file
);

if (handle == os.windows.INVALID_HANDLE_VALUE) {
switch (os.windows.kernel32.GetLastError()) {
.FILE_NOT_FOUND => return error.FileNotFound,
.SHARING_VIOLATION => {
if (method == .open_wait) {
log.info("file is locked...", .{});
std.time.sleep(1 * std.time.ns_per_s);
continue;
} else {
return error.AccessDenied;
}
},
.ACCESS_DENIED => return error.AccessDenied,
else => |err| {
log.warn("CreateFileW(): {}", .{err});
return os.windows.unexpectedError(err);
},
}
}

const handle = os.windows.kernel32.CreateFileW(
path_w.span(),
access_mask,
shared_mode,
null, // no security attributes required
creation_disposition,
attributes,
null, // no existing template file
);

if (handle == os.windows.INVALID_HANDLE_VALUE) {
return switch (os.windows.kernel32.GetLastError()) {
.FILE_NOT_FOUND => error.FileNotFound,
.SHARING_VIOLATION, .ACCESS_DENIED => error.AccessDenied,
else => |err| {
log.warn("CreateFileW(): {}", .{err});
return os.windows.unexpectedError(err);
},
};
return handle;
}

return handle;
}

/// Opens or creates a journal file:
Expand All @@ -1028,13 +1042,14 @@ pub const IO = struct {
dir_handle: os.fd_t,
relative_path: []const u8,
size: u64,
method: enum { create, create_or_open, open },
method: enum { create, create_or_open, open, open_wait },
) !os.fd_t {
assert(relative_path.len > 0);
assert(size % constants.sector_size == 0);

const handle = switch (method) {
.open => try open_file_handle(relative_path, .open),
.open_wait => try open_file_handle(relative_path, .open_wait),
.create => try open_file_handle(relative_path, .create),
.create_or_open => open_file_handle(relative_path, .open) catch |err| switch (err) {
error.FileNotFound => try open_file_handle(relative_path, .create),
Expand Down
110 changes: 105 additions & 5 deletions src/tigerbeetle/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ const SuperBlock = vsr.SuperBlockType(Storage);
const superblock_zone_size = vsr.superblock.superblock_zone_size;
const data_file_size_min = vsr.superblock.data_file_size_min;

/// Type of the saved process arguments: an optional `std.process.ArgIterator` when targeting
/// Windows, and `void` on every other target.
const ReplicaReleaseExecuteType = switch (builtin.target.os.tag) {
    .windows => ?std.process.ArgIterator,
    else => void,
};

/// Saved copy of the process argument iterator. On Windows it starts out as `null`; `main`
/// stores its argument iterator here and resets it to `null` on exit, and the Windows branch of
/// `replica_release_execute` takes the binary path from its first element. On other targets
/// this is just the `void` value.
var replica_release_execute_args: ReplicaReleaseExecuteType = switch (builtin.target.os.tag) {
    .windows => null,
    else => {},
};

pub const std_options = struct {
pub const log_level: std.log.Level = constants.log_level;
pub const logFn = constants.log;
Expand All @@ -59,6 +67,13 @@ pub fn main() !void {
var arg_iterator = try std.process.argsWithAllocator(allocator);
defer arg_iterator.deinit();

// On Windows, store a copy of the arg iterator, so it can be used later to determine the
// binary path.
if (builtin.target.os.tag == .windows) replica_release_execute_args = arg_iterator;
defer if (builtin.target.os.tag == .windows) {
replica_release_execute_args = null;
};

var command = try cli.parse_args(allocator, &arg_iterator);
defer command.deinit(allocator);

Expand Down Expand Up @@ -96,7 +111,17 @@ const Command = struct {
errdefer os.close(command.dir_fd);

const basename = std.fs.path.basename(path);
command.fd = try IO.open_file(command.dir_fd, basename, data_file_size_min, if (must_create) .create else .open);
command.fd = try IO.open_file(
command.dir_fd,
basename,
data_file_size_min,

// Unlike on Linux / macOS which use `exec` for multiversion binaries, Windows has to
// use CreateProcess. This is a problem, because it's a race between the parent
// process exiting and the new process starting. Work around this by waiting for the
// data file to be unlocked, rather than erroring immediately.
if (must_create) .create else if (builtin.os.tag == .windows) .open_wait else .open,
);
errdefer os.close(command.fd);

command.io = try IO.init(128, 0);
Expand Down Expand Up @@ -148,7 +173,6 @@ const Command = struct {

// TODO Panic if the data file's size is larger than args.storage_size_limit.
// (Here or in Replica.open()?).

const allocator = traced_allocator.allocator();

var command: Command = undefined;
Expand Down Expand Up @@ -186,13 +210,25 @@ const Command = struct {
const nonce = std.crypto.random.int(u128);
assert(nonce != 0); // Broken CSPRNG is the likeliest explanation for zero.

// Ensure that this code is only ever built as 0.15.3.
comptime assert(
config.process.release.value == vsr.Release.from(.{
.major = 0,
.minor = 15,
.patch = 3,
}).value,
);

var replica: Replica = undefined;
replica.open(allocator, .{
.node_count = @intCast(args.addresses.len),
.release = config.process.release,
// TODO Where should this be set?
.release_client_min = config.process.release,
.releases_bundled = &[_]vsr.Release{config.process.release},
.releases_bundled = &[_]vsr.Release{
config.process.release,
vsr.Release{ .value = config.process.release.value + 1 },
},
.release_execute = replica_release_execute,
.storage_size_limit = args.storage_size_limit,
.storage = &command.storage,
Expand Down Expand Up @@ -348,8 +384,72 @@ fn replica_release_execute(replica: *Replica, release: vsr.Release) noreturn {
@panic("release_execute: binary missing required version");
}

// TODO(Multiversioning) Exec into the new release.
unreachable;
// This is a custom build for the epoch of multiversioning. The binary_path will always be
// argv[0] since this binary would never have been executed by a user directly, and our caller
// will provide an absolute path there.
switch (builtin.os.tag) {
.macos, .linux => {
var binary_path = std.os.argv[0];
assert(std.fs.path.isAbsolute(std.mem.span(binary_path)));

// We can pass through our env and args as-is to exec. We have to manipulate the types
// here somewhat: they're cast in start.zig and we can't access `argc_argv_ptr`
// directly.
// process.zig does the same trick in execve().
const cast_args: [*:null]const ?[*:0]const u8 = @ptrCast(std.os.argv.ptr);
const cast_envp: [*:null]const ?[*:0]const u8 = @ptrCast(std.os.environ.ptr);

std.log.info("replica_release_execute: executing '{s}'...\n", .{binary_path});
std.os.execveZ(binary_path, cast_args, cast_envp) catch @panic("execveZ failed");
unreachable;
},
.windows => {
var binary_path = replica_release_execute_args.?.next().?;
assert(std.fs.path.isAbsolute(binary_path));

// Includes the null byte, that utf8ToUtf16LeWithNull needs.
var buffer: [std.fs.MAX_PATH_BYTES]u8 = undefined;
var fixed_allocator = std.heap.FixedBufferAllocator.init(&buffer);
const allocator = fixed_allocator.allocator();

const binary_path_w = std.unicode.utf8ToUtf16LeWithNull(allocator, binary_path) catch
unreachable;
defer allocator.free(binary_path_w);

// "The Unicode version of this function, CreateProcessW, can modify the contents of
// this string. Therefore, this parameter cannot be a pointer to read-only memory (such
// as a const variable or a literal string). If this parameter is a constant string,
// the function may cause an access violation."
//
// That said, with how CreateProcessW is called, this should _never_ happen, since it's
// both provided a full lpApplicationName, and because GetCommandLineW actually points
// to a copy of memory from the PEB.
const cmd_line_w = os.windows.kernel32.GetCommandLineW();

var lp_startup_info = std.mem.zeroes(std.os.windows.STARTUPINFOW);
lp_startup_info.cb = @sizeOf(std.os.windows.STARTUPINFOW);

var lp_process_information: std.os.windows.PROCESS_INFORMATION = undefined;

// If bInheritHandles is FALSE, and dwFlags inside STARTUPINFOW doesn't have
// STARTF_USESTDHANDLES set, the stdin/stdout/stderr handles of the parent will
// be passed through to the child.
std.os.windows.CreateProcessW(
binary_path_w,
cmd_line_w,
null,
null,
std.os.windows.FALSE,
std.os.windows.CREATE_UNICODE_ENVIRONMENT,
null,
null,
&lp_startup_info,
&lp_process_information,
) catch @panic("error creating process");
os.exit(0);
},
else => @panic("unsupported platform"),
}
}

fn print_value(
Expand Down

0 comments on commit 70598cc

Please sign in to comment.