diff --git a/go/db/db.go b/go/db/db.go index 4080875af..03d94b839 100644 --- a/go/db/db.go +++ b/go/db/db.go @@ -17,6 +17,7 @@ package db import ( + "context" "database/sql" "errors" "fmt" @@ -256,7 +257,7 @@ func versionIsDeployed(db *sql.DB) (result bool, err error) { } // registerOrchestratorDeployment updates the orchestrator_metadata table upon successful deployment -func registerOrchestratorDeployment(db *sql.DB) error { +func registerOrchestratorDeployment(ctx context.Context, db *sql.DB) error { query := ` replace into orchestrator_db_deployments ( deployed_version, deployed_timestamp @@ -264,7 +265,7 @@ func registerOrchestratorDeployment(db *sql.DB) error { ?, NOW() ) ` - if _, err := execInternal(db, query, config.RuntimeCLIFlags.ConfiguredVersion); err != nil { + if _, err := execInternal(ctx, db, query, config.RuntimeCLIFlags.ConfiguredVersion); err != nil { log.Fatalf("Unable to write to orchestrator_metadata: %+v", err) } log.Debugf("Migrated database schema to version [%+v]", config.RuntimeCLIFlags.ConfiguredVersion) @@ -273,7 +274,7 @@ func registerOrchestratorDeployment(db *sql.DB) error { // deployStatements will issue given sql queries that are not already known to be deployed. // This iterates both lists (to-run and already-deployed) and also verifies no contraditions. -func deployStatements(db *sql.DB, queries []string) error { +func deployStatements(ctx context.Context, db *sql.DB, queries []string) error { tx, err := db.Begin() if err != nil { log.Fatale(err) @@ -335,6 +336,7 @@ func deployStatements(db *sql.DB, queries []string) error { // initOrchestratorDB attempts to create/upgrade the orchestrator backend database. It is created once in the // application's lifetime. func initOrchestratorDB(db *sql.DB) error { + ctx := context.Background() log.Debug("Initializing orchestrator") versionAlreadyDeployed, err := versionIsDeployed(db) @@ -346,9 +348,9 @@ func initOrchestratorDB(db *sql.DB) error { log.Fatalf("PanicIfDifferentDatabaseDeploy is set. Configured version %s is not the version found in the database", config.RuntimeCLIFlags.ConfiguredVersion) } log.Debugf("Migrating database schema") - deployStatements(db, generateSQLBase) - deployStatements(db, generateSQLPatches) - registerOrchestratorDeployment(db) + deployStatements(ctx, db, generateSQLBase) + deployStatements(ctx, db, generateSQLPatches) + registerOrchestratorDeployment(ctx, db) if IsSQLite() { ExecOrchestrator(`PRAGMA journal_mode = WAL`) @@ -359,13 +361,13 @@ func initOrchestratorDB(db *sql.DB) error { } // execInternal -func execInternal(db *sql.DB, query string, args ...interface{}) (sql.Result, error) { +func execInternal(ctx context.Context, db *sql.DB, query string, args ...interface{}) (sql.Result, error) { var err error query, err = translateStatement(query) if err != nil { return nil, err } - res, err := sqlutils.ExecNoPrepare(db, query, args...) + res, err := sqlutils.ExecNoPrepare(ctx, db, query, args...) return res, err } @@ -380,7 +382,8 @@ func ExecOrchestrator(query string, args ...interface{}) (sql.Result, error) { if err != nil { return nil, err } - res, err := sqlutils.ExecNoPrepare(db, query, args...) + ctx := context.Background() + res, err := sqlutils.ExecNoPrepare(ctx, db, query, args...) return res, err } diff --git a/go/http/api.go b/go/http/api.go index 7e1625436..94a6fc2c5 100644 --- a/go/http/api.go +++ b/go/http/api.go @@ -208,6 +208,7 @@ func (this *HttpAPI) Instance(params martini.Params, r render.Render, req *http. // AsyncDiscover issues an asynchronous read on an instance. This is // useful for bulk loads of a new set of instances and will not block // if the instance is slow to respond or not reachable. +// It will also not block the raft queue in the event communication to discover instance hangs. func (this *HttpAPI) AsyncDiscover(params martini.Params, r render.Render, req *http.Request, user auth.User) { if !isAuthorizedForAction(req, user) { Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"}) @@ -218,7 +219,12 @@ func (this *HttpAPI) AsyncDiscover(params martini.Params, r render.Render, req * Respond(r, &APIResponse{Code: ERROR, Message: err.Error()}) return } - go this.Discover(params, r, req, user) + + if orcraft.IsRaftEnabled() { + orcraft.PublishCommand("async-discover", instanceKey) + } else { + go logic.DiscoverInstance(instanceKey) + } Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Asynchronous discovery initiated for Instance: %+v", instanceKey)}) } diff --git a/go/inst/instance_dao.go b/go/inst/instance_dao.go index 04e4168d7..7fbd997d5 100644 --- a/go/inst/instance_dao.go +++ b/go/inst/instance_dao.go @@ -18,10 +18,10 @@ package inst import ( "bytes" + "context" "database/sql" "errors" "fmt" - "github.com/go-sql-driver/mysql" "regexp" "runtime" "sort" @@ -30,6 +30,8 @@ import ( "sync" "time" + "github.com/go-sql-driver/mysql" + "github.com/openark/golib/log" "github.com/openark/golib/math" "github.com/openark/golib/sqlutils" @@ -231,13 +233,13 @@ func unrecoverableError(err error) bool { // Check if the instance is a MaxScale binlog server (a proxy not a real // MySQL server) and also update the resolved hostname -func (instance *Instance) checkMaxScale(db *sql.DB, latency *stopwatch.NamedStopwatch) (isMaxScale bool, resolvedHostname string, err error) { +func (instance *Instance) checkMaxScale(ctx context.Context, db *sql.DB, latency *stopwatch.NamedStopwatch) (isMaxScale bool, resolvedHostname string, err error) { if config.Config.SkipMaxScaleCheck { return isMaxScale, resolvedHostname, err } latency.Start("instance") - err = sqlutils.QueryRowsMap(db, "show variables like 'maxscale%'", func(m sqlutils.RowMap) error { + err = sqlutils.QueryRowsMapContext(ctx, db, "show variables like 'maxscale%'", func(m sqlutils.RowMap) error { if m.GetString("Variable_name") == "MAXSCALE_VERSION" { originalVersion := m.GetString("Value") if originalVersion == "" { @@ -311,6 +313,9 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, } }() + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(config.Config.MySQLDiscoveryReadTimeoutSeconds)*time.Second) + defer cancel() + var waitGroup sync.WaitGroup var serverUuidWaitGroup sync.WaitGroup readingStartTime := time.Now() @@ -345,10 +350,14 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, if err != nil { goto Cleanup } + if db.Stats().InUse >= config.MySQLTopologyMaxPoolConnections { + err = fmt.Errorf("Database connections exhausted for %v", *instanceKey) + goto Cleanup + } instance.Key = *instanceKey - if isMaxScale, resolvedHostname, err = instance.checkMaxScale(db, latency); err != nil { + if isMaxScale, resolvedHostname, err = instance.checkMaxScale(ctx, db, latency); err != nil { // We do not "goto Cleanup" here, although it should be the correct flow. // Reason is 5.7's new security feature that requires GRANTs on performance_schema.session_variables. // There is a wrong decision making in this design and the migration path to 5.7 will be difficult. @@ -369,17 +378,17 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, // Buggy buggy maxscale 1.1.0. Reported Master_Host can be corrupted. // Therefore we (currently) take @@hostname (which is masquerading as master host anyhow) - err = db.QueryRow("select @@hostname").Scan(&maxScaleMasterHostname) + err = db.QueryRowContext(ctx, "select @@hostname").Scan(&maxScaleMasterHostname) if err != nil { goto Cleanup } } if isMaxScale110 { // Only this is supported: - db.QueryRow("select @@server_id").Scan(&instance.ServerID) + db.QueryRowContext(ctx, "select @@server_id").Scan(&instance.ServerID) } else { - db.QueryRow("select @@global.server_id").Scan(&instance.ServerID) - db.QueryRow("select @@global.server_uuid").Scan(&instance.ServerUUID) + db.QueryRowContext(ctx, "select @@global.server_id").Scan(&instance.ServerID) + db.QueryRowContext(ctx, "select @@global.server_uuid").Scan(&instance.ServerUUID) } } else { // NOT MaxScale @@ -391,7 +400,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, defer waitGroup.Done() var dummy string // show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema) - err := db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime) + err := db.QueryRowContext(ctx, "show global status like 'Uptime'").Scan(&dummy, &instance.Uptime) if err != nil { logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err) @@ -408,7 +417,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, } var mysqlHostname, mysqlReportHost string - err = db.QueryRow("select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates").Scan( + err = db.QueryRowContext(ctx, "select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates").Scan( &mysqlHostname, &mysqlReportHost, &instance.ServerID, &instance.Version, &instance.VersionComment, &instance.ReadOnly, &instance.Binlog_format, &instance.LogBinEnabled, &instance.LogReplicationUpdatesEnabled) if err != nil { goto Cleanup @@ -433,7 +442,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error { + err := sqlutils.QueryRowsMapContext(ctx, db, "show master status", func(m sqlutils.RowMap) error { var err error instance.SelfBinlogCoordinates.LogFile = m.GetString("File") instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position") @@ -449,7 +458,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, defer waitGroup.Done() semiSyncMasterPluginLoaded := false semiSyncReplicaPluginLoaded := false - err := sqlutils.QueryRowsMap(db, "show global variables like 'rpl_semi_sync_%'", func(m sqlutils.RowMap) error { + err := sqlutils.QueryRowsMapContext(ctx, db, "show global variables like 'rpl_semi_sync_%'", func(m sqlutils.RowMap) error { if m.GetString("Variable_name") == "rpl_semi_sync_master_enabled" { instance.SemiSyncMasterEnabled = (m.GetString("Value") == "ON") semiSyncMasterPluginLoaded = true @@ -471,7 +480,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := sqlutils.QueryRowsMap(db, "show global status like 'rpl_semi_sync_%'", func(m sqlutils.RowMap) error { + err := sqlutils.QueryRowsMapContext(ctx, db, "show global status like 'rpl_semi_sync_%'", func(m sqlutils.RowMap) error { if m.GetString("Variable_name") == "Rpl_semi_sync_master_status" { instance.SemiSyncMasterStatus = (m.GetString("Value") == "ON") } else if m.GetString("Variable_name") == "Rpl_semi_sync_master_clients" { @@ -496,14 +505,14 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, // ... // @@gtid_mode only available in Orcale MySQL >= 5.6 // Previous version just issued this query brute-force, but I don't like errors being issued where they shouldn't. - _ = db.QueryRow("select @@global.gtid_mode, @@global.server_uuid, @@global.gtid_executed, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.GTIDMode, &instance.ServerUUID, &instance.ExecutedGtidSet, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage) + _ = db.QueryRowContext(ctx, "select @@global.gtid_mode, @@global.server_uuid, @@global.gtid_executed, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.GTIDMode, &instance.ServerUUID, &instance.ExecutedGtidSet, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage) if instance.GTIDMode != "" && instance.GTIDMode != "OFF" { instance.SupportsOracleGTID = true } if config.Config.ReplicationCredentialsQuery != "" { instance.ReplicationCredentialsAvailable = true } else if masterInfoRepositoryOnTable { - _ = db.QueryRow("select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable) + _ = db.QueryRowContext(ctx, "select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable) } }() } @@ -549,7 +558,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, instance.ReplicationIOThreadState = ReplicationThreadStateNoThread instance.ReplicationSQLThreadState = ReplicationThreadStateNoThread - err = sqlutils.QueryRowsMap(db, "show slave status", func(m sqlutils.RowMap) error { + err = sqlutils.QueryRowsMapContext(ctx, db, "show slave status", func(m sqlutils.RowMap) error { instance.HasReplicationCredentials = (m.GetString("Master_User") != "") instance.ReplicationIOThreadState = ReplicationThreadStateFromStatus(m.GetString("Slave_IO_Running")) instance.ReplicationSQLThreadState = ReplicationThreadStateFromStatus(m.GetString("Slave_SQL_Running")) @@ -627,7 +636,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - if err := db.QueryRow(config.Config.ReplicationLagQuery).Scan(&instance.ReplicationLagSeconds); err == nil { + if err := db.QueryRowContext(ctx, config.Config.ReplicationLagQuery).Scan(&instance.ReplicationLagSeconds); err == nil { if instance.ReplicationLagSeconds.Valid && instance.ReplicationLagSeconds.Int64 < 0 { log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.ReplicationLagSeconds.Int64) instance.ReplicationLagSeconds.Int64 = 0 @@ -649,7 +658,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, // Get replicas, either by SHOW SLAVE HOSTS or via PROCESSLIST // MaxScale does not support PROCESSLIST, so SHOW SLAVE HOSTS is the only option if config.Config.DiscoverByShowSlaveHosts || isMaxScale { - err := sqlutils.QueryRowsMap(db, `show slave hosts`, + err := sqlutils.QueryRowsMapContext(ctx, db, `show slave hosts`, func(m sqlutils.RowMap) error { // MaxScale 1.1 may trigger an error with this command, but // also we may see issues if anything on the MySQL server locks up. @@ -685,7 +694,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := sqlutils.QueryRowsMap(db, ` + err := sqlutils.QueryRowsMapContext(ctx, db, ` select substring_index(host, ':', 1) as slave_hostname from @@ -714,7 +723,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := sqlutils.QueryRowsMap(db, ` + err := sqlutils.QueryRowsMapContext(ctx, db, ` select substring(service_URI,9) mysql_host from @@ -740,7 +749,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := db.QueryRow(config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter) + err := db.QueryRowContext(ctx, config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter) logReadTopologyInstanceError(instanceKey, "DetectDataCenterQuery", err) }() } @@ -749,7 +758,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := db.QueryRow(config.Config.DetectRegionQuery).Scan(&instance.Region) + err := db.QueryRowContext(ctx, config.Config.DetectRegionQuery).Scan(&instance.Region) logReadTopologyInstanceError(instanceKey, "DetectRegionQuery", err) }() } @@ -758,7 +767,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := db.QueryRow(config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment) + err := db.QueryRowContext(ctx, config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment) logReadTopologyInstanceError(instanceKey, "DetectPhysicalEnvironmentQuery", err) }() } @@ -767,7 +776,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := db.QueryRow(config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias) + err := db.QueryRowContext(ctx, config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias) logReadTopologyInstanceError(instanceKey, "DetectInstanceAliasQuery", err) }() } @@ -776,7 +785,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced) + err := db.QueryRowContext(ctx, config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced) logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err) }() } @@ -830,7 +839,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, go func() { defer waitGroup.Done() var value string - err := db.QueryRow(config.Config.DetectPromotionRuleQuery).Scan(&value) + err := db.QueryRowContext(ctx, config.Config.DetectPromotionRuleQuery).Scan(&value) logReadTopologyInstanceError(instanceKey, "DetectPromotionRuleQuery", err) promotionRule, err := ParseCandidatePromotionRule(value) logReadTopologyInstanceError(instanceKey, "ParseCandidatePromotionRule", err) @@ -851,7 +860,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, // Only need to do on masters if config.Config.DetectClusterAliasQuery != "" { clusterAlias := "" - if err := db.QueryRow(config.Config.DetectClusterAliasQuery).Scan(&clusterAlias); err != nil { + if err := db.QueryRowContext(ctx, config.Config.DetectClusterAliasQuery).Scan(&clusterAlias); err != nil { logReadTopologyInstanceError(instanceKey, "DetectClusterAliasQuery", err) } else { instance.SuggestedClusterAlias = clusterAlias @@ -869,7 +878,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, if instance.ReplicationDepth == 0 && config.Config.DetectClusterDomainQuery != "" && !isMaxScale { // Only need to do on masters domainName := "" - if err := db.QueryRow(config.Config.DetectClusterDomainQuery).Scan(&domainName); err != nil { + if err := db.QueryRowContext(ctx, config.Config.DetectClusterDomainQuery).Scan(&domainName); err != nil { domainName = "" logReadTopologyInstanceError(instanceKey, "DetectClusterDomainQuery", err) } @@ -929,7 +938,7 @@ Cleanup: redactedMasterExecutedGtidSet, _ := NewOracleGtidSet(instance.masterExecutedGtidSet) redactedMasterExecutedGtidSet.RemoveUUID(instance.MasterUUID) - db.QueryRow("select gtid_subtract(?, ?)", redactedExecutedGtidSet.String(), redactedMasterExecutedGtidSet.String()).Scan(&instance.GtidErrant) + db.QueryRowContext(ctx, "select gtid_subtract(?, ?)", redactedExecutedGtidSet.String(), redactedMasterExecutedGtidSet.String()).Scan(&instance.GtidErrant) } } } diff --git a/go/inst/instance_topology_dao.go b/go/inst/instance_topology_dao.go index 1631fe8eb..7c9af38b3 100644 --- a/go/inst/instance_topology_dao.go +++ b/go/inst/instance_topology_dao.go @@ -63,7 +63,9 @@ func ExecInstance(instanceKey *InstanceKey, query string, args ...interface{}) ( if err != nil { return nil, err } - return sqlutils.ExecNoPrepare(db, query, args...) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(config.Config.MySQLTopologyReadTimeoutSeconds)*time.Second) + defer cancel() + return sqlutils.ExecNoPrepare(ctx, db, query, args...) } // ExecuteOnTopology will execute given function while maintaining concurrency limit diff --git a/go/logic/command_applier.go b/go/logic/command_applier.go index 73f67fbc9..c6e2348a1 100644 --- a/go/logic/command_applier.go +++ b/go/logic/command_applier.go @@ -21,7 +21,7 @@ import ( "github.com/openark/orchestrator/go/inst" "github.com/openark/orchestrator/go/kv" - "github.com/openark/orchestrator/go/raft" + orcraft "github.com/openark/orchestrator/go/raft" "github.com/openark/golib/log" ) @@ -45,6 +45,8 @@ func (applier *CommandApplier) ApplyCommand(op string, value []byte) interface{} return applier.registerNode(value) case "discover": return applier.discover(value) + case "async-discover": + return applier.asyncDiscover(value) case "injected-pseudo-gtid": return applier.injectedPseudoGTID(value) case "forget": @@ -109,6 +111,14 @@ func (applier *CommandApplier) discover(value []byte) interface{} { return nil } +// asyncDiscover discover-s in a goroutine so that the discovery cannot block the raft path +// (e.g. a situation where communication with discovered instannce hangs could block the entire +// raft queue; this function is therefore safer) +func (applier *CommandApplier) asyncDiscover(value []byte) interface{} { + go applier.discover(value) + return nil +} + func (applier *CommandApplier) injectedPseudoGTID(value []byte) interface{} { var clusterName string if err := json.Unmarshal(value, &clusterName); err != nil { diff --git a/go/logic/topology_recovery.go b/go/logic/topology_recovery.go index 49cc36039..1e44aa12c 100644 --- a/go/logic/topology_recovery.go +++ b/go/logic/topology_recovery.go @@ -227,7 +227,7 @@ func getCountPendingRecoveries() int64 { func initializeTopologyRecoveryPostConfiguration() { config.WaitForConfigurationToBeLoaded() - emergencyReadTopologyInstanceMap = cache.New(time.Second, time.Millisecond*250) + emergencyReadTopologyInstanceMap = cache.New(time.Duration(config.Config.MySQLDiscoveryReadTimeoutSeconds)*time.Second, time.Millisecond*500) emergencyRestartReplicaTopologyInstanceMap = cache.New(time.Second*30, time.Second) emergencyOperationGracefulPeriodMap = cache.New(time.Second*5, time.Millisecond*500) } diff --git a/script/test-integration b/script/test-integration index 5fb96bf9f..c2a2ff78b 100755 --- a/script/test-integration +++ b/script/test-integration @@ -7,7 +7,7 @@ setup_mysql() { if [ ! -f "dbdeployer" ] ; then return fi - ./dbdeployer deploy single "5.7.21" --sandbox-binary $PWD/sandbox/binary --sandbox-home $PWD/sandboxes --sandbox-directory orc-sandbox --port=3306 + ./dbdeployer deploy single "5.7.26" --sandbox-binary $PWD/sandbox/binary --sandbox-home $PWD/sandboxes --sandbox-directory orc-sandbox --port=3306 mkdir -p bin ln -s /go/src/github.com/openark/orchestrator/sandboxes/orc-sandbox/use bin/mysql chmod +x bin/mysql diff --git a/tests/integration/orchestrator.conf.json b/tests/integration/orchestrator.conf.json index d9ac21612..b609f883c 100644 --- a/tests/integration/orchestrator.conf.json +++ b/tests/integration/orchestrator.conf.json @@ -15,7 +15,7 @@ "BackendDB": "backend-db-placeholder", "SQLite3DataFile": "sqlite-data-file-placeholder", "MySQLOrchestratorHost": "127.0.0.1", - "MySQLOrchestratorPort": 3306, + "MySQLOrchestratorPort": mysql-orchestrator-port-placeholder, "MySQLOrchestratorDatabase": "test", "MySQLOrchestratorCredentialsConfigFile": "/tmp/orchestrator-test-my.cnf", "MySQLOrchestratorSSLPrivateKeyFile": "", diff --git a/tests/integration/test.sh b/tests/integration/test.sh index d630817dc..923e72587 100755 --- a/tests/integration/test.sh +++ b/tests/integration/test.sh @@ -18,6 +18,7 @@ exec_command_file=/tmp/orchestrator-test.bash test_mysql_defaults_file=/tmp/orchestrator-test-my.cnf db_type="" sqlite_file="/tmp/orchestrator.db" +mysql_port="" mysql_args="--defaults-extra-file=${test_mysql_defaults_file} --default-character-set=utf8mb4 -s -s" function run_queries() { @@ -58,6 +59,8 @@ setup_mysql() { echo "mysql config (${test_mysql_defaults_file})" cat $test_mysql_defaults_file mysql $mysql_args -e "create database if not exists test" + mysql_port="$(mysql $mysql_args -e "select @@port")" + echo "mysql_port: $mysql_port" } check_db() { @@ -181,7 +184,7 @@ deploy_internal_db() { if [ $? -ne 0 ] ; then echo "ERROR deploy internal db failed" cat $test_logfile - return 1 + exit 1 fi echo "- deploy_internal_db result: $?" } @@ -190,6 +193,8 @@ generate_config_file() { cp ${tests_path}/orchestrator.conf.json ${test_config_file} sed -i -e "s/backend-db-placeholder/${db_type}/g" ${test_config_file} sed -i -e "s^sqlite-data-file-placeholder^${sqlite_file}^g" ${test_config_file} + sed -i -e "s^mysql-orchestrator-port-placeholder^${mysql_port:-3306}^g" ${test_config_file} + touch "$test_mysql_defaults_file" # required even for sqlite because config file references the my.cnf cgf file echo "- generate_config_file OK" } diff --git a/vendor/github.com/openark/golib/sqlutils/sqlutils.go b/vendor/github.com/openark/golib/sqlutils/sqlutils.go index f92b5e10f..325e47a4d 100644 --- a/vendor/github.com/openark/golib/sqlutils/sqlutils.go +++ b/vendor/github.com/openark/golib/sqlutils/sqlutils.go @@ -17,6 +17,7 @@ package sqlutils import ( + "context" "database/sql" "encoding/json" "errors" @@ -210,7 +211,7 @@ func GetSQLiteDB(dbFile string) (*sql.DB, bool, error) { func RowToArray(rows *sql.Rows, columns []string) []CellData { buff := make([]interface{}, len(columns)) data := make([]CellData, len(columns)) - for i, _ := range buff { + for i := range buff { buff[i] = data[i].NullString() } rows.Scan(buff...) @@ -257,6 +258,12 @@ func ScanRowsToMaps(rows *sql.Rows, on_row func(RowMap) error) error { // QueryRowsMap is a convenience function allowing querying a result set while poviding a callback // function activated per read row. func QueryRowsMap(db *sql.DB, query string, on_row func(RowMap) error, args ...interface{}) (err error) { + return QueryRowsMapContext(context.Background(), db, query, on_row, args...) +} + +// QueryRowsMapContext is a convenience function allowing querying a result set while poviding a callback +// function activated per read row. +func QueryRowsMapContext(ctx context.Context, db *sql.DB, query string, on_row func(RowMap) error, args ...interface{}) (err error) { defer func() { if derr := recover(); derr != nil { err = fmt.Errorf("QueryRowsMap unexpected error: %+v", derr) @@ -264,7 +271,7 @@ func QueryRowsMap(db *sql.DB, query string, on_row func(RowMap) error, args ...i }() var rows *sql.Rows - rows, err = db.Query(query, args...) + rows, err = db.QueryContext(ctx, query, args...) if rows != nil { defer rows.Close() } @@ -332,14 +339,14 @@ func QueryRowsMapBuffered(db *sql.DB, query string, on_row func(RowMap) error, a } // ExecNoPrepare executes given query using given args on given DB, without using prepared statements. -func ExecNoPrepare(db *sql.DB, query string, args ...interface{}) (res sql.Result, err error) { +func ExecNoPrepare(ctx context.Context, db *sql.DB, query string, args ...interface{}) (res sql.Result, err error) { defer func() { if derr := recover(); derr != nil { err = errors.New(fmt.Sprintf("ExecNoPrepare unexpected error: %+v", derr)) } }() - res, err = db.Exec(query, args...) + res, err = db.ExecContext(ctx, query, args...) if err != nil { log.Errore(err) }