Skip to content

Commit

Permalink
Merge pull request emqx#10901 from fix/EMQX-9985/ft-fin-checksum
Browse files Browse the repository at this point in the history
fix(ft): respect checksum in `fin` packets
  • Loading branch information
keynslug committed Jun 1, 2023
2 parents 7055bd8 + 0293b54 commit 76e5243
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 45 deletions.
4 changes: 2 additions & 2 deletions apps/emqx/src/emqx_maybe.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ define(Term, _) ->
Term.

%% @doc Apply a function to a maybe argument.
-spec apply(fun((A) -> maybe(A)), maybe(A)) ->
maybe(A).
-spec apply(fun((A) -> B), maybe(A)) ->
maybe(B).
apply(_Fun, undefined) ->
undefined;
apply(Fun, Term) when is_function(Fun) ->
Expand Down
28 changes: 17 additions & 11 deletions apps/emqx_ft/src/emqx_ft.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
offset/0,
filemeta/0,
segment/0,
checksum/0
checksum/0,
finopts/0
]).

%% Number of bytes
Expand Down Expand Up @@ -80,6 +81,10 @@

-type segment() :: {offset(), _Content :: binary()}.

%% Options that accompany the finalization (`fin`) of a transfer.
%% The `checksum` key is optional (`=>`): the map may be empty when no
%% checksum was supplied.
-type finopts() :: #{
checksum => checksum()
}.

%%--------------------------------------------------------------------
%% API for app
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -170,8 +175,8 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
ChecksumBin = emqx_maybe:from_list(MaybeChecksum),
validate(
[{size, FinalSizeBin}, {{maybe, checksum}, ChecksumBin}],
fun([FinalSize, Checksum]) ->
on_fin(PacketId, Msg, Transfer, FinalSize, Checksum)
fun([FinalSize, FinalChecksum]) ->
on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum)
end
);
[<<"abort">>] ->
Expand Down Expand Up @@ -251,21 +256,21 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
end
end).

on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
?tp(info, "file_transfer_fin", #{
mqtt_msg => Msg,
packet_id => PacketId,
transfer => Transfer,
final_size => FinalSize,
checksum => Checksum
checksum => FinalChecksum
}),
%% NOTE: the checksum from the `fin` command, if any, is handed over to assemble/3 below.
FinPacketKey = {self(), PacketId},
Callback = fun(Result) ->
?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result)
end,
with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
case assemble(Transfer, FinalSize) of
case assemble(Transfer, FinalSize, FinalChecksum) of
%% Assembling completed, ack through the responder right away
ok ->
emqx_ft_responder:ack(FinPacketKey, ok);
Expand Down Expand Up @@ -314,9 +319,10 @@ store_segment(Transfer, Segment) ->
{error, {internal_error, E}}
end.

assemble(Transfer, FinalSize) ->
assemble(Transfer, FinalSize, FinalChecksum) ->
try
emqx_ft_storage:assemble(Transfer, FinalSize)
FinOpts = [{checksum, FinalChecksum} || FinalChecksum /= undefined],
emqx_ft_storage:assemble(Transfer, FinalSize, maps:from_list(FinOpts))
catch
C:E:S ->
?tp(error, "start_assemble_failed", #{
Expand Down Expand Up @@ -397,8 +403,8 @@ do_validate([{checksum, Checksum} | Rest], Parsed) ->
{error, _Reason} ->
{error, {invalid_checksum, Checksum}}
end;
do_validate([{integrity, Payload, Checksum} | Rest], Parsed) ->
case crypto:hash(sha256, Payload) of
do_validate([{integrity, Payload, {Algo, Checksum}} | Rest], Parsed) ->
case crypto:hash(Algo, Payload) of
Checksum ->
do_validate(Rest, [Payload | Parsed]);
Mismatch ->
Expand All @@ -411,7 +417,7 @@ do_validate([{{maybe, T}, Value} | Rest], Parsed) ->

parse_checksum(Checksum) when is_binary(Checksum) andalso byte_size(Checksum) =:= 64 ->
try
{ok, binary:decode_hex(Checksum)}
{ok, {sha256, binary:decode_hex(Checksum)}}
catch
error:badarg ->
{error, invalid_checksum}
Expand Down
14 changes: 8 additions & 6 deletions apps/emqx_ft/src/emqx_ft_assembler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

-module(emqx_ft_assembler).

-export([start_link/3]).
-export([start_link/4]).

-behaviour(gen_statem).
-export([callback_mode/0]).
Expand All @@ -29,6 +29,7 @@
%% State data carried by the assembler state machine.
%% `storage`, `transfer`, `finopts` and `assembly` are mandatory (`:=`);
%% `export` is optional (`=>`) and need not be present at all times.
-type stdata() :: #{
storage := emqx_ft_storage_fs:storage(),
transfer := emqx_ft:transfer(),
finopts := emqx_ft:finopts(),
assembly := emqx_ft_assembly:t(),
export => emqx_ft_storage_exporter:export()
}.
Expand All @@ -38,8 +39,8 @@

%%

%% Starts an assembler `gen_statem` for `Transfer`, linked to the caller.
%% The process is started under the name produced by `?REF(Transfer)`
%% (macro defined elsewhere — presumably a gproc-based name; confirm), and
%% the argument tuple is handed to `init/1` as is.
start_link(Storage, Transfer, Size) ->
gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []).
start_link(Storage, Transfer, Size, Opts) ->
gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size, Opts}, []).

%% Looks up, through gproc, the process registered under `?NAME(Transfer)`.
%% Returns whatever `gproc:where/1` returns for that name.
where(Transfer) ->
gproc:where(?NAME(Transfer)).
Expand All @@ -60,11 +61,12 @@ callback_mode() ->
handle_event_function.

%% gen_statem callback. Builds the initial state data from the start
%% arguments and enters the `idle` state.
-spec init(_Args) -> {ok, state(), stdata()}.
init({Storage, Transfer, Size}) ->
init({Storage, Transfer, Size, Opts}) ->
%% Exit signals from linked processes arrive as messages instead of
%% terminating this process.
_ = erlang:process_flag(trap_exit, true),
St = #{
storage => Storage,
transfer => Transfer,
%% Options given at start, kept for later use by the state machine.
finopts => Opts,
%% Fresh assembly created from the expected size.
assembly => emqx_ft_assembly:new(Size)
},
{ok, idle, St}.
Expand Down Expand Up @@ -164,8 +166,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export :=
end;
handle_event(internal, _, {assemble, []}, St = #{}) ->
{next_state, complete, St, ?internal([])};
handle_event(internal, _, complete, St = #{export := Export}) ->
Result = emqx_ft_storage_exporter:complete(Export),
handle_event(internal, _, complete, St = #{export := Export, finopts := Opts}) ->
Result = emqx_ft_storage_exporter:complete(Export, Opts),
_ = maybe_garbage_collect(Result, St),
{stop, {shutdown, Result}, maps:remove(export, St)}.

Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_ft/src/emqx_ft_assembler_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
-module(emqx_ft_assembler_sup).

-export([start_link/0]).
-export([ensure_child/3]).
-export([ensure_child/4]).

-behaviour(supervisor).
-export([init/1]).

%% Starts the supervisor, registered locally under the module name, with an
%% empty argument list passed to `init/1`.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

ensure_child(Storage, Transfer, Size) ->
ensure_child(Storage, Transfer, Size, Opts) ->
Childspec = #{
id => Transfer,
start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size]},
start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size, Opts]},
restart => temporary
},
case supervisor:start_child(?MODULE, Childspec) of
Expand Down
10 changes: 5 additions & 5 deletions apps/emqx_ft/src/emqx_ft_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
[
store_filemeta/2,
store_segment/2,
assemble/2,
assemble/3,

files/0,
files/1,
Expand Down Expand Up @@ -88,7 +88,7 @@
ok | {async, pid()} | {error, term()}.
-callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) ->
ok | {async, pid()} | {error, term()}.
-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) ->
-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes(), emqx_ft:finopts()) ->
ok | {async, pid()} | {error, term()}.

-callback files(storage(), query(Cursor)) ->
Expand All @@ -114,10 +114,10 @@ store_filemeta(Transfer, FileMeta) ->
store_segment(Transfer, Segment) ->
dispatch(store_segment, [Transfer, Segment]).

%% Requests assembling of a transfer of the given size.
%% The arguments are forwarded unchanged to `dispatch/2` under the `assemble`
%% tag (presumably routed to the storage backend's `assemble` callback —
%% confirm against dispatch/2).
-spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
-spec assemble(emqx_ft:transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
ok | {async, pid()} | {error, term()}.
assemble(Transfer, Size) ->
dispatch(assemble, [Transfer, Size]).
assemble(Transfer, Size, FinOpts) ->
dispatch(assemble, [Transfer, Size, FinOpts]).

-spec files() ->
{ok, page(file_info(), _)} | {error, term()}.
Expand Down
23 changes: 15 additions & 8 deletions apps/emqx_ft/src/emqx_ft_storage_exporter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
%% Export API
-export([start_export/3]).
-export([write/2]).
-export([complete/1]).
-export([complete/2]).
-export([discard/1]).

%% Listing API
Expand Down Expand Up @@ -117,12 +117,19 @@ write(#{mod := ExporterMod, st := ExportSt, hash := Hash} = Export, Content) ->
Error
end.

-spec complete(export()) ->
-spec complete(export(), emqx_ft:finopts()) ->
ok | {error, _Reason}.
complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}) ->
case verify_checksum(Hash, Filemeta) of
{ok, Checksum} ->
ExporterMod:complete(ExportSt, Checksum);
complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}, Opts) ->
Checksum = emqx_maybe:define(
% NOTE
% Checksum in `Opts` takes precedence over one in `Filemeta` according to the spec.
% We do not care if they differ.
maps:get(checksum, Opts, undefined),
maps:get(checksum, Filemeta, undefined)
),
case verify_checksum(Hash, Checksum) of
{ok, ExportChecksum} ->
ExporterMod:complete(ExportSt, ExportChecksum);
{error, _} = Error ->
_ = ExporterMod:discard(ExportSt),
Error
Expand Down Expand Up @@ -183,13 +190,13 @@ init_checksum(#{}) ->
%% Feeds `IoData` into the running hash context and returns the updated
%% context.
update_checksum(Ctx, IoData) ->
crypto:hash_update(Ctx, IoData).

%% Finalizes the running hash context and checks it against the expected
%% checksum, if there is one.
%% Returns `{ok, Checksum}` when the computed digest equals the expected one,
%% or `{error, {checksum, Algo, Hex}}` where `Hex` is the hex-encoded digest
%% that was actually computed.
verify_checksum(Ctx, #{checksum := {Algo, Digest} = Checksum}) ->
verify_checksum(Ctx, {Algo, Digest} = Checksum) ->
case crypto:hash_final(Ctx) of
%% `Digest` is already bound, so this clause matches only on equality.
Digest ->
{ok, Checksum};
Mismatch ->
{error, {checksum, Algo, binary:encode_hex(Mismatch)}}
end;
%% Nothing to compare against: report the computed digest, tagged as sha256.
%% NOTE(review): assumes the context was initialized for sha256 — confirm
%% against init_checksum.
verify_checksum(Ctx, #{}) ->
verify_checksum(Ctx, undefined) ->
Digest = crypto:hash_final(Ctx),
{ok, {sha256, Digest}}.
12 changes: 6 additions & 6 deletions apps/emqx_ft/src/emqx_ft_storage_fs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
-export([list/3]).
-export([pread/5]).
-export([lookup_local_assembler/1]).
-export([assemble/3]).
-export([assemble/4]).

-export([transfers/1]).

Expand Down Expand Up @@ -211,14 +211,14 @@ pread(_Storage, _Transfer, Frag, Offset, Size) ->
{error, Reason}
end.

%% Finds or starts an assembler for the transfer.
%% A list of lookup funs is built and handed to `lookup_assembler/1`
%% (presumably tried in list order until one succeeds — confirm):
%% a local assembler, a remote assembler, a check whether the transfer was
%% already exported, and finally starting a local assembler.
-spec assemble(storage(), transfer(), emqx_ft:bytes()) ->
-spec assemble(storage(), transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
{async, _Assembler :: pid()} | ok | {error, _TODO}.
assemble(Storage, Transfer, Size) ->
assemble(Storage, Transfer, Size, Opts) ->
LookupSources = [
fun() -> lookup_local_assembler(Transfer) end,
fun() -> lookup_remote_assembler(Transfer) end,
fun() -> check_if_already_exported(Storage, Transfer) end,
fun() -> ensure_local_assembler(Storage, Transfer, Size) end
fun() -> ensure_local_assembler(Storage, Transfer, Size, Opts) end
],
lookup_assembler(LookupSources).

Expand Down Expand Up @@ -295,8 +295,8 @@ lookup_remote_assembler(Transfer) ->
_ -> {error, not_found}
end.

%% Asks the assembler supervisor for a child handling this transfer and
%% returns `{async, Pid}`.
%% Any reply other than `{ok, Pid}` crashes the caller with a badmatch.
ensure_local_assembler(Storage, Transfer, Size) ->
{ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size),
ensure_local_assembler(Storage, Transfer, Size, Opts) ->
{ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size, Opts),
{async, Pid}.

-spec transfers(storage()) ->
Expand Down
17 changes: 15 additions & 2 deletions apps/emqx_ft/test/emqx_ft_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ t_invalid_topic_format(Config) ->
unspecified_error,
emqtt:publish(C, <<"$file/fileid/fin/offset">>, <<>>, 1)
),
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/fileid/fin/42/xyz">>, <<>>, 1)
),
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/">>, <<>>, 1)
Expand Down Expand Up @@ -390,9 +394,18 @@ t_invalid_checksum(Config) ->
with_offsets(Data)
),

% Send `fin` w/o checksum, should fail since filemeta checksum is invalid
FinTopic = mk_fin_topic(FileId, Filesize),
?assertRCName(
unspecified_error,
emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
emqtt:publish(C, FinTopic, <<>>, 1)
),

% Send `fin` with the correct checksum
Checksum = binary:encode_hex(sha256(Data)),
?assertRCName(
success,
emqtt:publish(C, <<FinTopic/binary, "/", Checksum/binary>>, <<>>, 1)
).

t_corrupted_segment_retry(Config) ->
Expand Down Expand Up @@ -507,7 +520,7 @@ t_assemble_crash(Config) ->
C = ?config(client, Config),

meck:new(emqx_ft_storage_fs),
meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end),
meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) -> meck:exception(error, oops) end),

?assertRCName(
unspecified_error,
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ complete_assemble(Storage, Transfer, Size) ->
complete_assemble(Storage, Transfer, Size, 1000).

complete_assemble(Storage, Transfer, Size, Timeout) ->
{async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size),
{async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}),
MRef = erlang:monitor(process, Pid),
Pid ! kickoff,
receive
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ complete_transfer(Storage, Transfer, Size) ->
complete_transfer(Storage, Transfer, Size, 100).

complete_transfer(Storage, Transfer, Size, Timeout) ->
case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of
case emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}) of
ok ->
ok;
{async, Pid} ->
Expand Down

0 comments on commit 76e5243

Please sign in to comment.