diff --git a/CHANGELOG.md b/CHANGELOG.md index a0251a5e0..5008b9750 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,18 @@ ### Release Notes +Bugfix #106 made a breaking change to the internal HTTP API. This was to facilitate integration testing and overall better design. +Now POSTing a recording request will start the recording and immediately return. If you want to wait until it is complete, do +a GET for the recording info and it will block until it's complete. The kapacitor cli has been updated accordingly. + ### Features - [#96](https://github.com/influxdb/kapacitor/issues/96): Use KAPACITOR_URL env var for setting the kapacitord url in the client. - [#109](https://github.com/influxdb/kapacitor/pull/109): Add throughput counts to DOT format in `kapacitor show` command, if task is executing. ### Bugfixes - [#102](https://github.com/influxdb/kapacitor/issues/102): Fix race when start/stoping timeTicker in batch.go +- [#106](https://github.com/influxdb/kapacitor/pull/106): Fix hang when replaying stream recording. + ## v0.2.2 [2015-12-16] diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go index 7d27f80fe..9fdb32e92 100644 --- a/cmd/kapacitor/main.go +++ b/cmd/kapacitor/main.go @@ -304,7 +304,23 @@ func doRecord(args []string) error { if rp.Error != "" { return errors.New(rp.Error) } - fmt.Println(rp.RecordingID) + + v = url.Values{} + v.Add("id", rp.RecordingID) + r, err = http.Get(kapacitorEndpoint + "/record?" 
+ v.Encode()) + if err != nil { + return err + } + defer r.Body.Close() + + d = json.NewDecoder(r.Body) + ri := replay.RecordingInfo{} + d.Decode(&ri) + if ri.Error != "" { + return errors.New(ri.Error) + } + + fmt.Println(ri.ID) return nil } diff --git a/cmd/kapacitord/run/server_helper_test.go b/cmd/kapacitord/run/server_helper_test.go index c3518eb8b..1c202b3c1 100644 --- a/cmd/kapacitord/run/server_helper_test.go +++ b/cmd/kapacitord/run/server_helper_test.go @@ -220,10 +220,11 @@ func (s *Server) GetTask(name string) (ti task_store.TaskInfo, err error) { if err != nil { return } + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { err = fmt.Errorf("unexpected status code got %d exp %d", resp.StatusCode, http.StatusOK) + return } - defer resp.Body.Close() d := json.NewDecoder(resp.Body) d.Decode(&ti) return @@ -247,6 +248,84 @@ func (s *Server) MustWrite(db, rp, body string, params url.Values) string { return results } +func (s *Server) DoStreamRecording(name string, duration time.Duration, started chan struct{}) (id string, err error) { + v := url.Values{} + v.Add("type", "stream") + v.Add("name", name) + v.Add("duration", duration.String()) + r, err := http.Post(s.URL()+"/api/v1/record?"+v.Encode(), "", nil) + if err != nil { + return + } + defer r.Body.Close() + if r.StatusCode != http.StatusOK { + err = fmt.Errorf("unexpected status code got %d exp %d", r.StatusCode, http.StatusOK) + return + } + + // Decode valid response + type resp struct { + RecordingID string `json:"RecordingID"` + Error string `json:"Error"` + } + rp := resp{} + d := json.NewDecoder(r.Body) + d.Decode(&rp) + if rp.Error != "" { + err = errors.New(rp.Error) + return + } + id = rp.RecordingID + close(started) + v = url.Values{} + v.Add("id", id) + _, err = s.HTTPGet(s.URL() + "/api/v1/record?" 
+ v.Encode()) + return +} + +func (s *Server) DoBatchRecording(name string, past time.Duration) (id string, err error) { + v := url.Values{} + v.Add("type", "batch") + v.Add("name", name) + v.Add("past", past.String()) + r, err := http.Post(s.URL()+"/api/v1/record?"+v.Encode(), "", nil) + if err != nil { + return + } + defer r.Body.Close() + if r.StatusCode != http.StatusOK { + err = fmt.Errorf("unexpected status code got %d exp %d", r.StatusCode, http.StatusOK) + return + } + + // Decode valid response + type resp struct { + RecordingID string `json:"RecordingID"` + Error string `json:"Error"` + } + rp := resp{} + d := json.NewDecoder(r.Body) + d.Decode(&rp) + if rp.Error != "" { + err = errors.New(rp.Error) + return + } + id = rp.RecordingID + v = url.Values{} + v.Add("id", id) + _, err = s.HTTPGet(s.URL() + "/api/v1/record?" + v.Encode()) + return +} + +func (s *Server) DoReplay(name, id string) (string, error) { + v := url.Values{} + v.Add("name", name) + v.Add("id", id) + v.Add("clock", "fast") + v.Add("rec-time", "true") + return s.HTTPPost(s.URL()+"/api/v1/replay?"+v.Encode(), nil) +} + // NewConfig returns the default config with temporary paths. 
func NewConfig() *run.Config { c := run.NewConfig() diff --git a/cmd/kapacitord/run/server_test.go b/cmd/kapacitord/run/server_test.go index 1634d8853..bf18f6229 100644 --- a/cmd/kapacitord/run/server_test.go +++ b/cmd/kapacitord/run/server_test.go @@ -1,14 +1,20 @@ package run_test import ( + "bufio" + "encoding/json" "fmt" + "io/ioutil" "net/http" "net/url" + "os" + "path" "reflect" "testing" "time" "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/models" "github.com/influxdb/kapacitor" ) @@ -418,3 +424,271 @@ batch t.Error("unexpected query count", count) } } + +func TestServer_RecordReplayStream(t *testing.T) { + s := OpenDefaultServer() + defer s.Close() + + name := "testStreamTask" + ttype := "stream" + dbrps := []kapacitor.DBRP{{ + Database: "mydb", + RetentionPolicy: "myrp", + }} + + tmpDir, err := ioutil.TempDir("", "testStreamTaskRecording") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + tick := ` +stream + .from().measurement('test') + .window() + .period(10s) + .every(10s) + .mapReduce(influxql.count('value')) + .alert() + .id('test-count') + .message('{{ .ID }} got: {{ index .Fields "count" }}') + .crit(lambda: TRUE) + .log('` + tmpDir + `/alert.log') +` + + r, err := s.DefineTask(name, ttype, tick, dbrps) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + points := `test value=1 0000000000 +test value=1 0000000001 +test value=1 0000000001 +test value=1 0000000002 +test value=1 0000000002 +test value=1 0000000003 +test value=1 0000000003 +test value=1 0000000004 +test value=1 0000000005 +test value=1 0000000005 +test value=1 0000000005 +test value=1 0000000006 +test value=1 0000000007 +test value=1 0000000008 +test value=1 0000000009 +test value=1 0000000010 +test value=1 0000000011 +test value=1 0000000012 +` + rid := make(chan string, 1) + started := make(chan struct{}) + go func() { + id, err := s.DoStreamRecording(name, 
10*time.Second, started) + if err != nil { + t.Fatal(err) + } + rid <- id + }() + <-started + v := url.Values{} + v.Add("precision", "s") + s.MustWrite("mydb", "myrp", points, v) + id := <-rid + + _, err = s.DoReplay(name, id) + if err != nil { + t.Fatal(err) + } + + f, err := os.Open(path.Join(tmpDir, "alert.log")) + if err != nil { + t.Fatal(err) + } + defer f.Close() + type response struct { + ID string `json:"id"` + Message string `json:"message"` + Time time.Time `json:"time"` + Level string `json:"level"` + Data influxql.Result `json:"data"` + } + exp := response{ + ID: "test-count", + Message: "test-count got: 15", + Time: time.Date(1970, 1, 1, 0, 0, 10, 0, time.UTC), + Level: "CRITICAL", + Data: influxql.Result{ + Series: models.Rows{ + { + Name: "test", + Columns: []string{"time", "count"}, + Values: [][]interface{}{ + { + time.Date(1970, 1, 1, 0, 0, 10, 0, time.UTC).Format(time.RFC3339Nano), + 15.0, + }, + }, + }, + }, + }, + } + got := response{} + d := json.NewDecoder(f) + d.Decode(&got) + if !reflect.DeepEqual(exp, got) { + t.Errorf("unexpected alert log:\ngot %v\nexp %v", got, exp) + } +} + +func TestServer_RecordReplayBatch(t *testing.T) { + c := NewConfig() + c.InfluxDB.Enabled = true + value := 0 + db := NewInfluxDB(func(q string) *client.Response { + if len(q) > 6 && q[:6] == "SELECT" { + r := &client.Response{ + Results: []client.Result{{ + Series: []models.Row{{ + Name: "cpu", + Columns: []string{"time", "value"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, value, 0, time.UTC).Format(time.RFC3339Nano), + float64(value), + }, + { + time.Date(1971, 1, 1, 0, 0, value+1, 0, time.UTC).Format(time.RFC3339Nano), + float64(value + 1), + }, + }, + }}, + }}, + } + value += 2 + return r + } + return nil + }) + c.InfluxDB.URLs = []string{db.URL()} + s := OpenServer(c) + defer s.Close() + + name := "testBatchTask" + ttype := "batch" + dbrps := []kapacitor.DBRP{{ + Database: "mydb", + RetentionPolicy: "myrp", + }} + + tmpDir, err := 
ioutil.TempDir("", "testBatchTaskRecording") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + tick := ` +batch + .query('SELECT value from mydb.myrp.cpu') + .period(2s) + .every(2s) + .alert() + .id('test-batch') + .message('{{ .ID }} got: {{ index .Fields "value" }}') + .crit(lambda: "value" > 2.0) + .log('` + tmpDir + `/alert.log') +` + + r, err := s.DefineTask(name, ttype, tick, dbrps) + if err != nil { + t.Fatal(err) + } + if r != "" { + t.Fatal("unexpected result", r) + } + + id, err := s.DoBatchRecording(name, time.Second*8) + if err != nil { + t.Fatal(err) + } + + _, err = s.DoReplay(name, id) + if err != nil { + t.Fatal(err) + } + + f, err := os.Open(path.Join(tmpDir, "alert.log")) + if err != nil { + t.Fatal(err) + } + defer f.Close() + type response struct { + ID string `json:"id"` + Message string `json:"message"` + Time time.Time `json:"time"` + Level string `json:"level"` + Data influxql.Result `json:"data"` + } + exp := []response{ + { + ID: "test-batch", + Message: "test-batch got: 3", + Time: time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + Level: "CRITICAL", + Data: influxql.Result{ + Series: models.Rows{ + { + Name: "cpu", + Columns: []string{"time", "value"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC).Format(time.RFC3339Nano), + 2.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC).Format(time.RFC3339Nano), + 3.0, + }, + }, + }, + }, + }, + }, + { + ID: "test-batch", + Message: "test-batch got: 4", + Time: time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + Level: "CRITICAL", + Data: influxql.Result{ + Series: models.Rows{ + { + Name: "cpu", + Columns: []string{"time", "value"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC).Format(time.RFC3339Nano), + 4.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC).Format(time.RFC3339Nano), + 5.0, + }, + }, + }, + }, + }, + }, + } + scanner := bufio.NewScanner(f) + got := make([]response, 0) + g := response{} + for 
scanner.Scan() { + json.Unmarshal(scanner.Bytes(), &g) + got = append(got, g) + } + if !reflect.DeepEqual(exp, got) { + t.Errorf("unexpected alert log:\ngot %v\nexp %v", got, exp) + t.Errorf("unexpected alert log:\ngot %v\nexp %v", got[0].Data.Series[0], exp[0].Data.Series[0]) + t.Errorf("unexpected alert log:\ngot %v\nexp %v", got[1].Data.Series[0], exp[1].Data.Series[0]) + } +} diff --git a/services/replay/service.go b/services/replay/service.go index c67fb8183..6184f6136 100644 --- a/services/replay/service.go +++ b/services/replay/service.go @@ -13,6 +13,7 @@ import ( "path" "strconv" "strings" + "sync" "time" "github.com/influxdb/influxdb/client" @@ -50,14 +51,18 @@ type Service struct { Stream(name string) (kapacitor.StreamCollector, error) } + recordingsMu sync.RWMutex + runningRecordings map[string]<-chan error + logger *log.Logger } // Create a new replay master. func NewService(conf Config, l *log.Logger) *Service { return &Service{ - saveDir: conf.Dir, - logger: l, + saveDir: conf.Dir, + logger: l, + runningRecordings: make(map[string]<-chan error), } } @@ -87,6 +92,14 @@ func (r *Service) Open() error { true, r.handleRecord, }, + { + "record", + "GET", + "/record", + true, + true, + r.handleGetRecording, + }, { "replay", "POST", @@ -199,17 +212,20 @@ func (r *Service) handleReplay(w http.ResponseWriter, req *http.Request) { replayC = replay.ReplayBatch(fs, batches, recTime) } - // Check first for error on task - err = et.Err() + // Check for error on replay + err = <-replayC if err != nil { - httpd.HttpError(w, "task run: "+err.Error(), true, http.StatusInternalServerError) + httpd.HttpError(w, "replay: "+err.Error(), true, http.StatusInternalServerError) return } - // Check for error on replay - err = <-replayC + // Drain tm so the task can finish + tm.Drain() + + // Check for error on task + err = et.Err() if err != nil { - httpd.HttpError(w, "replay: "+err.Error(), true, http.StatusInternalServerError) + httpd.HttpError(w, "task run: "+err.Error(), 
true, http.StatusInternalServerError) return } @@ -222,6 +238,8 @@ func (r *Service) handleReplay(w http.ResponseWriter, req *http.Request) { } func (r *Service) handleRecord(w http.ResponseWriter, req *http.Request) { + type doFunc func() error + var doF doFunc rid := uuid.NewV4() typ := req.URL.Query().Get("type") @@ -245,10 +263,8 @@ func (r *Service) handleRecord(w http.ResponseWriter, req *http.Request) { return } - err = r.doRecordStream(rid, dur, t.DBRPs) - if err != nil { - httpd.HttpError(w, err.Error(), true, http.StatusInternalServerError) - return + doF = func() error { + return r.doRecordStream(rid, dur, t.DBRPs) } case "batch": @@ -263,6 +279,8 @@ func (r *Service) handleRecord(w http.ResponseWriter, req *http.Request) { return } + now := time.Now() + switch { case startStr != "": start, err = time.Parse(time.RFC3339, startStr) @@ -276,11 +294,11 @@ func (r *Service) handleRecord(w http.ResponseWriter, req *http.Request) { httpd.HttpError(w, err.Error(), true, http.StatusBadRequest) return } - start = time.Now().Add(-1 * diff) + start = now.Add(-1 * diff) } // Get stop time, if present - var stop time.Time + stop := now stopStr := req.URL.Query().Get("stop") if stopStr != "" { stop, err = time.Parse(time.RFC3339, stopStr) @@ -303,11 +321,8 @@ func (r *Service) handleRecord(w http.ResponseWriter, req *http.Request) { return } - // Record batch data - err = r.doRecordBatch(rid, t, start, stop) - if err != nil { - httpd.HttpError(w, err.Error(), true, http.StatusInternalServerError) - return + doF = func() error { + return r.doRecordBatch(rid, t, start, stop) } case "query": query := req.URL.Query().Get("query") @@ -327,15 +342,35 @@ func (r *Service) handleRecord(w http.ResponseWriter, req *http.Request) { httpd.HttpError(w, fmt.Sprintf("invalid type %q", typeStr), true, http.StatusBadRequest) return } - err := r.doRecordQuery(rid, query, tt) - if err != nil { - httpd.HttpError(w, err.Error(), true, http.StatusInternalServerError) - return + doF = 
func() error { + return r.doRecordQuery(rid, query, tt) } default: httpd.HttpError(w, "invalid recording type", true, http.StatusBadRequest) return } + // Store recording in running recordings. + errC := make(chan error, 1) + func() { + r.recordingsMu.Lock() + defer r.recordingsMu.Unlock() + r.runningRecordings[rid.String()] = errC + }() + + // Spawn routine to perform actual recording. + go func() { + err := doF() + if err != nil { + // Always log an error since the user may not have requested the error. + r.logger.Printf("E! recording %s failed: %v", rid.String(), err) + } + errC <- err + // We have finished delete from running map + r.recordingsMu.Lock() + defer r.recordingsMu.Unlock() + delete(r.runningRecordings, rid.String()) + }() + // Respond with the recording ID type response struct { RecordingID string `json:"RecordingID"` @@ -343,11 +378,51 @@ func (r *Service) handleRecord(w http.ResponseWriter, req *http.Request) { w.Write(httpd.MarshalJSON(response{rid.String()}, true)) } +func (r *Service) handleGetRecording(w http.ResponseWriter, req *http.Request) { + rid := req.URL.Query().Get("id") + + // First check if its still running + var errC <-chan error + var running bool + func() { + r.recordingsMu.RLock() + defer r.recordingsMu.RUnlock() + errC, running = r.runningRecordings[rid] + }() + + if running { + // It is still running wait for it to finish + err := <-errC + if err != nil { + info := RecordingInfo{ + ID: rid, + Error: err.Error(), + } + w.Write(httpd.MarshalJSON(info, true)) + return + } + } + + // It already finished, return its info + info, err := r.GetRecordings([]string{rid}) + if err != nil { + httpd.HttpError(w, "error finding recording: "+err.Error(), true, http.StatusInternalServerError) + return + } + if len(info) != 1 { + httpd.HttpError(w, "recording not found", true, http.StatusNotFound) + return + } + + w.Write(httpd.MarshalJSON(info[0], true)) +} + type RecordingInfo struct { ID string - Type kapacitor.TaskType - Size int64 - 
Created time.Time + Type kapacitor.TaskType `json:",omitempty"` + Size int64 `json:",omitempty"` + Created time.Time `json:",omitempty"` + Error string `json:",omitempty"` } func (r *Service) GetRecordings(rids []string) ([]RecordingInfo, error) { @@ -523,14 +598,28 @@ func (r *Service) doRecordStream(rid uuid.UUID, dur time.Duration, dbrps []kapac } defer sw.Close() - done := false + done := make(chan struct{}) go func() { - for p, ok := e.NextPoint(); ok && !done; p, ok = e.NextPoint() { + start := time.Time{} + closed := false + for p, ok := e.NextPoint(); ok; p, ok = e.NextPoint() { + if closed { + continue + } + if start.IsZero() { + start = p.Time + } + if p.Time.Sub(start) > dur { + closed = true + close(done) + //continue to read any data already on the edge, but just drop it. + continue + } kapacitor.WritePointForRecording(sw, p, precision) } }() - time.Sleep(dur) - done = true + <-done + e.Abort() r.TaskMaster.DelFork(rid.String()) return nil } diff --git a/task_master.go b/task_master.go index 126028a92..99f411a87 100644 --- a/task_master.go +++ b/task_master.go @@ -274,8 +274,6 @@ func (tm *TaskMaster) forkPoint(p models.Point) { } func (tm *TaskMaster) WritePoints(pts *cluster.WritePointsRequest) error { - tm.mu.RLock() - defer tm.mu.RUnlock() if tm.closed { return ErrTaskMasterClosed }