forked from go-kit/kit
/
publisher.go
188 lines (164 loc) · 4.53 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package amqp
import (
"context"
"time"
"github.com/go-kit/kit/endpoint"
"github.com/streadway/amqp"
)
// The golang AMQP implementation requires the []byte representation of
// correlation id strings to have a maximum length of 255 bytes.
const maxCorrelationIdLength = 255
// Publisher wraps an AMQP channel and queue, and provides a method that
// implements endpoint.Endpoint.
type Publisher struct {
ch Channel
q *amqp.Queue
enc EncodeRequestFunc
dec DecodeResponseFunc
before []RequestFunc
after []PublisherResponseFunc
deliverer Deliverer
timeout time.Duration
}
// NewPublisher constructs a usable Publisher for a single remote method.
func NewPublisher(
ch Channel,
q *amqp.Queue,
enc EncodeRequestFunc,
dec DecodeResponseFunc,
options ...PublisherOption,
) *Publisher {
p := &Publisher{
ch: ch,
q: q,
enc: enc,
dec: dec,
deliverer: DefaultDeliverer,
timeout: 10 * time.Second,
}
for _, option := range options {
option(p)
}
return p
}
// PublisherOption sets an optional parameter for clients.
type PublisherOption func(*Publisher)
// PublisherBefore sets the RequestFuncs that are applied to the outgoing AMQP
// request before it's invoked.
func PublisherBefore(before ...RequestFunc) PublisherOption {
return func(p *Publisher) { p.before = append(p.before, before...) }
}
// PublisherAfter sets the ClientResponseFuncs applied to the incoming AMQP
// request prior to it being decoded. This is useful for obtaining anything off
// of the response and adding onto the context prior to decoding.
func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
return func(p *Publisher) { p.after = append(p.after, after...) }
}
// PublisherDeliverer sets the deliverer function that the Publisher invokes.
func PublisherDeliverer(deliverer Deliverer) PublisherOption {
return func(p *Publisher) { p.deliverer = deliverer }
}
// PublisherTimeout sets the available timeout for an AMQP request.
func PublisherTimeout(timeout time.Duration) PublisherOption {
return func(p *Publisher) { p.timeout = timeout }
}
// Endpoint returns a usable endpoint that invokes the remote endpoint.
func (p Publisher) Endpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()
pub := amqp.Publishing{
ReplyTo: p.q.Name,
CorrelationId: randomString(randInt(5, maxCorrelationIdLength)),
}
if err := p.enc(ctx, &pub, request); err != nil {
return nil, err
}
for _, f := range p.before {
// Affect only amqp.Publishing
ctx = f(ctx, &pub, nil)
}
deliv, err := p.deliverer(ctx, p, &pub)
if err != nil {
return nil, err
}
for _, f := range p.after {
ctx = f(ctx, deliv)
}
response, err := p.dec(ctx, deliv)
if err != nil {
return nil, err
}
return response, nil
}
}
// Deliverer is invoked by the Publisher to publish the specified Publishing, and to
// retrieve the appropriate response Delivery object.
type Deliverer func(
context.Context,
Publisher,
*amqp.Publishing,
) (*amqp.Delivery, error)
// DefaultDeliverer is a deliverer that publishes the specified Publishing
// and returns the first Delivery object with the matching correlationId.
// If the context times out while waiting for a reply, an error will be returned.
func DefaultDeliverer(
ctx context.Context,
p Publisher,
pub *amqp.Publishing,
) (*amqp.Delivery, error) {
err := p.ch.Publish(
getPublishExchange(ctx),
getPublishKey(ctx),
false, //mandatory
false, //immediate
*pub,
)
if err != nil {
return nil, err
}
autoAck := getConsumeAutoAck(ctx)
msg, err := p.ch.Consume(
p.q.Name,
"", //consumer
autoAck,
false, //exclusive
false, //noLocal
false, //noWait
getConsumeArgs(ctx),
)
if err != nil {
return nil, err
}
for {
select {
case d := <-msg:
if d.CorrelationId == pub.CorrelationId {
if !autoAck {
d.Ack(false) //multiple
}
return &d, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// SendAndForgetDeliverer delivers the supplied publishing and
// returns a nil response.
// When using this deliverer please ensure that the supplied DecodeResponseFunc and
// PublisherResponseFunc are able to handle nil-type responses.
func SendAndForgetDeliverer(
ctx context.Context,
p Publisher,
pub *amqp.Publishing,
) (*amqp.Delivery, error) {
err := p.ch.Publish(
getPublishExchange(ctx),
getPublishKey(ctx),
false, //mandatory
false, //immediate
*pub,
)
return nil, err
}