Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Add admin command for inputhost
Browse files Browse the repository at this point in the history
This patch just wires admin command for inputhost to do the
following:
* Unload loaded destination from an input
* Get destination state from inputhosts' memory
* Get a list of destinations which are loaded in memory

In order to do this we also had to expose a client for directly
talking to inputhost.

Tested this manually locally.
  • Loading branch information
Aravind Srinivasan committed Mar 8, 2017
1 parent 2dc1e2b commit a04e0c4
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 0 deletions.
85 changes: 85 additions & 0 deletions clients/inputhost/client.go
@@ -0,0 +1,85 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package inputhost

import (
"fmt"
"time"

"github.com/uber/cherami-server/common"
"github.com/uber/cherami-thrift/.generated/go/admin"

tchannel "github.com/uber/tchannel-go"
tcthrift "github.com/uber/tchannel-go/thrift"
)

// InClientImpl is a inputhost cherami tchannel client
type InClientImpl struct {
connection *tchannel.Channel
client admin.TChanInputHostAdmin
}

// NewClient returns a new instance of cherami tchannel client
func NewClient(instanceID int, hostAddr string) (*InClientImpl, error) {
ch, err := tchannel.NewChannel(fmt.Sprintf("inputhost-client-%v", instanceID), nil)
if err != nil {
return nil, err
}

tClient := tcthrift.NewClient(ch, common.InputServiceName, &tcthrift.ClientOptions{
HostPort: hostAddr,
})
client := admin.NewTChanInputHostAdminClient(tClient)

return &InClientImpl{
connection: ch,
client: client,
}, nil
}

// Close closes the client
func (s *InClientImpl) Close() {
s.connection.Close()
}

// UnloadDestinations unloads the destination from the inputhost
func (s *InClientImpl) UnloadDestinations(req *admin.UnloadDestinationsRequest) error {
ctx, cancel := tcthrift.NewContext(2 * time.Second)
defer cancel()

return s.client.UnloadDestinations(ctx, req)
}

// ListLoadedDestinations lists all the loaded destinations from the inputhost
func (s *InClientImpl) ListLoadedDestinations() (*admin.ListDestinationsResult_, error) {
ctx, cancel := tcthrift.NewContext(15 * time.Second)
defer cancel()

return s.client.ListLoadedDestinations(ctx)
}

// ReadDestState
func (s *InClientImpl) ReadDestState(req *admin.ReadDestinationStateRequest) (*admin.ReadDestinationStateResult_, error) {
ctx, cancel := tcthrift.NewContext(15 * time.Second)
defer cancel()

return s.client.ReadDestState(ctx, req)
}
45 changes: 45 additions & 0 deletions cmd/tools/admin/main.go
Expand Up @@ -651,6 +651,51 @@ func main() {
},
},
},
{
Name: "inputhost",
Aliases: []string{"ih"},
Usage: "inputhost (deststate|listAllDests|unloaddest)",
Subcommands: []cli.Command{
{
Name: "deststate",
Aliases: []string{"dests"},
Usage: "inputhost deststate <hostport> [options]",
Flags: []cli.Flag{
cli.StringFlag{
Name: "dest_uuid, dest",
Value: "",
Usage: "The UUID of the destination whose state will be dumped",
},
},
Action: func(c *cli.Context) {
admin.GetDestinationState(c)
},
},
{
Name: "listAllDests",
Aliases: []string{"ls"},
Usage: "inputhost listAllDests <hostport>",
Action: func(c *cli.Context) {
admin.ListAllLoadedDestinations(c)
},
},
{
Name: "unloaddest",
Aliases: []string{"ud"},
Usage: "inputhost unloaddest <hostport> [options]",
Flags: []cli.Flag{
cli.StringFlag{
Name: "dest_uuid, dest",
Value: "",
Usage: "The destination UUID which should be unloaded",
},
},
Action: func(c *cli.Context) {
admin.UnloadDestination(c)
},
},
},
},
{
Name: "seal-check",
Aliases: []string{"sc"},
Expand Down
16 changes: 16 additions & 0 deletions tools/admin/lib.go
Expand Up @@ -178,6 +178,22 @@ func ReadCgAckID(c *cli.Context) {
fmt.Fprintln(os.Stdout, string(outputStr))
}

// UnloadDestination unloads the destination on the given inputhost
func UnloadDestination(c *cli.Context) {
mClient := toolscommon.GetMClient(c, adminToolService)
toolscommon.UnloadDestination(c, mClient)
}

// ListAllLoadedDestinations unloads the destination on the given inputhost
func ListAllLoadedDestinations(c *cli.Context) {
toolscommon.ListAllLoadedDestinations(c)
}

// GetDestinationState gets the destination state on the given inputhost
func GetDestinationState(c *cli.Context) {
toolscommon.GetDestinationState(c)
}

type storeExtJSONOutputFields struct {
StoreAddr string `json:"storehost_addr"`
StoreUUID string `json:"storehost_uuid"`
Expand Down
112 changes: 112 additions & 0 deletions tools/common/lib.go
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/apache/thrift/lib/go/thrift"
"github.com/codegangsta/cli"
ccli "github.com/uber/cherami-client-go/client/cherami"
"github.com/uber/cherami-server/clients/inputhost"
mcli "github.com/uber/cherami-server/clients/metadata"
"github.com/uber/cherami-server/clients/outputhost"
"github.com/uber/cherami-server/clients/storehost"
Expand Down Expand Up @@ -508,6 +509,117 @@ func printCgState(cgState *admin.ConsumerGroupState) {
fmt.Fprintln(os.Stdout, string(outputStr))
}

// UnloadDestination unloads the Destination based on cli.Context
func UnloadDestination(c *cli.Context, mClient mcli.Client) {
if len(c.Args()) < 1 {
ExitIfError(errors.New(strNotEnoughArgs))
}

hostPort := c.Args()[0]
destUUID := c.String("dest_uuid")

if !uuidRegex.MatchString(destUUID) {
ExitIfError(errors.New("specify a valid dest UUID"))
}

// generate a random instance id to be used to create a client tchannel
instanceID := rand.Intn(50000)
inputClient, err := inputhost.NewClient(instanceID, hostPort)
ExitIfError(err)
defer inputClient.Close()

destUnloadReq := admin.NewUnloadDestinationsRequest()
destUnloadReq.DestUUIDs = []string{destUUID}

err = inputClient.UnloadDestinations(destUnloadReq)
ExitIfError(err)
}

// ListAllLoadedDestinations lists all loaded Destinations in memory of the inputhost
func ListAllLoadedDestinations(c *cli.Context) {
if len(c.Args()) < 1 {
ExitIfError(errors.New(strNotEnoughArgs))
}

hostPort := c.Args()[0]

// generate a random instance id to be used to create a client tchannel
instanceID := rand.Intn(50000)
inputClient, err := inputhost.NewClient(instanceID, hostPort)
ExitIfError(err)
defer inputClient.Close()

listDestResult, err := inputClient.ListLoadedDestinations()
ExitIfError(err)

if listDestResult != nil {
for _, dest := range listDestResult.Dests {
fmt.Printf("%v\n", Jsonify(dest))
}
}
}

// GetDestinationState unloads the Destination based on cli.Context
func GetDestinationState(c *cli.Context) {
if len(c.Args()) < 1 {
ExitIfError(errors.New(strNotEnoughArgs))
}

hostPort := c.Args()[0]
destUUID := c.String("dest_uuid")

if !uuidRegex.MatchString(destUUID) {
ExitIfError(errors.New("specify a valid dest UUID"))
}

// generate a random instance id to be used to create a client tchannel
instanceID := rand.Intn(50000)
inputClient, err := inputhost.NewClient(instanceID, hostPort)
ExitIfError(err)
defer inputClient.Close()

destStateReq := admin.NewReadDestinationStateRequest()
destStateReq.DestUUIDs = []string{destUUID}

readdestStateRes, err1 := inputClient.ReadDestState(destStateReq)
ExitIfError(err1)

fmt.Printf("inputhostUUID: %v\n", readdestStateRes.GetInputHostUUID())

for _, dest := range readdestStateRes.DestState {
printDestState(dest)
for _, ext := range dest.DestExtents {
fmt.Printf("\t%v\n", Jsonify(ext))
}
}
}

type destStateJSONOutput struct {
DestUUID string `json""destUUID"`
MsgsChSize int64 `json:"msgsChSize"`
NumConnections int64 `json:"numConnections"`
NumMsgsIn int64 `json:"numMsgsIn"`
NumSentAcks int64 `json:"numSentAcks"`
NumSentNacks int64 `json:"numSentNacks"`
NumThrottled int64 `json:"numThrottled"`
NumFailed int64 `json:"numFailed"`
}

func printDestState(destState *admin.DestinationState) {
input := &destStateJSONOutput{
DestUUID: destState.GetDestUUID(),
MsgsChSize: destState.GetMsgsChSize(),
NumConnections: destState.GetNumConnections(),
NumMsgsIn: destState.GetNumMsgsIn(),
NumSentAcks: destState.GetNumSentAcks(),
NumSentNacks: destState.GetNumSentNacks(),
NumThrottled: destState.GetNumThrottled(),
NumFailed: destState.GetNumFailed(),
}
inputStr, _ := json.Marshal(input)
fmt.Fprintln(os.Stdout, string(inputStr))
}

type destDescJSONOutputFields struct {
Path string `json:"path"`
UUID string `json:"uuid"`
Expand Down

0 comments on commit a04e0c4

Please sign in to comment.