Skip to content

Commit

Permalink
Merge pull request influxdata#106 from influxdb/nc-replay-hang
Browse files Browse the repository at this point in the history
Add tm.Drain to task complete in replay
  • Loading branch information
Nathaniel Cook committed Dec 21, 2015
2 parents 24aadcf + e944f0d commit b376ede
Show file tree
Hide file tree
Showing 6 changed files with 496 additions and 34 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@

### Release Notes

Bugfix #106 made a breaking change to the internal HTTP API. This was to facilitate integration testing and overall better design.
Now POSTing a recording request will start the recording and immediately return. If you want to wait till it is complete do
a GET for the recording info and it will block until its complete. The kapacitor cli has been updated accordingly.

### Features
- [#96](https://github.com/influxdb/kapacitor/issues/96): Use KAPACITOR_URL env var for setting the kapacitord url in the client.
- [#109](https://github.com/influxdb/kapacitor/pull/109): Add throughput counts to DOT format in `kapacitor show` command, if task is executing.

### Bugfixes
- [#102](https://github.com/influxdb/kapacitor/issues/102): Fix race when start/stoping timeTicker in batch.go
- [#106](https://github.com/influxdb/kapacitor/pull/106): Fix hang when replaying stream recording.


## v0.2.2 [2015-12-16]

Expand Down
18 changes: 17 additions & 1 deletion cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,23 @@ func doRecord(args []string) error {
if rp.Error != "" {
return errors.New(rp.Error)
}
fmt.Println(rp.RecordingID)

v = url.Values{}
v.Add("id", rp.RecordingID)
r, err = http.Get(kapacitorEndpoint + "/record?" + v.Encode())
if err != nil {
return err
}
defer r.Body.Close()

d = json.NewDecoder(r.Body)
ri := replay.RecordingInfo{}
d.Decode(&ri)
if ri.Error != "" {
return errors.New(ri.Error)
}

fmt.Println(ri.ID)
return nil
}

Expand Down
81 changes: 80 additions & 1 deletion cmd/kapacitord/run/server_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,11 @@ func (s *Server) GetTask(name string) (ti task_store.TaskInfo, err error) {
if err != nil {
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("unexpected status code got %d exp %d", resp.StatusCode, http.StatusOK)
return
}
defer resp.Body.Close()
d := json.NewDecoder(resp.Body)
d.Decode(&ti)
return
Expand All @@ -247,6 +248,84 @@ func (s *Server) MustWrite(db, rp, body string, params url.Values) string {
return results
}

func (s *Server) DoStreamRecording(name string, duration time.Duration, started chan struct{}) (id string, err error) {
v := url.Values{}
v.Add("type", "stream")
v.Add("name", name)
v.Add("duration", duration.String())
r, err := http.Post(s.URL()+"/api/v1/record?"+v.Encode(), "", nil)
if err != nil {
return
}
defer r.Body.Close()
if r.StatusCode != http.StatusOK {
err = fmt.Errorf("unexpected status code got %d exp %d", r.StatusCode, http.StatusOK)
return
}

// Decode valid response
type resp struct {
RecordingID string `json:"RecordingID"`
Error string `json:"Error"`
}
rp := resp{}
d := json.NewDecoder(r.Body)
d.Decode(&rp)
if rp.Error != "" {
err = errors.New(rp.Error)
return
}
id = rp.RecordingID
close(started)
v = url.Values{}
v.Add("id", id)
_, err = s.HTTPGet(s.URL() + "/api/v1/record?" + v.Encode())
return
}

func (s *Server) DoBatchRecording(name string, past time.Duration) (id string, err error) {
v := url.Values{}
v.Add("type", "batch")
v.Add("name", name)
v.Add("past", past.String())
r, err := http.Post(s.URL()+"/api/v1/record?"+v.Encode(), "", nil)
if err != nil {
return
}
defer r.Body.Close()
if r.StatusCode != http.StatusOK {
err = fmt.Errorf("unexpected status code got %d exp %d", r.StatusCode, http.StatusOK)
return
}

// Decode valid response
type resp struct {
RecordingID string `json:"RecordingID"`
Error string `json:"Error"`
}
rp := resp{}
d := json.NewDecoder(r.Body)
d.Decode(&rp)
if rp.Error != "" {
err = errors.New(rp.Error)
return
}
id = rp.RecordingID
v = url.Values{}
v.Add("id", id)
_, err = s.HTTPGet(s.URL() + "/api/v1/record?" + v.Encode())
return
}

func (s *Server) DoReplay(name, id string) (string, error) {
v := url.Values{}
v.Add("name", name)
v.Add("id", id)
v.Add("clock", "fast")
v.Add("rec-time", "true")
return s.HTTPPost(s.URL()+"/api/v1/replay?"+v.Encode(), nil)
}

// NewConfig returns the default config with temporary paths.
func NewConfig() *run.Config {
c := run.NewConfig()
Expand Down

0 comments on commit b376ede

Please sign in to comment.