diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 67f3be221486..ff6089fb76a5 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -56,7 +56,41 @@ create(VirtualHost, Reference, Arguments, Username) -> delete(VirtualHost, Reference, Username) -> gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}). +<<<<<<< HEAD -spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found. +======= +-spec create_super_stream(binary(), + binary(), + [binary()], + #{binary() => binary()}, + [binary()], + binary()) -> + ok | {error, term()}. +create_super_stream(VirtualHost, + Name, + Partitions, + Arguments, + RoutingKeys, + Username) -> + gen_server:call(?MODULE, + {create_super_stream, + VirtualHost, + Name, + Partitions, + Arguments, + RoutingKeys, + Username}). + +-spec delete_super_stream(binary(), binary(), binary()) -> + ok | {error, term()}. +delete_super_stream(VirtualHost, Name, Username) -> + gen_server:call(?MODULE, + {delete_super_stream, VirtualHost, Name, Username}). + +-spec lookup_leader(binary(), binary()) -> + {ok, pid()} | {error, not_available} | + {error, not_found}. +>>>>>>> bd4771addb (Return appropriate response when stream leader not available) lookup_leader(VirtualHost, Stream) -> gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}). @@ -256,20 +290,25 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) -> LeaderPid = amqqueue:get_pid(Q), case process_alive(LeaderPid) of true -> - LeaderPid; + {ok, LeaderPid}; false -> case leader_from_members(Q) of {ok, Pid} -> - Pid; + {ok, Pid}; _ -> - cluster_not_found + {error, not_available} end end; _ -> - cluster_not_found + {error, not_found} end; - _ -> - cluster_not_found + {error, not_found} -> + case rabbit_amqqueue:not_found_or_absent_dirty(Name) of + not_found -> + {error, not_found}; + _ -> + {error, not_available} + end end, {reply, Res, State}; handle_call({lookup_local_member, VirtualHost, Stream}, _From, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 12d43526036c..abda3a64c23d 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1494,7 +1494,7 @@ handle_frame_post_auth(Transport, of {false, false} -> case lookup_leader(Stream, Connection0) of - cluster_not_found -> + {error, not_found} -> response(Transport, Connection0, declare_publisher, @@ -1504,6 +1504,16 @@ handle_frame_post_auth(Transport, ?STREAM_DOES_NOT_EXIST, 1), {Connection0, State}; + {error, not_available} -> + response(Transport, + Connection0, + declare_publisher, + CorrelationId, + ?RESPONSE_CODE_STREAM_NOT_AVAILABLE), + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_NOT_AVAILABLE, + 1), + {Connection0, State}; {ClusterLeader, #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = @@ -1960,9 +1970,9 @@ handle_frame_post_auth(_Transport, of ok -> case lookup_leader(Stream, Connection) of - cluster_not_found -> - rabbit_log:warning("Could not find leader to store offset on ~p", - [Stream]), + {error, Error} -> + rabbit_log:warning("Could not find leader to store offset on ~p: ~p", + [Stream, Error]), %% FIXME store offset is fire-and-forget, so no response even if error, change this? {Connection, State}; {ClusterLeader, Connection1} -> @@ -1992,11 +2002,16 @@ handle_frame_post_auth(Transport, of ok -> case lookup_leader(Stream, Connection0) of - cluster_not_found -> + {error, not_found} -> rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_DOES_NOT_EXIST, 1), {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0, Connection0}; + {error, not_available} -> + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_NOT_AVAILABLE, + 1), + {?RESPONSE_CODE_STREAM_NOT_AVAILABLE, 0, Connection0}; {LeaderPid, C} -> {RC, O} = case osiris:read_tracking(LeaderPid, Reference) of @@ -2532,9 +2547,9 @@ lookup_leader(Stream, case maps:get(Stream, StreamLeaders, undefined) of undefined -> case lookup_leader_from_manager(VirtualHost, Stream) of - cluster_not_found -> - cluster_not_found; - LeaderPid -> + {error, Error} -> + {error, Error}; + {ok, LeaderPid} -> Connection1 = maybe_monitor_stream(LeaderPid, Stream, Connection), {LeaderPid, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index b80647120728..29918c1ce7e7 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -16,11 +16,14 @@ package com.rabbitmq.stream; +import static com.rabbitmq.stream.TestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import com.rabbitmq.stream.codec.WrapperMessageBuilder; import com.rabbitmq.stream.impl.Client; +import com.rabbitmq.stream.impl.Client.ClientParameters; +import com.rabbitmq.stream.impl.Client.Response; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; @@ -66,7 +69,7 @@ void leaderFailureWhenPublisherConnectedToReplica() throws Exception { Client.StreamMetadata streamMetadata = metadata.get(stream); assertThat(streamMetadata).isNotNull(); - TestUtils.waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2); + waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2); streamMetadata = client.metadata(stream).get(stream); assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); @@ -372,8 +375,7 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { Client.StreamMetadata streamMetadata = metadata.get(stream); assertThat(streamMetadata).isNotNull(); - TestUtils.waitUntil( - () -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2); + waitUntil(() -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2); metadata = metadataClient.metadata(stream); streamMetadata = metadata.get(stream); @@ -551,4 +553,33 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId)); } + + @Test + void declarePublisherShouldNotReturnStreamDoesNotExistOnRestart() throws Exception { + try { + Host.rabbitmqctl("stop_app"); + } finally { + Host.rabbitmqctl("start_app"); + } + AtomicReference client = new AtomicReference<>(); + waitUntil( + () -> { + try { + client.set(cf.get(new ClientParameters().port(TestUtils.streamPortNode1()))); + } catch (Exception e) { + + } + return client.get() != null; + }); + Set responseCodes = ConcurrentHashMap.newKeySet(); + + waitUntil( + () -> { + Response response = client.get().declarePublisher((byte) 0, null, stream); + responseCodes.add(response.getResponseCode()); + return response.isOk(); + }); + + assertThat(responseCodes).doesNotContain(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST); + } } diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl new file mode 100644 index 000000000000..397b9f6d5361 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -0,0 +1,180 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_manager_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-compile(export_all). + +all() -> + [{group, non_parallel_tests}]. + +groups() -> + [{non_parallel_tests, [], [manage_super_stream, lookup_leader]}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + case rabbit_ct_helpers:is_mixed_versions() of + true -> + {skip, "mixed version clusters are not supported"}; + _ -> + rabbit_ct_helpers:log_environment(), + Config + end. + +end_per_suite(Config) -> + Config. + +init_per_group(_, Config) -> + Config1 = + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]), + Config2 = + rabbit_ct_helpers:set_config(Config1, + {rabbitmq_ct_tls_verify, verify_none}), + Config3 = + rabbit_ct_helpers:set_config(Config2, {rabbitmq_stream, verify_none}), + rabbit_ct_helpers:run_setup_steps(Config3, + [fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {rabbit, + [{core_metrics_gc_interval, + 1000}]}) + end, + fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {rabbitmq_stream, + [{connection_negotiation_step_timeout, + 500}]}) + end] + ++ rabbit_ct_broker_helpers:setup_steps()). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +lookup_leader(Config) -> + Stream = <<"stream_manager_lookup_leader_stream">>, + ?assertMatch({ok, _}, create_stream(Config, Stream)), + + {ok, Pid} = lookup_leader(Config, Stream), + ?assert(is_pid(Pid)), + + ?assertEqual({error, not_found}, lookup_leader(Config, <<"foo">>)), + + ?assertEqual({ok, deleted}, delete_stream(Config, Stream)). + +manage_super_stream(Config) -> + % create super stream + ?assertEqual(ok, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>, + <<"invoices-2">>], + [<<"0">>, <<"1">>, <<"2">>])), + % get the correct partitions + ?assertEqual({ok, + [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, + partitions(Config, <<"invoices">>)), + + [?assertEqual({ok, [Partition]}, + route(Config, RoutingKey, <<"invoices">>)) + || {Partition, RoutingKey} + <- [{<<"invoices-0">>, <<"0">>}, {<<"invoices-1">>, <<"1">>}, + {<<"invoices-2">>, <<"2">>}]], + + % get an error if trying to re-create it + ?assertMatch({error, _}, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>, + <<"invoices-2">>], + [<<"0">>, <<"1">>, <<"2">>])), + + % can delete it + ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), + + % create a stream with the same name as a potential partition + ?assertMatch({ok, _}, create_stream(Config, <<"invoices-1">>)), + + % cannot create the super stream because a partition already exists + ?assertMatch({error, _}, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>, + <<"invoices-2">>], + [<<"0">>, <<"1">>, <<"2">>])), + + ok. + +create_super_stream(Config, Name, Partitions, RKs) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + create_super_stream, + [<<"/">>, + Name, + Partitions, + #{}, + RKs, + <<"guest">>]). + +delete_super_stream(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + delete_super_stream, + [<<"/">>, Name, <<"guest">>]). + +create_stream(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + create, + [<<"/">>, Name, [], <<"guest">>]). + +delete_stream(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + delete, + [<<"/">>, Name, <<"guest">>]). + +lookup_leader(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + lookup_leader, + [<<"/">>, Name]). + +partitions(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + partitions, + [<<"/">>, Name]). + +route(Config, RoutingKey, SuperStream) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + route, + [RoutingKey, <<"/">>, SuperStream]).