-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
vtgateconn.go
237 lines (196 loc) · 8.26 KB
/
vtgateconn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vtgateconn
import (
"context"
"fmt"
"sync"
"github.com/spf13/pflag"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)
// vtgateProtocol defines the RPC implementation used for connecting to vtgate.
var vtgateProtocol = "grpc"
func registerFlags(fs *pflag.FlagSet) {
fs.StringVar(&vtgateProtocol, "vtgate_protocol", vtgateProtocol, "how to talk to vtgate")
}
func init() {
servenv.OnParseFor("vttablet", registerFlags)
servenv.OnParseFor("vtclient", registerFlags)
}
// GetVTGateProtocol returns the protocol used to connect to vtgate as provided in the flag.
func GetVTGateProtocol() string {
return vtgateProtocol
}
// SetVTGateProtocol set the protocol to be used to connect to vtgate.
func SetVTGateProtocol(protocol string) {
vtgateProtocol = protocol
}
// VTGateConn is the client API object to talk to vtgate.
// It can support concurrent sessions.
// It is constructed using the Dial method.
type VTGateConn struct {
impl Impl
}
// Session returns a VTGateSession that can be used to access execution functions.
func (conn *VTGateConn) Session(targetString string, options *querypb.ExecuteOptions) *VTGateSession {
return &VTGateSession{
session: &vtgatepb.Session{
TargetString: targetString,
Options: options,
Autocommit: true,
},
impl: conn.impl,
}
}
// SessionPb returns the underlying proto session.
func (sn *VTGateSession) SessionPb() *vtgatepb.Session {
return sn.session
}
// SessionFromPb returns a VTGateSession based on the provided proto session.
func (conn *VTGateConn) SessionFromPb(sn *vtgatepb.Session) *VTGateSession {
return &VTGateSession{
session: sn,
impl: conn.impl,
}
}
// Close must be called for releasing resources.
func (conn *VTGateConn) Close() {
conn.impl.Close()
conn.impl = nil
}
// VStreamReader is returned by VStream.
type VStreamReader interface {
// Recv returns the next result on the stream.
// It will return io.EOF if the stream ended.
Recv() ([]*binlogdatapb.VEvent, error)
}
// VStream streams binlog events.
func (conn *VTGateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid,
filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (VStreamReader, error) {
return conn.impl.VStream(ctx, tabletType, vgtid, filter, flags)
}
// VTGateSession exposes the Vitess Execution API to the clients.
// The object maintains client-side state and is comparable to a native MySQL connection.
// For example, if you enable autocommit on a Session object, all subsequent calls will respect this.
// Functions within an object must not be called concurrently.
// You can create as many objects as you want.
// All of them will share the underlying connection to vtgate ("VTGateConn" object).
type VTGateSession struct {
session *vtgatepb.Session
impl Impl
}
// Execute performs a VTGate Execute.
func (sn *VTGateSession) Execute(ctx context.Context, query string, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
session, res, err := sn.impl.Execute(ctx, sn.session, query, bindVars)
sn.session = session
return res, err
}
// ExecuteBatch executes a list of queries on vtgate within the current transaction.
func (sn *VTGateSession) ExecuteBatch(ctx context.Context, query []string, bindVars []map[string]*querypb.BindVariable) ([]sqltypes.QueryResponse, error) {
session, res, errs := sn.impl.ExecuteBatch(ctx, sn.session, query, bindVars)
sn.session = session
return res, errs
}
// StreamExecute executes a streaming query on vtgate.
// It returns a ResultStream and an error. First check the
// error. Then you can pull values from the ResultStream until io.EOF,
// or another error.
func (sn *VTGateSession) StreamExecute(ctx context.Context, query string, bindVars map[string]*querypb.BindVariable) (sqltypes.ResultStream, error) {
// passing in the function that will update the session when received on the stream.
return sn.impl.StreamExecute(ctx, sn.session, query, bindVars, func(response *vtgatepb.StreamExecuteResponse) {
if response.Session != nil {
sn.session = response.Session
}
})
}
// Prepare performs a VTGate Prepare.
func (sn *VTGateSession) Prepare(ctx context.Context, query string, bindVars map[string]*querypb.BindVariable) ([]*querypb.Field, error) {
session, fields, err := sn.impl.Prepare(ctx, sn.session, query, bindVars)
sn.session = session
return fields, err
}
//
// The rest of this file is for the protocol implementations.
//
// Impl defines the interface for a vtgate client protocol
// implementation. It can be used concurrently across goroutines.
type Impl interface {
// Execute executes a non-streaming query on vtgate.
Execute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable) (*vtgatepb.Session, *sqltypes.Result, error)
// ExecuteBatch executes a non-streaming queries on vtgate.
ExecuteBatch(ctx context.Context, session *vtgatepb.Session, queryList []string, bindVarsList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error)
// StreamExecute executes a streaming query on vtgate.
StreamExecute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable, processResponse func(*vtgatepb.StreamExecuteResponse)) (sqltypes.ResultStream, error)
// Prepare returns the fields information for the query as part of supporting prepare statements.
Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error)
// CloseSession closes the session provided by rolling back any active transaction.
CloseSession(ctx context.Context, session *vtgatepb.Session) error
// VStream streams binlogevents
VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (VStreamReader, error)
// Close must be called for releasing resources.
Close()
}
// DialerFunc represents a function that will return an Impl
// object that can communicate with a VTGate.
type DialerFunc func(ctx context.Context, address string) (Impl, error)
var (
dialers = make(map[string]DialerFunc)
dialersM sync.Mutex
)
// RegisterDialer is meant to be used by Dialer implementations
// to self register.
func RegisterDialer(name string, dialer DialerFunc) {
dialersM.Lock()
defer dialersM.Unlock()
if _, ok := dialers[name]; ok {
log.Warningf("Dialer %s already exists, overwriting it", name)
}
dialers[name] = dialer
}
// DeregisterDialer removes the named DialerFunc from the registered list of
// dialers. If the named DialerFunc does not exist, it is a noop.
//
// This is useful to avoid unbounded memory use if many different dialer
// implementations are used throughout the lifetime of a program.
func DeregisterDialer(name string) {
dialersM.Lock()
defer dialersM.Unlock()
delete(dialers, name)
}
// DialProtocol dials a specific protocol, and returns the *VTGateConn
func DialProtocol(ctx context.Context, protocol string, address string) (*VTGateConn, error) {
dialersM.Lock()
dialer, ok := dialers[protocol]
dialersM.Unlock()
if !ok {
return nil, fmt.Errorf("no dialer registered for VTGate protocol %s", protocol)
}
impl, err := dialer(ctx, address)
if err != nil {
return nil, err
}
return &VTGateConn{
impl: impl,
}, nil
}
// Dial dials using the command-line specified protocol, and returns
// the *VTGateConn.
func Dial(ctx context.Context, address string) (*VTGateConn, error) {
return DialProtocol(ctx, vtgateProtocol, address)
}