This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Address feedback
xichen2020 committed Sep 12, 2017
1 parent 39393ba commit 38143ef
Showing 7 changed files with 72 additions and 33 deletions.
49 changes: 32 additions & 17 deletions aggregator/aggregator.go
@@ -365,13 +365,32 @@ func (agg *aggregator) shouldProcessPlacementWithLock(newPlacement placement.Pla
return agg.placementCutoverNanos < newPlacement.CutoverNanos()
}

// updateShardSetWithLock updates the instance's shard set id given the instance from
// the latest placement, or nil if the current instance is not found in the placement.
// updateShardSetWithLock resets the instance's shard set id given the instance from
// the latest placement, or clears it if the instance is nil (i.e., instance not found).
func (agg *aggregator) updateShardSetIDWithLock(instance placement.Instance) error {
if instance == nil {
return agg.clearShardSetIDWithLock()
}
return agg.resetShardSetIDWithLock(instance)
}

// clearShardSetIDWithLock clears the instance's shard set id.
func (agg *aggregator) clearShardSetIDWithLock() error {
if !agg.shardSetOpen {
return nil
}
if err := agg.closeShardSetWithLock(); err != nil {
return err
}
agg.shardSetID = 0
agg.shardSetOpen = false
return nil
}

// resetShardSetIDWithLock resets the instance's shard set id given the instance from
// the latest placement.
func (agg *aggregator) resetShardSetIDWithLock(instance placement.Instance) error {
if !agg.shardSetOpen {
if instance == nil {
return nil
}
shardSetID := instance.ShardSetID()
if err := agg.openShardSetWithLock(shardSetID); err != nil {
return err
@@ -380,17 +399,12 @@ func (agg *aggregator) updateShardSetIDWithLock(instance placement.Instance) err
agg.shardSetOpen = true
return nil
}
if instance != nil && instance.ShardSetID() == agg.shardSetID {
if instance.ShardSetID() == agg.shardSetID {
return nil
}
if err := agg.closeShardSetWithLock(); err != nil {
return err
}
if instance == nil {
agg.shardSetID = 0
agg.shardSetOpen = false
return nil
}
newShardSetID := instance.ShardSetID()
if err := agg.openShardSetWithLock(newShardSetID); err != nil {
return err
@@ -508,17 +522,18 @@ func (agg *aggregator) ownedShards() (owned, toClose []*aggregatorShard) {
for i := 0; i < len(agg.shardIDs); i++ {
shardID := agg.shardIDs[i]
shard := agg.shards[shardID]
hasFlushedTillCutoff := agg.flushTimesChecker.HasFlushed(
shard.ID(),
shard.CutoffNanos(),
flushTimes,
)
// NB(xichen): a shard can be closed when all of the following conditions are met:
// * The shard is not writeable.
// * The shard has been cut off (we do not want to close a shard that has not been
// cut over in that it may be warming up).
// * All of the shard's data has been flushed up until the shard's cutoff time.
canCloseShard := !shard.IsWritable() && shard.IsCutoff() && hasFlushedTillCutoff
canCloseShard := !shard.IsWritable() &&
shard.IsCutoff() &&
agg.flushTimesChecker.HasFlushed(
shard.ID(),
shard.CutoffNanos(),
flushTimes,
)
if !canCloseShard {
owned = append(owned, shard)
} else {
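Note on the ownedShards hunk above: the hasFlushedTillCutoff intermediate is simply inlined into the canCloseShard expression, so the close condition itself is unchanged. Restated as a standalone predicate for clarity (a hypothetical helper using only names that appear in the diff; it is not part of the commit):

// Hypothetical helper, for illustration only.
func canClose(shard *aggregatorShard, checker flushTimesChecker, flushTimes *schema.ShardSetFlushTimes) bool {
	return !shard.IsWritable() && // the shard no longer accepts writes
		shard.IsCutoff() && // the shard is past its cutoff, i.e. not merely warming up
		checker.HasFlushed(shard.ID(), shard.CutoffNanos(), flushTimes) // all data flushed up to the cutoff
}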
1 change: 1 addition & 0 deletions aggregator/aggregator_test.go
@@ -558,6 +558,7 @@ func testAggregatorWithCustomPlacements(
return NewAggregator(opts).(*aggregator), store
}

// nolint: unparam
func testPlacementWatcherWithPlacementProto(
t *testing.T,
placementKey string,
4 changes: 4 additions & 0 deletions aggregator/election_mgr.go
@@ -382,6 +382,7 @@ func (mgr *electionManager) Resign(ctx context.Context) error {
return true
}
}
// Log the context error because the error returned from the retrier is not helpful.
if err := mgr.resignWhile(ctxNotDone); err != nil {
mgr.metrics.resignTimeout.Inc(1)
mgr.logError("resign error", ctx.Err())
@@ -413,6 +414,9 @@ func (mgr *electionManager) Close() error {
mgr.Unlock()

mgr.Wait()
mgr.campaignStateWatchable.Close()
mgr.electionStateWatchable.Close()
mgr.goalStateWatchable.Close()
return nil
}

26 changes: 19 additions & 7 deletions aggregator/flush_times_mgr.go
@@ -202,6 +202,8 @@ func (mgr *flushTimesManager) Close() error {
mgr.Unlock()

mgr.Wait()
mgr.flushTimesWatchable.Close()
mgr.persistWatchable.Close()
return nil
}

@@ -262,22 +264,32 @@ func (mgr *flushTimesManager) persistFlushTimes(persistWatch xwatch.Watch) {
}
}

type flushTimesChecker struct {
type flushTimesCheckerMetrics struct {
noFlushTimes tally.Counter
shardNotFound tally.Counter
notFullyFlushed tally.Counter
allFlushed tally.Counter
}

func newFlushTimesChecker(scope tally.Scope) flushTimesChecker {
return flushTimesChecker{
func newFlushTimesCheckerMetrics(scope tally.Scope) flushTimesCheckerMetrics {
return flushTimesCheckerMetrics{
noFlushTimes: scope.Counter("no-flush-times"),
shardNotFound: scope.Counter("shard-not-found"),
notFullyFlushed: scope.Counter("not-fully-flushed"),
allFlushed: scope.Counter("all-flushed"),
}
}

type flushTimesChecker struct {
metrics flushTimesCheckerMetrics
}

func newFlushTimesChecker(scope tally.Scope) flushTimesChecker {
return flushTimesChecker{
metrics: newFlushTimesCheckerMetrics(scope),
}
}

// HasFlushed returns true if data for a given shard have been flushed until
// at least the given target nanoseconds based on the flush times persisted in kv,
// and false otherwise.
@@ -287,20 +299,20 @@ func (sc flushTimesChecker) HasFlushed(
flushTimes *schema.ShardSetFlushTimes,
) bool {
if flushTimes == nil {
sc.noFlushTimes.Inc(1)
sc.metrics.noFlushTimes.Inc(1)
return false
}
shardFlushTimes, exists := flushTimes.ByShard[shardID]
if !exists {
sc.shardNotFound.Inc(1)
sc.metrics.shardNotFound.Inc(1)
return false
}
for _, lastFlushedNanos := range shardFlushTimes.ByResolution {
if lastFlushedNanos < targetNanos {
sc.notFullyFlushed.Inc(1)
sc.metrics.notFullyFlushed.Inc(1)
return false
}
}
sc.allFlushed.Inc(1)
sc.metrics.allFlushed.Inc(1)
return true
}
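The flush times checker refactor above only regroups the counters into a flushTimesCheckerMetrics struct; HasFlushed behaves exactly as before. A minimal usage sketch, assuming it runs inside the aggregator package (the scope, shard and flushTimes values here are placeholders):

// Sketch only; scope is a tally.Scope and flushTimes comes from the flush times manager.
checker := newFlushTimesChecker(scope.SubScope("flush-times-checker"))
if checker.HasFlushed(shard.ID(), shard.CutoffNanos(), flushTimes) {
	// Every resolution for this shard has flushed past the cutoff, so it is safe to close it.
}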
2 changes: 1 addition & 1 deletion aggregator/flush_times_mgr_test.go
@@ -128,7 +128,7 @@ func TestFlushTimesManagerWatchSuccess(t *testing.T) {

select {
case <-watch.C():
require.Fail(t, "unexpected watch notificaiton")
require.Fail(t, "unexpected watch notification")
default:
}

4 changes: 2 additions & 2 deletions aggregator/follower_flush_mgr.go
@@ -139,8 +139,8 @@ func (mgr *followerFlushManager) Prepare(buckets []*flushBucket) (flushTask, tim
mgr.metrics.kvUpdateFlush.Inc(1)
} else {
durationSinceLastFlush := now.Sub(mgr.lastFlushed)
if durationSinceLastFlush > mgr.maxBufferSize {
flushBeforeNanos := now.Add(-mgr.maxBufferSize).Add(mgr.forcedFlushWindowSize).UnixNano()
if durationSinceLastFlush >= mgr.forcedFlushWindowSize {
flushBeforeNanos := now.Add(-mgr.maxBufferSize).UnixNano()
flushersByInterval = mgr.flushersFromForcedFlush(buckets, flushBeforeNanos)
mgr.metrics.forcedFlush.Inc(1)
}
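The behavioural change above is twofold: a follower now forces a flush once forcedFlushWindowSize (rather than maxBufferSize) has elapsed since the last flush, and the forced flush covers data up to now minus maxBufferSize instead of now minus maxBufferSize plus forcedFlushWindowSize. For example, with a one-minute maxBufferSize and a ten-second forcedFlushWindowSize (the settings used in the updated test below), a follower that has not flushed for ten seconds will force-flush everything older than one minute.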
19 changes: 13 additions & 6 deletions aggregator/follower_flush_mgr_test.go
@@ -185,15 +185,16 @@ func TestFollowerFlushManagerPrepareMaxBufferSizeExceeded(t *testing.T) {
nowFn := func() time.Time { return now }
doneCh := make(chan struct{})
opts := NewFlushManagerOptions().
SetMaxBufferSize(time.Second).
SetMaxBufferSize(time.Minute).
SetForcedFlushWindowSize(10 * time.Second).
SetCheckEvery(time.Second)
mgr := newFollowerFlushManager(doneCh, opts).(*followerFlushManager)
mgr.nowFn = nowFn
mgr.flushTimesState = flushTimesProcessed
mgr.lastFlushed = now

now = now.Add(2 * time.Second)
// Advance time by forced flush window size and expect a flush.
now = now.Add(10 * time.Second)
flushTask, dur := mgr.Prepare(testFlushBuckets)

expected := []flushersGroup{
@@ -202,11 +203,11 @@
flushers: []flusherWithTime{
{
flusher: testFlushBuckets[0].flushers[0],
flushBeforeNanos: 1245000000000,
flushBeforeNanos: 1184000000000,
},
{
flusher: testFlushBuckets[0].flushers[1],
flushBeforeNanos: 1245000000000,
flushBeforeNanos: 1184000000000,
},
},
},
@@ -215,7 +216,7 @@
flushers: []flusherWithTime{
{
flusher: testFlushBuckets[1].flushers[0],
flushBeforeNanos: 1245000000000,
flushBeforeNanos: 1184000000000,
},
},
},
@@ -224,7 +225,7 @@
flushers: []flusherWithTime{
{
flusher: testFlushBuckets[2].flushers[0],
flushBeforeNanos: 1245000000000,
flushBeforeNanos: 1184000000000,
},
},
},
@@ -235,6 +236,12 @@
actual := task.flushersByInterval
require.Equal(t, expected, actual)
require.Equal(t, now, mgr.lastFlushed)

// Advance time by less than the forced flush window size and expect no flush.
now = now.Add(time.Second)
flushTask, dur = mgr.Prepare(testFlushBuckets)
require.Nil(t, flushTask)
require.Equal(t, mgr.checkEvery, dur)
}

func TestFollowerFlushManagerWatchFlushTimes(t *testing.T) {
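For reference, the updated flushBeforeNanos expectations above are consistent with a base test time of 1,234,000,000,000 ns (1234 seconds Unix time), which is set outside the visible diff and is inferred here from the numbers: advancing by the ten-second forced flush window gives now = 1,244,000,000,000 ns, and subtracting the one-minute maxBufferSize yields 1,184,000,000,000 ns, matching the asserted values.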
