Skip to content

Commit

Permalink
Merge 9bb28f2 into 28b4b10
Browse files Browse the repository at this point in the history
  • Loading branch information
jdef committed Mar 27, 2018
2 parents 28b4b10 + 9bb28f2 commit 6af4eb5
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 37 deletions.
147 changes: 112 additions & 35 deletions api/v1/cmd/mwatch/mwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"io"
"net"
"os"
"strconv"
"text/template"

"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
Expand All @@ -18,6 +20,16 @@ import (
var (
masterHost = flag.String("master", "127.0.0.1", "IP address of mesos master")
masterPort = flag.Int("port", 5050, "Port of mesos master")

// example of using a template:
// mwatch -task.template='{{if .TaskAdded}}{{with .TaskAdded.Task}} {{.Resources|formatResources}}{{end}}{{end}}'

taskEvents = flag.Bool("task.on", true, "When true, output mesos task events")
taskTemplate = flag.String("task.template", "", "When defined this golang text-template is used to format task events")
frameworkEvents = flag.Bool("framework.on", true, "When true, output mesos framework events")
frameworkTemplate = flag.String("framework.template", "", "When defined this golang text-template is used to format framework events")
agentEvents = flag.Bool("agent.on", true, "When true, output mesos agent events")
agentTemplate = flag.String("agent.template", "", "When defined this golang text-template is used to format agent events")
)

func init() {
Expand All @@ -29,48 +41,113 @@ func main() {
uri = fmt.Sprintf("http://%s/api/v1", net.JoinHostPort(*masterHost, strconv.Itoa(*masterPort)))
cli = httpmaster.NewSender(httpcli.New(httpcli.Endpoint(uri)).Send)
ctx = context.Background()
err = watch(cli.Send(ctx, calls.NonStreaming(calls.Subscribe())))

taskTemp, frameworkTemp, agentTemp *template.Template
)
if *taskTemplate != "" {
fm := template.FuncMap(map[string]interface{}{
"formatResources": func(r []mesos.Resource) string { return mesos.Resources(r).String() },
})
taskTemp = template.Must(template.New("task").Funcs(fm).Parse(*taskTemplate))
}
if *frameworkTemplate != "" {
frameworkTemp = template.Must(template.New("framework").Parse(*frameworkTemplate))
}
if *agentTemplate != "" {
agentTemp = template.Must(template.New("agent").Parse(*agentTemplate))
}
err := watch(taskTemp, frameworkTemp, agentTemp)(cli.Send(ctx, calls.NonStreaming(calls.Subscribe())))
if err != nil {
panic(err)
}
}

func watch(resp mesos.Response, err error) error {
defer func() {
if resp != nil {
resp.Close()
}
}()
if err != nil {
return err
}
for {
var e master.Event
if err := resp.Decode(&e); err != nil {
if err == io.EOF {
break
func watch(taskTemp, frameworkTemp, agentTemp *template.Template) func(mesos.Response, error) error {
return func(resp mesos.Response, err error) error {
defer func() {
if resp != nil {
resp.Close()
}
}()
for err == nil {
var e master.Event
if err := resp.Decode(&e); err != nil {
if err == io.EOF {
err = nil
break
}
continue
}
switch t := e.GetType(); t {
case master.Event_TASK_ADDED:
if !*taskEvents {
continue
}
if taskTemp != nil {
err = taskTemp.Execute(os.Stdout, e)
continue
}
task := e.GetTaskAdded().Task
fmt.Println(t.String(), task.GetFrameworkID(), task.GetTaskID(), task.GetState(), task.GetLabels().Format(), mesos.Resources(task.GetResources()))
case master.Event_TASK_UPDATED:
if !*taskEvents {
continue
}
if taskTemp != nil {
err = taskTemp.Execute(os.Stdout, e)
continue
}
task := e.GetTaskUpdated().GetStatus()
fmt.Println(t.String(), task.GetTaskID(), task.GetState(), task.GetLabels().Format())
case master.Event_AGENT_ADDED:
if !*agentEvents {
continue
}
if agentTemp != nil {
err = agentTemp.Execute(os.Stdout, e)
continue
}
fmt.Println(t.String(), e.GetAgentAdded().String())
case master.Event_AGENT_REMOVED:
if !*agentEvents {
continue
}
if agentTemp != nil {
err = agentTemp.Execute(os.Stdout, e)
continue
}
fmt.Println(t.String(), e.GetAgentRemoved().String())
case master.Event_FRAMEWORK_ADDED:
if !*frameworkEvents {
continue
}
if frameworkTemp != nil {
err = frameworkTemp.Execute(os.Stdout, e)
continue
}
fmt.Println(t.String(), e.GetFrameworkAdded().String())
case master.Event_FRAMEWORK_UPDATED:
if !*frameworkEvents {
continue
}
if frameworkTemp != nil {
err = frameworkTemp.Execute(os.Stdout, e)
continue
}
fmt.Println(t.String(), e.GetFrameworkUpdated().String())
case master.Event_FRAMEWORK_REMOVED:
if !*frameworkEvents {
continue
}
if frameworkTemp != nil {
err = frameworkTemp.Execute(os.Stdout, e)
continue
}
fmt.Println(t.String(), e.GetFrameworkRemoved().String())
default:
// noop
}
return err
}
switch t := e.GetType(); t {
case master.Event_TASK_ADDED:
fmt.Println(t.String(), e.GetTaskAdded().String())
case master.Event_TASK_UPDATED:
fmt.Println(t.String(), e.GetTaskUpdated().String())
case master.Event_AGENT_ADDED:
fmt.Println(t.String(), e.GetAgentAdded().String())
case master.Event_AGENT_REMOVED:
fmt.Println(t.String(), e.GetAgentRemoved().String())
case master.Event_FRAMEWORK_ADDED:
fmt.Println(t.String(), e.GetFrameworkAdded().String())
case master.Event_FRAMEWORK_UPDATED:
fmt.Println(t.String(), e.GetFrameworkUpdated().String())
case master.Event_FRAMEWORK_REMOVED:
fmt.Println(t.String(), e.GetFrameworkRemoved().String())
default:
fmt.Println(t.String())
}
return err
}
return nil
}
54 changes: 54 additions & 0 deletions api/v1/lib/labels.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package mesos

import (
"bytes"
"io"
)

type labelList []Label // convenience type, for working with unwrapped Label slices

// Equivalent returns true if left and right have the same labels. Order is not important.
Expand Down Expand Up @@ -39,3 +44,52 @@ func (left Label) Equivalent(right Label) bool {
return right.Value != nil && *left.Value == *right.Value
}
}

func (left Label) writeTo(w io.Writer) (n int64, err error) {
write := func(s string) {
if err != nil {
return
}
var n2 int
n2, err = io.WriteString(w, s)
n += int64(n2)
}
write(left.Key)
if s := left.GetValue(); s != "" {
write("=")
write(s)
}
return
}

func (left *Labels) writeTo(w io.Writer) (n int64, err error) {
var (
lab = left.GetLabels()
n2 int
n3 int64
)
for i := range lab {
if i > 0 {
n2, err = io.WriteString(w, ",")
n += int64(n2)
if err != nil {
break
}
}
n3, err = lab[i].writeTo(w)
n += n3
if err != nil {
break
}
}
return
}

func (left *Labels) Format() string {
if left == nil {
return ""
}
var b bytes.Buffer
left.writeTo(&b)
return b.String()
}
22 changes: 22 additions & 0 deletions api/v1/lib/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,25 @@ func TestEquivalent_Labels(t *testing.T) {
})
}
}

func TestLabels_Format(t *testing.T) {
ps := func(s string) *string { return &s }
for ti, tc := range []struct {
lab *Labels
wants string
}{
{},
{&Labels{}, ""},
{&Labels{Labels: []Label{}}, ""},
{&Labels{Labels: []Label{{Key: "a"}}}, "a"},
{&Labels{Labels: []Label{{Key: "a"}, {Key: "b"}}}, "a,b"},
{&Labels{Labels: []Label{{Key: "a"}, {Key: "b", Value: ps("1")}, {Key: "c"}}}, "a,b=1,c"},
} {
t.Run(strconv.Itoa(ti), func(t *testing.T) {
actual := tc.lab.Format()
if tc.wants != actual {
t.Errorf("expected %q instead of %q", tc.wants, actual)
}
})
}
}
6 changes: 4 additions & 2 deletions api/v1/lib/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (resources Resources) Format(options ...func(*ResourcesFormatOptions)) stri
o(&f)
}
}
// TODO(jdef) use a string.Builder once we can rely on a more modern golang version
buf := bytes.Buffer{}
for i := range resources {
if i > 0 {
Expand Down Expand Up @@ -273,8 +274,9 @@ func (resources Resources) Format(options ...func(*ResourcesFormatOptions)) stri
buf.WriteString(*rr.Principal)
}
if rr.Labels != nil {
buf.WriteString(",")
buf.WriteString(rr.GetLabels().String())
buf.WriteString(",{")
rr.GetLabels().writeTo(&buf)
buf.WriteString("}")
}
buf.WriteString(")")
}
Expand Down

0 comments on commit 6af4eb5

Please sign in to comment.