Skip to content

Commit

Permalink
fix(connector): validate connector name before converting ssl certs
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Nov 14, 2023
1 parent aded4a5 commit d6e9bbb
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 45 deletions.
54 changes: 41 additions & 13 deletions apps/emqx_bridge/src/emqx_bridge.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
]).

-export([config_key_path/0]).
-export([validate_bridge_name/1]).

%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
Expand Down Expand Up @@ -268,7 +267,12 @@ config_key_path() ->
pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
{ok, RawConf};
pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
{ok, convert_certs(NewConf)}.
case multi_validate_bridge_names(NewConf) of
ok ->
{ok, convert_certs(NewConf)};
Error ->
Error
end.

post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} =
Expand Down Expand Up @@ -657,21 +661,45 @@ get_basic_usage_info() ->
InitialAcc
end.

validate_bridge_name(BridgeName0) ->
BridgeName = to_bin(BridgeName0),
case re:run(BridgeName, ?MAP_KEY_RE, [{capture, none}]) of
match ->
ok;
nomatch ->
{error, #{
kind => validation_error,
reason => bad_bridge_name,
value => BridgeName
}}
validate_bridge_name(BridgeName) ->
try
_ = emqx_resource:validate_name(to_bin(BridgeName)),
ok
catch
throw:Error ->
{error, Error}
end.

to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
to_bin(B) when is_binary(B) -> B.

upgrade_type(Type) ->
emqx_bridge_lib:upgrade_type(Type).

multi_validate_bridge_names(Conf) ->
BridgeTypeAndNames =
[
{Type, Name}
|| {Type, NameToConf} <- maps:to_list(Conf),
{Name, _Conf} <- maps:to_list(NameToConf)
],
BadBridges =
lists:filtermap(
fun({Type, Name}) ->
case validate_bridge_name(Name) of
ok -> false;
_Error -> {true, #{type => Type, name => Name}}
end
end,
BridgeTypeAndNames
),
case BadBridges of
[] ->
ok;
[_ | _] ->
{error, #{
kind => validation_error,
reason => bad_bridge_names,
bad_bridges => BadBridges
}}
end.
18 changes: 15 additions & 3 deletions apps/emqx_bridge/src/emqx_bridge_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
%% to save the 'enable' to the config files
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
case validate_bridge_name(Path) of
case validate_bridge_name_in_config(Path) of
ok ->
case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
{error, Reason} ->
Expand Down Expand Up @@ -104,11 +104,23 @@ post_config_update([bridges, BridgeType, BridgeName], _Req, NewConf, OldConf, _A
operation_to_enable(disable) -> false;
operation_to_enable(enable) -> true.

validate_bridge_name(Path) ->
validate_bridge_name_in_config(Path) ->
[RootKey] = emqx_bridge:config_key_path(),
case Path of
[RootKey, _BridgeType, BridgeName] ->
emqx_bridge:validate_bridge_name(BridgeName);
validate_bridge_name(BridgeName);
_ ->
ok
end.

to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
to_bin(B) when is_binary(B) -> B.

validate_bridge_name(BridgeName) ->
try
_ = emqx_resource:validate_name(to_bin(BridgeName)),
ok
catch
throw:Error ->
{error, Error}
end.
30 changes: 29 additions & 1 deletion apps/emqx_bridge/test/emqx_bridge_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,41 @@ t_create_with_bad_name(_Config) ->
?assertMatch(
{error,
{pre_config_update, emqx_bridge_app, #{
reason := bad_bridge_name,
reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>,
kind := validation_error
}}},
emqx:update_config(Path, Conf)
),
ok.

t_create_with_bad_name_root(_Config) ->
BadBridgeName = <<"test_哈哈">>,
BridgeConf = #{
<<"bridge_mode">> => false,
<<"clean_start">> => true,
<<"keepalive">> => <<"60s">>,
<<"proto_ver">> => <<"v4">>,
<<"server">> => <<"127.0.0.1:1883">>,
<<"ssl">> =>
#{
%% needed to trigger pre_config_update
<<"certfile">> => cert_file("certfile"),
<<"enable">> => true
}
},
Conf = #{<<"mqtt">> => #{BadBridgeName => BridgeConf}},
Path = [bridges],
?assertMatch(
{error,
{pre_config_update, _ConfigHandlerMod, #{
kind := validation_error,
reason := bad_bridge_names,
bad_bridges := [#{type := <<"mqtt">>, name := BadBridgeName}]
}}},
emqx:update_config(Path, Conf)
),
ok.

data_file(Name) ->
Dir = code:lib_dir(emqx_bridge, test),
{ok, Bin} = file:read_file(filename:join([Dir, "data", Name])),
Expand Down
8 changes: 7 additions & 1 deletion apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,13 @@ t_create_with_bad_name(Config) ->
Config
),
Msg = emqx_utils_json:decode(Msg0, [return_maps]),
?assertMatch(#{<<"reason">> := <<"bad_bridge_name">>}, Msg),
?assertMatch(
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
},
Msg
),
ok.

validate_resource_request_ttl(single, Timeout, Name) ->
Expand Down
31 changes: 29 additions & 2 deletions apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ con_schema() ->
fields("connector") ->
[
{enable, hoconsc:mk(any(), #{})},
{resource_opts, hoconsc:mk(map(), #{})}
{resource_opts, hoconsc:mk(map(), #{})},
{ssl, hoconsc:ref(ssl)}
];
fields("api_post") ->
[
Expand All @@ -159,7 +160,9 @@ fields("api_post") ->
{type, hoconsc:mk(bridge_type(), #{})},
{send_to, hoconsc:mk(atom(), #{})}
| fields("connector")
].
];
fields(ssl) ->
emqx_schema:client_ssl_opts_schema(#{required => false}).

con_config() ->
#{
Expand Down Expand Up @@ -806,3 +809,27 @@ t_scenario_2(Config) ->
?assert(is_rule_enabled(RuleId2)),

ok.

t_create_with_bad_name(_Config) ->
BadBridgeName = <<"test_哈哈">>,
%% Note: must contain SSL options to trigger bug.
Cacertfile = emqx_common_test_helpers:app_path(
emqx,
filename:join(["etc", "certs", "cacert.pem"])
),
Opts = #{
name => BadBridgeName,
overrides => #{
<<"ssl">> =>
#{<<"cacertfile">> => Cacertfile}
}
},
{error,
{{_, 400, _}, _, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> := #{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
}
}}} = create_bridge_http_api_v1(Opts),
ok.
2 changes: 1 addition & 1 deletion apps/emqx_connector/src/emqx_connector.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
{vsn, "0.1.33"},
{vsn, "0.1.34"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
Expand Down
70 changes: 64 additions & 6 deletions apps/emqx_connector/src/emqx_connector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,28 @@ config_key_path() ->
pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
{ok, RawConf};
pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
{ok, convert_certs(NewConf)};
case multi_validate_connector_names(NewConf) of
ok ->
{ok, convert_certs(NewConf)};
Error ->
Error
end;
pre_config_update(_, {_Oper, _, _}, undefined) ->
{error, connector_not_found};
pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
%% to save the 'enable' to the config files
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
{error, Reason} ->
{error, Reason};
{ok, ConfNew} ->
{ok, ConfNew}
case validate_connector_name_in_config(Path) of
ok ->
case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
{error, Reason} ->
{error, Reason};
{ok, ConfNew} ->
{ok, ConfNew}
end;
Error ->
Error
end.

operation_to_enable(disable) -> false;
Expand Down Expand Up @@ -458,3 +468,51 @@ ensure_no_channels(Configs) ->
{error, Reason, _State} ->
{error, Reason}
end.

to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
to_bin(B) when is_binary(B) -> B.

validate_connector_name(ConnectorName) ->
try
_ = emqx_resource:validate_name(to_bin(ConnectorName)),
ok
catch
throw:Error ->
{error, Error}
end.

validate_connector_name_in_config(Path) ->
case Path of
[?ROOT_KEY, _ConnectorType, ConnectorName] ->
validate_connector_name(ConnectorName);
_ ->
ok
end.

multi_validate_connector_names(Conf) ->
ConnectorTypeAndNames =
[
{Type, Name}
|| {Type, NameToConf} <- maps:to_list(Conf),
{Name, _Conf} <- maps:to_list(NameToConf)
],
BadConnectors =
lists:filtermap(
fun({Type, Name}) ->
case validate_connector_name(Name) of
ok -> false;
_Error -> {true, #{type => Type, name => Name}}
end
end,
ConnectorTypeAndNames
),
case BadConnectors of
[] ->
ok;
[_ | _] ->
{error, #{
kind => validation_error,
reason => bad_connector_names,
bad_connectors => BadConnectors
}}
end.
65 changes: 65 additions & 0 deletions apps/emqx_connector/test/emqx_connector_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,71 @@ t_remove_fail(_Config) ->
),
ok.

t_create_with_bad_name_direct_path({init, Config}) ->
meck:new(emqx_connector_ee_schema, [passthrough]),
meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok),
meck:expect(?CONNECTOR, on_get_status, 2, connected),
Config;
t_create_with_bad_name_direct_path({'end', _Config}) ->
meck:unload(),
ok;
t_create_with_bad_name_direct_path(_Config) ->
Path = [connectors, kafka_producer, 'test_哈哈'],
ConnConfig0 = connector_config(),
%% Note: must contain SSL options to trigger original bug.
Cacertfile = emqx_common_test_helpers:app_path(
emqx,
filename:join(["etc", "certs", "cacert.pem"])
),
ConnConfig = ConnConfig0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}},
?assertMatch(
{error,
{pre_config_update, _ConfigHandlerMod, #{
kind := validation_error,
reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
}}},
emqx:update_config(Path, ConnConfig)
),
ok.

t_create_with_bad_name_root_path({init, Config}) ->
meck:new(emqx_connector_ee_schema, [passthrough]),
meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR),
meck:new(?CONNECTOR, [non_strict]),
meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
meck:expect(?CONNECTOR, on_stop, 2, ok),
meck:expect(?CONNECTOR, on_get_status, 2, connected),
Config;
t_create_with_bad_name_root_path({'end', _Config}) ->
meck:unload(),
ok;
t_create_with_bad_name_root_path(_Config) ->
Path = [connectors],
BadConnectorName = <<"test_哈哈">>,
ConnConfig0 = connector_config(),
%% Note: must contain SSL options to trigger original bug.
Cacertfile = emqx_common_test_helpers:app_path(
emqx,
filename:join(["etc", "certs", "cacert.pem"])
),
ConnConfig = ConnConfig0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}},
Conf = #{<<"kafka_producer">> => #{BadConnectorName => ConnConfig}},
?assertMatch(
{error,
{pre_config_update, _ConfigHandlerMod, #{
kind := validation_error,
reason := bad_connector_names,
bad_connectors := [#{type := <<"kafka_producer">>, name := BadConnectorName}]
}}},
emqx:update_config(Path, Conf)
),
ok.

%% helpers

connector_config() ->
Expand Down

0 comments on commit d6e9bbb

Please sign in to comment.