Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster ha improvements #1769

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/vmq_server/src/vmq_cluster_com.erl
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ process_bytes(Bytes, Buffer, St) ->
<<"vmq-send", L:32, BFrames:L/binary, Rest/binary>> ->
process(BFrames, St),
process_bytes(Rest, <<>>, St);
<<"vmq-tail", L:32, BinPid:L/binary, Rest/binary>> ->
Pid = binary_to_term(BinPid),
Pid ! block_ack,
process_bytes(Rest, <<>>, St);
_ ->
%% if we have received something else than "vmq-send" we
%% will buffer everything unbounded forever and ever!
Expand Down
66 changes: 53 additions & 13 deletions apps/vmq_server/src/vmq_cluster_node.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@
socket,
transport,
reachable=false,
waiting_for_ack=false,
pending = [],
transit = [],
max_queue_size,
reconnect_tref,
waiting_ack_tref,
async_connect_pid,
bytes_dropped={os:timestamp(), 0},
bytes_send={os:timestamp(), 0}}).

-define(RECONNECT, 1000).
-define(ACK_TIMEOUT, 3000).

%%%===================================================================
%%% API
Expand Down Expand Up @@ -120,9 +124,10 @@ init([Parent, RemoteNode]) ->
erlang:send_after(1000, self(), reconnect),
loop(#state{parent=Parent, node=RemoteNode, max_queue_size=MaxQueueSize}).

loop(#state{pending=Pending, reachable=Reachable} = State)
loop(#state{pending=Pending, waiting_for_ack=Waiting, reachable=Reachable} = State)
when
Pending == [];
Waiting == true;
Reachable == false ->
receive
M ->
Expand Down Expand Up @@ -201,20 +206,32 @@ handle_message({connect_async_done, AsyncPid, {ok, {Transport, Socket}}},
Msg = [<<"vmq-connect">>, <<L:32, NodeName/binary>>],
case send(Transport, Socket, Msg) of
ok ->
lager:info("successfully connected to cluster node ~p", [RemoteNode]),
State#state{socket=Socket, transport=Transport,
lager:info("cluster_node ~p : successfully connected to cluster node ~p", [node(), RemoteNode]),
NewState = State#state{socket=Socket, transport=Transport,
%% !!! remote node is reachable
async_connect_pid=undefined,
reachable=true};
reachable=true,
waiting_for_ack=false},
internal_flush(NewState);
{error, Reason} ->
lager:warning("can't initiate connect to cluster node ~p due to ~p", [RemoteNode, Reason]),
lager:warning("cluster_node ~p : can't initiate connect to cluster node ~p due to ~p", [node(), RemoteNode, Reason]),
close_reconnect(State)
end;
handle_message({connect_async_done, AsyncPid, error}, #state{async_connect_pid=AsyncPid} = State) ->
% connect_async already logged the error details
close_reconnect(State);
handle_message(block_ack, State) ->
%% block ack received, cancel timeout timer
NewState = cancel_ack_timer(State),
NewState#state{waiting_for_ack=false, transit=[]};
handle_message(reconnect, #state{reachable=false} = State) ->
connect(State#state{reconnect_tref=undefined});
%% avoid timer interference
NewState = cancel_ack_timer(State),
connect(NewState#state{reconnect_tref=undefined});
handle_message(reconnect, #state{reachable=true} = State) ->
%% avoid timer interference
NewState = cancel_ack_timer(State),
close_reconnect(NewState);
handle_message({status, CallerPid, Ref}, #state{socket=Socket, reachable=Reachable}=State) ->
Status =
case Reachable of
Expand Down Expand Up @@ -256,12 +273,29 @@ maybe_flush(#state{pending=Pending} = State) ->
State
end.

%% If the receiving node crashes while it still has messages in its receiving
%% TCP buffer, those messages would be lost.
%% Therefore we introduce a handshake at the application level:
%% - append a 'tail' to the block of messages that are sent
%% - keep a backup copy of the messages in a 'transit' buffer
%% - await the block_ack from the receiving node, sent once it has handled
%%   the complete block (upon receipt of the 'tail')
%% - when the block_ack is received, the transit buffer is cleared
%%   and the next block can be transmitted
%% This handshake is protected by a timeout which triggers a reconnect.
%% Note that currently the backup is kept only in memory; a future improvement
%% would be to persist the backup on disk to also protect against message loss
%% in case the transmitting node crashes.
Comment on lines +286 to +288
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should put this in a ticket @ioolkos so it doesn't get lost in the code comments

internal_flush(#state{reachable=false} = State) -> State;
internal_flush(#state{pending=[]} = State) -> State;
internal_flush(#state{pending=Pending, node=Node, transport=Transport,
internal_flush(#state{waiting_for_ack=true} = State) -> State;
internal_flush(#state{pending=[], transit=[]} = State) -> State;
internal_flush(#state{pending=Pending, transit=Backup, node=Node, transport=Transport,
socket=Socket, bytes_send={{M, S, _}, V}} = State) ->
L = iolist_size(Pending),
Msg = [<<"vmq-send", L:32>>|lists:reverse(Pending)],
Transit = [Backup, lists:reverse(Pending)],
L = iolist_size(Transit),
BinPid = term_to_binary(self()),
P = byte_size(BinPid),
Msg = [<<"vmq-send", L:32>>, Transit, <<"vmq-tail", P:32>>, BinPid],
case send(Transport, Socket, Msg) of
ok ->
NewBytesSend =
Expand All @@ -272,10 +306,9 @@ internal_flush(#state{pending=Pending, node=Node, transport=Transport,
_ = vmq_metrics:incr_cluster_bytes_sent(V + L),
{TS, 0}
end,
State#state{pending=[], bytes_send=NewBytesSend};
State#state{pending=[], transit=Transit, waiting_for_ack=true, waiting_ack_tref=waiting_ack_timer(), bytes_send=NewBytesSend};
{error, Reason} ->
lager:warning("can't send ~p bytes to ~p due to ~p, reconnect!",
[iolist_size(Pending), Node, Reason]),
lager:warning("can't send ~p bytes to ~p due to ~p, reconnect!", [L, Node, Reason]),
close_reconnect(State)
end.

Expand Down Expand Up @@ -331,6 +364,13 @@ close_reconnect(#state{transport=Transport, socket=Socket} = State) ->
%% Schedule a 'reconnect' message to self() after ?RECONNECT milliseconds.
%% Returns the timer reference so the caller can store (and later cancel) it.
reconnect_timer() ->
    Delay = ?RECONNECT,
    erlang:send_after(Delay, self(), reconnect).

%% Start the block_ack timeout. This deliberately reuses the 'reconnect'
%% message: per the handshake design above, a missing block_ack is handled
%% by tearing the connection down and reconnecting.
waiting_ack_timer() ->
    Timeout = ?ACK_TIMEOUT,
    erlang:send_after(Timeout, self(), reconnect).

%% Cancel the pending block_ack timeout, if any, and clear the stored ref.
%%
%% erlang:cancel_timer/1 returns 'false' when the timer has already fired;
%% in that case its 'reconnect' message may already sit in our mailbox and
%% would trigger a spurious reconnect of a healthy connection right after a
%% successful block_ack. Drain that stale message (non-blocking) before
%% clearing the reference. The drain is also safe in the reconnect handlers:
%% a second queued 'reconnect' is redundant while one is being processed.
cancel_ack_timer(#state{waiting_ack_tref=undefined} = State) -> State;
cancel_ack_timer(#state{waiting_ack_tref=WaitingAckTref} = State) ->
    case erlang:cancel_timer(WaitingAckTref) of
        false ->
            %% timer already expired; flush the stale message if present
            receive reconnect -> ok after 0 -> ok end;
        _TimeLeft ->
            ok
    end,
    State#state{waiting_ack_tref=undefined}.

%% connect_params is called by a RPC
connect_params(_Node) ->
Expand Down
32 changes: 22 additions & 10 deletions apps/vmq_server/src/vmq_mqtt5_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -453,25 +453,24 @@ connected({mail, QPid, Msgs, _, Dropped},
{NewState2, Out} =
case handle_messages(Msgs, [], 0, NewState, Waiting) of
{NewState1, HandledMsgs, []} ->
%% messages aren't delivered (yet) but are queued in this process
%% we notify the queue that we're ready for new messages.
vmq_queue:notify(QPid),
{NewState1, HandledMsgs};
{NewState1, HandledMsgs, NewWaiting} ->
%% messages aren't delivered (yet) but are queued in this process
%% we tell the queue to get rid of them
vmq_queue:notify_recv(QPid),
%% we call vmq_queue:notify as soon as
%% the check_in_flight returns true again
%% SEE: Comment in handle_waiting_msgs function.
%% we still have waiting messages
{NewState1#state{waiting_msgs=NewWaiting}, HandledMsgs}
end,
{NewState2, Out};
connected(#mqtt5_puback{message_id=MessageId, reason_code=RC}, #state{waiting_acks=WAcks} = State) ->
%% qos1 flow
_ = vmq_metrics:incr({?MQTT5_PUBACK_RECEIVED, rc2rcn(RC)}),
case maps:get(MessageId, WAcks, not_found) of
case Msg = maps:get(MessageId, WAcks, not_found) of
#vmq_msg{} ->
Cnt = fc_decr_cnt(State#state.fc_send_cnt, puback),
handle_waiting_msgs(State#state{fc_send_cnt=Cnt, waiting_acks=maps:remove(MessageId, WAcks)});
{NewState, Out} = handle_waiting_msgs(State#state{fc_send_cnt=Cnt, waiting_acks=maps:remove(MessageId, WAcks)}),
{release_message(Msg, NewState), Out};
not_found ->
_ = vmq_metrics:incr(?MQTT5_PUBACK_INVALID_ERROR),
{State, []}
Expand All @@ -481,12 +480,13 @@ connected(#mqtt5_pubrec{message_id=MessageId, reason_code=RC}, State) when RC <
#state{waiting_acks=WAcks} = State,
%% qos2 flow
_ = vmq_metrics:incr({?MQTT5_PUBREC_RECEIVED,rc2rcn(RC)}),
case maps:get(MessageId, WAcks, not_found) of
case Msg = maps:get(MessageId, WAcks, not_found) of
#vmq_msg{} ->
PubRelFrame = #mqtt5_pubrel{message_id=MessageId, reason_code=?M5_SUCCESS, properties=#{}},
_ = vmq_metrics:incr({?MQTT5_PUBREL_SENT, ?SUCCESS}),
{State#state{waiting_acks=maps:update(MessageId, PubRelFrame, WAcks)},
[serialise_frame(PubRelFrame)]};
NewState = State#state{waiting_acks=maps:update(MessageId, PubRelFrame, WAcks)},
{release_message(Msg, NewState), [serialise_frame(PubRelFrame)]};

#mqtt5_pubrel{message_id=MessageId} = PubRelFrame ->
%% handle PUBREC retries from the client.
_ = vmq_metrics:incr({?MQTT5_PUBREL_SENT, ?SUCCESS}),
Expand Down Expand Up @@ -1381,6 +1381,7 @@ prepare_frame(#deliver{qos=QoS, msg_id=MsgId, msg=Msg}, State0) ->
properties=Props0,
expiry_ts=ExpiryTS} = Msg,
NewQoS = maybe_upgrade_qos(QoS, MsgQoS, State0),
maybe_release_message(NewQoS, Msg, State0),
{Topic1, Payload1, Props2} =
case on_deliver_hook(User, SubscriberId, QoS, Topic0, Payload0, IsRetained, Props0) of
{error, _} ->
Expand Down Expand Up @@ -1420,6 +1421,17 @@ prepare_frame(#deliver{qos=QoS, msg_id=MsgId, msg=Msg}, State0) ->
Msg#vmq_msg{qos=NewQoS}, WAcks)}}
end.

%% A delivery whose effective (non-upgraded) QoS is 0 is released
%% immediately; for any other QoS the queue keeps its copy until the
%% ack flow completes.
-spec maybe_release_message(qos(), msg(), state()) -> state().
maybe_release_message(QoS, Msg, State) when QoS =:= 0 ->
    release_message(Msg, State);
maybe_release_message(_QoS, _Msg, State) ->
    State.

%% Tell the owning queue process to release Msg; the fsm state itself
%% is returned unchanged.
-spec release_message(msg(), state()) -> state().
release_message(Msg, #state{queue_pid=QueuePid} = State) ->
    _ = vmq_queue:release_message(QueuePid, Msg),
    State.

on_deliver_hook(User, SubscriberId, QoS, Topic, Payload, IsRetain, Props) ->
HookArgs0 = [User, SubscriberId, Topic, Payload, Props],
case vmq_plugin:all_till_ok(on_deliver_m5, HookArgs0) of
Expand Down
34 changes: 22 additions & 12 deletions apps/vmq_server/src/vmq_mqtt_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -295,24 +295,23 @@ connected({mail, QPid, Msgs, _, Dropped},
{NewState2, Out} =
case handle_messages(Msgs, [], 0, NewState, Waiting) of
{NewState1, HandledMsgs, []} ->
vmq_queue:notify(QPid),
%% messages aren't delivered (yet) but are queued in this process
%% we notify the queue that we're ready for new messages.
vmq_queue:notify(QPid),
{NewState1, HandledMsgs};
{NewState1, HandledMsgs, NewWaiting} ->
%% messages aren't delivered (yet) but are queued in this process
%% we tell the queue to get rid of them
vmq_queue:notify_recv(QPid),
%% we call vmq_queue:notify as soon as
%% the check_in_flight returns true again
%% SEE: Comment in handle_waiting_msgs function.
%% we still have waiting messages
{NewState1#state{waiting_msgs=NewWaiting}, HandledMsgs}
end,
{NewState2, Out};
connected(#mqtt_puback{message_id=MessageId}, #state{waiting_acks=WAcks} = State) ->
%% qos1 flow
_ = vmq_metrics:incr_mqtt_puback_received(),
case maps:get(MessageId, WAcks, not_found) of
case Msg = maps:get(MessageId, WAcks, not_found) of
#vmq_msg{} ->
handle_waiting_msgs(State#state{waiting_acks=maps:remove(MessageId, WAcks)});
{NewState, Out} = handle_waiting_msgs(State#state{waiting_acks=maps:remove(MessageId, WAcks)}),
{release_message(Msg, NewState), Out};
not_found ->
_ = vmq_metrics:incr_mqtt_error_invalid_puback(),
{State, []}
Expand All @@ -322,14 +321,14 @@ connected(#mqtt_pubrec{message_id=MessageId}, State) ->
retry_queue=RetryQueue} = State,
%% qos2 flow
_ = vmq_metrics:incr_mqtt_pubrec_received(),
case maps:get(MessageId, WAcks, not_found) of
case Msg = maps:get(MessageId, WAcks, not_found) of
#vmq_msg{} ->
PubRelFrame = #mqtt_pubrel{message_id=MessageId},
_ = vmq_metrics:incr_mqtt_pubrel_sent(),
{State#state{
NewState = State#state{
retry_queue=set_retry(pubrel, MessageId, RetryInterval, RetryQueue),
waiting_acks=maps:update(MessageId, PubRelFrame, WAcks)},
[PubRelFrame]};
{release_message(Msg, NewState), [PubRelFrame]};
#mqtt_pubrel{message_id=MessageId} = PubRelFrame ->
%% handle PUBREC retries from the client.
_ = vmq_metrics:incr_mqtt_pubrel_sent(),
Expand Down Expand Up @@ -904,7 +903,6 @@ handle_waiting_msgs(#state{waiting_msgs=[]} = State) ->
handle_waiting_msgs(#state{waiting_msgs=Msgs, queue_pid=QPid} = State) ->
case handle_messages(lists:reverse(Msgs), [], 0, State, []) of
{NewState, HandledMsgs, []} ->
%% we're ready to take more
vmq_queue:notify(QPid),
{NewState#state{waiting_msgs=[]}, HandledMsgs};
{NewState, HandledMsgs, Waiting} ->
Expand Down Expand Up @@ -950,6 +948,7 @@ prepare_frame(#deliver{qos=QoS, msg_id=MsgId, msg=Msg}, State) ->
dup=IsDup,
qos=MsgQoS} = Msg,
NewQoS = maybe_upgrade_qos(QoS, MsgQoS, State),
maybe_release_message(NewQoS, Msg, State),
{NewTopic, NewPayload} =
case on_deliver_hook(User, SubscriberId, QoS, Topic, Payload, IsRetained) of
{error, _} ->
Expand Down Expand Up @@ -981,6 +980,17 @@ prepare_frame(#deliver{qos=QoS, msg_id=MsgId, msg=Msg}, State) ->
Msg#vmq_msg{qos=NewQoS}, WAcks)}}
end.

%% Release immediately when the (non-upgraded) QoS ended up as 0; any
%% other QoS keeps the message queued until its ack flow completes.
-spec maybe_release_message(qos(), msg(), state()) -> state().
maybe_release_message(0, Msg, State) -> release_message(Msg, State);
maybe_release_message(_, _, State) -> State.

%% Notify the owning queue that Msg can be released; returns the fsm
%% state unchanged.
-spec release_message(msg(), state()) -> state().
release_message(Msg, State) ->
    vmq_queue:release_message(State#state.queue_pid, Msg),
    State.

-spec on_deliver_hook(username(), subscriber_id(), qos(), topic(), payload(), flag()) -> any().
on_deliver_hook(User, SubscriberId, QoS, Topic, Payload, IsRetain) ->
HookArgs0 = [User, SubscriberId, Topic, Payload],
Expand Down
Loading