Skip to content

Commit

Permalink
Fix race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Mar 13, 2019
1 parent a09fb8d commit 6e195f3
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions storage/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
availableOnDisk
)

type segmentFlushState int
type segmentFlushState int32

// nolint: deadcode,varcheck,megacheck
const (
Expand All @@ -82,10 +82,8 @@ type dbSegment struct {
lastReadAtNanos int64
loadedStatus int32
dataLocation int32
// The flush related fields are always accessed within a single-thread context
// and as such do not require lock protection.
flushState segmentFlushState
flushAttempts int
flushState int32
flushAttempts int32
}

func newDatabaseSegment(
Expand Down Expand Up @@ -172,6 +170,11 @@ func (s *dbSegment) LoadedStatus() segmentLoadedStatus {
return segmentLoadedStatus(status)
}

func (s *dbSegment) FlushState() segmentFlushState {
state := atomic.LoadInt32(&s.flushState)
return segmentFlushState(state)
}

func (s *dbSegment) ShouldUnload() bool {
s.RLock()
ok := s.shouldUnloadWithLock()
Expand Down Expand Up @@ -225,22 +228,26 @@ func (s *dbSegment) Flush(persistFns persist.Fns) error {
}
}()

s.flushState = flushing
s.setFlushState(flushing)
err := persistFns.WriteFields(segmentFields)
if err == nil {
s.setDataLocation(availableOnDisk)
s.flushState = flushSuccess
s.setFlushState(flushSuccess)
} else {
s.flushState = flushFailed
s.setFlushState(flushFailed)
}
s.flushAttempts++
atomic.AddInt32(&s.flushAttempts, 1)

return err
}

func (s *dbSegment) FlushIsDone() bool {
return s.flushState == flushSuccess ||
(s.flushState == flushFailed && s.flushAttempts >= defaultMaxFlushRetries)
var (
flushState = s.FlushState()
flushAttempts = atomic.LoadInt32(&s.flushAttempts)
)
return flushState == flushSuccess ||
(flushState == flushFailed && flushAttempts >= defaultMaxFlushRetries)
}

func (s *dbSegment) Close() {
Expand All @@ -267,6 +274,11 @@ func (s *dbSegment) setDataLocation(v segmentDataLocation) {
atomic.StoreInt32(&s.dataLocation, status)
}

func (s *dbSegment) setFlushState(v segmentFlushState) {
state := int32(v)
atomic.StoreInt32(&s.flushState, state)
}

func (s *dbSegment) updateLastReadNanos() {
atomic.StoreInt64(&s.lastReadAtNanos, s.nowFn().UnixNano())
}
Expand Down

0 comments on commit 6e195f3

Please sign in to comment.