-
Notifications
You must be signed in to change notification settings - Fork 0
/
responder.go
110 lines (91 loc) · 2.87 KB
/
responder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package qp
import (
"errors"
"github.com/stretchr/slog"
)
// TransactionHandler is implemented by any type that can process a
// Transaction. Handle receives the incoming Transaction and returns the
// Transaction that should be passed on.
type TransactionHandler interface {
	Handle(tx *Transaction) *Transaction
}
// TransactionHandlerFunc is a function type that takes a Transaction and
// returns a Transaction, allowing plain functions to be used as handlers.
type TransactionHandlerFunc func(tx *Transaction) *Transaction
// Handle invokes fn with tx and returns its result unchanged, which gives
// TransactionHandlerFunc a Handle method with the handler signature.
func (fn TransactionHandlerFunc) Handle(tx *Transaction) *Transaction {
	return fn(tx)
}
// Responder is implemented by types that can respond to requests arriving
// on named channels.
type Responder interface {
	// Handle registers handler for the given channel.
	Handle(channel string, handler TransactionHandler) error

	// HandleFunc registers the function fn for the given channel.
	HandleFunc(channel string, fn TransactionHandlerFunc) error
}
// responder is the package's Responder implementation.
type responder struct {
	// uniqueID is the ID appended to the From list of every Transaction this
	// responder forwards. name and instanceID presumably hold the values it
	// is derived from — TODO confirm against the constructor.
	name, instanceID, uniqueID string

	// codec decodes incoming message data into a Transaction and encodes the
	// Transaction again before it is sent on.
	codec Codec

	// transport delivers incoming messages and sends outgoing data.
	transport DirectTransport

	// log receives errors raised while handling messages.
	log slog.Logger
}
// NewResponder builds a Responder that does not report errors anywhere: it
// delegates to NewResponderLogger with slog.NilLogger as the logger.
func NewResponder(name, instanceID string, codec Codec, transport DirectTransport) Responder {
	silent := slog.NilLogger
	return NewResponderLogger(name, instanceID, codec, transport, silent)
}
// NewResponderLogger makes a new object capable of responding to requests,
// which will log errors to the specified Logger.
//
// name and instanceID are stored on the responder and also joined as
// name + "." + instanceID to form the ID the responder appends to the From
// list of each Transaction it forwards.
//
// A nil logger is replaced with slog.NilLogger so that the logging calls made
// while handling messages never run against a nil Logger.
func NewResponderLogger(name, instanceID string, codec Codec, transport DirectTransport, logger slog.Logger) Responder {
	if logger == nil {
		logger = slog.NilLogger
	}
	return &responder{
		name:       name,
		instanceID: instanceID,
		uniqueID:   name + "." + instanceID,
		codec:      codec,
		transport:  transport,
		log:        logger,
	}
}
// Handle binds handler to channel by registering a message callback with the
// transport. It returns whatever the transport's OnMessage call returns.
//
// For each message the callback:
//  1. decodes msg.Data into a Transaction using the codec;
//  2. passes it to handler and continues with the Transaction it returns;
//  3. picks the destination: the first entry of To (which is then removed),
//     or, when To is empty, the first entry of From;
//  4. appends this responder's uniqueID to From;
//  5. encodes the Transaction and sends it to the destination.
//
// Failures inside the callback (decode error, nil Transaction returned by the
// handler, no destination, encode error) are logged and the message is
// dropped; they are not returned to the caller of Handle.
func (r *responder) Handle(channel string, handler TransactionHandler) error {
	// logErr reports a callback failure. The zero-argument Err call is used
	// as a guard before the real logging call.
	logErr := func(prefix string, err error) {
		if r.log.Err() {
			r.log.Err(prefix, err)
		}
	}

	return r.transport.OnMessage(channel, HandlerFunc(func(msg *Message) {
		var request Transaction
		if err := r.codec.Unmarshal(msg.Data, &request); err != nil {
			logErr("unmarshal error:", err)
			return
		}

		// The handler may mutate the Transaction or return a different one.
		// A nil result cannot be forwarded, so log it instead of
		// dereferencing it and panicking inside the transport callback.
		response := handler.Handle(&request)
		if response == nil {
			logErr("error handling request:", errors.New("handler returned a nil Transaction"))
			return
		}
		request = *response

		// Forward the Transaction to the next endpoint.
		var to string
		switch {
		case len(request.To) != 0:
			// Pop the next hop off the front of To.
			to = request.To[0]
			request.To = request.To[1:]
		case len(request.From) != 0:
			// No further hops: send it back whence it came.
			to = request.From[0]
		default:
			logErr("error handling request:", errors.New("cannot respond when From field is empty"))
			return
		}

		// Record this responder in the list of endpoints visited.
		request.From = append(request.From, r.uniqueID)

		data, err := r.codec.Marshal(request)
		if err != nil {
			logErr("error encoding data for pipeline:", err)
			return
		}

		// NOTE(review): any value returned by Send is not inspected here —
		// confirm whether DirectTransport.Send can fail and should be logged.
		r.transport.Send(to, data)
	}))
}
// HandleFunc binds the function f to channel. f is used as a
// TransactionHandler and registered through Handle, whose error is returned.
func (r *responder) HandleFunc(channel string, f TransactionHandlerFunc) error {
	var handler TransactionHandler = f
	return r.Handle(channel, handler)
}