Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions pkg/container/vector/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,17 +395,23 @@ func (v *Vector) Dup(proc *process.Process) (*Vector, error) {
Ref: v.Ref,
}, nil
case types.T_char, types.T_varchar, types.T_json:
var err error
var data []byte

vs := v.Col.(*types.Bytes)
data, err := proc.Alloc(int64(len(vs.Data)))
if err != nil {
return nil, err
}
ws := &types.Bytes{
Data: data,
Offsets: make([]uint32, len(vs.Offsets)),
Lengths: make([]uint32, len(vs.Lengths)),
}
copy(ws.Data, vs.Data)
if len(vs.Data) > 0 {
if data, err = proc.Alloc(int64(len(vs.Data))); err != nil {
return nil, err
}
ws.Data = data
copy(ws.Data, vs.Data)
} else {
ws.Data = make([]byte, 0)
}
copy(ws.Offsets, vs.Offsets)
copy(ws.Lengths, vs.Lengths)
return &Vector{
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ func Call(proc *process.Process, _ interface{}) (bool, error) {
}
for i := 0; i < len(proc.Reg.MergeReceivers); i++ {
reg := proc.Reg.MergeReceivers[i]
if reg.Ch == nil {
continue
}
v := <-reg.Ch
if v == nil {
reg.Ch = nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/transfer/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) {
vecs := n.vecs[:0]
for i := range bat.Vecs {
if bat.Vecs[i].Or {
vec, err := bat.Vecs[i].Dup(n.Proc)
vec, err := bat.Vecs[i].Dup(proc)
if err != nil {
clean(vecs, n.Proc)
return false, err
Expand Down
22 changes: 18 additions & 4 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ func (c *compile) Build() ([]*Exec, error) {
es := make([]*Exec, len(stmts))
for i, stmt := range stmts {
es[i] = &Exec{
c: c,
stmt: stmt,
c: c,
stmt: stmt,
affectRows: 0,
}
}
return es, nil
Expand Down Expand Up @@ -147,6 +148,18 @@ func (e *Exec) Columns() []*Col {
return e.resultCols
}

func (e *Exec) IncreaseAffectedRows(n uint64) {
e.affectRows += n
}

func (e *Exec) SetAffectedRows(n uint64) {
e.affectRows = n
}

func (e *Exec) GetAffectedRows() uint64 {
return e.affectRows
}

// Run applies the scopes to the specified data object
// and run through the instruction in each of the scope.
func (e *Exec) Run(ts uint64) error {
Expand Down Expand Up @@ -179,8 +192,10 @@ func (e *Exec) Run(ts uint64) error {
case Insert:
wg.Add(1)
go func(s *Scope) {
if err := s.Insert(ts); err != nil {
if rows, err := s.Insert(ts); err != nil {
e.err = err
} else {
e.SetAffectedRows(rows)
}
wg.Done()
}(e.scopes[i])
Expand Down Expand Up @@ -242,7 +257,6 @@ func (e *Exec) Run(ts uint64) error {
}(e.scopes[i])
}
}

wg.Wait()
return e.err
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ func (s *Scope) RemoteRun(e engine.Engine) error {

arg := s.Instructions[len(s.Instructions)-1].Arg.(*transfer.Argument)
defer func() {
arg.Reg.Wg.Add(1)
arg.Reg.Ch <- nil
arg.Reg.Wg.Wait()
if arg.Reg.Ch != nil {
arg.Reg.Wg.Add(1)
arg.Reg.Ch <- nil
arg.Reg.Wg.Wait()
}
}()
encoder, decoder := rpcserver.NewCodec(1 << 30)
conn := goetty.NewIOSession(goetty.WithCodec(encoder, decoder))
Expand Down Expand Up @@ -161,17 +163,23 @@ func (s *Scope) RemoteRun(e engine.Engine) error {
if err != nil {
return err
}
if arg.Reg.Ch == nil {
if bat != nil {
bat.Clean(s.Proc)
}
continue
}
arg.Reg.Wg.Add(1)
arg.Reg.Ch <- bat
arg.Reg.Wg.Wait()
}
return nil
}

func (s *Scope) Insert(ts uint64) error {
func (s *Scope) Insert(ts uint64) (uint64, error) {
o, _ := s.Operator.(*insert.Insert)
defer o.R.Close()
return o.R.Write(ts, o.Bat)
return uint64(o.Bat.Vecs[0].Length()), o.R.Write(ts, o.Bat)
}

func (s *Scope) Explain(u interface{}, fill func(interface{}, *batch.Batch) error) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/compile/top.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (c *compile) compileTopOutput(o *top.Top, mp map[string]uint64) ([]*Scope,
mp[g.Name]++
}
}
ss, err := c.compile(o.Prev.(*projection.Projection), mp)
ss, err := c.compileOutput(o.Prev.(*projection.Projection), mp)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/compile/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type Exec struct {
resultCols []*Col
scopes []*Scope
c *compile
//affectRows stores the number of rows affected while insert / update / delete
affectRows uint64
//e a dbengine instance
e engine.Engine
//stmt ast of a single sql
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,8 +1126,10 @@ func EncodeVector(v *vector.Vector, buf *bytes.Buffer) error {
size += uint64(v)
}
buf.Write(encoding.EncodeUint64(size))
for i, j := int64(0), int64(cnt); i < j; i++ {
buf.Write(vs.Get(i))
if size > 0 {
for i, j := int64(0), int64(cnt); i < j; i++ {
buf.Write(vs.Get(i))
}
}
}
case types.T_tuple:
Expand Down
37 changes: 34 additions & 3 deletions pkg/vm/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package pipeline

import (
"bytes"
"fmt"
"matrixone/pkg/container/batch"
"matrixone/pkg/vm"
"matrixone/pkg/vm/engine"
"matrixone/pkg/vm/mempool"
Expand Down Expand Up @@ -86,6 +88,9 @@ func (p *Pipeline) Run(segs []engine.Segment, proc *process.Process) (bool, erro
}

func (p *Pipeline) RunMerge(proc *process.Process) (bool, error) {
var end bool
var err error

proc.Mp = mempool.New()
defer func() {
proc.Reg.InputBatch = nil
Expand All @@ -96,11 +101,13 @@ func (p *Pipeline) RunMerge(proc *process.Process) (bool, error) {
vm.Clean(p.instructions, proc)
return false, err
}
i := 0
for {
i++
proc.Reg.InputBatch = nil
if end, err := vm.Run(p.instructions, proc); err != nil || end {
if end, err = vm.Run(p.instructions, proc); err != nil || end {
{
fmt.Printf("+++%p begin clean\n", p)
}
p.clean(proc)
return end, err
}
}
Expand All @@ -121,6 +128,30 @@ func (p *Pipeline) prefetch(segs []engine.Segment, proc *process.Process) *queue
return q
}

func (p *Pipeline) clean(proc *process.Process) {
for _, reg := range proc.Reg.MergeReceivers {
if reg.Ch != nil {
v := <-reg.Ch
switch {
case v == nil:
reg.Ch = nil
reg.Wg.Done()
default:
bat := v.(*batch.Batch)
if bat == nil || bat.Attrs == nil {
reg.Ch = nil
reg.Wg.Done()
} else {
bat.Clean(proc)
reg.Ch = nil
reg.Wg.Done()
}
}
}
}

}

// prefetch
func (q *queue) prefetch(attrs []string) error {
if q.prefetchIndex == len(q.blocks) {
Expand Down