-
Notifications
You must be signed in to change notification settings - Fork 10
/
rpc_client.go
64 lines (57 loc) · 1.9 KB
/
rpc_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package operator
import (
"errors"
"net/rpc"
"time"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/yetanotherco/aligned_layer/core/types"
)
// AggregatorRpcClient is the client to communicate with the aggregator via RPC
type AggregatorRpcClient struct {
rpcClient *rpc.Client
aggregatorIpPortAddr string
logger logging.Logger
}
const (
MaxRetries = 10
RetryInterval = 10 * time.Second
)
func NewAggregatorRpcClient(aggregatorIpPortAddr string, logger logging.Logger) (*AggregatorRpcClient, error) {
client, err := rpc.DialHTTP("tcp", aggregatorIpPortAddr)
if err != nil {
return nil, err
}
return &AggregatorRpcClient{
rpcClient: client,
aggregatorIpPortAddr: aggregatorIpPortAddr,
logger: logger,
}, nil
}
// SendSignedTaskResponseToAggregator is the method called by operators via RPC to send
// their signed task response.
func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregator(signedTaskResponse *types.SignedTaskResponse) {
var reply uint8
for retries := 0; retries < MaxRetries; retries++ {
err := c.rpcClient.Call("Aggregator.ProcessOperatorSignedTaskResponse", signedTaskResponse, &reply)
if err != nil {
c.logger.Error("Received error from aggregator", "err", err)
if errors.Is(err, rpc.ErrShutdown) {
c.logger.Error("Aggregator is shutdown. Reconnecting...")
client, err := rpc.DialHTTP("tcp", c.aggregatorIpPortAddr)
if err != nil {
c.logger.Error("Could not reconnect to aggregator", "err", err)
time.Sleep(RetryInterval)
} else {
c.rpcClient = client
c.logger.Info("Reconnected to aggregator")
}
} else {
c.logger.Infof("Received error from aggregator: %s. Retrying ProcessOperatorSignedTaskResponse RPC call...", err)
time.Sleep(RetryInterval)
}
} else {
c.logger.Info("Signed task response header accepted by aggregator.", "reply", reply)
return
}
}
}