diff --git a/api/v1/cmd/mwatch/mwatch.go b/api/v1/cmd/mwatch/mwatch.go index ba024272..7cd30881 100644 --- a/api/v1/cmd/mwatch/mwatch.go +++ b/api/v1/cmd/mwatch/mwatch.go @@ -6,7 +6,9 @@ import ( "fmt" "io" "net" + "os" "strconv" + "text/template" "github.com/mesos/mesos-go/api/v1/lib" "github.com/mesos/mesos-go/api/v1/lib/httpcli" @@ -18,6 +20,16 @@ import ( var ( masterHost = flag.String("master", "127.0.0.1", "IP address of mesos master") masterPort = flag.Int("port", 5050, "Port of mesos master") + + // example of using a template: + // mwatch -task.template='{{if .TaskAdded}}{{with .TaskAdded.Task}} {{.Resources|formatResources}}{{end}}{{end}}' + + taskEvents = flag.Bool("task.on", true, "When true, output mesos task events") + taskTemplate = flag.String("task.template", "", "When defined this golang text-template is used to format task events") + frameworkEvents = flag.Bool("framework.on", true, "When true, output mesos framework events") + frameworkTemplate = flag.String("framework.template", "", "When defined this golang text-template is used to format framework events") + agentEvents = flag.Bool("agent.on", true, "When true, output mesos agent events") + agentTemplate = flag.String("agent.template", "", "When defined this golang text-template is used to format agent events") ) func init() { @@ -29,48 +41,113 @@ func main() { uri = fmt.Sprintf("http://%s/api/v1", net.JoinHostPort(*masterHost, strconv.Itoa(*masterPort))) cli = httpmaster.NewSender(httpcli.New(httpcli.Endpoint(uri)).Send) ctx = context.Background() - err = watch(cli.Send(ctx, calls.NonStreaming(calls.Subscribe()))) + + taskTemp, frameworkTemp, agentTemp *template.Template ) + if *taskTemplate != "" { + fm := template.FuncMap(map[string]interface{}{ + "formatResources": func(r []mesos.Resource) string { return mesos.Resources(r).String() }, + }) + taskTemp = template.Must(template.New("task").Funcs(fm).Parse(*taskTemplate)) + } + if *frameworkTemplate != "" { + frameworkTemp = template.Must(template.New("framework").Parse(*frameworkTemplate)) + } + if *agentTemplate != "" { + agentTemp = template.Must(template.New("agent").Parse(*agentTemplate)) + } + err := watch(taskTemp, frameworkTemp, agentTemp)(cli.Send(ctx, calls.NonStreaming(calls.Subscribe()))) if err != nil { panic(err) } } -func watch(resp mesos.Response, err error) error { - defer func() { - if resp != nil { - resp.Close() - } - }() - if err != nil { - return err - } - for { - var e master.Event - if err := resp.Decode(&e); err != nil { - if err == io.EOF { - break +func watch(taskTemp, frameworkTemp, agentTemp *template.Template) func(mesos.Response, error) error { + return func(resp mesos.Response, err error) error { + defer func() { + if resp != nil { + resp.Close() + } + }() + for err == nil { + var e master.Event + if err := resp.Decode(&e); err != nil { + if err == io.EOF { + err = nil + break + } + continue + } + switch t := e.GetType(); t { + case master.Event_TASK_ADDED: + if !*taskEvents { + continue + } + if taskTemp != nil { + err = taskTemp.Execute(os.Stdout, e) + continue + } + task := e.GetTaskAdded().Task + fmt.Println(t.String(), task.GetFrameworkID(), task.GetTaskID(), task.GetState(), task.GetLabels().Format(), mesos.Resources(task.GetResources())) + case master.Event_TASK_UPDATED: + if !*taskEvents { + continue + } + if taskTemp != nil { + err = taskTemp.Execute(os.Stdout, e) + continue + } + task := e.GetTaskUpdated().GetStatus() + fmt.Println(t.String(), task.GetTaskID(), task.GetState(), task.GetLabels().Format()) + case master.Event_AGENT_ADDED: + if !*agentEvents { + continue + } + if agentTemp != nil { + err = agentTemp.Execute(os.Stdout, e) + continue + } + fmt.Println(t.String(), e.GetAgentAdded().String()) + case master.Event_AGENT_REMOVED: + if !*agentEvents { + continue + } + if agentTemp != nil { + err = agentTemp.Execute(os.Stdout, e) + continue + } + fmt.Println(t.String(), e.GetAgentRemoved().String()) + case master.Event_FRAMEWORK_ADDED: + if !*frameworkEvents { + continue + } + if frameworkTemp != nil { + err = frameworkTemp.Execute(os.Stdout, e) + continue + } + fmt.Println(t.String(), e.GetFrameworkAdded().String()) + case master.Event_FRAMEWORK_UPDATED: + if !*frameworkEvents { + continue + } + if frameworkTemp != nil { + err = frameworkTemp.Execute(os.Stdout, e) + continue + } + fmt.Println(t.String(), e.GetFrameworkUpdated().String()) + case master.Event_FRAMEWORK_REMOVED: + if !*frameworkEvents { + continue + } + if frameworkTemp != nil { + err = frameworkTemp.Execute(os.Stdout, e) + continue + } + fmt.Println(t.String(), e.GetFrameworkRemoved().String()) + default: + // noop } - return err - } - switch t := e.GetType(); t { - case master.Event_TASK_ADDED: - fmt.Println(t.String(), e.GetTaskAdded().String()) - case master.Event_TASK_UPDATED: - fmt.Println(t.String(), e.GetTaskUpdated().String()) - case master.Event_AGENT_ADDED: - fmt.Println(t.String(), e.GetAgentAdded().String()) - case master.Event_AGENT_REMOVED: - fmt.Println(t.String(), e.GetAgentRemoved().String()) - case master.Event_FRAMEWORK_ADDED: - fmt.Println(t.String(), e.GetFrameworkAdded().String()) - case master.Event_FRAMEWORK_UPDATED: - fmt.Println(t.String(), e.GetFrameworkUpdated().String()) - case master.Event_FRAMEWORK_REMOVED: - fmt.Println(t.String(), e.GetFrameworkRemoved().String()) - default: - fmt.Println(t.String()) } + return err } - return nil } diff --git a/api/v1/lib/labels.go b/api/v1/lib/labels.go index 96e5a309..ecf50553 100644 --- a/api/v1/lib/labels.go +++ b/api/v1/lib/labels.go @@ -1,5 +1,10 @@ package mesos +import ( + "bytes" + "io" +) + type labelList []Label // convenience type, for working with unwrapped Label slices // Equivalent returns true if left and right have the same labels. Order is not important. @@ -39,3 +44,52 @@ func (left Label) Equivalent(right Label) bool { return right.Value != nil && *left.Value == *right.Value } } + +func (left Label) writeTo(w io.Writer) (n int64, err error) { + write := func(s string) { + if err != nil { + return + } + var n2 int + n2, err = io.WriteString(w, s) + n += int64(n2) + } + write(left.Key) + if s := left.GetValue(); s != "" { + write("=") + write(s) + } + return +} + +func (left *Labels) writeTo(w io.Writer) (n int64, err error) { + var ( + lab = left.GetLabels() + n2 int + n3 int64 + ) + for i := range lab { + if i > 0 { + n2, err = io.WriteString(w, ",") + n += int64(n2) + if err != nil { + break + } + } + n3, err = lab[i].writeTo(w) + n += n3 + if err != nil { + break + } + } + return +} + +func (left *Labels) Format() string { + if left == nil { + return "" + } + var b bytes.Buffer + left.writeTo(&b) + return b.String() +} diff --git a/api/v1/lib/labels_test.go b/api/v1/lib/labels_test.go index 9a8ade42..59ec4ea4 100644 --- a/api/v1/lib/labels_test.go +++ b/api/v1/lib/labels_test.go @@ -81,3 +81,25 @@ func TestEquivalent_Labels(t *testing.T) { }) } } + +func TestLabels_Format(t *testing.T) { + ps := func(s string) *string { return &s } + for ti, tc := range []struct { + lab *Labels + wants string + }{ + {}, + {&Labels{}, ""}, + {&Labels{Labels: []Label{}}, ""}, + {&Labels{Labels: []Label{{Key: "a"}}}, "a"}, + {&Labels{Labels: []Label{{Key: "a"}, {Key: "b"}}}, "a,b"}, + {&Labels{Labels: []Label{{Key: "a"}, {Key: "b", Value: ps("1")}, {Key: "c"}}}, "a,b=1,c"}, + } { + t.Run(strconv.Itoa(ti), func(t *testing.T) { + actual := tc.lab.Format() + if tc.wants != actual { + t.Errorf("expected %q instead of %q", tc.wants, actual) + } + }) + } +} diff --git a/api/v1/lib/resources.go b/api/v1/lib/resources.go index 55fb8fd1..2c348030 100644 --- a/api/v1/lib/resources.go +++ b/api/v1/lib/resources.go @@ -232,6 +232,7 @@ func (resources Resources) Format(options ...func(*ResourcesFormatOptions)) stri o(&f) } } + // TODO(jdef) use a string.Builder once we can rely on a more modern golang version buf := bytes.Buffer{} for i := range resources { if i > 0 { @@ -273,8 +274,9 @@ func (resources Resources) Format(options ...func(*ResourcesFormatOptions)) stri buf.WriteString(*rr.Principal) } if rr.Labels != nil { - buf.WriteString(",") - buf.WriteString(rr.GetLabels().String()) + buf.WriteString(",{") + rr.GetLabels().writeTo(&buf) + buf.WriteString("}") } buf.WriteString(")") }