/
resource_guard.ex
178 lines (146 loc) · 4.58 KB
/
resource_guard.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
defmodule Membrane.ResourceGuard do
@moduledoc """
Utility for handling resources that must be cleaned up after use.
This utility uses a separate process that allows registering functions
that are called when the owner process (passed to `start_link/1`) dies for
any reason. Each Membrane component spawns its resource guard on startup
and provides it via callback context.
### Example
def handle_setup(ctx, state) do
resource = MyResource.create()
Membrane.ResourceGuard.register(ctx.resource_guard, fn ->
MyResource.cleanup(resource)
end)
{:ok, %{state | my_resource: resource}}
end
"""
use GenServer
require Membrane.Core.Message, as: Message
require Membrane.Logger
@type t :: pid()
@spec start_link(owner_pid :: pid) :: {:ok, t}
def start_link(owner_pid \\ self()) do
GenServer.start(__MODULE__, owner_pid, spawn_opt: [:link])
end
@doc """
Registers a resource cleanup function in the resource guard.
Registered functions are called in the order reverse to the registration order.
Function returns a tag of the registered cleanup function. Tag can be passed
under a `:tag` key in `opts`. Many functions can be registered with the same tag.
If there is no `:tag` key in `opts`, tag will be result of `make_ref()`.
"""
@spec register(
t,
(() -> any),
opts :: [tag: any, timeout: milliseconds :: non_neg_integer]
) :: tag
when tag: any()
def register(resource_guard, cleanup_function, opts \\ []) do
opts = Keyword.put_new_lazy(opts, :tag, &make_ref/0)
Message.send(resource_guard, :register, [cleanup_function, opts])
Keyword.get(opts, :tag)
end
@doc """
Unregisters a resource cleanup function from the resource guard.
All cleanup functions with tag `tag` are deleted.
"""
@spec unregister(t, tag :: any) :: :ok
def unregister(resource_guard, tag) do
Message.send(resource_guard, :unregister, tag)
:ok
end
@doc """
Executes all cleanup functions registered in the resource gurard.
"""
@spec cleanup(t) :: :ok
def cleanup(resource_guard) do
Message.send(resource_guard, :cleanup_all)
:ok
end
@doc """
Executes cleanup functions registered with the specifc tag.
If many cleanup functions are registered with the same tag, all of them are executed.
"""
@spec cleanup(t, tag :: any) :: :ok
def cleanup(resource_guard, tag) do
Message.send(resource_guard, :cleanup, tag)
:ok
end
@impl true
def init(owner_pid) do
Process.flag(:trap_exit, true)
monitor = Process.monitor(owner_pid)
{:ok, %{guards: [], monitor: monitor}}
end
@impl true
def handle_info(Message.new(:register, [function, opts]), state) do
tag = Keyword.fetch!(opts, :tag)
timeout = Keyword.get(opts, :timeout, 5000)
{:noreply, %{state | guards: [{function, tag, timeout} | state.guards]}}
end
@impl true
def handle_info(Message.new(:unregister, tag), state) do
guards =
Enum.reject(state.guards, fn
{_function, ^tag, _timeout} -> true
_other -> false
end)
{:noreply, %{state | guards: guards}}
end
@impl true
def handle_info(Message.new(:cleanup_all), state) do
do_cleanup_all(state.guards)
{:noreply, %{state | guards: []}}
end
@impl true
def handle_info(Message.new(:cleanup, tag), state) do
guards =
Enum.reject(state.guards, fn
{function, ^tag, timeout} ->
do_cleanup(function, tag, timeout)
true
_other ->
false
end)
{:noreply, %{state | guards: guards}}
end
@impl true
def handle_info({:DOWN, monitor, :process, _pid, _reason}, %{monitor: monitor} = state) do
do_cleanup_all(state.guards)
{:stop, :normal, state}
end
@impl true
def handle_info(_message, state) do
{:noreply, state}
end
defp do_cleanup_all(guards) do
for {function, tag, timeout} <- guards do
do_cleanup(function, tag, timeout)
end
end
defp do_cleanup(function, tag, timeout) do
{:ok, task} = Task.start_link(function)
receive do
{:EXIT, ^task, reason} -> reason
after
timeout ->
Membrane.Logger.error("Cleanup of resource with tag: #{inspect(tag)} timed out, killing")
Process.unlink(task)
Process.exit(task, :kill)
:normal
end
|> case do
:normal ->
:ok
:shutdown ->
:ok
{:shutdown, _reason} ->
:ok
reason ->
Membrane.Logger.error(
"Error cleaning up resource with tag: #{inspect(tag)}, due to: #{inspect(reason)}"
)
end
:ok
end
end