Skip to content

Commit

Permalink
Workers now write data as App instead of Dba
Browse files Browse the repository at this point in the history
  • Loading branch information
aaijazi committed Feb 5, 2015
1 parent f32f4a3 commit f694728
Show file tree
Hide file tree
Showing 22 changed files with 114 additions and 67 deletions.
2 changes: 1 addition & 1 deletion go/cmd/mysqlctl/mysqlctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func main() {
log.Errorf("%v", err)
exit.Return(1)
}
mysqld := mysqlctl.NewMysqld("Dba", mycnf, &dbcfgs.Dba, &dbcfgs.Repl)
mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl)
defer mysqld.Close()

action := flag.Arg(0)
Expand Down
6 changes: 3 additions & 3 deletions go/cmd/mysqlctld/mysqlctld.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
mysqld *mysqlctl.Mysqld

mysqlPort = flag.Int("mysql_port", 3306, "mysql port")
tabletUid = flag.Uint("tablet_uid", 41983, "tablet uid")
tabletUID = flag.Uint("tablet_uid", 41983, "tablet uid")
mysqlSocket = flag.String("mysql_socket", "", "path to the mysql socket")

// mysqlctl init flags
Expand All @@ -50,7 +50,7 @@ func main() {
dbconfigs.RegisterFlags(flags)
flag.Parse()

mycnf := mysqlctl.NewMycnf(uint32(*tabletUid), *mysqlPort)
mycnf := mysqlctl.NewMycnf(uint32(*tabletUID), *mysqlPort)
if *mysqlSocket != "" {
mycnf.SocketFile = *mysqlSocket
}
Expand All @@ -60,7 +60,7 @@ func main() {
log.Errorf("%v", err)
exit.Return(255)
}
mysqld = mysqlctl.NewMysqld("Dba", mycnf, &dbcfgs.Dba, &dbcfgs.Repl)
mysqld = mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl)

// Register OnTerm handler before mysqld starts, so we get notified if mysqld
// dies on its own without us (or our RPC client) telling it to.
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtocc/vtocc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func main() {
}
}
mycnf := &mysqlctl.Mycnf{BinLogPath: *binlogPath}
mysqld := mysqlctl.NewMysqld("Dba", mycnf, &dbConfigs.Dba, &dbConfigs.Repl)
mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbConfigs.Dba, &dbConfigs.App.ConnectionParams, &dbConfigs.Repl)

if err := unmarshalFile(*overridesFile, &schemaOverrides); err != nil {
log.Error(err)
Expand Down
11 changes: 6 additions & 5 deletions go/vt/mysqlctl/mycnf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
"github.com/youtube/vitess/go/vt/env"
)

var MYCNF_PATH = "/tmp/my.cnf"
var MycnfPath = "/tmp/my.cnf"

func TestMycnf(t *testing.T) {
os.Setenv("MYSQL_FLAVOR", "GoogleMysql")
dbaConfig := dbconfigs.DefaultDBConfigs.Dba
appConfig := dbconfigs.DefaultDBConfigs.App.ConnectionParams
replConfig := dbconfigs.DefaultDBConfigs.Repl
tablet0 := NewMysqld("Dba", NewMycnf(0, 6802), &dbaConfig, &replConfig)
tablet0 := NewMysqld("Dba", "App", NewMycnf(0, 6802), &dbaConfig, &appConfig, &replConfig)
defer tablet0.Close()
root, err := env.VtRoot()
if err != nil {
Expand All @@ -37,16 +38,16 @@ func TestMycnf(t *testing.T) {
} else {
t.Logf("data: %v", data)
}
err = ioutil.WriteFile(MYCNF_PATH, []byte(data), 0666)
err = ioutil.WriteFile(MycnfPath, []byte(data), 0666)
if err != nil {
t.Errorf("failed creating my.cnf %v", err)
}
_, err = ioutil.ReadFile(MYCNF_PATH)
_, err = ioutil.ReadFile(MycnfPath)
if err != nil {
t.Errorf("failed reading, err %v", err)
return
}
mycnf, err := ReadMycnf(MYCNF_PATH)
mycnf, err := ReadMycnf(MycnfPath)
if err != nil {
t.Errorf("failed reading, err %v", err)
} else {
Expand Down
35 changes: 27 additions & 8 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type MysqlDaemon interface {
// Schema related methods
GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*proto.SchemaDefinition, error)

// GetDbaConnection returns a connection to be able to talk
// to the database as the admin user.
GetDbaConnection() (dbconnpool.PoolConnection, error)
// GetDbConnection returns a connection to be able to talk to the database.
// It accepts a dbconfig name to determine which db user it the connection should have.
GetDbConnection(dbconfigName string) (dbconnpool.PoolConnection, error)
}

// FakeMysqlDaemon implements MysqlDaemon and allows the user to fake
Expand All @@ -54,10 +54,14 @@ type FakeMysqlDaemon struct {
// return an error.
Schema *proto.SchemaDefinition

// DbaConnectionFactory is the factory for making fake dba connection
// DbaConnectionFactory is the factory for making fake dba connections
DbaConnectionFactory func() (dbconnpool.PoolConnection, error)

// DbAppConnectionFactory is the factory for making fake db app connections
DbAppConnectionFactory func() (dbconnpool.PoolConnection, error)
}

// GetMasterAddr is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetMasterAddr() (string, error) {
if fmd.MasterAddr == "" {
return "", ErrNotSlave
Expand All @@ -68,40 +72,55 @@ func (fmd *FakeMysqlDaemon) GetMasterAddr() (string, error) {
return fmd.MasterAddr, nil
}

// GetMysqlPort is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetMysqlPort() (int, error) {
if fmd.MysqlPort == -1 {
return 0, fmt.Errorf("FakeMysqlDaemon.GetMysqlPort returns an error")
}
return fmd.MysqlPort, nil
}

// StartSlave is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) StartSlave(hookExtraEnv map[string]string) error {
fmd.Replicating = true
return nil
}

// StopSlave is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) StopSlave(hookExtraEnv map[string]string) error {
fmd.Replicating = false
return nil
}

// SlaveStatus is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) SlaveStatus() (*proto.ReplicationStatus, error) {
if fmd.CurrentSlaveStatus == nil {
return nil, fmt.Errorf("no slave status defined")
}
return fmd.CurrentSlaveStatus, nil
}

// GetSchema is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*proto.SchemaDefinition, error) {
if fmd.Schema == nil {
return nil, fmt.Errorf("no schema defined")
}
return fmd.Schema, nil
}

func (fmd *FakeMysqlDaemon) GetDbaConnection() (dbconnpool.PoolConnection, error) {
if fmd.DbaConnectionFactory == nil {
return nil, fmt.Errorf("no DbaConnectionFactory set in this FakeMysqlDaemon")
// GetDbConnection is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetDbConnection(dbconfigName string) (dbconnpool.PoolConnection, error) {
switch dbconfigName {
case "dba":
if fmd.DbaConnectionFactory == nil {
return nil, fmt.Errorf("no DbaConnectionFactory set in this FakeMysqlDaemon")
}
return fmd.DbaConnectionFactory()
case "app":
if fmd.DbAppConnectionFactory == nil {
return nil, fmt.Errorf("no DbAppConnectionFactory set in this FakeMysqlDaemon")
}
return fmd.DbAppConnectionFactory()
}
return fmd.DbaConnectionFactory()
return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName)
}
24 changes: 13 additions & 11 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ const (
)

var (
dbaPoolSize = flag.Int("dba_pool_size", 10, "Size of the connection pool for dba connections")
// TODO(aaijazi): for reasons I don't understand, the dba pool size needs to be fairly large (15+)
// for test/clone.py to pass.
dbaPoolSize = flag.Int("dba_pool_size", 20, "Size of the connection pool for dba connections")
dbaIdleTimeout = flag.Duration("dba_idle_timeout", time.Minute, "Idle timeout for dba connections")
appPoolSize = flag.Int("app_pool_size", 40, "Size of the connection pool for app connections")
appIdleTimeout = flag.Duration("app_idle_timeout", time.Minute, "Idle timeout for app connections")
Expand Down Expand Up @@ -83,7 +85,7 @@ func NewMysqld(dbaName, appName string, config *Mycnf, dba, app, repl *mysql.Con

// create and open the connection pool for app access
appMysqlStats := stats.NewTimings("Mysql" + appName)
appPool := dbconnpool.NewConnectionPool(appName+"ConnPool", *dbaPoolSize, *dbaIdleTimeout)
appPool := dbconnpool.NewConnectionPool(appName+"ConnPool", *appPoolSize, *appIdleTimeout)
appPool.Open(dbconnpool.DBConnectionCreator(app, appMysqlStats))

return &Mysqld{
Expand Down Expand Up @@ -530,16 +532,16 @@ func (mysqld *Mysqld) ExecuteMysqlCommand(sql string) error {
return nil
}

// GetDbaConnection returns a connection from the dba pool.
// GetDbConnection returns a connection from the pool chosen by dbconfigName.
// Recycle needs to be called on the result.
func (mysqld *Mysqld) GetDbaConnection() (dbconnpool.PoolConnection, error) {
return mysqld.dbaPool.Get(0)
}

// GetAppConnection returns a connection from the app pool.
// Recycle needs to be called on the result.
func (mysqld *Mysqld) GetAppConnection() (dbconnpool.PoolConnection, error) {
return mysqld.appPool.Get(0)
func (mysqld *Mysqld) GetDbConnection(dbconfigName string) (dbconnpool.PoolConnection, error) {
switch dbconfigName {
case "dba":
return mysqld.dbaPool.Get(0)
case "app":
return mysqld.appPool.Get(0)
}
return nil, fmt.Errorf("unknown dbconfigName: %v", dbconfigName)
}

// Close will close this instance of Mysqld. It will wait for all dba
Expand Down
2 changes: 1 addition & 1 deletion go/vt/tabletmanager/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewActionAgent(
schemaOverrides := loadSchemaOverrides(overridesFile)

topoServer := topo.GetServer()
mysqld := mysqlctl.NewMysqld("Dba", mycnf, &dbcfgs.Dba, &dbcfgs.Repl)
mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl)

agent = &ActionAgent{
batchCtx: batchCtx,
Expand Down
6 changes: 3 additions & 3 deletions go/vt/tabletmanager/agent_rpc_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type RPCAgent interface {

ApplySchema(ctx context.Context, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error)

ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool) (*proto.QueryResult, error)
ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName string) (*proto.QueryResult, error)

// Replication related methods

Expand Down Expand Up @@ -273,9 +273,9 @@ func (agent *ActionAgent) ApplySchema(ctx context.Context, change *myproto.Schem

// ExecuteFetch will execute the given query, possibly disabling binlogs.
// Should be called under RPCWrap.
func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool) (*proto.QueryResult, error) {
func (agent *ActionAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName string) (*proto.QueryResult, error) {
// get a connection
conn, err := agent.MysqlDaemon.GetDbaConnection()
conn, err := agent.MysqlDaemon.GetDbConnection(dbconfigName)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions go/vt/tabletmanager/agentrpctest/test_agent_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ var testExecuteFetchResult = &mproto.QueryResult{
},
}

func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows int, wantFields, disableBinlogs bool, dbconfigName string) (*mproto.QueryResult, error) {
compare(fra.t, "ExecuteFetch query", query, testExecuteFetchQuery)
compare(fra.t, "ExecuteFetch maxrows", maxrows, testExecuteFetchMaxRows)
compareBool(fra.t, "ExecuteFetch wantFields", wantFields)
Expand All @@ -454,7 +454,9 @@ func (fra *fakeRPCAgent) ExecuteFetch(ctx context.Context, query string, maxrows
}

func agentRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
qr, err := client.ExecuteFetch(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true)
qr, err := client.ExecuteFetchAsDba(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true)
compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult)
qr, err = client.ExecuteFetchAsApp(ctx, ti, testExecuteFetchQuery, testExecuteFetchMaxRows, true, true)
compareError(t, "ExecuteFetch", err, qr, testExecuteFetchResult)
}

Expand Down
10 changes: 8 additions & 2 deletions go/vt/tabletmanager/faketmclient/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,14 @@ func (client *FakeTabletManagerClient) ApplySchema(ctx context.Context, tablet *
return &scr, nil
}

// ExecuteFetch is part of the tmclient.TabletManagerClient interface
func (client *FakeTabletManagerClient) ExecuteFetch(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
// ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface
func (client *FakeTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
var qr mproto.QueryResult
return &qr, nil
}

// ExecuteFetchAsApp is part of the tmclient.TabletManagerClient interface
func (client *FakeTabletManagerClient) ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
var qr mproto.QueryResult
return &qr, nil
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/tabletmanager/gorpcproto/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type ExecuteFetchArgs struct {
MaxRows int
WantFields bool
DisableBinlogs bool
DBConfigName string
}

// gorpc doesn't support returning a streaming type during streaming
Expand Down
27 changes: 24 additions & 3 deletions go/vt/tabletmanager/gorpctmclient/gorpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,31 @@ func (client *GoRPCTabletManagerClient) ApplySchema(ctx context.Context, tablet
return &scr, nil
}

// ExecuteFetch is part of the tmclient.TabletManagerClient interface
func (client *GoRPCTabletManagerClient) ExecuteFetch(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
// ExecuteFetchAsDba is part of the tmclient.TabletManagerClient interface
func (client *GoRPCTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
var qr mproto.QueryResult
if err := client.rpcCallTablet(ctx, tablet, actionnode.TABLET_ACTION_EXECUTE_FETCH, &gorpcproto.ExecuteFetchArgs{Query: query, MaxRows: maxRows, WantFields: wantFields, DisableBinlogs: disableBinlogs}, &qr); err != nil {
if err := client.rpcCallTablet(ctx, tablet, actionnode.TABLET_ACTION_EXECUTE_FETCH, &gorpcproto.ExecuteFetchArgs{
Query: query,
MaxRows: maxRows,
WantFields: wantFields,
DisableBinlogs: disableBinlogs,
DBConfigName: "dba",
}, &qr); err != nil {
return nil, err
}
return &qr, nil
}

// ExecuteFetchAsApp is part of the tmclient.TabletManagerClient interface
func (client *GoRPCTabletManagerClient) ExecuteFetchAsApp(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error) {
var qr mproto.QueryResult
if err := client.rpcCallTablet(ctx, tablet, actionnode.TABLET_ACTION_EXECUTE_FETCH, &gorpcproto.ExecuteFetchArgs{
Query: query,
MaxRows: maxRows,
WantFields: wantFields,
DisableBinlogs: disableBinlogs,
DBConfigName: "app",
}, &qr); err != nil {
return nil, err
}
return &qr, nil
Expand Down
2 changes: 1 addition & 1 deletion go/vt/tabletmanager/gorpctmserver/gorpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (tm *TabletManager) ApplySchema(ctx context.Context, args *myproto.SchemaCh
// ExecuteFetch wraps RPCAgent.
func (tm *TabletManager) ExecuteFetch(ctx context.Context, args *gorpcproto.ExecuteFetchArgs, reply *mproto.QueryResult) error {
return tm.agent.RPCWrap(ctx, actionnode.TABLET_ACTION_EXECUTE_FETCH, args, reply, func() error {
qr, err := tm.agent.ExecuteFetch(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs)
qr, err := tm.agent.ExecuteFetch(ctx, args.Query, args.MaxRows, args.WantFields, args.DisableBinlogs, args.DBConfigName)
if err == nil {
*reply = *qr
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/tabletmanager/tmclient/rpc_client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type TabletManagerClient interface {
// ApplySchema will apply a schema change
ApplySchema(ctx context.Context, tablet *topo.TabletInfo, change *myproto.SchemaChange) (*myproto.SchemaChangeResult, error)

// ExecuteFetch executes a query remotely using the DBA pool
// ExecuteFetchAsDba executes a query remotely using the DBA pool
ExecuteFetchAsDba(ctx context.Context, tablet *topo.TabletInfo, query string, maxRows int, wantFields, disableBinlogs bool) (*mproto.QueryResult, error)

// ExecuteFetchAsApp executes a query remotely using the App pool
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ var commands = []commandGroup{
command{"ExecuteHook", commandExecuteHook,
"<tablet alias> <hook name> [<param1=value1> <param2=value2> ...]",
"This runs the specified hook on the given tablet."},
command{"ExecuteFetch", commandExecuteFetch,
command{"ExecuteFetchAsDba", commandExecuteFetchAsDba,
"[--max_rows=10000] [--want_fields] [--disable_binlogs] <tablet alias> <sql command>",
"Runs the given sql command as a DBA on the remote tablet"},
},
Expand Down Expand Up @@ -957,23 +957,23 @@ func commandClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla
return wr.Clone(ctx, srcTabletAlias, dstTabletAliases, *force, *concurrency, *fetchConcurrency, *fetchRetryCount, *serverMode)
}

func commandExecuteFetch(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
func commandExecuteFetchAsDba(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
maxRows := subFlags.Int("max_rows", 10000, "maximum number of rows to allow in reset")
wantFields := subFlags.Bool("want_fields", false, "also get the field names")
disableBinlogs := subFlags.Bool("disable_binlogs", false, "disable writing to binlogs during the query")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 2 {
return fmt.Errorf("action ExecuteFetch requires <tablet alias> <sql command>")
return fmt.Errorf("action ExecuteFetchAsDba requires <tablet alias> <sql command>")
}

alias, err := topo.ParseTabletAliasString(subFlags.Arg(0))
if err != nil {
return err
}
query := subFlags.Arg(1)
qr, err := wr.ExecuteFetch(ctx, alias, query, *maxRows, *wantFields, *disableBinlogs)
qr, err := wr.ExecuteFetchAsDba(ctx, alias, query, *maxRows, *wantFields, *disableBinlogs)
if err == nil {
wr.Logger().Printf("%v\n", jscfg.ToJson(qr))
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/worker/clone_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *top
defer retryCancel()
for {
tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute)
_, err := wr.TabletManagerClient().ExecuteFetch(tryCtx, ti, command, 0, false, disableBinLogs)
_, err := wr.TabletManagerClient().ExecuteFetchAsApp(tryCtx, ti, command, 0, false, disableBinLogs)
cancel()
switch {
case err == nil:
Expand Down Expand Up @@ -190,7 +190,7 @@ func findChunks(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo,
// get the min and max of the leading column of the primary key
query := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v.%v", td.PrimaryKeyColumns[0], td.PrimaryKeyColumns[0], ti.DbName(), td.Name)
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
qr, err := wr.TabletManagerClient().ExecuteFetch(ctx, ti, query, 1, true, false)
qr, err := wr.TabletManagerClient().ExecuteFetchAsApp(ctx, ti, query, 1, true, false)
cancel()
if err != nil {
wr.Logger().Infof("Not splitting table %v into multiple chunks: %v", td.Name, err)
Expand Down

0 comments on commit f694728

Please sign in to comment.