diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index f99c0f518417..ebb0085f0995 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -500,7 +500,7 @@ messages_confirmed(Counters) -> messages_errored(Counters) -> atomics:get(Counters, 3). -stream_committed_offset(Log) -> +stream_stored_offset(Log) -> osiris_log:committed_offset(Log). augment_infos_with_user_provided_connection_name(Infos, @@ -1802,7 +1802,7 @@ handle_frame_post_auth(_Transport, user = User} = Connection, State, - {commit_offset, Reference, Stream, Offset}) -> + {store_offset, Reference, Stream, Offset}) -> case rabbit_stream_utils:check_write_permitted(#resource{name = Stream, kind = queue, @@ -1813,17 +1813,17 @@ handle_frame_post_auth(_Transport, ok -> case lookup_leader(Stream, Connection) of cluster_not_found -> - rabbit_log:warning("Could not find leader to commit offset on ~p", + rabbit_log:warning("Could not find leader to store offset on ~p", [Stream]), - %% FIXME commit offset is fire-and-forget, so no response even if error, change this? + %% FIXME store offset is fire-and-forget, so no response even if error, change this? {Connection, State}; {ClusterLeader, Connection1} -> osiris:write_tracking(ClusterLeader, Reference, Offset), {Connection1, State} end; error -> - %% FIXME commit offset is fire-and-forget, so no response even if error, change this? - rabbit_log:info("Not authorized to commit offset on ~p", [Stream]), + %% FIXME store offset is fire-and-forget, so no response even if error, change this? + rabbit_log:info("Not authorized to store offset on ~p", [Stream]), {Connection, State} end; handle_frame_post_auth(Transport, @@ -2629,7 +2629,7 @@ consumer_i(offset, #consumer{counters = Counters}) -> consumer_offset(Counters); consumer_i(offset_lag, #consumer{counters = Counters, segment = Log}) -> - stream_committed_offset(Log) - consumer_offset(Counters); + stream_stored_offset(Log) - consumer_offset(Counters); consumer_i(connection_pid, _) -> self(); consumer_i(properties, #consumer{properties = Properties}) -> diff --git a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl index b4ece0976a34..f946b1ecba30 100644 --- a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl @@ -7,7 +7,7 @@ -define(COMMAND_SUBSCRIBE, 7). -define(COMMAND_DELIVER, 8). -define(COMMAND_CREDIT, 9). --define(COMMAND_COMMIT_OFFSET, 10). +-define(COMMAND_STORE_OFFSET, 10). -define(COMMAND_QUERY_OFFSET, 11). -define(COMMAND_UNSUBSCRIBE, 12). -define(COMMAND_CREATE_STREAM, 13). diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl index f8c99736ec25..f61fdea3f34e 100644 --- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl @@ -72,7 +72,7 @@ {deliver, subscription_id(), Chunk :: binary()} | {credit, subscription_id(), Credit :: non_neg_integer()} | {metadata_update, stream_name(), response_code()} | - {commit_offset, offset_ref(), stream_name(), osiris:offset()} | + {store_offset, offset_ref(), stream_name(), osiris:offset()} | heartbeat | {tune, FrameMax :: non_neg_integer(), HeartBeat :: non_neg_integer()} | @@ -236,11 +236,11 @@ frame({metadata_update, Stream, ResponseCode}) -> ResponseCode:16, StreamSize:16, Stream/binary>>); -frame({commit_offset, Reference, Stream, Offset}) -> +frame({store_offset, Reference, Stream, Offset}) -> ReferenceSize = byte_size(Reference), StreamSize = byte_size(Stream), wrap_in_frame(<> | PropertiesBin]}; -request_body({commit_offset = Tag, OffsetRef, Stream, Offset}) -> +request_body({store_offset = Tag, OffsetRef, Stream, Offset}) -> {Tag, <>}; request_body({query_offset = Tag, OffsetRef, Stream}) -> {Tag, <>}; @@ -575,12 +575,12 @@ parse_request(<>) -> {metadata_update, Stream, ResponseCode}; parse_request(<>) -> - {commit_offset, OffsetRef, Stream, Offset}; + {store_offset, OffsetRef, Stream, Offset}; parse_request(<>) -> heartbeat; parse_request(< ?COMMAND_DELIVER; command_id(credit) -> ?COMMAND_CREDIT; -command_id(commit_offset) -> - ?COMMAND_COMMIT_OFFSET; +command_id(store_offset) -> + ?COMMAND_STORE_OFFSET; command_id(query_offset) -> ?COMMAND_QUERY_OFFSET; command_id(unsubscribe) -> @@ -960,8 +960,8 @@ parse_command_id(?COMMAND_DELIVER) -> deliver; parse_command_id(?COMMAND_CREDIT) -> credit; -parse_command_id(?COMMAND_COMMIT_OFFSET) -> - commit_offset; +parse_command_id(?COMMAND_STORE_OFFSET) -> + store_offset; parse_command_id(?COMMAND_QUERY_OFFSET) -> query_offset; parse_command_id(?COMMAND_UNSUBSCRIBE) -> diff --git a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl index 9f69cb4a85bb..74b0cc1d6091 100644 --- a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl +++ b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl @@ -60,7 +60,7 @@ roundtrip(_Config) -> test_roundtrip({credit, 53, 12}), test_roundtrip({metadata_update, <<"stream1">>, ?RESPONSE_VHOST_ACCESS_FAILURE}), - test_roundtrip({commit_offset, <<"offset_ref">>, <<"stream">>, 12}), + test_roundtrip({store_offset, <<"offset_ref">>, <<"stream">>, 12}), test_roundtrip(heartbeat), test_roundtrip({tune, 53, 12}), %% REQUESTS