Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
datoug committed Mar 24, 2017
1 parent 521fc2a commit 4986802
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 45 deletions.
70 changes: 38 additions & 32 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const (
defaultSessionTimeout = 10 * time.Second
defaultDLQConsumedRetention = 7 * 24 * 3600 // One Week
defaultDLQUnconsumedRetention = 7 * 24 * 3600 // One Week
defaultDLQOwnerEmail = "default@uber"
)

const (
Expand Down Expand Up @@ -1207,37 +1206,9 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
// We may create a consumer group consume a DLQ destination and no DLQ destination creation needed in this case
var dlqUUID *string
if common.PathRegex.MatchString(createRequest.GetDestinationPath()) {
dlqCreateRequest := shared.NewCreateDestinationRequest()
dlqCreateRequest.ConsumedMessagesRetention = common.Int32Ptr(defaultDLQConsumedRetention)
dlqCreateRequest.UnconsumedMessagesRetention = common.Int32Ptr(defaultDLQUnconsumedRetention)
dlqCreateRequest.OwnerEmail = common.StringPtr(defaultDLQOwnerEmail)
dlqCreateRequest.Type = common.InternalDestinationTypePtr(shared.DestinationType_PLAIN)
dlqCreateRequest.DLQConsumerGroupUUID = common.StringPtr(cgUUID)
dlqPath, _ := common.GetDLQPathNameFromCGName(createRequest.GetConsumerGroupName())
dlqCreateRequest.Path = common.StringPtr(dlqPath)

var dlqDestDesc *shared.DestinationDescription
dlqDestDesc, err := s.CreateDestination(nil, dlqCreateRequest)

if err != nil || dlqDestDesc == nil {
switch err.(type) {
case *shared.EntityAlreadyExistsError:
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID)}).Info("DeadLetterQueue destination already existed")
mDLQReadRequest := shared.ReadDestinationRequest{
Path: dlqCreateRequest.Path,
}

dlqDestDesc, err = s.ReadDestination(nil, &mDLQReadRequest)
if err != nil || dlqDestDesc == nil {
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID), common.TagErr: err}).Error(`Can't read existing DeadLetterQueue destination`)
return nil, err
}

// We continue to consumer group creation if err == nil and dlqDestDesc != nil after the read
default:
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID), common.TagErr: err}).Error(`Can't create DeadLetterQueue destination`)
return nil, err
}
dlqDestDesc, err := s.createDlqDestination(cgUUID, createRequest.GetConsumerGroupName(), createRequest.GetOwnerEmail())
if err != nil {
return nil, err
}

dlqUUID = common.StringPtr(dlqDestDesc.GetDestinationUUID())
Expand Down Expand Up @@ -1354,6 +1325,41 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
}, nil
}

func (s *CassandraMetadataService) createDlqDestination(cgUUID string, cgName string, ownerEmail string) (*shared.DestinationDescription, error) {
dlqCreateRequest := shared.NewCreateDestinationRequest()
dlqCreateRequest.ConsumedMessagesRetention = common.Int32Ptr(defaultDLQConsumedRetention)
dlqCreateRequest.UnconsumedMessagesRetention = common.Int32Ptr(defaultDLQUnconsumedRetention)
dlqCreateRequest.OwnerEmail = common.StringPtr(ownerEmail)
dlqCreateRequest.Type = common.InternalDestinationTypePtr(shared.DestinationType_PLAIN)
dlqCreateRequest.DLQConsumerGroupUUID = common.StringPtr(cgUUID)
dlqPath, _ := common.GetDLQPathNameFromCGName(cgName)
dlqCreateRequest.Path = common.StringPtr(dlqPath)

var dlqDestDesc *shared.DestinationDescription
dlqDestDesc, err := s.CreateDestination(nil, dlqCreateRequest)

if err != nil || dlqDestDesc == nil {
switch err.(type) {
case *shared.EntityAlreadyExistsError:
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID)}).Info("DeadLetterQueue destination already existed")
mDLQReadRequest := shared.ReadDestinationRequest{
Path: dlqCreateRequest.Path,
}

dlqDestDesc, err = s.ReadDestination(nil, &mDLQReadRequest)
if err != nil || dlqDestDesc == nil {
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID), common.TagErr: err}).Error(`Can't read existing DeadLetterQueue destination`)
return nil, err
}
return dlqDestDesc, nil
default:
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID), common.TagErr: err}).Error(`Can't create DeadLetterQueue destination`)
return nil, err
}
}
return dlqDestDesc, err
}

func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cgName string) (*shared.ConsumerGroupDescription, error) {
result := getUtilConsumerGroupDescription()
var zoneConfigsData []map[string]interface{}
Expand Down
7 changes: 7 additions & 0 deletions clients/metadata/metadata_cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,13 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
dlqDest, err := s.client.ReadDestination(nil, dlqDestReq)
assert.Nil(err, "Read Dlq destination failed")
assert.Equal(dlqDest.GetDLQConsumerGroupUUID(), gotCG.GetConsumerGroupUUID())
assert.Equal(dlqDest.GetConsumedMessagesRetention(), int32(defaultDLQConsumedRetention))
assert.Equal(dlqDest.GetUnconsumedMessagesRetention(), int32(defaultDLQUnconsumedRetention))
assert.Equal(dlqDest.GetIsMultiZone(), false)
assert.Equal(dlqDest.GetOwnerEmail(), createReq.GetOwnerEmail())
dlqName, err := common.GetDLQPathNameFromCGName(createReq.GetConsumerGroupName())
assert.Nil(err, "GetDLQPathNameFromCGName failed")
assert.Equal(dlqDest.GetPath(), dlqName)

for pass := 0; pass < 3; pass++ {
readReq := &m.ReadConsumerGroupRequest{
Expand Down
2 changes: 1 addition & 1 deletion services/replicator/metadataReconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (r *metadataReconciler) reconcileCg(localCgs []*shared.ConsumerGroupDescrip
if err != nil {
lclLg.WithFields(bark.Fields{
common.TagErr: err,
}).Error(`Failed to create ConsumerGroup in local zone because read destination failed`)
}).Error(`Failed to create ConsumerGroup in local zone because read destination failed in remote zone`)
continue
}

Expand Down
24 changes: 12 additions & 12 deletions services/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (r *Replicator) OpenReplicationRemoteReadStreamHandler(w http.ResponseWrite
return
}

// CreateDestinationUUID creates destination at local zone, expect to be called by remote replicator
// CreateDestinationUUID creates destination at local zone, expect to be called by replicator from authoritative zone
func (r *Replicator) CreateDestinationUUID(ctx thrift.Context, createRequest *shared.CreateDestinationUUIDRequest) (*shared.DestinationDescription, error) {
r.m3Client.IncCounter(metrics.ReplicatorCreateDestUUIDScope, metrics.ReplicatorRequests)

Expand Down Expand Up @@ -319,7 +319,7 @@ func (r *Replicator) CreateDestinationUUID(ctx thrift.Context, createRequest *sh
return destDesc, nil
}

// CreateRemoteDestinationUUID propagates creation to multiple remote zones, expect to be called by local zone services
// CreateRemoteDestinationUUID propagates creation to multiple remote zones, expect to be called only in authoritative zone
func (r *Replicator) CreateRemoteDestinationUUID(ctx thrift.Context, createRequest *shared.CreateDestinationUUIDRequest) error {
r.m3Client.IncCounter(metrics.ReplicatorCreateRmtDestUUIDScope, metrics.ReplicatorRequests)

Expand Down Expand Up @@ -392,7 +392,7 @@ func (r *Replicator) createDestinationRemoteCall(zone string, logger bark.Logger
return nil
}

// UpdateDestination updates destination at local zone, expect to be called by remote replicator
// UpdateDestination updates destination at local zone, expect to be called by replicator from authoritative zone
func (r *Replicator) UpdateDestination(ctx thrift.Context, updateRequest *shared.UpdateDestinationRequest) (*shared.DestinationDescription, error) {
r.m3Client.IncCounter(metrics.ReplicatorUpdateDestScope, metrics.ReplicatorRequests)

Expand All @@ -419,7 +419,7 @@ func (r *Replicator) UpdateDestination(ctx thrift.Context, updateRequest *shared
return destDesc, nil
}

// UpdateRemoteDestination propagates update to multiple remote zones, expect to be called by local zone services
// UpdateRemoteDestination propagates update to multiple remote zones, expect to be called only in authoritative zone
func (r *Replicator) UpdateRemoteDestination(ctx thrift.Context, updateRequest *shared.UpdateDestinationRequest) error {
r.m3Client.IncCounter(metrics.ReplicatorUpdateRmtDestScope, metrics.ReplicatorRequests)

Expand Down Expand Up @@ -484,7 +484,7 @@ func (r *Replicator) updateDestinationRemoteCall(zone string, logger bark.Logger
return nil
}

// DeleteDestination deletes destination at local zone, expect to be called by remote replicator
// DeleteDestination deletes destination at local zone, expect to be called by replicator from authoritative zone
func (r *Replicator) DeleteDestination(ctx thrift.Context, deleteRequest *shared.DeleteDestinationRequest) error {
r.m3Client.IncCounter(metrics.ReplicatorDeleteDestScope, metrics.ReplicatorRequests)

Expand All @@ -501,7 +501,7 @@ func (r *Replicator) DeleteDestination(ctx thrift.Context, deleteRequest *shared
return nil
}

// DeleteRemoteDestination propagate deletion to multiple remote zones, expect to be called by local zone services
// DeleteRemoteDestination propagate deletion to multiple remote zones, expect to be called only in authoritative zone
func (r *Replicator) DeleteRemoteDestination(ctx thrift.Context, deleteRequest *shared.DeleteDestinationRequest) error {
r.m3Client.IncCounter(metrics.ReplicatorDeleteRmtDestScope, metrics.ReplicatorRequests)

Expand Down Expand Up @@ -566,7 +566,7 @@ func (r *Replicator) deleteDestinationRemoteCall(zone string, logger bark.Logger
return nil
}

// CreateConsumerGroupUUID creates consumer group at local zone, expect to be called by remote replicator
// CreateConsumerGroupUUID creates consumer group at local zone, expect to be called by replicator from authoritative zone
func (r *Replicator) CreateConsumerGroupUUID(ctx thrift.Context, createRequest *shared.CreateConsumerGroupUUIDRequest) (*shared.ConsumerGroupDescription, error) {
r.m3Client.IncCounter(metrics.ReplicatorCreateCgUUIDScope, metrics.ReplicatorRequests)

Expand Down Expand Up @@ -594,7 +594,7 @@ func (r *Replicator) CreateConsumerGroupUUID(ctx thrift.Context, createRequest *
return cgDesc, nil
}

// CreateRemoteConsumerGroupUUID propagate creation to multiple remote zones, expect to be called by local zone services
// CreateRemoteConsumerGroupUUID propagate creation to multiple remote zones, expect to be called only in authoritative zone
func (r *Replicator) CreateRemoteConsumerGroupUUID(ctx thrift.Context, createRequest *shared.CreateConsumerGroupUUIDRequest) error {
r.m3Client.IncCounter(metrics.ReplicatorCreateRmtCgUUIDScope, metrics.ReplicatorRequests)

Expand Down Expand Up @@ -668,7 +668,7 @@ func (r *Replicator) createConsumerGroupRemoteCall(zone string, logger bark.Logg
return nil
}

// UpdateConsumerGroup updates consumer group at local zone, expect to be called by remote replicator
// UpdateConsumerGroup updates consumer group at local zone, expect to be called by replicator from authoritative zone
func (r *Replicator) UpdateConsumerGroup(ctx thrift.Context, updateRequest *shared.UpdateConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {
r.m3Client.IncCounter(metrics.ReplicatorUpdateCgScope, metrics.ReplicatorRequests)

Expand All @@ -695,7 +695,7 @@ func (r *Replicator) UpdateConsumerGroup(ctx thrift.Context, updateRequest *shar
return cgDesc, nil
}

// UpdateRemoteConsumerGroup propagate update to multiple remote zones, expect to be called by local zone services
// UpdateRemoteConsumerGroup propagate update to multiple remote zones, expect to be called only in authoritative zone
func (r *Replicator) UpdateRemoteConsumerGroup(ctx thrift.Context, updateRequest *shared.UpdateConsumerGroupRequest) error {
r.m3Client.IncCounter(metrics.ReplicatorUpdateRmtCgScope, metrics.ReplicatorRequests)

Expand Down Expand Up @@ -761,7 +761,7 @@ func (r *Replicator) updateCgRemoteCall(zone string, logger bark.Logger, updateR
return nil
}

// DeleteConsumerGroup deletes consumer group at local zone, expect to be called by remote replicator
// DeleteConsumerGroup deletes consumer group at local zone, expect to be called by replicator from authoritative zone
func (r *Replicator) DeleteConsumerGroup(ctx thrift.Context, deleteRequest *shared.DeleteConsumerGroupRequest) error {
r.m3Client.IncCounter(metrics.ReplicatorDeleteCgScope, metrics.ReplicatorRequests)

Expand All @@ -779,7 +779,7 @@ func (r *Replicator) DeleteConsumerGroup(ctx thrift.Context, deleteRequest *shar
return nil
}

// DeleteRemoteConsumerGroup propagate deletion to multiple remote zones, expect to be called by local zone services
// DeleteRemoteConsumerGroup propagate deletion to multiple remote zones, expect to be called only in authoritative zone
func (r *Replicator) DeleteRemoteConsumerGroup(ctx thrift.Context, deleteRequest *shared.DeleteConsumerGroupRequest) error {
r.m3Client.IncCounter(metrics.ReplicatorDeleteRmtCgScope, metrics.ReplicatorRequests)

Expand Down

0 comments on commit 4986802

Please sign in to comment.