-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
row_splitter.go
89 lines (81 loc) · 2.47 KB
/
row_splitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Copyright 2014, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package worker
import (
"fmt"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/topo"
)
// RowSplitter is a helper class to split rows into multiple
// subsets targeted to different shards.
//
// Use NewRowSplitter to build one from a list of shards, StartSplit to
// get an empty set of per-shard buckets, Split to distribute rows into
// those buckets, and Send to push each non-empty bucket to its channel.
type RowSplitter struct {
// Type selects how Split derives a keyspace id from a row:
// key.KIT_UINT64 values are parsed as unsigned integers first,
// any other type uses the raw column bytes directly.
Type key.KeyspaceIdType
// ValueIndex is the position, within each row, of the column whose
// value is turned into the keyspace id.
ValueIndex int
// KeyRanges holds one key range per shard. The index of a range is
// also the index of its bucket in the value returned by StartSplit
// and of its channel in the slice passed to Send.
KeyRanges []key.KeyRange
}
// NewRowSplitter builds a RowSplitter covering the given shards.
//
// The returned splitter has one key range per entry of shardInfos, in
// the same order, each obtained by converting that shard's KeyRange
// with key.ProtoToKeyRange. typ and valueIndex are stored unchanged in
// the Type and ValueIndex fields.
func NewRowSplitter(shardInfos []*topo.ShardInfo, typ key.KeyspaceIdType, valueIndex int) *RowSplitter {
	keyRanges := make([]key.KeyRange, 0, len(shardInfos))
	for _, shard := range shardInfos {
		keyRanges = append(keyRanges, key.ProtoToKeyRange(shard.KeyRange))
	}
	return &RowSplitter{
		Type:       typ,
		ValueIndex: valueIndex,
		KeyRanges:  keyRanges,
	}
}
// StartSplit begins a new split and returns the empty result set that
// Split fills in. The result has one (initially nil) bucket of rows per
// key range of the splitter; Split may be called on it any number of
// times.
func (rs *RowSplitter) StartSplit() [][][]sqltypes.Value {
	buckets := make([][][]sqltypes.Value, len(rs.KeyRanges))
	return buckets
}
// Split distributes rows into the per-shard buckets of result.
//
// For every row, the column at rs.ValueIndex is converted to a keyspace
// id: when rs.Type is key.KIT_UINT64 the raw value is parsed as an
// unsigned integer and converted through key.Uint64Key, otherwise the
// raw bytes are used as the keyspace id directly. The row is then
// appended to result[i] for the first key range rs.KeyRanges[i] that
// contains the id. A row whose id is contained in none of the key
// ranges is not added to any bucket.
//
// result is expected to come from StartSplit (one bucket per key
// range); it is modified in place.
//
// An error is returned, and processing stops, if a row has no column at
// rs.ValueIndex or if a KIT_UINT64 value cannot be parsed as a number.
// Rows handled before the failing one remain in result.
func (rs *RowSplitter) Split(result [][][]sqltypes.Value, rows [][]sqltypes.Value) error {
	for _, row := range rows {
		// Report malformed rows through the error return instead of
		// panicking on an out-of-range index.
		if rs.ValueIndex < 0 || rs.ValueIndex >= len(row) {
			return fmt.Errorf("row has %v columns, cannot read keyspace id value at index %v", len(row), rs.ValueIndex)
		}
		raw := row[rs.ValueIndex].Raw()

		var keyspaceID key.KeyspaceId
		if rs.Type == key.KIT_UINT64 {
			numeric := sqltypes.MakeNumeric(raw)
			parsed, err := numeric.ParseUint64()
			if err != nil {
				return fmt.Errorf("Non numerical value: %v", err)
			}
			keyspaceID = key.Uint64Key(parsed).KeyspaceId()
		} else {
			keyspaceID = key.KeyspaceId(raw)
		}

		// The first matching key range wins; a row goes to at most one
		// bucket.
		for shard, kr := range rs.KeyRanges {
			if kr.Contains(keyspaceID) {
				result[shard] = append(result[shard], row)
				break
			}
		}
	}
	return nil
}
// Send pushes one command per non-empty bucket of result to the channel
// at the same index in insertChannels.
//
// Each command is baseCmd followed by the string makeValueString builds
// from fields and the bucket's rows. Buckets with no rows are skipped,
// so nothing is sent on their channel. While blocked on a send, Send
// also watches abort so that it cannot wait forever.
//
// It returns true if abort fired before every command was sent, and
// false once all non-empty buckets have been delivered.
func (rs *RowSplitter) Send(fields []mproto.Field, result [][][]sqltypes.Value, baseCmd string, insertChannels []chan string, abort <-chan struct{}) bool {
	for idx, out := range insertChannels {
		rows := result[idx]
		if len(rows) == 0 {
			// nothing for this shard in the current chunk
			continue
		}
		statement := baseCmd + makeValueString(fields, rows)
		select {
		case out <- statement:
			// delivered, move on to the next channel
		case <-abort:
			return true
		}
	}
	return false
}