Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <414110582@qq.com>
  • Loading branch information
asddongmen committed Jun 25, 2024
1 parent 54cec26 commit 6240c06
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -94,7 +93,7 @@ type SinkManager struct {
// When every time we want to create a new factory, version will be increased and
// errors will be replaced by a new channel. version is used to distinct different
// sink factories in table sinks.
version atomic.Uint64
version uint64
errors chan error
}

Expand Down Expand Up @@ -338,11 +337,11 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) {
cfg := m.config

if m.sinkFactory.f != nil {
return m.sinkFactory.errors, m.sinkFactory.version.Load()
return m.sinkFactory.errors, m.sinkFactory.version
}
if m.sinkFactory.errors == nil {
m.sinkFactory.errors = make(chan error, 16)
m.sinkFactory.version.Add(1)
m.sinkFactory.version += 1
}

emitError := func(err error) {
Expand All @@ -359,20 +358,20 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) {
})
if err != nil {
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version.Load()
return m.sinkFactory.errors, m.sinkFactory.version
}

m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors, m.up.PDClock)
if err != nil {
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version.Load()
return m.sinkFactory.errors, m.sinkFactory.version
}

log.Info("Sink manager inits sink factory success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", m.sinkFactory.version.Load()))
return m.sinkFactory.errors, m.sinkFactory.version.Load()
zap.Uint64("factoryVersion", m.sinkFactory.version))
return m.sinkFactory.errors, m.sinkFactory.version
}

func (m *SinkManager) clearSinkFactory() {
Expand All @@ -382,13 +381,13 @@ func (m *SinkManager) clearSinkFactory() {
log.Info("Sink manager closing sink factory",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", m.sinkFactory.version.Load()))
zap.Uint64("factoryVersion", m.sinkFactory.version))
m.sinkFactory.f.Close()
m.sinkFactory.f = nil
log.Info("Sink manager has closed sink factory",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", m.sinkFactory.version.Load()))
zap.Uint64("factoryVersion", m.sinkFactory.version))
}
if m.sinkFactory.errors != nil {
close(m.sinkFactory.errors)
Expand Down Expand Up @@ -832,7 +831,7 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
defer m.sinkFactory.Unlock()
if m.sinkFactory.f != nil {
s = m.sinkFactory.f.CreateTableSink(m.changefeedID, span, startTs, m.up.PDClock, m.metricsTableSinkTotalRows, m.metricsTableSinkFlushLagDuration)
version = m.sinkFactory.version.Load()
version = m.sinkFactory.version
}
}
return
Expand Down

0 comments on commit 6240c06

Please sign in to comment.