Skip to content

Commit

Permalink
SidecarDB Init: don't fail on schema init errors (#12328)
Browse files Browse the repository at this point in the history
* Don't fail on schema init errors

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Fix debug var attribute name

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Fix typo in debug var

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Address review comment

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Address review comments

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

---------

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Feb 22, 2023
1 parent fd8c795 commit a5d0cdc
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/sidecardb"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
Expand Down Expand Up @@ -164,11 +166,12 @@ func waitForSourcePort(ctx context.Context, t *testing.T, tablet cluster.Vttable
return fmt.Errorf("time out before source port became %v for %v", expectedPort, tablet.Alias)
}

func getSidecarDbDDLQueryCount(tablet *cluster.VttabletProcess) (int64, error) {
func getSidecarDBDDLQueryCount(tablet *cluster.VttabletProcess) (int64, error) {
vars := tablet.GetVars()
val, ok := vars["SidecarDbDDLQueryCount"]
key := sidecardb.StatsKeyQueryCount
val, ok := vars[key]
if !ok {
return 0, fmt.Errorf("SidecarDbDDLQueryCount not found in debug/vars")
return 0, fmt.Errorf("%s not found in debug/vars", key)
}
return int64(val.(float64)), nil
}
Expand All @@ -178,7 +181,7 @@ func TestReplicationRepairAfterPrimaryTabletChange(t *testing.T) {
err := waitForSourcePort(ctx, t, replicaTablet, int32(primaryTablet.MySQLPort))
require.NoError(t, err)

sidecarDDLCount, err := getSidecarDbDDLQueryCount(primaryTablet.VttabletProcess)
sidecarDDLCount, err := getSidecarDBDDLQueryCount(primaryTablet.VttabletProcess)
require.NoError(t, err)
// sidecar db should create all _vt tables when vttablet started
require.Greater(t, sidecarDDLCount, int64(0))
Expand All @@ -197,7 +200,7 @@ func TestReplicationRepairAfterPrimaryTabletChange(t *testing.T) {
err = waitForSourcePort(ctx, t, replicaTablet, int32(newMysqlPort))
require.NoError(t, err)

sidecarDDLCount, err = getSidecarDbDDLQueryCount(primaryTablet.VttabletProcess)
sidecarDDLCount, err = getSidecarDBDDLQueryCount(primaryTablet.VttabletProcess)
require.NoError(t, err)
// sidecardb should find the desired _vt schema and not apply any new creates or upgrades when the tablet comes up again
require.Equal(t, sidecarDDLCount, int64(0))
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func getDebugVar(t *testing.T, port int, varPath []string) (string, error) {
var val []byte
var err error
url := fmt.Sprintf("http://localhost:%d/debug/vars", port)
log.Infof("url: %s, varPath: %s", url, strings.Join(varPath, ":"))
body := getHTTPBody(url)
val, _, _, err = jsonparser.Get([]byte(body), varPath...)
require.NoError(t, err)
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/sidecardb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"strconv"
"testing"

"vitess.io/vitess/go/vt/sidecardb"

"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
)
Expand Down Expand Up @@ -130,7 +132,7 @@ func modifySidecarDBSchema(t *testing.T, vc *VitessCluster, tabletID string, ddl
}

func getNumExecutedDDLQueries(t *testing.T, port int) int {
val, err := getDebugVar(t, port, []string{"SidecarDbDDLQueryCount"})
val, err := getDebugVar(t, port, []string{sidecardb.StatsKeyQueryCount})
require.NoError(t, err)
i, err := strconv.Atoi(val)
require.NoError(t, err)
Expand Down
95 changes: 84 additions & 11 deletions go/vt/sidecardb/sidecardb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"regexp"
"runtime"
"strings"
"sync"

"vitess.io/vitess/go/history"
"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/mysql/fakesqldb"
Expand Down Expand Up @@ -71,10 +73,50 @@ func (t *sidecarTable) String() string {

var sidecarTables []*sidecarTable
var ddlCount *stats.Counter
var ddlErrorCount *stats.Counter
var ddlErrorHistory *history.History
var mu sync.Mutex

type ddlError struct {
tableName string
err error
}

const maxDDLErrorHistoryLength = 100

// failOnSchemaInitError decides whether we fail the schema init process when we encounter an error while
// applying a table schema upgrade DDL or continue with the next table.
// If true, tablets will not launch. The cluster will not come up until the issue is resolved.
// If false, the init process will continue trying to upgrade other tables. So some functionality might be broken
// due to an incorrect schema, but the cluster should come up and serve queries.
// This is an operational trade-off: if we always fail it could cause a major incident since the entire cluster will be down.
// If we are more permissive, it could cause hard-to-detect errors, because a module
// doesn't load or behaves incorrectly due to an incomplete upgrade. Errors however will be reported and if the
// related stats endpoints are monitored we should be able to diagnose/get alerted in a timely fashion.
const failOnSchemaInitError = false

const StatsKeyPrefix = "SidecarDBDDL"
const StatsKeyQueryCount = StatsKeyPrefix + "QueryCount"
const StatsKeyErrorCount = StatsKeyPrefix + "ErrorCount"
const StatsKeyErrors = StatsKeyPrefix + "Errors"

func init() {
initSchemaFiles()
ddlCount = stats.NewCounter("SidecarDbDDLQueryCount", "Number of create/upgrade queries executed")
ddlCount = stats.NewCounter(StatsKeyQueryCount, "Number of queries executed")
ddlErrorCount = stats.NewCounter(StatsKeyErrorCount, "Number of errors during sidecar schema upgrade")
ddlErrorHistory = history.New(maxDDLErrorHistoryLength)
stats.Publish(StatsKeyErrors, stats.StringMapFunc(func() map[string]string {
mu.Lock()
defer mu.Unlock()
result := make(map[string]string, len(ddlErrorHistory.Records()))
for _, e := range ddlErrorHistory.Records() {
d, ok := e.(*ddlError)
if ok {
result[d.tableName] = d.err.Error()
}
}
return result
}))
}

func validateSchemaDefinition(name, schema string) (string, error) {
Expand All @@ -90,14 +132,14 @@ func validateSchemaDefinition(name, schema string) (string, error) {
tableName := createTable.Table.Name.String()
qualifier := createTable.Table.Qualifier.String()
if qualifier != SidecarDBName {
return "", fmt.Errorf("database qualifier specified for the %s table is %s rather than the expected value of %s",
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "database qualifier specified for the %s table is %s rather than the expected value of %s",
name, qualifier, SidecarDBName)
}
if !strings.EqualFold(tableName, name) {
return "", fmt.Errorf("table name of %s does not match the table name specified within the file: %s", name, tableName)
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table name of %s does not match the table name specified within the file: %s", name, tableName)
}
if !createTable.IfNotExists {
return "", fmt.Errorf("%s file did not include the required IF NOT EXISTS clause in the CREATE TABLE statement for the %s table", name, tableName)
return "", vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "%s file did not include the required IF NOT EXISTS clause in the CREATE TABLE statement for the %s table", name, tableName)
}
normalizedSchema := sqlparser.CanonicalString(createTable)
return normalizedSchema, nil
Expand All @@ -123,7 +165,7 @@ func initSchemaFiles() {
case 2:
module = fmt.Sprintf("%s/%s", dirparts[0], dirparts[1])
default:
return fmt.Errorf("unexpected path value of %s specified for sidecar schema table; expected structure is <module>[/<submodule>]/<tablename>.sql", dir)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected path value of %s specified for sidecar schema table; expected structure is <module>[/<submodule>]/<tablename>.sql", dir)
}

name := strings.Split(fname, ".")[0]
Expand Down Expand Up @@ -163,11 +205,28 @@ type schemaInit struct {
// Exec is a callback that has to be passed to Init() to execute the specified query in the database.
type Exec func(ctx context.Context, query string, maxRows int, useDB bool) (*sqltypes.Result, error)

// GetDDLCount metric returns the count of sidecardb ddls that have been run as part of this vttablet's init process.
// GetDDLCount returns the count of sidecardb DDLs that have been run as part of this vttablet's init process.
func GetDDLCount() int64 {
return ddlCount.Get()
}

// GetDDLErrorCount returns the count of sidecardb DDLs that have been errored out as part of this vttablet's init process.
func GetDDLErrorCount() int64 {
return ddlErrorCount.Get()
}

// GetDDLErrorHistory returns the errors encountered as part of this vttablet's init process..
func GetDDLErrorHistory() []*ddlError {
var errors []*ddlError
for _, e := range ddlErrorHistory.Records() {
ddle, ok := e.(*ddlError)
if ok {
errors = append(errors, ddle)
}
}
return errors
}

// Init creates or upgrades the sidecar database based on declarative schema for all tables in the schema.
func Init(ctx context.Context, exec Exec) error {
printCallerDetails() // for debug purposes only, remove in v17
Expand Down Expand Up @@ -249,7 +308,7 @@ func (si *schemaInit) doesSidecarDBExist() (bool, error) {
return true, nil
default:
log.Errorf("found too many rows for sidecarDB %s: %d", SidecarDBName, len(rs.Rows))
return false, fmt.Errorf("found too many rows for sidecarDB %s: %d", SidecarDBName, len(rs.Rows))
return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "found too many rows for sidecarDB %s: %d", SidecarDBName, len(rs.Rows))
}
}

Expand Down Expand Up @@ -321,7 +380,7 @@ func (si *schemaInit) findTableSchemaDiff(tableName, current, desired string) (s
if ddl == "" {
log.Infof("No changes needed for table %s", tableName)
} else {
log.Infof("Applying ddl for table %s:\n%s", tableName, ddl)
log.Infof("Applying DDL for table %s:\n%s", tableName, ddl)
}
}

Expand Down Expand Up @@ -357,17 +416,31 @@ func (si *schemaInit) ensureSchema(table *sidecarTable) error {
}
_, err := si.exec(ctx, ddl, 1, true)
if err != nil {
log.Errorf("Error running ddl %s for table %s during sidecar database initialization %s: %+v", ddl, table, err)
return err
ddlErr := vterrors.Wrapf(err,
"Error running DDL %s for table %s during sidecar database initialization", ddl, table)
recordDDLError(table.name, ddlErr)
if failOnSchemaInitError {
return ddlErr
}
return nil
}
log.Infof("Applied ddl %s for table %s during sidecar database initialization %s", ddl, table)
log.Infof("Applied DDL %s for table %s during sidecar database initialization", ddl, table)
ddlCount.Add(1)
return nil
}
log.Infof("Table schema was already up to date for the %s table in the %s sidecar database", table.name, SidecarDBName)
return nil
}

func recordDDLError(tableName string, err error) {
log.Error(err)
ddlErrorCount.Add(1)
ddlErrorHistory.Add(&ddlError{
tableName: tableName,
err: err,
})
}

// region unit-test-only
// This section uses helpers used in tests, but also in the go/vt/vtexplain/vtexplain_vttablet.go.
// Hence, it is here and not in the _test.go file.
Expand Down

0 comments on commit a5d0cdc

Please sign in to comment.