From 58e4ffee471e88440d1bc99a57cbd7f3b9403a59 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Wed, 16 Aug 2017 11:28:46 -0700 Subject: [PATCH] add option to query DLQs --- cmd/tools/common/lib.go | 11 +- tools/common/lib.go | 236 ++++++++++++++++++++++++++-------------- 2 files changed, 159 insertions(+), 88 deletions(-) diff --git a/cmd/tools/common/lib.go b/cmd/tools/common/lib.go index 5a8c41db..b2c8e874 100644 --- a/cmd/tools/common/lib.go +++ b/cmd/tools/common/lib.go @@ -506,10 +506,9 @@ func SetAdminCommands(commands *[]cli.Command) { Aliases: []string{"e"}, Usage: "show extent ", Flags: []cli.Flag{ - cli.StringFlag{ - Name: "showcg, sc", - Value: "false", - Usage: "show consumer group(false, true), default to false", + cli.BoolFlag{ + Name: "showcg, cg", + Usage: "show consumer group", }, }, Action: func(c *cli.Context) { @@ -845,6 +844,10 @@ func SetAdminCommands(commands *[]cli.Command) { Value: "/", Usage: "only process destinations with prefix", }, + cli.BoolFlag{ + Name: "dlq", + Usage: "query and check corresopnding DLQ destinations", + }, cli.BoolFlag{ Name: "seal", Usage: "seal extents on replica that are not sealed", diff --git a/tools/common/lib.go b/tools/common/lib.go index 2fcb97a0..4b9fd006 100644 --- a/tools/common/lib.go +++ b/tools/common/lib.go @@ -914,13 +914,13 @@ func ReadDestination(c *cli.Context, serviceName string) { } path := c.Args().First() - showCG := string(c.String("showcg")) + showCG := c.Bool("showcg") desc, err := readDestinationFromMetadata(mClient, path) ExitIfError(err) printDest(desc) // only show cg info if showCG flag is true - if showCG == "true" { + if showCG { // read all the consumer group for this destination, including deleted ones destUUID := desc.GetDestinationUUID() req := &shared.ListConsumerGroupRequest{ @@ -1698,142 +1698,206 @@ func SealConsistencyCheck(c *cli.Context, mClient mcli.Client) { sealKeyTimestamp = math.MaxInt64 & timestampBitmask ) - checkDest := func(destUUID string) { + checkDests := func(destUUIDs []string) { - // find all sealed extents for the destination that are "local" -- ie, we skip - // extents that belong to a multi-zone destination, but are not in the "origin" - // zone, because they could potentially be still being replicated. - listExtentsStats := &shared.ListExtentsStatsRequest{ - DestinationUUID: common.StringPtr(string(destUUID)), - Status: shared.ExtentStatusPtr(shared.ExtentStatus_SEALED), - LocalExtentsOnly: common.BoolPtr(true), // FIXME: make arg - Limit: common.Int64Ptr(DefaultPageSize), - } - - iterate_listextents_pages: - for { - if veryVerbose { - fmt.Printf("querying metadata: ListExtentsStats(dest=%v status=%v LocalOnly=%v Limit=%v)", - destUUID, shared.ExtentStatus_SEALED, true, DefaultPageSize) - } + for _, destUUID := range destUUIDs { - listExtentStatsResult, err1 := mClient.ListExtentsStats(listExtentsStats) - - if err1 != nil { - fmt.Fprintf(os.Stderr, "ListExtentsStats(dest=%v) error: %v\n", destUUID, err) - break iterate_listextents_pages + // find all sealed extents for the destination that are "local" -- ie, we skip + // extents that belong to a multi-zone destination, but are not in the "origin" + // zone, because they could potentially be still being replicated. + listExtentsStats := &shared.ListExtentsStatsRequest{ + DestinationUUID: common.StringPtr(string(destUUID)), + Status: shared.ExtentStatusPtr(shared.ExtentStatus_SEALED), + LocalExtentsOnly: common.BoolPtr(true), // FIXME: make arg + Limit: common.Int64Ptr(DefaultPageSize), } - for _, stats := range listExtentStatsResult.ExtentStatsList { - - extent := stats.GetExtent() - extentUUID := extent.GetExtentUUID() - storeUUIDs := extent.GetStoreUUIDs() - - iterate_stores: - for _, storeUUID := range storeUUIDs { + iterate_listextents_pages: + for { + if veryVerbose { + fmt.Printf("querying metadata: ListExtentsStats(dest=%v status=%v LocalOnly=%v Limit=%v)", + destUUID, shared.ExtentStatus_SEALED, true, DefaultPageSize) + } - storeClient, err1 := storeClients.get(storeUUID) + listExtentStatsResult, err1 := mClient.ListExtentsStats(listExtentsStats) - if err1 != nil { - fmt.Fprintf(os.Stderr, "error getting store client (store=%v): %v\n", storeUUID, err) - continue iterate_stores - } + if err1 != nil { + fmt.Fprintf(os.Stderr, "ListExtentsStats(dest=%v) error: %v\n", destUUID, err1) + break iterate_listextents_pages + } - req := store.NewGetAddressFromTimestampRequest() - req.ExtentUUID = common.StringPtr(string(extentUUID)) - req.Timestamp = common.Int64Ptr(sealKeyTimestamp) + for _, stats := range listExtentStatsResult.ExtentStatsList { - // query storage to find address of the message with the given timestamp - resp, err1 := storeClient.GetAddressFromTimestamp(req) + extent := stats.GetExtent() + extentUUID := extent.GetExtentUUID() + storeUUIDs := extent.GetStoreUUIDs() - var extentNotFound bool + iterate_stores: + for _, storeUUID := range storeUUIDs { - if err1 != nil { - _, extentNotFound = err1.(*store.ExtentNotFoundError) + storeClient, err1 := storeClients.get(storeUUID) - if !extentNotFound { - fmt.Fprintf(os.Stderr, "dest=%v extent=%v store=%v: GetAddressFromTimestamp error: %v\n", - destUUID, extentUUID, storeUUID, err1) + if err1 != nil { + fmt.Fprintf(os.Stderr, "error getting store client (store=%v): %v\n", storeUUID, err1) continue iterate_stores } - } - - switch { - case extentNotFound || !resp.GetSealed(): // handle un-sealed extent - - output := &sealcheckJSONOutputFields{ - DestinationUUID: destUUID, - ExtentUUID: extentUUID, - StoreUUID: storeUUID, - IsSealed: false, - IsMissing: extentNotFound, - IsEmpty: extentNotFound || resp.GetAddress() == store.ADDR_BEGIN, - } - - outputStr, _ := json.Marshal(output) - fmt.Fprintln(os.Stdout, string(outputStr)) - // now seal the extent - if seal { + req := store.NewGetAddressFromTimestampRequest() + req.ExtentUUID = common.StringPtr(string(extentUUID)) + req.Timestamp = common.Int64Ptr(sealKeyTimestamp) - fmt.Printf("sealing extent on replica: %v %v %v", destUUID, extentUUID, storeUUID) + // query storage to find address of the message with the given timestamp + resp, err1 := storeClient.GetAddressFromTimestamp(req) - req := store.NewSealExtentRequest() - req.ExtentUUID = common.StringPtr(string(extentUUID)) - req.SequenceNumber = nil // seal at 'unspecified' seqnum + var extentNotFound bool - // seal the extent on the store - err2 := storeClient.SealExtent(req) + if err1 != nil { + _, extentNotFound = err1.(*store.ExtentNotFoundError) - if err2 != nil { + if !extentNotFound { fmt.Fprintf(os.Stderr, "dest=%v extent=%v store=%v: GetAddressFromTimestamp error: %v\n", destUUID, extentUUID, storeUUID, err1) continue iterate_stores } } - default: - - if verbose { + switch { + case extentNotFound || !resp.GetSealed(): // handle un-sealed extent output := &sealcheckJSONOutputFields{ DestinationUUID: destUUID, ExtentUUID: extentUUID, StoreUUID: storeUUID, - IsSealed: resp.GetSealed(), + IsSealed: false, IsMissing: extentNotFound, IsEmpty: extentNotFound || resp.GetAddress() == store.ADDR_BEGIN, } outputStr, _ := json.Marshal(output) fmt.Fprintln(os.Stdout, string(outputStr)) + + // now seal the extent + if seal { + + fmt.Printf("sealing extent on replica: %v %v %v", destUUID, extentUUID, storeUUID) + + req := store.NewSealExtentRequest() + req.ExtentUUID = common.StringPtr(string(extentUUID)) + req.SequenceNumber = nil // seal at 'unspecified' seqnum + + // seal the extent on the store + err2 := storeClient.SealExtent(req) + + if err2 != nil { + fmt.Fprintf(os.Stderr, "dest=%v extent=%v store=%v: GetAddressFromTimestamp error: %v\n", + destUUID, extentUUID, storeUUID, err1) + continue iterate_stores + } + } + + default: + + if verbose { + + output := &sealcheckJSONOutputFields{ + DestinationUUID: destUUID, + ExtentUUID: extentUUID, + StoreUUID: storeUUID, + IsSealed: resp.GetSealed(), + IsMissing: extentNotFound, + IsEmpty: extentNotFound || resp.GetAddress() == store.ADDR_BEGIN, + } + + outputStr, _ := json.Marshal(output) + fmt.Fprintln(os.Stdout, string(outputStr)) + } } } + + } + + if len(listExtentStatsResult.GetNextPageToken()) == 0 { + break iterate_listextents_pages + } + + listExtentsStats.PageToken = listExtentStatsResult.GetNextPageToken() + } + } + } + + var getDlqs = func(destUUID string) (dlqs []string) { + + req := &shared.ListConsumerGroupRequest{ + Limit: common.Int64Ptr(DefaultPageSize), + } + + for { + resp, err1 := mClient.ListAllConsumerGroups(req) + ExitIfError(err1) + + for _, cg := range resp.GetConsumerGroups() { + if destUUID == cg.GetDestinationUUID() { + dlqs = append(dlqs, cg.GetDeadLetterQueueDestinationUUID()) } + } + if len(resp.GetNextPageToken()) == 0 { + return } - if len(listExtentStatsResult.GetNextPageToken()) == 0 { - break iterate_listextents_pages + req.PageToken = resp.NextPageToken + } + } + + var getAllDlqs = func() (dlqs map[string][]string) { + + req := &shared.ListConsumerGroupRequest{ + Limit: common.Int64Ptr(DefaultPageSize), + } + + for { + resp, err1 := mClient.ListAllConsumerGroups(req) + ExitIfError(err1) + + for _, cg := range resp.GetConsumerGroups() { + + destUUID := cg.GetDestinationUUID() + dlqs[destUUID] = append(dlqs[destUUID], cg.GetDeadLetterQueueDestinationUUID()) } - listExtentsStats.PageToken = listExtentStatsResult.GetNextPageToken() + if len(resp.GetNextPageToken()) == 0 { + return + } + + req.PageToken = resp.NextPageToken } } if len(c.Args()) > 0 { desc, err := mClient.ReadDestination(&shared.ReadDestinationRequest{ - Path: c.Args()[0], + Path: common.StringPtr(c.Args()[0]), }) ExitIfError(err) - checkDest(dsc.GetDestinationUUID()) + var destUUIDs []string + destUUIDs = append(destUUIDs, desc.GetDestinationUUID()) + + if dlq { // check DLQ destinations + destUUIDs = append(destUUIDs, getDlqs(desc.GetDestinationUUID())...) + } + + checkDests(destUUIDs) } else { + var dlqs map[string][]string + + if dlq { // check DLQ destinations + dlqs = getAllDlqs() + } + reqListDest := &shared.ListDestinationsRequest{ Prefix: common.StringPtr(prefix), Limit: common.Int64Ptr(DefaultPageSize), @@ -1854,9 +1918,13 @@ func SealConsistencyCheck(c *cli.Context, mClient mcli.Client) { for _, desc := range respListDest.GetDestinations() { - destUUID := desc.GetDestinationUUID() + destUUIDs := []string{desc.GetDestinationUUID()} + + if dlq { // check DLQ destinations + destUUIDs = append(destUUIDs, dlqs[destUUIDs[0]]...) + } - checkDest(destUUID) + checkDests(destUUIDs) } if len(respListDest.GetNextPageToken()) == 0 {