Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
datoug committed May 2, 2017
1 parent b9c5086 commit cbeb2f1
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 51 deletions.
2 changes: 1 addition & 1 deletion clients/metadata/metadata_cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1836,7 +1836,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
OwnerEmail: common.StringPtr("consumer_test@uber.com"),
}

if pass % 2 == 0 {
if pass%2 == 0 {
updateReq.ActiveZone = common.StringPtr(`zone2`)
} else {
updateReq.ActiveZone = common.StringPtr(``)
Expand Down
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,8 @@ const (
ReplicatorReconcileCgFail
// ReplicatorReconcileCgFoundMissing indicates the reconcile for cg found a missing cg
ReplicatorReconcileCgFoundMissing
// ReplicatorReconcileCgFoundUpdated indicates the reconcile for cg found a updated cg
ReplicatorReconcileCgFoundUpdated
// ReplicatorReconcileDestExtentRun indicates the reconcile for dest extent runs
ReplicatorReconcileDestExtentRun
// ReplicatorReconcileDestExtentFail indicates the reconcile for dest extent fails
Expand Down Expand Up @@ -1251,6 +1253,7 @@ var metricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicatorReconcileCgRun: {Gauge, "replicator.reconcile.cg.run"},
ReplicatorReconcileCgFail: {Gauge, "replicator.reconcile.cg.fail"},
ReplicatorReconcileCgFoundMissing: {Gauge, "replicator.reconcile.cg.foundmissing"},
ReplicatorReconcileCgFoundUpdated: {Gauge, "replicator.reconcile.cg.foundupdated"},
ReplicatorReconcileDestExtentRun: {Gauge, "replicator.reconcile.destextent.run"},
ReplicatorReconcileDestExtentFail: {Gauge, "replicator.reconcile.destextent.fail"},
ReplicatorReconcileDestExtentFoundMissing: {Gauge, "replicator.reconcile.destextent.foundmissing"},
Expand Down
106 changes: 56 additions & 50 deletions services/replicator/metadataReconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (r *metadataReconciler) reconcileDest(localDests []*shared.DestinationDescr

func (r *metadataReconciler) reconcileCg(localCgs []*shared.ConsumerGroupDescription, remoteCgs []*shared.ConsumerGroupDescription) {
var replicatorReconcileCgFoundMissingCount int64
var replicatorReconcileCgFoundUpdatedCount int64
localCgsSet := make(map[string]*shared.ConsumerGroupDescription, len(localCgs))
for _, cg := range localCgs {
localCgsSet[cg.GetConsumerGroupUUID()] = cg
Expand Down Expand Up @@ -335,56 +336,8 @@ func (r *metadataReconciler) reconcileCg(localCgs []*shared.ConsumerGroupDescrip
continue
}

// case #2: cg exists in both remote and local, try to compare the property to see if anything gets updated
updateRequest := &shared.UpdateConsumerGroupRequest{}
cgUpdated := false

if localCg.GetLockTimeoutSeconds() != remoteCg.GetLockTimeoutSeconds() {
updateRequest.LockTimeoutSeconds = common.Int32Ptr(remoteCg.GetLockTimeoutSeconds())
cgUpdated = true
}
if localCg.GetMaxDeliveryCount() != remoteCg.GetMaxDeliveryCount() {
updateRequest.MaxDeliveryCount = common.Int32Ptr(remoteCg.GetMaxDeliveryCount())
cgUpdated = true
}
if localCg.GetSkipOlderMessagesSeconds() != remoteCg.GetSkipOlderMessagesSeconds() {
updateRequest.SkipOlderMessagesSeconds = common.Int32Ptr(remoteCg.GetSkipOlderMessagesSeconds())
cgUpdated = true
}
if localCg.GetStatus() != remoteCg.GetStatus() {
updateRequest.Status = common.InternalConsumerGroupStatusPtr(remoteCg.GetStatus())
cgUpdated = true
}
if localCg.GetOwnerEmail() != remoteCg.GetOwnerEmail() {
updateRequest.OwnerEmail = common.StringPtr(remoteCg.GetOwnerEmail())
cgUpdated = true
}
if localCg.GetActiveZone() != remoteCg.GetActiveZone() {
updateRequest.ActiveZone = common.StringPtr(remoteCg.GetActiveZone())
cgUpdated = true
}

if cgUpdated {
lclLg.Info(`Found cg gets updated in remote but not in local`)

destDesc, err := r.readDestinationInAuthoritativeZone(remoteCg.GetDestinationUUID())
if err != nil {
lclLg.WithFields(bark.Fields{
common.TagErr: err,
}).Error(`Failed to update ConsumerGroup in local zone because read destination failed in remote zone`)
continue
}
updateRequest.DestinationPath = common.StringPtr(destDesc.GetPath())
updateRequest.ConsumerGroupName = common.StringPtr(remoteCg.GetConsumerGroupName())

ctx, cancel := thrift.NewContext(localReplicatorCallTimeOut)
defer cancel()
_, err = r.replicator.UpdateConsumerGroup(ctx, updateRequest)
if err != nil {
lclLg.WithField(common.TagErr, err).Error(`Failed to update cg in local zone for reconciliation`)
continue
}
}
// case #2: cg exists in both remote and local, try to compare and update cg
r.compareAndUpdateCg(remoteCg, localCg, lclLg, &replicatorReconcileCgFoundUpdatedCount)
} else {
// case #3: cg exists in remote, but not in local. Create the cg locally
lclLg.Warn(`Found missing ConsumerGroup from remote!`)
Expand Down Expand Up @@ -437,6 +390,59 @@ func (r *metadataReconciler) reconcileCg(localCgs []*shared.ConsumerGroupDescrip
// We don't need to handle this because deleted cg will still be in the uuid table for 30 days, so it should be covered by case #1

r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgFoundMissing, replicatorReconcileCgFoundMissingCount)
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgFoundUp
}

func (r *metadataReconciler) compareAndUpdateCg(remoteCg *shared.ConsumerGroupDescription, localCg *shared.ConsumerGroupDescription, logger bark.Logger, replicatorReconcileCgFoundUpdatedCount *int64) {
updateRequest := &shared.UpdateConsumerGroupRequest{}
cgUpdated := false

if localCg.GetLockTimeoutSeconds() != remoteCg.GetLockTimeoutSeconds() {
updateRequest.LockTimeoutSeconds = common.Int32Ptr(remoteCg.GetLockTimeoutSeconds())
cgUpdated = true
}
if localCg.GetMaxDeliveryCount() != remoteCg.GetMaxDeliveryCount() {
updateRequest.MaxDeliveryCount = common.Int32Ptr(remoteCg.GetMaxDeliveryCount())
cgUpdated = true
}
if localCg.GetSkipOlderMessagesSeconds() != remoteCg.GetSkipOlderMessagesSeconds() {
updateRequest.SkipOlderMessagesSeconds = common.Int32Ptr(remoteCg.GetSkipOlderMessagesSeconds())
cgUpdated = true
}
if localCg.GetStatus() != remoteCg.GetStatus() {
updateRequest.Status = common.InternalConsumerGroupStatusPtr(remoteCg.GetStatus())
cgUpdated = true
}
if localCg.GetOwnerEmail() != remoteCg.GetOwnerEmail() {
updateRequest.OwnerEmail = common.StringPtr(remoteCg.GetOwnerEmail())
cgUpdated = true
}
if localCg.GetActiveZone() != remoteCg.GetActiveZone() {
updateRequest.ActiveZone = common.StringPtr(remoteCg.GetActiveZone())
cgUpdated = true
}

if cgUpdated {
logger.Info(`Found cg gets updated in remote but not in local`)
*replicatorReconcileCgFoundUpdatedCount = *replicatorReconcileCgFoundUpdatedCount + 1

destDesc, err := r.readDestinationInAuthoritativeZone(remoteCg.GetDestinationUUID())
if err != nil {
logger.WithFields(bark.Fields{
common.TagErr: err,
}).Error(`Failed to update ConsumerGroup in local zone because read destination failed in remote zone`)
return
}
updateRequest.DestinationPath = common.StringPtr(destDesc.GetPath())
updateRequest.ConsumerGroupName = common.StringPtr(remoteCg.GetConsumerGroupName())

ctx, cancel := thrift.NewContext(localReplicatorCallTimeOut)
defer cancel()
_, err = r.replicator.UpdateConsumerGroup(ctx, updateRequest)
if err != nil {
logger.WithField(common.TagErr, err).Error(`Failed to update cg in local zone for reconciliation`)
}
}
}

func (r *metadataReconciler) readDestinationInAuthoritativeZone(destUUID string) (*shared.DestinationDescription, error) {
Expand Down

0 comments on commit cbeb2f1

Please sign in to comment.