Skip to content

Commit

Permalink
owner, sink(ticdc): cherry-pick memory quota, gc and failover related…
Browse files Browse the repository at this point in the history
… fix (#9546)

ref #9535
  • Loading branch information
CharlesCheung96 committed Aug 14, 2023
1 parent 1e2f277 commit 7ca498f
Show file tree
Hide file tree
Showing 24 changed files with 235 additions and 152 deletions.
53 changes: 30 additions & 23 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ func (c *captureImpl) run(stdCtx context.Context) error {
}()

g, stdCtx := errgroup.WithContext(stdCtx)
stdCtx, cancel := context.WithCancel(stdCtx)

ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
Expand All @@ -335,7 +337,6 @@ func (c *captureImpl) run(stdCtx context.Context) error {
SorterSystem: c.sorterSystem,
SortEngineFactory: c.sortEngineFactory,
})

g.Go(func() error {
// when the campaignOwner returns an error, it means that the owner throws
// an unrecoverable serious errors (recoverable errors are intercepted in the owner tick)
Expand All @@ -351,9 +352,20 @@ func (c *captureImpl) run(stdCtx context.Context) error {
})

g.Go(func() error {
// Processor manager should be closed as soon as possible to prevent double write issue.
defer func() {
if cancel != nil {
// Propagate the cancel signal to the owner and other goroutines.
cancel()
}
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
log.Info("processor manager closed", zap.String("captureID", c.info.ID))
}()
processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
Expand Down Expand Up @@ -419,7 +431,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
// Campaign to be the owner, it blocks until it been elected.
if err := c.campaign(ctx); err != nil {

rootErr := errors.Cause(err)
if rootErr == context.Canceled {
return nil
Expand Down Expand Up @@ -467,7 +478,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
owner := c.newOwner(c.upstreamManager)
c.setOwner(owner)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
Expand All @@ -485,27 +496,27 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
})

err = c.runEtcdWorker(ownerCtx, owner,
orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()),
ownerFlushInterval, util.RoleOwner.String())
err = c.runEtcdWorker(ownerCtx, owner, globalState, ownerFlushInterval, util.RoleOwner.String())
c.owner.AsyncStop()
c.setOwner(nil)

// if owner exits, resign the owner key,
// use a new context to prevent the context from being cancelled.
resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if resignErr := c.resign(resignCtx); resignErr != nil {
if errors.Cause(resignErr) != context.DeadlineExceeded {
log.Info("owner resign failed", zap.String("captureID", c.info.ID),
if !cerror.ErrNotOwner.Equal(err) {
// if owner exits, resign the owner key,
// use a new context to prevent the context from being cancelled.
resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if resignErr := c.resign(resignCtx); resignErr != nil {
if errors.Cause(resignErr) != context.DeadlineExceeded {
log.Info("owner resign failed", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
return errors.Trace(resignErr)
}

log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
return errors.Trace(resignErr)
}

log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
}
cancel()

log.Info("owner resigned successfully",
zap.String("captureID", c.info.ID), zap.Int64("ownerRev", ownerRev))
Expand Down Expand Up @@ -622,10 +633,6 @@ func (c *captureImpl) AsyncClose() {

c.captureMu.Lock()
defer c.captureMu.Unlock()
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
log.Info("processor manager closed", zap.String("captureID", c.info.ID))

c.grpcService.Reset(nil)
if c.MessageRouter != nil {
Expand Down
4 changes: 2 additions & 2 deletions cdc/capture/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func newElection(sess *concurrency.Session, key string) election {
}
}

func (e *electionImpl) campaign(ctx context.Context, key string) error {
func (e *electionImpl) campaign(ctx context.Context, val string) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
return e.election.Campaign(ctx, key)
return e.election.Campaign(ctx, val)
}

func (e *electionImpl) resign(ctx context.Context) error {
Expand Down
8 changes: 0 additions & 8 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,14 +411,6 @@ func (o *ownerImpl) updateMetrics() {
changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID).
Set(float64(cf.state.Info.State.ToInt()))
}

// The InfoProvider is a proxy object returning information
// from the scheduler.
infoProvider := cf.GetInfoProvider()
if infoProvider == nil {
// The scheduler has not been initialized yet.
continue
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
o := owner.(*ownerImpl)
o.upstreamManager = upstream.NewManager4Test(pdClient)

state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// set captures
Expand Down Expand Up @@ -430,7 +430,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx, cancel := cdcContext.WithCancel(ctx)
defer cancel()
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// no changefeed, the gc safe point should be max uint64
Expand Down Expand Up @@ -667,7 +667,7 @@ WorkLoop:
}

func TestCalculateGCSafepointTs(t *testing.T) {
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
expectMinTsMap := make(map[uint64]uint64)
expectForceUpdateMap := make(map[uint64]interface{})
o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error {
for changefeedID := range m.processors {
m.closeProcessor(changefeedID, ctx)
}
log.Info("All processors are closed in processor manager")
// FIXME: we should drain command queue and signal callers an error.
return cerrors.ErrReactorFinished
case commandTpWriteDebugInfo:
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
checkpointTs: replicaInfo.StartTs,
}, nil
}, &s.liveness)
s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
require.Nil(t, err)
s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{
Expand Down
69 changes: 29 additions & 40 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,20 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
zap.Error(err))
m.clearSinkFactory()
sinkFactoryErrors = make(chan error, 16)

start := time.Now()
log.Info("Sink manager is closing all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
m.tableSinks.Range(func(key, value interface{}) bool {
value.(*tableSinkWrapper).closeTableSink()
m.sinkMemQuota.ClearTable(key.(model.TableID))
return true
})
log.Info("Sink manager has closed all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))
}

if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
Expand Down Expand Up @@ -413,22 +427,17 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
}()
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
// Task upperbound is limited by barrierTs and schemaResolvedTs.
// But receivedSorterResolvedTs can be less than barrierTs, in which case
// the table is just scheduled to this node.
getUpperBound := func(
tableSinkUpperBoundTs model.Ts,
) engine.Position {
schemaTs := m.schemaStorage.ResolvedTs()
if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
// schemaTs == math.MaxUint64 means it's in tests.
tableSinkUpperBoundTs = schemaTs + 1
}
return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
// getUpperBound converts a table sink upper-bound ts into an engine position,
// clamping it so it never runs ahead of the schema storage's resolved ts + 1.
// A schema resolved ts of math.MaxUint64 indicates a test environment, in
// which case no clamping is applied.
func (m *SinkManager) getUpperBound(tableSinkUpperBoundTs model.Ts) engine.Position {
	upperBound := tableSinkUpperBoundTs
	if schemaTs := m.schemaStorage.ResolvedTs(); schemaTs != math.MaxUint64 {
		// schemaTs == math.MaxUint64 means it's in tests.
		if limit := schemaTs + 1; upperBound > limit {
			upperBound = limit
		}
	}
	return engine.Position{StartTs: upperBound - 1, CommitTs: upperBound}
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
dispatchTasks := func() error {
tables := make([]*tableSinkWrapper, 0, sinkWorkerNum)
progs := make([]*progress, 0, sinkWorkerNum)
Expand Down Expand Up @@ -476,7 +485,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink.getUpperBoundTs())
upperBound := m.getUpperBound(tableSink.getUpperBoundTs())
// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
m.sinkProgressHeap.push(slowestTableProgress)
Expand All @@ -502,7 +511,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
t := &sinkTask{
tableID: tableSink.tableID,
lowerBound: lowerBound,
getUpperBound: getUpperBound,
getUpperBound: m.getUpperBound,
tableSink: tableSink,
callback: func(lastWrittenPos engine.Position) {
p := &progress{
Expand Down Expand Up @@ -566,18 +575,6 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
}

func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
// We use the table's resolved ts as the upper bound to fetch events.
getUpperBound := func(tableSinkUpperBoundTs model.Ts) engine.Position {
// If a task carries events after schemaResolvedTs, mounter group threads
// can be blocked on waiting schemaResolvedTs get advanced.
schemaTs := m.schemaStorage.ResolvedTs()
if tableSinkUpperBoundTs > schemaTs+1 {
tableSinkUpperBoundTs = schemaTs + 1
}

return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
}

dispatchTasks := func() error {
tables := make([]*tableSinkWrapper, 0, redoWorkerNum)
progs := make([]*progress, 0, redoWorkerNum)
Expand Down Expand Up @@ -624,7 +621,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink.getReceivedSorterResolvedTs())
upperBound := m.getUpperBound(tableSink.getReceivedSorterResolvedTs())

// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
Expand All @@ -646,7 +643,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
t := &redoTask{
tableID: tableSink.tableID,
lowerBound: lowerBound,
getUpperBound: getUpperBound,
getUpperBound: m.getUpperBound,
tableSink: tableSink,
callback: func(lastWrittenPos engine.Position) {
p := &progress{
Expand Down Expand Up @@ -840,7 +837,7 @@ func (m *SinkManager) AsyncStopTable(tableID model.TableID) bool {
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
if tableSink.(*tableSinkWrapper).asyncClose() {
if tableSink.(*tableSinkWrapper).asyncStop() {
cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
Expand Down Expand Up @@ -910,7 +907,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
// again or not if it returns false. So we must retry `tableSink.asyncClose` here
// if necessary. It's better to remove the dirty logic in the future.
tableSink := wrapper.(*tableSinkWrapper)
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncClose() {
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncStop() {
cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
Expand Down Expand Up @@ -982,14 +979,6 @@ func (m *SinkManager) Close() {
zap.String("changefeed", m.changefeedID.ID))
start := time.Now()
m.waitSubroutines()
m.tableSinks.Range(func(_, value interface{}) bool {
sink := value.(*tableSinkWrapper)
sink.close()
if m.eventCache != nil {
m.eventCache.removeTable(sink.tableID)
}
return true
})
m.clearSinkFactory()

log.Info("Closed sink manager",
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// events have been reported. Then we can continue the table
// at the checkpoint position.
case tablesink.SinkInternalError:
task.tableSink.clearTableSink()
task.tableSink.closeAndClearTableSink()
// After the table sink is cleared all pending events are sent out or dropped.
// So we can re-add the table into sinkMemQuota.
w.sinkMemQuota.ClearTable(task.tableSink.tableID)
Expand Down

0 comments on commit 7ca498f

Please sign in to comment.