Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick bug fixes to 1.1-dev #15425

Merged
merged 3 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/container/types/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package types

import (
"bytes"
"unsafe"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
Expand Down Expand Up @@ -125,3 +126,11 @@ func (v *Varlena) Reset() {
var vzero Varlena
*v = vzero
}

func PrefixCompare(lhs, rhs []byte) int {
if len(lhs) > len(rhs) {
lhs = lhs[:len(rhs)]
}

return bytes.Compare(lhs, rhs)
}
87 changes: 87 additions & 0 deletions pkg/container/vector/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,90 @@ func FixedSizedBinarySearchOffsetByValFactory[T any](vals []T, cmp func(T, T) in
return sels
}
}

func CollectOffsetsByPrefixEqFactory(val []byte) func(*Vector) []int32 {
return func(lvec *Vector) []int32 {
lvlen := lvec.Length()
if lvlen == 0 {
return nil
}
lcol, larea := MustVarlenaRawData(lvec)
start, _ := sort.Find(lvlen, func(i int) int {
return bytes.Compare(val, lcol[i].GetByteSlice(larea))
})
end := start
for end < lvlen && bytes.HasPrefix(lcol[end].GetByteSlice(larea), val) {
end++
}
if start == end {
return nil
}
sels := make([]int32, end-start)
for i := start; i < end; i++ {
sels[i-start] = int32(i)
}
return sels
}
}

func CollectOffsetsByPrefixBetweenFactory(lval, rval []byte) func(*Vector) []int32 {
return func(lvec *Vector) []int32 {
lvlen := lvec.Length()
if lvlen == 0 {
return nil
}
lcol, larea := MustVarlenaRawData(lvec)
start := sort.Search(lvlen, func(i int) bool {
return bytes.Compare(lcol[i].GetByteSlice(larea), lval) >= 0
})
if start == lvlen {
return nil
}
end := sort.Search(lvlen, func(i int) bool {
return types.PrefixCompare(lcol[i].GetByteSlice(larea), rval) > 0
})
if start == end {
return nil
}
sels := make([]int32, end-start)
for i := start; i < end; i++ {
sels[i-start] = int32(i)
}
return sels
}
}

func CollectOffsetsByPrefixInFactory(rvec *Vector) func(*Vector) []int32 {
return func(lvec *Vector) []int32 {
lvlen := lvec.Length()
if lvlen == 0 {
return nil
}

lcol, larea := MustVarlenaRawData(lvec)
rcol, rarea := MustVarlenaRawData(rvec)

rval := rcol[0].GetByteSlice(rarea)
rpos := 0
rvlen := rvec.Length()

sels := make([]int32, 0, rvlen)
for i := 0; i < lvlen; i++ {
lval := lcol[i].GetByteSlice(larea)
for types.PrefixCompare(lval, rval) > 0 {
rpos++
if rpos == rvlen {
return sels
}

rval = rcol[rpos].GetByteSlice(rarea)
}

if bytes.HasPrefix(lval, rval) {
sels = append(sels, int32(i))
}
}

return sels
}
}
57 changes: 57 additions & 0 deletions pkg/container/vector/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,60 @@ func TestFindFirstIndexInSortedVarlenVector(t *testing.T) {
}
})
}

func TestCollectOffsetsByPrefixEqFactory(t *testing.T) {
mp := mpool.MustNewZero()
v1 := NewVec(types.T_char.ToType())
defer v1.Free(mp)

AppendBytes(v1, []byte("1111"), false, mp)
AppendBytes(v1, []byte("1121"), false, mp)
AppendBytes(v1, []byte("1211"), false, mp)
AppendBytes(v1, []byte("1221"), false, mp)
AppendBytes(v1, []byte("1231"), false, mp)
AppendBytes(v1, []byte("1311"), false, mp)

prefix1 := []byte("01")
prefix2 := []byte("12")
prefix3 := []byte("14")
prefix4 := []byte("113")

fn1 := CollectOffsetsByPrefixEqFactory(prefix1)
fn2 := CollectOffsetsByPrefixEqFactory(prefix2)
fn3 := CollectOffsetsByPrefixEqFactory(prefix3)
fn4 := CollectOffsetsByPrefixEqFactory(prefix4)
off1 := fn1(v1)
off2 := fn2(v1)
off3 := fn3(v1)
off4 := fn4(v1)
require.Equal(t, 0, len(off1))
require.Equal(t, []int32{2, 3, 4}, off2)
require.Equal(t, 0, len(off3))
require.Equal(t, 0, len(off4))
}

func TestCollectOffsetsByPrefixBetweenFactory(t *testing.T) {
mp := mpool.MustNewZero()
v1 := NewVec(types.T_char.ToType())
defer v1.Free(mp)

AppendBytes(v1, []byte("1111"), false, mp)
AppendBytes(v1, []byte("1121"), false, mp)
AppendBytes(v1, []byte("1211"), false, mp)
AppendBytes(v1, []byte("1221"), false, mp)
AppendBytes(v1, []byte("1231"), false, mp)
AppendBytes(v1, []byte("1311"), false, mp)

left1 := []byte("11")
right1 := []byte("12")
left2 := []byte("113")
right2 := []byte("124")

fn1 := CollectOffsetsByPrefixBetweenFactory(left1, right1)
fn2 := CollectOffsetsByPrefixBetweenFactory(left2, right2)
off1 := fn1(v1)
off2 := fn2(v1)

require.Equal(t, []int32{0, 1, 2, 3, 4}, off1)
require.Equal(t, []int32{2, 3, 4}, off2)
}
12 changes: 6 additions & 6 deletions pkg/sql/plan/function/func_prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"bytes"
"sort"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

Expand Down Expand Up @@ -68,11 +68,11 @@ func PrefixBetween(parameters []*vector.Vector, result vector.FunctionResultWrap

if ivec.GetSorted() {
lowerBound := sort.Search(len(icol), func(i int) bool {
return index.PrefixCompare(icol[i].GetByteSlice(iarea), lval) >= 0
return types.PrefixCompare(icol[i].GetByteSlice(iarea), lval) >= 0
})

upperBound := sort.Search(len(icol), func(i int) bool {
return index.PrefixCompare(icol[i].GetByteSlice(iarea), rval) > 0
return types.PrefixCompare(icol[i].GetByteSlice(iarea), rval) > 0
})

for i := 0; i < lowerBound; i++ {
Expand All @@ -87,7 +87,7 @@ func PrefixBetween(parameters []*vector.Vector, result vector.FunctionResultWrap
} else {
for i := 0; i < length; i++ {
val := icol[i].GetByteSlice(iarea)
res[i] = index.PrefixCompare(val, lval) >= 0 && index.PrefixCompare(val, rval) <= 0
res[i] = types.PrefixCompare(val, lval) >= 0 && types.PrefixCompare(val, rval) <= 0
}
}

Expand All @@ -109,7 +109,7 @@ func PrefixIn(parameters []*vector.Vector, result vector.FunctionResultWrapper,

for i := 0; i < length; i++ {
lval := lcol[i].GetByteSlice(larea)
for index.PrefixCompare(lval, rval) > 0 {
for types.PrefixCompare(lval, rval) > 0 {
rpos++
if rpos == rlen {
for j := i; j < length; j++ {
Expand All @@ -127,7 +127,7 @@ func PrefixIn(parameters []*vector.Vector, result vector.FunctionResultWrapper,
for i := 0; i < length; i++ {
lval := lcol[i].GetByteSlice(larea)
rpos, _ := sort.Find(len(rcol), func(j int) int {
return index.PrefixCompare(lval, rcol[j].GetByteSlice(rarea))
return types.PrefixCompare(lval, rcol[j].GetByteSlice(rarea))
})

res[i] = rpos < len(rcol) && bytes.HasPrefix(lval, rcol[rpos].GetByteSlice(rarea))
Expand Down
37 changes: 31 additions & 6 deletions pkg/vm/engine/disttae/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ func getPkExpr(expr *plan.Expr, pkName string, proc *process.Process) *plan.Expr
return getPkExpr(exprImpl.F.Args[1], pkName, proc)

case "=":
if leftExpr, ok := exprImpl.F.Args[0].Expr.(*plan.Expr_Col); ok {
if !compPkCol(leftExpr.Col.Name, pkName) {
if col := exprImpl.F.Args[0].GetCol(); col != nil {
if !compPkCol(col.Name, pkName) {
return nil
}
constVal := getConstValueByExpr(exprImpl.F.Args[1], proc)
Expand All @@ -191,8 +191,8 @@ func getPkExpr(expr *plan.Expr, pkName string, proc *process.Process) *plan.Expr
},
}
}
if rightExpr, ok := exprImpl.F.Args[1].Expr.(*plan.Expr_Col); ok {
if !compPkCol(rightExpr.Col.Name, pkName) {
if col := exprImpl.F.Args[1].GetCol(); col != nil {
if !compPkCol(col.Name, pkName) {
return nil
}
constVal := getConstValueByExpr(exprImpl.F.Args[0], proc)
Expand All @@ -209,12 +209,20 @@ func getPkExpr(expr *plan.Expr, pkName string, proc *process.Process) *plan.Expr
return nil

case "in":
if leftExpr, ok := exprImpl.F.Args[0].Expr.(*plan.Expr_Col); ok {
if !compPkCol(leftExpr.Col.Name, pkName) {
if col := exprImpl.F.Args[0].GetCol(); col != nil {
if !compPkCol(col.Name, pkName) {
return nil
}
return exprImpl.F.Args[1]
}

case "prefix_eq", "prefix_between", "prefix_in":
if col := exprImpl.F.Args[0].GetCol(); col != nil {
if !compPkCol(col.Name, pkName) {
return nil
}
return expr
}
}
}

Expand Down Expand Up @@ -279,6 +287,23 @@ func getNonCompositePKSearchFuncByExpr(
searchPKFunc = vector.OrderedBinarySearchOffsetByValFactory([]types.Enum{types.Enum(val.EnumVal)})
}

case *plan.Expr_F:
switch exprImpl.F.Func.ObjName {
case "prefix_eq":
val := util.UnsafeStringToBytes(exprImpl.F.Args[1].GetLit().GetSval())
searchPKFunc = vector.CollectOffsetsByPrefixEqFactory(val)

case "prefix_between":
lval := util.UnsafeStringToBytes(exprImpl.F.Args[1].GetLit().GetSval())
rval := util.UnsafeStringToBytes(exprImpl.F.Args[2].GetLit().GetSval())
searchPKFunc = vector.CollectOffsetsByPrefixBetweenFactory(lval, rval)

case "prefix_in":
vec := vector.NewVec(types.T_any.ToType())
vec.UnmarshalBinary(exprImpl.F.Args[1].GetVec().Data)
searchPKFunc = vector.CollectOffsetsByPrefixInFactory(vec)
}

case *plan.Expr_Vec:
vec := vector.NewVec(types.T_any.ToType())
vec.UnmarshalBinary(exprImpl.Vec.Data)
Expand Down
10 changes: 4 additions & 6 deletions pkg/vm/engine/tae/blockio/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,19 @@ import (
"math"
"time"

"github.com/matrixorigin/matrixone/pkg/container/nulls"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/container/vector"

pkgcatalog "github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"go.uber.org/zap"
)

type ReadFilter = func([]*vector.Vector) []int32
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/blockio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package blockio

import (
"context"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down
20 changes: 6 additions & 14 deletions pkg/vm/engine/tae/index/zm.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,36 +557,28 @@ func (zm ZM) Or(o ZM) (res bool, ok bool) {
return
}

func PrefixCompare(lhs, rhs []byte) int {
if len(lhs) > len(rhs) {
lhs = lhs[:len(rhs)]
}

return bytes.Compare(lhs, rhs)
}

func (zm ZM) PrefixEq(s []byte) bool {
zmin := zm.GetMinBuf()
zmax := zm.GetMaxBuf()

return PrefixCompare(zmin, s) <= 0 && PrefixCompare(s, zmax) <= 0
return types.PrefixCompare(zmin, s) <= 0 && types.PrefixCompare(zmax, s) >= 0
}

func (zm ZM) PrefixBetween(lb, ub []byte) bool {
zmin := zm.GetMinBuf()
zmax := zm.GetMaxBuf()

return PrefixCompare(lb, zmax) <= 0 && PrefixCompare(zmin, ub) <= 0
return types.PrefixCompare(zmin, ub) <= 0 && types.PrefixCompare(zmax, lb) >= 0
}

func (zm ZM) PrefixIn(vec *vector.Vector) bool {
col, area := vector.MustVarlenaRawData(vec)
minVal, maxVal := zm.GetMinBuf(), zm.GetMaxBuf()
lowerBound := sort.Search(len(col), func(i int) bool {
return PrefixCompare(minVal, col[i].GetByteSlice(area)) <= 0
return types.PrefixCompare(minVal, col[i].GetByteSlice(area)) <= 0
})

return lowerBound < len(col) && PrefixCompare(col[lowerBound].GetByteSlice(area), maxVal) <= 0
return lowerBound < len(col) && types.PrefixCompare(maxVal, col[lowerBound].GetByteSlice(area)) >= 0
}

func (zm ZM) AnyIn(vec *vector.Vector) bool {
Expand Down Expand Up @@ -784,10 +776,10 @@ func (zm ZM) AnyIn(vec *vector.Vector) bool {
col, area := vector.MustVarlenaRawData(vec)
minVal, maxVal := zm.GetMinBuf(), zm.GetMaxBuf()
lowerBound := sort.Search(len(col), func(i int) bool {
return bytes.Compare(minVal, col[i].GetByteSlice(area)) <= 0
return types.PrefixCompare(minVal, col[i].GetByteSlice(area)) <= 0
})

return lowerBound < len(col) && PrefixCompare(col[lowerBound].GetByteSlice(area), maxVal) <= 0
return lowerBound < len(col) && types.PrefixCompare(maxVal, col[lowerBound].GetByteSlice(area)) >= 0

case types.T_array_float32:
col := vector.MustArrayCol[float32](vec)
Expand Down