Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
494 changes: 487 additions & 7 deletions pkg/vm/engine/readutil/filter_test.go

Large diffs are not rendered by default.

886 changes: 487 additions & 399 deletions pkg/vm/engine/readutil/pk_filter.go

Large diffs are not rendered by default.

62 changes: 48 additions & 14 deletions pkg/vm/engine/readutil/pk_filter_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type BasePKFilter struct {
UB []byte
Vec *vector.Vector
Oid types.T
// Disjuncts holds OR-ed atomic filters; when non-empty, Op/LB/UB/Vec are ignored.
Disjuncts []BasePKFilter
}

func (b *BasePKFilter) String() string {
Expand Down Expand Up @@ -137,47 +139,72 @@ func ConstructBasePKFilter(
return ret, nil

case "or":
var filters []BasePKFilter
var (
filters1 []BasePKFilter
filters2 []BasePKFilter

cannotMerge bool
hasUnsupported bool
)

for idx := range exprImpl.F.Args {
ff, err := ConstructBasePKFilter(exprImpl.F.Args[idx], tblDef, mp)
if err != nil {
return BasePKFilter{}, err
}
if !ff.Valid {
return BasePKFilter{}, err
hasUnsupported = true
continue
}

filters = append(filters, ff)
filters1 = append(filters1, toDisjuncts(ff)...)
filters2 = append(filters2, ff)
}

if len(filters) == 0 {
if hasUnsupported {
return BasePKFilter{}, nil
}

for idx := 0; idx < len(filters)-1; {
f1 := &filters[idx]
f2 := &filters[idx+1]
if len(filters1) == 0 {
return BasePKFilter{}, nil
}

if len(filters1) == 1 {
return filters1[0], nil
}

for idx := 0; idx < len(filters2)-1; {
f1 := &filters2[idx]
f2 := &filters2[idx+1]
ff, err := mergeFilters(f1, f2, function.OR, mp)
if err != nil {
return BasePKFilter{}, nil
}

if !ff.Valid {
return BasePKFilter{}, nil
//return BasePKFilter{}, nil
cannotMerge = true
break
}

idx++
filters[idx] = ff
filters2[idx] = ff
}

for idx := 0; idx < len(filters)-1; idx++ {
if filters[idx].Vec != nil {
filters[idx].Vec.Free(mp)
if !cannotMerge {
for idx := 0; idx < len(filters2)-1; idx++ {
if filters2[idx].Vec != nil {
filters2[idx].Vec.Free(mp)
}
}

ret := filters2[len(filters2)-1]
return ret, nil
}

ret := filters[len(filters)-1]
return ret, nil
filter.Valid = true
filter.Disjuncts = filters1
return filter, nil

case ">=":
//a >= ?
Expand Down Expand Up @@ -307,3 +334,10 @@ func ConstructBasePKFilter(

return
}

// toDisjuncts expands f into the list of filters it contributes to an OR.
//
// A filter that already carries a non-empty Disjuncts list yields that slice
// directly (it is not copied, so the result shares its backing array with f).
// Any other filter is treated as a single atomic filter and is returned
// wrapped in a one-element slice.
func toDisjuncts(f BasePKFilter) []BasePKFilter {
	if len(f.Disjuncts) == 0 {
		// Atomic filter: it is its own only disjunct.
		return []BasePKFilter{f}
	}
	return f.Disjuncts
}
11 changes: 8 additions & 3 deletions pkg/vm/engine/readutil/pk_filter_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ package readutil
import (
"bytes"
"fmt"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"go.uber.org/zap"
)

type MemPKFilter struct {
Expand Down Expand Up @@ -70,6 +70,11 @@ func NewMemPKFilter(
return
}

// The in-memory path currently supports only a single atomic filter, so a filter carrying OR-ed disjuncts is skipped here.
if len(basePKFilter.Disjuncts) > 0 {
return
}

var lbVal, ubVal any
var packed [][]byte
var packer *types.Packer
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
drop database if exists block_or_cpk;
create database block_or_cpk;
use block_or_cpk;
select enable_fault_injection();
enable_fault_injection()
true
create table cpk_int(a int, b int, c int, primary key(a, b));
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_cpk.cpk_int');
add_fault_point(fj/cn/flush_small_objs, :::, echo, 40, block_or_cpk.cpk_int)
true
insert into cpk_int select g.result, g.result * 2, g.result * 3 from generate_series(1, 8192) g;
select a, b from cpk_int where a = 10 or a in (20, 30, 40) order by a, b;
a b
10 20
20 40
30 60
40 80
select a, b from cpk_int where a between 100 and 102 or a >= 8190 order by a, b;
a b
100 200
101 202
102 204
8190 16380
8191 16382
8192 16384
select a, b from cpk_int where a < 4 or a > 8189 order by a, b;
a b
1 2
2 4
3 6
8190 16380
8191 16382
8192 16384
select a, b from cpk_int where a <= 5 or a >= 8191 order by a, b;
a b
1 2
2 4
3 6
4 8
5 10
8191 16382
8192 16384
select a, b from cpk_int where (a = 11 and a = 12) or a = 13 order by a, b;
a b
13 26
select a, b from cpk_int where a < 4 or a between 256 and 258 or a in (500, 600) or a >= 8191 order by a, b;
a b
1 2
2 4
3 6
256 512
257 514
258 516
500 1000
600 1200
8191 16382
8192 16384
drop table cpk_int;
create table cpk_varchar(a varchar(64), b varchar(64), c int, primary key(a, b));
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_cpk.cpk_varchar');
add_fault_point(fj/cn/flush_small_objs, :::, echo, 40, block_or_cpk.cpk_varchar)
true
insert into cpk_varchar select cast(g.result as varchar), cast(g.result * 2 as varchar), g.result * 3 from generate_series(1, 8192) g;
select a, b from cpk_varchar where a = '10' or a in ('20', '30', '40') order by cast(a as int), cast(b as int);
a b
10 20
20 40
30 60
40 80
select a, b from cpk_varchar where a between '8000' and '8003' or a between '900' and '903' order by cast(a as int), cast(b as int);
a b
900 1800
901 1802
902 1804
903 1806
8000 16000
8001 16002
8002 16004
8003 16006
select a, b from cpk_varchar where a in ('5', '15', '25') or a = '512' order by cast(a as int), cast(b as int);
a b
5 10
15 30
25 50
512 1024
select a, b from cpk_varchar where (a = '200' and a = '201') or a = '300' order by cast(a as int), cast(b as int);
a b
300 600
select a, b from cpk_varchar where a between '2500' and '2502' or a between '6000' and '6001' or a in ('700', '800') or a between '950' and '952' order by cast(a as int), cast(b as int);
a b
700 1400
800 1600
950 1900
951 1902
952 1904
2500 5000
2501 5002
2502 5004
6000 12000
6001 12002
drop table cpk_varchar;
drop database block_or_cpk;
select disable_fault_injection();
disable_fault_injection()
true
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- Exercises OR-ed predicates on the leading column `a` of a composite primary key (a, b).
-- Each table is loaded with 8192 generated rows and then queried with several OR shapes.
drop database if exists block_or_cpk;
create database block_or_cpk;
use block_or_cpk;

-- Fault injection stays enabled for the whole script and is disabled again at the end.
select enable_fault_injection();

-- composite pk (int, int) - equality, IN, BETWEEN and open-range filters combined with OR
create table cpk_int(a int, b int, c int, primary key(a, b));
-- NOTE(review): presumably this fault point makes the inserted data get flushed as small objects
-- for this table so the reads below go through persisted blocks - confirm against the fault point's definition.
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_cpk.cpk_int');
-- Rows are (g, 2g, 3g) for g in 1..8192.
insert into cpk_int select g.result, g.result * 2, g.result * 3 from generate_series(1, 8192) g;
-- equality OR IN-list
select a, b from cpk_int where a = 10 or a in (20, 30, 40) order by a, b;
-- bounded range OR lower-bounded range
select a, b from cpk_int where a between 100 and 102 or a >= 8190 order by a, b;
-- two disjoint ranges with strict bounds
select a, b from cpk_int where a < 4 or a > 8189 order by a, b;
-- two disjoint ranges with inclusive bounds
select a, b from cpk_int where a <= 5 or a >= 8191 order by a, b;
-- unsatisfiable AND branch (a cannot be both 11 and 12) OR-ed with an equality
select a, b from cpk_int where (a = 11 and a = 12) or a = 13 order by a, b;
-- four-way OR mixing <, BETWEEN, IN and >=
select a, b from cpk_int where a < 4 or a between 256 and 258 or a in (500, 600) or a >= 8191 order by a, b;
drop table cpk_int;

-- composite pk (varchar, varchar) - equality, IN and BETWEEN filters on string keys combined with OR
create table cpk_varchar(a varchar(64), b varchar(64), c int, primary key(a, b));
-- Same fault point as above, scoped to the varchar table.
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_cpk.cpk_varchar');
-- Rows are (cast(g), cast(2g), 3g) for g in 1..8192; results are ordered numerically via cast(... as int).
insert into cpk_varchar select cast(g.result as varchar), cast(g.result * 2 as varchar), g.result * 3 from generate_series(1, 8192) g;
-- equality OR IN-list
select a, b from cpk_varchar where a = '10' or a in ('20', '30', '40') order by cast(a as int), cast(b as int);
-- two BETWEEN ranges on string values
select a, b from cpk_varchar where a between '8000' and '8003' or a between '900' and '903' order by cast(a as int), cast(b as int);
-- IN-list OR equality
select a, b from cpk_varchar where a in ('5', '15', '25') or a = '512' order by cast(a as int), cast(b as int);
-- unsatisfiable AND branch OR-ed with an equality
select a, b from cpk_varchar where (a = '200' and a = '201') or a = '300' order by cast(a as int), cast(b as int);
-- four-way OR mixing BETWEEN ranges and an IN-list
select a, b from cpk_varchar where a between '2500' and '2502' or a between '6000' and '6001' or a in ('700', '800') or a between '950' and '952' order by cast(a as int), cast(b as int);
drop table cpk_varchar;

drop database block_or_cpk;

select disable_fault_injection();
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
drop database if exists block_or_npk;
create database block_or_npk;
use block_or_npk;
select enable_fault_injection();
enable_fault_injection()
true
create table npk_int(a int, b int);
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_npk.npk_int');
add_fault_point(fj/cn/flush_small_objs, :::, echo, 40, block_or_npk.npk_int)
true
insert into npk_int select g.result, g.result * 10 from generate_series(1, 8192) g;
select a from npk_int where a = 10 or a in (20, 30, 40) order by a;
a
10
20
30
40
select a from npk_int where a between 100 and 102 or a >= 8190 order by a;
a
100
101
102
8190
8191
8192
select a from npk_int where a < 4 or a > 8189 order by a;
a
1
2
3
8190
8191
8192
select a from npk_int where a <= 5 or a >= 8191 order by a;
a
1
2
3
4
5
8191
8192
select a from npk_int where (a = 11 and a = 12) or a = 13 order by a;
a
13
select a from npk_int where a < 4 or a between 256 and 258 or a in (500, 600) or a >= 8191 order by a;
a
1
2
3
256
257
258
500
600
8191
8192
drop table npk_int;
create table npk_varchar(a varchar(64), b int);
select add_fault_point('fj/cn/flush_small_objs',':::','echo',40,'block_or_npk.npk_varchar');
add_fault_point(fj/cn/flush_small_objs, :::, echo, 40, block_or_npk.npk_varchar)
true
insert into npk_varchar select cast(g.result as varchar), g.result from generate_series(1, 8192) g;
select a from npk_varchar where a = '10' or a in ('20', '30', '40') order by cast(a as int);
a
10
20
30
40
select a from npk_varchar where a between '8000' and '8003' or a between '900' and '903' order by cast(a as int);
a
900
901
902
903
8000
8001
8002
8003
select a from npk_varchar where a in ('5', '15', '25') or a = '512' order by cast(a as int);
a
5
15
25
512
select a from npk_varchar where (a = '200' and a = '201') or a = '300' order by cast(a as int);
a
300
select a from npk_varchar where a between '2500' and '2502' or a between '6000' and '6001' or a in ('700', '800') or a between '950' and '952' order by cast(a as int);
a
700
800
950
951
952
2500
2501
2502
6000
6001
drop table npk_varchar;
drop database block_or_npk;
select disable_fault_injection();
disable_fault_injection()
true
Loading
Loading