Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
add option to query DLQs
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranrg committed Aug 16, 2017
1 parent 1f89737 commit 58e4ffe
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 88 deletions.
11 changes: 7 additions & 4 deletions cmd/tools/common/lib.go
Expand Up @@ -506,10 +506,9 @@ func SetAdminCommands(commands *[]cli.Command) {
Aliases: []string{"e"},
Usage: "show extent <extent_uuid>",
Flags: []cli.Flag{
cli.StringFlag{
Name: "showcg, sc",
Value: "false",
Usage: "show consumer group(false, true), default to false",
cli.BoolFlag{
Name: "showcg, cg",
Usage: "show consumer group",
},
},
Action: func(c *cli.Context) {
Expand Down Expand Up @@ -845,6 +844,10 @@ func SetAdminCommands(commands *[]cli.Command) {
Value: "/",
Usage: "only process destinations with prefix",
},
cli.BoolFlag{
Name: "dlq",
Usage: "query and check corresopnding DLQ destinations",
},
cli.BoolFlag{
Name: "seal",
Usage: "seal extents on replica that are not sealed",
Expand Down
236 changes: 152 additions & 84 deletions tools/common/lib.go
Expand Up @@ -914,13 +914,13 @@ func ReadDestination(c *cli.Context, serviceName string) {
}

path := c.Args().First()
showCG := string(c.String("showcg"))
showCG := c.Bool("showcg")
desc, err := readDestinationFromMetadata(mClient, path)
ExitIfError(err)
printDest(desc)

// only show cg info if showCG flag is true
if showCG == "true" {
if showCG {
// read all the consumer group for this destination, including deleted ones
destUUID := desc.GetDestinationUUID()
req := &shared.ListConsumerGroupRequest{
Expand Down Expand Up @@ -1698,142 +1698,206 @@ func SealConsistencyCheck(c *cli.Context, mClient mcli.Client) {
sealKeyTimestamp = math.MaxInt64 & timestampBitmask
)

checkDest := func(destUUID string) {
checkDests := func(destUUIDs []string) {

// find all sealed extents for the destination that are "local" -- ie, we skip
// extents that belong to a multi-zone destination, but are not in the "origin"
// zone, because they could potentially be still being replicated.
listExtentsStats := &shared.ListExtentsStatsRequest{
DestinationUUID: common.StringPtr(string(destUUID)),
Status: shared.ExtentStatusPtr(shared.ExtentStatus_SEALED),
LocalExtentsOnly: common.BoolPtr(true), // FIXME: make arg
Limit: common.Int64Ptr(DefaultPageSize),
}

iterate_listextents_pages:
for {
if veryVerbose {
fmt.Printf("querying metadata: ListExtentsStats(dest=%v status=%v LocalOnly=%v Limit=%v)",
destUUID, shared.ExtentStatus_SEALED, true, DefaultPageSize)
}
for _, destUUID := range destUUIDs {

listExtentStatsResult, err1 := mClient.ListExtentsStats(listExtentsStats)

if err1 != nil {
fmt.Fprintf(os.Stderr, "ListExtentsStats(dest=%v) error: %v\n", destUUID, err)
break iterate_listextents_pages
// find all sealed extents for the destination that are "local" -- ie, we skip
// extents that belong to a multi-zone destination, but are not in the "origin"
// zone, because they could potentially be still being replicated.
listExtentsStats := &shared.ListExtentsStatsRequest{
DestinationUUID: common.StringPtr(string(destUUID)),
Status: shared.ExtentStatusPtr(shared.ExtentStatus_SEALED),
LocalExtentsOnly: common.BoolPtr(true), // FIXME: make arg
Limit: common.Int64Ptr(DefaultPageSize),
}

for _, stats := range listExtentStatsResult.ExtentStatsList {

extent := stats.GetExtent()
extentUUID := extent.GetExtentUUID()
storeUUIDs := extent.GetStoreUUIDs()

iterate_stores:
for _, storeUUID := range storeUUIDs {
iterate_listextents_pages:
for {
if veryVerbose {
fmt.Printf("querying metadata: ListExtentsStats(dest=%v status=%v LocalOnly=%v Limit=%v)",
destUUID, shared.ExtentStatus_SEALED, true, DefaultPageSize)
}

storeClient, err1 := storeClients.get(storeUUID)
listExtentStatsResult, err1 := mClient.ListExtentsStats(listExtentsStats)

if err1 != nil {
fmt.Fprintf(os.Stderr, "error getting store client (store=%v): %v\n", storeUUID, err)
continue iterate_stores
}
if err1 != nil {
fmt.Fprintf(os.Stderr, "ListExtentsStats(dest=%v) error: %v\n", destUUID, err1)
break iterate_listextents_pages
}

req := store.NewGetAddressFromTimestampRequest()
req.ExtentUUID = common.StringPtr(string(extentUUID))
req.Timestamp = common.Int64Ptr(sealKeyTimestamp)
for _, stats := range listExtentStatsResult.ExtentStatsList {

// query storage to find address of the message with the given timestamp
resp, err1 := storeClient.GetAddressFromTimestamp(req)
extent := stats.GetExtent()
extentUUID := extent.GetExtentUUID()
storeUUIDs := extent.GetStoreUUIDs()

var extentNotFound bool
iterate_stores:
for _, storeUUID := range storeUUIDs {

if err1 != nil {
_, extentNotFound = err1.(*store.ExtentNotFoundError)
storeClient, err1 := storeClients.get(storeUUID)

if !extentNotFound {
fmt.Fprintf(os.Stderr, "dest=%v extent=%v store=%v: GetAddressFromTimestamp error: %v\n",
destUUID, extentUUID, storeUUID, err1)
if err1 != nil {
fmt.Fprintf(os.Stderr, "error getting store client (store=%v): %v\n", storeUUID, err1)
continue iterate_stores
}
}

switch {
case extentNotFound || !resp.GetSealed(): // handle un-sealed extent

output := &sealcheckJSONOutputFields{
DestinationUUID: destUUID,
ExtentUUID: extentUUID,
StoreUUID: storeUUID,
IsSealed: false,
IsMissing: extentNotFound,
IsEmpty: extentNotFound || resp.GetAddress() == store.ADDR_BEGIN,
}

outputStr, _ := json.Marshal(output)
fmt.Fprintln(os.Stdout, string(outputStr))

// now seal the extent
if seal {
req := store.NewGetAddressFromTimestampRequest()
req.ExtentUUID = common.StringPtr(string(extentUUID))
req.Timestamp = common.Int64Ptr(sealKeyTimestamp)

fmt.Printf("sealing extent on replica: %v %v %v", destUUID, extentUUID, storeUUID)
// query storage to find address of the message with the given timestamp
resp, err1 := storeClient.GetAddressFromTimestamp(req)

req := store.NewSealExtentRequest()
req.ExtentUUID = common.StringPtr(string(extentUUID))
req.SequenceNumber = nil // seal at 'unspecified' seqnum
var extentNotFound bool

// seal the extent on the store
err2 := storeClient.SealExtent(req)
if err1 != nil {
_, extentNotFound = err1.(*store.ExtentNotFoundError)

if err2 != nil {
if !extentNotFound {
fmt.Fprintf(os.Stderr, "dest=%v extent=%v store=%v: GetAddressFromTimestamp error: %v\n",
destUUID, extentUUID, storeUUID, err1)
continue iterate_stores
}
}

default:

if verbose {
switch {
case extentNotFound || !resp.GetSealed(): // handle un-sealed extent

output := &sealcheckJSONOutputFields{
DestinationUUID: destUUID,
ExtentUUID: extentUUID,
StoreUUID: storeUUID,
IsSealed: resp.GetSealed(),
IsSealed: false,
IsMissing: extentNotFound,
IsEmpty: extentNotFound || resp.GetAddress() == store.ADDR_BEGIN,
}

outputStr, _ := json.Marshal(output)
fmt.Fprintln(os.Stdout, string(outputStr))

// now seal the extent
if seal {

fmt.Printf("sealing extent on replica: %v %v %v", destUUID, extentUUID, storeUUID)

req := store.NewSealExtentRequest()
req.ExtentUUID = common.StringPtr(string(extentUUID))
req.SequenceNumber = nil // seal at 'unspecified' seqnum

// seal the extent on the store
err2 := storeClient.SealExtent(req)

if err2 != nil {
fmt.Fprintf(os.Stderr, "dest=%v extent=%v store=%v: GetAddressFromTimestamp error: %v\n",
destUUID, extentUUID, storeUUID, err1)
continue iterate_stores
}
}

default:

if verbose {

output := &sealcheckJSONOutputFields{
DestinationUUID: destUUID,
ExtentUUID: extentUUID,
StoreUUID: storeUUID,
IsSealed: resp.GetSealed(),
IsMissing: extentNotFound,
IsEmpty: extentNotFound || resp.GetAddress() == store.ADDR_BEGIN,
}

outputStr, _ := json.Marshal(output)
fmt.Fprintln(os.Stdout, string(outputStr))
}
}
}

}

if len(listExtentStatsResult.GetNextPageToken()) == 0 {
break iterate_listextents_pages
}

listExtentsStats.PageToken = listExtentStatsResult.GetNextPageToken()
}
}
}

var getDlqs = func(destUUID string) (dlqs []string) {

req := &shared.ListConsumerGroupRequest{
Limit: common.Int64Ptr(DefaultPageSize),
}

for {
resp, err1 := mClient.ListAllConsumerGroups(req)
ExitIfError(err1)

for _, cg := range resp.GetConsumerGroups() {
if destUUID == cg.GetDestinationUUID() {
dlqs = append(dlqs, cg.GetDeadLetterQueueDestinationUUID())
}
}

if len(resp.GetNextPageToken()) == 0 {
return
}

if len(listExtentStatsResult.GetNextPageToken()) == 0 {
break iterate_listextents_pages
req.PageToken = resp.NextPageToken
}
}

var getAllDlqs = func() (dlqs map[string][]string) {

req := &shared.ListConsumerGroupRequest{
Limit: common.Int64Ptr(DefaultPageSize),
}

for {
resp, err1 := mClient.ListAllConsumerGroups(req)
ExitIfError(err1)

for _, cg := range resp.GetConsumerGroups() {

destUUID := cg.GetDestinationUUID()
dlqs[destUUID] = append(dlqs[destUUID], cg.GetDeadLetterQueueDestinationUUID())
}

listExtentsStats.PageToken = listExtentStatsResult.GetNextPageToken()
if len(resp.GetNextPageToken()) == 0 {
return
}

req.PageToken = resp.NextPageToken
}
}

if len(c.Args()) > 0 {

desc, err := mClient.ReadDestination(&shared.ReadDestinationRequest{
Path: c.Args()[0],
Path: common.StringPtr(c.Args()[0]),
})

ExitIfError(err)

checkDest(dsc.GetDestinationUUID())
var destUUIDs []string
destUUIDs = append(destUUIDs, desc.GetDestinationUUID())

if dlq { // check DLQ destinations
destUUIDs = append(destUUIDs, getDlqs(desc.GetDestinationUUID())...)
}

checkDests(destUUIDs)

} else {

var dlqs map[string][]string

if dlq { // check DLQ destinations
dlqs = getAllDlqs()
}

reqListDest := &shared.ListDestinationsRequest{
Prefix: common.StringPtr(prefix),
Limit: common.Int64Ptr(DefaultPageSize),
Expand All @@ -1854,9 +1918,13 @@ func SealConsistencyCheck(c *cli.Context, mClient mcli.Client) {

for _, desc := range respListDest.GetDestinations() {

destUUID := desc.GetDestinationUUID()
destUUIDs := []string{desc.GetDestinationUUID()}

if dlq { // check DLQ destinations
destUUIDs = append(destUUIDs, dlqs[destUUIDs[0]]...)
}

checkDest(destUUID)
checkDests(destUUIDs)
}

if len(respListDest.GetNextPageToken()) == 0 {
Expand Down

0 comments on commit 58e4ffe

Please sign in to comment.