diff --git a/clients/inputhost/client.go b/clients/inputhost/client.go new file mode 100644 index 00000000..2b98ccda --- /dev/null +++ b/clients/inputhost/client.go @@ -0,0 +1,85 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package inputhost + +import ( + "fmt" + "time" + + "github.com/uber/cherami-server/common" + "github.com/uber/cherami-thrift/.generated/go/admin" + + tchannel "github.com/uber/tchannel-go" + tcthrift "github.com/uber/tchannel-go/thrift" +) + +// InClientImpl is a inputhost cherami tchannel client +type InClientImpl struct { + connection *tchannel.Channel + client admin.TChanInputHostAdmin +} + +// NewClient returns a new instance of cherami tchannel client +func NewClient(instanceID int, hostAddr string) (*InClientImpl, error) { + ch, err := tchannel.NewChannel(fmt.Sprintf("inputhost-client-%v", instanceID), nil) + if err != nil { + return nil, err + } + + tClient := tcthrift.NewClient(ch, common.InputServiceName, &tcthrift.ClientOptions{ + HostPort: hostAddr, + }) + client := admin.NewTChanInputHostAdminClient(tClient) + + return &InClientImpl{ + connection: ch, + client: client, + }, nil +} + +// Close closes the client +func (s *InClientImpl) Close() { + s.connection.Close() +} + +// UnloadDestinations unloads the destination from the inputhost +func (s *InClientImpl) UnloadDestinations(req *admin.UnloadDestinationsRequest) error { + ctx, cancel := tcthrift.NewContext(2 * time.Second) + defer cancel() + + return s.client.UnloadDestinations(ctx, req) +} + +// ListLoadedDestinations lists all the loaded destinations from the inputhost +func (s *InClientImpl) ListLoadedDestinations() (*admin.ListDestinationsResult_, error) { + ctx, cancel := tcthrift.NewContext(15 * time.Second) + defer cancel() + + return s.client.ListLoadedDestinations(ctx) +} + +// ReadDestState +func (s *InClientImpl) ReadDestState(req *admin.ReadDestinationStateRequest) (*admin.ReadDestinationStateResult_, error) { + ctx, cancel := tcthrift.NewContext(15 * time.Second) + defer cancel() + + return s.client.ReadDestState(ctx, req) +} diff --git a/cmd/tools/admin/main.go b/cmd/tools/admin/main.go index f3b6513a..2fc263e2 100644 --- a/cmd/tools/admin/main.go +++ b/cmd/tools/admin/main.go @@ -651,6 +651,51 @@ func main() { }, }, }, + { + Name: "inputhost", + Aliases: []string{"ih"}, + Usage: "inputhost (deststate|listAllDests|unloaddest)", + Subcommands: []cli.Command{ + { + Name: "deststate", + Aliases: []string{"dests"}, + Usage: "inputhost deststate [options]", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "dest_uuid, dest", + Value: "", + Usage: "The UUID of the destination whose state will be dumped", + }, + }, + Action: func(c *cli.Context) { + admin.GetDestinationState(c) + }, + }, + { + Name: "listAllDests", + Aliases: []string{"ls"}, + Usage: "inputhost listAllDests ", + Action: func(c *cli.Context) { + admin.ListAllLoadedDestinations(c) + }, + }, + { + Name: "unloaddest", + Aliases: []string{"ud"}, + Usage: "inputhost unloaddest [options]", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "dest_uuid, dest", + Value: "", + Usage: "The destination UUID which should be unloaded", + }, + }, + Action: func(c *cli.Context) { + admin.UnloadDestination(c) + }, + }, + }, + }, { Name: "seal-check", Aliases: []string{"sc"}, diff --git a/tools/admin/lib.go b/tools/admin/lib.go index 76b89307..5ca2e388 100644 --- a/tools/admin/lib.go +++ b/tools/admin/lib.go @@ -178,6 +178,22 @@ func ReadCgAckID(c *cli.Context) { fmt.Fprintln(os.Stdout, string(outputStr)) } +// UnloadDestination unloads the destination on the given inputhost +func UnloadDestination(c *cli.Context) { + mClient := toolscommon.GetMClient(c, adminToolService) + toolscommon.UnloadDestination(c, mClient) +} + +// ListAllLoadedDestinations unloads the destination on the given inputhost +func ListAllLoadedDestinations(c *cli.Context) { + toolscommon.ListAllLoadedDestinations(c) +} + +// GetDestinationState gets the destination state on the given inputhost +func GetDestinationState(c *cli.Context) { + toolscommon.GetDestinationState(c) +} + type storeExtJSONOutputFields struct { StoreAddr string `json:"storehost_addr"` StoreUUID string `json:"storehost_uuid"` diff --git a/tools/common/lib.go b/tools/common/lib.go index e162eb6d..c35b63ec 100644 --- a/tools/common/lib.go +++ b/tools/common/lib.go @@ -39,6 +39,7 @@ import ( "github.com/apache/thrift/lib/go/thrift" "github.com/codegangsta/cli" ccli "github.com/uber/cherami-client-go/client/cherami" + "github.com/uber/cherami-server/clients/inputhost" mcli "github.com/uber/cherami-server/clients/metadata" "github.com/uber/cherami-server/clients/outputhost" "github.com/uber/cherami-server/clients/storehost" @@ -508,6 +509,117 @@ func printCgState(cgState *admin.ConsumerGroupState) { fmt.Fprintln(os.Stdout, string(outputStr)) } +// UnloadDestination unloads the Destination based on cli.Context +func UnloadDestination(c *cli.Context, mClient mcli.Client) { + if len(c.Args()) < 1 { + ExitIfError(errors.New(strNotEnoughArgs)) + } + + hostPort := c.Args()[0] + destUUID := c.String("dest_uuid") + + if !uuidRegex.MatchString(destUUID) { + ExitIfError(errors.New("specify a valid dest UUID")) + } + + // generate a random instance id to be used to create a client tchannel + instanceID := rand.Intn(50000) + inputClient, err := inputhost.NewClient(instanceID, hostPort) + ExitIfError(err) + defer inputClient.Close() + + destUnloadReq := admin.NewUnloadDestinationsRequest() + destUnloadReq.DestUUIDs = []string{destUUID} + + err = inputClient.UnloadDestinations(destUnloadReq) + ExitIfError(err) +} + +// ListAllLoadedDestinations lists all loaded Destinations in memory of the inputhost +func ListAllLoadedDestinations(c *cli.Context) { + if len(c.Args()) < 1 { + ExitIfError(errors.New(strNotEnoughArgs)) + } + + hostPort := c.Args()[0] + + // generate a random instance id to be used to create a client tchannel + instanceID := rand.Intn(50000) + inputClient, err := inputhost.NewClient(instanceID, hostPort) + ExitIfError(err) + defer inputClient.Close() + + listDestResult, err := inputClient.ListLoadedDestinations() + ExitIfError(err) + + if listDestResult != nil { + for _, dest := range listDestResult.Dests { + fmt.Printf("%v\n", Jsonify(dest)) + } + } +} + +// GetDestinationState unloads the Destination based on cli.Context +func GetDestinationState(c *cli.Context) { + if len(c.Args()) < 1 { + ExitIfError(errors.New(strNotEnoughArgs)) + } + + hostPort := c.Args()[0] + destUUID := c.String("dest_uuid") + + if !uuidRegex.MatchString(destUUID) { + ExitIfError(errors.New("specify a valid dest UUID")) + } + + // generate a random instance id to be used to create a client tchannel + instanceID := rand.Intn(50000) + inputClient, err := inputhost.NewClient(instanceID, hostPort) + ExitIfError(err) + defer inputClient.Close() + + destStateReq := admin.NewReadDestinationStateRequest() + destStateReq.DestUUIDs = []string{destUUID} + + readdestStateRes, err1 := inputClient.ReadDestState(destStateReq) + ExitIfError(err1) + + fmt.Printf("inputhostUUID: %v\n", readdestStateRes.GetInputHostUUID()) + + for _, dest := range readdestStateRes.DestState { + printDestState(dest) + for _, ext := range dest.DestExtents { + fmt.Printf("\t%v\n", Jsonify(ext)) + } + } +} + +type destStateJSONOutput struct { + DestUUID string `json""destUUID"` + MsgsChSize int64 `json:"msgsChSize"` + NumConnections int64 `json:"numConnections"` + NumMsgsIn int64 `json:"numMsgsIn"` + NumSentAcks int64 `json:"numSentAcks"` + NumSentNacks int64 `json:"numSentNacks"` + NumThrottled int64 `json:"numThrottled"` + NumFailed int64 `json:"numFailed"` +} + +func printDestState(destState *admin.DestinationState) { + input := &destStateJSONOutput{ + DestUUID: destState.GetDestUUID(), + MsgsChSize: destState.GetMsgsChSize(), + NumConnections: destState.GetNumConnections(), + NumMsgsIn: destState.GetNumMsgsIn(), + NumSentAcks: destState.GetNumSentAcks(), + NumSentNacks: destState.GetNumSentNacks(), + NumThrottled: destState.GetNumThrottled(), + NumFailed: destState.GetNumFailed(), + } + inputStr, _ := json.Marshal(input) + fmt.Fprintln(os.Stdout, string(inputStr)) +} + type destDescJSONOutputFields struct { Path string `json:"path"` UUID string `json:"uuid"`