Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner, sink(ticdc): cherry-pick memory quota, gc and failover related fix #9546

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ func (c *captureImpl) run(stdCtx context.Context) error {
}()

g, stdCtx := errgroup.WithContext(stdCtx)
stdCtx, cancel := context.WithCancel(stdCtx)

ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
Expand All @@ -335,7 +337,6 @@ func (c *captureImpl) run(stdCtx context.Context) error {
SorterSystem: c.sorterSystem,
SortEngineFactory: c.sortEngineFactory,
})

g.Go(func() error {
// when the campaignOwner returns an error, it means that the owner throws
// an unrecoverable serious errors (recoverable errors are intercepted in the owner tick)
Expand All @@ -351,9 +352,20 @@ func (c *captureImpl) run(stdCtx context.Context) error {
})

g.Go(func() error {
// Processor manager should be closed as soon as possible to prevent double write issue.
defer func() {
if cancel != nil {
// Propagate the cancel signal to the owner and other goroutines.
cancel()
}
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
log.Info("processor manager closed", zap.String("captureID", c.info.ID))
}()
processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
Expand Down Expand Up @@ -419,7 +431,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
// Campaign to be the owner, it blocks until it been elected.
if err := c.campaign(ctx); err != nil {

rootErr := errors.Cause(err)
if rootErr == context.Canceled {
return nil
Expand Down Expand Up @@ -467,7 +478,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
owner := c.newOwner(c.upstreamManager)
c.setOwner(owner)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
Expand All @@ -485,27 +496,27 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
})

err = c.runEtcdWorker(ownerCtx, owner,
orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()),
ownerFlushInterval, util.RoleOwner.String())
err = c.runEtcdWorker(ownerCtx, owner, globalState, ownerFlushInterval, util.RoleOwner.String())
c.owner.AsyncStop()
c.setOwner(nil)

// if owner exits, resign the owner key,
// use a new context to prevent the context from being cancelled.
resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if resignErr := c.resign(resignCtx); resignErr != nil {
if errors.Cause(resignErr) != context.DeadlineExceeded {
log.Info("owner resign failed", zap.String("captureID", c.info.ID),
if !cerror.ErrNotOwner.Equal(err) {
// if owner exits, resign the owner key,
// use a new context to prevent the context from being cancelled.
resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if resignErr := c.resign(resignCtx); resignErr != nil {
if errors.Cause(resignErr) != context.DeadlineExceeded {
log.Info("owner resign failed", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
return errors.Trace(resignErr)
}

log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
return errors.Trace(resignErr)
}

log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
}
cancel()

log.Info("owner resigned successfully",
zap.String("captureID", c.info.ID), zap.Int64("ownerRev", ownerRev))
Expand Down Expand Up @@ -622,10 +633,6 @@ func (c *captureImpl) AsyncClose() {

c.captureMu.Lock()
defer c.captureMu.Unlock()
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
log.Info("processor manager closed", zap.String("captureID", c.info.ID))

c.grpcService.Reset(nil)
if c.MessageRouter != nil {
Expand Down
4 changes: 2 additions & 2 deletions cdc/capture/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func newElection(sess *concurrency.Session, key string) election {
}
}

func (e *electionImpl) campaign(ctx context.Context, key string) error {
func (e *electionImpl) campaign(ctx context.Context, val string) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
return e.election.Campaign(ctx, key)
return e.election.Campaign(ctx, val)
}

func (e *electionImpl) resign(ctx context.Context) error {
Expand Down
8 changes: 0 additions & 8 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,14 +411,6 @@ func (o *ownerImpl) updateMetrics() {
changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID).
Set(float64(cf.state.Info.State.ToInt()))
}

// The InfoProvider is a proxy object returning information
// from the scheduler.
infoProvider := cf.GetInfoProvider()
if infoProvider == nil {
// The scheduler has not been initialized yet.
continue
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
o := owner.(*ownerImpl)
o.upstreamManager = upstream.NewManager4Test(pdClient)

state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// set captures
Expand Down Expand Up @@ -430,7 +430,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx, cancel := cdcContext.WithCancel(ctx)
defer cancel()
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// no changefeed, the gc safe point should be max uint64
Expand Down Expand Up @@ -667,7 +667,7 @@ WorkLoop:
}

func TestCalculateGCSafepointTs(t *testing.T) {
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
expectMinTsMap := make(map[uint64]uint64)
expectForceUpdateMap := make(map[uint64]interface{})
o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error {
for changefeedID := range m.processors {
m.closeProcessor(changefeedID, ctx)
}
log.Info("All processors are closed in processor manager")
// FIXME: we should drain command queue and signal callers an error.
return cerrors.ErrReactorFinished
case commandTpWriteDebugInfo:
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
checkpointTs: replicaInfo.StartTs,
}, nil
}, &s.liveness)
s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
require.Nil(t, err)
s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{
Expand Down
69 changes: 29 additions & 40 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,20 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
zap.Error(err))
m.clearSinkFactory()
sinkFactoryErrors = make(chan error, 16)

start := time.Now()
log.Info("Sink manager is closing all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
m.tableSinks.Range(func(key, value interface{}) bool {
value.(*tableSinkWrapper).closeTableSink()
m.sinkMemQuota.ClearTable(key.(model.TableID))
return true
})
log.Info("Sink manager has closed all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))
}

if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
Expand Down Expand Up @@ -413,22 +427,17 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
}()
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
// Task upperbound is limited by barrierTs and schemaResolvedTs.
// But receivedSorterResolvedTs can be less than barrierTs, in which case
// the table is just scheduled to this node.
getUpperBound := func(
tableSinkUpperBoundTs model.Ts,
) engine.Position {
schemaTs := m.schemaStorage.ResolvedTs()
if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
// schemaTs == math.MaxUint64 means it's in tests.
tableSinkUpperBoundTs = schemaTs + 1
}
return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
func (m *SinkManager) getUpperBound(tableSinkUpperBoundTs model.Ts) engine.Position {
schemaTs := m.schemaStorage.ResolvedTs()
if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
// schemaTs == math.MaxUint64 means it's in tests.
tableSinkUpperBoundTs = schemaTs + 1
}
return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
dispatchTasks := func() error {
tables := make([]*tableSinkWrapper, 0, sinkWorkerNum)
progs := make([]*progress, 0, sinkWorkerNum)
Expand Down Expand Up @@ -476,7 +485,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink.getUpperBoundTs())
upperBound := m.getUpperBound(tableSink.getUpperBoundTs())
// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
m.sinkProgressHeap.push(slowestTableProgress)
Expand All @@ -502,7 +511,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
t := &sinkTask{
tableID: tableSink.tableID,
lowerBound: lowerBound,
getUpperBound: getUpperBound,
getUpperBound: m.getUpperBound,
tableSink: tableSink,
callback: func(lastWrittenPos engine.Position) {
p := &progress{
Expand Down Expand Up @@ -566,18 +575,6 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
}

func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
// We use the table's resolved ts as the upper bound to fetch events.
getUpperBound := func(tableSinkUpperBoundTs model.Ts) engine.Position {
// If a task carries events after schemaResolvedTs, mounter group threads
// can be blocked on waiting schemaResolvedTs get advanced.
schemaTs := m.schemaStorage.ResolvedTs()
if tableSinkUpperBoundTs > schemaTs+1 {
tableSinkUpperBoundTs = schemaTs + 1
}

return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
}

dispatchTasks := func() error {
tables := make([]*tableSinkWrapper, 0, redoWorkerNum)
progs := make([]*progress, 0, redoWorkerNum)
Expand Down Expand Up @@ -624,7 +621,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink.getReceivedSorterResolvedTs())
upperBound := m.getUpperBound(tableSink.getReceivedSorterResolvedTs())

// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
Expand All @@ -646,7 +643,7 @@ func (m *SinkManager) generateRedoTasks(ctx context.Context) error {
t := &redoTask{
tableID: tableSink.tableID,
lowerBound: lowerBound,
getUpperBound: getUpperBound,
getUpperBound: m.getUpperBound,
tableSink: tableSink,
callback: func(lastWrittenPos engine.Position) {
p := &progress{
Expand Down Expand Up @@ -840,7 +837,7 @@ func (m *SinkManager) AsyncStopTable(tableID model.TableID) bool {
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
if tableSink.(*tableSinkWrapper).asyncClose() {
if tableSink.(*tableSinkWrapper).asyncStop() {
cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
Expand Down Expand Up @@ -910,7 +907,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
// again or not if it returns false. So we must retry `tableSink.asyncClose` here
// if necessary. It's better to remove the dirty logic in the future.
tableSink := wrapper.(*tableSinkWrapper)
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncClose() {
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncStop() {
cleanedBytes := m.sinkMemQuota.RemoveTable(tableID)
cleanedBytes += m.redoMemQuota.RemoveTable(tableID)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
Expand Down Expand Up @@ -982,14 +979,6 @@ func (m *SinkManager) Close() {
zap.String("changefeed", m.changefeedID.ID))
start := time.Now()
m.waitSubroutines()
m.tableSinks.Range(func(_, value interface{}) bool {
sink := value.(*tableSinkWrapper)
sink.close()
if m.eventCache != nil {
m.eventCache.removeTable(sink.tableID)
}
return true
})
m.clearSinkFactory()

log.Info("Closed sink manager",
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// events have been reported. Then we can continue the table
// at the checkpoint position.
case tablesink.SinkInternalError:
task.tableSink.clearTableSink()
task.tableSink.closeAndClearTableSink()
// After the table sink is cleared all pending events are sent out or dropped.
// So we can re-add the table into sinkMemQuota.
w.sinkMemQuota.ClearTable(task.tableSink.tableID)
Expand Down
Loading
Loading