Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
cache the cg extent locally to reduce load on cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
datoug committed Jul 17, 2017
1 parent 10390f4 commit b94ec3a
Showing 1 changed file with 33 additions and 18 deletions.
51 changes: 33 additions & 18 deletions services/replicator/replicator.go
Expand Up @@ -67,6 +67,9 @@ type (
storehostConn map[string]*outConnection
storehostConnMutex sync.RWMutex

knownCgExtents map[string]struct{}
knownCgExtentsMutex sync.RWMutex

metadataReconciler MetadataReconciler
}
)
Expand Down Expand Up @@ -127,6 +130,7 @@ func NewReplicator(serviceName string, sVice common.SCommon, metadataClient meta
replicatorclientFactory: replicatorClientFactory,
remoteReplicatorConn: make(map[string]*outConnection),
storehostConn: make(map[string]*outConnection),
knownCgExtents: make(map[string]struct{}),
}

r.metaClient = mm.NewMetadataMetricsMgr(metadataClient, r.m3Client, r.logger)
Expand Down Expand Up @@ -1102,26 +1106,37 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs
})
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorRequests)

// make sure the cg extent is created locally before accepting the SetAckOffset call.
// otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid
// and we may not be able to clean up the entry eventually.
extent, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{
ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()),
ExtentUUID: common.StringPtr(request.GetExtentUUID()),
})
if err != nil {
lcllg.WithField(common.TagErr, err).Error(`SetAckOffset: Failed to read cg extent locally`)
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures)
return err
}
if len(extent.GetExtent().GetStoreUUIDs()) < 1 {
err = fmt.Errorf(`empty store uuid from cg extent`)
lcllg.Error(`SetAckOffset: empty store uuid from cg extent`)
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures)
return err
var cgExtentCreated bool
r.knownCgExtentsMutex.RLock()
_, cgExtentCreated = r.knownCgExtents[request.GetExtentUUID()]
r.knownCgExtentsMutex.RUnlock()

if !cgExtentCreated {
// make sure the cg extent is created locally before accepting the SetAckOffset call.
// otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid
// and we may not be able to clean up the entry eventually.
extent, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{
ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()),
ExtentUUID: common.StringPtr(request.GetExtentUUID()),
})
if err != nil {
lcllg.WithField(common.TagErr, err).Error(`SetAckOffset: Failed to read cg extent locally`)
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures)
return err
}
if len(extent.GetExtent().GetStoreUUIDs()) < 1 {
err = fmt.Errorf(`empty store uuid from cg extent`)
lcllg.Error(`SetAckOffset: empty store uuid from cg extent`)
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures)
return err
}

r.knownCgExtentsMutex.Lock()
r.knownCgExtents[request.GetExtentUUID()] = struct{}{}
r.knownCgExtentsMutex.Unlock()
}

err = r.metaClient.SetAckOffset(nil, request)
err := r.metaClient.SetAckOffset(nil, request)
if err != nil {
lcllg.WithField(common.TagErr, err).Error(`Error calling metadata to set ack offset`)
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures)
Expand Down

0 comments on commit b94ec3a

Please sign in to comment.