diff --git a/pkg/container/types/bytes.go b/pkg/container/types/bytes.go index 79c03fea81f6..00b7781553d1 100644 --- a/pkg/container/types/bytes.go +++ b/pkg/container/types/bytes.go @@ -15,6 +15,7 @@ package types import ( + "bytes" "unsafe" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -125,3 +126,11 @@ func (v *Varlena) Reset() { var vzero Varlena *v = vzero } + +func PrefixCompare(lhs, rhs []byte) int { + if len(lhs) > len(rhs) { + lhs = lhs[:len(rhs)] + } + + return bytes.Compare(lhs, rhs) +} diff --git a/pkg/container/vector/search.go b/pkg/container/vector/search.go index 1c3e1067598e..3abae32ab14b 100644 --- a/pkg/container/vector/search.go +++ b/pkg/container/vector/search.go @@ -160,3 +160,90 @@ func FixedSizedBinarySearchOffsetByValFactory[T any](vals []T, cmp func(T, T) in return sels } } + +func CollectOffsetsByPrefixEqFactory(val []byte) func(*Vector) []int32 { + return func(lvec *Vector) []int32 { + lvlen := lvec.Length() + if lvlen == 0 { + return nil + } + lcol, larea := MustVarlenaRawData(lvec) + start, _ := sort.Find(lvlen, func(i int) int { + return bytes.Compare(val, lcol[i].GetByteSlice(larea)) + }) + end := start + for end < lvlen && bytes.HasPrefix(lcol[end].GetByteSlice(larea), val) { + end++ + } + if start == end { + return nil + } + sels := make([]int32, end-start) + for i := start; i < end; i++ { + sels[i-start] = int32(i) + } + return sels + } +} + +func CollectOffsetsByPrefixBetweenFactory(lval, rval []byte) func(*Vector) []int32 { + return func(lvec *Vector) []int32 { + lvlen := lvec.Length() + if lvlen == 0 { + return nil + } + lcol, larea := MustVarlenaRawData(lvec) + start := sort.Search(lvlen, func(i int) bool { + return bytes.Compare(lcol[i].GetByteSlice(larea), lval) >= 0 + }) + if start == lvlen { + return nil + } + end := sort.Search(lvlen, func(i int) bool { + return types.PrefixCompare(lcol[i].GetByteSlice(larea), rval) > 0 + }) + if start == end { + return nil + } + sels := make([]int32, end-start) + for i := start; i < end; i++ { + sels[i-start] = int32(i) + } + return sels + } +} + +func CollectOffsetsByPrefixInFactory(rvec *Vector) func(*Vector) []int32 { + return func(lvec *Vector) []int32 { + lvlen := lvec.Length() + if lvlen == 0 { + return nil + } + + lcol, larea := MustVarlenaRawData(lvec) + rcol, rarea := MustVarlenaRawData(rvec) + + rval := rcol[0].GetByteSlice(rarea) + rpos := 0 + rvlen := rvec.Length() + + sels := make([]int32, 0, rvlen) + for i := 0; i < lvlen; i++ { + lval := lcol[i].GetByteSlice(larea) + for types.PrefixCompare(lval, rval) > 0 { + rpos++ + if rpos == rvlen { + return sels + } + + rval = rcol[rpos].GetByteSlice(rarea) + } + + if bytes.HasPrefix(lval, rval) { + sels = append(sels, int32(i)) + } + } + + return sels + } +} diff --git a/pkg/container/vector/utils_test.go b/pkg/container/vector/utils_test.go index 2d33fd567093..25b8505eb7eb 100644 --- a/pkg/container/vector/utils_test.go +++ b/pkg/container/vector/utils_test.go @@ -74,3 +74,60 @@ func TestFindFirstIndexInSortedVarlenVector(t *testing.T) { } }) } + +func TestCollectOffsetsByPrefixEqFactory(t *testing.T) { + mp := mpool.MustNewZero() + v1 := NewVec(types.T_char.ToType()) + defer v1.Free(mp) + + AppendBytes(v1, []byte("1111"), false, mp) + AppendBytes(v1, []byte("1121"), false, mp) + AppendBytes(v1, []byte("1211"), false, mp) + AppendBytes(v1, []byte("1221"), false, mp) + AppendBytes(v1, []byte("1231"), false, mp) + AppendBytes(v1, []byte("1311"), false, mp) + + prefix1 := []byte("01") + prefix2 := []byte("12") + prefix3 := []byte("14") + prefix4 := []byte("113") + + fn1 := CollectOffsetsByPrefixEqFactory(prefix1) + fn2 := CollectOffsetsByPrefixEqFactory(prefix2) + fn3 := CollectOffsetsByPrefixEqFactory(prefix3) + fn4 := CollectOffsetsByPrefixEqFactory(prefix4) + off1 := fn1(v1) + off2 := fn2(v1) + off3 := fn3(v1) + off4 := fn4(v1) + require.Equal(t, 0, len(off1)) + require.Equal(t, []int32{2, 3, 4}, off2) + require.Equal(t, 0, len(off3)) + require.Equal(t, 0, len(off4)) +} + +func TestCollectOffsetsByPrefixBetweenFactory(t *testing.T) { + mp := mpool.MustNewZero() + v1 := NewVec(types.T_char.ToType()) + defer v1.Free(mp) + + AppendBytes(v1, []byte("1111"), false, mp) + AppendBytes(v1, []byte("1121"), false, mp) + AppendBytes(v1, []byte("1211"), false, mp) + AppendBytes(v1, []byte("1221"), false, mp) + AppendBytes(v1, []byte("1231"), false, mp) + AppendBytes(v1, []byte("1311"), false, mp) + + left1 := []byte("11") + right1 := []byte("12") + left2 := []byte("113") + right2 := []byte("124") + + fn1 := CollectOffsetsByPrefixBetweenFactory(left1, right1) + fn2 := CollectOffsetsByPrefixBetweenFactory(left2, right2) + off1 := fn1(v1) + off2 := fn2(v1) + + require.Equal(t, []int32{0, 1, 2, 3, 4}, off1) + require.Equal(t, []int32{2, 3, 4}, off2) +} diff --git a/pkg/sql/plan/function/func_prefix.go b/pkg/sql/plan/function/func_prefix.go index 4da8ff6a2541..a810d80bdd8d 100644 --- a/pkg/sql/plan/function/func_prefix.go +++ b/pkg/sql/plan/function/func_prefix.go @@ -18,8 +18,8 @@ import ( "bytes" "sort" + "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -68,11 +68,11 @@ func PrefixBetween(parameters []*vector.Vector, result vector.FunctionResultWrap if ivec.GetSorted() { lowerBound := sort.Search(len(icol), func(i int) bool { - return index.PrefixCompare(icol[i].GetByteSlice(iarea), lval) >= 0 + return types.PrefixCompare(icol[i].GetByteSlice(iarea), lval) >= 0 }) upperBound := sort.Search(len(icol), func(i int) bool { - return index.PrefixCompare(icol[i].GetByteSlice(iarea), rval) > 0 + return types.PrefixCompare(icol[i].GetByteSlice(iarea), rval) > 0 }) for i := 0; i < lowerBound; i++ { @@ -87,7 +87,7 @@ func PrefixBetween(parameters []*vector.Vector, result vector.FunctionResultWrap } else { for i := 0; i < length; i++ { val := icol[i].GetByteSlice(iarea) - res[i] = index.PrefixCompare(val, lval) >= 0 && index.PrefixCompare(val, rval) <= 0 + res[i] = types.PrefixCompare(val, lval) >= 0 && types.PrefixCompare(val, rval) <= 0 } } @@ -109,7 +109,7 @@ func PrefixIn(parameters []*vector.Vector, result vector.FunctionResultWrapper, for i := 0; i < length; i++ { lval := lcol[i].GetByteSlice(larea) - for index.PrefixCompare(lval, rval) > 0 { + for types.PrefixCompare(lval, rval) > 0 { rpos++ if rpos == rlen { for j := i; j < length; j++ { @@ -127,7 +127,7 @@ func PrefixIn(parameters []*vector.Vector, result vector.FunctionResultWrapper, for i := 0; i < length; i++ { lval := lcol[i].GetByteSlice(larea) rpos, _ := sort.Find(len(rcol), func(j int) int { - return index.PrefixCompare(lval, rcol[j].GetByteSlice(rarea)) + return types.PrefixCompare(lval, rcol[j].GetByteSlice(rarea)) }) res[i] = rpos < len(rcol) && bytes.HasPrefix(lval, rcol[rpos].GetByteSlice(rarea)) diff --git a/pkg/vm/engine/disttae/util.go b/pkg/vm/engine/disttae/util.go index 28cf3cdef2c3..b9a6dbf4e8f9 100644 --- a/pkg/vm/engine/disttae/util.go +++ b/pkg/vm/engine/disttae/util.go @@ -176,8 +176,8 @@ func getPkExpr(expr *plan.Expr, pkName string, proc *process.Process) *plan.Expr return getPkExpr(exprImpl.F.Args[1], pkName, proc) case "=": - if leftExpr, ok := exprImpl.F.Args[0].Expr.(*plan.Expr_Col); ok { - if !compPkCol(leftExpr.Col.Name, pkName) { + if col := exprImpl.F.Args[0].GetCol(); col != nil { + if !compPkCol(col.Name, pkName) { return nil } constVal := getConstValueByExpr(exprImpl.F.Args[1], proc) @@ -191,8 +191,8 @@ func getPkExpr(expr *plan.Expr, pkName string, proc *process.Process) *plan.Expr }, } } - if rightExpr, ok := exprImpl.F.Args[1].Expr.(*plan.Expr_Col); ok { - if !compPkCol(rightExpr.Col.Name, pkName) { + if col := exprImpl.F.Args[1].GetCol(); col != nil { + if !compPkCol(col.Name, pkName) { return nil } constVal := getConstValueByExpr(exprImpl.F.Args[0], proc) @@ -209,12 +209,20 @@ func getPkExpr(expr *plan.Expr, pkName string, proc *process.Process) *plan.Expr return nil case "in": - if leftExpr, ok := exprImpl.F.Args[0].Expr.(*plan.Expr_Col); ok { - if !compPkCol(leftExpr.Col.Name, pkName) { + if col := exprImpl.F.Args[0].GetCol(); col != nil { + if !compPkCol(col.Name, pkName) { return nil } return exprImpl.F.Args[1] } + + case "prefix_eq", "prefix_between", "prefix_in": + if col := exprImpl.F.Args[0].GetCol(); col != nil { + if !compPkCol(col.Name, pkName) { + return nil + } + return expr + } } } @@ -279,6 +287,23 @@ func getNonCompositePKSearchFuncByExpr( searchPKFunc = vector.OrderedBinarySearchOffsetByValFactory([]types.Enum{types.Enum(val.EnumVal)}) } + case *plan.Expr_F: + switch exprImpl.F.Func.ObjName { + case "prefix_eq": + val := util.UnsafeStringToBytes(exprImpl.F.Args[1].GetLit().GetSval()) + searchPKFunc = vector.CollectOffsetsByPrefixEqFactory(val) + + case "prefix_between": + lval := util.UnsafeStringToBytes(exprImpl.F.Args[1].GetLit().GetSval()) + rval := util.UnsafeStringToBytes(exprImpl.F.Args[2].GetLit().GetSval()) + searchPKFunc = vector.CollectOffsetsByPrefixBetweenFactory(lval, rval) + + case "prefix_in": + vec := vector.NewVec(types.T_any.ToType()) + vec.UnmarshalBinary(exprImpl.F.Args[1].GetVec().Data) + searchPKFunc = vector.CollectOffsetsByPrefixInFactory(vec) + } + case *plan.Expr_Vec: vec := vector.NewVec(types.T_any.ToType()) vec.UnmarshalBinary(exprImpl.Vec.Data) diff --git a/pkg/vm/engine/tae/blockio/read.go b/pkg/vm/engine/tae/blockio/read.go index 79aac187ccf9..af08723e9b97 100644 --- a/pkg/vm/engine/tae/blockio/read.go +++ b/pkg/vm/engine/tae/blockio/read.go @@ -19,21 +19,19 @@ import ( "math" "time" - "github.com/matrixorigin/matrixone/pkg/container/nulls" - v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" - "go.uber.org/zap" - - "github.com/matrixorigin/matrixone/pkg/container/vector" - pkgcatalog "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" + v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine" + "go.uber.org/zap" ) type ReadFilter = func([]*vector.Vector) []int32 diff --git a/pkg/vm/engine/tae/blockio/reader.go b/pkg/vm/engine/tae/blockio/reader.go index 7e8cefed8740..65fecedd1d1f 100644 --- a/pkg/vm/engine/tae/blockio/reader.go +++ b/pkg/vm/engine/tae/blockio/reader.go @@ -16,6 +16,7 @@ package blockio import ( "context" + "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" diff --git a/pkg/vm/engine/tae/index/zm.go b/pkg/vm/engine/tae/index/zm.go index 30dc9392ce95..edee0b718395 100644 --- a/pkg/vm/engine/tae/index/zm.go +++ b/pkg/vm/engine/tae/index/zm.go @@ -557,36 +557,28 @@ func (zm ZM) Or(o ZM) (res bool, ok bool) { return } -func PrefixCompare(lhs, rhs []byte) int { - if len(lhs) > len(rhs) { - lhs = lhs[:len(rhs)] - } - - return bytes.Compare(lhs, rhs) -} - func (zm ZM) PrefixEq(s []byte) bool { zmin := zm.GetMinBuf() zmax := zm.GetMaxBuf() - return PrefixCompare(zmin, s) <= 0 && PrefixCompare(s, zmax) <= 0 + return types.PrefixCompare(zmin, s) <= 0 && types.PrefixCompare(zmax, s) >= 0 } func (zm ZM) PrefixBetween(lb, ub []byte) bool { zmin := zm.GetMinBuf() zmax := zm.GetMaxBuf() - return PrefixCompare(lb, zmax) <= 0 && PrefixCompare(zmin, ub) <= 0 + return types.PrefixCompare(zmin, ub) <= 0 && types.PrefixCompare(zmax, lb) >= 0 } func (zm ZM) PrefixIn(vec *vector.Vector) bool { col, area := vector.MustVarlenaRawData(vec) minVal, maxVal := zm.GetMinBuf(), zm.GetMaxBuf() lowerBound := sort.Search(len(col), func(i int) bool { - return PrefixCompare(minVal, col[i].GetByteSlice(area)) <= 0 + return types.PrefixCompare(minVal, col[i].GetByteSlice(area)) <= 0 }) - return lowerBound < len(col) && PrefixCompare(col[lowerBound].GetByteSlice(area), maxVal) <= 0 + return lowerBound < len(col) && types.PrefixCompare(maxVal, col[lowerBound].GetByteSlice(area)) >= 0 } func (zm ZM) AnyIn(vec *vector.Vector) bool { @@ -784,10 +776,10 @@ func (zm ZM) AnyIn(vec *vector.Vector) bool { col, area := vector.MustVarlenaRawData(vec) minVal, maxVal := zm.GetMinBuf(), zm.GetMaxBuf() lowerBound := sort.Search(len(col), func(i int) bool { - return bytes.Compare(minVal, col[i].GetByteSlice(area)) <= 0 + return types.PrefixCompare(minVal, col[i].GetByteSlice(area)) <= 0 }) - return lowerBound < len(col) && PrefixCompare(col[lowerBound].GetByteSlice(area), maxVal) <= 0 + return lowerBound < len(col) && types.PrefixCompare(maxVal, col[lowerBound].GetByteSlice(area)) >= 0 case types.T_array_float32: col := vector.MustArrayCol[float32](vec)