Skip to content

Commit

Permalink
Merge pull request emqx#11315 from keynslug/ft/EMQX-9593/peek-mqueue-…
Browse files Browse the repository at this point in the history
…info

refactor(session): allow peeking at mqueue less intrusively
  • Loading branch information
keynslug committed Jul 24, 2023
2 parents 2b118f0 + e1e4c64 commit 81793c3
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 17 deletions.
9 changes: 3 additions & 6 deletions apps/emqx/src/emqx_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)).

-spec info(list(atom()) | atom(), channel()) -> term().
-spec info(list(atom()) | atom() | tuple(), channel()) -> term().
info(Keys, Channel) when is_list(Keys) ->
[{Key, info(Key, Channel)} || Key <- Keys];
info(conninfo, #channel{conninfo = ConnInfo}) ->
Expand Down Expand Up @@ -180,6 +180,8 @@ info(username, #channel{clientinfo = ClientInfo}) ->
maps:get(username, ClientInfo, undefined);
info(session, #channel{session = Session}) ->
maybe_apply(fun emqx_session:info/1, Session);
info({session, Info}, #channel{session = Session}) ->
maybe_apply(fun(S) -> emqx_session:info(Info, S) end, Session);
info(conn_state, #channel{conn_state = ConnState}) ->
ConnState;
info(keepalive, #channel{keepalive = Keepalive}) ->
Expand Down Expand Up @@ -1195,8 +1197,6 @@ handle_call(
ChanInfo1 = info(NChannel),
emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
reply(ok, reset_timer(alive_timer, NChannel));
handle_call(get_mqueue, Channel) ->
reply({ok, get_mqueue(Channel)}, Channel);
handle_call(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
reply(ignored, Channel).
Expand Down Expand Up @@ -2240,6 +2240,3 @@ get_mqtt_conf(Zone, Key, Default) ->
set_field(Name, Value, Channel) ->
Pos = emqx_utils:index_of(Name, record_info(fields, channel)),
setelement(Pos + 1, Channel, Value).

get_mqueue(#channel{session = Session}) ->
emqx_session:get_mqueue(Session).
10 changes: 6 additions & 4 deletions apps/emqx/src/emqx_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

-export([
info/1,
info/2,
stats/1
]).

Expand Down Expand Up @@ -221,11 +222,10 @@ info(CPid) when is_pid(CPid) ->
call(CPid, info);
info(State = #state{channel = Channel}) ->
ChanInfo = emqx_channel:info(Channel),
SockInfo = maps:from_list(
info(?INFO_KEYS, State)
),
SockInfo = maps:from_list(info(?INFO_KEYS, State)),
ChanInfo#{sockinfo => SockInfo}.

-spec info([atom()] | atom() | tuple(), pid() | state()) -> term().
info(Keys, State) when is_list(Keys) ->
[{Key, info(Key, State)} || Key <- Keys];
info(socktype, #state{transport = Transport, socket = Socket}) ->
Expand All @@ -241,7 +241,9 @@ info(stats_timer, #state{stats_timer = StatsTimer}) ->
info(limiter, #state{limiter = Limiter}) ->
Limiter;
info(limiter_timer, #state{limiter_timer = Timer}) ->
Timer.
Timer;
info({channel, Info}, #state{channel = Channel}) ->
emqx_channel:info(Info, Channel).

%% @doc Get stats of the connection/channel.
-spec stats(pid() | state()) -> emqx_types:stats().
Expand Down
6 changes: 1 addition & 5 deletions apps/emqx/src/emqx_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@
info/1,
info/2,
stats/1,
obtain_next_pkt_id/1,
get_mqueue/1
obtain_next_pkt_id/1
]).

-export([
Expand Down Expand Up @@ -955,6 +954,3 @@ age(Now, Ts) -> Now - Ts.
set_field(Name, Value, Session) ->
Pos = emqx_utils:index_of(Name, record_info(fields, session)),
setelement(Pos + 1, Session, Value).

get_mqueue(#session{mqueue = Q}) ->
emqx_mqueue:to_list(Q).
7 changes: 5 additions & 2 deletions apps/emqx/test/emqx_shared_sub_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -758,13 +758,16 @@ t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->

{ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1),
ct:sleep(100),
{ok, Msgs1} = gen_server:call(Pid1, get_mqueue),
{ok, Msgs2} = gen_server:call(Pid2, get_mqueue),
Msgs1 = emqx_mqueue:to_list(get_mqueue(Pid1)),
Msgs2 = emqx_mqueue:to_list(get_mqueue(Pid2)),
%% assert the message is in mqueue (because socket is closed)
?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2),
emqtt:stop(ConnPub),
ok.

get_mqueue(ConnPid) ->
emqx_connection:info({channel, {session, mqueue}}, sys:get_state(ConnPid)).

%% No ack, QoS 2 subscriptions,
%% client1 receives one message, send pubrec, then suspend
%% client2 acts normal (auto_ack=true)
Expand Down

0 comments on commit 81793c3

Please sign in to comment.