Skip to content

Commit

Permalink
log(ticdc): Add more error query information to the returned error to…
Browse files Browse the repository at this point in the history
… facilitate users to know the cause of the failure (#10945) (#11257)

close #11254
  • Loading branch information
ti-chi-bot committed Jun 11, 2024
1 parent 053cdaf commit 2425d54
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
20 changes: 10 additions & 10 deletions cdc/sink/mysql/mysql_syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,23 +156,23 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
log.Error("create sync table: begin Tx fail", zap.Error(err))
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "create sync table: begin Tx fail;"))
}
_, err = tx.Exec("CREATE DATABASE IF NOT EXISTS " + database)
if err != nil {
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to create syncpoint table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;"))
}
_, err = tx.Exec("USE " + database)
if err != nil {
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to create syncpoint table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;"))
}
query := `CREATE TABLE IF NOT EXISTS %s
(
Expand All @@ -191,10 +191,10 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error {
if err2 != nil {
log.Error("failed to create syncpoint table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;"))
}
err = tx.Commit()
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;"))
}

func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
Expand All @@ -204,7 +204,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
log.Error("sync table: begin Tx fail", zap.Error(err))
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "sync table: begin Tx fail;"))
}
row := tx.QueryRow("select @@tidb_current_ts")
var secondaryTs string
Expand All @@ -215,7 +215,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;"))
}
// insert ts map
query := "insert ignore into " + filter.TiCDCSystemSchema + "." + filter.SyncPointTable +
Expand All @@ -226,7 +226,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;"))
}

// set global tidb_external_ts to secondary ts
Expand All @@ -242,7 +242,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;"))
}
}

Expand All @@ -267,7 +267,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
}

err = tx.Commit()
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;"))
}

func (s *mysqlSyncPointStore) Close() error {
Expand Down
7 changes: 4 additions & 3 deletions cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package mysql
import (
"context"
"database/sql"
"fmt"
"net/url"
"time"

"github.com/pingcap/errors"
cerrors "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -189,7 +190,7 @@ func (m *mysqlDDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error
zap.Duration("duration", time.Since(start)),
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID), zap.Error(err))
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, cerrors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query)))
}

log.Info("Exec DDL succeeded", zap.String("sql", ddl.Query),
Expand Down Expand Up @@ -217,7 +218,7 @@ func (m *mysqlDDLSink) WriteCheckpointTs(_ context.Context, _ uint64, _ []*model
// Close closes the database connection.
func (m *mysqlDDLSink) Close() error {
if err := m.db.Close(); err != nil {
return errors.Trace(err)
return cerrors.Trace(err)
}
if m.statistics != nil {
m.statistics.Close()
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ func logDMLTxnErr(
zap.String("query", query), zap.Int("count", count),
zap.String("changefeed", changefeed))
}
return err
return errors.WithMessage(err, fmt.Sprintf("Failed query info: %s; ", query))
}

func isRetryableDMLError(err error) bool {
Expand Down

0 comments on commit 2425d54

Please sign in to comment.