From 35c7dcb0a1c525e2e2bc4d7348996a52fc13ffba Mon Sep 17 00:00:00 2001
From: Michael Davis
Date: Wed, 8 Oct 2025 12:17:46 -0400
Subject: [PATCH 1/5] Add `osiris_log_reader` abstraction over offset readers

This change introduces a behaviour, `osiris_log_reader`, which can be
implemented externally to read from a stream at a given offset spec. It
covers the high-level reading operations such as `send_file/3` and
`chunk_iterator/3`. `osiris:init_reader/4` selects the reader module
based on the application environment, and callers then use
`osiris_log_reader` to interact with the reader. By default all of
these functions delegate to `osiris_log`, which needs no meaningful
changes: the only change there is to expose the `header_map()` type.
---
 src/osiris.erl            | 11 ++---
 src/osiris_log.erl        |  3 +-
 src/osiris_log_reader.erl | 89 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 94 insertions(+), 9 deletions(-)
 create mode 100644 src/osiris_log_reader.erl

diff --git a/src/osiris.erl b/src/osiris.erl
index 8d95d04..2c1c1f5 100644
--- a/src/osiris.erl
+++ b/src/osiris.erl
@@ -80,11 +80,6 @@
 %% returned when reading
 -type entry() :: binary() | batch().
 
--type reader_options() :: #{transport => tcp | ssl,
-                            chunk_selector => all | user_data,
-                            filter_spec => osiris_bloom:filter_spec(),
-                            read_ahead => boolean() | non_neg_integer()
-                           }.
 
 -export_type([name/0,
               config/0,
@@ -225,8 +220,8 @@ init_reader(Pid, OffsetSpec, CounterSpec) ->
                            chunk_selector => user_data}).
 
 -spec init_reader(pid(), offset_spec(), osiris_log:counter_spec(),
-                  reader_options()) ->
-    {ok, osiris_log:state()} |
+                  osiris_log_reader:options()) ->
+    {ok, osiris_log_reader:state()} |
     {error, {offset_out_of_range, empty | {offset(), offset()}}} |
     {error, {invalid_last_offset_epoch, offset(), offset()}}.
 init_reader(Pid, OffsetSpec, {_, _} = CounterSpec, Options)
@@ -235,7 +230,7 @@ init_reader(Pid, OffsetSpec, {_, _} = CounterSpec, Options)
     Ctx0 = osiris_util:get_reader_context(Pid),
     Ctx = Ctx0#{counter_spec => CounterSpec,
                 options => Options},
-    osiris_log:init_offset_reader(OffsetSpec, Ctx).
+    (osiris_log_reader:module()):init_offset_reader(OffsetSpec, Ctx).
 
 -spec register_offset_listener(pid(), offset()) -> ok.
 register_offset_listener(Pid, Offset) ->
diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index 4a339b1..adcd1d0 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -477,7 +477,8 @@
               range/0,
               config/0,
               counter_spec/0,
-              transport/0]).
+              transport/0,
+              header_map/0]).
 
 -spec directory(osiris:config() | list()) -> file:filename_all().
 directory(#{name := Name, dir := Dir}) ->
diff --git a/src/osiris_log_reader.erl b/src/osiris_log_reader.erl
new file mode 100644
index 0000000..3680bb8
--- /dev/null
+++ b/src/osiris_log_reader.erl
@@ -0,0 +1,89 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
+%%
+
+-module(osiris_log_reader).
+
+-export([next_offset/1,
+         committed_offset/1,
+         close/1,
+         send_file/2,
+         send_file/3,
+         chunk_iterator/2,
+         chunk_iterator/3,
+         iterator_next/1]).
+
+%% Exported for internal usage
+-export([module/0]).
+
+-type options() :: #{transport => tcp | ssl,
+                     chunk_selector => all | user_data,
+                     filter_spec => osiris_bloom:filter_spec(),
+                     read_ahead => boolean() | non_neg_integer()
+                    }.
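+
+%% options() is accepted by `osiris:init_reader/4' and passed through to
+%% the selected reader implementation under the `options' key of `config()'.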
+-type config() ::
+    osiris:config() |
+    #{counter_spec := osiris_log:counter_spec(),
+      options := options()}.
+-type state() :: term().
+-type chunk_iterator() :: {module(), term()}.
+-type send_file_callback() ::
+    fun((osiris_log:header_map(), BytesToSend :: non_neg_integer()) ->
+            PrefixData :: binary()).
+
+-export_type([options/0,
+              config/0,
+              state/0,
+              chunk_iterator/0]).
+
+-callback init_offset_reader(osiris:offset_spec(), config()) ->
+    {ok, state()} |
+    {error, term()}.
+
+-callback next_offset(state()) -> osiris:offset().
+-callback committed_offset(state()) -> osiris:offset().
+-callback close(state()) -> ok.
+
+-callback send_file(gen_tcp:socket() | ssl:sslsocket(), state(),
+                    send_file_callback()) ->
+    {ok, state()} |
+    {error, term()} |
+    {end_of_stream, state()}.
+
+-callback chunk_iterator(state(),
+                         Credit :: pos_integer() | all,
+                         PrevIter :: chunk_iterator() | undefined) ->
+    {ok, osiris_log:header_map(), chunk_iterator(), state()} |
+    {end_of_stream, state()} |
+    {error, term()}.
+
+-callback iterator_next(chunk_iterator()) ->
+    {{osiris:offset(), osiris:entry()}, chunk_iterator()} |
+    end_of_chunk.
+
+next_offset(State) ->
+    (module()):?FUNCTION_NAME(State).
+committed_offset(State) ->
+    (module()):?FUNCTION_NAME(State).
+close(State) ->
+    (module()):?FUNCTION_NAME(State).
+
+send_file(Socket, State) ->
+    send_file(Socket, State, fun(_, _) -> <<>> end).
+send_file(Socket, State, Callback) ->
+    (module()):?FUNCTION_NAME(Socket, State, Callback).
+
+chunk_iterator(State, Credit) ->
+    chunk_iterator(State, Credit, undefined).
+chunk_iterator(State, Credit, PrevChunkIterator) ->
+    (module()):?FUNCTION_NAME(State, Credit, PrevChunkIterator).
+
+iterator_next(Iterator) ->
+    (module()):?FUNCTION_NAME(Iterator).
+
+%% @private
+module() ->
+    application:get_env(osiris, log_reader, osiris_log).

From e674d4c42c947aa3a6071b998a98992076be83f6 Mon Sep 17 00:00:00 2001
From: Michael Davis
Date: Tue, 16 Sep 2025 14:22:31 -0400
Subject: [PATCH 2/5] Add a `{'fun', fun()}` retention spec

This can be used to evaluate retention flexibly, depending on the name
or contents of index files. For example, you could pass in a function
which returns true until the offset in the index file's name exceeds
some given offset, as a way to truncate everything up to that offset,
or to guarantee that an offset (for example an uncommitted one) won't
be truncated.
---
 src/osiris.erl            |  5 ++++-
 src/osiris_log.erl        | 28 ++++++++++++++++++++--------
 test/osiris_log_SUITE.erl | 30 ++++++++++++++++++++++++++++++
 3 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/src/osiris.erl b/src/osiris.erl
index 2c1c1f5..a883039 100644
--- a/src/osiris.erl
+++ b/src/osiris.erl
@@ -66,8 +66,11 @@
     {abs, offset()} |
     offset() |
     {timestamp, timestamp()}.
+-type retention_fun() :: fun((IdxFile :: file:filename_all()) -> boolean()).
 -type retention_spec() ::
-    {max_bytes, non_neg_integer()} | {max_age, milliseconds()}.
+    {max_bytes, non_neg_integer()} |
+    {max_age, milliseconds()} |
+    {'fun', retention_fun()}.
 -type writer_id() :: binary().
 -type batch() ::
     {batch, NumRecords :: non_neg_integer(), compression_type(),
diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index adcd1d0..79e5f39 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -2206,14 +2206,15 @@ evaluate_retention(Dir, Specs) when is_binary(Dir) ->
     ?DEBUG_(<<>>," (~w) completed in ~fms", [Specs, Time/1_000]),
     Result.
-evaluate_retention0(IdxFiles, []) ->
-    IdxFiles;
-evaluate_retention0(IdxFiles, [{max_bytes, MaxSize} | Specs]) ->
-    RemIdxFiles = eval_max_bytes(IdxFiles, MaxSize),
-    evaluate_retention0(RemIdxFiles, Specs);
-evaluate_retention0(IdxFiles, [{max_age, Age} | Specs]) ->
-    RemIdxFiles = eval_age(IdxFiles, Age),
-    evaluate_retention0(RemIdxFiles, Specs).
+evaluate_retention0(IdxFiles, Specs) ->
+    lists:foldl(
+      fun ({max_bytes, MaxSize}, RemIdxFiles) ->
+              eval_max_bytes(RemIdxFiles, MaxSize);
+          ({max_age, Age}, RemIdxFiles) ->
+              eval_age(RemIdxFiles, Age);
+          ({'fun', Fun}, RemIdxFiles) ->
+              eval_retention_fun(RemIdxFiles, Fun)
+      end, IdxFiles, Specs).
 
 eval_age([_] = IdxFiles, _Age) ->
     IdxFiles;
@@ -2278,6 +2279,17 @@ file_size_or_zero(Path) ->
             0
     end.
 
+eval_retention_fun([], _) ->
+    [];
+eval_retention_fun([IdxFile | Rest] = IdxFiles, Fun) ->
+    case Fun(IdxFile) of
+        true ->
+            ok = delete_segment_from_index(IdxFile),
+            eval_retention_fun(Rest, Fun);
+        false ->
+            IdxFiles
+    end.
+
 last_epoch_chunk_ids(Name, IdxFiles) ->
     T1 = erlang:monotonic_time(),
     %% no need to filter out empty index files as
diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl
index baceac1..d769484 100644
--- a/test/osiris_log_SUITE.erl
+++ b/test/osiris_log_SUITE.erl
@@ -86,6 +86,7 @@ all_tests() ->
      evaluate_retention_max_bytes,
      evaluate_retention_max_age,
      evaluate_retention_max_age_empty,
+     evaluate_retention_fun,
      offset_tracking,
      offset_tracking_snapshot,
      many_segment_overview,
@@ -1699,6 +1700,35 @@ evaluate_retention_max_age(Config) ->
                  "the retention process didn't delete the oldest segment"),
     ok.
 
+evaluate_retention_fun(Config) ->
+    Data = crypto:strong_rand_bytes(1500),
+    EpochChunks =
+        [begin {1, [Data || _ <- lists:seq(1, 50)]} end
+         || _ <- lists:seq(1, 20)],
+    LDir = ?config(leader_dir, Config),
+    Log = seed_log(LDir, EpochChunks, Config),
+    osiris_log:close(Log),
+    %% delete only the first segment
+    Fun = fun(IdxFile) ->
+              ct:pal("Eval retention for ~p", [IdxFile]),
+              filename:basename(IdxFile) =:= <<"00000000000000000000.index">>
+          end,
+    Spec = {'fun', Fun},
+    Range = osiris_log:evaluate_retention(LDir, [Spec]),
+    %% idempotency check
+    Range = osiris_log:evaluate_retention(LDir, [Spec]),
+    SegFiles =
+        filelib:wildcard(
+            filename:join(LDir, "*.segment")),
+    ?assertMatch([_], SegFiles),
+    ?assertEqual([],
+                 lists:filter(fun(S) ->
+                                 lists:suffix("00000000000000000000.segment", S)
+                              end,
+                              SegFiles),
+                 "the retention process didn't delete the oldest segment"),
+    ok.
+
 offset_tracking(Config) ->
     Conf = ?config(osiris_conf, Config),
     S0 = osiris_log:init(Conf),

From 11381d9080998b119227bdcf93d9906a4ed464e2 Mon Sep 17 00:00:00 2001
From: Michael Davis
Date: Wed, 8 Oct 2025 13:38:33 -0400
Subject: [PATCH 3/5] Replace `osiris_log:overview/1` tuple with a map

This is important for allowing other manifest implementations (see the
later commits in this series) to include additional info in the map.
The range and epoch offsets are the only required keys; other manifests
might include additional information.

While this change is not complicated, it is breaking and needs
backward-compatibility clauses in callers of `overview/1` and
`init_acceptor/3`, and it ends up moving a lot of code around,
inflating the diff.
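
For example, where `overview/1` previously returned a tuple such as

    {{0, 4}, [{1, 1}, {2, 4}]}

it now returns a map carrying the same information:

    #{range => {0, 4}, epoch_offsets => [{1, 1}, {2, 4}]}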
--- src/osiris_log.erl | 22 ++-- src/osiris_replica.erl | 253 ++++++++++++++++++++------------------ src/osiris_writer.erl | 3 + test/osiris_log_SUITE.erl | 115 ++++++++--------- 4 files changed, 207 insertions(+), 186 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 79e5f39..f184749 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -12,7 +12,7 @@ -export([init/1, init/2, - init_acceptor/3, + init_acceptor/2, write/2, write/3, write/5, @@ -405,6 +405,9 @@ position => non_neg_integer(), next_position => non_neg_integer()}. -type transport() :: tcp | ssl. +-type overview() :: + #{range := range(), + epoch_offsets := [{epoch(), offset()}]}. %% holds static or rarely changing fields -record(cfg, @@ -478,7 +481,8 @@ config/0, counter_spec/0, transport/0, - header_map/0]). + header_map/0, + overview/0]). -spec directory(osiris:config() | list()) -> file:filename_all(). directory(#{name := Name, dir := Dir}) -> @@ -862,9 +866,9 @@ evaluate_tracking_snapshot(#?MODULE{mode = #write{type = writer}} = State0, Trk0 end. % -spec --spec init_acceptor(range(), list(), config()) -> +-spec init_acceptor(overview(), config()) -> state(). -init_acceptor(Range, EpochOffsets0, +init_acceptor(#{range := Range, epoch_offsets := EpochOffsets0}, #{name := Name, dir := Dir} = Conf) -> %% truncate to first common last epoch offset %% * if the last local chunk offset has the same epoch but is lower @@ -2116,7 +2120,7 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) -> end. -spec overview(file:filename_all()) -> - {range(), [{epoch(), offset()}]}. + overview(). overview(Dir) -> Files = list_dir(Dir), %% index files with matching segment @@ -2125,11 +2129,11 @@ overview(Dir) -> %% explicitly filter these out case index_files_with_segment(lists:sort(Files), Dir, []) of [] -> - {empty, []}; + #{range => empty, + epoch_offsets => []}; IdxFiles -> - Range = offset_range_from_idx_files(IdxFiles), - EpochOffsets = last_epoch_chunk_ids(<<>>, IdxFiles), - {Range, EpochOffsets} + #{range => offset_range_from_idx_files(IdxFiles), + epoch_offsets => last_epoch_chunk_ids(<<>>, IdxFiles)} end. index_files_with_segment([], _, Acc) -> diff --git a/src/osiris_replica.erl b/src/osiris_replica.erl index 15bb95c..dd179ad 100644 --- a/src/osiris_replica.erl +++ b/src/osiris_replica.erl @@ -144,11 +144,10 @@ init(Config) -> {ok, undefined, {continue, Config}}. 
handle_continue(#{name := Name0, - epoch := Epoch, - leader_pid := LeaderPid, - reference := ExtRef} = Config, undefined) + leader_pid := LeaderPid} = Config0, undefined) when ?IS_STRING(Name0) -> Name = osiris_util:normalise_name(Name0), + Config = Config0#{name := Name}, process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), Node = node(LeaderPid), @@ -173,125 +172,137 @@ handle_continue(#{name := Name0, {stop, {shutdown, writer_unavailable}, undefined}; {badrpc, Reason} -> {stop, {badrpc, Reason}, undefined}; + {ok, #{range := _, epoch_offsets := _} = LeaderOverview} -> + init_replica(LeaderOverview, Config); {ok, {LeaderRange, LeaderEpochOffs}} -> - {Min, Max} = application:get_env(osiris, port_range, - ?DEFAULT_PORT_RANGE), - Transport = application:get_env(osiris, replication_transport, tcp), - Self = self(), - CntName = {?MODULE, ExtRef}, - - Dir = osiris_log:directory(Config), - Log = osiris_log:init_acceptor(LeaderRange, LeaderEpochOffs, - Config#{dir => Dir, - counter_spec => - {CntName, ?ADD_COUNTER_FIELDS}}), - CntRef = osiris_log:counters_ref(Log), - {NextOffset, LastChunk} = TailInfo = osiris_log:tail_info(Log), - - case LastChunk of - empty -> - ok; - {_, LastChId, LastTs} -> - %% need to ack last chunk back to leader so that it can - %% re-discover the committed offset - osiris_writer:ack(LeaderPid, {LastChId, LastTs}) - end, - ?INFO_(Name, "osiris replica starting in epoch ~b, next offset ~b, tail info ~w", - [Epoch, NextOffset, TailInfo]), - - %% HostName: append the HostName to the Ip(s) list: in some cases - %% like NAT or redirect the local ip addresses are not enough. - %% ex: In docker with host network configuration the `inet:getaddrs` - %% are only the IP(s) inside docker but the dns lookup happens - %% outside the docker image (host machine). - %% The host name is the last to leave the compatibility. - %% See: rabbitmq/rabbitmq-server#3510 - {ok, HostName} = inet:gethostname(), - - %% Ips: are the first values used to connect the - %% replicas - {ok, Ips} = inet:getaddrs(HostName, inet), - - %% HostNameFromHost: The hostname value from RABBITMQ_NODENAME - %% can be different from the machine hostname. - %% In case of docker with bridge and extra_hosts the use case can be: - %% RABBITMQ_NODENAME=rabbit@my-domain - %% docker hostname = "114f4317c264" - %% the HostNameFromHost will be "my-domain". - %% btw 99% of the time the HostNameFromHost is equal to HostName. 
- %% see: rabbitmq/osiris/issues/53 for more details - HostNameFromHost = osiris_util:hostname_from_node(), - - IpsHosts = combine_ips_hosts(Transport, Ips, HostName, - HostNameFromHost), - - Token = crypto:strong_rand_bytes(?TOKEN_SIZE), - ?DEBUG_(Name, "replica resolved host endpoints: ~0p", [IpsHosts]), - {Port, LSock} = open_listener(Transport, {Min, Max}, 0), - ?DEBUG_(Name, "replica listening on port '~b' using transport ~s", - [Port, Transport]), - Acceptor = spawn_link(fun() -> accept(Name, Transport, LSock, Self) end), - ?DEBUG_(Name, "starting replica reader on node '~w'", [Node]), - - ReplicaReaderConf = #{hosts => IpsHosts, - port => Port, - transport => Transport, - name => Name, - replica_pid => self(), - leader_pid => LeaderPid, - start_offset => TailInfo, - reference => ExtRef, - connection_token => Token}, - case osiris_replica_reader:start(Node, ReplicaReaderConf) of - {ok, RRPid} -> - true = link(RRPid), - ?DEBUG_(Name, "started replica reader on node '~w'", [Node]), - GcInterval0 = application:get_env(osiris, - replica_forced_gc_default_interval, - 4999), - - GcInterval1 = case is_integer(GcInterval0) of - true -> - _ = erlang:send_after(GcInterval0, self(), force_gc), - GcInterval0; - false -> - infinity - end, - counters:put(CntRef, ?C_COMMITTED_OFFSET, -1), - counters:put(CntRef, ?C_EPOCH, Epoch), - Shared = osiris_log:get_shared(Log), - osiris_util:cache_reader_context(self(), Dir, Name, Shared, ExtRef, - fun(Inc) -> - counters:add(CntRef, ?C_READERS, Inc) - end), - EvtFmt = maps:get(event_formatter, Config, undefined), - {noreply, - #?MODULE{cfg = - #cfg{name = Name, - leader_pid = LeaderPid, - acceptor_pid = Acceptor, - replica_reader_pid = RRPid, - directory = Dir, - port = Port, - gc_interval = GcInterval1, - reference = ExtRef, - event_formatter = EvtFmt, - counter = CntRef, - token = Token, - transport = Transport}, - log = Log, - parse_state = undefined}}; - {error, {connection_refused = R, _}} -> - %% we don't log details for connection_refused, - %% they are already in the logs of the other node - ?WARN_(Name, "failed to start replica reader on node '~w'. " - "Reason ~0p.", [Node, R]), - {stop, {shutdown, R}, undefined}; - {error, Reason} -> - ?WARN_(Name, "failed to start replica reader on node '~w'. " - "Reason ~0p.", [Node, Reason]), - {stop, {shutdown, Reason}, undefined} - end + %% backwards compatibility + init_replica(#{range => LeaderRange, + epoch_offsets => LeaderEpochOffs}, + Config) + end. + +init_replica(LeaderOverview, #{name := Name, + epoch := Epoch, + leader_pid := LeaderPid, + reference := ExtRef} = Config) -> + {Min, Max} = application:get_env(osiris, port_range, + ?DEFAULT_PORT_RANGE), + Transport = application:get_env(osiris, replication_transport, tcp), + Self = self(), + CntName = {?MODULE, ExtRef}, + + Dir = osiris_log:directory(Config), + Log = osiris_log:init_acceptor(LeaderOverview, + Config#{dir => Dir, + counter_spec => + {CntName, ?ADD_COUNTER_FIELDS}}), + CntRef = osiris_log:counters_ref(Log), + {NextOffset, LastChunk} = TailInfo = osiris_log:tail_info(Log), + + case LastChunk of + empty -> + ok; + {_, LastChId, LastTs} -> + %% need to ack last chunk back to leader so that it can + %% re-discover the committed offset + osiris_writer:ack(LeaderPid, {LastChId, LastTs}) + end, + ?INFO_(Name, "osiris replica starting in epoch ~b, next offset ~b, tail info ~w", + [Epoch, NextOffset, TailInfo]), + + %% HostName: append the HostName to the Ip(s) list: in some cases + %% like NAT or redirect the local ip addresses are not enough. 
+ %% ex: In docker with host network configuration the `inet:getaddrs` + %% are only the IP(s) inside docker but the dns lookup happens + %% outside the docker image (host machine). + %% The host name is the last to leave the compatibility. + %% See: rabbitmq/rabbitmq-server#3510 + {ok, HostName} = inet:gethostname(), + + %% Ips: are the first values used to connect the + %% replicas + {ok, Ips} = inet:getaddrs(HostName, inet), + + %% HostNameFromHost: The hostname value from RABBITMQ_NODENAME + %% can be different from the machine hostname. + %% In case of docker with bridge and extra_hosts the use case can be: + %% RABBITMQ_NODENAME=rabbit@my-domain + %% docker hostname = "114f4317c264" + %% the HostNameFromHost will be "my-domain". + %% btw 99% of the time the HostNameFromHost is equal to HostName. + %% see: rabbitmq/osiris/issues/53 for more details + HostNameFromHost = osiris_util:hostname_from_node(), + + IpsHosts = combine_ips_hosts(Transport, Ips, HostName, + HostNameFromHost), + + Token = crypto:strong_rand_bytes(?TOKEN_SIZE), + ?DEBUG_(Name, "replica resolved host endpoints: ~0p", [IpsHosts]), + {Port, LSock} = open_listener(Transport, {Min, Max}, 0), + ?DEBUG_(Name, "replica listening on port '~b' using transport ~s", + [Port, Transport]), + Acceptor = spawn_link(fun() -> accept(Name, Transport, LSock, Self) end), + Node = node(LeaderPid), + ?DEBUG_(Name, "starting replica reader on node '~w'", [Node]), + + ReplicaReaderConf = #{hosts => IpsHosts, + port => Port, + transport => Transport, + name => Name, + replica_pid => self(), + leader_pid => LeaderPid, + start_offset => TailInfo, + reference => ExtRef, + connection_token => Token}, + case osiris_replica_reader:start(Node, ReplicaReaderConf) of + {ok, RRPid} -> + true = link(RRPid), + ?DEBUG_(Name, "started replica reader on node '~w'", [Node]), + GcInterval0 = application:get_env(osiris, + replica_forced_gc_default_interval, + 4999), + + GcInterval1 = case is_integer(GcInterval0) of + true -> + _ = erlang:send_after(GcInterval0, self(), force_gc), + GcInterval0; + false -> + infinity + end, + counters:put(CntRef, ?C_COMMITTED_OFFSET, -1), + counters:put(CntRef, ?C_EPOCH, Epoch), + Shared = osiris_log:get_shared(Log), + osiris_util:cache_reader_context(self(), Dir, Name, Shared, ExtRef, + fun(Inc) -> + counters:add(CntRef, ?C_READERS, Inc) + end), + EvtFmt = maps:get(event_formatter, Config, undefined), + {noreply, + #?MODULE{cfg = + #cfg{name = Name, + leader_pid = LeaderPid, + acceptor_pid = Acceptor, + replica_reader_pid = RRPid, + directory = Dir, + port = Port, + gc_interval = GcInterval1, + reference = ExtRef, + event_formatter = EvtFmt, + counter = CntRef, + token = Token, + transport = Transport}, + log = Log, + parse_state = undefined}}; + {error, {connection_refused = R, _}} -> + %% we don't log details for connection_refused, + %% they are already in the logs of the other node + ?WARN_(Name, "failed to start replica reader on node '~w'. " + "Reason ~0p.", [Node, R]), + {stop, {shutdown, R}, undefined}; + {error, Reason} -> + ?WARN_(Name, "failed to start replica reader on node '~w'. " + "Reason ~0p.", [Node, Reason]), + {stop, {shutdown, Reason}, undefined} end. combine_ips_hosts(tcp, IPs, HostName, HostNameFromHost) when diff --git a/src/osiris_writer.erl b/src/osiris_writer.erl index 2ca0c2d..bd66228 100644 --- a/src/osiris_writer.erl +++ b/src/osiris_writer.erl @@ -116,6 +116,9 @@ start_link(Config) -> Opts = [{reversed_batch, true}], gen_batch_server:start_link(undefined, Mod, Config, Opts). 
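+%% Fetch the log overview from a writer process on the local node;
+%% returns `{error, no_process}' if the writer is no longer alive.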
+-spec overview(pid()) -> + {ok, osiris_log:overview()} | + {error, no_process}. overview(Pid) when node(Pid) == node() -> case erlang:is_process_alive(Pid) of true -> diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index d769484..20a8192 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -1142,10 +1142,10 @@ init_epoch_offsets_empty(Config) -> FDir = ?config(follower1_dir, Config), LogInit = seed_log(LDir, EpochChunks, Config), osiris_log:close(LogInit), - EOffs = [{1, 0}], - Range = {0, 3}, + Overview = #{range => {0, 3}, + epoch_offsets => [{1, 0}]}, Log0 = - osiris_log:init_acceptor(Range, EOffs, Conf#{dir => FDir, epoch => 1}), + osiris_log:init_acceptor(Overview, Conf#{dir => FDir, epoch => 1}), {0, empty} = osiris_log:tail_info(Log0), osiris_log:close(Log0), ok. @@ -1157,10 +1157,10 @@ init_epoch_offsets_empty_writer(Config) -> LDir = ?config(leader_dir, Config), LogInit = seed_log(LDir, EpochChunks, Config), osiris_log:close(LogInit), - EOffs = [], - Range = {0, 3}, + Overview = #{range => {0, 3}, + epoch_offsets => []}, Log0 = - osiris_log:init_acceptor(Range, EOffs, Conf#{dir => LDir, epoch => 2}), + osiris_log:init_acceptor(Overview, Conf#{dir => LDir, epoch => 2}), {0, empty} = osiris_log:tail_info(Log0), osiris_log:close(Log0), ok. @@ -1170,11 +1170,10 @@ init_epoch_offsets_truncated_writer(Config) -> %% has had retention remove the head of it's log Conf = ?config(osiris_conf, Config), LDir = ?config(leader_dir, Config), - EOffs = [{3, 100}], - Range = {50, 100}, + Overview = #{range => {50, 100}, + epoch_offsets => [{3, 100}]}, Log0 = - osiris_log:init_acceptor(Range, EOffs, Conf#{dir => LDir, - epoch => 2}), + osiris_log:init_acceptor(Overview, Conf#{dir => LDir, epoch => 2}), {50, empty} = osiris_log:tail_info(Log0), osiris_log:close(Log0), @@ -1187,10 +1186,10 @@ init_epoch_offsets(Config) -> LDir = ?config(leader_dir, Config), LogInit = seed_log(LDir, EpochChunks, Config), osiris_log:close(LogInit), - EOffs = [{1, 1}], - Range = {0, 1}, + Overview = #{range => {0, 1}, + epoch_offsets => [{1, 1}]}, Log0 = - osiris_log:init_acceptor(Range, EOffs, + osiris_log:init_acceptor(Overview, #{dir => LDir, name => ?config(test_case, Config), epoch => 2}), @@ -1205,10 +1204,10 @@ init_epoch_offsets_multi_segment(Config) -> || _ <- lists:seq(1, 20)], LDir = ?config(leader_dir, Config), osiris_log:close(seed_log(LDir, EpochChunks, Config)), - EOffs = [{1, 650}], - Range = {0, 650}, + Overview = #{range => {0, 650}, + epoch_offsets => [{1, 650}]}, Log0 = - osiris_log:init_acceptor(Range, EOffs, + osiris_log:init_acceptor(Overview, #{dir => LDir, name => ?config(test_case, Config), epoch => 2}), @@ -1223,10 +1222,10 @@ init_epoch_offsets_multi_segment2(Config) -> ++ [{2, [Data || _ <- lists:seq(1, 50)]} || _ <- lists:seq(1, 5)], LDir = ?config(leader_dir, Config), osiris_log:close(seed_log(LDir, EpochChunks, Config)), - EOffs = [{3, 750}, {1, 650}], - Range = {0, 750}, + Overview = #{range => {0, 750}, + epoch_offsets => [{3, 750}, {1, 650}]}, Log0 = - osiris_log:init_acceptor(Range, EOffs, + osiris_log:init_acceptor(Overview, #{dir => LDir, name => ?config(test_case, Config), epoch => 2}), @@ -1281,9 +1280,9 @@ accept_chunk(Config) -> Now = ?LINE, L2 = osiris_log:write([{<<"filter">>, <<"ho">>}], ?CHNK_USER, Now, <<>>, L1), - {Range, EOChIds} = osiris_log:overview(LDir), + Overview = osiris_log:overview(LDir), - F0 = osiris_log:init_acceptor(Range, EOChIds, FConf), + F0 = osiris_log:init_acceptor(Overview, FConf), %% replica reader RConf = 
LConf#{shared => osiris_log:get_shared(L2)}, @@ -1297,7 +1296,7 @@ accept_chunk(Config) -> osiris_log:close(L2), osiris_log:close(R2), osiris_log:close(F2), - FL0 = osiris_log:init_acceptor(Range, EOChIds, FConf), + FL0 = osiris_log:init_acceptor(Overview, FConf), osiris_log:close(FL0), osiris_log_shared:set_committed_chunk_id(maps:get(shared, FConf), 1), @@ -1319,9 +1318,10 @@ init_acceptor_truncates_tail(Config) -> LTail = osiris_log:tail_info(LLog), ?assertMatch({4, {1, 3, _}}, LTail), %% {NextOffs, {LastEpoch, LastChunkId, Ts}} ok = osiris_log:close(LLog), - EOffs = [{2, 4}, {1, 2}], + Overview = #{range => {0, 5}, + epoch_offsets => [{2, 4}, {1, 2}]}, ALog0 = - osiris_log:init_acceptor({0, 5}, EOffs, Conf#{dir => LDir, epoch => 2}), + osiris_log:init_acceptor(Overview, Conf#{dir => LDir, epoch => 2}), % ?assertMatch({3, {1, 2, _}}, osiris_log:tail_info(ALog0)), %% {NextOffs, {LastEpoch, LastChunkId, Ts}} {3, {1, 2, _}} = osiris_log:tail_info(ALog0), %% {NextOffs, {LastEpoch, LastChunkId, Ts}} @@ -1351,9 +1351,9 @@ accept_chunk_truncates_tail(Config) -> FLog0 = seed_log(FDir, FollowerEpochChunks, Config), osiris_log:close(FLog0), - {Range, EOffs} = osiris_log:overview(LDir), + Overview = osiris_log:overview(LDir), ALog0 = - osiris_log:init_acceptor(Range, EOffs, Conf#{dir => FDir, epoch => 2}), + osiris_log:init_acceptor(Overview, Conf#{dir => FDir, epoch => 2}), LShared = osiris_log:get_shared(LLog), osiris_log_shared:set_committed_chunk_id(LShared, 4), RConf = Conf#{dir => LDir, @@ -1367,7 +1367,7 @@ accept_chunk_truncates_tail(Config) -> osiris_log:close(ALog), % validate equal - ?assertMatch({Range, EOffs}, osiris_log:overview(FDir)), + ?assertMatch(Overview, osiris_log:overview(FDir)), {ok, V0} = osiris_log:init_data_reader({0, empty}, RConf#{dir => FDir}), {[{0, <<"one">>}], V1} = osiris_log:read_chunk_parsed(V0), @@ -1396,8 +1396,8 @@ accept_chunk_does_not_truncate_tail_in_same_epoch(Config) -> FLog0 = seed_log(FDir, FollowerEpochChunks, Config), osiris_log:close(FLog0), - {Range, EOffs} = osiris_log:overview(LDir), - ALog0 = osiris_log:init_acceptor(Range, EOffs, Conf#{dir => FDir, epoch => 2}), + Overview = osiris_log:overview(LDir), + ALog0 = osiris_log:init_acceptor(Overview, Conf#{dir => FDir, epoch => 2}), ATail = osiris_log:tail_info(ALog0), osiris_log:close(ALog0), %% ensure we don't truncate too much @@ -1424,8 +1424,8 @@ accept_chunk_in_other_epoch(Config) -> FLog0 = seed_log(FDir, FollowerEpochChunks, Config), osiris_log:close(FLog0), - {Range, EOffs} = osiris_log:overview(LDir), - ALog0 = osiris_log:init_acceptor(Range, EOffs, Conf#{dir => FDir, epoch => 2}), + Overview = osiris_log:overview(LDir), + ALog0 = osiris_log:init_acceptor(Overview, Conf#{dir => FDir, epoch => 2}), ATail = osiris_log:tail_info(ALog0), osiris_log:close(ALog0), %% ensure we don't truncate too much @@ -1441,10 +1441,10 @@ init_epoch_offsets_discards_all_when_no_overlap_in_same_epoch(Config) -> LDir = ?config(leader_dir, Config), LogInit = seed_log(LDir, EpochChunks, Config), osiris_log:close(LogInit), - EOffs = [{1, 10}], - Range = {5, 10}, %% replica's range is 0, 3 + Overview = #{range => {5, 10}, %% replica's range is 0, 3 + epoch_offsets => [{1, 10}]}, Log0 = - osiris_log:init_acceptor(Range, EOffs, + osiris_log:init_acceptor(Overview, #{dir => LDir, name => ?config(test_case, Config), epoch => 2}), @@ -1461,9 +1461,11 @@ overview(Config) -> LDir = ?config(leader_dir, Config), Log0 = seed_log(LDir, EpochChunks, Config), osiris_log:close(Log0), - {{0, 4}, [{1, 1}, {2, 4}]} = 
osiris_log:overview(LDir), + #{range := {0, 4}, + epoch_offsets := [{1, 1}, {2, 4}]} = osiris_log:overview(LDir), %% non existant dir should return empty - {empty, []} = osiris_log:overview("/tmp/blahblah"), + #{range := empty, + epoch_offsets := []} = osiris_log:overview("/tmp/blahblah"), ok. init_corrupted_log(Config) -> @@ -1492,7 +1494,7 @@ init_corrupted_log(Config) -> % record the segment and index sizes before corrupting the log ValidSegSize = filelib:file_size(SegPath), ValidIdxSize = filelib:file_size(IdxPath), - {{0, 1}, _} = osiris_log:overview(LDir), + #{range := {0, 1}} = osiris_log:overview(LDir), % add one more chunk and truncate it from the segment but leave in the index) Log1 = osiris_log:write([<<"three">>], Log0), @@ -1512,7 +1514,7 @@ init_corrupted_log(Config) -> ok = file:close(IdxFd), % the overview should work even before init - {Range, _} = osiris_log:overview(LDir), + #{range := Range} = osiris_log:overview(LDir), ?assertEqual({0, 1}, Range), Conf0 = ?config(osiris_conf, Config), @@ -1528,7 +1530,7 @@ init_corrupted_log(Config) -> ?assertEqual(ValidSegSize, filelib:file_size(SegPath)), % the range should not include the corrupted chunk - {Range, _} = osiris_log:overview(LDir), + #{range := Range} = osiris_log:overview(LDir), ?assertEqual({0, 1}, Range), % a consumer asking for the last chunk, should receive "two" @@ -1609,7 +1611,8 @@ init_empty_last_files(Config) -> {ok, SegFd} = file:open(LastSegFile, [raw, binary, write]), _ = file:close(SegFd), - ?assertEqual({{0,699},[{1,650}]}, osiris_log:overview(LDir)), + ?assertEqual(#{range => {0,699}, + epoch_offsets => [{1,650}]}, osiris_log:overview(LDir)), Conf0 = ?config(osiris_conf, Config), Conf = Conf0#{dir => LDir}, @@ -1806,17 +1809,17 @@ small_chunk_overview(Config) -> osiris_log:overview(Dir) end), ct:pal("OverviewTaken ~bms", [OverviewTaken div 1000]), - ?assertMatch({{0,327839}, - [{1,32783}, - {2,65567}, - {3,98351}, - {4,131135}, - {5,163919}, - {6,196703}, - {7,229487}, - {8,262271}, - {9,295055}, - {10,327839}]}, LogOverview), + ?assertMatch(#{range := {0,327839}, + epoch_offsets := [{1,32783}, + {2,65567}, + {3,98351}, + {4,131135}, + {5,163919}, + {6,196703}, + {7,229487}, + {8,262271}, + {9,295055}, + {10,327839}]}, LogOverview), {InitTaken100, {ok, L100}} = timer:tc( fun () -> osiris_log:init_offset_reader(257000, Conf) @@ -1856,8 +1859,8 @@ many_segment_overview(Config) -> ct:pal("OverviewTaken ~bms", [OverviewTaken div 1000]), ct:pal("~p", [LogOverview]), %% {{0,40959},[{-1,-1},{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]} - ?assertEqual({{0,40959}, - [{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}, LogOverview), + ?assertEqual(#{epoch_offsets => [{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}], + range => {0,40959}}, LogOverview), Conf6 = Conf#{epoch => 6}, {InitTaken, _} = timer:tc( @@ -1919,10 +1922,9 @@ many_segment_overview(Config) -> ct:pal("TimestampTaken ~p", [TimestampTaken]), %% acceptor - {Range, EOffs} = LogOverview, {InitAcceptorTaken, AcceptorLog} = timer:tc(fun () -> - osiris_log:init_acceptor(Range, EOffs, Conf6) + osiris_log:init_acceptor(LogOverview, Conf6) end), ct:pal("InitAcceptor took ~bus", [InitAcceptorTaken]), @@ -2017,7 +2019,8 @@ init_with_unexpected_file(Config) -> ?assertEqual(2, length(Indexes)), ok = file:write_file(filename:join(Dir, ".nfs000000000000000000"), <<"bananas">>), _ = osiris_log:init(?config(osiris_conf, Config)), - ?assertEqual({{0,999},[{1,950}]}, osiris_log:overview(Dir)), + ?assertEqual(#{range => {0,999}, + epoch_offsets => [{1,950}]}, 
osiris_log:overview(Dir)), ok. overview_with_missing_segment(Config) -> From 71c6750a5e7acba02f288921eacc22d52bcb19d0 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 9 Oct 2025 10:23:55 -0400 Subject: [PATCH 4/5] Expose `osiris_log:parse_header/2` This change refactors `parse_header/2` to take the chunk header binary and the position at which it was read and return a `header_map()`. This is useful for other readers - so that they do not need to duplicate the binary match code and `next_position` calculation. --- src/osiris_log.erl | 118 +++++++++++++++++++++++++-------------------- 1 file changed, 65 insertions(+), 53 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index f184749..db79d55 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -54,7 +54,8 @@ directory/1, delete_directory/1, make_counter/1, - generate_log/4]). + generate_log/4, + parse_header/2]). -export([dump_init/1, dump_init_idx/1, @@ -391,19 +392,19 @@ -type offset_spec() :: osiris:offset_spec(). -type retention_spec() :: osiris:retention_spec(). -type header_map() :: - #{chunk_id => offset(), - epoch => epoch(), - type => chunk_type(), - crc => integer(), - num_records => non_neg_integer(), - num_entries => non_neg_integer(), - timestamp => osiris:timestamp(), - data_size => non_neg_integer(), - trailer_size => non_neg_integer(), - filter_size => 16..255, - header_data => binary(), - position => non_neg_integer(), - next_position => non_neg_integer()}. + #{chunk_id := offset(), + epoch := epoch(), + type := chunk_type(), + crc := integer(), + num_records := non_neg_integer(), + num_entries := non_neg_integer(), + timestamp := osiris:timestamp(), + data_size := non_neg_integer(), + trailer_size := non_neg_integer(), + filter_size := 16..255, + header_data := binary(), + position := non_neg_integer(), + next_position := non_neg_integer()}. -type transport() :: tcp | ssl. -type overview() :: #{range := range(), @@ -2988,7 +2989,8 @@ read_header_with_ra(#?MODULE{cfg = #cfg{directory = Dir, case ra_read(Pos, ?HEADER_SIZE_B, Ra0) of Bin when is_binary(Bin) andalso byte_size(Bin) == ?HEADER_SIZE_B -> - parse_header(Bin, State); + {ok, Header} = parse_header(Bin, Pos), + read_header_with_ra0(Header, State); undefined -> case ra_fill(Fd, Pos, Ra0) of {ok, Ra} -> @@ -3025,60 +3027,36 @@ read_header_with_ra(#?MODULE{cfg = #cfg{directory = Dir, end end. 
-parse_header(<<?MAGIC:4/unsigned,
-               ?VERSION:4/unsigned,
-               ChType:8/unsigned,
-               NumEntries:16/unsigned,
-               NumRecords:32/unsigned,
-               Timestamp:64/signed,
-               Epoch:64/unsigned,
-               NextChId0:64/unsigned,
-               Crc:32/integer,
-               DataSize:32/unsigned,
-               TrailerSize:32/unsigned,
-               FilterSize:8/unsigned,
-               _Reserved:24,
-               _/binary>> = HeaderData0,
-             #?MODULE{cfg = #cfg{counter = CntRef},
-                      fd = Fd,
-                      mode = #read{next_offset = NextChId0,
-                                   position = Pos,
-                                   filter = Filter,
-                                   read_ahead = Ra0} = Read0} = State0) ->
-
+read_header_with_ra0(#{chunk_id := NextChId0,
+                       num_records := NumRecords,
+                       data_size := DataSize,
+                       filter_size := FilterSize,
+                       next_position := NextPos} = Header,
+                     #?MODULE{cfg = #cfg{counter = CntRef},
+                              fd = Fd,
+                              mode = #read{next_offset = NextChId0,
+                                           position = Pos,
+                                           filter = Filter,
+                                           read_ahead = Ra0} = Read0} = State0) ->
     Ra1 = ra_update_size(Filter, FilterSize, DataSize, Ra0),
     case ra_read(Pos + ?HEADER_SIZE_B, FilterSize, Ra1) of
         undefined ->
             {ok, Ra} = ra_fill(Fd, Pos + ?HEADER_SIZE_B, Ra1),
-            parse_header(HeaderData0,
-                         State0#?MODULE{mode = Read0#read{read_ahead = Ra}});
+            State = State0#?MODULE{mode = Read0#read{read_ahead = Ra}},
+            read_header_with_ra0(Header, State);
         ChunkFilter ->
             counters:put(CntRef, ?C_OFFSET, NextChId0 + NumRecords),
             counters:add(CntRef, ?C_CHUNKS, 1),
-            NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize,
             case osiris_bloom:is_match(ChunkFilter, Filter) of
                 true ->
-                    <<HeaderData:?HEADER_SIZE_B/binary, _/binary>> = HeaderData0,
                     State = case Ra1 of
                                 Ra0 ->
                                     State0;
                                 Ra ->
                                     State0#?MODULE{mode = Read0#read{read_ahead = Ra}}
                             end,
-                    {ok, #{chunk_id => NextChId0,
-                           epoch => Epoch,
-                           type => ChType,
-                           crc => Crc,
-                           num_records => NumRecords,
-                           num_entries => NumEntries,
-                           timestamp => Timestamp,
-                           data_size => DataSize,
-                           trailer_size => TrailerSize,
-                           header_data => HeaderData,
-                           filter_size => FilterSize,
-                           next_position => NextPos,
-                           position => Pos},
-                     State};
+                    {ok, Header, State};
                 false ->
                     Read = Read0#read{next_offset = NextChId0 + NumRecords,
                                       position = NextPos,
@@ -3095,6 +3073,40 @@ parse_header(<<?MAGIC:4/unsigned,
+-spec parse_header(binary(), Pos :: non_neg_integer()) ->
+    {ok, header_map()} |
+    {error, invalid_chunk_header}.
+parse_header(<<?MAGIC:4/unsigned,
+               ?VERSION:4/unsigned,
+               ChType:8/unsigned,
+               NumEntries:16/unsigned,
+               NumRecords:32/unsigned,
+               Timestamp:64/signed,
+               Epoch:64/unsigned,
+               NextChId0:64/unsigned,
+               Crc:32/integer,
+               DataSize:32/unsigned,
+               TrailerSize:32/unsigned,
+               FilterSize:8/unsigned,
+               _Reserved:24>> = HeaderData,
+             Pos) ->
+    NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize,
+    {ok, #{chunk_id => NextChId0,
+           epoch => Epoch,
+           type => ChType,
+           crc => Crc,
+           num_records => NumRecords,
+           num_entries => NumEntries,
+           timestamp => Timestamp,
+           data_size => DataSize,
+           trailer_size => TrailerSize,
+           header_data => HeaderData,
+           filter_size => FilterSize,
+           next_position => NextPos,
+           position => Pos}};
+parse_header(_, _) ->
+    {error, invalid_chunk_header}.
+
 %% keep the previous value if the current one is 0 (i.e. no filter in the chunk)
 read_ahead_fsize(Previous, 0) ->
     Previous;

From 6ce7e82cae487d62ea82c5b656e70200a8846905 Mon Sep 17 00:00:00 2001
From: Michael Davis
Date: Thu, 9 Oct 2025 14:01:59 -0400
Subject: [PATCH 5/5] Add `osiris_log_manifest` behaviour for hooks into log writers

`osiris_log_manifest` adds a few callbacks which are executed during
writer initialization and at events like rolling segments and writing
chunks.

`overview/1` is brought into this behaviour as well so that other
implementers can extend the `overview()` type (by setting additional
keys in the map).
---
 src/osiris_log.erl          | 166 +++++++++++++++++++++++++++---------
 src/osiris_log_manifest.erl |  50 +++++++++++
 2 files changed, 178 insertions(+), 38 deletions(-)
 create mode 100644 src/osiris_log_manifest.erl

diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index db79d55..1ea4582 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -47,8 +47,8 @@
          get_default_max_segment_size_bytes/0,
          counters_ref/1,
          close/1,
-         overview/1,
          format_status/1,
+         update_retention/2,
          evaluate_retention/2,
          directory/1,
@@ -57,6 +57,16 @@
          generate_log/4,
          parse_header/2]).
 
+-behaviour(osiris_log_manifest).
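+
+%% `osiris_log' is itself the default manifest implementation: it is used
+%% whenever the `log_manifest' application environment key is unset.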
+%% osiris_log_manifest implementations +-export([overview/1, + acceptor_manifest/2, + writer_manifest/1, + recover_tracking/3, + handle_event/2, + close_manifest/1, + delete/1]). + -export([dump_init/1, dump_init_idx/1, dump_chunk/1, @@ -424,7 +434,8 @@ %% that will be included in snapshots written to new segments readers_counter_fun = fun(_) -> ok end :: function(), shared :: atomics:atomics_ref(), - filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size() + filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size(), + manifest_module = ?MODULE :: module() }). -record(ra, {size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(), @@ -446,7 +457,8 @@ {type = writer :: writer | acceptor, segment_size = {?LOG_HEADER_SIZE, 0} :: {non_neg_integer(), non_neg_integer()}, current_epoch :: non_neg_integer(), - tail_info = {0, empty} :: osiris:tail_info() + tail_info = {0, empty} :: osiris:tail_info(), + manifest :: osiris_log_manifest:state() }). -record(?MODULE, {cfg :: #cfg{}, @@ -458,7 +470,7 @@ %% record chunk_info does not map exactly to an index record (field 'num' differs) -record(chunk_info, {id :: offset(), - timestamp :: non_neg_integer(), + timestamp :: osiris:timestamp(), epoch :: epoch(), num :: non_neg_integer(), type :: chunk_type(), @@ -477,6 +489,7 @@ -opaque state() :: #?MODULE{}. -export_type([state/0, + chunk_type/0, chunk_iterator/0, range/0, config/0, @@ -502,20 +515,11 @@ init(Config) -> -spec init(config(), writer | acceptor) -> state(). init(#{dir := Dir, name := Name, - epoch := Epoch} = Config, + epoch := Epoch} = Config0, WriterType) -> %% scan directory for segments if in write mode - MaxSizeBytes = maps:get(max_segment_size_bytes, Config, - ?DEFAULT_MAX_SEGMENT_SIZE_B), - MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks, - ?DEFAULT_MAX_SEGMENT_SIZE_C), - Retention = maps:get(retention, Config, []), - FilterSize = maps:get(filter_size, Config, ?DEFAULT_FILTER_SIZE), ?INFO("Stream: ~ts will use ~ts for osiris log data directory", [Name, Dir]), - ?DEBUG_(Name, "max_segment_size_bytes: ~b, - max_segment_size_chunks ~b, retention ~w, filter size ~b", - [MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]), ok = filelib:ensure_dir(Dir), case file:make_dir(Dir) of ok -> @@ -525,7 +529,25 @@ init(#{dir := Dir, Err -> throw(Err) end, + ok = maybe_fix_corrupted_files(Config0), + + ManifestMod = manifest_module(), + {Manifest0, Config} = case Config0 of + #{manifest := M} -> + {M, Config0}; + _ -> + ManifestMod:writer_manifest(Config0) + end, + MaxSizeBytes = maps:get(max_segment_size_bytes, Config, + ?DEFAULT_MAX_SEGMENT_SIZE_B), + MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks, + ?DEFAULT_MAX_SEGMENT_SIZE_C), + Retention = maps:get(retention, Config, []), + FilterSize = maps:get(filter_size, Config, ?DEFAULT_FILTER_SIZE), + ?DEBUG_(Name, "max_segment_size_bytes: ~b, + max_segment_size_chunks ~b, retention ~w, filter size ~b", + [MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]), Cnt = make_counter(Config), %% initialise offset counter to -1 as 0 is the first offset in the log and %% it hasn't necessarily been written yet, for an empty log the first offset @@ -547,8 +569,8 @@ init(#{dir := Dir, counter = Cnt, counter_id = counter_id(Config), shared = Shared, - filter_size = FilterSize}, - ok = maybe_fix_corrupted_files(Config), + filter_size = FilterSize, + manifest_module = ManifestMod}, DefaultNextOffset = case Config of #{initial_offset := IO} when WriterType == acceptor -> @@ -565,6 +587,7 @@ init(#{dir := 
Dir, #write{type = WriterType, tail_info = {DefaultNextOffset, empty}, + manifest = Manifest0, current_epoch = Epoch}}); {NumSegments, #seg_info{first = #chunk_info{id = FstChId, @@ -606,11 +629,14 @@ init(#{dir := Dir, {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), {ok, IdxEof} = file:position(IdxFd, eof), NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B, + Event = {segment_opened, undefined, filename:basename(Filename)}, + Manifest = ManifestMod:handle_event(Event, Manifest0), #?MODULE{cfg = Cfg, mode = #write{type = WriterType, tail_info = TailInfo, segment_size = {Size, NumChunks}, + manifest = Manifest, current_epoch = Epoch}, current_file = filename:basename(Filename), fd = SegFd, @@ -630,10 +656,13 @@ init(#{dir := Dir, {ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE), osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1), osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1), + Event = {segment_opened, undefined, filename:basename(Filename)}, + Manifest = ManifestMod:handle_event(Event, Manifest0), #?MODULE{cfg = Cfg, mode = #write{type = WriterType, tail_info = {DefaultNextOffset, empty}, + manifest = Manifest, current_epoch = Epoch}, current_file = filename:basename(Filename), fd = SegFd, @@ -869,8 +898,8 @@ evaluate_tracking_snapshot(#?MODULE{mode = #write{type = writer}} = State0, Trk0 % -spec -spec init_acceptor(overview(), config()) -> state(). -init_acceptor(#{range := Range, epoch_offsets := EpochOffsets0}, - #{name := Name, dir := Dir} = Conf) -> +init_acceptor(#{range := Range, epoch_offsets := EpochOffsets0} = Overview0, + #{name := Name, dir := Dir} = Conf0) -> %% truncate to first common last epoch offset %% * if the last local chunk offset has the same epoch but is lower %% than the last chunk offset then just attach at next offset. @@ -881,6 +910,9 @@ init_acceptor(#{range := Range, epoch_offsets := EpochOffsets0}, EpochOffsets = lists:reverse( lists:sort(EpochOffsets0)), + Overview = Overview0#{epoch_offsets := EpochOffsets}, + + {Manifest, Conf} = (manifest_module()):acceptor_manifest(Overview, Conf0), %% then truncate to IdxFiles = sorted_index_files(Dir), @@ -892,7 +924,8 @@ init_acceptor(#{range := Range, epoch_offsets := EpochOffsets0}, {O, _} -> O end, init(Conf#{initial_offset => InitOffset, - index_files => RemIdxFiles}, acceptor). + index_files => RemIdxFiles, + manifest => Manifest}, acceptor). chunk_id_index_scan(IdxFile, ChunkId) when ?IS_STRING(IdxFile) -> @@ -1820,11 +1853,19 @@ needs_handling(_, _, _) -> -spec close(state()) -> ok. close(#?MODULE{cfg = #cfg{counter_id = CntId, + manifest_module = ManifestMod, readers_counter_fun = Fun}, fd = SegFd, - index_fd = IdxFd}) -> + index_fd = IdxFd, + mode = Mode}) -> close_fd(IdxFd), close_fd(SegFd), + case Mode of + #write{manifest = Manifest} -> + ok = ManifestMod:close_manifest(Manifest); + _ -> + ok + end, Fun(-1), case CntId of undefined -> @@ -1833,14 +1874,17 @@ close(#?MODULE{cfg = #cfg{counter_id = CntId, osiris_counters:delete(CntId) end. -delete_directory(#{name := Name, - dir := _} = Config) -> +delete_directory(Config) -> + (manifest_module()):delete(Config). 
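+
+%% The default `osiris_log_manifest:delete/1' implementation: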
+ +delete(#{name := Name, + dir := _} = Config) -> Dir = directory(Config), ?DEBUG_(Name, " deleting directory ~ts", [Dir]), delete_dir(Dir); -delete_directory(#{name := Name}) -> - delete_directory(Name); -delete_directory(Name) when ?IS_STRING(Name) -> +delete(#{name := Name}) -> + delete(Name); +delete(Name) when ?IS_STRING(Name) -> Dir = directory(Name), ?DEBUG_(Name, " deleting directory ~ts", [Dir]), delete_dir(Dir). @@ -2185,11 +2229,19 @@ format_status(#?MODULE{cfg = #cfg{directory = Dir, -spec update_retention([retention_spec()], state()) -> state(). update_retention(Retention, #?MODULE{cfg = #cfg{name = Name, + manifest_module = ?MODULE, retention = Retention0} = Cfg} = State0) when is_list(Retention) -> ?DEBUG_(Name, " from: ~w to ~w", [Retention0, Retention]), State = State0#?MODULE{cfg = Cfg#cfg{retention = Retention}}, - trigger_retention_eval(State). + trigger_retention_eval(State); +update_retention(Retention, + #?MODULE{cfg = #cfg{manifest_module = ManifestMod}, + mode = #write{manifest = Manifest0} = Write0} = + State0) -> + Event = {retention_updated, Retention}, + Manifest = ManifestMod:handle_event(Event, Manifest0), + State0#?MODULE{mode = Write0#write{manifest = Manifest}}. -spec evaluate_retention(file:filename_all(), [retention_spec()]) -> {range(), FirstTimestamp :: osiris:timestamp(), @@ -2489,11 +2541,13 @@ write_chunk(Chunk, Epoch, NumRecords, #?MODULE{cfg = #cfg{counter = CntRef, + manifest_module = ManifestMod, shared = Shared} = Cfg, fd = Fd, index_fd = IdxFd, mode = #write{segment_size = {SegSizeBytes, SegSizeChunks}, + manifest = Manifest0, tail_info = {Next, _}} = Write} = State) -> @@ -2523,9 +2577,19 @@ write_chunk(Chunk, counters:put(CntRef, ?C_OFFSET, NextOffset - 1), counters:add(CntRef, ?C_CHUNKS, 1), maybe_set_first_offset(Next, Cfg), + ChunkInfo = #{id => Next, + timestamp => Timestamp, + epoch => Epoch, + num => NumRecords, + type => ChType, + size => Size, + pos => Cur}, + Event = {chunk_written, ChunkInfo, Chunk}, + Manifest = ManifestMod:handle_event(Event, Manifest0), State#?MODULE{mode = Write#write{tail_info = {NextOffset, {Epoch, Next, Timestamp}}, + manifest = Manifest, segment_size = {SegSizeBytes + Size, SegSizeChunks + 1}}} end. @@ -2734,10 +2798,13 @@ make_file_name(N, Suff) -> open_new_segment(#?MODULE{cfg = #cfg{name = Name, directory = Dir, - counter = Cnt}, + counter = Cnt, + manifest_module = ManifestMod}, fd = OldFd, index_fd = OldIdxFd, + current_file = OldFilename, mode = #write{type = _WriterType, + manifest = Manifest0, tail_info = {NextOffset, _}} = Write} = State0) -> _ = close_fd(OldFd), @@ -2758,11 +2825,15 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name, {ok, _} = file:position(IdxFd, eof), counters:add(Cnt, ?C_SEGMENTS, 1), + Event = {segment_opened, OldFilename, Filename}, + Manifest = ManifestMod:handle_event(Event, Manifest0), + State0#?MODULE{current_file = Filename, fd = Fd, - %% reset segment_size counter index_fd = IdxFd, - mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}}}. + mode = Write#write{manifest = Manifest, + %% reset segment_size counter + segment_size = {?LOG_HEADER_SIZE, 0}}}. open_index_read(File) -> {ok, Fd} = open(File, [read, raw, binary, read_ahead]), @@ -2907,23 +2978,29 @@ skip(Len, [B | L]) when Len > 0 -> -spec recover_tracking(state()) -> osiris_tracking:state(). 
 recover_tracking(#?MODULE{cfg = #cfg{directory = Dir,
+                                     manifest_module = ManifestMod,
                                      tracking_config = TrkConfig},
-                          current_file = File}) ->
+                          current_file = File,
+                          mode = #write{manifest = Manifest}}) ->
+    Trk = osiris_tracking:init(undefined, TrkConfig),
+    SegmentFile = filename:join(Dir, File),
+    ManifestMod:recover_tracking(Trk, SegmentFile, Manifest).
+
+recover_tracking(Trk0, SegmentFile, undefined) ->
     %% we need to open a new file handle here as we cannot use the one that is
     %% being used for appending to the segment as pread _may_ move the file
     %% position on some systems (such as windows)
-    {ok, Fd} = open(filename:join(Dir, File), [read, raw, binary]),
+    {ok, Fd} = open(SegmentFile, [read, raw, binary]),
     _ = file:advise(Fd, 0, 0, random),
     %% TODO: if the first chunk in the segment isn't a tracking snapshot and
     %% there are prior segments we could scan at least two segments increasing
     %% the chance of encountering a snapshot and thus ensure we don't miss any
     %% tracking entries
-    Trk0 = osiris_tracking:init(undefined, TrkConfig),
-    Trk = recover_tracking(Fd, Trk0, ?LOG_HEADER_SIZE),
+    Trk = recover_tracking0(Fd, Trk0, ?LOG_HEADER_SIZE),
    _ = file:close(Fd),
     Trk.
 
-recover_tracking(Fd, Trk0, Pos0) ->
+recover_tracking0(Fd, Trk0, Pos0) ->
     case file:pread(Fd, Pos0, ?HEADER_SIZE_B) of
         {ok,
          <<?MAGIC:4/unsigned,
            ?VERSION:4/unsigned,
            ChType:8/unsigned,
            _NumEntries:16/unsigned,
            _NumRecords:32/unsigned,
            _Timestamp:64/signed,
            _Epoch:64/unsigned,
            ChunkId:64/unsigned,
            _Crc:32/integer,
            Size:32/unsigned,
            TSize:32/unsigned,
            FSize:8/unsigned,
            _Reserved:24>>} ->
             Pos = Pos0 + ?HEADER_SIZE_B,
             NextPos = Pos + FSize + Size + TSize,
             case ChType of
                 ?CHNK_TRK_DELTA ->
                     {ok, <<0:1, S:31, Data:S/binary>>} =
                         file:pread(Fd, Pos + FSize, Size),
                     Trk = osiris_tracking:append_trailer(ChunkId, Data, Trk0),
                     %% A tracking delta chunk will not have any writer data
                     %% so no need to parse writers here
-                    recover_tracking(Fd, Trk, NextPos);
+                    recover_tracking0(Fd, Trk, NextPos);
                 ?CHNK_TRK_SNAPSHOT ->
                     {ok, <<0:1, S:31, Data:S/binary>>} =
                         file:pread(Fd, Pos + FSize, Size),
                     Trk = osiris_tracking:init(Data, Trk0),
-                    recover_tracking(Fd, Trk, NextPos);
+                    recover_tracking0(Fd, Trk, NextPos);
                 ?CHNK_USER when TSize > 0 ->
                     {ok, TData} = file:pread(Fd, Pos + FSize + Size, TSize),
                     Trk = osiris_tracking:append_trailer(ChunkId, TData, Trk0),
-                    recover_tracking(Fd, Trk, NextPos);
+                    recover_tracking0(Fd, Trk, NextPos);
                 ?CHNK_USER ->
-                    recover_tracking(Fd, Trk0, NextPos)
+                    recover_tracking0(Fd, Trk0, NextPos)
             end;
         eof ->
             Trk0
@@ -3439,6 +3516,9 @@ ra(#{options := #{read_ahead := Limit}}) when is_integer(Limit) ->
 ra(_) ->
     #ra{}.
 
+manifest_module() ->
+    application:get_env(osiris, log_manifest, ?MODULE).
+
 generate_log(Msg, MsgsPerChunk, NumMessages, Directory) ->
     Name = filename:basename(Directory),
@@ -3471,6 +3551,16 @@ write_in_chunks(ToWrite, MsgsPerChunk, Msg, W0) when ToWrite > 0 ->
 write_in_chunks(_, _, _, W) ->
     W.
 
+%% Default implementation of osiris_log_manifest:
+writer_manifest(Config) ->
+    {undefined, Config}.
+acceptor_manifest(_Overview, Config) ->
+    {undefined, Config}.
+handle_event(_Event, undefined) ->
+    undefined.
+close_manifest(undefined) ->
+    ok.
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
diff --git a/src/osiris_log_manifest.erl b/src/osiris_log_manifest.erl
new file mode 100644
index 0000000..f2fd67a
--- /dev/null
+++ b/src/osiris_log_manifest.erl
@@ -0,0 +1,50 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
+%%
+
+-module(osiris_log_manifest).
+
+-type state() :: term().
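+%% state() is implementation-defined: it is created by `writer_manifest/1'
+%% or `acceptor_manifest/2' and threaded through the other callbacks.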
+ +-type chunk_info() :: + #{id := osiris:offset(), + timestamp := osiris:timestamp(), + epoch := osiris:epoch(), + num := non_neg_integer(), + type := osiris_log:chunk_type(), + %% size of data + filter + trailer + size := non_neg_integer(), + pos := integer()}. + +-type event() :: {segment_opened, + OldSegment :: file:filename_all() | undefined, + NewSegment :: file:filename_all()} | + {chunk_written, chunk_info(), iodata()} | + {retention_updated, [osiris:retention_spec()]}. + +-export_type([state/0, + chunk_info/0, + event/0]). + +-callback overview(Dir :: file:filename_all()) -> + osiris_log:overview(). + +-callback acceptor_manifest(osiris_log:overview(), osiris_log:config()) -> + {state(), osiris_log:config()}. + +-callback writer_manifest(osiris_log:config()) -> + {state(), osiris_log:config()}. + +-callback recover_tracking(InitState :: osiris_tracking:state(), + SegmentAbsname :: file:filename_all(), + state()) -> + osiris_tracking:state(). + +-callback handle_event(event(), state()) -> state(). + +-callback close_manifest(state()) -> ok. + +-callback delete(osiris_log:config()) -> ok.
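+
+%% A minimal sketch of an implementation (illustrative only, not part of
+%% this change; the module name `my_log_manifest' is hypothetical) which
+%% logs each chunk written and otherwise delegates to the defaults that
+%% `osiris_log' exports:
+%%
+%%   -module(my_log_manifest).
+%%   -behaviour(osiris_log_manifest).
+%%   -export([overview/1, writer_manifest/1, acceptor_manifest/2,
+%%            recover_tracking/3, handle_event/2, close_manifest/1,
+%%            delete/1]).
+%%
+%%   overview(Dir) -> osiris_log:overview(Dir).
+%%   writer_manifest(Config) -> {undefined, Config}.
+%%   acceptor_manifest(_Overview, Config) -> {undefined, Config}.
+%%   recover_tracking(Trk, SegmentFile, State) ->
+%%       osiris_log:recover_tracking(Trk, SegmentFile, State).
+%%   handle_event({chunk_written, #{id := ChId}, _Data}, State) ->
+%%       logger:debug("chunk ~b written", [ChId]),
+%%       State;
+%%   handle_event(_Event, State) -> State.
+%%   close_manifest(_State) -> ok.
+%%   delete(Config) -> osiris_log:delete(Config).
+%%
+%% The implementation is selected via the `log_manifest' application
+%% environment key:
+%%
+%%   application:set_env(osiris, log_manifest, my_log_manifest).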