Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' into md-mgr
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 committed Feb 1, 2017
2 parents 3a432e3 + d357b8d commit abdd1bc
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 24 deletions.
22 changes: 17 additions & 5 deletions services/retentionmgr/metadataDep.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,24 @@ func (t *metadataDepImpl) GetAckLevel(destID destinationID, extID extentID, cgID
// assert(resp.GetExtent().GetExtentUUID() == extID)
// assert(resp.GetExtent().GetConsumerGroupUUID() == cgID)

// check if the consumer-group has read to "sealed" point
if resp.GetExtent().GetStatus() != metadata.ConsumerGroupExtentStatus_OPEN {
ackLevel = store.ADDR_SEAL
} else {
switch resp.GetExtent().GetStatus() {

case metadata.ConsumerGroupExtentStatus_OPEN:
// return the ack-level from metadata
ackLevel = resp.GetExtent().GetAckLevelOffset()
// assert(ackLevel != cherami.ADDR_END)

case metadata.ConsumerGroupExtentStatus_CONSUMED:
// 'ADDR_SEAL' indicates to the caller that this CG has fully consumed the extent
ackLevel = store.ADDR_SEAL

case metadata.ConsumerGroupExtentStatus_DELETED:
// set to 'ADDR_BEGIN' if cg-extent is deleted
ackLevel = store.ADDR_BEGIN

default:
ackLevel = store.ADDR_BEGIN
log.WithField(`ConsumerGroupExtentStatus`, resp.GetExtent().GetStatus()).
Error("GetAckLevel: Unknown ConsumerGroupExtentStatus")
}

log.WithField(`ackLevel`, ackLevel).Debug("GetAckLevel done")
Expand Down
50 changes: 32 additions & 18 deletions services/retentionmgr/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,12 +599,15 @@ func (t *RetentionManager) computeRetention(job *retentionJob, log bark.Logger)
}
}

// If this is a multi_zone destination and local extent, disable soft retention
// The reason is if soft retention is very short, we may delete messages before remote zone has a chance to replicate the messages
// Long term solution should create a fake consumer for the remote zone
// If this is a multi_zone destination and local extent, disable soft retention -- and fall
// back to using only hard-retention. We also move the extent to the "consumed" state only
// based on hard-retention. The reason is if soft retention is very short, we may
// delete messages before remote zone has a chance to replicate the messages.
// Long term solution should create a fake consumer for the remote zone.
if dest.isMultiZone && !common.IsRemoteZoneExtent(ext.originZone, t.Options.LocalZone) {
log.Info(`overridden: soft retention overridden for multi_zone extent`)
softRetentionAddr = int64(store.ADDR_BEGIN)
softRetentionConsumed = false
}

log.WithFields(bark.Fields{
Expand All @@ -618,6 +621,7 @@ func (t *RetentionManager) computeRetention(job *retentionJob, log bark.Logger)
log.Debug("computing minAckAddr")

var minAckAddr = int64(store.ADDR_END)
var allHaveConsumed = true // start by assuming this is all-consumed

for _, cgInfo := range job.consumers {

Expand Down Expand Up @@ -647,22 +651,32 @@ func (t *RetentionManager) computeRetention(job *retentionJob, log bark.Logger)
}).Error(`computeRetention: minAckAddr GetAckLevel failed`)

minAckAddr = store.ADDR_BEGIN
allHaveConsumed = false
break
}

// check if all CGs have consumed this extent
if ackAddr != store.ADDR_SEAL {
allHaveConsumed = false
}

// update minAckAddr, if ackAddr is less than the current value
if (minAckAddr == store.ADDR_END) ||
(minAckAddr == store.ADDR_SEAL) || // -> all existing consumers have completely consumed this extent
(minAckAddr == store.ADDR_SEAL) || // -> consumers we have seen so far have completely consumed this extent
(ackAddr != store.ADDR_SEAL && ackAddr < minAckAddr) {

minAckAddr = ackAddr
}
}

// if we were unable to find any consumer groups, set minAckAddr to ADDR_BEGIN
// if we were unable to find any consumer groups, set minAckAddr to
// ADDR_BEGIN, effectively disabling soft-retention. that said, hard
// retention could still be enforced.
if minAckAddr == store.ADDR_END {

log.Debug("could not compute ackLevel, using 'ADDR_BEGIN'")
minAckAddr = store.ADDR_BEGIN
allHaveConsumed = false
}

job.minAckAddr = minAckAddr // remember the minAckAddr for doing checks later
Expand Down Expand Up @@ -694,21 +708,21 @@ func (t *RetentionManager) computeRetention(job *retentionJob, log bark.Logger)

// -- step 6: check to see if the extent status can be updated to 'consumed' -- //

// move the extent to 'consumed' if either:
// A. all of the following are true:
// 1. the extent was sealed
// 2. the extent was fully consumed by all of the consumer groups
// 3. a period of 'soft retention period' has passed (in other words,
// a consumer that is consuming along the soft retention time has
// "consumed" the extent)
// move the extent to 'consumed' if the extent is "sealed" _and_ either:
// A.
// 1. the extent was fully consumed by all of the consumer groups
// 2. and, a period of 'soft retention period' has passed (in other
// words, a consumer that is consuming along the soft retention
// time has "consumed" the extent)
// B. or, the hard-retention has reached the end of the sealed extent,
// in which case we will force the extent to be "consumed"
// NB: retentionAddr == ADDR_BEGIN indicates there was an error, so we no-op
if job.retentionAddr != store.ADDR_BEGIN &&
((ext.status == shared.ExtentStatus_SEALED &&
minAckAddr == store.ADDR_SEAL &&
softRetentionConsumed) ||
hardRetentionConsumed) {
// NB: if there was an error querying either the hard/soft retention addresses,
// {soft,hard}RetentionConsumed would be set to 'false'; if there was an error
// querying ack-addr, then allHaveConsumed will be false. therefore errors in
// either of the conditions would cause the extent to *not* be moved to the
// CONSUMED state, and would cause it to be retried on the next iteration.
if (ext.status == shared.ExtentStatus_SEALED) &&
((allHaveConsumed && softRetentionConsumed) || hardRetentionConsumed) {

log.WithFields(bark.Fields{
`retentionAddr`: job.retentionAddr,
Expand Down
32 changes: 31 additions & 1 deletion services/retentionmgr/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ func (s *RetentionMgrSuite) TestRetentionManager() {
// DEST2,EXT8:
// DEST2,EXT9:
// DEST2,EXTA: test DeleteConsumerGroupExtent returns EntityNotExistsError
// DEST2,EXTE0: test extent that is 'active' and hardRetentionConsumed = true (should not move to 'consumed')
// DEST2,EXTE1: test extent that is 'sealed' and hardRetentionConsumed = true (should move to 'consumed')
// DEST2,EXTE2: test extent that is 'sealed' and softRetentionConsumed = true (should move to 'consumed')
// NOTE(review): EXTE3/EXTE4 are set up in the fixtures below but are not described here — document their expected behavior

softRetSecs, hardRetSecs := int32(10), int32(20)

Expand Down Expand Up @@ -126,6 +129,11 @@ func (s *RetentionMgrSuite) TestRetentionManager() {
"EXTB": {"STOR2", "STOR3", "STOR4"},
"EXTC": {"STOR2", "STOR4", "STOR6"},
"EXTD1": {"STOR2", "STOR4", "STOR6"},
"EXTE0": {"STOR2", "STOR3", "STOR4"},
"EXTE1": {"STOR2", "STOR3", "STOR4"},
"EXTE2": {"STOR2", "STOR3", "STOR4"},
"EXTE3": {"STOR2", "STOR3", "STOR4"},
"EXTE4": {"STOR2", "STOR3", "STOR4"},
"EXTm": {"STOR3"}, // Single CG Visible
"EXTn": {"STOR7"}, // Single CG Visible
}
Expand All @@ -148,6 +156,11 @@ func (s *RetentionMgrSuite) TestRetentionManager() {
"EXTC": shared.ExtentStatus_OPEN,
"EXTD": shared.ExtentStatus_SEALED,
"EXTD1": shared.ExtentStatus_SEALED,
"EXTE0": shared.ExtentStatus_OPEN,
"EXTE1": shared.ExtentStatus_SEALED,
"EXTE2": shared.ExtentStatus_SEALED,
"EXTE3": shared.ExtentStatus_SEALED,
"EXTE4": shared.ExtentStatus_SEALED,
"EXTm": shared.ExtentStatus_SEALED, // Merged DLQ extents should always be sealed
"EXTn": shared.ExtentStatus_SEALED, //
}
Expand Down Expand Up @@ -230,6 +243,11 @@ func (s *RetentionMgrSuite) TestRetentionManager() {
"EXTC": {"STOR2": {addrHard, false}, "STOR4": {addrHard, false}, "STOR6": {addrHard, false}},
"EXTD": {"STOR2": {addrHard, false}, "STOR4": {addrHard, false}, "STOR6": {addrHard, false}},
"EXTD1": {"STOR2": {addrHard, true}, "STOR4": {addrHard, false}, "STOR6": {addrHard, false}},
"EXTE0": {"STOR2": {addrBegin, false}, "STOR3": {addrBegin, true}, "STOR4": {addrBegin, false}},
"EXTE1": {"STOR2": {addrBegin, false}, "STOR3": {addrBegin, true}, "STOR4": {addrBegin, false}},
"EXTE2": {"STOR2": {addrBegin, false}, "STOR3": {addrBegin, false}, "STOR4": {addrBegin, false}},
"EXTE3": {"STOR2": {addrBegin, false}, "STOR3": {addrBegin, false}, "STOR4": {addrBegin, false}},
"EXTE4": {"STOR2": {addrBegin, false}, "STOR3": {addrBegin, false}, "STOR4": {addrBegin, false}},
"EXTm": {"STOR3": {addrHard - 100, true}},
"EXTn": {"STOR7": {addrHard - 100, true}},
}
Expand All @@ -253,6 +271,11 @@ func (s *RetentionMgrSuite) TestRetentionManager() {
"EXTC": {"STOR2": {addrSoft, true}, "STOR4": {addrSoft, true}, "STOR6": {addrSoft, true}},
"EXTD": {"STOR2": {addrSoft, true}, "STOR4": {addrSoft, true}, "STOR6": {addrSoft, true}},
"EXTD1": {"STOR2": {addrBegin, false}, "STOR4": {addrBegin, false}, "STOR6": {addrBegin, false}},
"EXTE0": {"STOR2": {addrBegin, false}, "STOR3": {addrBegin, false}, "STOR4": {addrBegin, false}},
"EXTE1": {"STOR2": {addrBegin, false}, "STOR3": {addrBegin, false}, "STOR4": {addrBegin, false}},
"EXTE2": {"STOR2": {addrBegin, false}, "STOR3": {addrBegin, false}, "STOR4": {addrBegin, true}},
"EXTE3": {"STOR2": {addrBegin, false}, "STOR3": {addrBegin, false}, "STOR4": {addrBegin, false}},
"EXTE4": {"STOR2": {addrBegin, false}, "STOR3": {addrBegin, false}, "STOR4": {addrBegin, true}},
"EXTm": {"STOR3": {addrSoft, true}},
"EXTn": {"STOR7": {addrSoft, true}},
}
Expand All @@ -275,6 +298,11 @@ func (s *RetentionMgrSuite) TestRetentionManager() {
"EXTC": {"CGm": addrSeal, "CG1": addrSeal, "CG2": addrSeal, "CG3": addrSeal},
"EXTD": {"CGm": addrSeal, "CG1": addrSeal, "CG2": addrSeal, "CG3": addrSeal},
"EXTD1": {"CGm": addrBegin, "CG1": addrBegin, "CG2": addrBegin, "CG3": addrBegin},
"EXTE0": {"CGm": addrBegin, "CG1": addrBegin, "CG2": addrBegin, "CG3": addrBegin},
"EXTE1": {"CGm": addrBegin, "CG1": addrBegin, "CG2": addrBegin, "CG3": addrBegin},
"EXTE2": {"CGm": addrBegin, "CG1": addrBegin, "CG2": addrBegin, "CG3": addrBegin},
"EXTE3": {"CGm": addrBegin, "CG1": addrBegin, "CG2": addrSeal, "CG3": addrBegin},
"EXTE4": {"CGm": addrSeal, "CG1": addrBegin, "CG2": addrBegin, "CG3": addrBegin},
"EXTm": {"CGm": addrPostSoft},
"EXTn": {"CGm": addrPreSoft},
}
Expand Down Expand Up @@ -303,6 +331,8 @@ func (s *RetentionMgrSuite) TestRetentionManager() {
s.metadata.On("MarkExtentConsumed", destinationID("DEST2"), extentID("EXTB")).Return(nil).Once()
s.metadata.On("MarkExtentConsumed", destinationID("DEST2"), extentID("EXTD")).Return(nil).Once()
s.metadata.On("MarkExtentConsumed", destinationID("DEST2"), extentID("EXTD1")).Return(nil).Once()
s.metadata.On("MarkExtentConsumed", destinationID("DEST2"), extentID("EXTE1")).Return(nil).Once()
s.metadata.On("MarkExtentConsumed", destinationID("DEST2"), extentID("EXTE4")).Return(nil).Once()
s.metadata.On("MarkExtentConsumed", destinationID("DEST2"), extentID("EXTm")).Return(nil).Once()
s.metadata.On("MarkExtentConsumed", destinationID("DEST2"), extentID("EXTn")).Return(nil).Once()

Expand Down Expand Up @@ -359,7 +389,7 @@ func (s *RetentionMgrSuite) TestRetentionManager() {
// retMgr.Start()
// retMgr.wait()

//s.metadata.AssertExpectations(s.T())
// s.metadata.AssertExpectations(s.T())
}

func (s *RetentionMgrSuite) TestRetentionManagerOnDeletedDestinations() {
Expand Down

0 comments on commit abdd1bc

Please sign in to comment.