/
msg_server.go
129 lines (107 loc) · 4.13 KB
/
msg_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package keeper
import (
"context"
"errors"
"fmt"
"sort"
"strings"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/nephirim/blackfury/utils"
"github.com/nephirim/blackfury/x/interchainquery/types"
)
type msgServer struct {
*Keeper
}
// NewMsgServerImpl returns an implementation of the bank MsgServer interface
// for the provided Keeper.
func NewMsgServerImpl(keeper Keeper) types.MsgServer {
return &msgServer{Keeper: &keeper}
}
var _ types.MsgServer = msgServer{}
func (k msgServer) SubmitQueryResponse(goCtx context.Context, msg *types.MsgSubmitQueryResponse) (*types.MsgSubmitQueryResponseResponse, error) {
ctx := sdk.UnwrapSDKContext(goCtx)
q, found := k.GetQuery(ctx, msg.QueryId)
if !found {
k.Logger(ctx).Debug("query not found", "QueryID", msg.QueryId)
return &types.MsgSubmitQueryResponseResponse{}, nil
}
latest := k.GetLatestHeight(ctx, msg.ChainId)
if latest > uint64(msg.Height) && q.QueryType != "tendermint.Tx" && q.QueryType != "ibc.ClientUpdate" {
k.Logger(ctx).Error("ignoring stale query result", "id", q.Id, "type", q.QueryType, "latestHeight", latest, "msgHeight", msg.Height)
// technically this is an error, but will cause the entire tx to fail
// if we have one 'bad' message, so we can just no-op here.
return &types.MsgSubmitQueryResponseResponse{}, nil
}
// check if query was previously processed
// - indicated by query.LastHeight matching current Block Height;
if q.LastHeight.Int64() == ctx.BlockHeader().Height {
k.Logger(ctx).Debug("ignoring duplicate query", "id", q.Id, "type", q.QueryType)
// technically this is an error, but will cause the entire tx to fail
// if we have one 'bad' message, so we can just no-op here.
return &types.MsgSubmitQueryResponseResponse{}, nil
}
pathParts := strings.Split(q.QueryType, "/")
if pathParts[len(pathParts)-1] == "key" {
if err := utils.ValidateProofOps(ctx, k.IBCKeeper, q.ConnectionId, q.ChainId, msg.Height, pathParts[1], q.Request, msg.Result, msg.ProofOps); err != nil {
k.Logger(ctx).Error("failed to validate proofops", "id", q.Id, "type", q.QueryType)
return nil, err
}
}
noDelete := false
// execute registered callbacks.
keys := make([]string, 0)
for k := range k.callbacks {
keys = append(keys, k)
}
sort.Strings(keys)
callbackExecuted := false
for _, key := range keys {
module := k.callbacks[key]
if module.Has(q.CallbackId) {
err := module.Call(ctx, q.CallbackId, msg.Result, q)
callbackExecuted = true
if err != nil {
// not edge case: proceed with regular error handling!
if !errors.Is(err, types.ErrSucceededNoDelete) {
k.Logger(ctx).Error("error in callback", "error", err, "msg", msg.QueryId, "result", msg.Result, "type", q.QueryType, "params", q.Request)
return nil, err
}
// edge case: the callback has resent the same query (re-query)!
// action: set noDelete to true and continue (short circuit error handling)!
noDelete = true
}
// we have executed a callback; only a single callback is expected per request, so break here.
break
}
}
k.SetLatestHeight(ctx, msg.ChainId, uint64(msg.Height))
if !callbackExecuted && q.CallbackId != "" {
k.Logger(ctx).Error("callback expected but not found", "callbackId", q.CallbackId, "msg", msg.QueryId, "type", q.QueryType)
return nil, fmt.Errorf("expected callback %s, but did not find it", q.CallbackId)
}
if q.Ttl > 0 {
// don't store if ttl is 0
if err := k.SetDatapointForID(ctx, msg.QueryId, msg.Result, sdk.NewInt(msg.Height)); err != nil {
k.Logger(ctx).Error("failed to set datapoint", "id", q.Id, "type", q.QueryType)
return nil, err
}
}
// check for and delete non-repeating queries, update any other
// - Period.IsNegative() indicates a single query;
// - noDelete indicates a response that triggered a re-query;
if q.Period.IsNegative() {
if !noDelete {
k.DeleteQuery(ctx, msg.QueryId)
}
} else {
q.LastHeight = sdk.NewInt(ctx.BlockHeight())
k.SetQuery(ctx, q)
}
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
sdk.EventTypeMessage,
sdk.NewAttribute(sdk.AttributeKeyModule, types.AttributeValueCategory),
),
})
return &types.MsgSubmitQueryResponseResponse{}, nil
}