/
rabbithub_subscription.erl
128 lines (115 loc) · 5.18 KB
/
rabbithub_subscription.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
-module(rabbithub_subscription).
-export([start_subscriptions/0]).
-export([create/2, delete/1]).
-export([start_link/1]).
-export([register_subscription_pid/3, erase_subscription_pid/1]).
%% Internal export
-export([expire/1]).
-include("rabbithub.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
%% Should be exported by timer module, but isn't
system_time() ->
{MegaSec, Sec, MicroSec} = now(),
1000000 * (MegaSec * 1000000 + Sec) + MicroSec.
start_subscriptions() ->
{atomic, Leases} =
mnesia:transaction(fun () ->
mnesia:foldl(fun (Lease, Acc) -> [Lease | Acc] end,
[],
rabbithub_lease)
end),
lists:foreach(fun start/1, Leases).
create(Subscription, LeaseSeconds) ->
RequestedExpiryTime = system_time() + LeaseSeconds * 1000000,
Lease = #rabbithub_lease{subscription = Subscription,
lease_expiry_time_microsec = RequestedExpiryTime},
{atomic, ok} = mnesia:transaction(fun () -> ok = mnesia:write(Lease) end),
start(Lease).
delete(Subscription) ->
{atomic, ok} =
mnesia:transaction(fun () -> mnesia:delete({rabbithub_lease, Subscription}) end),
{atomic, SubPids} =
mnesia:transaction(
fun () ->
SubPids = mnesia:read({rabbithub_subscription_pid, Subscription}),
ok = mnesia:delete({rabbithub_subscription_pid, Subscription}),
SubPids
end),
lists:foreach(fun (#rabbithub_subscription_pid{pid = Pid,
expiry_timer = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
gen_server:cast(Pid, shutdown)
end, SubPids),
ok.
expire(Subscription) ->
error_logger:info_report({expiring, Subscription}),
delete(Subscription).
start_link(Lease =
#rabbithub_lease{subscription =
#rabbithub_subscription{resource =
#resource{kind = ResourceTypeAtom}}}) ->
case ResourceTypeAtom of
exchange ->
gen_server:start_link(rabbithub_pseudo_queue, [Lease], []);
queue ->
gen_server:start_link(rabbithub_consumer, [Lease], [])
end.
start(Lease) ->
case supervisor:start_child(rabbithub_subscription_sup, [Lease]) of
{ok, _Pid} ->
ok;
{error, normal} ->
%% duplicate processes return normal, so as to not provoke the error logger.
ok;
{error, Reason} ->
{error, Reason}
end.
register_subscription_pid(Lease, Pid, ProcessModule) ->
Result = register_subscription_pid1(Lease, Pid),
error_logger:info_report({startup, Result, ProcessModule, Lease}),
Result.
register_subscription_pid1(#rabbithub_lease{subscription = Subscription,
lease_expiry_time_microsec = ExpiryTimeMicro},
Pid) ->
NowMicro = system_time(),
case NowMicro > ExpiryTimeMicro of
true ->
%% Expired.
ok = delete(Subscription),
expired;
false ->
%% Not *yet* expired. Always start a timer, since even if
%% it's a duplicate we want to cancel the existing timer
%% and create a new timer to fire at the new time.
{ok, TRef} = timer:apply_after((ExpiryTimeMicro - NowMicro) div 1000,
?MODULE, expire, [Subscription]),
NewPidRecord = #rabbithub_subscription_pid{subscription = Subscription,
pid = Pid,
expiry_timer = TRef},
{atomic, Result} =
mnesia:transaction(
fun () ->
case mnesia:read(rabbithub_subscription_pid, Subscription) of
[] ->
ok = mnesia:write(NewPidRecord);
[ExistingRecord =
#rabbithub_subscription_pid{pid = ExistingPid,
expiry_timer = OldTRef}] ->
case is_process_alive(ExistingPid) of
true ->
{ok, cancel} = timer:cancel(OldTRef),
R1 = ExistingRecord#rabbithub_subscription_pid{
expiry_timer = TRef},
ok = mnesia:write(R1),
duplicate;
false ->
ok = mnesia:write(NewPidRecord)
end
end
end),
Result
end.
erase_subscription_pid(Subscription) ->
{atomic, ok} =
mnesia:transaction(fun () -> mnesia:delete({rabbithub_subscription_pid, Subscription}) end),
ok.