Skip to content

Commit

Permalink
redo(ticdc): fix a bug that flush log executed before writing logs (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed May 29, 2022
1 parent 3faaeb6 commit 9b29eef
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
27 changes: 27 additions & 0 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
Expand Down Expand Up @@ -121,6 +122,11 @@ type ManagerOptions struct {
type cacheRows struct {
	// tableID is the table that this buffered entry belongs to.
	tableID model.TableID

	// rows carries the row changed events buffered for the table.
	rows []*model.RowChangedEvent

	// flushCallback, when non-nil, marks this entry as a barrier instead of
	// a batch of rows. FlushLog must not run for a table until every log of
	// that table has reached the underlying writer. EmitRowChangedEvents and
	// FlushLog of the same table can't be executed concurrently, so pushing
	// a barrier entry into the data stream and waiting for the consumer to
	// close this channel is enough to guarantee that ordering.
	flushCallback chan struct{}
}

// ManagerImpl manages redo log writer, buffers un-persistent redo logs, calculates
Expand Down Expand Up @@ -252,6 +258,9 @@ func (m *ManagerImpl) TryEmitRowChangedEvents(
// error ErrBufferLogTimeout will be returned.
// TODO: if the API is truly non-blocking, we should return an error immediately
// when the log buffer channel is full.
// TODO: After the buffer sink in the sink node is removed, there is no batch
// mechanism before sending row changed events to the redo manager, so the
// original log buffer design may have performance issues.
func (m *ManagerImpl) EmitRowChangedEvents(
ctx context.Context,
tableID model.TableID,
Expand Down Expand Up @@ -286,6 +295,20 @@ func (m *ManagerImpl) FlushLog(
return nil
}
defer atomic.StoreInt64(&m.flushing, 0)

// Add a barrier to the data stream to ensure that all logs of this table have
// been written to the underlying writer.
flushCallbackCh := make(chan struct{})
m.logBuffer <- cacheRows{
tableID: tableID,
flushCallback: flushCallbackCh,
}
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-flushCallbackCh:
}

return m.writer.FlushLog(ctx, tableID, resolvedTs)
}

Expand Down Expand Up @@ -400,6 +423,10 @@ func (m *ManagerImpl) bgWriteLog(ctx context.Context, errCh chan<- error) {
case <-ctx.Done():
return
case cache := <-m.logBuffer:
if cache.flushCallback != nil {
close(cache.flushCallback)
continue
}
logs := make([]*model.RedoRowChangedEvent, 0, len(cache.rows))
for _, row := range cache.rows {
logs = append(logs, RowToRedo(row))
Expand Down
71 changes: 71 additions & 0 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package redo
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -272,3 +273,73 @@ func TestLogManagerInOwner(t *testing.T) {
err = logMgr.writer.DeleteAllLogs(ctx)
require.Nil(t, err)
}

// TestWriteLogFlushLogSequence verifies that FlushLog for a table only takes
// effect after the barrier it pushes into the log buffer has been consumed by
// bgWriteLog, i.e. after the table's buffered logs were handed to the writer.
func TestWriteLogFlushLogSequence(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	// Stop and join every helper goroutine on all exit paths, including an
	// early require failure, so that none of them outlives the test or
	// reports to t after the test function has returned.
	defer func() {
		cancel()
		wg.Wait()
	}()

	cfg := &config.ConsistentConfig{
		Level:   string(ConsistentLevelEventual),
		Storage: "blackhole://",
	}
	errCh := make(chan error, 1)
	opts := &ManagerOptions{
		EnableBgRunner: false,
		ErrCh:          errCh,
	}
	logMgr, err := NewManager(ctx, cfg, opts)
	require.Nil(t, err)

	var (
		tableID    = int64(53)
		startTs    = uint64(100)
		resolvedTs = uint64(150)
	)
	logMgr.AddTable(tableID, startTs)

	// Helper goroutines report failures with t.Errorf instead of require:
	// require calls t.FailNow, which must only be called from the goroutine
	// running the test function.
	wg.Add(1)
	go func() {
		defer wg.Done()
		select {
		case <-ctx.Done():
			return
		case err := <-errCh:
			if err != nil {
				t.Errorf("unexpected error reported by redo manager: %v", err)
			}
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		// FlushLog blocks until bgWriteLog consumes the barrier and closes
		// the callback channel.
		if err := logMgr.FlushLog(ctx, tableID, resolvedTs); err != nil {
			t.Errorf("FlushLog failed: %v", err)
		}
	}()

	// Sleep a short time to ensure `logMgr.FlushLog` is called.
	time.Sleep(time.Millisecond * 100)
	// FlushLog is still ongoing because nothing consumes the log buffer yet.
	require.Equal(t, int64(1), atomic.LoadInt64(&logMgr.flushing))
	err = logMgr.updateTableResolvedTs(ctx)
	require.Nil(t, err)
	// The flush has not happened, so the resolved ts must not have advanced.
	require.Equal(t, startTs, logMgr.GetMinResolvedTs())

	// Start the consumer; it releases the pending FlushLog call.
	wg.Add(1)
	go func() {
		defer wg.Done()
		logMgr.bgWriteLog(ctx, errCh)
	}()

	require.Eventually(t, func() bool {
		// The condition may be evaluated outside the test goroutine, so it
		// uses a local error and t.Errorf rather than require.
		if err := logMgr.updateTableResolvedTs(ctx); err != nil {
			t.Errorf("updateTableResolvedTs failed: %v", err)
			return false
		}
		return logMgr.GetMinResolvedTs() == resolvedTs
	}, time.Second, time.Millisecond*20)
}

0 comments on commit 9b29eef

Please sign in to comment.