Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mysql: Refactor dependencies #13688

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 7 additions & 6 deletions go/cmd/mysqlctl/mysqlctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql/replication"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/cmd"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -175,30 +176,30 @@ func positionCmd(subFlags *pflag.FlagSet, args []string) error {
return fmt.Errorf("not enough arguments for position operation")
}

pos1, err := mysql.DecodePosition(args[1])
pos1, err := replication.DecodePosition(args[1])
if err != nil {
return err
}

switch args[0] {
case "equal":
pos2, err := mysql.DecodePosition(args[2])
pos2, err := replication.DecodePosition(args[2])
if err != nil {
return err
}
fmt.Println(pos1.Equal(pos2))
case "at_least":
pos2, err := mysql.DecodePosition(args[2])
pos2, err := replication.DecodePosition(args[2])
if err != nil {
return err
}
fmt.Println(pos1.AtLeast(pos2))
case "append":
gtid, err := mysql.DecodeGTID(args[2])
gtid, err := replication.DecodeGTID(args[2])
if err != nil {
return err
}
fmt.Println(mysql.AppendGTID(pos1, gtid))
fmt.Println(replication.AppendGTID(pos1, gtid))
}

return nil
Expand Down
25 changes: 13 additions & 12 deletions go/cmd/vtbackup/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql/replication"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/cmd"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -359,7 +360,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
Stats: backupstats.RestoreStats(),
}
backupManifest, err := mysqlctl.Restore(ctx, params)
var restorePos mysql.Position
var restorePos replication.Position
switch err {
case nil:
// if err is nil, we expect backupManifest to be non-nil
Expand All @@ -370,7 +371,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
if !allowFirstBackup {
return fmt.Errorf("no backup found; not starting up empty since --initial_backup flag was not enabled")
}
restorePos = mysql.Position{}
restorePos = replication.Position{}
default:
return fmt.Errorf("can't restore from backup: %v", err)
}
Expand Down Expand Up @@ -408,7 +409,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
tmc := tmclient.NewTabletManagerClient()
// Keep retrying if we can't contact the primary. The primary might be
// changing, moving, or down temporarily.
var primaryPos mysql.Position
var primaryPos replication.Position
err = retryOnError(ctx, func() error {
// Add a per-operation timeout so we re-read topo if the primary is unreachable.
opCtx, optCancel := context.WithTimeout(ctx, operationTimeout)
Expand Down Expand Up @@ -516,7 +517,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
return nil
}

func resetReplication(ctx context.Context, pos mysql.Position, mysqld mysqlctl.MysqlDaemon) error {
func resetReplication(ctx context.Context, pos replication.Position, mysqld mysqlctl.MysqlDaemon) error {
cmds := []string{
"STOP SLAVE",
"RESET SLAVE ALL", // "ALL" makes it forget replication source host:port.
Expand Down Expand Up @@ -563,27 +564,27 @@ func startReplication(ctx context.Context, mysqld mysqlctl.MysqlDaemon, topoServ
return nil
}

func getPrimaryPosition(ctx context.Context, tmc tmclient.TabletManagerClient, ts *topo.Server) (mysql.Position, error) {
func getPrimaryPosition(ctx context.Context, tmc tmclient.TabletManagerClient, ts *topo.Server) (replication.Position, error) {
si, err := ts.GetShard(ctx, initKeyspace, initShard)
if err != nil {
return mysql.Position{}, vterrors.Wrap(err, "can't read shard")
return replication.Position{}, vterrors.Wrap(err, "can't read shard")
}
if topoproto.TabletAliasIsZero(si.PrimaryAlias) {
// Normal tablets will sit around waiting to be reparented in this case.
// Since vtbackup is a batch job, we just have to fail.
return mysql.Position{}, fmt.Errorf("shard %v/%v has no primary", initKeyspace, initShard)
return replication.Position{}, fmt.Errorf("shard %v/%v has no primary", initKeyspace, initShard)
}
ti, err := ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
return mysql.Position{}, fmt.Errorf("can't get primary tablet record %v: %v", topoproto.TabletAliasString(si.PrimaryAlias), err)
return replication.Position{}, fmt.Errorf("can't get primary tablet record %v: %v", topoproto.TabletAliasString(si.PrimaryAlias), err)
}
posStr, err := tmc.PrimaryPosition(ctx, ti.Tablet)
if err != nil {
return mysql.Position{}, fmt.Errorf("can't get primary replication position: %v", err)
return replication.Position{}, fmt.Errorf("can't get primary replication position: %v", err)
}
pos, err := mysql.DecodePosition(posStr)
pos, err := replication.DecodePosition(posStr)
if err != nil {
return mysql.Position{}, fmt.Errorf("can't decode primary replication position %q: %v", posStr, err)
return replication.Position{}, fmt.Errorf("can't decode primary replication position %q: %v", posStr, err)
}
return pos, nil
}
Expand Down
5 changes: 3 additions & 2 deletions go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (
"github.com/spf13/pflag"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql/replication"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -342,7 +343,7 @@ func (mysqld *vtcomboMysqld) RestartReplication(hookExtraEnv map[string]string)
}

// StartReplicationUntilAfter implements the MysqlDaemon interface
func (mysqld *vtcomboMysqld) StartReplicationUntilAfter(ctx context.Context, pos mysql.Position) error {
func (mysqld *vtcomboMysqld) StartReplicationUntilAfter(ctx context.Context, pos replication.Position) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions go/cmd/vtctldclient/cli/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package cli
import (
"sort"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/topo/topoproto"

replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
Expand Down Expand Up @@ -83,12 +83,12 @@ func (rts rTablets) Less(i, j int) bool {
}

// then compare replication positions
lpos, err := mysql.DecodePosition(l.Status.Position)
lpos, err := replication.DecodePosition(l.Status.Position)
if err != nil {
return true
}

rpos, err := mysql.DecodePosition(r.Status.Position)
rpos, err := replication.DecodePosition(r.Status.Position)
if err != nil {
return false
}
Expand Down
4 changes: 3 additions & 1 deletion go/cmd/vtctldclient/command/keyspaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"github.com/spf13/cobra"

"vitess.io/vitess/go/mysql/sqlerror"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -184,7 +186,7 @@ func commandCreateKeyspace(cmd *cobra.Command, args []string) error {
return errors.New("--sidecar-db-name cannot be empty when creating a keyspace")
}
if len(createKeyspaceOptions.SidecarDBName) > mysql.MaxIdentifierLength {
return mysql.NewSQLError(mysql.ERTooLongIdent, mysql.SSDataTooLong, "--sidecar-db-name identifier value of %q is too long (%d chars), max length for database identifiers is %d characters",
return sqlerror.NewSQLError(sqlerror.ERTooLongIdent, sqlerror.SSDataTooLong, "--sidecar-db-name identifier value of %q is too long (%d chars), max length for database identifiers is %d characters",
createKeyspaceOptions.SidecarDBName, len(createKeyspaceOptions.SidecarDBName), mysql.MaxIdentifierLength)
}

Expand Down
55 changes: 55 additions & 0 deletions go/constants/sidecar/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sidecar

import "vitess.io/vitess/go/vt/sqlparser"

// region unit-test-only
// This section uses helpers used in tests, but also in
// go/vt/vtexplain/vtexplain_vttablet.go.
// Hence, it is here and not in the _test.go file.
const (
createDBQuery = "create database if not exists %s"
createTableRegexp = "(?i)CREATE TABLE .* `?\\_vt\\`?..*"
alterTableRegexp = "(?i)ALTER TABLE `?\\_vt\\`?..*"
)

var (
DBInitQueries = []string{
"use %s",
createDBQuery,
}
// Query patterns to handle in mocks.
DBInitQueryPatterns = []string{
createTableRegexp,
alterTableRegexp,
}
)

// GetCreateQuery returns the CREATE DATABASE SQL statement
// used to create the sidecar database.
func GetCreateQuery() string {
return sqlparser.BuildParsedQuery(createDBQuery, GetIdentifier()).Query
}

// GetIdentifier returns the sidecar database name as an SQL
// identifier string, most importantly this means that it will
// be properly escaped if/as needed.
func GetIdentifier() string {
ident := sqlparser.NewIdentifierCS(GetName())
return sqlparser.String(ident)
}
11 changes: 6 additions & 5 deletions go/mysql/auth_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net"
"sync"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -427,7 +428,7 @@ func (n *mysqlNativePasswordAuthMethod) AllowClearTextWithoutTLS() bool {

func (n *mysqlNativePasswordAuthMethod) HandleAuthPluginData(conn *Conn, user string, serverAuthPluginData []byte, clientAuthPluginData []byte, remoteAddr net.Addr) (Getter, error) {
if serverAuthPluginData[len(serverAuthPluginData)-1] != 0x00 {
return nil, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return nil, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}

salt := serverAuthPluginData[:len(serverAuthPluginData)-1]
Expand Down Expand Up @@ -519,7 +520,7 @@ func (n *mysqlCachingSha2AuthMethod) AllowClearTextWithoutTLS() bool {

func (n *mysqlCachingSha2AuthMethod) HandleAuthPluginData(c *Conn, user string, serverAuthPluginData []byte, clientAuthPluginData []byte, remoteAddr net.Addr) (Getter, error) {
if serverAuthPluginData[len(serverAuthPluginData)-1] != 0x00 {
return nil, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return nil, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}

salt := serverAuthPluginData[:len(serverAuthPluginData)-1]
Expand All @@ -531,7 +532,7 @@ func (n *mysqlCachingSha2AuthMethod) HandleAuthPluginData(c *Conn, user string,

switch cacheState {
case AuthRejected:
return nil, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return nil, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
case AuthAccepted:
// We need to write a more data packet to indicate the
// handshake completed properly. This will be followed
Expand All @@ -546,7 +547,7 @@ func (n *mysqlCachingSha2AuthMethod) HandleAuthPluginData(c *Conn, user string,
return result, nil
case AuthNeedMoreData:
if !c.TLSEnabled() && !c.IsUnixSocket() {
return nil, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return nil, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}

data, pos := c.startEphemeralPacketWithHeader(2)
Expand All @@ -562,7 +563,7 @@ func (n *mysqlCachingSha2AuthMethod) HandleAuthPluginData(c *Conn, user string,
return n.storage.UserEntryWithPassword(c, user, password, remoteAddr)
default:
// Somehow someone returned an unknown state, let's error with access denied.
return nil, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return nil, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}
}

Expand Down
16 changes: 9 additions & 7 deletions go/mysql/auth_server_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql/sqlerror"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -179,7 +181,7 @@ func (a *AuthServerStatic) UserEntryWithPassword(conn *Conn, user string, passwo
a.mu.Unlock()

if !ok {
return &StaticUserData{}, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return &StaticUserData{}, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}

for _, entry := range entries {
Expand All @@ -188,7 +190,7 @@ func (a *AuthServerStatic) UserEntryWithPassword(conn *Conn, user string, passwo
return &StaticUserData{entry.UserData, entry.Groups}, nil
}
}
return &StaticUserData{}, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return &StaticUserData{}, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}

// UserEntryWithHash implements password lookup based on a
Expand All @@ -199,14 +201,14 @@ func (a *AuthServerStatic) UserEntryWithHash(conn *Conn, salt []byte, user strin
a.mu.Unlock()

if !ok {
return &StaticUserData{}, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return &StaticUserData{}, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}

for _, entry := range entries {
if entry.MysqlNativePassword != "" {
hash, err := DecodeMysqlNativePasswordHex(entry.MysqlNativePassword)
if err != nil {
return &StaticUserData{entry.UserData, entry.Groups}, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return &StaticUserData{entry.UserData, entry.Groups}, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}

isPass := VerifyHashedMysqlNativePassword(authResponse, salt, hash)
Expand All @@ -221,7 +223,7 @@ func (a *AuthServerStatic) UserEntryWithHash(conn *Conn, salt []byte, user strin
}
}
}
return &StaticUserData{}, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return &StaticUserData{}, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}

// UserEntryWithCacheHash implements password lookup based on a
Expand All @@ -232,7 +234,7 @@ func (a *AuthServerStatic) UserEntryWithCacheHash(conn *Conn, salt []byte, user
a.mu.Unlock()

if !ok {
return &StaticUserData{}, AuthRejected, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return &StaticUserData{}, AuthRejected, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}

for _, entry := range entries {
Expand All @@ -243,7 +245,7 @@ func (a *AuthServerStatic) UserEntryWithCacheHash(conn *Conn, salt []byte, user
return &StaticUserData{entry.UserData, entry.Groups}, AuthAccepted, nil
}
}
return &StaticUserData{}, AuthRejected, NewSQLError(ERAccessDeniedError, SSAccessDeniedError, "Access denied for user '%v'", user)
return &StaticUserData{}, AuthRejected, sqlerror.NewSQLError(sqlerror.ERAccessDeniedError, sqlerror.SSAccessDeniedError, "Access denied for user '%v'", user)
}

// AuthMethods returns the AuthMethod instances this auth server can handle.
Expand Down