sink(cdc): close table sinks when sink factory fails (#9449) (#9464)
close #9450
ti-chi-bot committed Aug 2, 2023
1 parent e1c71b8 commit 8228ded
Showing 5 changed files with 59 additions and 69 deletions.
61 changes: 29 additions & 32 deletions cdc/processor/sinkmanager/manager.go
@@ -270,6 +270,20 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err error) {
zap.Error(err))
m.clearSinkFactory()
sinkFactoryErrors = make(chan error, 16)

start := time.Now()
log.Info("Sink manager is closing all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
m.tableSinks.Range(func(key, value interface{}) bool {
value.(*tableSinkWrapper).closeTableSink()
m.sinkMemQuota.ClearTable(key.(model.TableID))
return true
})
log.Info("Sink manager has closed all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))
}

if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
@@ -410,22 +424,17 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
}()
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
// Task upperbound is limited by barrierTs and schemaResolvedTs.
// But receivedSorterResolvedTs can be less than barrierTs, in which case
// the table is just scheduled to this node.
getUpperBound := func(
tableSinkUpperBoundTs model.Ts,
) engine.Position {
schemaTs := m.schemaStorage.ResolvedTs()
if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
// schemaTs == math.MaxUint64 means it's in tests.
tableSinkUpperBoundTs = schemaTs + 1
}
return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
func (m *SinkManager) getUpperBound(tableSinkUpperBoundTs model.Ts) engine.Position {
schemaTs := m.schemaStorage.ResolvedTs()
if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
// schemaTs == math.MaxUint64 means it's in tests.
tableSinkUpperBoundTs = schemaTs + 1
}
return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
dispatchTasks := func() error {
tables := make([]*tableSinkWrapper, 0, sinkWorkerNum)
progs := make([]*progress, 0, sinkWorkerNum)
@@ -473,7 +482,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink.getUpperBoundTs())
upperBound := m.getUpperBound(tableSink.getUpperBoundTs())
// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
m.sinkProgressHeap.push(slowestTableProgress)
@@ -499,7 +508,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
t := &sinkTask{
tableID: tableSink.tableID,
lowerBound: lowerBound,
getUpperBound: getUpperBound,
getUpperBound: m.getUpperBound,
tableSink: tableSink,
callback: func(lastWrittenPos engine.Position) {
p := &progress{
@@ -563,18 +572,6 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
}

func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
// We use the table's resolved ts as the upper bound to fetch events.
getUpperBound := func(tableSinkUpperBoundTs model.Ts) engine.Position {
// If a task carries events after schemaResolvedTs, mounter group threads
// can be blocked on waiting schemaResolvedTs get advanced.
schemaTs := m.schemaStorage.ResolvedTs()
if tableSinkUpperBoundTs > schemaTs+1 {
tableSinkUpperBoundTs = schemaTs + 1
}

return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
}

dispatchTasks := func() error {
tables := make([]*tableSinkWrapper, 0, redoWorkerNum)
progs := make([]*progress, 0, redoWorkerNum)
@@ -621,7 +618,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink.getReceivedSorterResolvedTs())
upperBound := m.getUpperBound(tableSink.getReceivedSorterResolvedTs())

// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
@@ -643,7 +640,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
t := &redoTask{
tableID: tableSink.tableID,
lowerBound: lowerBound,
getUpperBound: getUpperBound,
getUpperBound: m.getUpperBound,
tableSink: tableSink,
callback: func(lastWrittenPos engine.Position) {
p := &progress{
@@ -837,7 +834,7 @@ func (m *SinkManager) AsyncStopTable(tableID model.TableID) bool {
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
if tableSink.(*tableSinkWrapper).asyncClose() {
if tableSink.(*tableSinkWrapper).asyncStop() {
cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
@@ -907,7 +904,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
// again or not if it returns false. So we must retry `tableSink.asyncClose` here
// if necessary. It's better to remove the dirty logic in the future.
tableSink := wrapper.(*tableSinkWrapper)
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncClose() {
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncStop() {
cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
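The core of the fix is the new block in `SinkManager.run`: when the sink factory fails, every table sink is now closed and its memory quota released before the factory is recreated. A minimal runnable sketch of that pattern, with placeholder types standing in for `tableSinkWrapper` and the memory quota (the real manager carries far more state):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Placeholder stand-ins for the real types in cdc/processor/sinkmanager.
type tableSinkWrapper struct{ id int64 }

func (t *tableSinkWrapper) closeTableSink() { fmt.Printf("closed table sink %d\n", t.id) }

type memQuota struct{}

func (q *memQuota) ClearTable(id int64) { fmt.Printf("cleared quota for table %d\n", id) }

// closeAllTableSinks mirrors the recovery step added to SinkManager.run:
// iterate every registered table sink, close it synchronously, release its
// memory quota, and report how long the sweep took.
func closeAllTableSinks(tableSinks *sync.Map, quota *memQuota) {
	start := time.Now()
	tableSinks.Range(func(key, value interface{}) bool {
		value.(*tableSinkWrapper).closeTableSink()
		quota.ClearTable(key.(int64))
		return true // continue iterating over all tables
	})
	fmt.Println("closed all table sinks in", time.Since(start))
}

func main() {
	var sinks sync.Map
	sinks.Store(int64(1), &tableSinkWrapper{id: 1})
	sinks.Store(int64(2), &tableSinkWrapper{id: 2})
	closeAllTableSinks(&sinks, &memQuota{})
}
```

Closing the sinks before rebuilding the factory keeps stale table sinks from holding memory quota or writing through a dead factory.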
31 changes: 17 additions & 14 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -263,7 +263,7 @@ func (t *tableSinkWrapper) markAsClosed() {
}
}

func (t *tableSinkWrapper) asyncClose() bool {
func (t *tableSinkWrapper) asyncStop() bool {
t.markAsClosing()
if t.asyncCloseAndClearTableSink() {
t.markAsClosed()
@@ -272,7 +272,7 @@ func (t *tableSinkWrapper) asyncClose() bool {
return false
}

func (t *tableSinkWrapper) close() {
func (t *tableSinkWrapper) stop() {
t.markAsClosing()
t.closeAndClearTableSink()
t.markAsClosed()
@@ -289,29 +289,32 @@ func (t *tableSinkWrapper) initTableSink() bool {
return true
}

func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
func (t *tableSinkWrapper) asyncCloseTableSink() bool {
t.tableSinkMu.RLock()
defer t.tableSinkMu.RUnlock()
if t.tableSink == nil {
t.tableSinkMu.RUnlock()
return true
}
if !t.tableSink.AsyncClose() {
t.tableSinkMu.RUnlock()
return false
}
t.tableSinkMu.RUnlock()
t.doTableSinkClear()
return true
return t.tableSink.AsyncClose()
}

func (t *tableSinkWrapper) closeAndClearTableSink() {
func (t *tableSinkWrapper) closeTableSink() {
t.tableSinkMu.RLock()
defer t.tableSinkMu.RUnlock()
if t.tableSink == nil {
t.tableSinkMu.RUnlock()
return
}
t.tableSink.Close()
t.tableSinkMu.RUnlock()
}

func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
t.asyncCloseTableSink()
t.doTableSinkClear()
return true
}

func (t *tableSinkWrapper) closeAndClearTableSink() {
t.closeTableSink()
t.doTableSinkClear()
}

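The wrapper refactor splits the close step (`closeTableSink` / `asyncCloseTableSink`) from the clear step (`doTableSinkClear`), so the manager can close a sink without discarding its state, and it replaces the manual `RUnlock` on each early-return path with a single `defer`. A hedged sketch of the lock-handling pattern, using simplified stand-in types:

```go
package main

import (
	"fmt"
	"sync"
)

// tableSink is a stand-in for the real sinkv2 table sink interface.
type tableSink interface {
	Close()
}

type fakeSink struct{}

func (fakeSink) Close() { fmt.Println("table sink closed") }

type tableSinkWrapper struct {
	mu   sync.RWMutex
	sink tableSink
}

// closeTableSink follows the refactored shape: one deferred RUnlock covers
// both the nil-sink early return and the normal close path, instead of a
// manual RUnlock before every return statement.
func (t *tableSinkWrapper) closeTableSink() {
	t.mu.RLock()
	defer t.mu.RUnlock()
	if t.sink == nil {
		return
	}
	t.sink.Close()
}

func main() {
	w := &tableSinkWrapper{sink: fakeSink{}}
	w.closeTableSink()
	(&tableSinkWrapper{}).closeTableSink() // nil sink: returns without panicking
}
```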
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -89,7 +89,7 @@ func TestTableSinkWrapperClose(t *testing.T) {

wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
require.Equal(t, tablepb.TableStatePreparing, wrapper.getState())
wrapper.close()
wrapper.stop()
require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped")
}

4 changes: 3 additions & 1 deletion cdc/sinkv2/tablesink/table_sink_impl.go
@@ -130,7 +130,9 @@ func (e *EventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) error
// GetCheckpointTs returns the checkpoint ts of the table sink.
func (e *EventTableSink[E]) GetCheckpointTs() model.ResolvedTs {
if e.state.Load() == state.TableSinkStopping {
e.progressTracker.checkClosed(e.backendSink.Dead())
if e.progressTracker.checkClosed(e.backendSink.Dead()) {
e.markAsClosed()
}
}
return e.progressTracker.advance()
}
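The `GetCheckpointTs` change makes the stopping path self-completing: once the progress tracker confirms the sink is closed (for example because the backend is dead), the table sink marks itself closed rather than staying in the stopping state forever. A minimal sketch of that state transition, with assumed simplified types:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type tableState int32

const (
	stateReplicating tableState = iota
	stateStopping
	stateStopped
)

type eventTableSink struct {
	state atomic.Int32
	dead  chan struct{} // closed when the backend sink dies
}

// checkClosed stands in for progressTracker.checkClosed: once the backend
// is dead, no more flush callbacks can arrive, so the sink counts as closed.
func (e *eventTableSink) checkClosed() bool {
	select {
	case <-e.dead:
		return true
	default:
		return false
	}
}

// getCheckpointTs mirrors the fix: while stopping, a confirmed close now
// also moves the state to stopped instead of leaving the transition to
// some other caller.
func (e *eventTableSink) getCheckpointTs() {
	if tableState(e.state.Load()) == stateStopping && e.checkClosed() {
		e.state.CompareAndSwap(int32(stateStopping), int32(stateStopped))
	}
}

func main() {
	s := &eventTableSink{dead: make(chan struct{})}
	s.state.Store(int32(stateStopping))
	close(s.dead) // simulate the backend dying with the sink factory
	s.getCheckpointTs()
	fmt.Println("stopped:", tableState(s.state.Load()) == stateStopped)
}
```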
30 changes: 9 additions & 21 deletions cdc/sinkv2/tablesink/table_sink_impl_test.go
@@ -381,26 +381,14 @@ func TestCheckpointTsFrozenWhenStopping(t *testing.T) {
require.Nil(t, err)
require.Len(t, sink.events, 7, "all events should be flushed")

go func() {
time.Sleep(time.Millisecond * 10)
sink.Close()
}()
// Table sink close should return even if callbacks are not called,
// because the backend sink is closed.
sink.Close()
tb.Close()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
tb.Close()
}()
require.Eventually(t, func() bool {
return state.TableSinkStopping == tb.state.Load()
}, time.Second, time.Microsecond, "table should be stopping")
wg.Add(1)
go func() {
defer wg.Done()
currentTs := tb.GetCheckpointTs()
sink.acknowledge(105)
require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
}()
wg.Wait()
require.Equal(t, state.TableSinkStopped, tb.state.Load())

currentTs := tb.GetCheckpointTs()
sink.acknowledge(105)
require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated")
}
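The simplified test relies on the same property the production change guarantees: `Close` no longer needs the flush callbacks to fire, because the progress tracker also unblocks when the backend sink is dead. A small sketch of that select pattern (assumed channel names):

```go
package main

import "fmt"

// waitClosed sketches why Close can return without any callbacks: the
// tracker waits for either full acknowledgement or backend death.
func waitClosed(acked, dead <-chan struct{}) string {
	select {
	case <-acked:
		return "closed: all events acknowledged"
	case <-dead:
		return "closed: backend sink is dead"
	}
}

func main() {
	dead := make(chan struct{})
	close(dead) // the factory failure kills the backend
	fmt.Println(waitClosed(make(chan struct{}), dead))
}
```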
