/
rabbit_exchange_type_headers.erl
113 lines (96 loc) · 3.71 KB
/
rabbit_exchange_type_headers.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module(rabbit_exchange_type_headers).
-include_lib("rabbit_common/include/rabbit.hrl").
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2, route/3]).
-export([validate/1, validate_binding/2,
create/2, delete/2, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-export([info/1, info/2]).
-rabbit_boot_step({?MODULE,
[{description, "exchange type headers"},
{mfa, {rabbit_registry, register,
[exchange, <<"headers">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, kernel_ready}]}).
info(_X) -> [].
info(_X, _) -> [].
description() ->
[{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
serialise_events() -> false.
route(#exchange{name = Name}, Msg) ->
route(#exchange{name = Name}, Msg, #{}).
route(#exchange{name = Name}, Msg, _Opts) ->
%% TODO: find a way not to extract x-headers unless necessary
Headers = mc:routing_headers(Msg, [x_headers]),
rabbit_router:match_bindings(
Name, fun(#binding{args = Args}) ->
case rabbit_misc:table_lookup(Args, <<"x-match">>) of
{longstr, <<"any">>} ->
match_any(Args, Headers, fun match/2);
{longstr, <<"any-with-x">>} ->
match_any(Args, Headers, fun match_x/2);
{longstr, <<"all-with-x">>} ->
match_all(Args, Headers, fun match_x/2);
_ ->
match_all(Args, Headers, fun match/2)
end
end).
match_x({<<"x-match">>, _, _}, _M) ->
skip;
match_x({K, void, _}, M) ->
maps:is_key(K, M);
match_x({K, _, V}, M) ->
maps:get(K, M, undefined) =:= V.
match({<<"x-", _/binary>>, _, _}, _M) ->
skip;
match({K, void, _}, M) ->
maps:is_key(K, M);
match({K, _, V}, M) ->
maps:get(K, M, undefined) =:= V.
match_all([], _, _MatchFun) ->
true;
match_all([Arg | Rem], M, Fun) ->
case Fun(Arg, M) of
false ->
false;
_ ->
match_all(Rem, M, Fun)
end.
match_any([], _, _Fun) ->
false;
match_any([Arg | Rem], M, Fun) ->
case Fun(Arg, M) of
true ->
true;
_ ->
match_any(Rem, M, Fun)
end.
validate_binding(_X, #binding{args = Args}) ->
case rabbit_misc:table_lookup(Args, <<"x-match">>) of
{longstr, <<"all">>} -> ok;
{longstr, <<"any">>} -> ok;
{longstr, <<"all-with-x">>} -> ok;
{longstr, <<"any-with-x">>} -> ok;
{longstr, Other} ->
{error, {binding_invalid,
"Invalid x-match field value ~tp; "
"expected all, any, all-with-x, or any-with-x", [Other]}};
{Type, Other} ->
{error, {binding_invalid,
"Invalid x-match field type ~tp (value ~tp); "
"expected longstr", [Type, Other]}};
undefined -> ok %% [0]
end.
validate(_X) -> ok.
create(_Serial, _X) -> ok.
delete(_Serial, _X) -> ok.
policy_changed(_X1, _X2) -> ok.
add_binding(_Serial, _X, _B) -> ok.
remove_bindings(_Serial, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).