diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e9f91041e706..c52d329392f9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -99,7 +99,7 @@
 -spec info_keys() -> rabbit_types:info_keys().
 -spec init_with_backing_queue_state
         (rabbit_types:amqqueue(), atom(), tuple(), any(),
-         [rabbit_types:delivery()], pmon:pmon(), dict:dict()) ->
+         [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) ->
             #q{}.
 
 %%----------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b006e37eb2b0..b9952178e02f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -57,13 +57,13 @@
                                  coordinator         :: pid(),
                                  backing_queue       :: atom(),
                                  backing_queue_state :: any(),
-                                 seen_status         :: dict:dict(),
+                                 seen_status         :: map(),
                                  confirmed           :: [rabbit_guid:guid()],
                                  known_senders       :: sets:set()
                                }.
 
 -spec promote_backing_queue_state
         (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()],
-         dict:dict(), [pid()]) ->
+         map(), [pid()]) ->
             master_state().
 -spec sender_death_fun() -> death_fun().
@@ -127,7 +127,7 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
              coordinator         = CPid,
              backing_queue       = BQ,
              backing_queue_state = BQS,
-             seen_status         = dict:new(),
+             seen_status         = #{},
              confirmed           = [],
              known_senders       = sets:new(),
              wait_timeout        = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
@@ -266,7 +266,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
                          seen_status         = SS,
                          backing_queue       = BQ,
                          backing_queue_state = BQS }) ->
-    false = dict:is_key(MsgId, SS), %% ASSERTION
+    false = maps:is_key(MsgId, SS), %% ASSERTION
     ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg},
                       rabbit_basic:msg_size(Msg)),
     BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
@@ -281,7 +281,7 @@ batch_publish(Publishes, ChPid, Flow,
         lists:foldl(fun ({Msg = #basic_message { id = MsgId },
                           MsgProps, _IsDelivered}, {Pubs, false, Sizes}) ->
                             {[{Msg, MsgProps, true} | Pubs], %% [0]
-                             false = dict:is_key(MsgId, SS), %% ASSERTION
+                             false = maps:is_key(MsgId, SS), %% ASSERTION
                              Sizes + rabbit_basic:msg_size(Msg)}
                     end, {[], false, 0}, Publishes),
     Publishes2 = lists:reverse(Publishes1),
@@ -298,7 +298,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
                                                 seen_status         = SS,
                                                 backing_queue       = BQ,
                                                 backing_queue_state = BQS }) ->
-    false = dict:is_key(MsgId, SS), %% ASSERTION
+    false = maps:is_key(MsgId, SS), %% ASSERTION
     ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg},
                       rabbit_basic:msg_size(Msg)),
     {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
@@ -313,7 +313,7 @@ batch_publish_delivered(Publishes, ChPid, Flow,
     {false, MsgSizes} =
         lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps},
                          {false, Sizes}) ->
-                            {false = dict:is_key(MsgId, SS), %% ASSERTION
+                            {false = maps:is_key(MsgId, SS), %% ASSERTION
                              Sizes + rabbit_basic:msg_size(Msg)}
                     end, {false, 0}, Publishes),
     ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
@@ -326,7 +326,7 @@ discard(MsgId, ChPid, Flow, State = #state { gm                  = GM,
                                              backing_queue       = BQ,
                                              backing_queue_state = BQS,
                                              seen_status         = SS }) ->
-    false = dict:is_key(MsgId, SS), %% ASSERTION
+    false = maps:is_key(MsgId, SS), %% ASSERTION
     ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
     ensure_monitoring(ChPid,
                       State #state { backing_queue_state =
@@ -353,7 +353,7 @@ drain_confirmed(State = #state { backing_queue       = BQ,
         lists:foldl(
           fun (MsgId, {MsgIdsN, SSN}) ->
                   %% We will never see 'discarded' here
-                  case dict:find(MsgId, SSN) of
+                  case maps:find(MsgId, SSN) of
                       error ->
                           {[MsgId | MsgIdsN], SSN};
                       {ok, published} ->
@@ -364,7 +364,7 @@ drain_confirmed(State = #state { backing_queue       = BQ,
                           %% consequently we need to filter out the
                           %% confirm here. We will issue the confirm
                           %% when we see the publish from the channel.
-                          {MsgIdsN, dict:store(MsgId, confirmed, SSN)};
+                          {MsgIdsN, maps:put(MsgId, confirmed, SSN)};
                       {ok, confirmed} ->
                           %% Well, confirms are racy by definition.
                           {[MsgId | MsgIdsN], SSN}
@@ -457,7 +457,7 @@ msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
 info(backing_queue_status,
      State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
     BQ:info(backing_queue_status, BQS) ++
-        [ {mirror_seen,    dict:size(State #state.seen_status)},
+        [ {mirror_seen,    maps:size(State #state.seen_status)},
          {mirror_senders, sets:size(State #state.known_senders)} ];
 info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
     BQ:info(Item, BQS).
@@ -480,7 +480,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
     %% it.
 
     %% We will never see {published, ChPid, MsgSeqNo} here.
-    case dict:find(MsgId, SS) of
+    case maps:find(MsgId, SS) of
         error ->
             %% We permit the underlying BQ to have a peek at it, but
             %% only if we ourselves are not filtering out the msg.
@@ -494,7 +494,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
             %% immediately after calling is_duplicate). The msg is
             %% invalid. We will not see this again, nor will we be
             %% further involved in confirming this message, so erase.
-            {true, State #state { seen_status = dict:erase(MsgId, SS) }};
+            {true, State #state { seen_status = maps:remove(MsgId, SS) }};
         {ok, Disposition} when Disposition =:= confirmed
             %% It got published when we were a slave via gm, and
             %% confirmed some time after that (maybe even after
@@ -509,7 +509,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
             %% Message was discarded while we were a slave. Confirm now.
             %% As above, amqqueue_process will have the entry for the
             %% msg_id_to_channel mapping.
-            {true, State #state { seen_status = dict:erase(MsgId, SS),
+            {true, State #state { seen_status = maps:remove(MsgId, SS),
                                   confirmed = [MsgId | Confirmed] }}
     end.
 
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 61623c9441df..748a5afdf5be 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -129,10 +129,10 @@ handle_go(Q = #amqqueue{name = QName}) ->
                              rate_timer_ref      = undefined,
                              sync_timer_ref      = undefined,
 
-                             sender_queues       = dict:new(),
-                             msg_id_ack          = dict:new(),
+                             sender_queues       = #{},
+                             msg_id_ack          = #{},
 
-                             msg_id_status       = dict:new(),
+                             msg_id_status       = #{},
                              known_senders       = pmon:new(delegate),
 
                              depth_delta         = undefined
@@ -310,7 +310,7 @@ handle_cast({sync_start, Ref, Syncer},
     State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
     S = fun({MA, TRefN, BQSN}) ->
                 State1#state{depth_delta         = undefined,
-                             msg_id_ack          = dict:from_list(MA),
+                             msg_id_ack          = maps:from_list(MA),
                              rate_timer_ref      = TRefN,
                              backing_queue_state = BQSN}
         end,
@@ -546,7 +546,7 @@ send_or_record_confirm(published, #delivery { sender     = ChPid,
                                                 id            = MsgId,
                                                 is_persistent = true } },
                        MS, #state { q = #amqqueue { durable = true } }) ->
-    dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
+    maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS);
 send_or_record_confirm(_Status, #delivery { sender     = ChPid,
                                             confirm    = true,
                                             msg_seq_no = MsgSeqNo },
@@ -559,7 +559,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
         lists:foldl(
           fun (MsgId, {CMsN, MSN} = Acc) ->
                   %% We will never see 'discarded' here
-                  case dict:find(MsgId, MSN) of
+                  case maps:find(MsgId, MSN) of
                       error ->
                           %% If it needed confirming, it'll have
                           %% already been done.
@@ -567,12 +567,12 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
                       {ok, published} ->
                           %% Still not seen it from the channel, just
                           %% record that it's been confirmed.
-                          {CMsN, dict:store(MsgId, confirmed, MSN)};
+                          {CMsN, maps:put(MsgId, confirmed, MSN)};
                       {ok, {published, ChPid, MsgSeqNo}} ->
                           %% Seen from both GM and Channel. Can now
                           %% confirm.
                           {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
-                           dict:erase(MsgId, MSN)};
+                           maps:remove(MsgId, MSN)};
                       {ok, confirmed} ->
                           %% It's already been confirmed. This is
                           %% probably it's been both sync'd to disk
@@ -672,21 +672,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
     %% Master, or MTC in queue_process.
 
     St = [published, confirmed, discarded],
-    SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
-    AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
+    SS = maps:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
+    AckTags = [AckTag || {_MsgId, AckTag} <- maps:to_list(MA)],
 
     MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
                     QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
 
-    MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
+    MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
                             gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
                         (_Msgid, _Status, MTC0) ->
                             MTC0
                     end, gb_trees:empty(), MS),
     Deliveries = [promote_delivery(Delivery) ||
-                   {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
+                   {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ),
                    Delivery <- queue:to_list(PubQ)],
-    AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
+    AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- maps:to_list(SQ)],
     KS1 = lists:foldl(fun (ChPid0, KS0) -> pmon:demonitor(ChPid0, KS0) end,
                       KS, AwaitGmDown),
 
@@ -798,20 +798,20 @@ forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
 maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
                                                      msg_id_status = MS,
                                                      known_senders = KS }) ->
-    case dict:find(ChPid, SQ) of
+    case maps:find(ChPid, SQ) of
         error ->
             State;
         {ok, {MQ, PendCh, ChStateRecord}} ->
             case forget_sender(ChState, ChStateRecord) of
                 true ->
                     credit_flow:peer_down(ChPid),
-                    State #state { sender_queues = dict:erase(ChPid, SQ),
+                    State #state { sender_queues = maps:remove(ChPid, SQ),
                                    msg_id_status = lists:foldl(
-                                                     fun dict:erase/2,
+                                                     fun maps:remove/2,
                                                      MS, sets:to_list(PendCh)),
                                    known_senders = pmon:demonitor(ChPid, KS) };
                 false ->
-                    SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
+                    SQ1 = maps:put(ChPid, {MQ, PendCh, ChState}, SQ),
                     State #state { sender_queues = SQ1 }
             end
     end.
@@ -823,32 +823,32 @@ maybe_enqueue_message(
     send_mandatory(Delivery), %% must do this before confirms
     State1 = ensure_monitoring(ChPid, State),
     %% We will never see {published, ChPid, MsgSeqNo} here.
-    case dict:find(MsgId, MS) of
+    case maps:find(MsgId, MS) of
         error ->
             {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
             MQ1 = queue:in(Delivery, MQ),
-            SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
+            SQ1 = maps:put(ChPid, {MQ1, PendingCh, ChState}, SQ),
             State1 #state { sender_queues = SQ1 };
         {ok, Status} ->
             MS1 = send_or_record_confirm(
-                    Status, Delivery, dict:erase(MsgId, MS), State1),
+                    Status, Delivery, maps:remove(MsgId, MS), State1),
             SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
             State1 #state { msg_id_status = MS1,
                             sender_queues = SQ1 }
     end.
 
 get_sender_queue(ChPid, SQ) ->
-    case dict:find(ChPid, SQ) of
+    case maps:find(ChPid, SQ) of
         error     -> {queue:new(), sets:new(), running};
         {ok, Val} -> Val
     end.
 
 remove_from_pending_ch(MsgId, ChPid, SQ) ->
-    case dict:find(ChPid, SQ) of
+    case maps:find(ChPid, SQ) of
         error ->
             SQ;
         {ok, {MQ, PendingCh, ChState}} ->
-            dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
+            maps:put(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
                        SQ)
     end.
 
@@ -865,7 +865,7 @@ publish_or_discard(Status, ChPid, MsgId,
         case queue:out(MQ) of
             {empty, _MQ2} ->
                 {MQ, sets:add_element(MsgId, PendingCh),
-                 dict:store(MsgId, Status, MS)};
+                 maps:put(MsgId, Status, MS)};
             {{value, Delivery = #delivery {
                        message = #basic_message { id = MsgId } }}, MQ2} ->
                 {MQ2, PendingCh,
@@ -880,7 +880,7 @@ publish_or_discard(Status, ChPid, MsgId,
                 %% expecting any confirms from us.
                 {MQ, PendingCh, MS}
         end,
-    SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
+    SQ1 = maps:put(ChPid, {MQ1, PendingCh1, ChState}, SQ),
     State1 #state { sender_queues = SQ1,
                     msg_id_status = MS1 }.
 
@@ -1002,9 +1002,9 @@ msg_ids_to_acktags(MsgIds, MA) ->
     {AckTags, MA1} =
         lists:foldl(
          fun (MsgId, {Acc, MAN}) ->
-                  case dict:find(MsgId, MA) of
+                  case maps:find(MsgId, MA) of
                      error        -> {Acc, MAN};
-                      {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
+                      {ok, AckTag} -> {[AckTag | Acc], maps:remove(MsgId, MAN)}
                  end
          end, {[], MA}, MsgIds),
     {lists:reverse(AckTags), MA1}.
@@ -1012,7 +1012,7 @@ msg_ids_to_acktags(MsgIds, MA) ->
 maybe_store_ack(false, _MsgId, _AckTag, State) ->
     State;
 maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
-    State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }.
+    State #state { msg_id_ack = maps:put(MsgId, AckTag, MA) }.
 
 set_delta(0, State = #state { depth_delta = undefined }) ->
     ok = record_synchronised(State#state.q),
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 4e9715fba219..f13a46fcf32c 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -64,7 +64,8 @@
 -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
 -spec inactive(state()) -> boolean().
 -spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
-                        non_neg_integer(), rabbit_framing:amqp_table()}].
+                        non_neg_integer(), rabbit_framing:amqp_table(),
+                        rabbit_types:username()}].
 -spec count() -> non_neg_integer().
 -spec unacknowledged_message_count() -> non_neg_integer().
 -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
@@ -280,7 +281,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
                           orddict:update_counter(CTag, 1, CTagCounts), QTail);
         {{value, V}, QTail} ->
             subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail);
-        {empty, _} -> 
+        {empty, _} ->
             subtract_acks([], Prefix, CTagCounts, AckQ)
     end.
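The dict -> maps substitution above is one-for-one: maps:find/2 keeps the error | {ok, Value} contract of dict:find/2, which is why the existing case clauses are untouched, while dict:new/0, dict:is_key/2, dict:store/3, dict:erase/2, dict:size/1, dict:filter/2, dict:fold/3, dict:from_list/1 and dict:to_list/1 become #{}, maps:is_key/2, maps:put/3, maps:remove/2, maps:size/1, maps:filter/2, maps:fold/3, maps:from_list/1 and maps:to_list/1. Below is a minimal standalone sketch of the seen-status pattern used throughout; the module and function names are invented for illustration and are not part of this change.

%% seen_status_sketch.erl - illustrative only, not part of this patch.
-module(seen_status_sketch).
-export([record_publish/2, record_confirm/2]).

%% The map is keyed by message id and holds a per-message disposition atom,
%% mirroring how seen_status/msg_id_status are used in the diff above.
record_publish(MsgId, SS) when is_map(SS) ->
    false = maps:is_key(MsgId, SS), %% same ASSERTION style as the patch
    maps:put(MsgId, published, SS).

record_confirm(MsgId, SS) when is_map(SS) ->
    case maps:find(MsgId, SS) of    %% returns error | {ok, Value}, like dict:find/2
        error           -> SS;
        {ok, published} -> maps:put(MsgId, confirmed, SS);
        {ok, confirmed} -> maps:remove(MsgId, SS)
    end.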