From 6e195f310b6e9bac482875df28ed4431b6d5772a Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Wed, 13 Mar 2019 17:51:40 -0400 Subject: [PATCH] Fix race condition --- storage/segment.go | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/storage/segment.go b/storage/segment.go index a3616be..7f84aa6 100644 --- a/storage/segment.go +++ b/storage/segment.go @@ -57,7 +57,7 @@ const ( availableOnDisk ) -type segmentFlushState int +type segmentFlushState int32 // nolint: deadcode,varcheck,megacheck const ( @@ -82,10 +82,8 @@ type dbSegment struct { lastReadAtNanos int64 loadedStatus int32 dataLocation int32 - // The flush related fields are always accessed within a single-thread context - // and as such do not require lock protection. - flushState segmentFlushState - flushAttempts int + flushState int32 + flushAttempts int32 } func newDatabaseSegment( @@ -172,6 +170,11 @@ func (s *dbSegment) LoadedStatus() segmentLoadedStatus { return segmentLoadedStatus(status) } +func (s *dbSegment) FlushState() segmentFlushState { + state := atomic.LoadInt32(&s.flushState) + return segmentFlushState(state) +} + func (s *dbSegment) ShouldUnload() bool { s.RLock() ok := s.shouldUnloadWithLock() @@ -225,22 +228,26 @@ func (s *dbSegment) Flush(persistFns persist.Fns) error { } }() - s.flushState = flushing + s.setFlushState(flushing) err := persistFns.WriteFields(segmentFields) if err == nil { s.setDataLocation(availableOnDisk) - s.flushState = flushSuccess + s.setFlushState(flushSuccess) } else { - s.flushState = flushFailed + s.setFlushState(flushFailed) } - s.flushAttempts++ + atomic.AddInt32(&s.flushAttempts, 1) return err } func (s *dbSegment) FlushIsDone() bool { - return s.flushState == flushSuccess || - (s.flushState == flushFailed && s.flushAttempts >= defaultMaxFlushRetries) + var ( + flushState = s.FlushState() + flushAttempts = atomic.LoadInt32(&s.flushAttempts) + ) + return flushState == flushSuccess || + (flushState == flushFailed && flushAttempts >= defaultMaxFlushRetries) } func (s *dbSegment) Close() { @@ -267,6 +274,11 @@ func (s *dbSegment) setDataLocation(v segmentDataLocation) { atomic.StoreInt32(&s.dataLocation, status) } +func (s *dbSegment) setFlushState(v segmentFlushState) { + state := int32(v) + atomic.StoreInt32(&s.flushState, state) +} + func (s *dbSegment) updateLastReadNanos() { atomic.StoreInt64(&s.lastReadAtNanos, s.nowFn().UnixNano()) }