Skip to content

Commit

Permalink
Report the task status for a query
Browse files Browse the repository at this point in the history
This more accurately shows whether or not a query has been killed.
Instead of automatically removing it from the query table when it's
killed even though goroutines and iterators may still be open, it now
marks the process as killed. This should allow us to more accurately
determine if a query has been stalled and is still using resources on
the server.

This is related to #8848, but not directly connected.
  • Loading branch information
jsternberg committed Sep 19, 2017
1 parent 2ec4f55 commit 107feeb
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- [#8791](https://github.com/influxdata/influxdb/pull/8791): Include the number of scanned cached values in the iterator cost.
- [#8784](https://github.com/influxdata/influxdb/pull/8784): Add support for the Prometheus remote read and write APIs.
- [#8851](https://github.com/influxdata/influxdb/pull/8851): Improve performance of `Include` and `Exclude` functions
- [#8854](https://github.com/influxdata/influxdb/pull/8854): Report the task status for a query.

### Bugfixes

Expand Down
3 changes: 2 additions & 1 deletion query/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions
}
return
}
defer e.TaskManager.KillQuery(qid)
defer e.TaskManager.DetachQuery(qid)

// Setup the execution context that will be used when executing statements.
ctx := ExecutionContext{
Expand Down Expand Up @@ -441,6 +441,7 @@ type QueryMonitorFunc func(<-chan struct{}) error
type QueryTask struct {
query string
database string
status TaskStatus
startTime time.Time
closing chan struct{}
monitorCh chan error
Expand Down
68 changes: 67 additions & 1 deletion query/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,70 @@ func TestQueryExecutor_KillQuery(t *testing.T) {
}
}

func TestQueryExecutor_KillQuery_Zombie(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}

qid := make(chan uint64)
done := make(chan struct{})

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx query.ExecutionContext) error {
switch stmt.(type) {
case *influxql.KillQueryStatement, *influxql.ShowQueriesStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx)
}

qid <- ctx.QueryID
select {
case <-ctx.InterruptCh:
select {
case <-done:
// Keep the query running until we run SHOW QUERIES.
case <-time.After(100 * time.Millisecond):
// Ensure that we don't have a lingering goroutine.
}
return query.ErrQueryInterrupted
case <-time.After(100 * time.Millisecond):
t.Error("killing the query did not close the channel after 100 milliseconds")
return errUnexpected
}
},
}

results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
q, err = influxql.ParseQuery(fmt.Sprintf("KILL QUERY %d", <-qid))
if err != nil {
t.Fatal(err)
}
discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))

// Display the queries and ensure that the original is still in there.
q, err = influxql.ParseQuery("SHOW QUERIES")
if err != nil {
t.Fatal(err)
}
tasks := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)

// The killed query should still be there.
task := <-tasks
if len(task.Series) != 1 {
t.Errorf("expected %d series, got %d", 1, len(task.Series))
} else if len(task.Series[0].Values) != 2 {
t.Errorf("expected %d rows, got %d", 2, len(task.Series[0].Values))
}
close(done)

// The original query should return.
result := <-results
if result.Err != query.ErrQueryInterrupted {
t.Errorf("unexpected error: %s", result.Err)
}
}

func TestQueryExecutor_Interrupt(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
Expand Down Expand Up @@ -165,7 +229,9 @@ func TestQueryExecutor_ShowQueries(t *testing.T) {
results := e.ExecuteQuery(q, query.ExecutionOptions{}, nil)
result := <-results
if len(result.Series) != 1 {
t.Errorf("expected %d rows, got %d", 1, len(result.Series))
t.Errorf("expected %d series, got %d", 1, len(result.Series))
} else if len(result.Series[0].Values) != 1 {
t.Errorf("expected %d row, got %d", 1, len(result.Series[0].Values))
}
if result.Err != nil {
t.Errorf("unexpected error: %s", result.Err)
Expand Down
49 changes: 45 additions & 4 deletions query/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,27 @@ const (
DefaultQueryTimeout = time.Duration(0)
)

type TaskStatus int

const (
// RunningTask is set when the task is running.
RunningTask TaskStatus = iota

// KilledTask is set when the task is killed, but resources are still
// being used.
KilledTask
)

func (t TaskStatus) String() string {
switch t {
case RunningTask:
return "running"
case KilledTask:
return "killed"
}
panic(fmt.Sprintf("unknown task status: %d", int(t)))
}

// TaskManager takes care of all aspects related to managing running queries.
type TaskManager struct {
// Query execution timeout.
Expand Down Expand Up @@ -104,11 +125,11 @@ func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStateme
d = d - (d % time.Microsecond)
}

values = append(values, []interface{}{id, qi.query, qi.database, d.String()})
values = append(values, []interface{}{id, qi.query, qi.database, d.String(), qi.status.String()})
}

return []*models.Row{{
Columns: []string{"qid", "query", "database", "duration"},
Columns: []string{"qid", "query", "database", "duration", "status"},
Values: values,
}}, nil
}
Expand Down Expand Up @@ -145,6 +166,7 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt
query := &QueryTask{
query: q.String(),
database: database,
status: RunningTask,
startTime: time.Now(),
closing: make(chan struct{}),
monitorCh: make(chan error),
Expand All @@ -170,8 +192,9 @@ func (t *TaskManager) AttachQuery(q *influxql.Query, database string, interrupt
return qid, query, nil
}

// KillQuery stops and removes a query from the TaskManager.
// This method can be used to forcefully terminate a running query.
// KillQuery enters a query into the killed state and closes the channel
// from the TaskManager. This method can be used to forcefully terminate a
// running query.
func (t *TaskManager) KillQuery(qid uint64) error {
t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -182,6 +205,24 @@ func (t *TaskManager) KillQuery(qid uint64) error {
}

close(query.closing)
query.status = KilledTask
return nil
}

// DetachQuery removes a query from the query table. If the query is not in the
// killed state, this will also close the related channel.
func (t *TaskManager) DetachQuery(qid uint64) error {
t.mu.Lock()
defer t.mu.Unlock()

query := t.queries[qid]
if query == nil {
return fmt.Errorf("no such query id: %d", qid)
}

if query.status != KilledTask {
close(query.closing)
}
delete(t.queries, qid)
return nil
}
Expand Down

0 comments on commit 107feeb

Please sign in to comment.