From 7176fac13ad5af563d92e00cb56d51fcef81c72a Mon Sep 17 00:00:00 2001 From: andy Date: Thu, 22 Aug 2019 11:16:48 +0800 Subject: [PATCH] *: add autocommit-false-is-txn to set autocommit=0 start txn #298 --- src/config/config.go | 2 ++ src/proxy/proxy.go | 8 ++++++++ src/proxy/query.go | 2 ++ src/proxy/set.go | 3 ++- src/proxy/set_test.go | 31 ++++++++++++++++++++++++++++++- src/proxy/spanner.go | 4 ++++ 6 files changed, 48 insertions(+), 2 deletions(-) diff --git a/src/config/config.go b/src/config/config.go index 2d1f1581..b38cddf6 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -38,6 +38,8 @@ type ProxyConfig struct { LongQueryTime int `json:"long-query-time"` StreamBufferSize int `json:"stream-buffer-size"` IdleTxnTimeout uint32 `json:"kill-idle-transaction"` //is consistent with the official 8.0 kill_idle_transaction + + AutocommitFalseIsTxn bool `json:"autocommit-false-is-txn"` } // DefaultProxyConfig returns default proxy config. diff --git a/src/proxy/proxy.go b/src/proxy/proxy.go index d16d658a..232315d0 100644 --- a/src/proxy/proxy.go +++ b/src/proxy/proxy.go @@ -229,6 +229,14 @@ func (p *Proxy) SetTwoPC(enable bool) { p.conf.Proxy.TwopcEnable = enable } +// SetAutocommitFalseIsTxn used to set autocommitFalseIsTxn to true or false. +func (p *Proxy) SetAutocommitFalseIsTxn(enable bool) { + p.mu.Lock() + defer p.mu.Unlock() + p.log.Info("proxy.SetAutocommitFalseIsTxn:[%v->%v]", p.conf.Proxy.AutocommitFalseIsTxn, enable) + p.conf.Proxy.AutocommitFalseIsTxn = enable +} + // SetAllowIP used to set allow ips. func (p *Proxy) SetAllowIP(ips []string) { p.mu.Lock() diff --git a/src/proxy/query.go b/src/proxy/query.go index 982b8c67..5c2d1454 100644 --- a/src/proxy/query.go +++ b/src/proxy/query.go @@ -284,12 +284,14 @@ func (spanner *Spanner) ComQuery(session *driver.Session, query string, bindVari spanner.auditLog(session, R, xbase.RADON, query, qr) return returnQuery(qr, callback, err) case *sqlparser.Set: + log.Warning("proxy.query.set.query:%s", query) if qr, err = spanner.handleSet(session, query, node); err != nil { log.Error("proxy.set[%s].from.session[%v].error:%+v", query, session.ID(), err) } spanner.auditLog(session, R, xbase.SET, query, qr) return returnQuery(qr, callback, err) case *sqlparser.Checksum: + log.Warning("proxy.query.checksum.query:%s", query) if qr, err = spanner.handleChecksumTable(session, query, node); err != nil { log.Error("proxy.checksum[%s].from.session[%v].error:%+v", query, session.ID(), err) } diff --git a/src/proxy/set.go b/src/proxy/set.go index 679b348f..c414e532 100644 --- a/src/proxy/set.go +++ b/src/proxy/set.go @@ -56,6 +56,7 @@ func (spanner *Spanner) handleSet(session *driver.Session, query string, node *s txSession.setStreamingFetchVar(false) } } + case var_mysql_autocommit: var autocommit = true @@ -68,7 +69,7 @@ func (spanner *Spanner) handleSet(session *driver.Session, query string, node *s } } } - if !autocommit { + if !autocommit && spanner.isAutocommitFalseIsTxn() { query := "begin" node := &sqlparser.Transaction{ Action: "begin", diff --git a/src/proxy/set_test.go b/src/proxy/set_test.go index de36384c..ebf5266d 100644 --- a/src/proxy/set_test.go +++ b/src/proxy/set_test.go @@ -79,7 +79,7 @@ func TestProxySet(t *testing.T) { } func TestProxySetAutocommit(t *testing.T) { - log := xlog.NewStdLog(xlog.Level(xlog.DEBUG)) + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) fakedbs, proxy, cleanup := MockProxy(log) defer cleanup() address := proxy.Address() @@ -87,6 +87,7 @@ func TestProxySetAutocommit(t *testing.T) { // fakedbs. { proxy.conf.Proxy.TwopcEnable = true + proxy.conf.Proxy.AutocommitFalseIsTxn = true fakedbs.AddQueryPattern("create .*", &sqltypes.Result{}) fakedbs.AddQueryPattern("select .*", &sqltypes.Result{}) fakedbs.AddQueryPattern("xa .*", &sqltypes.Result{}) @@ -119,4 +120,32 @@ func TestProxySetAutocommit(t *testing.T) { assert.NotNil(t, err) } } + + proxy.conf.Proxy.AutocommitFalseIsTxn = false + { + client, err := driver.NewConn("mock", "mock", address, "", "utf8") + assert.Nil(t, err) + { + query := "set autocommit=0" + _, err := client.FetchAll(query, -1) + assert.Nil(t, err) + + query = "select 1" + _, err = client.FetchAll(query, -1) + assert.Nil(t, err) + + query = "commit" + _, err = client.FetchAll(query, -1) + assert.NotNil(t, err) + } + { + query := "set autocommit=1" + _, err := client.FetchAll(query, -1) + assert.Nil(t, err) + + query = "commit" + _, err = client.FetchAll(query, -1) + assert.NotNil(t, err) + } + } } diff --git a/src/proxy/spanner.go b/src/proxy/spanner.go index cec59d26..b545b795 100644 --- a/src/proxy/spanner.go +++ b/src/proxy/spanner.go @@ -124,3 +124,7 @@ func (spanner *Spanner) ServerVersion() string { func (spanner *Spanner) isTwoPC() bool { return spanner.conf.Proxy.TwopcEnable } + +func (spanner *Spanner) isAutocommitFalseIsTxn() bool { + return spanner.conf.Proxy.AutocommitFalseIsTxn +}