-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.go
114 lines (89 loc) · 1.9 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package rabbitmq
import (
"encoding/json"
"github.com/streadway/amqp"
)
// Publisher collects the settings for one outgoing message through its
// chainable methods and sends the message when Do is called.
type Publisher struct {
	// connector supplies the channel that Do publishes on.
	connector *Connector

	// Destination passed to Publish by Do.
	exchangeName string
	routingKey   string

	// Message properties copied into amqp.Publishing by Do.
	contentType  string
	deliveryMode uint8
	expiration   string

	// Payload. When obj is non-nil it is JSON-encoded and used instead of
	// body; otherwise body is sent as-is.
	body []byte
	obj  interface{}
}
// newPublisher returns a Publisher bound to connector with its defaults
// applied: persistent delivery mode and a "text/plain" content type.
func newPublisher(connector *Connector) *Publisher {
	p := new(Publisher)
	p.connector = connector
	p.contentType = "text/plain"
	p.deliveryMode = amqp.Persistent
	return p
}
// ToExchange sets the name of the exchange that Do publishes to and returns
// the receiver so calls can be chained.
func (p *Publisher) ToExchange(name string) *Publisher {
	p.exchangeName = name
	return p
}
// ToQueue stores name as the routing key that Do passes to Publish and
// returns the receiver so calls can be chained.
func (p *Publisher) ToQueue(name string) *Publisher {
	p.routingKey = name
	return p
}
// AsPersistent selects amqp.Persistent as the delivery mode of the message
// and returns the receiver so calls can be chained. This is also the mode a
// Publisher created by newPublisher starts with.
func (p *Publisher) AsPersistent() *Publisher {
	p.deliveryMode = amqp.Persistent
	return p
}
// AsTransient selects amqp.Transient as the delivery mode of the message and
// returns the receiver so calls can be chained.
func (p *Publisher) AsTransient() *Publisher {
	p.deliveryMode = amqp.Transient
	return p
}
// WithExpiration sets the value that Do copies verbatim into the Expiration
// property of the published message and returns the receiver so calls can be
// chained.
func (p *Publisher) WithExpiration(expiration string) *Publisher {
	p.expiration = expiration
	return p
}
// WithContentType overrides the content type sent with the message and
// returns the receiver so calls can be chained.
func (p *Publisher) WithContentType(contentType string) *Publisher {
	p.contentType = contentType
	return p
}
// WithData sets the raw message body and returns the receiver so calls can
// be chained. The slice is stored as given, not copied. The body is only
// sent when no object has been supplied through WithStruct.
func (p *Publisher) WithData(data []byte) *Publisher {
	p.body = data
	return p
}
// WithStruct sets a value to be JSON-encoded as the message body when Do
// runs and returns the receiver so calls can be chained. A non-nil obj takes
// precedence over any body supplied through WithData.
func (p *Publisher) WithStruct(obj interface{}) *Publisher {
	p.obj = obj
	return p
}
// Do builds the message body and publishes it to the configured exchange
// with the configured routing key, content type, delivery mode and
// expiration.
//
// It returns the error from building the body, from obtaining a channel, or
// from Publish, whichever occurs first, and nil otherwise.
func (p *Publisher) Do() error {
	// Build the payload first so that an encoding failure is reported
	// without acquiring a channel that would only be closed again.
	body, err := p.getBody()
	if err != nil {
		return err
	}

	channel, err := p.connector.GetChannel()
	if err != nil {
		return err
	}
	// The channel is used for this one publish only. The result of Close is
	// not inspected; callers are told the outcome of Publish.
	defer channel.Close()

	// NOTE(review): assumes GetChannel returns a *amqp.Channel, in which
	// case the two false arguments are the mandatory and immediate flags
	// - confirm against Connector.
	return channel.Publish(
		p.exchangeName,
		p.routingKey,
		false,
		false,
		amqp.Publishing{
			ContentType:  p.contentType,
			Body:         body,
			DeliveryMode: p.deliveryMode,
			Expiration:   p.expiration,
		},
	)
}
// getBody returns the bytes to publish. When no object has been set the raw
// body is returned unchanged with a nil error; otherwise the object is
// encoded and the encoder's result and error are returned.
func (p *Publisher) getBody() ([]byte, error) {
	if p.obj == nil {
		return p.body, nil
	}
	return p.unmarshalStruct()
}
// unmarshalStruct returns the JSON encoding of p.obj. Despite its name it
// marshals; the name is kept because getBody calls it.
//
// Encoding is delegated entirely to json.Marshal, which already calls
// MarshalJSON on values implementing json.Marshaler. Calling MarshalJSON
// directly would panic when obj holds a nil pointer whose method
// dereferences the receiver; json.Marshal encodes such a pointer as null.
// json.Marshal also validates and compacts the bytes a custom marshaler
// returns, so malformed output is reported as an error instead of being
// published.
func (p *Publisher) unmarshalStruct() ([]byte, error) {
	return json.Marshal(p.obj)
}