This repository has been archived by the owner on Nov 17, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 21
/
rabbit_exchange_type_recent_history.erl
110 lines (94 loc) · 3.3 KB
/
rabbit_exchange_type_recent_history.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
-module(rabbit_exchange_type_recent_history).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_exchange_type_spec.hrl").
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-export([setup_schema/0]).
%% Boot step: calls rabbit_registry:register/3 to register this module as the
%% exchange type <<"x-recent-history">>. Declared as requiring
%% `rabbit_registry` and enabling `kernel_ready`.
-rabbit_boot_step({rabbit_exchange_type_rh_registry,
[{description, "recent history exchange type: registry"},
{mfa, {rabbit_registry, register,
[exchange, <<"x-recent-history">>,
?MODULE]}},
{requires, rabbit_registry},
{enables, kernel_ready}]}).
%% Boot step: calls setup_schema/0 of this module to create the cache table.
%% Declared as requiring `database` and enabling `external_infrastructure`.
-rabbit_boot_step({rabbit_exchange_type_rh_mnesia,
[{description, "recent history exchange type: mnesia"},
{mfa, {?MODULE, setup_schema, []}},
{requires, database},
{enables, external_infrastructure}]}).
%% History size bound used by route/2 when trimming the cached message list.
-define(KEEP_NB, 20).
%% Name of the mnesia table that stores the per-exchange message cache.
-define(RH_TABLE, rh_exchange_table).
%% Row stored in ?RH_TABLE: `key` is the exchange name and `content` is the
%% list of cached message contents (route/2 prepends, so newest is first).
-record(cached, {key, content}).
%% @doc Returns the metadata proplist for this exchange type: a `name` entry
%% and a human-readable `description` entry, both binaries.
description() ->
    Name = {name, <<"recent-history">>},
    Summary = {description, <<"List of Last-value caches exchange.">>},
    [Name, Summary].
%% @doc Exchange-type callback; this exchange type always answers `false`.
serialise_events() ->
    false.
%% @doc Caches the content of the routed message and then routes it.
%%
%% Inside a single mnesia transaction the current cache row for the exchange
%% is read under a write lock, the new content is prepended, and the row is
%% written back. The stored list therefore holds the newest content first and
%% never more than ?KEEP_NB entries.
%%
%% Returns whatever rabbit_router:match_routing_key/2 returns for the
%% exchange name with the wildcard routing-key pattern ['_'].
route(#exchange{name = Name},
      #delivery{message = #basic_message{content = Content}}) ->
    rabbit_misc:execute_mnesia_transaction(
      fun () ->
              Msgs = case mnesia:read(?RH_TABLE, Name, write) of
                         [#cached{key = Name, content = Cached}] ->
                             Cached;
                         _ ->
                             []
                     end,
              %% Trim to ?KEEP_NB - 1 *before* prepending so that the stored
              %% list holds at most ?KEEP_NB messages. Trimming to ?KEEP_NB
              %% here would let the cache grow to ?KEEP_NB + 1 entries.
              Recent = [Content | lists:sublist(Msgs, ?KEEP_NB - 1)],
              mnesia:write(?RH_TABLE,
                           #cached{key = Name, content = Recent},
                           write)
      end),
    rabbit_router:match_routing_key(Name, ['_']).
%% @doc Exchange-type callback; accepts any exchange without checks.
validate(_Exchange) ->
    ok.
%% @doc Exchange-type callback; nothing is done when an exchange is created.
create(_Tx, _Exchange) ->
    ok.
%% @doc Exchange-type callback; removes the exchange's row from ?RH_TABLE
%% inside a mnesia transaction and returns `ok`.
delete(_Tx, #exchange{name = Name}, _Bs) ->
    DropCache = fun () -> mnesia:delete(?RH_TABLE, Name, write) end,
    rabbit_misc:execute_mnesia_transaction(DropCache),
    ok.
%% @doc Exchange-type callback; replays the cached history to a newly bound
%% queue.
%%
%% Looks up the binding's destination queue. If it is not found, a protocol
%% error (`internal_error`) is raised via rabbit_misc:protocol_error/3.
%% Otherwise the cached contents for the exchange are read with a dirty read,
%% turned back into messages and delivered to the queue process. Returns `ok`.
add_binding(_Tx, #exchange{name = XName},
            #binding{destination = QueueName}) ->
    case rabbit_amqqueue:lookup(QueueName) of
        {error, not_found} ->
            %% NOTE(review): '~s' needs chardata; if QueueName is a resource
            %% record rather than a string/binary this format call will fail
            %% -- confirm and consider formatting the name explicitly.
            rabbit_misc:protocol_error(
              internal_error,
              "could not find queue '~s'",
              [QueueName]);
        {ok, #amqqueue{pid = Queue}} ->
            Msgs = case mnesia:dirty_read(?RH_TABLE, XName) of
                       [] ->
                           [];
                       [#cached{content = Cached}] ->
                           %% load_from_content/2 is defined as
                           %% (Cached, XName); the arguments were previously
                           %% passed the other way round, which made it map
                           %% over the exchange name instead of the cache.
                           load_from_content(Cached, XName)
                   end,
            deliver_messages(Queue, Msgs)
    end,
    ok.
%% @doc Exchange-type callback; nothing is done when bindings are removed.
remove_bindings(_Tx, _Exchange, _Bindings) ->
    ok.
%% @doc Exchange-type callback; delegates the argument-equivalence check
%% unchanged to rabbit_exchange_type_direct and returns its result.
assert_args_equivalence(X, Args) ->
    Checker = rabbit_exchange_type_direct,
    Checker:assert_args_equivalence(X, Args).
%% @doc Creates the mnesia table ?RH_TABLE as a `set` of `cached` records.
%%
%% Returns `ok` both when the table was created and when it already existed.
%% Any other result from mnesia:create_table/2 is not matched and crashes
%% the caller.
setup_schema() ->
    TableOpts = [{attributes, record_info(fields, cached)},
                 {record_name, cached},
                 {type, set}],
    case mnesia:create_table(?RH_TABLE, TableOpts) of
        {atomic, ok} ->
            ok;
        {aborted, {already_exists, ?RH_TABLE}} ->
            ok
    end.
%% @doc Rebuilds one message per cached content.
%%
%% `Cached' is the list of stored contents and `XName' the exchange name the
%% rebuilt messages are attributed to. Each content is split into properties
%% and payload, and a message is built from them with an empty routing key.
%% Returns the messages in the same order as `Cached'.
load_from_content(Cached, XName) ->
    ToMessage =
        fun (Content) ->
                {Props, Payload} = rabbit_basic:from_content(Content),
                rabbit_basic:message(XName, <<"">>, Props, Payload)
        end,
    lists:map(ToMessage, Cached).
%% @doc Wraps each message in a delivery and hands it to the queue.
%%
%% `Queue' is passed straight to rabbit_amqqueue:deliver/2. Returns the list
%% of results of those calls, one per message.
deliver_messages(Queue, Msgs) ->
    DeliverOne =
        fun (Msg) ->
                Delivery = rabbit_basic:delivery(false, false, Msg, undefined),
                rabbit_amqqueue:deliver(Queue, Delivery)
        end,
    lists:map(DeliverOne, Msgs).