/
cabalrpc.go
55 lines (47 loc) · 1.32 KB
/
cabalrpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package internal
import (
"context"
"github.com/manifoldfinance/cabalrpc/internal/broker"
"github.com/manifoldfinance/cabalrpc/internal/config"
"github.com/manifoldfinance/cabalrpc/internal/log"
"github.com/manifoldfinance/cabalrpc/internal/net"
"go.uber.org/zap"
)
// logger is the package-level zap logger shared by the code in this file.
// Within this file it is assigned only by NewCabalrpc, so it holds its zero
// value (nil) until that constructor has run.
var logger *zap.Logger
// Cabalrpc is the service handle returned by NewCabalrpc.
type Cabalrpc interface {
	// Start begins operation of the service. It takes no arguments and
	// returns nothing, so failures are not reported to the caller.
	Start()
}
// NewCabalrpc builds the Cabalrpc implementation for the given configuration
// and broker.
//
// As a side effect it assigns the package-level logger from
// log.GetLogger(config). The RPC client is created with
// net.NewRpcClient(config). The returned error is always nil.
func NewCabalrpc(config config.Config, broker broker.Broker) (Cabalrpc, error) {
	// The logger is set first so it is available to the methods of the
	// returned value.
	logger = log.GetLogger(config)

	client := net.NewRpcClient(config)
	svc := cabalrpc{
		config:    config,
		broker:    broker,
		rpcClient: client,
	}
	return svc, nil
}
// cabalrpc is the unexported type returned by NewCabalrpc as a Cabalrpc.
type cabalrpc struct {
	// config provides the topic names used when subscribing and publishing
	// (TopicIncomingRpcRequests, TopicOutgoingRpcResponses, TopicErrors).
	config config.Config
	// broker is used to subscribe to incoming requests and to publish
	// responses and errors.
	broker broker.Broker
	// rpcClient performs the call for each incoming request payload.
	rpcClient net.RpcClient
}
// Start registers onIncomingRequest as the handler for the configured
// incoming-request topic.
//
// A subscription failure is logged and not propagated. The logger's Sync
// method is called when Start returns, and its result is ignored.
// NOTE(review): whether Subscribe blocks until the subscription ends is not
// visible from this file — confirm against the broker implementation.
func (s cabalrpc) Start() {
	defer logger.Sync()

	topic := s.config.TopicIncomingRpcRequests
	if err := s.broker.Subscribe(topic, s.onIncomingRequest); err != nil {
		logger.Error("failed to subscribe to incoming request topic", zap.Error(err))
	}
}
// onIncomingRequest is the subscription callback registered by Start.
//
// It passes the raw request to the RPC client and publishes the response on
// the outgoing-responses topic. An error from the call or from the publish
// is handed to handleError; after a call error nothing is published on the
// responses topic.
func (s cabalrpc) onIncomingRequest(request []byte) {
	response, callErr := s.rpcClient.Call(request)
	if callErr != nil {
		s.handleError(callErr)
		return
	}

	// handleError ignores nil, so the publish result goes to it directly.
	publishErr := s.broker.Publish(context.Background(), s.config.TopicOutgoingRpcResponses, response)
	s.handleError(publishErr)
}
// handleError reports a non-nil error: it logs it and publishes its message
// text on the configured errors topic. A nil err is a no-op.
//
// Publishing is best-effort. If the publish itself fails, that failure is
// logged and not retried or propagated, so it does not go unnoticed.
func (s cabalrpc) handleError(err error) {
	if err == nil {
		return
	}

	logger.Error("error occurred", zap.Error(err))

	// The broker may be the component that is failing, so a publish error
	// is logged here rather than passed back through handleError.
	if pubErr := s.broker.Publish(context.Background(), s.config.TopicErrors, []byte(err.Error())); pubErr != nil {
		logger.Error("failed to publish error to errors topic", zap.Error(pubErr))
	}
}