Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' into report-atleast-once
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Bailey committed Feb 20, 2017
2 parents c7837a1 + a8751aa commit b4ab366
Show file tree
Hide file tree
Showing 23 changed files with 963 additions and 126 deletions.
16 changes: 16 additions & 0 deletions clients/outputhost/client.go
Expand Up @@ -67,3 +67,19 @@ func (s *OutClientImpl) UnloadConsumerGroups(req *admin.UnloadConsumerGroupsRequ

return s.client.UnloadConsumerGroups(ctx, req)
}

// ListLoadedConsumerGroups lists all the loaded cgs from the outputhost
func (s *OutClientImpl) ListLoadedConsumerGroups() (*admin.ListConsumerGroupsResult_, error) {
ctx, cancel := tcthrift.NewContext(15 * time.Second)
defer cancel()

return s.client.ListLoadedConsumerGroups(ctx)
}

// ReadCgState
func (s *OutClientImpl) ReadCgState(req *admin.ReadConsumerGroupStateRequest) (*admin.ReadConsumerGroupStateResult_, error) {
ctx, cancel := tcthrift.NewContext(15 * time.Second)
defer cancel()

return s.client.ReadCgState(ctx, req)
}
67 changes: 45 additions & 22 deletions cmd/tools/admin/main.go
Expand Up @@ -568,28 +568,6 @@ func main() {
println("**not implemented** merged DLQ in consumer group: ", c.Args().First())
},
},
{
Name: "unload",
Aliases: []string{"ul"},
Usage: "unload (consumergroup)",
Subcommands: []cli.Command{
{
Name: "consumergroup",
Aliases: []string{"c", "cg"},
Usage: "unload consumergroup <hostport> [options]",
Flags: []cli.Flag{
cli.StringFlag{
Name: "cg_uuid, u",
Value: "",
Usage: "The consumergroup UUID which should be unloaded",
},
},
Action: func(c *cli.Context) {
admin.UnloadConsumerGroup(c)
},
},
},
},
{
Name: "serviceconfig",
Aliases: []string{"cfg"},
Expand Down Expand Up @@ -628,6 +606,51 @@ func main() {
},
},
},
{
Name: "outputhost",
Aliases: []string{"oh"},
Usage: "outputhost (cgstate|listAllCgs|unloadcg)",
Subcommands: []cli.Command{
{
Name: "cgstate",
Aliases: []string{"cgs"},
Usage: "outputhost cgstate <hostport> [options]",
Flags: []cli.Flag{
cli.StringFlag{
Name: "cg_uuid, cg",
Value: "",
Usage: "The UUID of the consumer group whose state will be dumped",
},
},
Action: func(c *cli.Context) {
admin.GetCgState(c)
},
},
{
Name: "listAllCgs",
Aliases: []string{"ls"},
Usage: "outputhost listAllCgs <hostport>",
Action: func(c *cli.Context) {
admin.ListAllCgs(c)
},
},
{
Name: "unloadcg",
Aliases: []string{"uc"},
Usage: "outputhost unloadcg <hostport> [options]",
Flags: []cli.Flag{
cli.StringFlag{
Name: "cg_uuid, cg",
Value: "",
Usage: "The consumergroup UUID which should be unloaded",
},
},
Action: func(c *cli.Context) {
admin.UnloadConsumerGroup(c)
},
},
},
},
{
Name: "seal-check",
Aliases: []string{"sc"},
Expand Down
2 changes: 1 addition & 1 deletion cmd/tools/cli/main.go
Expand Up @@ -213,7 +213,7 @@ func main() {
{
Name: "cgBacklog",
Aliases: []string{"cgb", "cb"},
Usage: "show cgBacklog <consumer_group_uuid>",
Usage: "show cgBacklog (<consumer_group_uuid> | <destination_path> <consumer_group_name>)",
Action: func(c *cli.Context) {
lib.ReadCgBacklog(c)
},
Expand Down
2 changes: 1 addition & 1 deletion common/clientfactory.go
Expand Up @@ -97,7 +97,7 @@ type (
// storeClient is the existing store client object
storeClient struct {
lk sync.Mutex // mutex protecting this object
// map of host address to the client object
// map of uuid(could be destination uuid or a context uuid specified by GetThriftStoreClientUUID) to the client object
currentCh map[string]*chInfo
}

Expand Down
5 changes: 5 additions & 0 deletions common/convert.go
Expand Up @@ -46,6 +46,11 @@ func TSPtr(v time.Time) *time.Time {
return &v
}

// Int16Ptr makes a copy and returns the pointer to an int16.
func Int16Ptr(v int16) *int16 {
return &v
}

// Int32Ptr makes a copy and returns the pointer to an int32.
func Int32Ptr(v int32) *int32 {
return &v
Expand Down
9 changes: 9 additions & 0 deletions common/log_tag.go
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/uber/cherami-thrift/.generated/go/admin"
"github.com/uber/cherami-thrift/.generated/go/shared"
)

// TagErr is the tag for error object message
Expand Down Expand Up @@ -152,6 +153,9 @@ const TagEvent = `event`
// TagState is for "state" in event handlers
const TagState = `state`

// TagExtentStatus is for extent status
const TagExtentStatus = `extStatus`

// TagSeq is for sequence number
const TagSeq = `seq`

Expand Down Expand Up @@ -356,3 +360,8 @@ func FmtCnsmID(s int) string {
func FmtAddr(i int64) string {
return fmt.Sprintf("%v", i)
}

// FmtExtentStatus formats ExtentStatus to be used with TagExtentStatus
func FmtExtentStatus(status shared.ExtentStatus) string {
return fmt.Sprintf("%v", status.String())
}
45 changes: 27 additions & 18 deletions common/metrics/defs.go
Expand Up @@ -949,8 +949,14 @@ const (
ReplicatorReconcileDestExtentFail
// ReplicatorReconcileDestExtentFoundMissing indicates the reconcile for dest extent found a missing dest extent
ReplicatorReconcileDestExtentFoundMissing
// ReplicatorReconcileDestExtentInconsistentStatus indicates the reconcile for dest extent found an inconsistent extent status
ReplicatorReconcileDestExtentInconsistentStatus
// ReplicatorReconcileDestExtentRemoteConsumedLocalMissing indicates the reconcile for dest extent found a dest extent that is consumed on remote side and local is missing
ReplicatorReconcileDestExtentRemoteConsumedLocalMissing
// ReplicatorReconcileDestExtentRemoteDeleted indicates the reconcile for dest extent found a dest extent that is deleted on remote side and local is missing
ReplicatorReconcileDestExtentRemoteDeletedLocalMissing
// ReplicatorReconcileDestExtentRemoteDeletedLocalNot indicates the reconcile for dest extent found an inconsistent extent status(remote is deleted, local is not)
ReplicatorReconcileDestExtentRemoteDeletedLocalNot
// ReplicatorReconcileDestExtentSuspectMissingExtents indicates the length of the suspect missing extent list
ReplicatorReconcileDestExtentSuspectMissingExtents

numMetrics
)
Expand Down Expand Up @@ -1088,22 +1094,25 @@ var metricDefs = map[ServiceIdx]map[int]metricDefinition{

// definitions for Replicator metrics
Replicator: {
ReplicatorCreateInStreamFailure: {Counter, "replicator.create-in-stream.failure"},
ReplicatorCreateOutStreamFailure: {Counter, "replicator.create-out-stream.failure"},
ReplicatorRequests: {Counter, "replicator.requests"},
ReplicatorFailures: {Counter, "replicator.errors"},
ReplicatorBadRequest: {Counter, "replicator.requests.bad"},
ReplicatorInConnCreditsReceived: {Counter, "replicator.inconn.creditsreceived"},
ReplicatorInConnMsgWritten: {Counter, "replicator.inconn.msgwritten"},
ReplicatorOutConnCreditsSent: {Counter, "replicator.outconn.creditssent"},
ReplicatorOutConnMsgRead: {Counter, "replicator.outconn.msgread"},
ReplicatorReconcileDestRun: {Gauge, "replicator.reconcile.dest.run"},
ReplicatorReconcileDestFail: {Gauge, "replicator.reconcile.dest.fail"},
ReplicatorReconcileDestFoundMissing: {Gauge, "replicator.reconcile.dest.foundmissing"},
ReplicatorReconcileDestExtentRun: {Gauge, "replicator.reconcile.destextent.run"},
ReplicatorReconcileDestExtentFail: {Gauge, "replicator.reconcile.destextent.fail"},
ReplicatorReconcileDestExtentFoundMissing: {Gauge, "replicator.reconcile.destextent.foundmissing"},
ReplicatorReconcileDestExtentInconsistentStatus: {Gauge, "replicator.reconcile.destextent.inconsistentstatus"},
ReplicatorCreateInStreamFailure: {Counter, "replicator.create-in-stream.failure"},
ReplicatorCreateOutStreamFailure: {Counter, "replicator.create-out-stream.failure"},
ReplicatorRequests: {Counter, "replicator.requests"},
ReplicatorFailures: {Counter, "replicator.errors"},
ReplicatorBadRequest: {Counter, "replicator.requests.bad"},
ReplicatorInConnCreditsReceived: {Counter, "replicator.inconn.creditsreceived"},
ReplicatorInConnMsgWritten: {Counter, "replicator.inconn.msgwritten"},
ReplicatorOutConnCreditsSent: {Counter, "replicator.outconn.creditssent"},
ReplicatorOutConnMsgRead: {Counter, "replicator.outconn.msgread"},
ReplicatorReconcileDestRun: {Gauge, "replicator.reconcile.dest.run"},
ReplicatorReconcileDestFail: {Gauge, "replicator.reconcile.dest.fail"},
ReplicatorReconcileDestFoundMissing: {Gauge, "replicator.reconcile.dest.foundmissing"},
ReplicatorReconcileDestExtentRun: {Gauge, "replicator.reconcile.destextent.run"},
ReplicatorReconcileDestExtentFail: {Gauge, "replicator.reconcile.destextent.fail"},
ReplicatorReconcileDestExtentFoundMissing: {Gauge, "replicator.reconcile.destextent.foundmissing"},
ReplicatorReconcileDestExtentRemoteConsumedLocalMissing:{Gauge, "replicator.reconcile.destextent.remote-consumed-local-missing"},
ReplicatorReconcileDestExtentRemoteDeletedLocalMissing: {Gauge, "replicator.reconcile.destextent.remote-deleted-local-missing"},
ReplicatorReconcileDestExtentRemoteDeletedLocalNot: {Gauge, "replicator.reconcile.destextent.remote-deleted-local-not"},
ReplicatorReconcileDestExtentSuspectMissingExtents: {Gauge, "replicator.reconcile.destextent.suspect-missing-extent"},
},
}

Expand Down
4 changes: 2 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions services/controllerhost/event_pipeline_test.go
Expand Up @@ -715,3 +715,11 @@ func (service *MockInputOutputService) GetUpdatedCount(key string) int {
func (service *MockInputOutputService) UnloadConsumerGroups(ctx thrift.Context, request *admin.UnloadConsumerGroupsRequest) error {
return fmt.Errorf("mock not implemented")
}

func (service *MockInputOutputService) ListLoadedConsumerGroups(ctx thrift.Context) (*admin.ListConsumerGroupsResult_, error) {
return nil, fmt.Errorf("mock not implemented")
}

func (service *MockInputOutputService) ReadCgState(ctx thrift.Context, req *admin.ReadConsumerGroupStateRequest) (*admin.ReadConsumerGroupStateResult_, error) {
return nil, fmt.Errorf("mock not implemented")
}
25 changes: 19 additions & 6 deletions services/frontendhost/frontend.go
Expand Up @@ -1301,12 +1301,24 @@ func (h *Frontend) GetQueueDepthInfo(ctx thrift.Context, queueRequest *c.GetQueu
if _, err = h.prolog(ctx, queueRequest); err != nil {
return
}
var cgDesc *c.ConsumerGroupDescription
var cgUUID string

cgUUID := queueRequest.GetKey()

if !common.UUIDRegex.MatchString(cgUUID) { // Special handling to ensure only a UUID is allowed
err = &c.BadRequestError{Message: fmt.Sprintf("Consumer group must be given as UUID, not \"%v\"", cgUUID)}
return
if queueRequest.GetDestinationPath() != `` { // Normal path+cg-name specification
rcgReq := &c.ReadConsumerGroupRequest{
ConsumerGroupName: common.StringPtr(queueRequest.GetConsumerGroupName()),
DestinationPath: common.StringPtr(queueRequest.GetDestinationPath()),
}
cgDesc, err = h.ReadConsumerGroup(ctx, rcgReq)
if err != nil {
return
}
cgUUID = cgDesc.GetConsumerGroupUUID()
} else { // No destination path, therefore consumer group specified by UUID
cgUUID = queueRequest.GetConsumerGroupName()
if !common.UUIDRegex.MatchString(cgUUID) {
return nil, &c.BadRequestError{Message: `if destination path not specified, consumer group must be supplied as UUID`}
}
}

// Request to the extent controller
Expand Down Expand Up @@ -1464,7 +1476,8 @@ func (h *Frontend) prolog(ctx thrift.Context, request interface{}) (allowMutate
case *c.DeleteDestinationRequest:
allowMutate, eD = h.validateName(v.Path, destinationName, validateDisallowUUID, validateDisallowEmpty)
case *c.GetQueueDepthInfoRequest:
_, eC = h.validateName(v.Key, consumerGroupName, validateAllowUUID, validateDisallowEmpty)
_, eC = h.validateName(v.ConsumerGroupName, consumerGroupName, validateAllowUUID, validateDisallowEmpty)
_, eC = h.validateName(v.DestinationPath, destinationName, validateDisallowUUID, validateAllowEmpty)
case *c.ListConsumerGroupRequest:
_, eD = h.validateName(v.DestinationPath, destinationName, validateDisallowUUID, validateDisallowEmpty)
_, eC = h.validateName(v.ConsumerGroupName, consumerGroupName, validateDisallowUUID, validateAllowEmpty)
Expand Down
44 changes: 40 additions & 4 deletions services/frontendhost/frontend_test.go
Expand Up @@ -1239,28 +1239,64 @@ func (s *FrontendHostSuite) TestFrontendHostGetQueueDepthInfoBad() {
frontendHost, ctx := s.utilGetContextAndFrontend()
r := c.NewGetQueueDepthInfoRequest()

for _, k := range []string{``, `/foo/bar`, `foo`, `7851377b-4eb3-430b-ae46-513bc1df503` /*short by one*/} {
r.Key = common.StringPtr(k) // Only UUID is allowed
t := []string{``, `/foo/bar`, `foo`, `7851377b-4eb3-430b-ae46-513bc1df503` /*short by one*/}

for _, k := range t {
r.ConsumerGroupName = common.StringPtr(k) // Only UUID is allowed
info, err := frontendHost.GetQueueDepthInfo(ctx, r)
s.Error(err)
s.Nil(info)
assert.IsType(s.T(), &c.BadRequestError{}, err)
s.True(strings.Contains(err.Error(), `UUID`), err.Error())
}

u := []*string{common.StringPtr(`7851377b-4eb3-430b-ae46-513bc1df503c`), common.StringPtr(`/foo/bar`)}
v := []*string{common.StringPtr(`7851377b-4eb3-430b-ae46-513bc1df503c`), common.StringPtr(`/foo/bar/`), common.StringPtr(``), nil}

for _, l := range u {
for _, m := range v {
r.ConsumerGroupName = m
r.DestinationPath = l
info, err := frontendHost.GetQueueDepthInfo(ctx, r)
s.Error(err)
s.Nil(info)
assert.IsType(s.T(), &c.BadRequestError{}, err)
}
}
}

// TestFrontendHostGetQueueDepthInfoGood tests that a good request succeeds
func (s *FrontendHostSuite) TestFrontendHostGetQueueDepthInfoGood() {
frontendHost, ctx := s.utilGetContextAndFrontend()
r := c.NewGetQueueDepthInfoRequest()
r.Key = common.StringPtr(`7851377b-4eb3-430b-ae46-513bc1df503c`)
r.ConsumerGroupName = common.StringPtr(`7851377b-4eb3-430b-ae46-513bc1df503c`)
s.mockController.On(`GetQueueDepthInfo`, mock.Anything, mock.Anything).Return(
&controller.GetQueueDepthInfoResult_{
Value: common.StringPtr(`foo`),
},
nil)
nil).Twice()
s.mockMeta.On(`ReadConsumerGroup`, mock.Anything, mock.Anything).Return(
&shared.ConsumerGroupDescription{
ConsumerGroupUUID: common.StringPtr(`foo`),
},
nil).Once()
s.mockMeta.On(`ReadDestination`, mock.Anything, mock.Anything).Return(
&shared.DestinationDescription{
Path: common.StringPtr(`bar`),
},
nil,
).Once()

info, err := frontendHost.GetQueueDepthInfo(ctx, r)
s.NoError(err)
s.NotNil(info)
s.Equal(info.GetValue(), `foo`)

r.ConsumerGroupName = common.StringPtr(`/foo/bar_cg`)
r.DestinationPath = common.StringPtr(`/foo/bar`)

info, err = frontendHost.GetQueueDepthInfo(ctx, r)
s.NoError(err)
s.NotNil(info)
s.Equal(info.GetValue(), `foo`)
}

0 comments on commit b4ab366

Please sign in to comment.