Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
propagate cg visibility field for multi_zone destination extent
Browse files Browse the repository at this point in the history
  • Loading branch information
datoug committed Apr 7, 2017
1 parent 3fbd20b commit 3d9e8e0
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 44 deletions.
6 changes: 5 additions & 1 deletion clients/metadata/metadata_cassandra.go
Expand Up @@ -2141,7 +2141,11 @@ func (s *CassandraMetadataService) CreateExtent(ctx thrift.Context, request *sha
}
}

epochMillisNow := s.createExtentImpl(extent, shared.ExtentStatus_OPEN, replicaStatsList, nil, batch)
var consumerGroupVisibility *string
if len(request.GetConsumerGroupVisibility()) > 0 {
consumerGroupVisibility = common.StringPtr(request.GetConsumerGroupVisibility())
}
epochMillisNow := s.createExtentImpl(extent, shared.ExtentStatus_OPEN, replicaStatsList, consumerGroupVisibility, batch)
if err := s.session.ExecuteBatch(batch); err != nil {
return nil, &shared.InternalServiceError{
Message: "CreateExtent: " + err.Error(),
Expand Down
29 changes: 29 additions & 0 deletions clients/metadata/metadata_cassandra_test.go
Expand Up @@ -766,6 +766,35 @@ func (s *CassandraSuite) TestExtentCRU() {
}
}

func (s *CassandraSuite) TestCreateExtentWithCgVisibility() {
extentUUID := uuid.New()
destUUID := uuid.New()
cgUUID := uuid.New()
storeIds := []string{uuid.New(), uuid.New(), uuid.New()}
extent := &shared.Extent{
ExtentUUID: common.StringPtr(extentUUID),
DestinationUUID: common.StringPtr(destUUID),
StoreUUIDs: storeIds,
InputHostUUID: common.StringPtr(uuid.New()),
}
createRequest := &shared.CreateExtentRequest{
Extent: extent,
ConsumerGroupVisibility: common.StringPtr(cgUUID),
}
_, err := s.client.CreateExtent(nil, createRequest)
s.Nil(err)

readExtentStats := &m.ReadExtentStatsRequest{
DestinationUUID: common.StringPtr(destUUID),
ExtentUUID: common.StringPtr(extentUUID),
}
extentStats, err := s.client.ReadExtentStats(nil, readExtentStats)
s.Nil(err)
s.NotNil(extentStats)
s.Equal(shared.ExtentStatus_OPEN, extentStats.GetExtentStats().GetStatus())
s.Equal(cgUUID, extentStats.GetExtentStats().GetConsumerGroupVisibility())
}

func (s *CassandraSuite) TestUpdateStoreExtentReplicaStats() {
extentUUID := uuid.New()
destUUID := uuid.New()
Expand Down
2 changes: 1 addition & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/controllerhost/consumer_test.go
Expand Up @@ -258,7 +258,7 @@ func (s *McpSuite) TestCGExtentSelectorHonorsRemoteExtent() {
if i%2 == 0 {
zone = `zone1`
}
context.mm.CreateRemoteZoneExtent(dstID, extID, inhosts[0].UUID, stores, zone, stores[0])
context.mm.CreateRemoteZoneExtent(dstID, extID, inhosts[0].UUID, stores, zone, stores[0], ``)
extents = append(extents, extID)
nExtents++
}
Expand Down
3 changes: 2 additions & 1 deletion services/controllerhost/controllerhost.go
Expand Up @@ -1161,7 +1161,8 @@ func (mcp *Mcp) CreateRemoteZoneExtent(ctx thrift.Context, createRequest *shared
inputHost := common.InputHostForRemoteExtent

res, err := context.mm.CreateRemoteZoneExtent(createRequest.GetExtent().GetDestinationUUID(),
createRequest.GetExtent().GetExtentUUID(), inputHost, storeids, createRequest.GetExtent().GetOriginZone(), remoteExtentPrimaryStore)
createRequest.GetExtent().GetExtentUUID(), inputHost, storeids, createRequest.GetExtent().GetOriginZone(),
remoteExtentPrimaryStore, createRequest.GetConsumerGroupVisibility())
if err != nil {
lclLg.WithField(common.TagErr, err).Error("CreateRemoteZoneExtent: metadata CreateRemoteZoneExtent failed")
context.m3Client.IncCounter(metrics.ControllerCreateRemoteZoneExtentScope, metrics.ControllerErrMetadataUpdateCounter)
Expand Down
2 changes: 1 addition & 1 deletion services/controllerhost/event_pipeline_test.go
Expand Up @@ -314,7 +314,7 @@ func (s *EventPipelineSuite) TestStoreHostFailedEventStage2() {
originZone := `zone1`

for i := 0; i < len(extentIDs); i++ {
_, err := s.mcp.context.mm.CreateRemoteZoneExtent(dstID, extentIDs[i], inHostIDs[i], storeIDs, originZone, storeIDs[primaryStoreIdx])
_, err := s.mcp.context.mm.CreateRemoteZoneExtent(dstID, extentIDs[i], inHostIDs[i], storeIDs, originZone, storeIDs[primaryStoreIdx], ``)
s.Nil(err, "Failed to create extent")
}

Expand Down
13 changes: 8 additions & 5 deletions services/controllerhost/metadataMgr.go
Expand Up @@ -80,7 +80,7 @@ type (
// CreateExtent creates a new extent for the given destination and marks the status as OPEN
CreateExtent(dstID string, extentID string, inhostID string, storeIDs []string) (*shared.CreateExtentResult_, error)
// CreateRemoteZoneExtent creates a new remote zone extent for the given destination and marks the status as OPEN
CreateRemoteZoneExtent(dstID string, extentID string, inhostID string, storeIDs []string, originZone string, remoteExtentPrimaryStore string) (*shared.CreateExtentResult_, error)
CreateRemoteZoneExtent(dstID string, extentID string, inhostID string, storeIDs []string, originZone string, remoteExtentPrimaryStore string, consumerGroupVisibility string) (*shared.CreateExtentResult_, error)
// AddExtentToConsumerGroup adds an open extent to consumer group for consumption
AddExtentToConsumerGroup(dstID string, cgID string, extentID string, outHostID string, storeIDs []string) error
// ListConsumerGroupsByDstID lists all consumer groups for a given destination uuid
Expand Down Expand Up @@ -471,15 +471,15 @@ func (mm *metadataMgrImpl) ListExtentsByConsumerGroup(dstID string, cgID string,

// CreateExtent creates a new extent for the given destination and marks the status as OPEN
func (mm *metadataMgrImpl) CreateExtent(dstID string, extentID string, inhostID string, storeIDs []string) (*shared.CreateExtentResult_, error) {
return mm.createExtentInternal(dstID, extentID, inhostID, storeIDs, ``, ``)
return mm.createExtentInternal(dstID, extentID, inhostID, storeIDs, ``, ``, ``)
}

// CreateRemoteZoneExtent creates a new remote zone extent for the given destination and marks the status as OPEN
func (mm *metadataMgrImpl) CreateRemoteZoneExtent(dstID string, extentID string, inhostID string, storeIDs []string, originZone string, remoteExtentPrimaryStore string) (*shared.CreateExtentResult_, error) {
return mm.createExtentInternal(dstID, extentID, inhostID, storeIDs, originZone, remoteExtentPrimaryStore)
func (mm *metadataMgrImpl) CreateRemoteZoneExtent(dstID string, extentID string, inhostID string, storeIDs []string, originZone string, remoteExtentPrimaryStore string, consumerGroupVisibility string) (*shared.CreateExtentResult_, error) {
return mm.createExtentInternal(dstID, extentID, inhostID, storeIDs, originZone, remoteExtentPrimaryStore, consumerGroupVisibility)
}

func (mm *metadataMgrImpl) createExtentInternal(dstID string, extentID string, inhostID string, storeIDs []string, originZone string, remoteExtentPrimaryStore string) (*shared.CreateExtentResult_, error) {
func (mm *metadataMgrImpl) createExtentInternal(dstID string, extentID string, inhostID string, storeIDs []string, originZone string, remoteExtentPrimaryStore string, consumerGroupVisibility string) (*shared.CreateExtentResult_, error) {

extent := &shared.Extent{
ExtentUUID: common.StringPtr(extentID),
Expand All @@ -490,6 +490,9 @@ func (mm *metadataMgrImpl) createExtentInternal(dstID string, extentID string, i
RemoteExtentPrimaryStore: common.StringPtr(remoteExtentPrimaryStore),
}
mReq := &shared.CreateExtentRequest{Extent: extent}
if len(consumerGroupVisibility) > 0 {
mReq.ConsumerGroupVisibility = common.StringPtr(consumerGroupVisibility)
}

res, err := mm.mClient.CreateExtent(nil, mReq)
if err != nil {
Expand Down
36 changes: 22 additions & 14 deletions services/replicator/metadataReconciler.go
Expand Up @@ -618,7 +618,7 @@ func (r *metadataReconciler) reconcileCgExtentMetadata(localCgs []*shared.Consum
return nil
}

func (r *metadataReconciler) getAllDestExtentInRemoteZone(zone string, destUUID string) (map[string]shared.ExtentStatus, error) {
func (r *metadataReconciler) getAllDestExtentInRemoteZone(zone string, destUUID string) (map[string]*shared.ExtentStats, error) {
var err error
remoteReplicator, err := r.replicator.replicatorclientFactory.GetReplicatorClient(zone)
if err != nil {
Expand All @@ -635,7 +635,7 @@ func (r *metadataReconciler) getAllDestExtentInRemoteZone(zone string, destUUID
Limit: common.Int64Ptr(metadataListRequestPageSize),
}

extents := make(map[string]shared.ExtentStatus)
extents := make(map[string]*shared.ExtentStats)
for {
ctx, cancel := thrift.NewContext(remoteReplicatorCallTimeOut)
defer cancel()
Expand All @@ -646,7 +646,7 @@ func (r *metadataReconciler) getAllDestExtentInRemoteZone(zone string, destUUID
}

for _, ext := range res.GetExtentStatsList() {
extents[ext.GetExtent().GetExtentUUID()] = ext.GetStatus()
extents[ext.GetExtent().GetExtentUUID()] = ext
}

if len(res.GetNextPageToken()) == 0 {
Expand All @@ -659,14 +659,14 @@ func (r *metadataReconciler) getAllDestExtentInRemoteZone(zone string, destUUID
return extents, nil
}

func (r *metadataReconciler) getAllDestExtentInCurrentZone(destUUID string) (map[string]map[string]shared.ExtentStatus, error) {
func (r *metadataReconciler) getAllDestExtentInCurrentZone(destUUID string) (map[string]map[string]*shared.ExtentStats, error) {
listReq := &shared.ListExtentsStatsRequest{
DestinationUUID: common.StringPtr(destUUID),
LocalExtentsOnly: common.BoolPtr(false),
Limit: common.Int64Ptr(metadataListRequestPageSize),
}

perZoneExtents := make(map[string]map[string]shared.ExtentStatus)
perZoneExtents := make(map[string]map[string]*shared.ExtentStats)
for {
res, err := r.mClient.ListExtentsStats(nil, listReq)
if err != nil {
Expand All @@ -677,9 +677,9 @@ func (r *metadataReconciler) getAllDestExtentInCurrentZone(destUUID string) (map
for _, ext := range res.GetExtentStatsList() {
zone := ext.GetExtent().GetOriginZone()
if _, ok := perZoneExtents[zone]; !ok {
perZoneExtents[zone] = make(map[string]shared.ExtentStatus)
perZoneExtents[zone] = make(map[string]*shared.ExtentStats)
}
perZoneExtents[zone][ext.GetExtent().GetExtentUUID()] = ext.GetStatus()
perZoneExtents[zone][ext.GetExtent().GetExtentUUID()] = ext
}

if len(res.GetNextPageToken()) == 0 {
Expand Down Expand Up @@ -767,13 +767,14 @@ func (r *metadataReconciler) getAllCgExtentInCurrentZone(destUUID string, cgUUID
return cgExtents, nil
}

func (r *metadataReconciler) reconcileDestExtent(destUUID string, localExtents map[string]shared.ExtentStatus, remoteExtents map[string]shared.ExtentStatus, remoteZone string) {
func (r *metadataReconciler) reconcileDestExtent(destUUID string, localExtents map[string]*shared.ExtentStats, remoteExtents map[string]*shared.ExtentStats, remoteZone string) {
var remoteDeletedLocalNotCount int64
var remoteConsumedLocalMissingCount int64
var remoteDeletedLocalMissingCount int64
var foundMissingCount int64
for remoteExtentUUID, remoteExtentStatus := range remoteExtents {
localExtentStatus, ok := localExtents[remoteExtentUUID]
for remoteExtentUUID, remoteExtentStats := range remoteExtents {
remoteExtentStatus := remoteExtentStats.GetStatus()
localExtentStats, ok := localExtents[remoteExtentUUID]
if !ok {
r.logger.WithFields(bark.Fields{
common.TagDst: common.FmtDst(destUUID),
Expand Down Expand Up @@ -803,6 +804,12 @@ func (r *metadataReconciler) reconcileDestExtent(destUUID string, localExtents m
OriginZone: common.StringPtr(remoteZone),
},
}

// if the remote extent has cg visibility set (a merged dlq extent), propagate that field too.
if remoteExtentStats.IsSetConsumerGroupVisibility() {
createRequest.ConsumerGroupVisibility = common.StringPtr(remoteExtentStats.GetConsumerGroupVisibility())
}

ctx, cancel := thrift.NewContext(localReplicatorCallTimeOut)
defer cancel()
_, err := r.replicator.CreateExtent(ctx, createRequest)
Expand All @@ -816,6 +823,7 @@ func (r *metadataReconciler) reconcileDestExtent(destUUID string, localExtents m
continue
}
} else {
localExtentStatus := localExtentStats.GetStatus()
if (remoteExtentStatus == shared.ExtentStatus_SEALED || remoteExtentStatus == shared.ExtentStatus_CONSUMED) && localExtentStatus == shared.ExtentStatus_OPEN {
r.sealExtentInMetadata(destUUID, remoteExtentUUID)
}
Expand All @@ -833,9 +841,9 @@ func (r *metadataReconciler) reconcileDestExtent(destUUID string, localExtents m
// we act on it only after it has been missing for a certain period of time

remoteMissingExtents := make(map[string]struct{})
for localExtentUUID, localExtentStatus := range localExtents {
for localExtentUUID, localExtentStats := range localExtents {
// we're going to delete this extent soon locally so no need to act on it
if localExtentStatus == shared.ExtentStatus_CONSUMED || localExtentStatus == shared.ExtentStatus_DELETED {
if localExtentStats.GetStatus() == shared.ExtentStatus_CONSUMED || localExtentStats.GetStatus() == shared.ExtentStatus_DELETED {
continue
}
if _, ok := remoteExtents[localExtentUUID]; !ok {
Expand All @@ -853,15 +861,15 @@ func (r *metadataReconciler) reconcileDestExtent(destUUID string, localExtents m
delete(r.suspectMissingExtents, suspectExtent)
} else {
if time.Since(suspectExtentInfo.missingSince) > extentMissingDurationThreshold {
localStatus, ok := localExtents[suspectExtent]
localStats, ok := localExtents[suspectExtent]
if !ok {
r.logger.WithFields(bark.Fields{
common.TagDst: common.FmtDst(suspectExtentInfo.destUUID),
common.TagExt: common.FmtExt(suspectExtent),
}).Error(`code bug!! suspect extent should in local extent map!!`)
continue
}
r.handleExtentDeletedOrMissingInRemote(suspectExtentInfo.destUUID, suspectExtent, localStatus, &remoteDeletedLocalNotCount)
r.handleExtentDeletedOrMissingInRemote(suspectExtentInfo.destUUID, suspectExtent, localStats.GetStatus(), &remoteDeletedLocalNotCount)
}
}
}
Expand Down
87 changes: 67 additions & 20 deletions services/replicator/replicator_test.go
Expand Up @@ -1322,9 +1322,42 @@ func (s *ReplicatorSuite) TestDestExtentMetadataReconcileLocalMissing() {
s.Equal(remoteZone, req.GetExtent().GetOriginZone())
})

localExtents := make(map[string]shared.ExtentStatus)
remoteExtents := make(map[string]shared.ExtentStatus)
remoteExtents[missingExtent] = shared.ExtentStatus_OPEN
localExtents := make(map[string]*shared.ExtentStats)
remoteExtents := make(map[string]*shared.ExtentStats)
remoteExtents[missingExtent] = &shared.ExtentStats{
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_OPEN),
}
reconciler.reconcileDestExtent(dest, localExtents, remoteExtents, remoteZone)
s.mockControllerClient.AssertExpectations(s.T())
}

// local zone is missing one dlq destination extent compared to remote. Expect to create the missing destination extent
func (s *ReplicatorSuite) TestDestExtentMetadataReconcileLocalMissingDLQExtent() {
localZone := `zone2`
remoteZone := `zone1`
missingExtent := uuid.New()
dest := uuid.New()
cg := uuid.New()

repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)
reconciler, _ := NewMetadataReconciler(repliator.metaClient, repliator, localZone, repliator.logger, repliator.m3Client).(*metadataReconciler)

// setup mock
s.mockControllerClient.On("CreateRemoteZoneExtent", mock.Anything, mock.Anything).Return(shared.NewCreateExtentResult_(), nil).Run(func(args mock.Arguments) {
req := args.Get(1).(*shared.CreateExtentRequest)
s.True(req.IsSetExtent())
s.Equal(missingExtent, req.GetExtent().GetExtentUUID())
s.Equal(dest, req.GetExtent().GetDestinationUUID())
s.Equal(remoteZone, req.GetExtent().GetOriginZone())
s.Equal(cg, req.GetConsumerGroupVisibility())
})

localExtents := make(map[string]*shared.ExtentStats)
remoteExtents := make(map[string]*shared.ExtentStats)
remoteExtents[missingExtent] = &shared.ExtentStats{
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_OPEN),
ConsumerGroupVisibility: common.StringPtr(cg),
}
reconciler.reconcileDestExtent(dest, localExtents, remoteExtents, remoteZone)
s.mockControllerClient.AssertExpectations(s.T())
}
Expand All @@ -1339,9 +1372,11 @@ func (s *ReplicatorSuite) TestDestExtentMetadataReconcileLocalMissingConsumedExt
repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)
reconciler, _ := NewMetadataReconciler(repliator.metaClient, repliator, localZone, repliator.logger, repliator.m3Client).(*metadataReconciler)

localExtents := make(map[string]shared.ExtentStatus)
remoteExtents := make(map[string]shared.ExtentStatus)
remoteExtents[missingExtent] = shared.ExtentStatus_CONSUMED
localExtents := make(map[string]*shared.ExtentStats)
remoteExtents := make(map[string]*shared.ExtentStats)
remoteExtents[missingExtent] = &shared.ExtentStats{
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_CONSUMED),
}
reconciler.reconcileDestExtent(dest, localExtents, remoteExtents, remoteZone)
s.mockControllerClient.AssertExpectations(s.T())
}
Expand All @@ -1355,8 +1390,8 @@ func (s *ReplicatorSuite) TestDestExtentMetadataReconcileLocalAndRemoteEmpty() {
repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)
reconciler, _ := NewMetadataReconciler(repliator.metaClient, repliator, localZone, repliator.logger, repliator.m3Client).(*metadataReconciler)

localExtents := make(map[string]shared.ExtentStatus)
remoteExtents := make(map[string]shared.ExtentStatus)
localExtents := make(map[string]*shared.ExtentStats)
remoteExtents := make(map[string]*shared.ExtentStats)
reconciler.reconcileDestExtent(dest, localExtents, remoteExtents, remoteZone)
s.mockControllerClient.AssertExpectations(s.T())
}
Expand All @@ -1379,10 +1414,14 @@ func (s *ReplicatorSuite) TestDestExtentMetadataReconcileInconsistentStatus() {
s.Equal(shared.ExtentStatus_SEALED, req.GetStatus())
})

localExtents := make(map[string]shared.ExtentStatus)
remoteExtents := make(map[string]shared.ExtentStatus)
remoteExtents[extent] = shared.ExtentStatus_SEALED
localExtents[extent] = shared.ExtentStatus_OPEN
localExtents := make(map[string]*shared.ExtentStats)
remoteExtents := make(map[string]*shared.ExtentStats)
remoteExtents[extent] = &shared.ExtentStats{
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_SEALED),
}
localExtents[extent] = &shared.ExtentStats{
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_OPEN),
}
reconciler.reconcileDestExtent(dest, localExtents, remoteExtents, remoteZone)
s.mockMeta.AssertExpectations(s.T())
}
Expand Down Expand Up @@ -1421,10 +1460,14 @@ func (s *ReplicatorSuite) TestDestExtentMetadataReconcileRemoteDeletedLocalNot()
s.Equal(extent, req.GetExtentUUID())
})

localExtents := make(map[string]shared.ExtentStatus)
remoteExtents := make(map[string]shared.ExtentStatus)
remoteExtents[extent] = shared.ExtentStatus_DELETED
localExtents[extent] = shared.ExtentStatus_OPEN
localExtents := make(map[string]*shared.ExtentStats)
remoteExtents := make(map[string]*shared.ExtentStats)
remoteExtents[extent] = &shared.ExtentStats{
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_DELETED),
}
localExtents[extent] = &shared.ExtentStats{
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_OPEN),
}
reconciler.reconcileDestExtent(dest, localExtents, remoteExtents, remoteZone)
s.mockMeta.AssertExpectations(s.T())
s.mockStoreClient.AssertExpectations(s.T())
Expand All @@ -1442,9 +1485,11 @@ func (s *ReplicatorSuite) TestDestExtentMetadataReconcileRemoteGoneLocalNot() {
reconciler, _ := NewMetadataReconciler(repliator.metaClient, repliator, localZone, repliator.logger, repliator.m3Client).(*metadataReconciler)

// step1: found extent missing from remote. No action expected, only added to suspect list
localExtents := make(map[string]shared.ExtentStatus)
remoteExtents := make(map[string]shared.ExtentStatus)
localExtents[extent] = shared.ExtentStatus_OPEN
localExtents := make(map[string]*shared.ExtentStats)
remoteExtents := make(map[string]*shared.ExtentStats)
localExtents[extent] = &shared.ExtentStats{
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_OPEN),
}
reconciler.reconcileDestExtent(dest, localExtents, remoteExtents, remoteZone)
s.mockMeta.AssertExpectations(s.T())
s.mockStoreClient.AssertExpectations(s.T())
Expand Down Expand Up @@ -1509,7 +1554,9 @@ func (s *ReplicatorSuite) TestDestExtentMetadataReconcileRemoteGoneLocalNot() {
s.Equal(1, len(reconciler.suspectMissingExtents))

// step4: add the extent in remote zone, expect no action, and extent removed from suspect list
remoteExtents[extent] = shared.ExtentStatus_OPEN
remoteExtents[extent] = &shared.ExtentStats{
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_OPEN),
}
reconciler.reconcileDestExtent(dest, localExtents, remoteExtents, remoteZone)
s.mockMeta.AssertExpectations(s.T())
s.mockStoreClient.AssertExpectations(s.T())
Expand Down

0 comments on commit 3d9e8e0

Please sign in to comment.