forked from cockroachdb/cockroach
/
routers.go
586 lines (520 loc) · 16.9 KB
/
routers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Routers are used by processors to direct outgoing rows to (potentially)
// multiple streams; see docs/RFCS/distributed_sql.md
package distsqlrun
import (
"bytes"
"hash/crc32"
"sort"
"sync"
"sync/atomic"
"golang.org/x/net/context"
"github.com/pkg/errors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
// router is implemented by all output routers: a RowReceiver that
// demultiplexes incoming rows to multiple output streams. init must be
// called before the router is started (via startable).
type router interface {
	RowReceiver
	startable
	init(flowCtx *FlowCtx, types []sqlbase.ColumnType)
}
// makeRouter creates a router. The router's init must be called before the
// router can be started.
//
// Pass-through routers are not supported; the higher layer is expected to elide
// them.
func makeRouter(spec *OutputRouterSpec, streams []RowReceiver) (router, error) {
	if len(streams) == 0 {
		return nil, errors.Errorf("no streams in router")
	}
	switch t := spec.Type; t {
	case OutputRouterSpec_MIRROR:
		return makeMirrorRouter(streams)
	case OutputRouterSpec_BY_HASH:
		return makeHashRouter(spec.HashColumns, streams)
	case OutputRouterSpec_BY_RANGE:
		return makeRangeRouter(streams, spec.RangeRouterSpec)
	default:
		return nil, errors.Errorf("router type %s not supported", t)
	}
}
// routerRowBufSize is the capacity of each output's "level 1" circular row
// buffer; rows beyond it spill into the row container.
const routerRowBufSize = rowChannelBufSize

// routerOutput is the data associated with one router consumer.
type routerOutput struct {
	// stream is the consumer this output pushes rows and metadata to.
	stream RowReceiver
	mu     struct {
		syncutil.Mutex
		// cond is signaled whenever the main router routine adds a metadata item, a
		// row, or sets producerDone.
		cond *sync.Cond
		// streamStatus is the last status returned by stream.Push.
		streamStatus ConsumerStatus
		// metadataBuf queues metadata records awaiting forwarding to the stream.
		metadataBuf []ProducerMetadata
		// The "level 1" row buffer is used first, to avoid going through the row
		// container if we don't need to buffer many rows. The buffer is a circular
		// FIFO queue, with rowBufLen elements and the left-most (oldest) element at
		// rowBufLeft.
		rowBuf                [routerRowBufSize]sqlbase.EncDatumRow
		rowBufLeft, rowBufLen uint32
		// The "level 2" rowContainer is used when we need to buffer more rows than
		// rowBuf allows. The row container always contains rows "older" than those
		// in rowBuf. The oldest rows are at the beginning of the row container.
		// TODO(radu,arjun): fall back to a disk container when it gets too big.
		rowContainer memRowContainer
		// producerDone is set via ProducerDone once no more rows or metadata
		// will arrive.
		producerDone bool
	}
	// TODO(radu): add padding of size sys.CacheLineSize to ensure there is no
	// false-sharing?
}
// addMetadataLocked queues a metadata record for forwarding to the stream.
// ro.mu must be held.
func (ro *routerOutput) addMetadataLocked(meta ProducerMetadata) {
	// We don't need any fancy buffering because normally there is not a lot of
	// metadata being passed around.
	ro.mu.metadataBuf = append(ro.mu.metadataBuf, meta)
}
// addRowLocked adds a row to rowBuf (potentially evicting the oldest row into
// rowContainer). ro.mu must be held.
func (ro *routerOutput) addRowLocked(ctx context.Context, row sqlbase.EncDatumRow) error {
	if ro.mu.streamStatus != NeedMoreRows {
		// The consumer doesn't want more rows; drop the row.
		return nil
	}
	if ro.mu.rowBufLen == routerRowBufSize {
		// rowBuf is full: spill the oldest buffered row into the row container
		// to make room.
		oldest := ro.mu.rowBuf[ro.mu.rowBufLeft]
		if err := ro.mu.rowContainer.AddRow(ctx, oldest); err != nil {
			return err
		}
		ro.mu.rowBufLeft = (ro.mu.rowBufLeft + 1) % routerRowBufSize
		ro.mu.rowBufLen--
	}
	// Append at the right end of the circular FIFO.
	idx := (ro.mu.rowBufLeft + ro.mu.rowBufLen) % routerRowBufSize
	ro.mu.rowBuf[idx] = row
	ro.mu.rowBufLen++
	return nil
}
// popRowsLocked moves up to len(rowBuf) buffered rows into rowBuf, oldest
// first, and returns the filled prefix. ro.mu must be held.
func (ro *routerOutput) popRowsLocked(rowBuf []sqlbase.EncDatumRow) []sqlbase.EncDatumRow {
	n := 0
	// Drain the row container first: it always holds rows older than those in
	// the circular buffer.
	for n < len(rowBuf) && ro.mu.rowContainer.Len() > 0 {
		// TODO(radu): use an EncDatumRowAlloc?
		src := ro.mu.rowContainer.EncRow(0)
		dst := make(sqlbase.EncDatumRow, len(src))
		copy(dst, src)
		rowBuf[n] = dst
		ro.mu.rowContainer.PopFirst()
		n++
	}
	// Then take rows from the circular buffer.
	for n < len(rowBuf) && ro.mu.rowBufLen > 0 {
		rowBuf[n] = ro.mu.rowBuf[ro.mu.rowBufLeft]
		ro.mu.rowBufLeft = (ro.mu.rowBufLeft + 1) % routerRowBufSize
		ro.mu.rowBufLen--
		n++
	}
	return rowBuf[:n]
}
// See the comment for routerBase.semaphoreCount.
const semaphorePeriod = 8

// routerBase holds the state shared by all router implementations: the set of
// outputs and the machinery for aggregating per-stream consumer status and
// for bounding buffering via a semaphore.
type routerBase struct {
	outputs []routerOutput
	// How many of streams are not in the DrainRequested or ConsumerClosed state.
	numNonDrainingStreams int32
	// aggregatedStatus is an atomic that maintains a unified view across all
	// streamStatus'es. Namely, if at least one of them is NeedMoreRows, this
	// will be NeedMoreRows. If all of them are ConsumerClosed, this will
	// (eventually) be ConsumerClosed. Otherwise, this will be DrainRequested.
	aggregatedStatus uint32
	// We use a semaphore of size len(outputs) and acquire it whenever we Push
	// to each stream as well as in the router's main Push routine. This ensures
	// that if all outputs are blocked, the main router routine blocks as well
	// (preventing runaway buffering if the source is faster than the consumers).
	semaphore chan struct{}
	// To reduce synchronization overhead, we only acquire the semaphore once for
	// every semaphorePeriod rows. This count keeps track of how many rows we
	// saw since the last time we took the semaphore.
	semaphoreCount int32
}
// aggStatus atomically reads the unified consumer status maintained across
// all output streams (see aggregatedStatus).
func (rb *routerBase) aggStatus() ConsumerStatus {
	s := atomic.LoadUint32(&rb.aggregatedStatus)
	return ConsumerStatus(s)
}
// setupStreams initializes one routerOutput per stream, with every stream
// starting in the NeedMoreRows state.
func (rb *routerBase) setupStreams(streams []RowReceiver) {
	n := len(streams)
	rb.numNonDrainingStreams = int32(n)
	rb.semaphore = make(chan struct{}, n)
	rb.outputs = make([]routerOutput, n)
	for i, s := range streams {
		out := &rb.outputs[i]
		out.stream = s
		out.mu.cond = sync.NewCond(&out.mu.Mutex)
		out.mu.streamStatus = NeedMoreRows
	}
}
// init must be called after setupStreams but before start.
func (rb *routerBase) init(flowCtx *FlowCtx, types []sqlbase.ColumnType) {
	// start() hasn't run yet, so no output goroutine exists and the per-output
	// mutex need not be held.
	for i := 0; i < len(rb.outputs); i++ {
		rb.outputs[i].mu.rowContainer.init(nil /* ordering */, types, &flowCtx.EvalCtx)
	}
}
// start must be called after init.
//
// It spawns one goroutine per output; each goroutine repeatedly drains its
// output's buffers (metadata first, then batches of rows), pushing to the
// stream with the mutex dropped, and exits once producerDone is set and the
// buffers are empty.
//
// NOTE(review): ctxCancel is not referenced in this body — presumably kept
// for interface symmetry with other startables; confirm.
func (rb *routerBase) start(ctx context.Context, wg *sync.WaitGroup, ctxCancel context.CancelFunc) {
	wg.Add(len(rb.outputs))
	for i := range rb.outputs {
		go func(rb *routerBase, ro *routerOutput, wg *sync.WaitGroup) {
			rowBuf := make([]sqlbase.EncDatumRow, routerRowBufSize)
			// streamStatus is this goroutine's local view of the stream state;
			// it is published to ro.mu.streamStatus under the lock.
			streamStatus := NeedMoreRows
			ro.mu.Lock()
			for {
				// Send any metadata that has been buffered. Note that we are not
				// maintaining the relative ordering between metadata items and rows
				// (but it doesn't matter).
				if len(ro.mu.metadataBuf) > 0 {
					m := ro.mu.metadataBuf[0]
					// Reset the value so any objects it refers to can be garbage
					// collected.
					ro.mu.metadataBuf[0] = ProducerMetadata{}
					ro.mu.metadataBuf = ro.mu.metadataBuf[1:]
					// Push with the lock dropped; the semaphore bounds total
					// concurrent pushing (see routerBase.semaphore).
					ro.mu.Unlock()
					rb.semaphore <- struct{}{}
					status := ro.stream.Push(nil /*row*/, m)
					<-rb.semaphore
					rb.updateStreamState(&streamStatus, status)
					ro.mu.Lock()
					ro.mu.streamStatus = streamStatus
					continue
				}
				// Send any rows that have been buffered. We grab multiple rows at a
				// time to reduce contention.
				if rows := ro.popRowsLocked(rowBuf); len(rows) > 0 {
					ro.mu.Unlock()
					rb.semaphore <- struct{}{}
					for _, row := range rows {
						status := ro.stream.Push(row, ProducerMetadata{})
						rb.updateStreamState(&streamStatus, status)
					}
					<-rb.semaphore
					ro.mu.Lock()
					ro.mu.streamStatus = streamStatus
					continue
				}
				// No rows or metadata buffered; see if the producer is done.
				if ro.mu.producerDone {
					ro.stream.ProducerDone()
					break
				}
				// Nothing to do; wait.
				ro.mu.cond.Wait()
			}
			ro.mu.rowContainer.Close(ctx)
			ro.mu.Unlock()
			wg.Done()
		}(rb, &rb.outputs[i], wg)
	}
}
// ProducerDone is part of the RowReceiver interface. It marks every output as
// done and wakes the per-output goroutines so they can drain and exit.
func (rb *routerBase) ProducerDone() {
	for i := range rb.outputs {
		ro := &rb.outputs[i]
		ro.mu.Lock()
		ro.mu.producerDone = true
		ro.mu.Unlock()
		// Wake the output goroutine so it observes producerDone.
		ro.mu.cond.Signal()
	}
}
// updateStreamState updates the status of one stream and, if this was the last
// open stream, it also updates rb.aggregatedStatus.
func (rb *routerBase) updateStreamState(streamStatus *ConsumerStatus, newState ConsumerStatus) {
	if newState == *streamStatus {
		return
	}
	// A stream state never goes from draining to non-draining, so a change away
	// from NeedMoreRows means this stream is now draining or closed.
	if *streamStatus == NeedMoreRows {
		if atomic.AddInt32(&rb.numNonDrainingStreams, -1) == 0 {
			// Last stream stopped needing rows: move the aggregated status from
			// NeedMoreRows to DrainRequested (leave ConsumerClosed untouched).
			atomic.CompareAndSwapUint32(
				&rb.aggregatedStatus, uint32(NeedMoreRows), uint32(DrainRequested),
			)
		}
	}
	*streamStatus = newState
}
// fwdMetadata forwards a metadata record to the first stream that's still
// accepting data. If every stream is closed, the metadata is dropped and
// aggregatedStatus becomes ConsumerClosed.
func (rb *routerBase) fwdMetadata(meta ProducerMetadata) {
	if meta.Empty() {
		log.Fatalf(context.TODO(), "asked to fwd empty metadata")
	}
	// Acquire the semaphore so that if all outputs are busy pushing, this
	// routine blocks too (see routerBase.semaphore).
	rb.semaphore <- struct{}{}
	for i := range rb.outputs {
		ro := &rb.outputs[i]
		ro.mu.Lock()
		if ro.mu.streamStatus != ConsumerClosed {
			ro.addMetadataLocked(meta)
			ro.mu.Unlock()
			// Wake the output goroutine so it forwards the metadata.
			ro.mu.cond.Signal()
			<-rb.semaphore
			return
		}
		ro.mu.Unlock()
	}
	<-rb.semaphore
	// If we got here it means that we couldn't even forward metadata anywhere;
	// all streams are closed.
	atomic.StoreUint32(&rb.aggregatedStatus, uint32(ConsumerClosed))
}
// shouldUseSemaphore returns true once every semaphorePeriod calls; see the
// comment for routerBase.semaphoreCount.
func (rb *routerBase) shouldUseSemaphore() bool {
	rb.semaphoreCount++
	if rb.semaphoreCount < semaphorePeriod {
		return false
	}
	rb.semaphoreCount = 0
	return true
}
// mirrorRouter sends every row to all output streams.
type mirrorRouter struct {
	routerBase
}

// hashRouter sends each row to the output stream selected by hashing the
// encodings of the hashCols columns.
type hashRouter struct {
	routerBase
	// hashCols are the indexes of the columns that participate in the hash.
	hashCols []uint32
	// buffer is scratch space reused across rows for the hashed encoding.
	buffer []byte
	alloc  sqlbase.DatumAlloc
}

// rangeRouter is a router that assumes the keyColumn'th column of incoming
// rows is a roachpb.Key, and maps it to a stream based on a matching
// span. That is, keys in the nth span will be mapped to the nth stream. The
// keyColumn must be of type DBytes (or optionally DNull if defaultDest
// is set).
type rangeRouter struct {
	routerBase
	alloc sqlbase.DatumAlloc
	// b is a temp storage location used during encoding
	b         []byte
	encodings []OutputRouterSpec_RangeRouterSpec_ColumnEncoding
	spans     []OutputRouterSpec_RangeRouterSpec_Span
	// defaultDest, if set, sends any row not matching a span to this stream. If
	// not set and a non-matching row is encountered, an error is returned and
	// the router is shut down.
	defaultDest *int
}

// Compile-time checks that all routers implement RowReceiver.
var _ RowReceiver = &mirrorRouter{}
var _ RowReceiver = &hashRouter{}
var _ RowReceiver = &rangeRouter{}
// makeMirrorRouter creates a mirror router over the given streams; at least
// two streams are required.
func makeMirrorRouter(streams []RowReceiver) (router, error) {
	if len(streams) < 2 {
		return nil, errors.Errorf("need at least two streams for mirror router")
	}
	var mr mirrorRouter
	mr.routerBase.setupStreams(streams)
	return &mr, nil
}
// Push is part of the RowReceiver interface. Each row is buffered to every
// output stream.
func (mr *mirrorRouter) Push(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus {
	aggStatus := mr.aggStatus()
	if !meta.Empty() {
		mr.fwdMetadata(meta)
		// fwdMetadata can change the status (it sets ConsumerClosed when all
		// streams are closed); re-read it. This mirrors hashRouter.Push and
		// rangeRouter.Push, which already re-read after forwarding.
		return mr.aggStatus()
	}
	if aggStatus != NeedMoreRows {
		return aggStatus
	}
	// Only take the semaphore periodically to reduce synchronization overhead
	// (see routerBase.semaphoreCount).
	useSema := mr.shouldUseSemaphore()
	if useSema {
		mr.semaphore <- struct{}{}
	}
	for i := range mr.outputs {
		ro := &mr.outputs[i]
		ro.mu.Lock()
		err := ro.addRowLocked(context.TODO(), row)
		ro.mu.Unlock()
		if err != nil {
			if useSema {
				<-mr.semaphore
			}
			// Report the error downstream and shut the router down.
			mr.fwdMetadata(ProducerMetadata{Err: err})
			atomic.StoreUint32(&mr.aggregatedStatus, uint32(ConsumerClosed))
			return ConsumerClosed
		}
		// Wake the output goroutine so it pushes the buffered row.
		ro.mu.cond.Signal()
	}
	if useSema {
		<-mr.semaphore
	}
	return aggStatus
}
// crc32Table is the (hardware-accelerated on recent x86) CRC32-C table used
// for hashing rows.
var crc32Table = crc32.MakeTable(crc32.Castagnoli)

// makeHashRouter creates a hash router that hashes the given columns; at
// least two streams and one hash column are required.
func makeHashRouter(hashCols []uint32, streams []RowReceiver) (router, error) {
	if len(streams) < 2 {
		return nil, errors.Errorf("need at least two streams for hash router")
	}
	if len(hashCols) == 0 {
		return nil, errors.Errorf("no hash columns for BY_HASH router")
	}
	hr := hashRouter{hashCols: hashCols}
	hr.routerBase.setupStreams(streams)
	return &hr, nil
}
// Push is part of the RowReceiver interface.
//
// If, according to the hash, the row needs to go to a consumer that's draining
// or closed, the row is silently dropped.
func (hr *hashRouter) Push(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus {
	aggStatus := hr.aggStatus()
	if !meta.Empty() {
		hr.fwdMetadata(meta)
		// fwdMetadata can change the status, re-read it.
		return hr.aggStatus()
	}
	if aggStatus != NeedMoreRows {
		return aggStatus
	}
	// Only take the semaphore periodically to reduce synchronization overhead
	// (see routerBase.semaphoreCount).
	useSema := hr.shouldUseSemaphore()
	if useSema {
		hr.semaphore <- struct{}{}
	}
	streamIdx, err := hr.computeDestination(row)
	if err == nil {
		ro := &hr.outputs[streamIdx]
		ro.mu.Lock()
		err = ro.addRowLocked(context.TODO(), row)
		ro.mu.Unlock()
		// Wake the output goroutine so it pushes the buffered row.
		ro.mu.cond.Signal()
	}
	if useSema {
		<-hr.semaphore
	}
	if err != nil {
		// Report the error downstream and shut the router down.
		hr.fwdMetadata(ProducerMetadata{Err: err})
		atomic.StoreUint32(&hr.aggregatedStatus, uint32(ConsumerClosed))
		return ConsumerClosed
	}
	return aggStatus
}
// computeDestination hashes a row and returns the index of the output stream on
// which it must be sent.
func (hr *hashRouter) computeDestination(row sqlbase.EncDatumRow) (int, error) {
	hr.buffer = hr.buffer[:0]
	for _, col := range hr.hashCols {
		if int(col) >= len(row) {
			return -1, errors.Errorf("hash column %d, row with only %d columns", col, len(row))
		}
		// TODO(radu): we should choose an encoding that is already available as
		// much as possible. However, we cannot decide this locally as multiple
		// nodes may be doing the same hashing and the encodings need to match. The
		// encoding needs to be determined at planning time. #13829
		var err error
		hr.buffer, err = row[col].Encode(&hr.alloc, preferredEncoding, hr.buffer)
		if err != nil {
			return -1, err
		}
	}
	// We use CRC32-C because it makes for a decent hash function and is faster
	// than most hashing algorithms (on recent x86 platforms where it is hardware
	// accelerated).
	h := crc32.Update(0, crc32Table, hr.buffer)
	return int(h % uint32(len(hr.outputs))), nil
}
// makeRangeRouter creates a range router from the given spec. There must be
// exactly one stream per span, at least one column encoding, and the spans
// must be sorted and non-overlapping.
func makeRangeRouter(
	streams []RowReceiver, spec OutputRouterSpec_RangeRouterSpec,
) (*rangeRouter, error) {
	if len(streams) != len(spec.Spans) {
		return nil, errors.Errorf("number of streams (%d) must match spans (%d)", len(streams), len(spec.Spans))
	}
	if len(spec.Encodings) == 0 {
		return nil, errors.New("missing encodings")
	}
	var defaultDest *int
	if d := spec.DefaultDest; d != nil {
		i := int(*d)
		defaultDest = &i
	}
	// Verify spans are sorted and non-overlapping: each span must start at or
	// after the previous span's end.
	var prevEnd []byte
	for i := range spec.Spans {
		if bytes.Compare(prevEnd, spec.Spans[i].Start) > 0 {
			return nil, errors.Errorf("span %d not after previous span", i)
		}
		prevEnd = spec.Spans[i].End
	}
	rr := &rangeRouter{
		spans:       spec.Spans,
		defaultDest: defaultDest,
		encodings:   spec.Encodings,
	}
	rr.routerBase.setupStreams(streams)
	return rr, nil
}
// Push is part of the RowReceiver interface. The row's destination stream is
// chosen by encoding the configured columns and matching against the spans.
//
// NOTE(review): unlike mirrorRouter.Push and hashRouter.Push, this does not
// check `aggStatus != NeedMoreRows` before buffering the row — confirm
// whether that is intentional.
func (rr *rangeRouter) Push(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus {
	aggStatus := rr.aggStatus()
	if !meta.Empty() {
		rr.fwdMetadata(meta)
		// fwdMetadata can change the status, re-read it.
		return rr.aggStatus()
	}
	// Only take the semaphore periodically to reduce synchronization overhead
	// (see routerBase.semaphoreCount).
	useSema := rr.shouldUseSemaphore()
	if useSema {
		rr.semaphore <- struct{}{}
	}
	streamIdx, err := rr.computeDestination(row)
	if err == nil {
		ro := &rr.outputs[streamIdx]
		ro.mu.Lock()
		err = ro.addRowLocked(context.TODO(), row)
		ro.mu.Unlock()
		// Wake the output goroutine so it pushes the buffered row.
		ro.mu.cond.Signal()
	}
	if useSema {
		<-rr.semaphore
	}
	if err != nil {
		// Report the error downstream and shut the router down.
		rr.fwdMetadata(ProducerMetadata{Err: err})
		atomic.StoreUint32(&rr.aggregatedStatus, uint32(ConsumerClosed))
		return ConsumerClosed
	}
	return aggStatus
}
// computeDestination encodes the configured columns of the row and returns
// the index of the matching span's stream. Rows matching no span go to
// defaultDest if it is set, otherwise an error is returned.
func (rr *rangeRouter) computeDestination(row sqlbase.EncDatumRow) (int, error) {
	rr.b = rr.b[:0]
	for _, enc := range rr.encodings {
		var err error
		rr.b, err = row[enc.Column].Encode(&rr.alloc, enc.Encoding, rr.b)
		if err != nil {
			return 0, err
		}
	}
	if i := rr.spanForData(rr.b); i != -1 {
		return i, nil
	}
	if rr.defaultDest != nil {
		return *rr.defaultDest, nil
	}
	return 0, errors.New("no span found for key")
}
// spanForData returns the index of the first span such that data is within
// [start, end). A -1 is returned if no such span is found.
func (rr *rangeRouter) spanForData(data []byte) int {
	// Binary-search for the first span whose end is strictly greater than data.
	i := sort.Search(len(rr.spans), func(j int) bool {
		return bytes.Compare(rr.spans[j].End, data) > 0
	})
	if i == len(rr.spans) {
		// data is at or past every span's end.
		return -1
	}
	if bytes.Compare(rr.spans[i].Start, data) > 0 {
		// data falls in a gap before span i's start.
		return -1
	}
	return i
}