diff --git a/go/cmd/mysqlctl/mysqlctl.go b/go/cmd/mysqlctl/mysqlctl.go index dc6a933b264..841666db97d 100644 --- a/go/cmd/mysqlctl/mysqlctl.go +++ b/go/cmd/mysqlctl/mysqlctl.go @@ -202,7 +202,7 @@ func main() { log.Errorf("%v", err) exit.Return(1) } - mysqld := mysqlctl.NewMysqld("Dba", mycnf, &dbcfgs.Dba, &dbcfgs.Repl) + mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl) defer mysqld.Close() action := flag.Arg(0) diff --git a/go/cmd/mysqlctld/mysqlctld.go b/go/cmd/mysqlctld/mysqlctld.go index ebaa7c7096b..e3da314d480 100644 --- a/go/cmd/mysqlctld/mysqlctld.go +++ b/go/cmd/mysqlctld/mysqlctld.go @@ -24,7 +24,7 @@ var ( mysqld *mysqlctl.Mysqld mysqlPort = flag.Int("mysql_port", 3306, "mysql port") - tabletUid = flag.Uint("tablet_uid", 41983, "tablet uid") + tabletUID = flag.Uint("tablet_uid", 41983, "tablet uid") mysqlSocket = flag.String("mysql_socket", "", "path to the mysql socket") // mysqlctl init flags @@ -50,7 +50,7 @@ func main() { dbconfigs.RegisterFlags(flags) flag.Parse() - mycnf := mysqlctl.NewMycnf(uint32(*tabletUid), *mysqlPort) + mycnf := mysqlctl.NewMycnf(uint32(*tabletUID), *mysqlPort) if *mysqlSocket != "" { mycnf.SocketFile = *mysqlSocket } @@ -60,7 +60,7 @@ func main() { log.Errorf("%v", err) exit.Return(255) } - mysqld = mysqlctl.NewMysqld("Dba", mycnf, &dbcfgs.Dba, &dbcfgs.Repl) + mysqld = mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl) // Register OnTerm handler before mysqld starts, so we get notified if mysqld // dies on its own without us (or our RPC client) telling it to. diff --git a/go/cmd/vtocc/vtocc.go b/go/cmd/vtocc/vtocc.go index c93ee2cf182..a20ffc3be13 100644 --- a/go/cmd/vtocc/vtocc.go +++ b/go/cmd/vtocc/vtocc.go @@ -61,7 +61,7 @@ func main() { } } mycnf := &mysqlctl.Mycnf{BinLogPath: *binlogPath} - mysqld := mysqlctl.NewMysqld("Dba", mycnf, &dbConfigs.Dba, &dbConfigs.Repl) + mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbConfigs.Dba, &dbConfigs.App.ConnectionParams, &dbConfigs.Repl) if err := unmarshalFile(*overridesFile, &schemaOverrides); err != nil { log.Error(err) diff --git a/go/vt/mysqlctl/mycnf_test.go b/go/vt/mysqlctl/mycnf_test.go index 66528a8c7d4..df312aae969 100644 --- a/go/vt/mysqlctl/mycnf_test.go +++ b/go/vt/mysqlctl/mycnf_test.go @@ -14,13 +14,14 @@ import ( "github.com/youtube/vitess/go/vt/env" ) -var MYCNF_PATH = "/tmp/my.cnf" +var MycnfPath = "/tmp/my.cnf" func TestMycnf(t *testing.T) { os.Setenv("MYSQL_FLAVOR", "GoogleMysql") dbaConfig := dbconfigs.DefaultDBConfigs.Dba + appConfig := dbconfigs.DefaultDBConfigs.App.ConnectionParams replConfig := dbconfigs.DefaultDBConfigs.Repl - tablet0 := NewMysqld("Dba", NewMycnf(0, 6802), &dbaConfig, &replConfig) + tablet0 := NewMysqld("Dba", "App", NewMycnf(0, 6802), &dbaConfig, &appConfig, &replConfig) defer tablet0.Close() root, err := env.VtRoot() if err != nil { @@ -37,16 +38,16 @@ func TestMycnf(t *testing.T) { } else { t.Logf("data: %v", data) } - err = ioutil.WriteFile(MYCNF_PATH, []byte(data), 0666) + err = ioutil.WriteFile(MycnfPath, []byte(data), 0666) if err != nil { t.Errorf("failed creating my.cnf %v", err) } - _, err = ioutil.ReadFile(MYCNF_PATH) + _, err = ioutil.ReadFile(MycnfPath) if err != nil { t.Errorf("failed reading, err %v", err) return } - mycnf, err := ReadMycnf(MYCNF_PATH) + mycnf, err := ReadMycnf(MycnfPath) if err != nil { t.Errorf("failed reading, err %v", err) } else { diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index 1d974f0136c..19b760a209d 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -28,9 +28,9 @@ type MysqlDaemon interface { // Schema related methods GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*proto.SchemaDefinition, error) - // GetDbaConnection returns a connection to be able to talk - // to the database as the admin user. - GetDbaConnection() (dbconnpool.PoolConnection, error) + // GetDbConnection returns a connection to be able to talk to the database. + // It accepts a dbconfig name to determine which db user it the connection should have. + GetDbConnection(dbconfigName string) (dbconnpool.PoolConnection, error) } // FakeMysqlDaemon implements MysqlDaemon and allows the user to fake @@ -54,10 +54,14 @@ type FakeMysqlDaemon struct { // return an error. Schema *proto.SchemaDefinition - // DbaConnectionFactory is the factory for making fake dba connection + // DbaConnectionFactory is the factory for making fake dba connections DbaConnectionFactory func() (dbconnpool.PoolConnection, error) + + // DbAppConnectionFactory is the factory for making fake db app connections + DbAppConnectionFactory func() (dbconnpool.PoolConnection, error) } +// GetMasterAddr is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) GetMasterAddr() (string, error) { if fmd.MasterAddr == "" { return "", ErrNotSlave @@ -68,6 +72,7 @@ func (fmd *FakeMysqlDaemon) GetMasterAddr() (string, error) { return fmd.MasterAddr, nil } +// GetMysqlPort is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) GetMysqlPort() (int, error) { if fmd.MysqlPort == -1 { return 0, fmt.Errorf("FakeMysqlDaemon.GetMysqlPort returns an error") @@ -75,16 +80,19 @@ func (fmd *FakeMysqlDaemon) GetMysqlPort() (int, error) { return fmd.MysqlPort, nil } +// StartSlave is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) StartSlave(hookExtraEnv map[string]string) error { fmd.Replicating = true return nil } +// StopSlave is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) StopSlave(hookExtraEnv map[string]string) error { fmd.Replicating = false return nil } +// SlaveStatus is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) SlaveStatus() (*proto.ReplicationStatus, error) { if fmd.CurrentSlaveStatus == nil { return nil, fmt.Errorf("no slave status defined") @@ -92,6 +100,7 @@ func (fmd *FakeMysqlDaemon) SlaveStatus() (*proto.ReplicationStatus, error) { return fmd.CurrentSlaveStatus, nil } +// GetSchema is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*proto.SchemaDefinition, error) { if fmd.Schema == nil { return nil, fmt.Errorf("no schema defined") @@ -99,9 +108,19 @@ func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []str return fmd.Schema, nil } -func (fmd *FakeMysqlDaemon) GetDbaConnection() (dbconnpool.PoolConnection, error) { - if fmd.DbaConnectionFactory == nil { - return nil, fmt.Errorf("no DbaConnectionFactory set in this FakeMysqlDaemon") +// GetDbConnection is part of the MysqlDaemon interface +func (fmd *FakeMysqlDaemon) GetDbConnection(dbconfigName string) (dbconnpool.PoolConnection, error) { + switch dbconfigName { + case "dba": + if fmd.DbaConnectionFactory == nil { + return nil, fmt.Errorf("no DbaConnectionFactory set in this FakeMysqlDaemon") + } + return fmd.DbaConnectionFactory() + case "app": + if fmd.DbAppConnectionFactory == nil { + return nil, fmt.Errorf("no DbAppConnectionFactory set in this FakeMysqlDaemon") + } + return fmd.DbAppConnectionFactory() } - return fmd.DbaConnectionFactory() + return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName) } diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index fb2172c7539..0a92c1af674 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -42,7 +42,9 @@ const ( ) var ( - dbaPoolSize = flag.Int("dba_pool_size", 10, "Size of the connection pool for dba connections") + // TODO(aaijazi): for reasons I don't understand, the dba pool size needs to be fairly large (15+) + // for test/clone.py to pass. + dbaPoolSize = flag.Int("dba_pool_size", 20, "Size of the connection pool for dba connections") dbaIdleTimeout = flag.Duration("dba_idle_timeout", time.Minute, "Idle timeout for dba connections") appPoolSize = flag.Int("app_pool_size", 40, "Size of the connection pool for app connections") appIdleTimeout = flag.Duration("app_idle_timeout", time.Minute, "Idle timeout for app connections") @@ -83,7 +85,7 @@ func NewMysqld(dbaName, appName string, config *Mycnf, dba, app, repl *mysql.Con // create and open the connection pool for app access appMysqlStats := stats.NewTimings("Mysql" + appName) - appPool := dbconnpool.NewConnectionPool(appName+"ConnPool", *dbaPoolSize, *dbaIdleTimeout) + appPool := dbconnpool.NewConnectionPool(appName+"ConnPool", *appPoolSize, *appIdleTimeout) appPool.Open(dbconnpool.DBConnectionCreator(app, appMysqlStats)) return &Mysqld{ @@ -530,16 +532,16 @@ func (mysqld *Mysqld) ExecuteMysqlCommand(sql string) error { return nil } -// GetDbaConnection returns a connection from the dba pool. +// GetDbConnection returns a connection from the pool chosen by dbconfigName. // Recycle needs to be called on the result. -func (mysqld *Mysqld) GetDbaConnection() (dbconnpool.PoolConnection, error) { - return mysqld.dbaPool.Get(0) -} - -// GetAppConnection returns a connection from the app pool. -// Recycle needs to be called on the result. -func (mysqld *Mysqld) GetAppConnection() (dbconnpool.PoolConnection, error) { - return mysqld.appPool.Get(0) +func (mysqld *Mysqld) GetDbConnection(dbconfigName string) (dbconnpool.PoolConnection, error) { + switch dbconfigName { + case "dba": + return mysqld.dbaPool.Get(0) + case "app": + return mysqld.appPool.Get(0) + } + return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName) } // Close will close this instance of Mysqld. It will wait for all dba diff --git a/go/vt/tabletmanager/agent.go b/go/vt/tabletmanager/agent.go index 01c1e67d07e..cf103f91cf3 100644 --- a/go/vt/tabletmanager/agent.go +++ b/go/vt/tabletmanager/agent.go @@ -128,7 +128,7 @@ func NewActionAgent( schemaOverrides := loadSchemaOverrides(overridesFile) topoServer := topo.GetServer() - mysqld := mysqlctl.NewMysqld("Dba", mycnf, &dbcfgs.Dba, &dbcfgs.Repl) + mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl) agent = &ActionAgent{ batchCtx: batchCtx, diff --git a/go/vt/tabletmanager/agent_rpc_actions.go b/go/vt/tabletmanager/agent_rpc_actions.go index bd417b42796..8c2baa4c74f 100644 --- a/go/vt/tabletmanager/agent_rpc_actions.go +++ b/go/vt/tabletmanager/agent_rpc_actions.go @@ -69,7 +69,7 @@ type RPCAgent interface { ApplySchema(ctx context.Context, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error) - ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool) (*proto.QueryResult, error) + ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName string) (*proto.QueryResult, error) // Replication related methods @@ -273,9 +273,9 @@ func (agent *ActionAgent) ApplySchema(ctx context.Context, change *myproto.Schem // ExecuteFetch will execute the given query, possibly disabling binlogs. // Should be called under RPCWrap. -func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool) (*proto.QueryResult, error) { +func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName string) (*proto.QueryResult, error) { // get a connection - conn, err := agent.MysqlDaemon.GetDbaConnection() + conn, err := agent.MysqlDaemon.GetDbConnection(dbconfigName) if err != nil { return nil, err } diff --git a/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go b/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go index 79688bddf75..c10d4e7d506 100644 --- a/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go +++ b/go/vt/tabletmanager/agentrpctest/test_agent_rpc.go @@ -445,7 +445,7 @@ var testExecuteFetchResult = &mproto.QueryResult{ }, } -func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) { +func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName string) (*mproto.QueryResult, error) { compare(fra.t, "ExecuteFetch query", query, testExecuteFetchQuery) compare(fra.t, "ExecuteFetch maxrows", maxrows, testExecuteFetchMaxRows) compareBool(fra.t, "ExecuteFetch wantFields", wantFields) @@ -454,7 +454,9 @@ func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows } func agentRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) { - qr, err := client.ExecuteFetch(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true) + qr, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true) + compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult) + qr, err = client.ExecuteFetchAsApp(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true) compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult) } diff --git a/go/vt/tabletmanager/faketmclient/fake_client.go b/go/vt/tabletmanager/faketmclient/fake_client.go index 15976b252b3..4dce91c58d7 100644 --- a/go/vt/tabletmanager/faketmclient/fake_client.go +++ b/go/vt/tabletmanager/faketmclient/fake_client.go @@ -141,8 +141,14 @@ func (client *FakeTabletManagerClient) ApplySchema(ctx context.Context, tablet * return &scr, nil } -// ExecuteFetch is part of the tmclient.TabletManagerClient interface -func (client *FakeTabletManagerClient) ExecuteFetch(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) { +// ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface +func (client *FakeTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) { + var qr mproto.QueryResult + return &qr, nil +} + +// ExecuteFetchAsApp is part of the tmclient.TabletManagerClient interface +func (client *FakeTabletManagerClient) ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) { var qr mproto.QueryResult return &qr, nil } diff --git a/go/vt/tabletmanager/gorpcproto/structs.go b/go/vt/tabletmanager/gorpcproto/structs.go index 373400cd120..a1b0fecfc71 100644 --- a/go/vt/tabletmanager/gorpcproto/structs.go +++ b/go/vt/tabletmanager/gorpcproto/structs.go @@ -64,6 +64,7 @@ type ExecuteFetchArgs struct { MaxRows int WantFields bool DisableBinlogs bool + DBConfigName string } // gorpc doesn't support returning a streaming type during streaming diff --git a/go/vt/tabletmanager/gorpctmclient/gorpc_client.go b/go/vt/tabletmanager/gorpctmclient/gorpc_client.go index fc77de385e0..362433ee002 100644 --- a/go/vt/tabletmanager/gorpctmclient/gorpc_client.go +++ b/go/vt/tabletmanager/gorpctmclient/gorpc_client.go @@ -219,10 +219,31 @@ func (client *GoRPCTabletManagerClient) ApplySchema(ctx context.Context, tablet return &scr, nil } -// ExecuteFetch is part of the tmclient.TabletManagerClient interface -func (client *GoRPCTabletManagerClient) ExecuteFetch(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) { +// ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface +func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) { var qr mproto.QueryResult - if err := client.rpcCallTablet(ctx, tablet, actionnode.TABLET_ACTION_EXECUTE_FETCH, &gorpcproto.ExecuteFetchArgs{Query: query, MaxRows: maxRows, WantFields: wantFields, DisableBinlogs: disableBinlogs}, &qr); err != nil { + if err := client.rpcCallTablet(ctx, tablet, actionnode.TABLET_ACTION_EXECUTE_FETCH, &gorpcproto.ExecuteFetchArgs{ + Query: query, + MaxRows: maxRows, + WantFields: wantFields, + DisableBinlogs: disableBinlogs, + DBConfigName: "dba", + }, &qr); err != nil { + return nil, err + } + return &qr, nil +} + +// ExecuteFetchAsApp is part of the tmclient.TabletManagerClient interface +func (client *GoRPCTabletManagerClient) ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) { + var qr mproto.QueryResult + if err := client.rpcCallTablet(ctx, tablet, actionnode.TABLET_ACTION_EXECUTE_FETCH, &gorpcproto.ExecuteFetchArgs{ + Query: query, + MaxRows: maxRows, + WantFields: wantFields, + DisableBinlogs: disableBinlogs, + DBConfigName: "app", + }, &qr); err != nil { return nil, err } return &qr, nil diff --git a/go/vt/tabletmanager/gorpctmserver/gorpc_server.go b/go/vt/tabletmanager/gorpctmserver/gorpc_server.go index 9fee9d52727..364fd9a9368 100644 --- a/go/vt/tabletmanager/gorpctmserver/gorpc_server.go +++ b/go/vt/tabletmanager/gorpctmserver/gorpc_server.go @@ -187,7 +187,7 @@ func (tm *TabletManager) ApplySchema(ctx context.Context, args *myproto.SchemaCh // ExecuteFetch wraps RPCAgent. func (tm *TabletManager) ExecuteFetch(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error { return tm.agent.RPCWrap(ctx, actionnode.TABLET_ACTION_EXECUTE_FETCH, args, reply, func() error { - qr, err := tm.agent.ExecuteFetch(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs) + qr, err := tm.agent.ExecuteFetch(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs, args.DBConfigName) if err == nil { *reply = *qr } diff --git a/go/vt/tabletmanager/tmclient/rpc_client_api.go b/go/vt/tabletmanager/tmclient/rpc_client_api.go index 4ce3b4e7cb9..ef896a4c37e 100644 --- a/go/vt/tabletmanager/tmclient/rpc_client_api.go +++ b/go/vt/tabletmanager/tmclient/rpc_client_api.go @@ -83,7 +83,7 @@ type TabletManagerClient interface { // ApplySchema will apply a schema change ApplySchema(ctx context.Context, tablet *topo.TabletInfo, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error) - // ExecuteFetch executes a query remotely using the DBA pool + // ExecuteFetchAsDba executes a query remotely using the DBA pool ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) // ExecuteFetchAsApp executes a query remotely using the App pool diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index c7adb040bc7..763cf79788e 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -116,7 +116,7 @@ var commands = []commandGroup{ command{"ExecuteHook", commandExecuteHook, " [ ...]", "This runs the specified hook on the given tablet."}, - command{"ExecuteFetch", commandExecuteFetch, + command{"ExecuteFetchAsDba", commandExecuteFetchAsDba, "[--max_rows=10000] [--want_fields] [--disable_binlogs] ", "Runs the given sql command as a DBA on the remote tablet"}, }, @@ -957,7 +957,7 @@ func commandClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla return wr.Clone(ctx, srcTabletAlias, dstTabletAliases, *force, *concurrency, *fetchConcurrency, *fetchRetryCount, *serverMode) } -func commandExecuteFetch(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { +func commandExecuteFetchAsDba(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { maxRows := subFlags.Int("max_rows", 10000, "maximum number of rows to allow in reset") wantFields := subFlags.Bool("want_fields", false, "also get the field names") disableBinlogs := subFlags.Bool("disable_binlogs", false, "disable writing to binlogs during the query") @@ -965,7 +965,7 @@ func commandExecuteFetch(ctx context.Context, wr *wrangler.Wrangler, subFlags *f return err } if subFlags.NArg() != 2 { - return fmt.Errorf("action ExecuteFetch requires ") + return fmt.Errorf("action ExecuteFetchAsDba requires ") } alias, err := topo.ParseTabletAliasString(subFlags.Arg(0)) @@ -973,7 +973,7 @@ func commandExecuteFetch(ctx context.Context, wr *wrangler.Wrangler, subFlags *f return err } query := subFlags.Arg(1) - qr, err := wr.ExecuteFetch(ctx, alias, query, *maxRows, *wantFields, *disableBinlogs) + qr, err := wr.ExecuteFetchAsDba(ctx, alias, query, *maxRows, *wantFields, *disableBinlogs) if err == nil { wr.Logger().Printf("%v\n", jscfg.ToJson(qr)) } diff --git a/go/vt/worker/clone_utils.go b/go/vt/worker/clone_utils.go index 37819004e15..ef9512ba3ad 100644 --- a/go/vt/worker/clone_utils.go +++ b/go/vt/worker/clone_utils.go @@ -109,7 +109,7 @@ func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *top defer retryCancel() for { tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute) - _, err := wr.TabletManagerClient().ExecuteFetch(tryCtx, ti, command, 0, false, disableBinLogs) + _, err := wr.TabletManagerClient().ExecuteFetchAsApp(tryCtx, ti, command, 0, false, disableBinLogs) cancel() switch { case err == nil: @@ -190,7 +190,7 @@ func findChunks(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo, // get the min and max of the leading column of the primary key query := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v.%v", td.PrimaryKeyColumns[0], td.PrimaryKeyColumns[0], ti.DbName(), td.Name) ctx, cancel := context.WithTimeout(ctx, 60*time.Second) - qr, err := wr.TabletManagerClient().ExecuteFetch(ctx, ti, query, 1, true, false) + qr, err := wr.TabletManagerClient().ExecuteFetchAsApp(ctx, ti, query, 1, true, false) cancel() if err != nil { wr.Logger().Infof("Not splitting table %v into multiple chunks: %v", td.Name, err) diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go index 65c31041b0f..248e94f33ab 100644 --- a/go/vt/worker/split_clone_test.go +++ b/go/vt/worker/split_clone_test.go @@ -296,7 +296,7 @@ func testSplitClone(t *testing.T, strategy string) { }, }, } - sourceRdonly.FakeMysqlDaemon.DbaConnectionFactory = SourceRdonlyFactory(t) + sourceRdonly.FakeMysqlDaemon.DbAppConnectionFactory = SourceRdonlyFactory(t) sourceRdonly.FakeMysqlDaemon.CurrentSlaveStatus = &myproto.ReplicationStatus{ Position: myproto.ReplicationPosition{ GTIDSet: myproto.MariadbGTID{Domain: 12, Server: 34, Sequence: 5678}, @@ -312,10 +312,10 @@ func testSplitClone(t *testing.T, strategy string) { // That means 3 insert statements on each target (each // containing half of the rows, i.e. 2 + 2 + 1 rows). So 3 * 10 // = 30 insert statements on each destination. - leftMaster.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t, 30) - leftRdonly.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t, 30) - rightMaster.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t, 30) - rightRdonly.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t, 30) + leftMaster.FakeMysqlDaemon.DbAppConnectionFactory = DestinationsFactory(t, 30) + leftRdonly.FakeMysqlDaemon.DbAppConnectionFactory = DestinationsFactory(t, 30) + rightMaster.FakeMysqlDaemon.DbAppConnectionFactory = DestinationsFactory(t, 30) + rightRdonly.FakeMysqlDaemon.DbAppConnectionFactory = DestinationsFactory(t, 30) wrk.Run() status := wrk.StatusAsText() diff --git a/go/vt/worker/vertical_split_clone_test.go b/go/vt/worker/vertical_split_clone_test.go index 0a9e780de0d..946a90d5c5d 100644 --- a/go/vt/worker/vertical_split_clone_test.go +++ b/go/vt/worker/vertical_split_clone_test.go @@ -284,7 +284,7 @@ func testVerticalSplitClone(t *testing.T, strategy string) { }, }, } - sourceRdonly.FakeMysqlDaemon.DbaConnectionFactory = VerticalSourceRdonlyFactory(t) + sourceRdonly.FakeMysqlDaemon.DbAppConnectionFactory = VerticalSourceRdonlyFactory(t) sourceRdonly.FakeMysqlDaemon.CurrentSlaveStatus = &myproto.ReplicationStatus{ Position: myproto.ReplicationPosition{ GTIDSet: myproto.MariadbGTID{Domain: 12, Server: 34, Sequence: 5678}, @@ -299,8 +299,8 @@ func testVerticalSplitClone(t *testing.T, strategy string) { // at once. So we'll process 4 + 4 + 2 rows to get to 10. // That means 3 insert statements on the target. So 3 * 10 // = 30 insert statements on the destination. - destMaster.FakeMysqlDaemon.DbaConnectionFactory = VerticalDestinationsFactory(t, 30) - destRdonly.FakeMysqlDaemon.DbaConnectionFactory = VerticalDestinationsFactory(t, 30) + destMaster.FakeMysqlDaemon.DbAppConnectionFactory = VerticalDestinationsFactory(t, 30) + destRdonly.FakeMysqlDaemon.DbAppConnectionFactory = VerticalDestinationsFactory(t, 30) wrk.Run() status := wrk.StatusAsText() diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index e609e23f738..045981879cf 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -582,7 +582,7 @@ func (wr *Wrangler) applySqlShard(ctx context.Context, tabletInfo *topo.TabletIn ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() // Need to make sure that we enable binlog, since we're only applying the statement on masters. - _, err = wr.tmc.ExecuteFetch(ctx, tabletInfo, filledChange, 0, false, false) + _, err = wr.tmc.ExecuteFetchAsDba(ctx, tabletInfo, filledChange, 0, false, false) return err } diff --git a/go/vt/wrangler/tablet.go b/go/vt/wrangler/tablet.go index 4d1b159f203..416913a2439 100644 --- a/go/vt/wrangler/tablet.go +++ b/go/vt/wrangler/tablet.go @@ -268,11 +268,11 @@ func (wr *Wrangler) DeleteTablet(tabletAlias topo.TabletAlias) error { return wr.TopoServer().DeleteTablet(tabletAlias) } -// ExecuteFetch will get data from a remote tablet -func (wr *Wrangler) ExecuteFetch(ctx context.Context, tabletAlias topo.TabletAlias, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) { +// ExecuteFetchAsDba executes a query remotely using the DBA pool +func (wr *Wrangler) ExecuteFetchAsDba(ctx context.Context, tabletAlias topo.TabletAlias, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) { ti, err := wr.ts.GetTablet(tabletAlias) if err != nil { return nil, err } - return wr.tmc.ExecuteFetch(ctx, ti, query, maxRows, wantFields, disableBinlogs) + return wr.tmc.ExecuteFetchAsDba(ctx, ti, query, maxRows, wantFields, disableBinlogs) } diff --git a/go/vt/wrangler/testlib/copy_schema_shard_test.go b/go/vt/wrangler/testlib/copy_schema_shard_test.go index 237d8bd4a88..e21fe0f825d 100644 --- a/go/vt/wrangler/testlib/copy_schema_shard_test.go +++ b/go/vt/wrangler/testlib/copy_schema_shard_test.go @@ -135,12 +135,8 @@ func TestCopySchemaShard(t *testing.T) { destinationMaster := NewFakeTablet(t, wr, "cell1", 10, topo.TYPE_MASTER, TabletKeyspaceShard(t, "ks", "-40")) - // one destination RdOnly, so we know that schema copies propogate from masters - destinationRdonly := NewFakeTablet(t, wr, "cell1", 11, - topo.TYPE_RDONLY, TabletKeyspaceShard(t, "ks", "-40"), - TabletParent(destinationMaster.Tablet.Alias)) - for _, ft := range []*FakeTablet{sourceMaster, sourceRdonly, destinationMaster, destinationRdonly} { + for _, ft := range []*FakeTablet{sourceMaster, sourceRdonly, destinationMaster} { ft.StartActionLoop(t, wr) defer ft.StopActionLoop(t) } @@ -162,7 +158,6 @@ func TestCopySchemaShard(t *testing.T) { } destinationMaster.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t) - destinationRdonly.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t) if err := wr.CopySchemaShard(context.Background(), sourceRdonly.Tablet.Alias, nil, nil, true, "ks", "-40"); err != nil { t.Fatalf("CopySchemaShard failed: %v", err) diff --git a/test/tabletmanager.py b/test/tabletmanager.py index 44207d74b4c..743c479205b 100755 --- a/test/tabletmanager.py +++ b/test/tabletmanager.py @@ -103,7 +103,7 @@ def _test_sanity(self): (str(rows), result)) # make sure direct dba queries work - query_result = utils.run_vtctl_json(['ExecuteFetch', '-want_fields', tablet_62344.tablet_alias, 'select * from vt_test_keyspace.vt_select_test']) + query_result = utils.run_vtctl_json(['ExecuteFetchAsDba', '-want_fields', tablet_62344.tablet_alias, 'select * from vt_test_keyspace.vt_select_test']) self.assertEqual(len(query_result['Rows']), 4, "expected 4 rows in vt_select_test: %s" % str(query_result)) self.assertEqual(len(query_result['Fields']), 2, "expected 2 fields in vt_select_test: %s" % str(query_result))