-
Notifications
You must be signed in to change notification settings - Fork 123
/
confirm.ex
97 lines (79 loc) · 2.87 KB
/
confirm.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
defmodule AMQP.Confirm do
@moduledoc """
Functions that work with publisher confirms (RabbitMQ extension to AMQP
0.9.1).
"""
import AMQP.Core
alias AMQP.{Basic, Channel}
@doc """
Activates publishing confirmations on the channel.
"""
@spec select(Channel.t()) :: :ok | Basic.error()
def select(%Channel{pid: pid}) do
case :amqp_channel.call(pid, confirm_select()) do
confirm_select_ok() -> :ok
error -> {:error, error}
end
end
@doc """
Wait until all messages published since the last call have been either ack'd
or nack'd by the broker.
"""
@spec wait_for_confirms(Channel.t()) :: boolean | :timeout
def wait_for_confirms(%Channel{pid: pid}) do
:amqp_channel.wait_for_confirms(pid)
end
@doc """
Wait until all messages published since the last call have been either ack'd
or nack'd by the broker, or until timeout elapses.
"""
@spec wait_for_confirms(Channel.t(), non_neg_integer) :: boolean | :timeout
def wait_for_confirms(%Channel{pid: pid}, timeout) do
:amqp_channel.wait_for_confirms(pid, timeout)
end
@doc """
Wait until all messages published since the last call have been either ack'd
or nack'd by the broker, or until timeout elapses.
If any of the messages were nack'd, the calling process dies.
"""
@spec wait_for_confirms_or_die(Channel.t()) :: true
def wait_for_confirms_or_die(%Channel{pid: pid}) do
:amqp_channel.wait_for_confirms_or_die(pid)
end
@spec wait_for_confirms_or_die(Channel.t(), non_neg_integer) :: true
def wait_for_confirms_or_die(%Channel{pid: pid}, timeout) do
:amqp_channel.wait_for_confirms_or_die(pid, timeout)
end
@doc """
On channel with confirm activated, return the next message sequence number.
To use in combination with `register_handler/2`
"""
@spec next_publish_seqno(Channel.t()) :: non_neg_integer
def next_publish_seqno(%Channel{pid: pid}) do
:amqp_channel.next_publish_seqno(pid)
end
@doc """
Register a handler for confirms on channel.
The handler will receive either:
* `{:basic_ack, seqno, multiple}`
* `{:basic_nack, seqno, multiple}`
The `seqno` (delivery_tag) is an integer, the sequence number of the message.
`multiple` is a boolean, when `true` means multiple messages confirm, up to
`seqno`.
See https://www.rabbitmq.com/confirms.html
"""
@spec register_handler(Channel.t(), pid) :: :ok
def register_handler(%Channel{} = chan, handler_pid) do
:amqp_channel.call_consumer(chan.pid, {:register_confirm_handler, chan, handler_pid})
end
@doc """
Remove the return handler.
It does nothing if there is no such handler.
"""
@spec unregister_handler(Channel.t()) :: :ok
def unregister_handler(%Channel{pid: pid}) do
# Currently we don't remove the receiver.
# The receiver will be deleted automatically when channel is closed.
:amqp_channel.unregister_confirm_handler(pid)
end
end