Skip to content

Commit

Permalink
Merge pull request #7263 from planetscale/online-ddl-vtgate-session-uuid
Browse files Browse the repository at this point in the history
Adding @@session_uuid to vtgate; used as 'context' by Online DDL
  • Loading branch information
shlomi-noach committed Jan 7, 2021
2 parents 3e8a926 + b36cb24 commit ccd52da
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 91 deletions.
183 changes: 96 additions & 87 deletions go/vt/proto/vtgate/vtgate.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions go/vt/sqlparser/ast_rewriting.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (er *expressionRewriter) sysVarRewrite(cursor *Cursor, node *ColName) {
sysvars.TransactionMode.Name,
sysvars.Workload.Name,
sysvars.DDLStrategy.Name,
sysvars.SessionUUID.Name,
sysvars.ReadAfterWriteGTID.Name,
sysvars.ReadAfterWriteTimeOut.Name,
sysvars.SessionTrackGTIDs.Name:
Expand Down
3 changes: 2 additions & 1 deletion go/vt/sqlparser/ast_rewriting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
type myTestCase struct {
in, expected string
liid, db, foundRows, rowCount, rawGTID, rawTimeout, sessTrackGTID bool
ddlStrategy bool
ddlStrategy, sessionUUID bool
udv int
autocommit, clientFoundRows, skipQueryPlanCache bool
sqlSelectLimit, transactionMode, workload bool
Expand Down Expand Up @@ -194,6 +194,7 @@ func TestRewrites(in *testing.T) {
assert.Equal(tc.transactionMode, result.NeedsSysVar(sysvars.TransactionMode.Name), "should need :__vttransactionMode")
assert.Equal(tc.workload, result.NeedsSysVar(sysvars.Workload.Name), "should need :__vtworkload")
assert.Equal(tc.ddlStrategy, result.NeedsSysVar(sysvars.DDLStrategy.Name), "should need ddlStrategy")
assert.Equal(tc.sessionUUID, result.NeedsSysVar(sysvars.SessionUUID.Name), "should need sessionUUID")
assert.Equal(tc.rawGTID, result.NeedsSysVar(sysvars.ReadAfterWriteGTID.Name), "should need rawGTID")
assert.Equal(tc.rawTimeout, result.NeedsSysVar(sysvars.ReadAfterWriteTimeOut.Name), "should need rawTimeout")
assert.Equal(tc.sessTrackGTID, result.NeedsSysVar(sysvars.SessionTrackGTIDs.Name), "should need sessTrackGTID")
Expand Down
2 changes: 2 additions & 0 deletions go/vt/sysvars/sysvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
Workload = SystemVariable{Name: "workload", IdentifierAsString: true}
Charset = SystemVariable{Name: "charset", Default: utf8, IdentifierAsString: true}
Names = SystemVariable{Name: "names", Default: utf8, IdentifierAsString: true}
SessionUUID = SystemVariable{Name: "session_uuid", IdentifierAsString: true}
// Online DDL
DDLStrategy = SystemVariable{Name: "ddl_strategy", IdentifierAsString: true}

Expand All @@ -73,6 +74,7 @@ var (
Workload,
Charset,
Names,
SessionUUID,
ReadAfterWriteGTID,
ReadAfterWriteTimeOut,
SessionTrackGTIDs,
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (t noopVCursor) GetDDLStrategy() string {
panic("implement me")
}

func (t noopVCursor) GetSessionUUID() string {
panic("implement me")
}

func (t noopVCursor) SetReadAfterWriteGTID(s string) {
panic("implement me")
}
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/engine/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package engine

import (
"fmt"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/query"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -75,7 +77,7 @@ func (v *OnlineDDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVari
}
rows := [][]sqltypes.Value{}
for _, normalized := range normalizedQueries {
onlineDDL, err := schema.NewOnlineDDL(v.GetKeyspaceName(), normalized.TableName.Name.String(), normalized.SQL, v.Strategy, v.Options, "vtgate")
onlineDDL, err := schema.NewOnlineDDL(v.GetKeyspaceName(), normalized.TableName.Name.String(), normalized.SQL, v.Strategy, v.Options, fmt.Sprintf("vtgate:%s", vcursor.Session().GetSessionUUID()))
if err != nil {
return result, err
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ type (
SetDDLStrategy(string)
GetDDLStrategy() string

GetSessionUUID() string

// SetReadAfterWriteGTID sets the GTID that the user expects a replica to have caught up with before answering a query
SetReadAfterWriteGTID(string)
SetReadAfterWriteTimeout(float64)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ func (e *Executor) addNeededBindVars(bindVarNeeds *sqlparser.BindVarNeeds, bindV
bindVars[key] = sqltypes.StringBindVariable(v)
case sysvars.DDLStrategy.Name:
bindVars[key] = sqltypes.StringBindVariable(session.DDLStrategy)
case sysvars.SessionUUID.Name:
bindVars[key] = sqltypes.StringBindVariable(session.SessionUUID)
case sysvars.ReadAfterWriteGTID.Name:
var v string
ifReadAfterWriteExist(session, func(raw *vtgatepb.ReadAfterWrite) {
Expand Down
7 changes: 5 additions & 2 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ import (

"context"

"vitess.io/vitess/go/trace"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"
"vitess.io/vitess/go/vt/log"
Expand All @@ -46,6 +45,8 @@ import (

querypb "vitess.io/vitess/go/vt/proto/query"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"

"github.com/google/uuid"
)

var (
Expand Down Expand Up @@ -335,13 +336,15 @@ func (vh *vtgateHandler) WarningCount(c *mysql.Conn) uint16 {
func (vh *vtgateHandler) session(c *mysql.Conn) *vtgatepb.Session {
session, _ := c.ClientData.(*vtgatepb.Session)
if session == nil {
u, _ := uuid.NewUUID()
session = &vtgatepb.Session{
Options: &querypb.ExecuteOptions{
IncludedFields: querypb.ExecuteOptions_ALL,
Workload: querypb.ExecuteOptions_Workload(mysqlDefaultWorkload),
},
Autocommit: true,
DDLStrategy: *defaultDDLStrategy,
SessionUUID: u.String(),
}
if c.Capabilities&mysql.CapabilityClientFoundRows != 0 {
session.Options.ClientFoundRows = true
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vtgate/safe_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,13 @@ func (session *SafeSession) GetDDLStrategy() string {
return session.DDLStrategy
}

// GetSessionUUID returns the SessionUUID value.
func (session *SafeSession) GetSessionUUID() string {
session.mu.Lock()
defer session.mu.Unlock()
return session.SessionUUID
}

// SetReadAfterWriteGTID set the ReadAfterWriteGtid setting.
func (session *SafeSession) SetReadAfterWriteGTID(vtgtid string) {
session.mu.Lock()
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,11 @@ func (vc *vcursorImpl) GetDDLStrategy() string {
return vc.safeSession.GetDDLStrategy()
}

// GetSessionUUID implements the SessionActions interface
func (vc *vcursorImpl) GetSessionUUID() string {
return vc.safeSession.GetSessionUUID()
}

// SetReadAfterWriteGTID implements the SessionActions interface
func (vc *vcursorImpl) SetReadAfterWriteGTID(vtgtid string) {
vc.safeSession.SetReadAfterWriteGTID(vtgtid)
Expand Down
3 changes: 3 additions & 0 deletions proto/vtgate.proto
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ message Session {

// DDL strategy
string DDLStrategy = 21;

// Session UUID
string SessionUUID = 22;
}

// ReadAfterWrite contains information regarding gtid set and timeout
Expand Down

0 comments on commit ccd52da

Please sign in to comment.