Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

cmq -- Cherami Metadata Query-er #314

Merged
merged 39 commits into from
Oct 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
3343bb9
Revert "Slowdown redeliveries, add timeout metrics (#290)"
Oct 13, 2017
91edd0d
cmq - Cherami Metadata Query-r
Oct 16, 2017
287389a
ttt
Oct 17, 2017
d091918
show remote extents
Oct 17, 2017
7b02891
remove binary
Oct 17, 2017
ed0bd52
fix rate computation
Oct 17, 2017
86204e0
flag DLQ extents
Oct 18, 2017
462aa5c
monitor -> watch
Oct 18, 2017
3ef749a
Revert "Slowdown redeliveries, add timeout metrics (#290)" (#316)
Oct 18, 2017
e405821
sort by cgx-status updated time
Oct 19, 2017
7faacc4
show milliseconds since update
Oct 19, 2017
524fc58
mark cgx for consumed extent as consumed
Oct 19, 2017
9213708
~
Oct 19, 2017
387d9fa
~
Oct 19, 2017
5652d7a
RetMgr: Handle cleanup of consumer-groups and consumer-group extents …
Oct 20, 2017
24452aa
cmq - Cherami Metadata Query-r
Oct 16, 2017
652adad
ttt
Oct 17, 2017
eef62d9
show remote extents
Oct 17, 2017
4ac8474
remove binary
Oct 17, 2017
a9c1b23
fix rate computation
Oct 17, 2017
26caeff
flag DLQ extents
Oct 18, 2017
0c1c208
monitor -> watch
Oct 18, 2017
8d13903
sort by cgx-status updated time
Oct 19, 2017
289f37e
show milliseconds since update
Oct 19, 2017
ff53556
mark cgx for consumed extent as consumed
Oct 19, 2017
85bc839
~
Oct 19, 2017
c2a36f2
~
Oct 19, 2017
34f06d9
...
Oct 19, 2017
e354382
cg deleting
Oct 24, 2017
891e359
fix build
Oct 25, 2017
c229701
yaml config support
Oct 25, 2017
9803c0b
Merge branch 'cmq' of ssh://github.com/uber/cherami-server into cmq
Oct 25, 2017
1f3eaee
cmq zone
Oct 25, 2017
684f7dd
output type support
Oct 25, 2017
9c87a38
json map output
Oct 25, 2017
ae3594e
fix lint
Oct 26, 2017
4dae9bb
fix lint
Oct 26, 2017
252ced9
fix build
Oct 26, 2017
f6cb6ef
fix lint
Oct 26, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ cherami-store-tool: $(DEPS)
cdb: $(DEPS)
go build -i $(EMBED) -o cdb cmd/tools/cdb/*.go

bins: cherami-server cherami-replicator-server cherami-cli cherami-admin cherami-replicator-tool cherami-cassandra-tool cherami-store-tool cdb
cmq: $(DEPS)
go build -i $(EMBED) -o cmq cmd/tools/cmq/*.go

bins: cherami-server cherami-replicator-server cherami-cli cherami-admin cherami-replicator-tool cherami-cassandra-tool cherami-store-tool cdb cmq

cover_profile: lint bins
@mkdir -p $(BUILD)
Expand All @@ -123,7 +126,7 @@ cover_ci: cover_profile
goveralls -coverprofile=$(BUILD)/cover.out -service=travis-ci || echo -e "\x1b[31mCoveralls failed\x1b[m"

clean:
rm -f cherami-server cherami-replicator-server cherami-cli cherami-admin cherami-replicator-tool cherami-cassandra-tool cherami-store-tool cdb
rm -f cherami-server cherami-replicator-server cherami-cli cherami-admin cherami-replicator-tool cherami-cassandra-tool cherami-store-tool cdb cmq
rm -Rf vendor/*
rm -Rf $(BUILD)

Expand Down
156 changes: 105 additions & 51 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,18 @@ func parseConsistency(cfgCons string) (lowCons gocql.Consistency, midCons gocql.

switch cons := strings.Split(cfgCons, ","); len(cons) {
case 3:
lowCons = gocql.ParseConsistency(strings.TrimSpace(cons[2]))
lowCons, _ = gocql.ParseConsistency(strings.TrimSpace(cons[2]))
fallthrough

case 2:
midCons = gocql.ParseConsistency(strings.TrimSpace(cons[1]))
midCons, _ = gocql.ParseConsistency(strings.TrimSpace(cons[1]))
if len(cons) == 2 {
lowCons = midCons
}
fallthrough

case 1:
highCons = gocql.ParseConsistency(strings.TrimSpace(cons[0]))
highCons, _ = gocql.ParseConsistency(strings.TrimSpace(cons[0]))
}

return
Expand Down Expand Up @@ -1189,12 +1189,29 @@ const (
columnZoneConfigs + `: ?, ` +
columnOptions + `: ? }`

sqlConsumerGroupType = columnConsumerGroup + `.` + columnUUID + "," +
columnConsumerGroup + `.` + columnDestinationUUID + "," +
columnConsumerGroup + `.` + columnName + "," +
columnConsumerGroup + `.` + columnStartFrom + "," +
columnConsumerGroup + `.` + columnStatus + "," +
columnConsumerGroup + `.` + columnLockTimeoutSeconds + "," +
columnConsumerGroup + `.` + columnMaxDeliveryCount + "," +
columnConsumerGroup + `.` + columnSkipOlderMessagesSeconds + "," +
columnConsumerGroup + `.` + columnDelaySeconds + "," +
columnConsumerGroup + `.` + columnDeadLetterQueueDestinationUUID + "," +
columnConsumerGroup + `.` + columnOwnerEmail + "," +
columnConsumerGroup + `.` + columnIsMultiZone + "," +
columnConsumerGroup + `.` + columnActiveZone + "," +
columnConsumerGroup + `.` + columnZoneConfigs + "," +
columnConsumerGroup + `.` + columnOptions

sqlInsertCGByUUID = `INSERT INTO ` + tableConsumerGroups +
`(` +
columnUUID + `, ` +
columnDestinationUUID + `, ` +
columnIsMultiZone + `, ` +
columnConsumerGroup +
`) VALUES (?, ?,` + sqlCGValue + `)`
`) VALUES (?, ?, ?, ` + sqlCGValue + `)`

sqlInsertCGByName = `INSERT INTO ` + tableConsumerGroupsByName +
`(` +
Expand All @@ -1205,65 +1222,28 @@ const (
`) VALUES (?, ?, ?, ` + sqlCGValue + `) IF NOT EXISTS`

sqlGetCGByName = `SELECT ` +
columnConsumerGroup + `.` + columnUUID + "," +
columnDestinationUUID + "," +
columnName + "," +
columnConsumerGroup + `.` + columnStartFrom + "," +
columnConsumerGroup + `.` + columnStatus + "," +
columnConsumerGroup + `.` + columnLockTimeoutSeconds + "," +
columnConsumerGroup + `.` + columnMaxDeliveryCount + "," +
columnConsumerGroup + `.` + columnSkipOlderMessagesSeconds + "," +
columnConsumerGroup + `.` + columnDelaySeconds + "," +
columnConsumerGroup + `.` + columnDeadLetterQueueDestinationUUID + "," +
columnConsumerGroup + `.` + columnOwnerEmail + "," +
columnConsumerGroup + `.` + columnIsMultiZone + "," +
columnConsumerGroup + `.` + columnActiveZone + "," +
columnConsumerGroup + `.` + columnZoneConfigs + "," +
columnConsumerGroup + `.` + columnOptions +
sqlConsumerGroupType +
` FROM ` + tableConsumerGroupsByName +
` WHERE ` + columnDestinationUUID + `=? and ` + columnName + `=?`

sqlGetCG = `SELECT ` +
columnConsumerGroup + `.` + columnUUID + "," +
columnConsumerGroup + `.` + columnDestinationUUID + "," +
columnConsumerGroup + `.` + columnName + "," +
columnConsumerGroup + `.` + columnStartFrom + "," +
columnConsumerGroup + `.` + columnStatus + "," +
columnConsumerGroup + `.` + columnLockTimeoutSeconds + "," +
columnConsumerGroup + `.` + columnMaxDeliveryCount + "," +
columnConsumerGroup + `.` + columnSkipOlderMessagesSeconds + "," +
columnConsumerGroup + `.` + columnDelaySeconds + "," +
columnConsumerGroup + `.` + columnDeadLetterQueueDestinationUUID + "," +
columnConsumerGroup + `.` + columnOwnerEmail + "," +
columnConsumerGroup + `.` + columnIsMultiZone + "," +
columnConsumerGroup + `.` + columnActiveZone + "," +
columnConsumerGroup + `.` + columnZoneConfigs + "," +
columnConsumerGroup + `.` + columnOptions +
sqlConsumerGroupType +
` FROM ` + tableConsumerGroups

sqlGetCGByUUID = sqlGetCG + ` WHERE ` + columnUUID + `=?`

sqlListCGsByDestUUID = `SELECT ` +
columnConsumerGroup + `.` + columnUUID + "," +
columnDestinationUUID + "," +
columnName + "," +
columnConsumerGroup + `.` + columnStartFrom + "," +
columnConsumerGroup + `.` + columnStatus + "," +
columnConsumerGroup + `.` + columnLockTimeoutSeconds + "," +
columnConsumerGroup + `.` + columnMaxDeliveryCount + "," +
columnConsumerGroup + `.` + columnSkipOlderMessagesSeconds + "," +
columnConsumerGroup + `.` + columnDelaySeconds + "," +
columnConsumerGroup + `.` + columnDeadLetterQueueDestinationUUID + "," +
columnConsumerGroup + `.` + columnOwnerEmail + "," +
columnConsumerGroup + `.` + columnIsMultiZone + "," +
columnConsumerGroup + `.` + columnActiveZone + "," +
columnConsumerGroup + `.` + columnZoneConfigs + "," +
columnConsumerGroup + `.` + columnOptions +
sqlConsumerGroupType +
` FROM ` + tableConsumerGroupsByName +
` WHERE ` + columnDestinationUUID + `=?`

sqlListCGsUUID = `SELECT ` +
sqlConsumerGroupType +
` FROM ` + tableConsumerGroups +
` WHERE ` + columnDestinationUUID + `=?`

sqlUpdateCGByUUID = `UPDATE ` + tableConsumerGroups +
` SET ` + columnIsMultiZone + ` = ?, ` + columnConsumerGroup + `= ` + sqlCGValue +
` SET ` + columnDestinationUUID + ` = ?, ` + columnIsMultiZone + ` = ?, ` + columnConsumerGroup + `= ` + sqlCGValue +
` WHERE ` + columnUUID + `=?`

sqlUpdateCGByName = `UPDATE ` + tableConsumerGroupsByName +
Expand Down Expand Up @@ -1334,6 +1314,7 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r

err = s.session.Query(sqlInsertCGByUUID,
cgUUID,
dstUUID,
createRequest.GetIsMultiZone(),
cgUUID,
dstUUID,
Expand Down Expand Up @@ -1651,6 +1632,7 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque

batch.Query(sqlUpdateCGByUUID,
// Value columns
newCG.GetDestinationUUID(),
newCG.GetIsMultiZone(),
newCG.GetConsumerGroupUUID(),
newCG.GetDestinationUUID(),
Expand Down Expand Up @@ -1794,6 +1776,7 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque

batch.Query(sqlUpdateCGByUUID,
// Value columns
existingCG.GetDestinationUUID(),
existingCG.GetIsMultiZone(),
existingCG.GetConsumerGroupUUID(),
existingCG.GetDestinationUUID(),
Expand Down Expand Up @@ -1862,6 +1845,7 @@ func (s *CassandraMetadataService) DeleteConsumerGroupUUID(ctx thrift.Context, r

query := s.session.Query(sqlDeleteCGByUUIDWithTTL,
existing.GetConsumerGroupUUID(),
existing.GetDestinationUUID(),
existing.GetIsMultiZone(),
existing.GetConsumerGroupUUID(),
existing.GetDestinationUUID(),
Expand Down Expand Up @@ -2003,6 +1987,73 @@ func (s *CassandraMetadataService) ListConsumerGroups(ctx thrift.Context, reques
return result, nil
}

// ListConsumerGroupsUUID returns all ConsumerGroups matching the given destination-uuid.
func (s *CassandraMetadataService) ListConsumerGroupsUUID(ctx thrift.Context, request *shared.ListConsumerGroupsUUIDRequest) (*shared.ListConsumerGroupsUUIDResult_, error) {

if !request.IsSetDestinationUUID() {
return nil, &shared.BadRequestError{
Message: fmt.Sprintf("DestinationUUID not specified"),
}
}

dstUUID := request.GetDestinationUUID()

var iter *gocql.Iter
iter = s.session.Query(sqlListCGsUUID, dstUUID).Consistency(s.lowConsLevel).PageSize(int(request.GetLimit())).PageState(request.PageToken).Iter()

if iter == nil {
return nil, &shared.InternalServiceError{
Message: "Query returned nil iterator",
}
}

result := &shared.ListConsumerGroupsUUIDResult_{
ConsumerGroups: []*shared.ConsumerGroupDescription{},
NextPageToken: request.PageToken,
}
cg := getUtilConsumerGroupDescription()
var zoneConfigsData []map[string]interface{}
for iter.Scan(
cg.ConsumerGroupUUID,
cg.DestinationUUID,
cg.ConsumerGroupName,
cg.StartFrom,
cg.Status,
cg.LockTimeoutSeconds,
cg.MaxDeliveryCount,
cg.SkipOlderMessagesSeconds,
cg.DelaySeconds,
&cg.DeadLetterQueueDestinationUUID,
cg.OwnerEmail,
cg.IsMultiZone,
cg.ActiveZone,
&zoneConfigsData,
&cg.Options) {

// skip over deleted rows
if cg.GetStatus() == shared.ConsumerGroupStatus_DELETED {
zoneConfigsData = nil
continue
}

cg.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
result.ConsumerGroups = append(result.ConsumerGroups, cg)
cg = getUtilConsumerGroupDescription()
zoneConfigsData = nil
}

nextPageToken := iter.PageState()
result.NextPageToken = make([]byte, len(nextPageToken))
copy(result.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
return nil, &shared.InternalServiceError{
Message: err.Error(),
}
}

return result, nil
}

// ListAllConsumerGroups returns all ConsumerGroups in ConsumerGroups Table. This API is only used for debuging tool
func (s *CassandraMetadataService) ListAllConsumerGroups(ctx thrift.Context, request *shared.ListConsumerGroupRequest) (*shared.ListConsumerGroupResult_, error) {

Expand Down Expand Up @@ -3394,7 +3445,10 @@ func (s *CassandraMetadataService) UpdateStoreExtentReplicaStats(ctx thrift.Cont
)
}
if err := s.session.ExecuteBatch(batch); err != nil {
s.log.WithField(common.TagExt, request.GetExtentUUID()).Error("UpdateExtentReplicaStats failed")
s.log.WithFields(bark.Fields{
common.TagExt: request.GetExtentUUID(),
common.TagErr: err,
}).Error("UpdateExtentReplicaStats failed")
return &shared.InternalServiceError{
Message: "UpdateStoreExtentReplicaStats: %v" + err.Error(),
}
Expand Down
Loading