Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement] proxy: migrate prepare stmts when connection is transferred from one cn to another one. #14901

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions pkg/cnservice/server_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/lockservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
pblock "github.com/matrixorigin/matrixone/pkg/pb/lock"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/pb/status"
Expand Down Expand Up @@ -53,6 +54,8 @@ func (s *service) initQueryCommandHandler() {
s.queryService.AddHandleFunc(query.CmdMethod_RunTask, s.handleRunTask, false)
s.queryService.AddHandleFunc(query.CmdMethod_RemoveRemoteLockTable, s.handleRemoveRemoteLockTable, false)
s.queryService.AddHandleFunc(query.CmdMethod_GetPipelineInfo, s.handleGetPipelineInfo, false)
s.queryService.AddHandleFunc(query.CmdMethod_MigrateConnFrom, s.handleMigrateConnFrom, false)
s.queryService.AddHandleFunc(query.CmdMethod_MigrateConnTo, s.handleMigrateConnTo, false)
}

func (s *service) handleKillConn(ctx context.Context, req *query.Request, resp *query.Response) error {
Expand Down Expand Up @@ -329,3 +332,35 @@ func (s *service) handleGetPipelineInfo(ctx context.Context, req *query.Request,
}
return nil
}

func (s *service) handleMigrateConnFrom(
ctx context.Context, req *query.Request, resp *query.Response,
) error {
if req.MigrateConnFromRequest == nil {
return moerr.NewInternalError(ctx, "bad request")
}
rm := s.mo.GetRoutineManager()
resp.MigrateConnFromResponse = &query.MigrateConnFromResponse{}
if err := rm.MigrateConnectionFrom(req.MigrateConnFromRequest, resp.MigrateConnFromResponse); err != nil {
logutil.Errorf("failed to migrate conn from: %v", err)
return err
}
return nil
}

func (s *service) handleMigrateConnTo(
ctx context.Context, req *query.Request, resp *query.Response,
) error {
if req.MigrateConnToRequest == nil {
return moerr.NewInternalError(ctx, "bad request")
}
rm := s.mo.GetRoutineManager()
if err := rm.MigrateConnectionTo(req.MigrateConnToRequest); err != nil {
logutil.Errorf("failed to migrate conn to: %v", err)
return err
}
resp.MigrateConnToResponse = &query.MigrateConnToResponse{
Success: true,
}
return nil
}
12 changes: 12 additions & 0 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ func getPrepareStmtName(stmtID uint32) string {
return fmt.Sprintf("%s_%d", prefixPrepareStmtName, stmtID)
}

func parsePrepareStmtID(s string) uint32 {
if strings.HasPrefix(s, prefixPrepareStmtName) {
ss := strings.Split(s, "_")
v, err := strconv.ParseUint(ss[len(ss)-1], 10, 64)
if err != nil {
return 0
}
return uint32(v)
}
return 0
}

func GetPrepareStmtID(ctx context.Context, name string) (int, error) {
idx := len(prefixPrepareStmtName) + 1
if idx >= len(name) {
Expand Down
24 changes: 24 additions & 0 deletions pkg/frontend/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/util/metric"
"github.com/matrixorigin/matrixone/pkg/util/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -64,6 +65,8 @@ type Routine struct {
restricted atomic.Bool

printInfoOnce bool

migrateOnce sync.Once
}

func (rt *Routine) needPrintSessionInfo() bool {
Expand Down Expand Up @@ -379,6 +382,27 @@ func (rt *Routine) cleanup() {
})
}

func (rt *Routine) migrateConnectionTo(req *query.MigrateConnToRequest) error {
var err error
rt.migrateOnce.Do(func() {
ses := rt.getSession()
err = ses.Migrate(req)
})
return err
}

func (rt *Routine) migrateConnectionFrom(resp *query.MigrateConnFromResponse) error {
ses := rt.getSession()
resp.DB = ses.GetDatabaseName()
for _, st := range ses.GetPrepareStmts() {
resp.PrepareStmts = append(resp.PrepareStmts, &query.PrepareStmt{
Name: st.Name,
SQL: st.Sql,
})
}
return nil
}

func NewRoutine(ctx context.Context, protocol MysqlProtocol, executor CmdExecutor, parameters *config.FrontendParameters, rs goetty.IOSession) *Routine {
ctx = trace.Generate(ctx) // fill span{trace_id} in ctx
cancelRoutineCtx, cancelRoutineFunc := context.WithCancel(ctx)
Expand Down
66 changes: 51 additions & 15 deletions pkg/frontend/routine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/queryservice"
"github.com/matrixorigin/matrixone/pkg/util/metric"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
Expand All @@ -39,15 +40,17 @@ import (
)

type RoutineManager struct {
mu sync.RWMutex
ctx context.Context
clients map[goetty.IOSession]*Routine
pu *config.ParameterUnit
tlsConfig *tls.Config
aicm *defines.AutoIncrCacheManager
accountRoutine *AccountRoutineManager
baseService BaseService
sessionManager *queryservice.SessionManager
mu sync.RWMutex
ctx context.Context
clients map[goetty.IOSession]*Routine
// routinesByID keeps the routines by connection ID.
routinesByConnID map[uint32]*Routine
pu *config.ParameterUnit
tlsConfig *tls.Config
aicm *defines.AutoIncrCacheManager
accountRoutine *AccountRoutineManager
baseService BaseService
sessionManager *queryservice.SessionManager
}

type AccountRoutineManager struct {
Expand Down Expand Up @@ -167,10 +170,11 @@ func (rm *RoutineManager) getCtx() context.Context {
return rm.ctx
}

func (rm *RoutineManager) setRoutine(rs goetty.IOSession, r *Routine) {
func (rm *RoutineManager) setRoutine(rs goetty.IOSession, id uint32, r *Routine) {
rm.mu.Lock()
defer rm.mu.Unlock()
rm.clients[rs] = r
rm.routinesByConnID[id] = r
}

func (rm *RoutineManager) getRoutine(rs goetty.IOSession) *Routine {
Expand All @@ -179,6 +183,16 @@ func (rm *RoutineManager) getRoutine(rs goetty.IOSession) *Routine {
return rm.clients[rs]
}

func (rm *RoutineManager) getRoutineByConnID(id uint32) *Routine {
rm.mu.RLock()
defer rm.mu.RUnlock()
r, ok := rm.routinesByConnID[id]
if ok {
return r
}
return nil
}

func (rm *RoutineManager) deleteRoutine(rs goetty.IOSession) *Routine {
var rt *Routine
var ok bool
Expand All @@ -188,6 +202,11 @@ func (rm *RoutineManager) deleteRoutine(rs goetty.IOSession) *Routine {
if ok {
delete(rm.clients, rs)
}
connID := rt.getConnectionID()
_, ok = rm.routinesByConnID[connID]
if ok {
delete(rm.routinesByConnID, connID)
}
return rt
}

Expand Down Expand Up @@ -283,7 +302,7 @@ func (rm *RoutineManager) Created(rs goetty.IOSession) {
}

logDebugf(pro.GetDebugString(), "have sent handshake packet to connection %s", rs.RemoteAddress())
rm.setRoutine(rs, routine)
rm.setRoutine(rs, pro.connectionID, routine)
}

/*
Expand Down Expand Up @@ -607,6 +626,22 @@ func (rm *RoutineManager) KillRoutineConnections() {
rm.cleanKillQueue()
}

func (rm *RoutineManager) MigrateConnectionTo(req *query.MigrateConnToRequest) error {
routine := rm.getRoutineByConnID(req.ConnID)
if routine == nil {
return moerr.NewInternalError(rm.ctx, "cannot get routine to migrate connection %d", req.ConnID)
}
return routine.migrateConnectionTo(req)
}

func (rm *RoutineManager) MigrateConnectionFrom(req *query.MigrateConnFromRequest, resp *query.MigrateConnFromResponse) error {
routine, ok := rm.routinesByConnID[req.ConnID]
if !ok {
return moerr.NewInternalError(rm.ctx, "cannot get routine to migrate connection %d", req.ConnID)
}
return routine.migrateConnectionFrom(resp)
}

func NewRoutineManager(ctx context.Context, pu *config.ParameterUnit, aicm *defines.AutoIncrCacheManager) (*RoutineManager, error) {
accountRoutine := &AccountRoutineManager{
killQueueMu: sync.RWMutex{},
Expand All @@ -616,10 +651,11 @@ func NewRoutineManager(ctx context.Context, pu *config.ParameterUnit, aicm *defi
ctx: ctx,
}
rm := &RoutineManager{
ctx: ctx,
clients: make(map[goetty.IOSession]*Routine),
pu: pu,
accountRoutine: accountRoutine,
ctx: ctx,
clients: make(map[goetty.IOSession]*Routine),
routinesByConnID: make(map[uint32]*Routine),
pu: pu,
accountRoutine: accountRoutine,
}

rm.aicm = aicm
Expand Down
82 changes: 82 additions & 0 deletions pkg/frontend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/pb/status"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
Expand Down Expand Up @@ -1032,6 +1033,12 @@ func (ses *Session) GenNewStmtId() uint32 {
return ses.lastStmtId
}

func (ses *Session) SetLastStmtID(id uint32) {
ses.mu.Lock()
defer ses.mu.Unlock()
ses.lastStmtId = id
}

func (ses *Session) GetLastStmtId() uint32 {
ses.mu.Lock()
defer ses.mu.Unlock()
Expand Down Expand Up @@ -1243,6 +1250,16 @@ func (ses *Session) GetPrepareStmt(name string) (*PrepareStmt, error) {
return nil, moerr.NewInvalidState(ses.requestCtx, "prepared statement '%s' does not exist", name)
}

func (ses *Session) GetPrepareStmts() []*PrepareStmt {
ses.mu.Lock()
defer ses.mu.Unlock()
ret := make([]*PrepareStmt, 0, len(ses.prepareStmts))
for _, st := range ses.prepareStmts {
ret = append(ret, st)
}
return ret
}

func (ses *Session) RemovePrepareStmt(name string) {
ses.mu.Lock()
defer ses.mu.Unlock()
Expand Down Expand Up @@ -2380,3 +2397,68 @@ func checkPlanIsInsertValues(proc *process.Process,
}
return false, nil
}

type dbMigration struct {
db string
}

func (d *dbMigration) Migrate(ses *Session) error {
if d.db == "" {
return nil
}
return doUse(ses.requestCtx, ses, d.db)
}

type prepareStmtMigration struct {
name string
sql string
}

func (p *prepareStmtMigration) Migrate(ses *Session) error {
v, err := ses.GetGlobalVar("lower_case_table_names")
if err != nil {
return err
}
if !strings.HasPrefix(strings.ToLower(p.sql), "prepare") {
p.sql = fmt.Sprintf("prepare %s from %s", p.name, p.sql)
}
stmts, err := mysql.Parse(ses.requestCtx, p.sql, v.(int64))
if err != nil {
return err
}
if _, err = doPrepareStmt(ses.requestCtx, ses, stmts[0].(*tree.PrepareStmt), p.sql); err != nil {
return err
}
return nil
}

func (ses *Session) Migrate(req *query.MigrateConnToRequest) error {
dbm := dbMigration{
db: req.DB,
}
if err := dbm.Migrate(ses); err != nil {
return err
}

var maxStmtID uint32
for _, p := range req.PrepareStmts {
if p == nil {
continue
}
pm := prepareStmtMigration{
name: p.Name,
sql: p.SQL,
}
if err := pm.Migrate(ses); err != nil {
return err
}
id := parsePrepareStmtID(p.Name)
if id > maxStmtID {
maxStmtID = id
}
}
if maxStmtID > 0 {
ses.SetLastStmtID(maxStmtID)
}
return nil
}
Loading
Loading