Skip to content

Commit

Permalink
Merge pull request influxdata#109 from influxdb/nc-edot
Browse files Browse the repository at this point in the history
Add edot to show
  • Loading branch information
Nathaniel Cook committed Dec 18, 2015
2 parents b26797a + a017aa1 commit 24aadcf
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

### Features
- [#96](https://github.com/influxdb/kapacitor/issues/96): Use KAPACITOR_URL env var for setting the kapacitord url in the client.
- [#109](https://github.com/influxdb/kapacitor/pull/109): Add throughput counts to DOT format in `kapacitor show` command, if task is executing.

### Bugfixes
- [#102](https://github.com/influxdb/kapacitor/issues/102): Fix race when start/stoping timeTicker in batch.go
Expand Down
2 changes: 1 addition & 1 deletion cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestServer_EnableTask(t *testing.T) {
if ti.TICKscript != tick {
t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick)
}
dot := "digraph testTaskName {\nstream0 -> stream1;\n}"
dot := "digraph testTaskName {\nstream0 -> stream1 [label=\"0\"];\n}"
if ti.Dot != dot {
t.Fatalf("unexpected dot got %s exp %s", ti.Dot, dot)
}
Expand Down
4 changes: 4 additions & 0 deletions edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, logSer
return e
}

func (e *Edge) collectedCount() string {
return e.statMap.Get(statCollected).String()
}

// Close the edge, this can only be called after all
// collect calls to the edge have finished.
func (e *Edge) Close() {
Expand Down
16 changes: 16 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kapacitor

import (
"bytes"
"fmt"
"log"
"runtime"
Expand Down Expand Up @@ -33,6 +34,9 @@ type Node interface {
closeChildEdges()
// abort parent edges
abortParentEdges()

// executing dot
edot(buf *bytes.Buffer)
}

//implementation of Node
Expand Down Expand Up @@ -148,3 +152,15 @@ func (n *node) closeChildEdges() {
child.Close()
}
}

func (n *node) edot(buf *bytes.Buffer) {
for i, c := range n.children {
buf.Write([]byte(
fmt.Sprintf("%s -> %s [label=\"%s\"];\n",
n.Name(),
c.Name(),
n.outs[i].collectedCount(),
),
))
}
}
10 changes: 8 additions & 2 deletions services/task_store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Service struct {
StartTask(t *kapacitor.Task) (*kapacitor.ExecutingTask, error)
StopTask(name string) error
IsExecuting(name string) bool
ExecutingDot(name string) string
}
logger *log.Logger
}
Expand Down Expand Up @@ -214,11 +215,16 @@ func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) {
return
}

executing := ts.TaskMaster.IsExecuting(name)
errMsg := raw.Error
dot := ""
task, err := ts.Load(name)
if err == nil {
dot = string(task.Dot())
if executing {
dot = ts.TaskMaster.ExecutingDot(name)
} else {
dot = string(task.Dot())
}
} else {
errMsg = err.Error()
}
Expand All @@ -230,7 +236,7 @@ func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) {
TICKscript: raw.TICKscript,
Dot: dot,
Enabled: ts.IsEnabled(name),
Executing: ts.TaskMaster.IsExecuting(name),
Executing: executing,
Error: errMsg,
}

Expand Down
19 changes: 19 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kapacitor

import (
"bytes"
"errors"
"fmt"
"log"
Expand Down Expand Up @@ -283,6 +284,24 @@ func (et *ExecutingTask) registerOutput(name string, o Output) {
et.outputs[name] = o
}

// Return a graphviz .dot formatted byte array.
// Label edges with relavant execution information.
func (et *ExecutingTask) EDot() []byte {

var buf bytes.Buffer

buf.Write([]byte("digraph "))
buf.Write([]byte(et.Task.Name))
buf.Write([]byte(" {\n"))
et.walk(func(n Node) error {
n.edot(&buf)
return nil
})
buf.Write([]byte("}"))

return buf.Bytes()
}

// Create a node from a given pipeline node.
func (et *ExecutingTask) createNode(p pipeline.Node) (Node, error) {
switch t := p.(type) {
Expand Down
10 changes: 10 additions & 0 deletions task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ func (tm *TaskMaster) IsExecuting(name string) bool {
return executing
}

func (tm *TaskMaster) ExecutingDot(name string) string {
tm.mu.RLock()
defer tm.mu.RUnlock()
et, executing := tm.tasks[name]
if executing {
return string(et.EDot())
}
return ""
}

func (tm *TaskMaster) Stream(name string) (StreamCollector, error) {
tm.mu.Lock()
defer tm.mu.Unlock()
Expand Down

0 comments on commit 24aadcf

Please sign in to comment.