Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When DDL statement has been executed within a transaction, force the use of a single CN #17282

Merged
merged 5 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pkg/frontend/test/txn_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/frontend/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ func (t *testWorkspace) BindTxnOp(op client.TxnOperator) {
panic("implement me")
}

func (t *testWorkspace) SetHaveDDL(flag bool) {
//TODO implement me
}

func (t *testWorkspace) GetHaveDDL() bool {
return false
}

func TestWorkspace(t *testing.T) {
convey.Convey("no panic", t, func() {
convey.So(
Expand Down
32 changes: 31 additions & 1 deletion pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ func (c *Compile) Run(_ uint64) (result *util2.RunResult, err error) {
err,
)
v2.TxnStatementExecuteDurationHistogram.Observe(cost.Seconds())
if _, ok := c.pn.Plan.(*plan.Plan_Ddl); ok {
c.setHaveDDL(true)
}
}()

for _, s := range c.scope {
Expand Down Expand Up @@ -945,7 +948,6 @@ func (c *Compile) getCNList() (engine.Nodes, error) {
if err != nil {
return nil, err
}

// We should always make sure the current CN is contained in the cn list.
if c.proc == nil || c.proc.QueryClient == nil {
return cnList, nil
Expand All @@ -961,6 +963,7 @@ func (c *Compile) getCNList() (engine.Nodes, error) {
Addr: c.addr,
Mcpu: ncpu,
})

return cnList, nil
}

Expand All @@ -975,6 +978,18 @@ func (c *Compile) compileQuery(qry *plan.Query) ([]*Scope, error) {
if err != nil {
return nil, err
}
//When DDL statement has been executed within a transaction, force the use of a single CN
if c.getHaveDDL() {
if c.proc != nil && c.proc.QueryClient != nil {
cnID := c.proc.QueryClient.ServiceID()
for _, node := range c.cnList {
if node.Id == cnID {
c.cnList = []engine.Node{node}
break
}
}
}
}
// sort by addr to get fixed order of CN list
sort.Slice(c.cnList, func(i, j int) bool { return c.cnList[i].Addr < c.cnList[j].Addr })

Expand Down Expand Up @@ -4771,6 +4786,21 @@ func (c *Compile) SetBuildPlanFunc(buildPlanFunc func() (*plan2.Plan, error)) {
c.buildPlanFunc = buildPlanFunc
}

func (c *Compile) setHaveDDL(haveDDL bool) {
txn := c.proc.TxnOperator
if txn != nil && txn.GetWorkspace() != nil {
txn.GetWorkspace().SetHaveDDL(haveDDL)
}
}

func (c *Compile) getHaveDDL() bool {
txn := c.proc.TxnOperator
if txn != nil && txn.GetWorkspace() != nil {
return txn.GetWorkspace().GetHaveDDL()
}
return false
}

// detectFkSelfRefer checks if foreign key self refer confirmed
func detectFkSelfRefer(c *Compile, detectSqls []string) error {
if len(detectSqls) == 0 {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/compile/compile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ func (w *Ws) CloneSnapshotWS() client.Workspace {
func (w *Ws) BindTxnOp(op client.TxnOperator) {
}

func (w *Ws) SetHaveDDL(flag bool) {
}

func (w *Ws) GetHaveDDL() bool {
return false
}

func TestCompile(t *testing.T) {
cnclient.NewCNClient("test", new(cnclient.ClientConfig))
ctrl := gomock.NewController(t)
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/compile/scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/defines"

Expand Down Expand Up @@ -79,6 +80,10 @@ func generateScopeCases(t *testing.T, testCases []string) []*Scope {
getScope := func(t1 *testing.T, sql string) *Scope {
proc := testutil.NewProcess()
proc.SessionInfo.Buf = buffer.New()
ctrl := gomock.NewController(t)
txnCli, txnOp := newTestTxnClientAndOp(ctrl)
proc.TxnClient = txnCli
proc.TxnOperator = txnOp
e, _, compilerCtx := testengine.New(defines.AttachAccountId(context.Background(), catalog.System_Account))
opt := plan2.NewBaseOptimizer(compilerCtx)
ctx := compilerCtx.GetContext()
Expand Down
3 changes: 3 additions & 0 deletions pkg/txn/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ type Workspace interface {
CloneSnapshotWS() Workspace

BindTxnOp(op TxnOperator)

SetHaveDDL(haveDDL bool)
GetHaveDDL() bool
}

// TxnOverview txn overview include meta and status
Expand Down
8 changes: 8 additions & 0 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1230,3 +1230,11 @@ func (txn *Transaction) CloneSnapshotWS() client.Workspace {
func (txn *Transaction) BindTxnOp(op client.TxnOperator) {
txn.op = op
}

func (txn *Transaction) SetHaveDDL(haveDDL bool) {
txn.haveDDL.Store(haveDDL)
}

func (txn *Transaction) GetHaveDDL() bool {
return txn.haveDDL.Load()
}
2 changes: 2 additions & 0 deletions pkg/vm/engine/disttae/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ type Transaction struct {
pkCount int

adjustCount int

haveDDL atomic.Bool
}

type Pos struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,15 @@ col1
100
200
drop table table01;
drop table if exists t1;
create table t1(a int primary key, b int);
insert into t1 select result ,result from generate_series(1, 20000000) g;
alter table t1 drop primary key;
begin;
create table t2(a int, b int);
insert into t2 select a,b from t1;
select count(*) from t2;
count(*)
20000000
drop table t2;
commit;
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,14 @@ alter table table01 drop primary key;
commit;
select * from table01;
select col1 from table01;
drop table table01;
drop table table01;
drop table if exists t1;
create table t1(a int primary key, b int);
insert into t1 select result ,result from generate_series(1, 20000000) g;
alter table t1 drop primary key;
begin;
create table t2(a int, b int);
insert into t2 select a,b from t1;
select count(*) from t2;
drop table t2;
commit;
Loading