forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 2
/
keyrange_filter.go
186 lines (171 loc) · 6.44 KB
/
keyrange_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package binlog
import (
"github.com/xsec-lab/go/vt/key"
"github.com/xsec-lab/go/vt/log"
"github.com/xsec-lab/go/vt/sqlannotation"
"errors"
"fmt"
"github.com/xsec-lab/go/vt/sqlparser"
binlogdatapb "github.com/xsec-lab/go/vt/proto/binlogdata"
querypb "github.com/xsec-lab/go/vt/proto/query"
topodatapb "github.com/xsec-lab/go/vt/proto/topodata"
)
// KeyRangeFilterFunc wraps callback in a sendTransactionFunc that narrows each
// transaction down to the statements relevant to keyrange. It is meant to be
// handed to the Streamer in place of the raw callback:
// bls.Stream(file, pos, sendTransaction) ->
// bls.Stream(file, pos, KeyRangeFilterFunc(keyrange, sendTransaction))
//
// The callback is invoked once per transaction, always carrying the event
// token. Its Statements field is populated only when at least one
// INSERT/UPDATE/DELETE statement was accepted; SET statements are queued along
// the way but are forwarded only in that case. DDL and unrecognized statements
// are logged and never forwarded.
func KeyRangeFilterFunc(keyrange *topodatapb.KeyRange, callback func(*binlogdatapb.BinlogTransaction) error) sendTransactionFunc {
	return func(eventToken *querypb.EventToken, statements []FullBinlogStatement) error {
		var (
			forward = make([]*binlogdatapb.BinlogTransaction_Statement, 0, len(statements))
			hasDML  bool
		)
		// accept queues a DML statement and records that the transaction
		// touches the destination key range.
		accept := func(stmt *binlogdatapb.BinlogTransaction_Statement) {
			forward = append(forward, stmt)
			hasDML = true
		}

		for _, fbs := range statements {
			stmt := fbs.Statement
			switch stmt.Category {
			case binlogdatapb.BinlogTransaction_Statement_BL_SET:
				// Queued without marking the transaction as matched.
				forward = append(forward, stmt)

			case binlogdatapb.BinlogTransaction_Statement_BL_DDL:
				log.Warningf("Not forwarding DDL: %s", stmt.Sql)

			case binlogdatapb.BinlogTransaction_Statement_BL_UNRECOGNIZED:
				updateStreamErrors.Add("KeyRangeStream", 1)
				log.Errorf("Error parsing keyspace id: %s", stmt.Sql)

			case binlogdatapb.BinlogTransaction_Statement_BL_INSERT,
				binlogdatapb.BinlogTransaction_Statement_BL_UPDATE,
				binlogdatapb.BinlogTransaction_Statement_BL_DELETE:
				// RBR case: the keyspace id travels with the statement.
				if fbs.KeyspaceID != nil {
					// Keyspace ids outside the destination shard are dropped.
					if key.KeyRangeContains(keyrange, fbs.KeyspaceID) {
						accept(stmt)
					}
					continue
				}

				// SBR case: the keyspace ids are extracted from the SQL text.
				sql := string(stmt.Sql)
				ids, err := sqlannotation.ExtractKeyspaceIDS(sql)
				switch {
				case err != nil && stmt.Category == binlogdatapb.BinlogTransaction_Statement_BL_INSERT:
					// TODO(erez): Stop filtered-replication here, and alert.
					logExtractKeySpaceIDError(err)

				case err != nil:
					// If no keyspace IDs are found, we replicate to all targets.
					// This is safe for UPDATE and DELETE because vttablet rewrites
					// queries to include the primary key and the query will only
					// affect the shards that have the rows.
					accept(stmt)

				case len(ids) == 1:
					// Keyspace ids outside the destination shard are dropped.
					if key.KeyRangeContains(keyrange, ids[0]) {
						accept(stmt)
					}

				default:
					// Several keyspace ids: rewrite the statement so that it only
					// carries the rows belonging to keyrange.
					rewritten, rewriteErr := getValidRangeQuery(sql, ids, keyrange)
					if rewriteErr != nil {
						log.Errorf("Error parsing statement (%s). Got %v", sql, rewriteErr)
						continue
					}
					if rewritten == "" {
						// No row landed in keyrange.
						continue
					}
					accept(&binlogdatapb.BinlogTransaction_Statement{
						Category: stmt.Category,
						Charset:  stmt.Charset,
						Sql:      []byte(rewritten),
					})
				}
			}
		}

		trans := &binlogdatapb.BinlogTransaction{EventToken: eventToken}
		if hasDML {
			trans.Statements = forward
		}
		return callback(trans)
	}
}
// getValidRangeQuery parses sql and, when it is an INSERT, returns the query
// produced by generateSingleInsertQuery for the given keyspace ids and
// keyrange, with the margin comments of the original text passed along.
//
// A parse failure is returned as is. Any statement that is not an INSERT
// yields an "unsupported construct" error.
func getValidRangeQuery(sql string, keyspaceIDs [][]byte, keyrange *topodatapb.KeyRange) (query string, err error) {
	parsed, parseErr := sqlparser.Parse(sql)
	_, margins := sqlparser.SplitMarginComments(sql)
	if parseErr != nil {
		return "", parseErr
	}

	insertStmt, isInsert := parsed.(*sqlparser.Insert)
	if !isInsert {
		return "", errors.New("unsupported construct ")
	}

	rewritten, genErr := generateSingleInsertQuery(insertStmt, keyspaceIDs, margins, keyrange)
	if genErr != nil {
		return "", genErr
	}
	return rewritten, nil
}
// generateSingleInsertQuery rewrites a multi-row INSERT so that it keeps only
// the VALUES tuples whose keyspace id falls inside keyrange.
//
// keyspaceIDs must hold exactly one entry per VALUES tuple, in the same order:
// tuple i is kept when key.KeyRangeContains(keyrange, keyspaceIDs[i]) is true.
// The surviving tuples replace ins.Rows (ins is modified in place) and the
// statement is formatted between marginComments.Leading and
// marginComments.Trailing.
//
// It returns an empty query with a nil error when no tuple matches. It returns
// an error when the row source is a SELECT or UNION, when it is any other
// unexpected construct, or when the number of tuples differs from the number
// of keyspace ids.
func generateSingleInsertQuery(ins *sqlparser.Insert, keyspaceIDs [][]byte, marginComments sqlparser.MarginComments, keyrange *topodatapb.KeyRange) (query string, err error) {
	switch rows := ins.Rows.(type) {
	case *sqlparser.Select, *sqlparser.Union:
		return "", errors.New("unsupported: insert into select")
	case sqlparser.Values:
		if len(rows) != len(keyspaceIDs) {
			// Report the tuple count of the statement itself; the filtered
			// slice has not been built yet and would always read as 0.
			return "", fmt.Errorf("length of values tuples %v doesn't match with length of keyspaceids %v", len(rows), len(keyspaceIDs))
		}

		var values sqlparser.Values
		for rowNum, val := range rows {
			if key.KeyRangeContains(keyrange, keyspaceIDs[rowNum]) {
				values = append(values, val)
			}
		}
		if len(values) == 0 {
			// Nothing in this statement belongs to keyrange.
			return "", nil
		}

		// Only build the output once we know there is something to emit.
		queryBuf := sqlparser.NewTrackedBuffer(nil)
		queryBuf.WriteString(marginComments.Leading)
		ins.Rows = values
		ins.Format(queryBuf)
		queryBuf.WriteString(marginComments.Trailing)
		return queryBuf.String(), nil
	default:
		return "", errors.New("unexpected construct in insert")
	}
}
// logExtractKeySpaceIDError logs err, which is expected to be a
// *sqlannotation.ExtractKeySpaceIDError, and bumps the updateStreamErrors
// counter named after the error kind.
//
// Parse errors and replication-unfriendly statements are logged at error
// level. An err of any other type, or an unknown kind, is logged through
// log.Fatalf.
func logExtractKeySpaceIDError(err error) {
	extractErr, isExtractErr := err.(*sqlannotation.ExtractKeySpaceIDError)
	if !isExtractErr {
		log.Fatalf("Expected sqlannotation.ExtractKeySpaceIDError. Got: %v", err)
	}

	switch kind := extractErr.Kind; kind {
	case sqlannotation.ExtractKeySpaceIDParseError:
		log.Errorf("Error parsing keyspace id annotation. Skipping statement. (%s)", extractErr.Message)
		updateStreamErrors.Add("ExtractKeySpaceIDParseError", 1)

	case sqlannotation.ExtractKeySpaceIDReplicationUnfriendlyError:
		log.Errorf("Found replication unfriendly statement. (%s). Filtered replication should abort, but we're currenty just skipping the statement.", extractErr.Message)
		updateStreamErrors.Add("ExtractKeySpaceIDReplicationUnfriendlyError", 1)

	default:
		log.Fatalf("Unexpected extractErr.Kind. (%v)", extractErr)
	}
}