Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
create multi zone cg API
Browse files Browse the repository at this point in the history
  • Loading branch information
datoug committed Mar 15, 2017
1 parent 4d6a0d7 commit e6a6457
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 148 deletions.
164 changes: 101 additions & 63 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ const directoryUUID string = "CC3B477C-E6F2-4465-9A98-7FE71B68CD1F"

var uuidRegex, _ = regexp.Compile(`^[[:xdigit:]]{8}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{12}$`)

const defaultSessionTimeout = 10 * time.Second
const (
defaultSessionTimeout = 10 * time.Second
defaultDLQConsumedRetention = 7 * 24 * 3600 // One Week
defaultDLQUnconsumedRetention = 7 * 24 * 3600 // One Week
defaultDLQOwnerEmail = "default@uber"
)

const (
opsCreate = "create"
Expand Down Expand Up @@ -1181,19 +1186,73 @@ const (
` WHERE ` + columnDestinationUUID + `=? and ` + columnName + `=?`
)

// CreateConsumerGroup creates a ConsumerGroup for the given destination, if it doesn't already exist
func (s *CassandraMetadataService) CreateConsumerGroup(ctx thrift.Context, request *shared.CreateConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {
uuidRequest := shared.NewCreateConsumerGroupUUIDRequest()
uuidRequest.Request = request
uuidRequest.ConsumerGroupUUID = common.StringPtr(uuid.New())
return s.CreateConsumerGroupUUID(ctx, uuidRequest)
}

// CreateConsumerGroupUUID creates a ConsumerGroup for the given destination, if it doesn't already exist
// ConsumerGroups are tied to a destination path, so the same ConsumerGroupName can be used across
// multiple destination paths. If the requested [destinationPath, consumerGroupName] already exists,
// this method will return an EntityAlreadyExistsError.
func (s *CassandraMetadataService) CreateConsumerGroup(ctx thrift.Context, request *shared.CreateConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {
func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, request *shared.CreateConsumerGroupUUIDRequest) (*shared.ConsumerGroupDescription, error) {
createRequest := request.GetRequest()
cgUUID := request.GetConsumerGroupUUID()

// Dead Letter Queue destination creation

// Only non-UUID (non-DLQ) destinations get a DLQ for the corresponding consumer groups
// We may create a consumer group consume a DLQ destination and no DLQ destination creation needed in this case
var dlqUUID *string
if common.PathRegex.MatchString(createRequest.GetDestinationPath()) {
dlqCreateRequest := shared.NewCreateDestinationRequest()
dlqCreateRequest.ConsumedMessagesRetention = common.Int32Ptr(defaultDLQConsumedRetention)
dlqCreateRequest.UnconsumedMessagesRetention = common.Int32Ptr(defaultDLQUnconsumedRetention)
dlqCreateRequest.OwnerEmail = common.StringPtr(defaultDLQOwnerEmail)
dlqCreateRequest.Type = common.InternalDestinationTypePtr(shared.DestinationType_PLAIN)
dlqCreateRequest.DLQConsumerGroupUUID = common.StringPtr(cgUUID)
dlqPath, _ := common.GetDLQPathNameFromCGName(createRequest.GetConsumerGroupName())
dlqCreateRequest.Path = common.StringPtr(dlqPath)

var dlqDestDesc *shared.DestinationDescription
dlqDestDesc, err := s.CreateDestination(nil, dlqCreateRequest)

if err != nil || dlqDestDesc == nil {
switch err.(type) {
case *shared.EntityAlreadyExistsError:
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID)}).Info("DeadLetterQueue destination already existed")
mDLQReadRequest := m.ReadDestinationRequest{
Path: dlqCreateRequest.Path,
}

dlqDestDesc, err = s.ReadDestination(nil, &mDLQReadRequest)
if err != nil || dlqDestDesc == nil {
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID), common.TagErr: err}).Error(`Can't read existing DeadLetterQueue destination`)
return nil, err
}

// We continue to consumer group creation if err == nil and dlqDestDesc != nil after the read
default:
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID), common.TagErr: err}).Error(`Can't create DeadLetterQueue destination`)
return nil, err
}
}

dstInfo, err := s.ReadDestination(nil, &m.ReadDestinationRequest{Path: common.StringPtr(request.GetDestinationPath())})
dlqUUID = common.StringPtr(dlqDestDesc.GetDestinationUUID())
} else {
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID)}).Info("DeadLetterQueue destination not being created")
}

dstInfo, err := s.ReadDestination(nil, &m.ReadDestinationRequest{Path: common.StringPtr(createRequest.GetDestinationPath())})
if err != nil {
return nil, err
}
dstUUID := dstInfo.GetDestinationUUID()

/*
Every ConsumerGroup is assigned a UUID, which identifes the group uniquely. We need to
Every ConsumerGroup is assigned a UUID, which identifies the group uniquely. We need to
be able to retrieve a ConsumerGroup given either a UUID or a (destinationPath, groupName)
tuple. To enable this, we maintain the ConsumerGroup information across two tables, one
indexed by the UUID and the other indexed by the (destinationUUID, groupName) tuple. Creation
Expand All @@ -1207,70 +1266,48 @@ func (s *CassandraMetadataService) CreateConsumerGroup(ctx thrift.Context, reque
A batch query cannot be used here, because CassandraDB requires all the queries in a batch
to be from the same partition.
*/
cgUUID := uuid.New()
dstUUID := dstInfo.GetDestinationUUID()
var dlqUUID *string
if request.DeadLetterQueueDestinationUUID != nil {
var destDesc *shared.DestinationDescription
dlqUUID = common.StringPtr(request.GetDeadLetterQueueDestinationUUID())

// We need to create the consumer group with UUID matching the random UUID already populated in the DLQ destination

readDestReq := m.NewReadDestinationRequest()
readDestReq.DestinationUUID = common.StringPtr(request.GetDeadLetterQueueDestinationUUID())

destDesc, err = s.ReadDestination(ctx, readDestReq)
if err != nil || destDesc == nil || len(destDesc.GetDLQConsumerGroupUUID()) == 0 {
return nil, &shared.InternalServiceError{
Message: fmt.Sprintf("CreateConsumerGroup - lookup of DLQ destination failed dst=%v, cg=%v, dlqDst=%v err=%v",
request.GetDestinationPath(), request.GetConsumerGroupName(), request.GetDeadLetterQueueDestinationUUID(), err),
}
}

cgUUID = destDesc.GetDLQConsumerGroupUUID()
}

err = s.session.Query(sqlInsertCGByUUID,
cgUUID,
request.GetIsMultiZone(),
createRequest.GetIsMultiZone(),
cgUUID,
dstUUID,
request.GetConsumerGroupName(),
request.GetStartFrom(),
createRequest.GetConsumerGroupName(),
createRequest.GetStartFrom(),
shared.ConsumerGroupStatus_ENABLED,
request.GetLockTimeoutSeconds(),
request.GetMaxDeliveryCount(),
request.GetSkipOlderMessagesSeconds(),
createRequest.GetLockTimeoutSeconds(),
createRequest.GetMaxDeliveryCount(),
createRequest.GetSkipOlderMessagesSeconds(),
dlqUUID,
request.GetOwnerEmail(),
request.GetIsMultiZone(),
request.GetActiveZone(),
marshalCgZoneConfigs(request.GetZoneConfigs())).Exec()
createRequest.GetOwnerEmail(),
createRequest.GetIsMultiZone(),
createRequest.GetActiveZone(),
marshalCgZoneConfigs(createRequest.GetZoneConfigs())).Exec()

if err != nil {
return nil, &shared.InternalServiceError{
Message: fmt.Sprintf("CreateConsumerGroup - insert into consumer_groups table failed, dst=%v, cg=%v, err=%v",
request.GetDestinationPath(), request.GetConsumerGroupName(), err),
createRequest.GetDestinationPath(), createRequest.GetConsumerGroupName(), err),
}
}

query := s.session.Query(sqlInsertCGByName,
dstUUID,
request.GetConsumerGroupName(),
request.GetIsMultiZone(),
createRequest.GetConsumerGroupName(),
createRequest.GetIsMultiZone(),
cgUUID,
dstUUID,
request.GetConsumerGroupName(),
request.GetStartFrom(),
createRequest.GetConsumerGroupName(),
createRequest.GetStartFrom(),
shared.ConsumerGroupStatus_ENABLED,
request.GetLockTimeoutSeconds(),
request.GetMaxDeliveryCount(),
request.GetSkipOlderMessagesSeconds(),
createRequest.GetLockTimeoutSeconds(),
createRequest.GetMaxDeliveryCount(),
createRequest.GetSkipOlderMessagesSeconds(),
dlqUUID,
request.GetOwnerEmail(),
request.GetIsMultiZone(),
request.GetActiveZone(),
marshalCgZoneConfigs(request.GetZoneConfigs()))
createRequest.GetOwnerEmail(),
createRequest.GetIsMultiZone(),
createRequest.GetActiveZone(),
marshalCgZoneConfigs(createRequest.GetZoneConfigs()))

previous := make(map[string]interface{}) // We actually throw away the old values below, but passing nil causes a panic

Expand All @@ -1280,7 +1317,7 @@ func (s *CassandraMetadataService) CreateConsumerGroup(ctx thrift.Context, reque
log.WithFields(log.Fields{common.TagCnsm: common.FmtCnsm(cgUUID), common.TagErr: err}).Warn(`CreateConsumerGroup - failed to delete orphan record after a failed CAS attempt, ,`)
}
return nil, &shared.EntityAlreadyExistsError{
Message: fmt.Sprintf("CreateConsumerGroup - Group exists, dst=%v cg=%v err=%v", request.GetDestinationPath(), request.GetConsumerGroupName(), err),
Message: fmt.Sprintf("CreateConsumerGroup - Group exists, dst=%v cg=%v err=%v", createRequest.GetDestinationPath(), createRequest.GetConsumerGroupName(), err),
}
}

Expand All @@ -1289,7 +1326,7 @@ func (s *CassandraMetadataService) CreateConsumerGroup(ctx thrift.Context, reque
callerServiceName := getThriftContextValue(ctx, common.CallerServiceName)

s.recordUserOperation(
request.GetConsumerGroupName(),
createRequest.GetConsumerGroupName(),
cgUUID,
entityTypeCG,
callerUserName,
Expand All @@ -1298,22 +1335,22 @@ func (s *CassandraMetadataService) CreateConsumerGroup(ctx thrift.Context, reque
callerHostName,
opsCreate,
time.Now(),
marshalRequest(request))
marshalRequest(createRequest))

return &shared.ConsumerGroupDescription{
ConsumerGroupUUID: common.StringPtr(cgUUID),
DestinationUUID: common.StringPtr(dstUUID),
ConsumerGroupName: common.StringPtr(request.GetConsumerGroupName()),
StartFrom: common.Int64Ptr(request.GetStartFrom()),
ConsumerGroupName: common.StringPtr(createRequest.GetConsumerGroupName()),
StartFrom: common.Int64Ptr(createRequest.GetStartFrom()),
Status: common.InternalConsumerGroupStatusPtr(shared.ConsumerGroupStatus_ENABLED),
LockTimeoutSeconds: common.Int32Ptr(request.GetLockTimeoutSeconds()),
MaxDeliveryCount: common.Int32Ptr(request.GetMaxDeliveryCount()),
SkipOlderMessagesSeconds: common.Int32Ptr(request.GetSkipOlderMessagesSeconds()),
DeadLetterQueueDestinationUUID: common.StringPtr(request.GetDeadLetterQueueDestinationUUID()),
OwnerEmail: common.StringPtr(request.GetOwnerEmail()),
IsMultiZone: common.BoolPtr(request.GetIsMultiZone()),
ActiveZone: common.StringPtr(request.GetActiveZone()),
ZoneConfigs: request.GetZoneConfigs(),
LockTimeoutSeconds: common.Int32Ptr(createRequest.GetLockTimeoutSeconds()),
MaxDeliveryCount: common.Int32Ptr(createRequest.GetMaxDeliveryCount()),
SkipOlderMessagesSeconds: common.Int32Ptr(createRequest.GetSkipOlderMessagesSeconds()),
DeadLetterQueueDestinationUUID: dlqUUID,
OwnerEmail: common.StringPtr(createRequest.GetOwnerEmail()),
IsMultiZone: common.BoolPtr(createRequest.GetIsMultiZone()),
ActiveZone: common.StringPtr(createRequest.GetActiveZone()),
ZoneConfigs: createRequest.GetZoneConfigs(),
}, nil
}

Expand Down Expand Up @@ -1349,7 +1386,7 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg
return result, nil
}

// ReadConsumerGroup returns the ConsumerGroupDescription for the [destinatinPath, groupName].
// ReadConsumerGroup returns the ConsumerGroupDescription for the [destinationPath, groupName].
// When destination path is specified as input, this method only returns result, if the
// destination has not been DELETED. When destination UUID is specified as input, this
// method will always return result, if the consumer group exist.
Expand Down Expand Up @@ -1586,6 +1623,7 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque
// DELETED. The following code adds the DLQ destination delete
// to the batch operation, if there is one.
dlqDstID := existingCG.GetDeadLetterQueueDestinationUUID()
log.Info(fmt.Sprintf("dlq dst id: %v", dlqDstID))
// Not all CGs have a DLQ, only do this if there is a DLQ
if len(dlqDstID) > 0 {
var dlqDstDesc *shared.DestinationDescription
Expand Down
11 changes: 4 additions & 7 deletions clients/metadata/metadata_cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,14 +1492,10 @@ func (s *CassandraSuite) TestDeleteConsumerGroupDeletesDLQ() {
assert.Nil(err, "CreateDestination failed")

cgName := s.generateName("/foo.bar/consumer")
dlqPath, _ := common.GetDLQPathNameFromCGName(cgName)
dlqDst, err := createDestination(s, dlqPath, true)
assert.Nil(err, "CreateDestination failed")

createReq := &shared.CreateConsumerGroupRequest{
DestinationPath: common.StringPtr(dstPath),
ConsumerGroupName: common.StringPtr(cgName),
DeadLetterQueueDestinationUUID: common.StringPtr(dlqDst.GetDestinationUUID()),
StartFrom: common.Int64Ptr(30),
LockTimeoutSeconds: common.Int32Ptr(10),
MaxDeliveryCount: common.Int32Ptr(5),
Expand All @@ -1511,10 +1507,11 @@ func (s *CassandraSuite) TestDeleteConsumerGroupDeletesDLQ() {
assert.Nil(err, "CreateConsumerGroup failed")
assert.Equal(shared.ConsumerGroupStatus_ENABLED, gotCG.GetStatus(), "Wrong CG status")

dlqUUID := gotCG.GetDeadLetterQueueDestinationUUID()
readDstReq := &m.ReadDestinationRequest{
Path: common.StringPtr(dlqDst.GetDestinationUUID()),
Path: common.StringPtr(dlqUUID),
}
dlqDst, err = s.client.ReadDestination(nil, readDstReq)
dlqDst, err := s.client.ReadDestination(nil, readDstReq)
assert.Nil(err, "ReadDestination failed for DLQ")
assert.Equal(shared.DestinationStatus_ENABLED, dlqDst.GetStatus(), "Wrong dlq destination status")

Expand All @@ -1540,7 +1537,7 @@ func (s *CassandraSuite) TestDeleteConsumerGroupDeletesDLQ() {
`DLQ should be deleted`)

readDstReq = &m.ReadDestinationRequest{
DestinationUUID: common.StringPtr(createReq.GetDeadLetterQueueDestinationUUID()),
DestinationUUID: common.StringPtr(dlqUUID),
}
dlqDst, err = s.client.ReadDestination(nil, readDstReq)
assert.Nil(err, "ReadDestination failed for DLQ")
Expand Down
18 changes: 18 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,18 @@ const (
ReplicatorDeleteDestScope
// ReplicatorDeleteRmtDestScope represents replicator DeleteRemoteDestination API
ReplicatorDeleteRmtDestScope
// ReplicatorCreateCgUUIDScope represents replicator CreateConsumerGroupUUID API
ReplicatorCreateCgUUIDScope
// ReplicatorCreateRmtCgUUIDScope represents replicator CreateRemoteConsumerGroupUUID API
ReplicatorCreateRmtCgUUIDScope
// ReplicatorUpdateCgScope represents replicator UpdateConsumerGroup API
ReplicatorUpdateCgScope
// ReplicatorUpdateRmtCgScope represents replicator UpdateRemoteConsumerGroup API
ReplicatorUpdateRmtCgScope
// ReplicatorDeleteCgScope represents replicator DeleteConsumerGroup API
ReplicatorDeleteCgScope
// ReplicatorDeleteRmtCgScope represents replicator DeleteRemoteConsumerGroup API
ReplicatorDeleteRmtCgScope
// ReplicatorCreateExtentScope represents replicator CreateExtent API
ReplicatorCreateExtentScope
// ReplicatorCreateRmtExtentScope represents replicator CreateRemoteExtent API
Expand Down Expand Up @@ -539,6 +551,12 @@ var scopeDefs = map[ServiceIdx]map[int]scopeDefinition{
ReplicatorUpdateRmtDestScope: {operation: "ReplicatorUpdateRemoteDestination"},
ReplicatorDeleteDestScope: {operation: "ReplicatorDeleteDestination"},
ReplicatorDeleteRmtDestScope: {operation: "ReplicatorDeleteRemoteDestination"},
ReplicatorCreateCgUUIDScope: {operation: "ReplicatorCreateConsumerGroupUUID"},
ReplicatorCreateRmtCgUUIDScope: {operation: "ReplicatorCreateRemoteConsumerGroupUUID"},
ReplicatorUpdateCgScope: {operation: "ReplicatorUpdateConsumerGroup"},
ReplicatorUpdateRmtCgScope: {operation: "ReplicatorUpdateRemoteConsumerGroup"},
ReplicatorDeleteCgScope: {operation: "ReplicatorDeleteConsumerGroup"},
ReplicatorDeleteRmtCgScope: {operation: "ReplicatorDeleteRemoteConsumerGroup"},
ReplicatorCreateExtentScope: {operation: "ReplicatorCreateExtent"},
ReplicatorCreateRmtExtentScope: {operation: "ReplicatorCreateRemoteExtent"},
ReplicatorReconcileScope: {operation: "ReplicatorReconcile"},
Expand Down

0 comments on commit e6a6457

Please sign in to comment.