-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.go
110 lines (92 loc) · 2.75 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package nats
import (
"context"
"encoding/json"
"github.com/jjggzz/kit/endpoint"
"github.com/nats-io/nats.go"
"time"
)
// Publisher wraps a NATS connection and a subject, and provides a method
// (Endpoint) that implements endpoint.Endpoint by issuing a request on that
// subject and decoding the reply.
type Publisher struct {
// publisher is the NATS connection the request is sent on.
publisher *nats.Conn
// subject is the initial Subject of every outgoing message.
subject string
// enc populates the outgoing message from the endpoint's request value.
enc EncodeRequestFunc
// dec converts the reply message into the endpoint's response value.
dec DecodeResponseFunc
// before holds hooks run after encoding and before the request is sent.
before []RequestFunc
// after holds hooks run on the reply before it is decoded.
after []PublisherResponseFunc
// timeout bounds the context used for each Endpoint invocation.
timeout time.Duration
}
// NewPublisher builds a Publisher that sends requests for a single remote
// method on the given subject over the supplied NATS connection.
//
// enc fills the outgoing message from the request value and dec turns the
// reply message into the response value. The timeout starts at 10 seconds;
// options are then applied in the order given and may override it or
// register before/after hooks.
func NewPublisher(
	publisher *nats.Conn,
	subject string,
	enc EncodeRequestFunc,
	dec DecodeResponseFunc,
	options ...PublisherOption,
) *Publisher {
	pub := new(Publisher)
	pub.publisher = publisher
	pub.subject = subject
	pub.enc = enc
	pub.dec = dec
	// Default applied before options so that PublisherTimeout can replace it.
	pub.timeout = 10 * time.Second

	for _, apply := range options {
		apply(pub)
	}
	return pub
}
// PublisherOption configures an optional setting on a Publisher. NewPublisher
// applies each option, in the order given, to the Publisher it is constructing.
type PublisherOption func(*Publisher)
// PublisherBefore returns an option that registers RequestFuncs to run on the
// outgoing NATS message after it has been encoded and before the request is
// sent. The given funcs are appended to any that are already registered.
func PublisherBefore(before ...RequestFunc) PublisherOption {
	return func(pub *Publisher) {
		pub.before = append(pub.before, before...)
	}
}
// PublisherAfter returns an option that registers PublisherResponseFuncs to
// run on the incoming NATS reply before it is decoded. Each func receives the
// current context and the reply and returns the context handed to the next
// func and, finally, to the decoder, which makes it a convenient place to
// lift values off the reply into the context. The given funcs are appended to
// any that are already registered.
func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
	return func(pub *Publisher) {
		pub.after = append(pub.after, after...)
	}
}
// PublisherTimeout returns an option that replaces the Publisher's timeout,
// i.e. the duration used to bound the context of each Endpoint invocation.
func PublisherTimeout(timeout time.Duration) PublisherOption {
	return func(pub *Publisher) {
		pub.timeout = timeout
	}
}
// Endpoint returns an endpoint.Endpoint that performs one NATS request/reply
// round trip per invocation.
//
// Each call derives a context limited by the Publisher's timeout, encodes the
// request into a message addressed to the configured subject, runs the before
// hooks, sends the request, runs the after hooks on the reply, and finally
// decodes the reply. An error from encoding, sending, or decoding is returned
// unchanged together with a nil response.
func (p Publisher) Endpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		// The deadline covers the whole invocation, not only the network call.
		ctx, release := context.WithTimeout(ctx, p.timeout)
		defer release()

		outgoing := nats.Msg{Subject: p.subject}
		encErr := p.enc(ctx, &outgoing, request)
		if encErr != nil {
			return nil, encErr
		}

		for _, hook := range p.before {
			ctx = hook(ctx, &outgoing)
		}

		// NOTE(review): only Subject and Data are forwarded here, so changes a
		// before hook makes to any other field of the message are not sent.
		reply, reqErr := p.publisher.RequestWithContext(ctx, outgoing.Subject, outgoing.Data)
		if reqErr != nil {
			return nil, reqErr
		}

		for _, hook := range p.after {
			ctx = hook(ctx, reply)
		}

		decoded, decErr := p.dec(ctx, reply)
		if decErr != nil {
			return nil, decErr
		}
		return decoded, nil
	}
}
// EncodeJSONRequest is an EncodeRequestFunc that marshals request to JSON and
// stores the resulting bytes in msg.Data. It is a sensible default for
// JSON-over-NATS services. If marshaling fails the error is returned and msg
// is left untouched. The context argument is ignored.
func EncodeJSONRequest(_ context.Context, msg *nats.Msg, request interface{}) error {
	payload, err := json.Marshal(request)
	if err == nil {
		msg.Data = payload
	}
	return err
}