forked from goinstant/delayd
/
entry.go
71 lines (58 loc) · 1.45 KB
/
entry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package main
import (
"time"
"github.com/streadway/amqp"
"github.com/ugorji/go/codec"
)
// mh is the MessagePack handle passed to the codec encoder and decoder when
// entries are serialized (ToBytes) and deserialized (entryFromBytes). It is
// declared without explicit configuration here.
var mh codec.MsgpackHandle
// EntryWrapper pairs an Entry with the amqp.Delivery it is associated with,
// so that Done can be called at any later point to ACK or NACK the message.
type EntryWrapper struct {
// Entry is the wrapped delayed message.
Entry Entry
// Msg is the amqp delivery that Done acknowledges or rejects.
Msg amqp.Delivery
}
// Done settles the wrapped amqp delivery once the caller has finished with
// the entry: the message is ACKed when succ is true and NACKed otherwise.
// Every flag argument to Ack and Nack is passed as false.
//
// If the ACK/NACK call itself returns an error, Panic is invoked.
func (e *EntryWrapper) Done(succ bool) {
	var err error
	if succ {
		err = e.Msg.Ack(false)
	} else {
		err = e.Msg.Nack(false, false)
	}

	// A message that cannot be ACKed/NACKed leaves us pretty much stuck: no
	// additional messages would be received, so give up loudly.
	if err != nil {
		Panic("Unable to ACK/NACK amqp message")
	}
}
// Entry represents a delayed message.
//
// An Entry is serialized with MessagePack by ToBytes and reconstructed by
// entryFromBytes.
type Entry struct {
// Required

// SendAt is a point in time; presumably when the message should be
// delivered — TODO confirm against the code that schedules entries.
SendAt time.Time
// Target names where the message goes; presumably an amqp destination —
// verify against the sender.
Target string
// Body is the raw message payload.
Body []byte

// Optional

// Key is an optional string attached to the entry; its exact use is not
// visible in this file.
Key string

// XXX amqp specific
// NOTE(review): these look like the amqp message properties of the same
// names — confirm where they are populated.
ContentEncoding string
ContentType string
CorrelationID string
}
// entryFromBytes decodes the MessagePack encoded byte slice b into a new
// Entry. The error reported by the decoder, if any, is returned alongside
// whatever was decoded into e.
func entryFromBytes(b []byte) (e Entry, err error) {
	err = codec.NewDecoderBytes(b, &mh).Decode(&e)
	return e, err
}
// ToBytes serializes the Entry with MessagePack and returns the resulting
// byte slice together with any error reported by the encoder.
func (e Entry) ToBytes() (b []byte, err error) {
	err = codec.NewEncoderBytes(&b, &mh).Encode(e)
	return b, err
}