Scope tasks to a set of DBRPs.
Remove 'fork' method and use 'from' as an implicit fork call.
nathanielc committed Oct 28, 2015
1 parent e1cd392 commit 19bf7e7
Showing 21 changed files with 465 additions and 166 deletions.
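In TICKscript terms, the commit message implies roughly the following before/after (a hypothetical sketch reconstructed from the message alone; the old .fork() syntax and the 'cpu' selector are assumptions, not shown in the hunks below):

    // Before: explicitly fork the stream, then select from it
    stream
        .fork()
        .from('cpu')

    // After: .from() forks the stream implicitly
    stream
        .from('cpu')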
2 changes: 0 additions & 2 deletions alert.go
@@ -172,7 +172,6 @@ func (a *AlertNode) runAlert() error {
}
}
}
a.logger.Println("I! alert node done")
return nil
}

@@ -279,7 +278,6 @@ func (a *AlertNode) handleLog(ad AlertData) {
if n != 1 || err != nil {
a.logger.Println("E! failed to write to file", err)
}
a.logger.Println("I! handled Log")
}

func (a *AlertNode) handleExec(ad AlertData) {
32 changes: 28 additions & 4 deletions batch.go
@@ -1,6 +1,7 @@
package kapacitor

import (
"encoding/json"
"time"

"github.com/influxdb/influxdb/client"
@@ -36,6 +37,12 @@ func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error)
return bn, nil
}

+ // Return list of databases and retention policies
+ // the batcher will query.
+ func (b *BatchNode) DBRPs() ([]DBRP, error) {
+ return b.query.DBRPs()
+ }

// Query InfluxDB return Edge with data
func (b *BatchNode) Query(batch BatchCollector) {
defer batch.Close()
@@ -87,8 +94,9 @@ func (b *BatchNode) Query(batch BatchCollector) {
Group: groupID,
Tags: series.Tags,
}
- bch.Points = make([]models.TimeFields, len(series.Values))
- for i, v := range series.Values {
+ bch.Points = make([]models.TimeFields, 0, len(series.Values))
+ for _, v := range series.Values {
+ var skip bool
fields := make(models.Fields)
var t time.Time
for i, c := range series.Columns {
@@ -104,10 +112,26 @@
return
}
} else {
- fields[c] = v[i]
+ value := v[i]
+ if n, ok := value.(json.Number); ok {
+ f, err := n.Float64()
+ if err == nil {
+ value = f
+ }
+ }
+ if value == nil {
+ skip = true
+ break
+ }
+ fields[c] = value
}
}
- bch.Points[i] = models.TimeFields{Time: t, Fields: fields}
+ if !skip {
+ bch.Points = append(
+ bch.Points,
+ models.TimeFields{Time: t, Fields: fields},
+ )
+ }
}
batch.CollectBatch(bch)
}
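The hunk above converts json.Number values to float64 and drops any point that contains a nil field. A standalone Go sketch of the same technique (the column names and values here are illustrative, not from the commit):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    func main() {
        cols := []string{"time", "value", "mode"}
        rows := [][]interface{}{
            {json.Number("1445990400"), json.Number("3.14"), "idle"},
            {json.Number("1445990401"), nil, "user"}, // nil field: whole point is skipped
        }
        var points []map[string]interface{}
        for _, row := range rows {
            var skip bool
            fields := make(map[string]interface{})
            for i, c := range cols {
                if c == "time" {
                    continue // the real code parses this column into a time.Time
                }
                value := row[i]
                // Values arrive as json.Number when the decoder uses UseNumber();
                // convert to float64 so downstream math can use them.
                if n, ok := value.(json.Number); ok {
                    if f, err := n.Float64(); err == nil {
                        value = f
                    }
                }
                if value == nil {
                    skip = true
                    break
                }
                fields[c] = value
            }
            if !skip {
                points = append(points, fields)
            }
        }
        fmt.Println(points) // [map[mode:idle value:3.14]]
    }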
140 changes: 120 additions & 20 deletions cmd/kapacitor/main.go
@@ -1,10 +1,12 @@
package main

import (
"bytes"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"log"
"net/http"
"net/url"
@@ -13,6 +15,7 @@ import (
"strings"
"time"

"github.com/dustin/go-humanize"
"github.com/influxdb/kapacitor"
)

@@ -187,9 +190,10 @@ func doHelp(args []string) error {
// Record
var (
recordFlags = flag.NewFlagSet("record", flag.ExitOnError)
- rname = recordFlags.String("name", "", "the name of a task. If recording a batch")
- rstop = recordFlags.String("stop", "", "the stop time of a query if recording a batch. Defaults to now.")
- rnum = recordFlags.Int("num", 1, "the number of periods to query. If recording a batch")
+ rname = recordFlags.String("name", "", "the name of a task. If recording a batch or stream")
+
+ rstop = recordFlags.String("stop", "", "the stop time of a query if recording a batch. Defaults to now.")
+ rnum = recordFlags.Int("num", 1, "the number of periods to query. If recording a batch")

rquery = recordFlags.String("query", "", "the query to record. If recording a query.")
rtype = recordFlags.String("type", "", "the type of the recording to save (stream|batch). If recording a query.")
@@ -210,9 +214,10 @@ func recordUsage() {
Examples:
$ kapacitor record stream -duration 1m
$ kapacitor record stream -name mem_free -duration 1m
- This records the live data stream for 1 minute.
+ This records the live data stream for 1 minute using the databases and retention policies
+ from the named task.
$ kapacitor record batch -name cpu_idle -stop 2015-09-01T00:00:00Z -num 10
@@ -221,7 +226,7 @@
$ kapacitor record query -query "select value from cpu_idle where time > now() - 1h and time < now()" -type stream
- This records the result of the query and stores it as a stream recording. Use -type batcher to store as batch recording.
+ This records the result of the query and stores it as a stream recording. Use -type batch to store as batch recording.
Options:
`
@@ -235,6 +240,7 @@ func doRecord(args []string) error {
v.Add("type", args[0])
switch args[0] {
case "stream":
v.Add("name", *rname)
v.Add("duration", rdur.String())
case "batch":
v.Add("name", *rname)
@@ -272,36 +278,129 @@ var (
dname = defineFlags.String("name", "", "the task name")
dtick = defineFlags.String("tick", "", "path to the TICKscript")
dtype = defineFlags.String("type", "", "the task type (stream|batch)")
+ ddbrp = make(dbrps, 0)
)

+ func init() {
+ defineFlags.Var(&ddbrp, "dbrp", `a database and retention policy pair of the form "db"."rp" the quotes are optional. The flag can be specified multiple times.`)
+ }

+ type dbrps []kapacitor.DBRP
+
+ func (d *dbrps) String() string {
+ return fmt.Sprint(*d)
+ }

+ // Parse a string of the form "db"."rp" where the quotes are optional; the
+ // quoted names may contain escaped quotes.
+ func (d *dbrps) Set(value string) error {
+ dbrp := kapacitor.DBRP{}
+ if len(value) == 0 {
+ return fmt.Errorf("dbrp cannot be empty")
+ }
+ var n int
+ if value[0] == '"' {
+ dbrp.Database, n = parseQuotedStr(value)
+ } else {
+ n = strings.IndexRune(value, '.')
+ dbrp.Database = value[:n]
+ }
+ if value[n] != '.' {
+ return fmt.Errorf("dbrp must specify retention policy, do you have a missing or extra '.'?")
+ }
+ value = value[n+1:]
+ if value[0] == '"' {
+ dbrp.RetentionPolicy, n = parseQuotedStr(value)
+ } else {
+ dbrp.RetentionPolicy = value
+ }
+ *d = append(*d, dbrp)
+ return nil
+ }

+ // Read from txt starting at the beginning quote until the next unescaped quote.
+ func parseQuotedStr(txt string) (string, int) {
+ literal := txt[1 : len(txt)-1]
+ quote := txt[0]
+ // Unescape quotes
+ var buf bytes.Buffer
+ buf.Grow(len(literal))
+ last := 0
+ i := 0
+ for ; i < len(literal)-1; i++ {
+ if literal[i] == '\\' && literal[i+1] == quote {
+ buf.Write([]byte(literal[last:i]))
+ buf.Write([]byte{quote})
+ i += 2
+ last = i
+ } else if literal[i] == quote {
+ break
+ }
+ }
+ buf.Write([]byte(literal[last:i]))
+ literal = buf.String()
+ return literal, i + 1
+ }
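A few illustrative invocations of the new -dbrp flag and their intended parses per the code above (the task and database names are hypothetical; the shell single quotes protect the double quotes):

    kapacitor define -name my_task -dbrp mydb.myrp
        → DBRP{Database: "mydb", RetentionPolicy: "myrp"}
    kapacitor define -name my_task -dbrp '"my db"."my rp"'
        → DBRP{Database: "my db", RetentionPolicy: "my rp"}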

func defineUsage() {
var u = `Usage: kapacitor define [options]
Create or update a task.
A task is defined via a TICKscript that defines the data processing pipeline of the task.
If an option is absent it will be left unmodified.
For example:
You can define a task for the first time with all the flags.
$ kapacitor define -name my_task -tick path/to/TICKscript -type stream -dbrp mydb.myrp
Later you can change a single property of the task by referencing its name
and only providing the single option you wish to modify.
$ kapacitor define -name my_task -tick path/to/TICKscript
or
$ kapacitor define -name my_task -dbrp mydb.myrp -dbrp otherdb.default
NOTE: you must specify all 'dbrp' flags you desire if you wish to modify them.
Options:
`
fmt.Fprintln(os.Stderr, u)
defineFlags.PrintDefaults()
}

func doDefine(args []string) error {

if *dtick == "" || *dname == "" || *dtype == "" {
fmt.Fprintln(os.Stderr, "Must pass name,tick and type options.")
if *dname == "" {
fmt.Fprintln(os.Stderr, "Must always pass name flag.")
defineFlags.Usage()
os.Exit(2)
}

- f, err := os.Open(*dtick)
- if err != nil {
- return err
+ var f io.Reader
+ if *dtick != "" {
+ var err error
+ f, err = os.Open(*dtick)
+ if err != nil {
+ return err
+ }
+ }
v := url.Values{}
v.Add("name", *dname)
v.Add("type", *dtype)
+ if len(ddbrp) > 0 {
+ b, err := json.Marshal(ddbrp)
+ if err != nil {
+ return err
+ }
+ v.Add("dbrps", string(b))
+ }
r, err := http.Post(*kapacitordURL+"/task?"+v.Encode(), "application/octetstream", f)
if err != nil {
return err
@@ -326,7 +425,7 @@ var (
replayFlags = flag.NewFlagSet("replay", flag.ExitOnError)
rtname = replayFlags.String("name", "", "the task name")
rid = replayFlags.String("id", "", "the recording ID")
rfast = replayFlags.Bool("fast", false, "whether to replay the data as fast as possible. If false, replay the data in real time")
rfast = replayFlags.Bool("fast", false, "If set, replay the data as fast as possible. If not set, replay the data in real time.")
)

func replayUsage() {
@@ -459,12 +558,12 @@ func reloadUsage() {
}

func doReload(args []string) error {
- err := doEnable(args)
+ err := doDisable(args)
if err != nil {
return err
}

- return doDisable(args)
+ return doEnable(args)
}

// List
@@ -504,6 +603,7 @@ func doList(args []string) error {
Tasks []struct {
Name string
Type kapacitor.TaskType
+ DBRPs []kapacitor.DBRP
Enabled bool
} `json:"Tasks"`
}
@@ -514,10 +614,10 @@
return errors.New(rp.Error)
}

outFmt := "%-30s%-10v%-10v\n"
fmt.Fprintf(os.Stdout, outFmt, "Name", "Type", "Enabled")
outFmt := "%-30s%-10v%-10v%s\n"
fmt.Fprintf(os.Stdout, outFmt, "Name", "Type", "Enabled", "Databases and Retention Policies")
for _, t := range rp.Tasks {
fmt.Fprintf(os.Stdout, outFmt, t.Name, t.Type, t.Enabled)
fmt.Fprintf(os.Stdout, outFmt, t.Name, t.Type, t.Enabled, t.DBRPs)
}
case "recordings":

@@ -545,10 +645,10 @@ func doList(args []string) error {
return errors.New(rp.Error)
}

outFmt := "%-40s%-10v%15.2f\n"
fmt.Fprintf(os.Stdout, "%-40s%-10s%15s\n", "ID", "Type", "Size (MB)")
outFmt := "%-40s%-10v%15s\n"
fmt.Fprintf(os.Stdout, "%-40s%-10s%15s\n", "ID", "Type", "Size")
for _, r := range rp.Recordings {
fmt.Fprintf(os.Stdout, outFmt, r.ID, r.Type, float64(r.Size)/1024.0/1024.0)
fmt.Fprintf(os.Stdout, outFmt, r.ID, r.Type, humanize.Bytes(uint64(r.Size)))
}
default:
return fmt.Errorf("cannot list '%s' did you mean 'tasks' or 'recordings'?", kind)
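For context, a minimal sketch of what the new humanized size column prints, using only go-humanize's Bytes helper, which the diff imports above:

    package main

    import (
        "fmt"

        "github.com/dustin/go-humanize"
    )

    func main() {
        // Bytes renders a byte count in human-friendly SI units,
        // replacing the old hand-rolled float64(size)/1024/1024 column.
        fmt.Println(humanize.Bytes(82854982)) // prints "83 MB"
    }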
10 changes: 5 additions & 5 deletions expr.go
@@ -20,16 +20,16 @@ func EvalPredicate(se *tick.StatefulExpr, fields models.Fields, tags map[string]
return b, nil
}

- func mergeFieldsAndTags(fields models.Fields, tags map[string]string) (tick.Vars, error) {
- vars := make(tick.Vars)
+ func mergeFieldsAndTags(fields models.Fields, tags map[string]string) (*tick.Scope, error) {
+ scope := tick.NewScope()
for k, v := range fields {
if _, ok := tags[k]; ok {
return nil, fmt.Errorf("cannot have field and tags with same name %q", k)
}
- vars[k] = v
+ scope.Set(k, v)
}
for k, v := range tags {
- vars[k] = v
+ scope.Set(k, v)
}
- return vars, nil
+ return scope, nil
}
1 change: 1 addition & 0 deletions http_out.go
@@ -118,6 +118,7 @@ func (h *HTTPOutNode) updateResultWithBatch(b models.Batch) {
h.result.Series[idx] = row
}
}

func (h *HTTPOutNode) stopOut() {
h.et.tm.HTTPDService.DelRoutes(h.routes)
}
2 changes: 1 addition & 1 deletion integrations/batcher_test.go
@@ -105,7 +105,7 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex
}

// Create task
- task, err := kapacitor.NewBatcher(name, script)
+ task, err := kapacitor.NewBatcher(name, script, dbrps)
if err != nil {
t.Fatal(err)
}
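For context, a hypothetical dbrps fixture matching the new NewBatcher signature (the values are illustrative; the field names come from the kapacitor.DBRP usage earlier in this diff):

    var dbrps = []kapacitor.DBRP{
        {
            Database:        "mydb",
            RetentionPolicy: "myrp",
        },
    }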
(The remaining 15 changed files are not shown.)
