Skip to content

Commit

Permalink
Support USE statements, which may optionally have quoted identifiers
Browse files Browse the repository at this point in the history
temporary support for assuming `keyspace/0`
  • Loading branch information
bbeaudreault committed Apr 14, 2017
1 parent 00c7c2d commit 23a1cbd
Showing 1 changed file with 90 additions and 17 deletions.
107 changes: 90 additions & 17 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@ import (
"github.com/youtube/vitess/go/vt/callerid"
"github.com/youtube/vitess/go/vt/servenv"

"strings"

"sync"

querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
vtgatepb "github.com/youtube/vitess/go/vt/proto/vtgate"
"github.com/youtube/vitess/go/vt/servenv/grpcutils"
"github.com/youtube/vitess/go/vt/sqlparser"
)

var (
mysqlServerPort = flag.Int("mysql_server_port", 0, "If set, also listen for MySQL binary protocol connections on this port.")
mysqlAuthServerImpl = flag.String("mysql_auth_server_impl", "static", "Which auth server implementation to use.")
mysqlAllowClearTextWithoutTLS = flag.Bool("mysql_allow_clear_text_without_tls", false, "If set, the server will allow the use of a clear text password over non-SSL connections.")
assumeSingleDbConnections = flag.Bool("mysql_server_assume_single_db", false, "If set, when someone connects to a keyspace we will force routing to keyspace/0. User can hit all shards by doing keyspace/*")

mysqlSslCert = flag.String("mysql_server_ssl_cert", "", "Path to the ssl cert for mysql server plugin SSL")
mysqlSslKey = flag.String("mysql_server_ssl_key", "", "Path to ssl key for mysql server plugin SSL")
Expand All @@ -35,11 +41,15 @@ var (
// is in progress.
type vtgateHandler struct {
vtg *VTGate

mu sync.Mutex
keyspaceSingleDbCache map[string]string
}

func newVtgateHandler(vtg *VTGate) *vtgateHandler {
return &vtgateHandler{
vtg: vtg,
keyspaceSingleDbCache: make(map[string]string),
}
}

Expand All @@ -60,24 +70,87 @@ func (vh *vtgateHandler) ComQuery(c *mysqlconn.Conn, query string) (*sqltypes.Re
// FIXME(alainjobart): Add some kind of timeout to the context.
ctx := context.Background()

// Fill in the ImmediateCallerID with the UserData returned by
// the AuthServer plugin for that user. If nothing was
// returned, use the User. This lets the plugin map a MySQL
// user used for authentication to a Vitess User used for
// Table ACLs and Vitess authentication in general.
im := c.UserData.Get()
ef := callerid.NewEffectiveCallerID(
c.User, /* principal: who */
c.RemoteAddr().String(), /* component: running client process */
"VTGate MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)
switch {
case sqlparser.HasPrefix(query, "use"):
schema := strings.TrimSpace(query[3:])
// the schema name can be quoted, remove them if necessary
if schema[0] == schema[len(schema)-1] && schema[0] == '`' || schema[0] == '"' || schema[0] == '\'' {
schema = schema[1 : len(schema)-1]
}
if schema == "" {
return nil, fmt.Errorf("Unable to parse schema name from USE statement: %v", schema)
}
c.SchemaName = schema
return &sqltypes.Result{}, nil
default:
// Fill in the ImmediateCallerID with the UserData returned by
// the AuthServer plugin for that user. If nothing was
// returned, use the User. This lets the plugin map a MySQL
// user used for authentication to a Vitess User used for
// Table ACLs and Vitess authentication in general.
im := c.UserData.Get()
ef := callerid.NewEffectiveCallerID(
c.User, /* principal: who */
c.RemoteAddr().String(), /* component: running client process */
"VTGate MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session, _ := c.ClientData.(*vtgatepb.Session)
session, result, err := vh.vtg.Execute(ctx, query, make(map[string]interface{}), vh.ensureKeyspaceTarget(ctx, c.SchemaName), topodatapb.TabletType_MASTER, session, false /* notInTransaction */, &querypb.ExecuteOptions{
IncludedFields: querypb.ExecuteOptions_ALL,
})
c.ClientData = session
return result, sqldb.NewSQLErrorFromError(err)
}
}

session, _ := c.ClientData.(*vtgatepb.Session)
session, result, err := vh.vtg.Execute(ctx, query, make(map[string]interface{}), c.SchemaName, topodatapb.TabletType_MASTER, session, false /* notInTransaction */, &querypb.ExecuteOptions{
IncludedFields: querypb.ExecuteOptions_ALL,
})
c.ClientData = session
return result, sqldb.NewSQLErrorFromError(err)
// ensureKeyspaceTarget is a temporary measure to improve the usability of the mysql protocol
// for situations when most keyspaces are unsharded. It will be removed once V3 is improved to handle
// more adhoc, DBA, DDL, etc queries for unsharded keyspaces.
// converts `keyspace` to `keyspace/0` if the keyspace has only one shard.
func (vh *vtgateHandler) ensureKeyspaceTarget(ctx context.Context, schema string) string {
if schema == "" {
return schema
}
keyspace, shard := parseKeyspaceOptionalShard(schema)
// allow `keyspace/*` to connect to multiple dbs
if shard == "*" {
return keyspace
}
if shard != "" || !*assumeSingleDbConnections {
return schema
}

vh.mu.Lock()
if val, ok := vh.keyspaceSingleDbCache[schema]; ok {
vh.mu.Unlock()
return val
}
vh.mu.Unlock()

// Count the shards for the keyspace according to SrvKeyspace
srvKeyspace, err := vh.vtg.router.serv.GetSrvKeyspace(ctx, vh.vtg.resolver.cell, keyspace)
if err != nil {
log.Warningf("error getting SrvKeyspace for %v: %v", keyspace, err)
return schema
}
vh.mu.Lock()
defer vh.mu.Unlock()
shardCount := 0
for _, partition := range srvKeyspace.Partitions {
if sc := len(partition.ShardReferences); sc > shardCount {
shardCount = sc
}
}
if shardCount != 1 {
vh.keyspaceSingleDbCache[schema] = schema
return schema
}

res := fmt.Sprintf("%s/0", keyspace)
vh.keyspaceSingleDbCache[schema] = res

return res
}

func init() {
Expand Down

0 comments on commit 23a1cbd

Please sign in to comment.