forked from istio/istio
-
Notifications
You must be signed in to change notification settings - Fork 1
/
server.go
204 lines (172 loc) · 6.92 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
// Copyright 2017 Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package mockapi supplies a fake Mixer server for use in testing. It should NOT
// be used outside of testing contexts.
package mockapi // import "istio.io/istio/mixer/pkg/mockapi"
import (
"errors"
"fmt"
"net"
"time"
rpc "github.com/gogo/googleapis/google/rpc"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
mixerpb "istio.io/api/mixer/v1"
"istio.io/istio/mixer/pkg/attribute"
"istio.io/istio/mixer/pkg/status"
)
// DefaultAmount is the default quota amount to use in testing (1).
var DefaultAmount = int64(1)
// DefaultValidUseCount is the default number of valid uses to return for
// quota allocs for testing (1).
var DefaultValidUseCount = int32(10000)
// DefaultValidDuration is the default duration to return for
// quota allocs in testing (1s).
var DefaultValidDuration = 5 * time.Second
// AttributesServer implements the Mixer API to send mutable attributes bags to
// a channel upon API requests. This can be used for tests that want to exercise
// the Mixer API and validate server handling of supplied attributes.
type AttributesServer struct {
// GlobalDict controls the known global dictionary for attribute processing.
GlobalDict map[string]int32
// GenerateGRPCError instructs the server whether or not to fail-fast with
// an error that will manifest as a GRPC error.
GenerateGRPCError bool
// Handler is what the server will call to simulate passing attribute bags
// and method args within the Mixer server. It allows tests to gain access
// to the attribute handling pipeline within Mixer and to set the response
// details.
Handler AttributesHandler
// CheckGlobalDict indicates whether to check if proxy global dictionary
// is ahead of the one in mixer.
checkGlobalDict bool
}
// NewAttributesServer creates an AttributesServer. All channels are set to
// default length.
func NewAttributesServer(handler AttributesHandler, checkDict bool) *AttributesServer {
list := attribute.GlobalList()
globalDict := make(map[string]int32, len(list))
for i := 0; i < len(list); i++ {
globalDict[list[i]] = int32(i)
}
return &AttributesServer{
globalDict,
false,
handler,
checkDict,
}
}
// Check sends a copy of the protocol buffers attributes wrapper for the preconditions
// check as well as for each quotas check to the CheckAttributes channel. It also
// builds a CheckResponse based on server fields. All channel sends timeout to
// prevent problematic tests from blocking indefinitely.
func (a *AttributesServer) Check(ctx context.Context, req *mixerpb.CheckRequest) (*mixerpb.CheckResponse, error) {
if a.GenerateGRPCError {
return nil, errors.New("error handling check call")
}
if a.checkGlobalDict && req.GlobalWordCount > uint32(len(a.GlobalDict)) {
return nil, fmt.Errorf("global dictionary mismatch: proxy %d and mixer %d", req.GlobalWordCount, len(a.GlobalDict))
}
requestBag := attribute.GetProtoBag(&req.Attributes, a.GlobalDict, attribute.GlobalList())
defer requestBag.Done()
result := a.Handler.Check(requestBag)
if result.ReferencedAttributes == nil {
result.ReferencedAttributes = requestBag.GetReferencedAttributes(a.GlobalDict, int(req.GlobalWordCount))
}
requestBag.ClearReferencedAttributes()
resp := &mixerpb.CheckResponse{Precondition: result}
if len(req.Quotas) > 0 {
resp.Quotas = make(map[string]mixerpb.CheckResponse_QuotaResult, len(req.Quotas))
for name, param := range req.Quotas {
args := QuotaArgs{
Quota: name,
Amount: param.Amount,
DeduplicationID: req.DeduplicationId + name,
BestEffort: param.BestEffort,
}
result, out := a.Handler.Quota(requestBag, args)
if status.IsOK(resp.Precondition.Status) && !status.IsOK(out) {
resp.Precondition.Status = out
}
qr := mixerpb.CheckResponse_QuotaResult{
GrantedAmount: result.Amount,
ValidDuration: result.Expiration,
ReferencedAttributes: *requestBag.GetReferencedAttributes(a.GlobalDict, int(req.GlobalWordCount)),
}
if result.Referenced != nil {
qr.ReferencedAttributes = *result.Referenced
} else {
qr.ReferencedAttributes = *requestBag.GetReferencedAttributes(a.GlobalDict, int(req.GlobalWordCount))
}
resp.Quotas[name] = qr
requestBag.ClearReferencedAttributes()
}
}
return resp, nil
}
// Report iterates through the supplied attributes sets, applying the deltas
// appropriately, and sending the generated bags to the channel.
func (a *AttributesServer) Report(ctx context.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) {
if a.GenerateGRPCError {
return nil, errors.New("error handling report call")
}
if len(req.Attributes) == 0 {
// early out
return &mixerpb.ReportResponse{}, nil
}
// apply the request-level word list to each attribute message if needed
for i := 0; i < len(req.Attributes); i++ {
if len(req.Attributes[i].Words) == 0 {
req.Attributes[i].Words = req.DefaultWords
}
}
protoBag := attribute.GetProtoBag(&req.Attributes[0], a.GlobalDict, attribute.GlobalList())
requestBag := attribute.GetMutableBag(protoBag)
defer requestBag.Done()
defer protoBag.Done()
out := a.Handler.Report(requestBag)
for i := 1; i < len(req.Attributes); i++ {
// the first attribute block is handled by the protoBag as a foundation,
// deltas are applied to the child bag (i.e. requestBag)
if err := requestBag.UpdateBagFromProto(&req.Attributes[i], attribute.GlobalList()); err != nil {
return &mixerpb.ReportResponse{}, fmt.Errorf("could not apply attribute delta: %v", err)
}
out = a.Handler.Report(requestBag)
}
if !status.IsOK(out) {
return nil, makeGRPCError(out)
}
return &mixerpb.ReportResponse{}, nil
}
// NewMixerServer creates a new grpc.Server with the supplied implementation
// of the Mixer API.
func NewMixerServer(impl mixerpb.MixerServer) *grpc.Server {
gs := grpc.NewServer()
mixerpb.RegisterMixerServer(gs, impl)
return gs
}
// ListenerAndPort starts a listener on an available port and returns both the
// listener and the port on which it is listening.
func ListenerAndPort() (net.Listener, int, error) {
lis, err := net.Listen("tcp", ":0") // nolint: gas
if err != nil {
return nil, 0, fmt.Errorf("could not find open port for server: %v", err)
}
return lis, lis.Addr().(*net.TCPAddr).Port, nil
}
func makeGRPCError(status rpc.Status) error {
return grpcstatus.Errorf(codes.Code(status.Code), status.Message)
}