-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
plugin_mysql_server.go
137 lines (115 loc) · 4.58 KB
/
plugin_mysql_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package vtgate
import (
"flag"
"fmt"
"net"
log "github.com/golang/glog"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/mysqlconn"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/callerid"
"github.com/youtube/vitess/go/vt/servenv"
querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
vtgatepb "github.com/youtube/vitess/go/vt/proto/vtgate"
"github.com/youtube/vitess/go/vt/servenv/grpcutils"
)
var (
mysqlServerPort = flag.Int("mysql_server_port", 0, "If set, also listen for MySQL binary protocol connections on this port.")
mysqlAuthServerImpl = flag.String("mysql_auth_server_impl", "static", "Which auth server implementation to use.")
mysqlAllowClearTextWithoutTLS = flag.Bool("mysql_allow_clear_text_without_tls", false, "If set, the server will allow the use of a clear text password over non-SSL connections.")
mysqlSslCert = flag.String("mysql_server_ssl_cert", "", "Path to the ssl cert for mysql server plugin SSL")
mysqlSslKey = flag.String("mysql_server_ssl_key", "", "Path to ssl key for mysql server plugin SSL")
mysqlSslCa = flag.String("mysql_server_ssl_ca", "", "Path to ssl CA for mysql server plugin SSL. If specified, server will require and validate client certs.")
)
// vtgateHandler implements the Listener interface.
// It stores the Session in the ClientData of a Connection, if a transaction
// is in progress.
type vtgateHandler struct {
vtg *VTGate
}
func newVtgateHandler(vtg *VTGate) *vtgateHandler {
return &vtgateHandler{
vtg: vtg,
}
}
func (vh *vtgateHandler) NewConnection(c *mysqlconn.Conn) {
}
func (vh *vtgateHandler) ConnectionClosed(c *mysqlconn.Conn) {
// Rollback if there is an ongoing transaction. Ignore error.
ctx := context.Background()
session, _ := c.ClientData.(*vtgatepb.Session)
if session == nil || !session.InTransaction {
return
}
_, _, _ = vh.vtg.Execute(ctx, "rollback", make(map[string]interface{}), "", topodatapb.TabletType_MASTER, session, false, &querypb.ExecuteOptions{})
}
func (vh *vtgateHandler) ComQuery(c *mysqlconn.Conn, query string) (*sqltypes.Result, error) {
// FIXME(alainjobart): Add some kind of timeout to the context.
ctx := context.Background()
// Fill in the ImmediateCallerID with the UserData returned by
// the AuthServer plugin for that user. If nothing was
// returned, use the User. This lets the plugin map a MySQL
// user used for authentication to a Vitess User used for
// Table ACLs and Vitess authentication in general.
im := c.UserData.Get()
ef := callerid.NewEffectiveCallerID(
c.User, /* principal: who */
c.RemoteAddr().String(), /* component: running client process */
"VTGate MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)
session, _ := c.ClientData.(*vtgatepb.Session)
session, result, err := vh.vtg.Execute(ctx, query, make(map[string]interface{}), c.SchemaName, topodatapb.TabletType_MASTER, session, false /* notInTransaction */, &querypb.ExecuteOptions{
IncludedFields: querypb.ExecuteOptions_ALL,
})
c.ClientData = session
return result, sqldb.NewSQLErrorFromError(err)
}
func init() {
var listener *mysqlconn.Listener
servenv.OnRun(func() {
// Flag is not set, just return.
if *mysqlServerPort == 0 {
return
}
// If no VTGate was created, just return.
if rpcVTGate == nil {
return
}
// Initialize registered AuthServer implementations (or other plugins)
for _, initFn := range pluginInitializers {
initFn()
}
authServer := mysqlconn.GetAuthServer(*mysqlAuthServerImpl)
// Create a Listener.
var err error
vh := newVtgateHandler(rpcVTGate)
listener, err = mysqlconn.NewListener("tcp", net.JoinHostPort("", fmt.Sprintf("%v", *mysqlServerPort)), authServer, vh)
if err != nil {
log.Fatalf("mysqlconn.NewListener failed: %v", err)
}
if *mysqlSslCert != "" && *mysqlSslKey != "" {
listener.TLSConfig, err = grpcutils.TLSServerConfig(*mysqlSslCert, *mysqlSslKey, *mysqlSslCa)
if err != nil {
log.Fatalf("grpcutils.TLSServerConfig failed: %v", err)
return
}
}
listener.AllowClearTextWithoutTLS = *mysqlAllowClearTextWithoutTLS
// And starts listening.
go func() {
listener.Accept()
}()
})
servenv.OnTerm(func() {
if listener != nil {
listener.Close()
}
})
}
var pluginInitializers []func()
// RegisterPluginInitializer lets plugins register themselves to be init'ed at servenv.OnRun-time
func RegisterPluginInitializer(initializer func()) {
pluginInitializers = append(pluginInitializers, initializer)
}