forked from 0xProject/0x-mesh
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
169 lines (148 loc) · 5.63 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package rpc
import (
"context"
"encoding/json"
"net"
"strings"
"time"
"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/zeroex/ordervalidator"
"github.com/ethereum/go-ethereum/rpc"
ethrpc "github.com/ethereum/go-ethereum/rpc"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
)
// minHeartbeatInterval specifies the interval at which to emit heartbeat events to a subscriber
var minHeartbeatInterval = 5 * time.Second
// rpcService is an /ethereum/go-ethereum/rpc compatible service.
type rpcService struct {
rpcHandler RPCHandler
}
// RPCHandler is used to respond to incoming requests from the client.
type RPCHandler interface {
// AddOrders is called when the client sends an AddOrders request.
AddOrders(signedOrdersRaw []*json.RawMessage, opts AddOrdersOpts) (*ordervalidator.ValidationResults, error)
// GetOrders is called when the clients sends a GetOrders request
GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error)
// AddPeer is called when the client sends an AddPeer request.
AddPeer(peerInfo peerstore.PeerInfo) error
// GetStats is called when the client sends an GetStats request.
GetStats() (*GetStatsResponse, error)
// SubscribeToOrders is called when a client sends a Subscribe to `orders` request
SubscribeToOrders(ctx context.Context) (*rpc.Subscription, error)
}
// Orders calls rpcHandler.SubscribeToOrders and returns the rpc subscription.
func (s *rpcService) Orders(ctx context.Context) (*rpc.Subscription, error) {
return s.rpcHandler.SubscribeToOrders(ctx)
}
// Heartbeat calls rpcHandler.SubscribeToHeartbeat and returns the rpc subscription.
func (s *rpcService) Heartbeat(ctx context.Context) (*rpc.Subscription, error) {
log.Debug("received heartbeat subscription request via RPC")
subscription, err := SetupHeartbeat(ctx)
if err != nil {
log.WithField("error", err.Error()).Error("internal error in `mesh_subscribe` to `heartbeat` RPC call")
return nil, constants.ErrInternal
}
return subscription, nil
}
// SetupHeartbeat sets up the heartbeat for a subscription
func SetupHeartbeat(ctx context.Context) (*ethrpc.Subscription, error) {
notifier, supported := ethrpc.NotifierFromContext(ctx)
if !supported {
return ðrpc.Subscription{}, ethrpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
for {
select {
case err := <-rpcSub.Err():
if err != nil {
log.WithField("err", err).Error("rpcSub returned an error")
} else {
log.Debug("rpcSub was closed without error")
}
return
case <-notifier.Closed():
return
default:
// Continue
}
start := time.Now()
err := notifier.Notify(rpcSub.ID, "tick")
if err != nil {
// TODO(fabio): The current implementation of `notifier.Notify` returns a
// `write: broken pipe` error when it is called _after_ the client has
// disconnected but before the corresponding error is received on the
// `rpcSub.Err()` channel. This race-condition is not problematic beyond
// the unnecessary computation and log spam resulting from it. Once this is
// fixed upstream, give all logs an `Error` severity.
logEntry := log.WithFields(map[string]interface{}{
"error": err.Error(),
"subscriptionType": "heartbeat",
})
message := "error while calling notifier.Notify"
// If the network connection disconnects for longer then ~2mins and then comes
// back up, we've noticed the call to `notifier.Notify` return `i/o timeout`
// `net.OpError` errors everytime it's called and no values are sent over
// `rpcSub.Err()` nor `notifier.Closed()`. In order to stop the error from
// endlessly re-occuring, we unsubscribe and return for encountering this type of
// error.
if _, ok := err.(*net.OpError); ok {
logEntry.Trace(message)
return
}
if strings.Contains(err.Error(), "write: broken pipe") {
logEntry.Trace(message)
} else {
logEntry.Error(message)
}
}
// Wait MinCleanupInterval before emitting the next heartbeat.
time.Sleep(minHeartbeatInterval - time.Since(start))
}
}()
return rpcSub, nil
}
var defaultAddOrdersOpts = AddOrdersOpts{
Pinned: true,
}
// AddOrders calls rpcHandler.AddOrders and returns the validation results.
func (s *rpcService) AddOrders(signedOrdersRaw []*json.RawMessage, opts *AddOrdersOpts) (*ordervalidator.ValidationResults, error) {
if opts == nil {
opts = &defaultAddOrdersOpts
}
return s.rpcHandler.AddOrders(signedOrdersRaw, *opts)
}
// GetOrders calls rpcHandler.GetOrders and returns the validation results.
func (s *rpcService) GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error) {
return s.rpcHandler.GetOrders(page, perPage, snapshotID)
}
// AddPeer builds PeerInfo out of the given peer ID and multiaddresses and
// calls rpcHandler.AddPeer. If there is an error, it returns it.
func (s *rpcService) AddPeer(peerID string, multiaddrs []string) error {
// Parse peer ID.
parsedPeerID, err := peer.IDB58Decode(peerID)
if err != nil {
return err
}
peerInfo := peerstore.PeerInfo{
ID: parsedPeerID,
}
// Parse each given multiaddress.
parsedMultiaddrs := make([]ma.Multiaddr, len(multiaddrs))
for i, addr := range multiaddrs {
parsed, err := ma.NewMultiaddr(addr)
if err != nil {
return err
}
parsedMultiaddrs[i] = parsed
}
peerInfo.Addrs = parsedMultiaddrs
return s.rpcHandler.AddPeer(peerInfo)
}
// GetStats calls rpcHandler.GetStats. If there is an error, it returns it.
func (s *rpcService) GetStats() (*GetStatsResponse, error) {
return s.rpcHandler.GetStats()
}