Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
delete kafka phantom extent, if sealed
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranrg committed Apr 20, 2017
1 parent 87421fa commit 306cde1
Showing 1 changed file with 40 additions and 20 deletions.
60 changes: 40 additions & 20 deletions services/retentionmgr/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,19 @@ func (t *RetentionManager) computeRetention(job *retentionJob, log bark.Logger)
dest := job.dest
ext := job.ext

if ext.status == shared.ExtentStatus_CONSUMED {

// keep extent in "consumed" state until 'ExtentDeleteDeferPeriod' has
// elapsed, only then "delete" the extent. this extra time helps ensure
// that any consumer-group extents that were just placed, will also
// move into "consumed" state, when they read from store.
if time.Since(ext.statusUpdatedTime) >= t.ExtentDeleteDeferPeriod {
job.retentionAddr = store.ADDR_SEAL // delete extent from the stores
job.deleteExtent = true // delete extent from metadata
}
return
}

if dest.status == shared.DestinationStatus_DELETING &&
ext.status == shared.ExtentStatus_SEALED {

Expand All @@ -471,19 +484,6 @@ func (t *RetentionManager) computeRetention(job *retentionJob, log bark.Logger)
}
}

if ext.status == shared.ExtentStatus_CONSUMED {

// keep extent in "consumed" state until 'ExtentDeleteDeferPeriod' has
// elapsed, only then "delete" the extent. this extra time helps ensure
// that any consumer-group extents that were just placed, will also
// move into "consumed" state, when they read from store.
if time.Since(ext.statusUpdatedTime) >= t.ExtentDeleteDeferPeriod {
job.retentionAddr = store.ADDR_SEAL // delete extent from the stores
job.deleteExtent = true // delete extent from metadata
}
return
}

// -- step 1: take a snapshot of the current time and compute retention timestamps -- //

tNow := time.Now().UnixNano()
Expand Down Expand Up @@ -700,22 +700,42 @@ func (t *RetentionManager) computeRetention(job *retentionJob, log bark.Logger)

// -- step 5: compute retention address -- //

//** retentionAddr = max( hardRetentionAddr, min( softRetentionAddr, minAckAddr ) ) **//
job.retentionAddr = store.ADDR_BEGIN
job.deleteExtent = false

if softRetentionAddr == store.ADDR_SEAL || (minAckAddr != store.ADDR_SEAL && minAckAddr < softRetentionAddr) {
softRetentionAddr = minAckAddr
}
if !ext.kafkaPhantomExtent {

//** retentionAddr = max( hardRetentionAddr, min( softRetentionAddr, minAckAddr ) ) **//

if softRetentionAddr == store.ADDR_SEAL || (minAckAddr != store.ADDR_SEAL && minAckAddr < softRetentionAddr) {
softRetentionAddr = minAckAddr
}

if softRetentionAddr == store.ADDR_SEAL || (hardRetentionAddr != store.ADDR_SEAL && softRetentionAddr > hardRetentionAddr) {
job.retentionAddr = softRetentionAddr
} else {
job.retentionAddr = hardRetentionAddr
}

if softRetentionAddr == store.ADDR_SEAL || (hardRetentionAddr != store.ADDR_SEAL && softRetentionAddr > hardRetentionAddr) {
job.retentionAddr = softRetentionAddr
} else {
job.retentionAddr = hardRetentionAddr

// if this is a Kafka phantom extent and it is 'sealed' (which is done only
// when the destination is being deleted), then delete this extent.

if ext.status == shared.ExtentStatus_SEALED ||
ext.status == shared.ExtentStatus_CONSUMED {

job.retentionAddr = store.ADDR_SEAL
job.deleteExtent = true
}
}

log.WithFields(bark.Fields{
`hardRetentionAddr`: hardRetentionAddr,
`softRetentionAddr`: softRetentionAddr,
`minAckAddr`: minAckAddr,
`retentionAddr`: job.retentionAddr,
`deleteExtent`: job.deleteExtent,
}).Debug("computed retentionAddr")

// -- step 6: check to see if the extent status can be updated to 'consumed' -- //
Expand Down

0 comments on commit 306cde1

Please sign in to comment.