Skip to content

Commit

Permalink
Return appropriate response when stream leader not available
Browse files Browse the repository at this point in the history
Fixes #3874

(cherry picked from commit bd4771a)

# Conflicts:
#	deps/rabbitmq_stream/src/rabbit_stream_manager.erl
#	deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
  • Loading branch information
acogoluegnes authored and mergify-bot committed Dec 10, 2021
1 parent a404e82 commit 2455aec
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 17 deletions.
51 changes: 45 additions & 6 deletions deps/rabbitmq_stream/src/rabbit_stream_manager.erl
Expand Up @@ -56,7 +56,41 @@ create(VirtualHost, Reference, Arguments, Username) ->
%% Synchronously sends a {delete, VirtualHost, Reference, Username} request
%% to the ?MODULE gen_server and returns whatever the server replies.
delete(VirtualHost, Reference, Username) ->
    Request = {delete, VirtualHost, Reference, Username},
    gen_server:call(?MODULE, Request).

%% Synchronously asks the ?MODULE gen_server to create a super stream.
%% All arguments are forwarded unchanged inside a create_super_stream
%% request; the server's reply is returned as is.
-spec create_super_stream(binary(),
                          binary(),
                          [binary()],
                          #{binary() => binary()},
                          [binary()],
                          binary()) ->
                             ok | {error, term()}.
create_super_stream(VirtualHost,
                    Name,
                    Partitions,
                    Arguments,
                    RoutingKeys,
                    Username) ->
    gen_server:call(?MODULE,
                    {create_super_stream,
                     VirtualHost,
                     Name,
                     Partitions,
                     Arguments,
                     RoutingKeys,
                     Username}).

%% Synchronously asks the ?MODULE gen_server to delete the super stream
%% Name in VirtualHost; the server's reply is returned as is.
-spec delete_super_stream(binary(), binary(), binary()) ->
                             ok | {error, term()}.
delete_super_stream(VirtualHost, Name, Username) ->
    gen_server:call(?MODULE,
                    {delete_super_stream, VirtualHost, Name, Username}).

%% Synchronously asks the ?MODULE gen_server for the leader of Stream in
%% VirtualHost.
%%
%% The reply is a tagged tuple so that callers can tell a stream that does
%% not exist ({error, not_found}) apart from a stream whose leader cannot be
%% reached right now ({error, not_available}). The previous contract
%% (pid() | cluster_not_found) conflated both cases; its spec was left behind
%% by an unresolved merge conflict and is removed here.
-spec lookup_leader(binary(), binary()) ->
                       {ok, pid()} | {error, not_available} |
                       {error, not_found}.
lookup_leader(VirtualHost, Stream) ->
    gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}).

Expand Down Expand Up @@ -256,20 +290,25 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
LeaderPid = amqqueue:get_pid(Q),
case process_alive(LeaderPid) of
true ->
LeaderPid;
{ok, LeaderPid};
false ->
case leader_from_members(Q) of
{ok, Pid} ->
Pid;
{ok, Pid};
_ ->
cluster_not_found
{error, not_available}
end
end;
_ ->
cluster_not_found
{error, not_found}
end;
_ ->
cluster_not_found
{error, not_found} ->
case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
not_found ->
{error, not_found};
_ ->
{error, not_available}
end
end,
{reply, Res, State};
handle_call({lookup_local_member, VirtualHost, Stream}, _From,
Expand Down
31 changes: 23 additions & 8 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Expand Up @@ -1494,7 +1494,7 @@ handle_frame_post_auth(Transport,
of
{false, false} ->
case lookup_leader(Stream, Connection0) of
cluster_not_found ->
{error, not_found} ->
response(Transport,
Connection0,
declare_publisher,
Expand All @@ -1504,6 +1504,16 @@ handle_frame_post_auth(Transport,
?STREAM_DOES_NOT_EXIST,
1),
{Connection0, State};
{error, not_available} ->
response(Transport,
Connection0,
declare_publisher,
CorrelationId,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE),
rabbit_global_counters:increase_protocol_counter(stream,
?STREAM_NOT_AVAILABLE,
1),
{Connection0, State};
{ClusterLeader,
#stream_connection{publishers = Publishers0,
publisher_to_ids = RefIds0} =
Expand Down Expand Up @@ -1960,9 +1970,9 @@ handle_frame_post_auth(_Transport,
of
ok ->
case lookup_leader(Stream, Connection) of
cluster_not_found ->
rabbit_log:warning("Could not find leader to store offset on ~p",
[Stream]),
{error, Error} ->
rabbit_log:warning("Could not find leader to store offset on ~p: ~p",
[Stream, Error]),
%% FIXME store offset is fire-and-forget, so no response even if error, change this?
{Connection, State};
{ClusterLeader, Connection1} ->
Expand Down Expand Up @@ -1992,11 +2002,16 @@ handle_frame_post_auth(Transport,
of
ok ->
case lookup_leader(Stream, Connection0) of
cluster_not_found ->
{error, not_found} ->
rabbit_global_counters:increase_protocol_counter(stream,
?STREAM_DOES_NOT_EXIST,
1),
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0, Connection0};
{error, not_available} ->
rabbit_global_counters:increase_protocol_counter(stream,
?STREAM_NOT_AVAILABLE,
1),
{?RESPONSE_CODE_STREAM_NOT_AVAILABLE, 0, Connection0};
{LeaderPid, C} ->
{RC, O} =
case osiris:read_tracking(LeaderPid, Reference) of
Expand Down Expand Up @@ -2532,9 +2547,9 @@ lookup_leader(Stream,
case maps:get(Stream, StreamLeaders, undefined) of
undefined ->
case lookup_leader_from_manager(VirtualHost, Stream) of
cluster_not_found ->
cluster_not_found;
LeaderPid ->
{error, Error} ->
{error, Error};
{ok, LeaderPid} ->
Connection1 =
maybe_monitor_stream(LeaderPid, Stream, Connection),
{LeaderPid,
Expand Down
Expand Up @@ -16,11 +16,14 @@

package com.rabbitmq.stream;

import static com.rabbitmq.stream.TestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import com.rabbitmq.stream.codec.WrapperMessageBuilder;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.Client.Response;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
Expand Down Expand Up @@ -66,7 +69,7 @@ void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();

TestUtils.waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2);
waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2);

streamMetadata = client.metadata(stream).get(stream);
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
Expand Down Expand Up @@ -372,8 +375,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();

TestUtils.waitUntil(
() -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2);
waitUntil(() -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2);

metadata = metadataClient.metadata(stream);
streamMetadata = metadata.get(stream);
Expand Down Expand Up @@ -551,4 +553,33 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {

confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId));
}

@Test
void declarePublisherShouldNotReturnStreamDoesNotExistOnRestart() throws Exception {
  // Restart the broker application; start_app is issued even if stop_app throws.
  try {
    Host.rabbitmqctl("stop_app");
  } finally {
    Host.rabbitmqctl("start_app");
  }

  // Poll until a client can be created against node 1's stream port.
  AtomicReference<Client> clientRef = new AtomicReference<>();
  waitUntil(
      () -> {
        try {
          clientRef.set(cf.get(new ClientParameters().port(TestUtils.streamPortNode1())));
        } catch (Exception ignored) {
          // Deliberately swallowed: a failed attempt leaves clientRef empty so the
          // condition below stays false (presumably waitUntil then polls again).
        }
        return clientRef.get() != null;
      });

  // Collect every response code observed while declare-publisher is retried.
  Set<Short> observedCodes = ConcurrentHashMap.newKeySet();
  waitUntil(
      () -> {
        Response declareResponse = clientRef.get().declarePublisher((byte) 0, null, stream);
        observedCodes.add(declareResponse.getResponseCode());
        return declareResponse.isOk();
      });

  // "Stream does not exist" must never have been reported for an existing stream.
  assertThat(observedCodes).doesNotContain(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST);
}
}
180 changes: 180 additions & 0 deletions deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
@@ -0,0 +1,180 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_stream_manager_SUITE).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

%% Common Test callbacks and testcases must be exported; nowarn_export_all
%% silences the compiler warning that export_all triggers on its own.
-compile([export_all, nowarn_export_all]).

%% Common Test entry point: returns one {group, Name} entry per group to run.
all() ->
    GroupNames = [non_parallel_tests],
    [{group, GroupName} || GroupName <- GroupNames].

%% Group definitions: a single group with no properties, listing its
%% testcases in the order given here.
groups() ->
    Testcases = [manage_super_stream, lookup_leader],
    [{non_parallel_tests, [], Testcases}].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

%% Suite setup: skips the whole suite when rabbit_ct_helpers reports a
%% mixed-versions run, otherwise logs the environment and keeps Config.
init_per_suite(Config) ->
    skip_if_mixed_versions(rabbit_ct_helpers:is_mixed_versions(), Config).

%% Dispatches on the mixed-versions flag; anything other than `true`
%% continues with the unchanged Config.
skip_if_mixed_versions(true, _Config) ->
    {skip, "mixed version clusters are not supported"};
skip_if_mixed_versions(_, Config) ->
    rabbit_ct_helpers:log_environment(),
    Config.

%% Suite teardown: nothing to release, Config is handed back untouched.
end_per_suite(Config) -> Config.

%% Group setup: applies three settings to Config through
%% rabbit_ct_helpers:set_config/2 (in the order listed), then runs two
%% app-env merge steps followed by the broker helper's setup steps.
init_per_group(_, Config) ->
    Settings =
        [[{rmq_nodes_clustered, false}],
         {rabbitmq_ct_tls_verify, verify_none},
         {rabbitmq_stream, verify_none}],
    GroupConfig =
        lists:foldl(fun(Setting, Acc) ->
                       rabbit_ct_helpers:set_config(Acc, Setting)
                    end,
                    Config,
                    Settings),
    %% One setup step per application environment to merge.
    AppEnvs =
        [{rabbit, [{core_metrics_gc_interval, 1000}]},
         {rabbitmq_stream, [{connection_negotiation_step_timeout, 500}]}],
    AppEnvSteps =
        [fun(StepConfig) ->
            rabbit_ct_helpers:merge_app_env(StepConfig, AppEnv)
         end
         || AppEnv <- AppEnvs],
    rabbit_ct_helpers:run_setup_steps(GroupConfig,
                                      AppEnvSteps
                                      ++ rabbit_ct_broker_helpers:setup_steps()).

%% Group teardown: runs the broker helper's teardown steps against Config.
end_per_group(_, Config) ->
    TeardownSteps = rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_steps(Config, TeardownSteps).

%% Per-testcase setup: delegates to rabbit_ct_helpers:testcase_started/2
%% (note the swapped argument order) and returns its result.
init_per_testcase(Testcase, Config) ->
    StartedConfig = rabbit_ct_helpers:testcase_started(Config, Testcase),
    StartedConfig.

%% Per-testcase teardown: delegates to rabbit_ct_helpers:testcase_finished/2
%% (note the swapped argument order) and returns its result.
end_per_testcase(Testcase, Config) ->
    FinishedConfig = rabbit_ct_helpers:testcase_finished(Config, Testcase),
    FinishedConfig.

%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

%% Testcase: an existing stream resolves to {ok, Pid}; an unknown stream
%% name yields {error, not_found}.
lookup_leader(Config) ->
    StreamName = <<"stream_manager_lookup_leader_stream">>,
    ?assertMatch({ok, _}, create_stream(Config, StreamName)),

    %% the freshly created stream must have a leader pid
    {ok, LeaderPid} = lookup_leader(Config, StreamName),
    ?assert(is_pid(LeaderPid)),

    %% a stream that was never created is reported as not found
    ?assertEqual({error, not_found}, lookup_leader(Config, <<"foo">>)),

    %% clean up
    ?assertEqual({ok, deleted}, delete_stream(Config, StreamName)).

%% Testcase: covers the super stream life cycle through rabbit_stream_manager:
%% creation, partition listing, routing-key resolution, rejection of a
%% duplicate creation, deletion, and rejection of a creation whose partition
%% name collides with an existing plain stream.
%%
%% The colliding plain stream is deleted at the end so the testcase leaves no
%% state behind on the broker and can be run again against the same node.
manage_super_stream(Config) ->
    SuperStream = <<"invoices">>,
    Partitions = [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>],
    RoutingKeys = [<<"0">>, <<"1">>, <<"2">>],
    ConflictingStream = <<"invoices-1">>,

    % create super stream
    ?assertEqual(ok,
                 create_super_stream(Config, SuperStream, Partitions, RoutingKeys)),
    % get the correct partitions
    ?assertEqual({ok, Partitions}, partitions(Config, SuperStream)),

    % each routing key resolves to exactly its own partition
    lists:foreach(fun({Partition, RoutingKey}) ->
                     ?assertEqual({ok, [Partition]},
                                  route(Config, RoutingKey, SuperStream))
                  end,
                  lists:zip(Partitions, RoutingKeys)),

    % get an error if trying to re-create it
    ?assertMatch({error, _},
                 create_super_stream(Config, SuperStream, Partitions, RoutingKeys)),

    % can delete it
    ?assertEqual(ok, delete_super_stream(Config, SuperStream)),

    % create a stream with the same name as a potential partition
    ?assertMatch({ok, _}, create_stream(Config, ConflictingStream)),

    % cannot create the super stream because a partition already exists
    ?assertMatch({error, _},
                 create_super_stream(Config, SuperStream, Partitions, RoutingKeys)),

    % remove the plain stream so nothing is left behind on the broker
    ?assertEqual({ok, deleted}, delete_stream(Config, ConflictingStream)),

    ok.

%% RPC helper: calls rabbit_stream_manager:create_super_stream/6 on node 0
%% with vhost <<"/">>, an empty arguments map and user <<"guest">>.
create_super_stream(Config, Name, Partitions, RKs) ->
    CallArgs = [<<"/">>, Name, Partitions, #{}, RKs, <<"guest">>],
    rabbit_ct_broker_helpers:rpc(Config,
                                 0,
                                 rabbit_stream_manager,
                                 create_super_stream,
                                 CallArgs).

%% RPC helper: calls rabbit_stream_manager:delete_super_stream/3 on node 0
%% with vhost <<"/">> and user <<"guest">>.
delete_super_stream(Config, Name) ->
    CallArgs = [<<"/">>, Name, <<"guest">>],
    rabbit_ct_broker_helpers:rpc(Config,
                                 0,
                                 rabbit_stream_manager,
                                 delete_super_stream,
                                 CallArgs).

%% RPC helper: calls rabbit_stream_manager:create/4 on node 0 with vhost
%% <<"/">>, an empty argument list and user <<"guest">>.
create_stream(Config, Name) ->
    CallArgs = [<<"/">>, Name, [], <<"guest">>],
    rabbit_ct_broker_helpers:rpc(Config,
                                 0,
                                 rabbit_stream_manager,
                                 create,
                                 CallArgs).

%% RPC helper: calls rabbit_stream_manager:delete/3 on node 0 with vhost
%% <<"/">> and user <<"guest">>.
delete_stream(Config, Name) ->
    CallArgs = [<<"/">>, Name, <<"guest">>],
    rabbit_ct_broker_helpers:rpc(Config,
                                 0,
                                 rabbit_stream_manager,
                                 delete,
                                 CallArgs).

%% RPC helper: calls rabbit_stream_manager:lookup_leader/2 on node 0 for the
%% stream Name in vhost <<"/">> and returns the remote result.
lookup_leader(Config, Name) ->
    CallArgs = [<<"/">>, Name],
    rabbit_ct_broker_helpers:rpc(Config,
                                 0,
                                 rabbit_stream_manager,
                                 lookup_leader,
                                 CallArgs).

%% RPC helper: calls rabbit_stream_manager:partitions/2 on node 0 for the
%% super stream Name in vhost <<"/">>.
partitions(Config, Name) ->
    CallArgs = [<<"/">>, Name],
    rabbit_ct_broker_helpers:rpc(Config,
                                 0,
                                 rabbit_stream_manager,
                                 partitions,
                                 CallArgs).

%% RPC helper: calls rabbit_stream_manager:route/3 on node 0. The routing
%% key comes first, then vhost <<"/">>, then the super stream name.
route(Config, RoutingKey, SuperStream) ->
    CallArgs = [RoutingKey, <<"/">>, SuperStream],
    rabbit_ct_broker_helpers:rpc(Config,
                                 0,
                                 rabbit_stream_manager,
                                 route,
                                 CallArgs).

0 comments on commit 2455aec

Please sign in to comment.