feat(scan): quick cancel query with proactive manager (#506)
Signed-off-by: Jack Liu <1758975670@qq.com>
MrSanZhi committed Feb 29, 2024
1 parent 12964c4 commit eb39829
Showing 15 changed files with 157 additions and 19 deletions.
2 changes: 1 addition & 1 deletion app/operation.go
@@ -117,7 +117,7 @@ func (pm *ProactiveManager) KillQuery(id uint64) {
}
isExist = true
manager.Kill(id)
manager.Abort(id)
manager.Crash(id)
abortSuccess = true
}
query.VisitManagers(killQueryByIDFn)
2 changes: 2 additions & 0 deletions app/ts-store/transport/handler/handler_test.go
@@ -145,6 +145,8 @@ type mockQuery struct {

func (m *mockQuery) Abort() {}

func (m *mockQuery) Crash() {}

func (m *mockQuery) GetQueryExeInfo() *netstorage.QueryExeInfo {
return m.info
}
30 changes: 29 additions & 1 deletion app/ts-store/transport/handler/select.go
@@ -53,6 +53,7 @@ type Select struct {

aborted bool
abortHook func()
crashHook func()

trace *tracing.Trace
buildPlanSpan *tracing.Span
@@ -105,6 +106,28 @@ func (s *Select) isAborted() bool {
return s.aborted
}

func (s *Select) Crash() {
s.mu.Lock()
defer s.mu.Unlock()

s.aborted = true
if s.crashHook != nil {
s.crashHook()
s.crashHook = nil
}
}

func (s *Select) SetCrashHook(hook func()) bool {
s.mu.Lock()
defer s.mu.Unlock()

if s.aborted {
return false
}
s.crashHook = hook
return true
}

func (s *Select) Process() error {
if s.isAborted() {
s.logger().Info("[Select.Process] aborted")
@@ -233,9 +256,14 @@ func (s *Select) execute(ctx context.Context, p hybridqp.Executor) error {
// query aborted
return nil
}
if !s.SetCrashHook(pe.Crash) {
// query crashed
return nil
}
ctx = context.WithValue(ctx, query2.IndexScanDagStartTimeKey, time.Now())
err := pe.ExecuteExecutor(ctx)
if err == nil || pe.Aborted() {
// ignore the PipelineExecutor error caused by abort or kill query.
if err == nil || pe.Aborted() || s.isAborted() {
return nil
}
return err
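
Note on the change above: Select now exposes Crash alongside Abort. Crash marks the query as aborted and fires the registered crashHook at most once, while SetCrashHook refuses registration once the query is already aborted, which is why execute returns early when SetCrashHook(pe.Crash) reports false. A minimal standalone sketch of this register/fire-once hook pattern is shown below; the names are illustrative assumptions, not openGemini APIs.

package main

import (
	"fmt"
	"sync"
)

type task struct {
	mu      sync.Mutex
	aborted bool
	hook    func()
}

// crash marks the task aborted and fires the hook at most once.
func (t *task) crash() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.aborted = true
	if t.hook != nil {
		t.hook()
		t.hook = nil
	}
}

// setHook refuses registration once the task is already aborted, so the
// caller learns that cancellation raced ahead and can stop early.
func (t *task) setHook(h func()) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.aborted {
		return false
	}
	t.hook = h
	return true
}

func main() {
	t := &task{}
	if !t.setHook(func() { fmt.Println("stop executor") }) {
		return // cancelled before registration; nothing to run
	}
	t.crash() // prints "stop executor" exactly once
	t.crash() // hook already consumed; no effect
}
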
6 changes: 6 additions & 0 deletions app/ts-store/transport/handler/select_test.go
@@ -237,6 +237,12 @@ func TestCreateSerfInstance(t *testing.T) {
h.Abort()
require.NoError(t, h.Process())

h = NewSelect(store, resp, req)
h.SetCrashHook(func() {})
h.Crash()
h.SetCrashHook(func() {})
require.NoError(t, h.Process())

h = NewSelect(store, resp, req)
require.NotEmpty(t, h.execute(context.Background(), nil))
h.Abort()
12 changes: 12 additions & 0 deletions app/ts-store/transport/query/manager.go
@@ -29,6 +29,7 @@ const (

type IQuery interface {
Abort()
Crash()
GetQueryExeInfo() *netstorage.QueryExeInfo
}

@@ -141,6 +142,17 @@ func (qm *Manager) Abort(qid uint64) {
}
}

func (qm *Manager) Crash(qid uint64) {
qm.abortedMu.Lock()
qm.aborted[qid] = time.Now()
qm.abortedMu.Unlock()

h := qm.Get(qid)
if h != nil {
h.Crash()
}
}

func (qm *Manager) Finish(qid uint64) {
qm.mu.Lock()
defer qm.mu.Unlock()
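
Manager.Crash above follows the same record-then-forward shape as Manager.Abort: the qid is stored in the aborted map first, so a cancellation that arrives before the query handler registers can still be detected later, and only then is Crash forwarded to a live handler. A simplified sketch of that ordering, using assumed field names rather than the real Manager internals, follows.

package querymgr

import (
	"sync"
	"time"
)

type query interface{ Crash() }

type manager struct {
	mu        sync.RWMutex
	items     map[uint64]query
	abortedMu sync.Mutex
	aborted   map[uint64]time.Time
}

func (m *manager) get(qid uint64) query {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.items[qid]
}

// crash records the qid before looking up the handler, so a cancellation that
// races ahead of registration is not lost; the forward call is best-effort.
func (m *manager) crash(qid uint64) {
	m.abortedMu.Lock()
	m.aborted[qid] = time.Now()
	m.abortedMu.Unlock()

	if h := m.get(qid); h != nil {
		h.Crash()
	}
}
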
2 changes: 2 additions & 0 deletions app/ts-store/transport/query/manager_test.go
@@ -75,6 +75,8 @@ func (m *mockQuery) Abort() {

}

func (m *mockQuery) Crash() {}

func (m *mockQuery) GetQueryExeInfo() *netstorage.QueryExeInfo {
return m.info
}
23 changes: 13 additions & 10 deletions engine/column_store_reader.go
@@ -66,14 +66,14 @@ type ColumnStoreReader struct {
frags executor.ShardsFragments
plan hybridqp.QueryNode

limit int
ops []hybridqp.ExprOptions
rowBitmap []bool
dimVals []string
outInIdxMap map[int]int
inOutIdxMap map[int]int
closedCh chan struct{}
closedCount int64
limit int
ops []hybridqp.ExprOptions
rowBitmap []bool
dimVals []string
outInIdxMap map[int]int
inOutIdxMap map[int]int
closedCh chan struct{}
closedSignal *int32
}

func NewColumnStoreReader(plan hybridqp.QueryNode, frags executor.ShardsFragments) *ColumnStoreReader {
@@ -94,6 +94,8 @@ func NewColumnStoreReader(plan hybridqp.QueryNode, frags executor.ShardsFragment
if len(r.schema.GetSortFields()) == 0 && !r.schema.HasCall() {
r.limit = plan.Schema().Options().GetLimit() + plan.Schema().Options().GetOffset()
}
closedSignal := int32(0)
r.closedSignal = &closedSignal
return r
}

@@ -153,7 +155,7 @@ func (r *ColumnStoreReader) initSpan() {

func (r *ColumnStoreReader) Close() {
r.Once(func() {
atomic.AddInt64(&r.closedCount, 1)
atomic.AddInt32(r.closedSignal, 1)
r.output.Close()
})
}
@@ -226,6 +228,7 @@ func (r *ColumnStoreReader) initReadCursor() (err error) {
if err != nil {
return
}
readCtx.SetClosedSignal(r.closedSignal)
if !r.schema.Options().IsTimeSorted() {
tr = util.TimeRange{Min: influxql.MinTime, Max: influxql.MaxTime}
}
@@ -549,7 +552,7 @@ func (r *ColumnStoreReader) sendChunk(chunk executor.Chunk) {
r.closedCh <- struct{}{}
}
}()
if atomic.LoadInt64(&r.closedCount) == 0 {
if atomic.LoadInt32(r.closedSignal) == 0 {
statistics.ExecutorStat.SourceRows.Push(int64(chunk.NumberOfRows()))
r.output.State <- chunk
} else {
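
The reader above switches from a private closedCount counter to a *int32 closedSignal so the same flag can be handed to the read cursor via SetClosedSignal; cursor code deep in the storage engine then observes Close() without any callback plumbing. A self-contained sketch of that pointer-sharing pattern, with assumed type and method names, is shown here.

package main

import (
	"fmt"
	"sync/atomic"
)

type readContext struct {
	closedSignal *int32
}

func (c *readContext) setClosedSignal(s *int32) { c.closedSignal = s }

// isAborted reports whether the owning reader has been closed.
func (c *readContext) isAborted() bool {
	return c.closedSignal != nil && atomic.LoadInt32(c.closedSignal) > 0
}

type reader struct {
	closedSignal *int32
	ctx          *readContext
}

func newReader() *reader {
	s := int32(0)
	r := &reader{closedSignal: &s, ctx: &readContext{}}
	r.ctx.setClosedSignal(r.closedSignal) // both sides observe the same flag
	return r
}

func (r *reader) close() { atomic.AddInt32(r.closedSignal, 1) }

func main() {
	r := newReader()
	fmt.Println(r.ctx.isAborted()) // false
	r.close()
	fmt.Println(r.ctx.isAborted()) // true
}
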
2 changes: 1 addition & 1 deletion engine/column_store_reader_test.go
@@ -344,7 +344,7 @@ func TestColumnStoreReaderFunctions(t *testing.T) {
assert2.Equal(t, reader.GetOutputNumber(nil), 0)
assert2.Equal(t, reader.GetInputNumber(nil), 0)
reader.sendChunk(nil)
assert2.Equal(t, reader.closedCount, int64(1))
assert2.Equal(t, *(reader.closedSignal), int32(1))
}

type MockStoreEngine struct {
16 changes: 14 additions & 2 deletions engine/executor/hash_agg_transform.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"runtime/debug"
"sync/atomic"

"github.com/openGemini/openGemini/engine/hybridqp"
"github.com/openGemini/openGemini/lib/errno"
@@ -263,6 +264,7 @@ type HashAggTransform struct {
timeFuncState TimeFuncState
firstOrLastFuncLoc int
haveTopBottomOp bool
closedSignal int32
}

type TimeFuncState uint32
@@ -409,8 +411,11 @@ func (trans *HashAggTransform) Explain() []ValuePair {
}

func (trans *HashAggTransform) Close() {
trans.output.Close()
trans.outputChunkPool.Release()
trans.Once(func() {
atomic.AddInt32(&trans.closedSignal, 1)
trans.output.Close()
trans.outputChunkPool.Release()
})
}

func (trans *HashAggTransform) receiveChunk(c Chunk) {
@@ -439,6 +444,7 @@ func (trans *HashAggTransform) runnable(ctx context.Context, errs *errno.Errs, i
trans.receiveChunk(c)
tracing.EndPP(trans.span)
case <-ctx.Done():
atomic.AddInt32(&trans.closedSignal, 1)
trans.receiveChunk(nil)
return
}
@@ -513,6 +519,12 @@ func (trans *HashAggTransform) getChunkFromChild() bool {

func (trans *HashAggTransform) getChunk() hashAggGetChunkState {
var ret bool
// The interrupt signal is received. No result is returned.
if atomic.LoadInt32(&trans.closedSignal) > 0 {
if !trans.initDiskAsInput() {
return noChunk
}
}
if trans.isChildDrained {
ret = trans.getChunkFromDisk()
} else {
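
HashAggTransform above, and HashMergeTransform and SortTransform below, all adopt the same cooperative-cancellation idiom in this commit: Close() and the ctx.Done() branch bump an atomic closedSignal, and the chunk-producing path checks it before emitting further output. A minimal sketch of that idiom, independent of the openGemini transform types, is shown below.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type worker struct {
	closedSignal int32
	closeOnce    sync.Once
}

// close may be called from any goroutine, any number of times.
func (w *worker) close() {
	w.closeOnce.Do(func() {
		atomic.AddInt32(&w.closedSignal, 1)
	})
}

// run stops producing output as soon as the signal is observed.
func (w *worker) run(input <-chan int) {
	for v := range input {
		if atomic.LoadInt32(&w.closedSignal) > 0 {
			return // interrupted: produce no further result
		}
		fmt.Println("processed", v)
	}
}

func main() {
	w := &worker{}
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)
	w.close() // signal cancellation before the worker drains the queue
	w.run(ch) // returns immediately without printing anything
}
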
14 changes: 13 additions & 1 deletion engine/executor/hash_merge_transform.go
@@ -19,6 +19,7 @@ package executor
import (
"context"
"fmt"
"sync/atomic"

"github.com/openGemini/openGemini/engine/hybridqp"
"github.com/openGemini/openGemini/lib/errno"
@@ -66,6 +67,7 @@ type HashMergeTransform struct {
isSpill bool
isChildDrained bool
hashMergeType HashMergeType
closedSignal int32
}

type HashMergeTransformCreator struct {
@@ -163,7 +165,10 @@ func (trans *HashMergeTransform) Explain() []ValuePair {
}

func (trans *HashMergeTransform) Close() {
trans.output.Close()
trans.Once(func() {
atomic.AddInt32(&trans.closedSignal, 1)
trans.output.Close()
})
}

func (trans *HashMergeTransform) addChunk(c Chunk) {
@@ -190,6 +195,7 @@ func (trans *HashMergeTransform) runnable(ctx context.Context, errs *errno.Errs,
}
trans.addChunk(c)
case <-ctx.Done():
atomic.AddInt32(&trans.closedSignal, 1)
trans.addChunk(nil)
return
}
@@ -272,6 +278,12 @@ func (trans *HashMergeTransform) streamMergeHelper(ctx context.Context, errs *er

func (trans *HashMergeTransform) getChunk() hashAggGetChunkState {
var ret bool
// The interrupt signal is received. No result is returned.
if atomic.LoadInt32(&trans.closedSignal) > 0 {
if !trans.initDiskAsInput() {
return noChunk
}
}
if trans.isChildDrained {
ret = trans.getChunkFromDisk()
} else {
4 changes: 4 additions & 0 deletions engine/executor/rpc_message_test.go
@@ -188,6 +188,10 @@ func (c *RPCServer) Abort() {
fmt.Println("aborted")
}

func (c *RPCServer) Crash() {
fmt.Println("crashed")
}

type RPCAbort struct {
}

24 changes: 23 additions & 1 deletion engine/executor/sort_transform.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sort"
"sync/atomic"

"github.com/openGemini/openGemini/engine/hybridqp"
"github.com/openGemini/openGemini/lib/errno"
@@ -92,6 +93,7 @@ type SortTransform struct {
ascending []bool
dimension []string
outputChunkPool *CircularChunkPool
closedSignal int32

schema *QuerySchema
opt *query.ProcessorOptions
@@ -259,7 +261,10 @@ func (trans *SortTransform) Explain() []ValuePair {
}

func (trans *SortTransform) Close() {
trans.output.Close()
trans.Once(func() {
atomic.AddInt32(&trans.closedSignal, 1)
trans.output.Close()
})
}

func (trans *SortTransform) addChunk(c Chunk) bool {
@@ -293,6 +298,8 @@ func (trans *SortTransform) runnable(ctx context.Context, errs *errno.Errs, i in
return
}
case <-ctx.Done():
atomic.AddInt32(&trans.closedSignal, 1)
trans.addChunk(nil)
return
}
}
@@ -352,6 +359,13 @@ func (trans *SortTransform) singleSortWorkerHelper(ctx context.Context, errs *er
}
}()
for {
// The interrupt signal is received. No result is returned.
if atomic.LoadInt32(&trans.closedSignal) > 0 {
if err := trans.Release(); err != nil {
return
}
break
}
// 1.getChunk from childs
if !trans.getChunkFromChild(i) {
// 3. sortLastPartition
@@ -472,3 +486,11 @@ func (trans *SortTransform) GetOutputNumber(_ Port) int {
func (trans *SortTransform) GetInputNumber(_ Port) int {
return 0
}

func (trans *SortTransform) Release() error {
trans.sortWorkerBufChunk = trans.sortWorkerBufChunk[:0]
trans.sortWorkerPartitionIdx = trans.sortWorkerPartitionIdx[:0]
trans.sortWorkerResult = trans.sortWorkerResult[:0]
trans.sortKeysIdxs = trans.sortKeysIdxs[:0]
return nil
}
3 changes: 3 additions & 0 deletions engine/immutable/location.go
@@ -271,6 +271,9 @@ func (l *Location) readData(filterOpts *FilterOptions, dst, filterRec *record.Re
}

for rec == nil && l.hasNext() {
if l.ctx.isAborted() {
return nil, oriRowCount, nil
}
if (!l.ctx.tr.Overlaps(l.getCurSegMinMax())) ||
(!l.overlapsForRowFilter(filterOpts.rowFilters)) {
l.nextSegment(false)
18 changes: 18 additions & 0 deletions engine/immutable/location_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package immutable

import (
"sync/atomic"
"testing"

"github.com/openGemini/openGemini/lib/fragment"
@@ -166,3 +167,20 @@ func TestNextSegmentLast(t *testing.T) {
)
})
}

func TestReadCtx(t *testing.T) {
readCtx := NewReadContext(true)
assert.False(t, readCtx.isAborted())
s := int32(0)
closedSignal := &s
readCtx.SetClosedSignal(closedSignal)
atomic.AddInt32(closedSignal, 1)
assert.True(t, readCtx.isAborted())

loc := NewLocation(nil, readCtx)
loc.ctx.tr.Min, loc.ctx.tr.Max = 0, 1
loc.SetFragmentRanges([]*fragment.FragmentRange{{Start: 0, End: 1}})
loc.SetChunkMeta(&ChunkMeta{timeRange: []SegmentRange{[2]int64{0, 1}}})
_, _, err := loc.readData(nil, nil, nil, nil, nil)
assert.Equal(t, err, nil)
}
