-
Notifications
You must be signed in to change notification settings - Fork 102
RetMgr: Handle cleanup of consumer-groups and consumer-group extents #310
Conversation
@@ -1189,9 +1189,26 @@ const ( | |||
columnZoneConfigs + `: ?, ` + | |||
columnOptions + `: ? }` | |||
|
|||
cqlConsumerGroupType = columnConsumerGroup + `.` + columnUUID + "," + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: sqlConsumerGroupType (to be consistent)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true .. :-)
services/retentionmgr/metadataDep.go
Outdated
@@ -60,7 +60,7 @@ func (t *metadataDepImpl) GetDestinations() (destinations []*destinationInfo) { | |||
i := 0 | |||
for { | |||
|
|||
log.Debug("GetDestinations: ListDestinationsByUUID on metadata") | |||
log.Info("GetDestinations: ListDestinationsByUUID on metadata") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these will cause too many logs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure now .. why i decided to enable these. reverting it. :-)
services/retentionmgr/metadataDep.go
Outdated
if e != nil { | ||
err = e | ||
log.WithField(common.TagErr, err).Error("GetExtentsForConsumerGroup: ReadConsumerGroupExtentsLite failed") | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i do a 'break' so it will print the "done" log below ..
services/retentionmgr/metadataDep.go
Outdated
func (t *metadataDepImpl) DeleteConsumerGroup(destID destinationID, cgID consumerGroupID) error { | ||
|
||
req := metadata.NewDeleteConsumerGroupUUIDRequest() | ||
req.UUID = common.StringPtr(string(destID)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uuid should be cg uuid right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes; fixed.
sqlInsertCGByUUID = `INSERT INTO ` + tableConsumerGroups + | ||
`(` + | ||
columnUUID + `, ` + | ||
columnDestinationUUID + `, ` + | ||
columnIsMultiZone + `, ` + | ||
columnConsumerGroup + | ||
`) VALUES (?, ?,` + sqlCGValue + `)` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need one more '?' here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some test should be able to catch this.
services/retentionmgr/retention.go
Outdated
@@ -342,10 +435,9 @@ func (t *RetentionManager) runRetention(jobsC chan<- *retentionJob) bool { | |||
totalJobs++ | |||
} | |||
|
|||
if allExtentsDeleted && dest.status == shared.DestinationStatus_DELETING { | |||
if dest.status == shared.DestinationStatus_DELETING && numCGs == 0 && numExtents == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you can rely on 'numCGs == 0' since the error from the below call was swallowed.
cgs := t.metadata.GetConsumerGroups(dest.id)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't see this get fixed.
@@ -2003,6 +1987,73 @@ func (s *CassandraMetadataService) ListConsumerGroups(ctx thrift.Context, reques | |||
return result, nil | |||
} | |||
|
|||
// ListConsumerGroupsUUID returns all ConsumerGroups matching the given destination-uuid. | |||
func (s *CassandraMetadataService) ListConsumerGroupsUUID(ctx thrift.Context, request *shared.ListConsumerGroupsUUIDRequest) (*shared.ListConsumerGroupsUUIDResult_, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add some UT for the new API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i have now.
@@ -312,26 +314,117 @@ func (t *RetentionManager) runRetention(jobsC chan<- *retentionJob) bool { | |||
continue | |||
} | |||
|
|||
log := t.logger.WithField(common.TagDst, dest.id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to UT the new behavior
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can be .. though it will be quite some work; will give it a shot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as discussed offline, i am going to take this up separately.
inputs = append(inputs, &shared.ListConsumerGroupRequest{ | ||
}).Equals(cgSet), "ListConsumerGroups did not return all CGs") | ||
|
||
assert.True(listCGs(&shared.ListConsumerGroupRequest{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this section duplicated from previous one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. removed.
* Revert "Slowdown redeliveries, add timeout metrics (#290)" This reverts commit 464433e. * cmq - Cherami Metadata Query-r * ttt * show remote extents * remove binary * fix rate computation * flag DLQ extents * monitor -> watch * Revert "Slowdown redeliveries, add timeout metrics (#290)" (#316) This reverts commit 464433e. * sort by cgx-status updated time * show milliseconds since update * mark cgx for consumed extent as consumed * ~ * ~ * RetMgr: Handle cleanup of consumer-groups and consumer-group extents (#310) * handle consumer-group deletion * fix mocks, tests * debug log * more log * log * metadata logs * add destination_uuid column to consumer_groups table * ListConsumerGroupsUUID * Make RetMgr use ListConsumerGroupsUUID * update meta-metrics * thrift * thrift * update the new dest-uuid column in create-, update- and delete- cg paths * fix create-cg cql * fix mocks * ListConsumerGroupsUUID test * CR feedback * return and catch errors from metadata in ret-mgr * incorporate cr feedback * ~ * ensure consumer-groups are deleted before deleting destination * fix build due to gocql API change * cmq - Cherami Metadata Query-r * ttt * show remote extents * remove binary * fix rate computation * flag DLQ extents * monitor -> watch * sort by cgx-status updated time * show milliseconds since update * mark cgx for consumed extent as consumed * ~ * ~ * ... * cg deleting * fix build * yaml config support * cmq zone * output type support * json map output * fix lint * fix lint * fix build * fix lint
This diff also adds a new API to list consumer-groups for a given destination-uuid -- the difference between this and the existing ListConsumerGroups API is that this one would look at the 'consumer_groups' table (instead of 'consumer_groups_by_name', that ListConsumerGroups look at).
Retention-manager uses this new API to query consumer-groups in 'deleting' state and takes appropriate action to 'delete' them.