-
Notifications
You must be signed in to change notification settings - Fork 179
/
backend_accounts.go
160 lines (134 loc) · 4.6 KB
/
backend_accounts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package backend
import (
"context"
"time"
"github.com/hashicorp/go-multierror"
execproto "github.com/onflow/flow/protobuf/go/flow/execution"
"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
// backendAccounts serves account queries: it resolves the block to query and
// fetches the account from the execution nodes selected for that block.
type backendAccounts struct {
// state supplies the latest sealed header and is handed to execution node selection.
state protocol.State
// headers looks up block headers by height.
headers storage.Headers
// executionReceipts is handed to execution node selection for a given block ID.
executionReceipts storage.ExecutionReceipts
// connFactory creates and invalidates execution API clients keyed by node address.
connFactory ConnectionFactory
// log is the logger used for per-request diagnostics.
log zerolog.Logger
}
// GetAccount returns the account at the given address as of the latest sealed
// block. It delegates directly to GetAccountAtLatestBlock.
func (b *backendAccounts) GetAccount(ctx context.Context, address flow.Address) (*flow.Account, error) {
return b.GetAccountAtLatestBlock(ctx, address)
}
// GetAccountAtLatestBlock returns the account at the given address as of the
// latest sealed block.
//
// A failure to read the sealed head is reported as a codes.Internal status
// error. An error from the account lookup itself is logged together with the
// block ID and returned to the caller unchanged.
func (b *backendAccounts) GetAccountAtLatestBlock(ctx context.Context, address flow.Address) (*flow.Account, error) {
	// resolve the latest sealed header
	sealedHead, err := b.state.Sealed().Head()
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get latest sealed header: %v", err)
	}

	// the account is queried at the sealed header's block ID
	sealedID := sealedHead.ID()

	acct, lookupErr := b.getAccountAtBlockID(ctx, address, sealedID)
	if lookupErr == nil {
		return acct, nil
	}

	b.log.Error().Err(lookupErr).Msgf("failed to get account at blockID: %v", sealedID)
	return nil, lookupErr
}
// GetAccountAtBlockHeight returns the account at the given address as of the
// block stored at the given height.
//
// If the header lookup fails, the error is passed through
// rpc.ConvertStorageError before being returned. Errors from the account
// lookup are returned unchanged.
func (b *backendAccounts) GetAccountAtBlockHeight(
	ctx context.Context,
	address flow.Address,
	height uint64,
) (*flow.Account, error) {
	// find the header stored at the requested height
	hdr, lookupErr := b.headers.ByHeight(height)
	if lookupErr != nil {
		return nil, rpc.ConvertStorageError(lookupErr)
	}

	// query the account at that header's block ID
	acct, err := b.getAccountAtBlockID(ctx, address, hdr.ID())
	if err != nil {
		return nil, err
	}
	return acct, nil
}
// getAccountAtBlockID fetches the account at the given address as of the given
// block from the execution nodes selected for that block.
//
// Steps:
//  1. select execution nodes via executionNodesForBlockID; a failure here is
//     mapped through getAccountError;
//  2. ask the selected nodes for the account; a failure is logged and returned
//     unchanged;
//  3. convert the response message into a flow.Account; a conversion failure
//     is reported as a codes.Internal status error.
func (b *backendAccounts) getAccountAtBlockID(
	ctx context.Context,
	address flow.Address,
	blockID flow.Identifier,
) (*flow.Account, error) {
	nodes, err := executionNodesForBlockID(ctx, blockID, b.executionReceipts, b.state, b.log)
	if err != nil {
		return nil, getAccountError(err)
	}

	resp, err := b.getAccountFromAnyExeNode(ctx, nodes, execproto.GetAccountAtBlockIDRequest{
		Address: address.Bytes(),
		BlockId: blockID[:],
	})
	if err != nil {
		b.log.Error().Err(err).Msg("failed to get account from execution nodes")
		return nil, err
	}

	acct, err := convert.MessageToAccount(resp.GetAccount())
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to convert account message: %v", err)
	}
	return acct, nil
}
// getAccountError maps an error to the form returned to API callers: an error
// whose gRPC status code is codes.NotFound is returned as-is, anything else is
// wrapped in a codes.Internal status error.
func getAccountError(err error) error {
	if status.Code(err) != codes.NotFound {
		return status.Errorf(codes.Internal, "failed to get account from the execution node: %v", err)
	}
	return err
}
// getAccountFromAnyExeNode sends the request to the given execution nodes one
// at a time, in list order, and returns the first successful response.
//
// Every attempt is logged with the node, block ID, address and round-trip time
// (debug level on success, error level on failure). Failures are accumulated;
// if no node answers successfully, the accumulated errors are handed to
// rpc.ConvertMultiError together with codes.Internal and the result is
// returned.
func (b *backendAccounts) getAccountFromAnyExeNode(ctx context.Context, execNodes flow.IdentityList, req execproto.GetAccountAtBlockIDRequest) (*execproto.GetAccountAtBlockIDResponse, error) {
	var errs *multierror.Error // collects the error of every failed attempt

	for _, node := range execNodes {
		// TODO: use the GRPC Client interceptor
		sentAt := time.Now()
		resp, err := b.tryGetAccount(ctx, node, req)
		rtt := time.Since(sentAt)

		if err != nil {
			b.log.Error().
				Str("execution_node", node.String()).
				Hex("block_id", req.GetBlockId()).
				Hex("address", req.GetAddress()).
				Int64("rtt_ms", rtt.Milliseconds()).
				Err(err).
				Msg("failed to execute GetAccount")
			errs = multierror.Append(errs, err)
			continue
		}

		// the first node that replies successfully wins
		b.log.Debug().
			Str("execution_node", node.String()).
			Hex("block_id", req.GetBlockId()).
			Hex("address", req.GetAddress()).
			Int64("rtt_ms", rtt.Milliseconds()).
			Msg("Successfully got account info")
		return resp, nil
	}

	return nil, rpc.ConvertMultiError(errs, "failed to get account from the execution node", codes.Internal)
}
// tryGetAccount sends the request to a single execution node.
//
// It obtains an execution API client for the node's address from the
// connection factory and closes the returned closer when the call finishes.
// If the call fails with gRPC code codes.Unavailable, the client for that
// address is invalidated through the connection factory before the error is
// returned. All errors are returned unchanged.
func (b *backendAccounts) tryGetAccount(ctx context.Context, execNode *flow.Identity, req execproto.GetAccountAtBlockIDRequest) (*execproto.GetAccountAtBlockIDResponse, error) {
	client, closer, err := b.connFactory.GetExecutionAPIClient(execNode.Address)
	if err != nil {
		return nil, err
	}
	defer closer.Close()

	resp, callErr := client.GetAccountAtBlockID(ctx, &req)
	if callErr == nil {
		return resp, nil
	}

	if status.Code(callErr) == codes.Unavailable {
		b.connFactory.InvalidateExecutionAPIClient(execNode.Address)
	}
	return nil, callErr
}