mirrored from https://chromium.googlesource.com/infra/luci/luci-go
/
client.go
272 lines (226 loc) · 7.7 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bundleServicesClient
import (
"context"
"sync"
"time"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/gcloud/gae"
"go.chromium.org/luci/common/logging"
s "go.chromium.org/luci/logdog/api/endpoints/coordinator/services/v1"
"github.com/golang/protobuf/proto"
"google.golang.org/api/support/bundler"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
// maxBundleSize is the byte threshold applied to every bundle: the maximum
// AppEngine request size, minus 1MB for overhead.
const maxBundleSize = gae.MaxRequestSize - (1024 * 1024) // 1MB
// Client is a LogDog Coordinator Services endpoint client that intercepts
// calls that can be batched and buffers them, sending them with the Batch
// RPC instead of their independent individual RPCs.
//
// The Context and CallOption set of the first intercepted call in a bundle
// are used when making the batch call; the Context and CallOption sets of all
// other calls in that bundle are ignored.
//
// Bundling parameters are controlled through the exported threshold fields
// below. They are copied into the underlying Bundler exactly once, when it is
// lazily created by the first intercepted call or by Flush, so they must be
// set before the Client is first used.
type Client struct {
// ServicesClient is the Coordinator Services endpoint Client that is being
// wrapped.
s.ServicesClient
// Starting from the time that the first message is added to a bundle, once
// this delay has passed, handle the bundle.
DelayThreshold time.Duration
// Once a bundle has this many items, handle the bundle. Since only one
// item at a time is added to a bundle, no bundle will exceed this
// threshold, so it also serves as a limit.
BundleCountThreshold int
// initBundlerOnce guards the one-time lazy construction of bundler.
initBundlerOnce sync.Once
// bundler buffers intercepted calls and hands them to bundlerHandler in
// groups. It is nil until initBundler has run.
bundler *bundler.Bundler
// outstanding is used to track outstanding RPCs. On Flush, the Client will
// block pending completion of all outstanding RPCs.
outstanding sync.WaitGroup
}
// RegisterStream implements ServicesClient.
//
// Rather than issuing its own RPC, the request is wrapped in a batch entry
// and handed to bundleRPC, which blocks until the entry has been answered.
// On success the RegisterStream payload of the batch response entry is
// returned; on failure the error from bundleRPC is returned unchanged.
func (c *Client) RegisterStream(ctx context.Context, in *s.RegisterStreamRequest, opts ...grpc.CallOption) (
	*s.RegisterStreamResponse, error) {
	entry := &s.BatchRequest_Entry{
		Value: &s.BatchRequest_Entry_RegisterStream{RegisterStream: in},
	}
	batchResp, err := c.bundleRPC(ctx, opts, entry)
	if err != nil {
		return nil, err
	}
	return batchResp.GetRegisterStream(), nil
}
// LoadStream implements ServicesClient.
//
// Rather than issuing its own RPC, the request is wrapped in a batch entry
// and handed to bundleRPC, which blocks until the entry has been answered.
// On success the LoadStream payload of the batch response entry is returned;
// on failure the error from bundleRPC is returned unchanged.
func (c *Client) LoadStream(ctx context.Context, in *s.LoadStreamRequest, opts ...grpc.CallOption) (
	*s.LoadStreamResponse, error) {
	entry := &s.BatchRequest_Entry{
		Value: &s.BatchRequest_Entry_LoadStream{LoadStream: in},
	}
	batchResp, err := c.bundleRPC(ctx, opts, entry)
	if err != nil {
		return nil, err
	}
	return batchResp.GetLoadStream(), nil
}
// TerminateStream implements ServicesClient.
//
// The request is wrapped in a batch entry and handed to bundleRPC, which
// blocks until the entry has been answered. The batch response entry carries
// no payload for this call, so a fresh Empty is returned on success; on
// failure the error from bundleRPC is returned unchanged.
func (c *Client) TerminateStream(ctx context.Context, in *s.TerminateStreamRequest, opts ...grpc.CallOption) (
	*emptypb.Empty, error) {
	entry := &s.BatchRequest_Entry{
		Value: &s.BatchRequest_Entry_TerminateStream{TerminateStream: in},
	}
	if _, err := c.bundleRPC(ctx, opts, entry); err != nil {
		return nil, err
	}
	return &emptypb.Empty{}, nil
}
// ArchiveStream implements ServicesClient.
//
// The request is wrapped in a batch entry and handed to bundleRPC, which
// blocks until the entry has been answered. The batch response entry carries
// no payload for this call, so a fresh Empty is returned on success; on
// failure the error from bundleRPC is returned unchanged.
func (c *Client) ArchiveStream(ctx context.Context, in *s.ArchiveStreamRequest, opts ...grpc.CallOption) (
	*emptypb.Empty, error) {
	entry := &s.BatchRequest_Entry{
		Value: &s.BatchRequest_Entry_ArchiveStream{ArchiveStream: in},
	}
	if _, err := c.bundleRPC(ctx, opts, entry); err != nil {
		return nil, err
	}
	return &emptypb.Empty{}, nil
}
// Flush flushes the Bundler and then blocks until every outstanding batch RPC
// has finished. It should be called when terminating to ensure that buffered
// client requests have been completed.
//
// The Bundler is created first if no call has been intercepted yet, so Flush
// is safe to call on an otherwise unused Client.
func (c *Client) Flush() {
	c.initBundler()

	b := c.bundler
	b.Flush()

	// The bundle handler only launches the batch RPC in a goroutine, so the
	// RPCs themselves must be waited for separately.
	c.outstanding.Wait()
}
// initBundler lazily constructs the Client's Bundler. It is safe to call any
// number of times; only the first call does any work.
//
// The Bundler is configured from the Client's DelayThreshold and
// BundleCountThreshold fields as they are at the time of that first call. The
// byte threshold is always maxBundleSize.
func (c *Client) initBundler() {
	c.initBundlerOnce.Do(func() {
		b := bundler.NewBundler(&batchEntry{}, c.bundlerHandler)
		b.DelayThreshold = c.DelayThreshold
		b.BundleCountThreshold = c.BundleCountThreshold
		b.BundleByteThreshold = maxBundleSize // Hard-coded.
		c.bundler = b
	})
}
// bundleRPC queues req on the underlying Bundler, blocks until a response for
// it has been delivered, and returns that response.
//
// ctx and opts are recorded on the queued entry; they are only used for the
// batch call if this entry ends up first in its bundle.
//
// An error is returned if the Bundler refuses the entry, or if the delivered
// response carries an error value, in which case that value is converted with
// ToError.
//
// NOTE(review): the wait for the response is a plain channel receive and does
// not observe ctx, so cancelling ctx does not unblock the caller.
func (c *Client) bundleRPC(ctx context.Context, opts []grpc.CallOption, req *s.BatchRequest_Entry) (*s.BatchResponse_Entry, error) {
	c.initBundler()

	// Buffered so that whoever delivers the response never blocks on us.
	done := make(chan *s.BatchResponse_Entry, 1)
	entry := &batchEntry{req: req, ctx: ctx, opts: opts, complete: done}
	if err := c.addEntry(entry); err != nil {
		return nil, err
	}

	result := <-done
	if rpcErr := result.GetErr(); rpcErr != nil {
		return nil, rpcErr.ToError()
	}
	return result, nil
}
// addEntry hands be to the Bundler, reporting the serialized size of its
// request as the entry's size. It returns whatever error the Bundler's Add
// returns.
func (c *Client) addEntry(be *batchEntry) error {
	size := proto.Size(be.req)
	return c.bundler.Add(be, size)
}
// bundlerHandler is called when a bundle threshold has been met.
//
// This is a bundler.Bundler handler function. "iface" is []*batchEntry{}, a
// slice of the prototype passed into NewBundler.
//
// Note that "iface" is owned by this handler; the Bundler allocates a new
// slice after each bundle dispatch. Therefore, retention and mutation are safe.
//
// The handler itself does not perform the RPC. It registers one unit of work
// on c.outstanding and sends the bundle from a new goroutine, using the
// Context and CallOptions of the first entry in the bundle.
func (c *Client) bundlerHandler(iface any) {
	batch := iface.([]*batchEntry)
	if len(batch) < 1 {
		return
	}

	// Capture the first entry now: sendBundle clears slots of the slice as it
	// delivers responses.
	first := batch[0]

	c.outstanding.Add(1)
	go func(ctx context.Context, opts []grpc.CallOption) {
		defer c.outstanding.Done()
		c.sendBundle(ctx, batch, opts...)
	}(first.ctx, first.opts)
}
// sendBundle sends entries to the Coordinator as a single Batch RPC and
// delivers each response to the entry that is waiting for it.
//
// ctx and opts are passed straight through to the Batch call. The entries
// slice is mutated: a slot is set to nil once its response has been
// delivered.
//
// Outcomes:
//   - If the Batch RPC fails, every entry receives an error response built
//     from the RPC error.
//   - Responses are matched to entries by index. A response whose index is
//     out of range, or whose entry has already been answered, is logged and
//     skipped.
//   - Entries that received no response are re-added to the Bundler so that
//     they go out with a later bundle. An entry that the Bundler refuses to
//     take back receives an error response instead.
//
// sendBundle panics if the Batch RPC succeeds but returns no responses at
// all, because resubmitting would then repeat the same request set forever.
func (c *Client) sendBundle(ctx context.Context, entries []*batchEntry, opts ...grpc.CallOption) {
	req := s.BatchRequest{
		Req: make([]*s.BatchRequest_Entry, len(entries)),
	}
	for i, ent := range entries {
		req.Req[i] = ent.req
	}

	// fail answers ent with an error response built from err. Note that
	// "complete" is a buffered channel and each entry is answered at most once,
	// so this will not block.
	fail := func(ent *batchEntry, err error) {
		ent.complete <- &s.BatchResponse_Entry{
			Value: &s.BatchResponse_Entry_Err{Err: s.MakeError(err)},
		}
	}

	resp, err := c.ServicesClient.Batch(ctx, &req, opts...)
	if err != nil {
		logging.WithError(err).Errorf(ctx, "Failed to send RPC bundle.")

		// Error case: generate an error response from "err" for every entry.
		for _, ent := range entries {
			fail(ent, err)
		}
		return
	}

	// We don't have a solution for a case where the Coordinator couldn't provide
	// a single response. We would infinitely continue retrying our initial
	// request set.
	//
	// This shouldn't happen, but if it does, make it visible.
	if len(resp.Resp) == 0 {
		panic(errors.New("batch response had zero entries"))
	}

	// Pair each response with its request.
	delivered := 0
	for _, r := range resp.Resp {
		// Handle error conditions.
		switch {
		case r.Index < 0, int(r.Index) >= len(entries):
			logging.Warningf(ctx, "Response included invalid index %d (%d entries).", r.Index, len(entries))
			continue

		case entries[r.Index] == nil:
			logging.Warningf(ctx, "Response included duplicate entry for index %d.", r.Index)
			continue
		}

		entries[r.Index].complete <- r
		// Mark the slot as answered so duplicates and leftovers can be detected.
		entries[r.Index] = nil
		delivered++
	}

	// Fast path: if every entry was answered, there is nothing to resubmit.
	if delivered == len(entries) {
		return
	}

	// Figure out which entries we didn't process and resubmit them.
	resubmitted := 0
	for _, be := range entries {
		if be == nil {
			// Already processed.
			continue
		}

		if err := c.addEntry(be); err != nil {
			// Add reports an error, so a resubmission is not guaranteed to be
			// accepted just because the first submission was: the Bundler's
			// state may have changed since. Fail this one entry rather than
			// panicking, which would take down the whole process from a
			// background goroutine and leave the caller blocked.
			logging.WithError(err).Errorf(ctx, "Failed to resubmit unprocessed entry.")
			fail(be, err)
			continue
		}
		resubmitted++
	}
	logging.Debugf(ctx, "Resubmitting %d unprocessed entr[y|ies].", resubmitted)
}
// batchEntry is a single intercepted call that is waiting to be sent as part
// of a Batch RPC. A pointer to it is the item type registered with the
// Bundler.
type batchEntry struct {
// req is the batch request entry wrapping the caller's original request.
req *s.BatchRequest_Entry
// ctx is the Context of the intercepted call. It is used for the batch call
// only when this entry is the first one in its bundle.
ctx context.Context
// opts are the CallOptions of the intercepted call. Like ctx, they are used
// only when this entry is the first one in its bundle.
opts []grpc.CallOption
// complete receives the response for this entry. bundleRPC creates it with
// a buffer of one so that delivering the response never blocks the sender.
complete chan *s.BatchResponse_Entry
}