diff --git a/pkg/container/vector/vector.go b/pkg/container/vector/vector.go index e07bf281fa6b1..527fb2c167c7e 100644 --- a/pkg/container/vector/vector.go +++ b/pkg/container/vector/vector.go @@ -395,17 +395,23 @@ func (v *Vector) Dup(proc *process.Process) (*Vector, error) { Ref: v.Ref, }, nil case types.T_char, types.T_varchar, types.T_json: + var err error + var data []byte + vs := v.Col.(*types.Bytes) - data, err := proc.Alloc(int64(len(vs.Data))) - if err != nil { - return nil, err - } ws := &types.Bytes{ - Data: data, Offsets: make([]uint32, len(vs.Offsets)), Lengths: make([]uint32, len(vs.Lengths)), } - copy(ws.Data, vs.Data) + if len(vs.Data) > 0 { + if data, err = proc.Alloc(int64(len(vs.Data))); err != nil { + return nil, err + } + ws.Data = data + copy(ws.Data, vs.Data) + } else { + ws.Data = make([]byte, 0) + } copy(ws.Offsets, vs.Offsets) copy(ws.Lengths, vs.Lengths) return &Vector{ diff --git a/pkg/sql/colexec/merge/merge.go b/pkg/sql/colexec/merge/merge.go index 6b4ac2c1c9e7a..5019d0b9814c2 100644 --- a/pkg/sql/colexec/merge/merge.go +++ b/pkg/sql/colexec/merge/merge.go @@ -34,6 +34,9 @@ func Call(proc *process.Process, _ interface{}) (bool, error) { } for i := 0; i < len(proc.Reg.MergeReceivers); i++ { reg := proc.Reg.MergeReceivers[i] + if reg.Ch == nil { + continue + } v := <-reg.Ch if v == nil { reg.Ch = nil diff --git a/pkg/sql/colexec/transfer/transfer.go b/pkg/sql/colexec/transfer/transfer.go index 3a7ebe0765ee2..bc3cf7d6f1483 100644 --- a/pkg/sql/colexec/transfer/transfer.go +++ b/pkg/sql/colexec/transfer/transfer.go @@ -69,7 +69,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) { vecs := n.vecs[:0] for i := range bat.Vecs { if bat.Vecs[i].Or { - vec, err := bat.Vecs[i].Dup(n.Proc) + vec, err := bat.Vecs[i].Dup(proc) if err != nil { clean(vecs, n.Proc) return false, err diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index c957510dc707d..a78000cef6d22 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -71,8 +71,9 @@ func (c *compile) Build() ([]*Exec, error) { es := make([]*Exec, len(stmts)) for i, stmt := range stmts { es[i] = &Exec{ - c: c, - stmt: stmt, + c: c, + stmt: stmt, + affectRows: 0, } } return es, nil @@ -147,6 +148,18 @@ func (e *Exec) Columns() []*Col { return e.resultCols } +func (e *Exec) IncreaseAffectedRows(n uint64) { + e.affectRows += n +} + +func (e *Exec) SetAffectedRows(n uint64) { + e.affectRows = n +} + +func (e *Exec) GetAffectedRows() uint64 { + return e.affectRows +} + // Run applies the scopes to the specified data object // and run through the instruction in each of the scope. func (e *Exec) Run(ts uint64) error { @@ -179,8 +192,10 @@ func (e *Exec) Run(ts uint64) error { case Insert: wg.Add(1) go func(s *Scope) { - if err := s.Insert(ts); err != nil { + if rows, err := s.Insert(ts); err != nil { e.err = err + } else { + e.SetAffectedRows(rows) } wg.Done() }(e.scopes[i]) @@ -242,7 +257,6 @@ func (e *Exec) Run(ts uint64) error { }(e.scopes[i]) } } - wg.Wait() return e.err } diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 7489eb8085fc2..40c48214ee397 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -128,9 +128,11 @@ func (s *Scope) RemoteRun(e engine.Engine) error { arg := s.Instructions[len(s.Instructions)-1].Arg.(*transfer.Argument) defer func() { - arg.Reg.Wg.Add(1) - arg.Reg.Ch <- nil - arg.Reg.Wg.Wait() + if arg.Reg.Ch != nil { + arg.Reg.Wg.Add(1) + arg.Reg.Ch <- nil + arg.Reg.Wg.Wait() + } }() encoder, decoder := rpcserver.NewCodec(1 << 30) conn := goetty.NewIOSession(goetty.WithCodec(encoder, decoder)) @@ -161,6 +163,12 @@ func (s *Scope) RemoteRun(e engine.Engine) error { if err != nil { return err } + if arg.Reg.Ch == nil { + if bat != nil { + bat.Clean(s.Proc) + } + continue + } arg.Reg.Wg.Add(1) arg.Reg.Ch <- bat arg.Reg.Wg.Wait() @@ -168,10 +176,10 @@ func (s *Scope) RemoteRun(e engine.Engine) error { return nil } -func (s *Scope) Insert(ts uint64) error { +func (s *Scope) Insert(ts uint64) (uint64, error) { o, _ := s.Operator.(*insert.Insert) defer o.R.Close() - return o.R.Write(ts, o.Bat) + return uint64(o.Bat.Vecs[0].Length()), o.R.Write(ts, o.Bat) } func (s *Scope) Explain(u interface{}, fill func(interface{}, *batch.Batch) error) error { diff --git a/pkg/sql/compile/top.go b/pkg/sql/compile/top.go index 16891cba8b4ed..b4b3ae90c6ac6 100644 --- a/pkg/sql/compile/top.go +++ b/pkg/sql/compile/top.go @@ -32,7 +32,7 @@ func (c *compile) compileTopOutput(o *top.Top, mp map[string]uint64) ([]*Scope, mp[g.Name]++ } } - ss, err := c.compile(o.Prev.(*projection.Projection), mp) + ss, err := c.compileOutput(o.Prev.(*projection.Projection), mp) if err != nil { return nil, err } diff --git a/pkg/sql/compile/types.go b/pkg/sql/compile/types.go index 93cae2d5f5142..646dc711d3c00 100644 --- a/pkg/sql/compile/types.go +++ b/pkg/sql/compile/types.go @@ -85,6 +85,8 @@ type Exec struct { resultCols []*Col scopes []*Scope c *compile + //affectRows stores the number of rows affected while insert / update / delete + affectRows uint64 //e a dbengine instance e engine.Engine //stmt ast of a single sql diff --git a/pkg/sql/protocol/protocol.go b/pkg/sql/protocol/protocol.go index bf3a777030f02..928d6b7b170d6 100644 --- a/pkg/sql/protocol/protocol.go +++ b/pkg/sql/protocol/protocol.go @@ -1126,8 +1126,10 @@ func EncodeVector(v *vector.Vector, buf *bytes.Buffer) error { size += uint64(v) } buf.Write(encoding.EncodeUint64(size)) - for i, j := int64(0), int64(cnt); i < j; i++ { - buf.Write(vs.Get(i)) + if size > 0 { + for i, j := int64(0), int64(cnt); i < j; i++ { + buf.Write(vs.Get(i)) + } } } case types.T_tuple: diff --git a/pkg/vm/pipeline/pipeline.go b/pkg/vm/pipeline/pipeline.go index d51369b365ffa..a85a0ffd9b286 100644 --- a/pkg/vm/pipeline/pipeline.go +++ b/pkg/vm/pipeline/pipeline.go @@ -16,6 +16,8 @@ package pipeline import ( "bytes" + "fmt" + "matrixone/pkg/container/batch" "matrixone/pkg/vm" "matrixone/pkg/vm/engine" "matrixone/pkg/vm/mempool" @@ -86,6 +88,9 @@ func (p *Pipeline) Run(segs []engine.Segment, proc *process.Process) (bool, erro } func (p *Pipeline) RunMerge(proc *process.Process) (bool, error) { + var end bool + var err error + proc.Mp = mempool.New() defer func() { proc.Reg.InputBatch = nil @@ -96,11 +101,13 @@ func (p *Pipeline) RunMerge(proc *process.Process) (bool, error) { vm.Clean(p.instructions, proc) return false, err } - i := 0 for { - i++ proc.Reg.InputBatch = nil - if end, err := vm.Run(p.instructions, proc); err != nil || end { + if end, err = vm.Run(p.instructions, proc); err != nil || end { + { + fmt.Printf("+++%p begin clean\n", p) + } + p.clean(proc) return end, err } } @@ -121,6 +128,30 @@ func (p *Pipeline) prefetch(segs []engine.Segment, proc *process.Process) *queue return q } +func (p *Pipeline) clean(proc *process.Process) { + for _, reg := range proc.Reg.MergeReceivers { + if reg.Ch != nil { + v := <-reg.Ch + switch { + case v == nil: + reg.Ch = nil + reg.Wg.Done() + default: + bat := v.(*batch.Batch) + if bat == nil || bat.Attrs == nil { + reg.Ch = nil + reg.Wg.Done() + } else { + bat.Clean(proc) + reg.Ch = nil + reg.Wg.Done() + } + } + } + } + +} + // prefetch func (q *queue) prefetch(attrs []string) error { if q.prefetchIndex == len(q.blocks) {