Skip to content

Commit

Permalink
use startTs for failed changefeed that has failed before being initia…
Browse files Browse the repository at this point in the history
…lized
  • Loading branch information
charleszheng44 authored and ti-chi-bot committed May 6, 2023
1 parent 455681d commit edfaae5
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
1 change: 1 addition & 0 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) (adm
case model.StateError:
if m.state.Info.Error.IsChangefeedUnRetryableError() {
m.shouldBeRunning = false
m.patchState(model.StateFailed)
return
}
}
Expand Down
8 changes: 7 additions & 1 deletion cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,13 @@ func (o *ownerImpl) ignoreFailedChangeFeedWhenGC(
log.Warn("upstream not found", zap.Uint64("ID", upID))
return false
}
return us.GCManager.IgnoreFailedChangeFeed(state.Status.CheckpointTs)
// in case the changefeed failed right after it is created
// and the status is not initialized yet.
ts := state.Info.StartTs
if state.Status != nil {
ts = state.Status.CheckpointTs
}
return us.GCManager.IgnoreFailedChangeFeed(ts)
}

// calculateGCSafepoint calculates GCSafepoint for different upstream.
Expand Down

0 comments on commit edfaae5

Please sign in to comment.