Skip to content

Commit

Permalink
Merge pull request #3177 from rabbitmq/stream-commit-offset-becomes-s…
Browse files Browse the repository at this point in the history
…tore-offset

Use "store" instead of "commit" for offset tracking
  • Loading branch information
pjk25 committed Jul 13, 2021
2 parents e4920f0 + 8ddff0f commit 860333a
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 19 deletions.
14 changes: 7 additions & 7 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Expand Up @@ -500,7 +500,7 @@ messages_confirmed(Counters) ->
messages_errored(Counters) ->
atomics:get(Counters, 3).

stream_committed_offset(Log) ->
stream_stored_offset(Log) ->
osiris_log:committed_offset(Log).

augment_infos_with_user_provided_connection_name(Infos,
Expand Down Expand Up @@ -1802,7 +1802,7 @@ handle_frame_post_auth(_Transport,
user = User} =
Connection,
State,
{commit_offset, Reference, Stream, Offset}) ->
{store_offset, Reference, Stream, Offset}) ->
case rabbit_stream_utils:check_write_permitted(#resource{name =
Stream,
kind = queue,
Expand All @@ -1813,17 +1813,17 @@ handle_frame_post_auth(_Transport,
ok ->
case lookup_leader(Stream, Connection) of
cluster_not_found ->
rabbit_log:warning("Could not find leader to commit offset on ~p",
rabbit_log:warning("Could not find leader to store offset on ~p",
[Stream]),
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
%% FIXME store offset is fire-and-forget, so no response even if error, change this?
{Connection, State};
{ClusterLeader, Connection1} ->
osiris:write_tracking(ClusterLeader, Reference, Offset),
{Connection1, State}
end;
error ->
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
rabbit_log:info("Not authorized to commit offset on ~p", [Stream]),
%% FIXME store offset is fire-and-forget, so no response even if error, change this?
rabbit_log:info("Not authorized to store offset on ~p", [Stream]),
{Connection, State}
end;
handle_frame_post_auth(Transport,
Expand Down Expand Up @@ -2629,7 +2629,7 @@ consumer_i(offset, #consumer{counters = Counters}) ->
consumer_offset(Counters);
consumer_i(offset_lag,
#consumer{counters = Counters, segment = Log}) ->
stream_committed_offset(Log) - consumer_offset(Counters);
stream_stored_offset(Log) - consumer_offset(Counters);
consumer_i(connection_pid, _) ->
self();
consumer_i(properties, #consumer{properties = Properties}) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_stream_common/include/rabbit_stream.hrl
Expand Up @@ -7,7 +7,7 @@
-define(COMMAND_SUBSCRIBE, 7).
-define(COMMAND_DELIVER, 8).
-define(COMMAND_CREDIT, 9).
-define(COMMAND_COMMIT_OFFSET, 10).
-define(COMMAND_STORE_OFFSET, 10).
-define(COMMAND_QUERY_OFFSET, 11).
-define(COMMAND_UNSUBSCRIBE, 12).
-define(COMMAND_CREATE_STREAM, 13).
Expand Down
20 changes: 10 additions & 10 deletions deps/rabbitmq_stream_common/src/rabbit_stream_core.erl
Expand Up @@ -72,7 +72,7 @@
{deliver, subscription_id(), Chunk :: binary()} |
{credit, subscription_id(), Credit :: non_neg_integer()} |
{metadata_update, stream_name(), response_code()} |
{commit_offset, offset_ref(), stream_name(), osiris:offset()} |
{store_offset, offset_ref(), stream_name(), osiris:offset()} |
heartbeat |
{tune, FrameMax :: non_neg_integer(),
HeartBeat :: non_neg_integer()} |
Expand Down Expand Up @@ -236,11 +236,11 @@ frame({metadata_update, Stream, ResponseCode}) ->
ResponseCode:16,
StreamSize:16,
Stream/binary>>);
frame({commit_offset, Reference, Stream, Offset}) ->
frame({store_offset, Reference, Stream, Offset}) ->
ReferenceSize = byte_size(Reference),
StreamSize = byte_size(Stream),
wrap_in_frame(<<?REQUEST:1,
?COMMAND_COMMIT_OFFSET:15,
?COMMAND_STORE_OFFSET:15,
?VERSION_1:16,
ReferenceSize:16,
Reference/binary,
Expand Down Expand Up @@ -473,7 +473,7 @@ request_body({subscribe = Tag,
end,
{Tag,
[<<SubscriptionId:8, ?STRING(Stream), Data/binary>> | PropertiesBin]};
request_body({commit_offset = Tag, OffsetRef, Stream, Offset}) ->
request_body({store_offset = Tag, OffsetRef, Stream, Offset}) ->
{Tag, <<?STRING(OffsetRef), ?STRING(Stream), Offset:64>>};
request_body({query_offset = Tag, OffsetRef, Stream}) ->
{Tag, <<?STRING(OffsetRef), ?STRING(Stream)>>};
Expand Down Expand Up @@ -575,12 +575,12 @@ parse_request(<<?REQUEST:1,
Stream:StreamSize/binary>>) ->
{metadata_update, Stream, ResponseCode};
parse_request(<<?REQUEST:1,
?COMMAND_COMMIT_OFFSET:15,
?COMMAND_STORE_OFFSET:15,
?VERSION_1:16,
?STRING(RefSize, OffsetRef),
?STRING(SSize, Stream),
Offset:64>>) ->
{commit_offset, OffsetRef, Stream, Offset};
{store_offset, OffsetRef, Stream, Offset};
parse_request(<<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>) ->
heartbeat;
parse_request(<<?REQUEST:1,
Expand Down Expand Up @@ -909,8 +909,8 @@ command_id(deliver) ->
?COMMAND_DELIVER;
command_id(credit) ->
?COMMAND_CREDIT;
command_id(commit_offset) ->
?COMMAND_COMMIT_OFFSET;
command_id(store_offset) ->
?COMMAND_STORE_OFFSET;
command_id(query_offset) ->
?COMMAND_QUERY_OFFSET;
command_id(unsubscribe) ->
Expand Down Expand Up @@ -960,8 +960,8 @@ parse_command_id(?COMMAND_DELIVER) ->
deliver;
parse_command_id(?COMMAND_CREDIT) ->
credit;
parse_command_id(?COMMAND_COMMIT_OFFSET) ->
commit_offset;
parse_command_id(?COMMAND_STORE_OFFSET) ->
store_offset;
parse_command_id(?COMMAND_QUERY_OFFSET) ->
query_offset;
parse_command_id(?COMMAND_UNSUBSCRIBE) ->
Expand Down
Expand Up @@ -60,7 +60,7 @@ roundtrip(_Config) ->
test_roundtrip({credit, 53, 12}),
test_roundtrip({metadata_update, <<"stream1">>,
?RESPONSE_VHOST_ACCESS_FAILURE}),
test_roundtrip({commit_offset, <<"offset_ref">>, <<"stream">>, 12}),
test_roundtrip({store_offset, <<"offset_ref">>, <<"stream">>, 12}),
test_roundtrip(heartbeat),
test_roundtrip({tune, 53, 12}),
%% REQUESTS
Expand Down

0 comments on commit 860333a

Please sign in to comment.