Skip to content

Commit

Permalink
Merge branch '1.1-dev' into cp-14899-to-1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
sukki37 committed Mar 12, 2024
2 parents ab5068a + e5a1c21 commit 78a5778
Show file tree
Hide file tree
Showing 41 changed files with 12,576 additions and 4,361 deletions.
35 changes: 35 additions & 0 deletions pkg/cnservice/server_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/lockservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
pblock "github.com/matrixorigin/matrixone/pkg/pb/lock"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/pb/status"
Expand Down Expand Up @@ -53,6 +54,8 @@ func (s *service) initQueryCommandHandler() {
s.queryService.AddHandleFunc(query.CmdMethod_RunTask, s.handleRunTask, false)
s.queryService.AddHandleFunc(query.CmdMethod_RemoveRemoteLockTable, s.handleRemoveRemoteLockTable, false)
s.queryService.AddHandleFunc(query.CmdMethod_GetPipelineInfo, s.handleGetPipelineInfo, false)
s.queryService.AddHandleFunc(query.CmdMethod_MigrateConnFrom, s.handleMigrateConnFrom, false)
s.queryService.AddHandleFunc(query.CmdMethod_MigrateConnTo, s.handleMigrateConnTo, false)
}

func (s *service) handleKillConn(ctx context.Context, req *query.Request, resp *query.Response) error {
Expand Down Expand Up @@ -329,3 +332,35 @@ func (s *service) handleGetPipelineInfo(ctx context.Context, req *query.Request,
}
return nil
}

func (s *service) handleMigrateConnFrom(
ctx context.Context, req *query.Request, resp *query.Response,
) error {
if req.MigrateConnFromRequest == nil {
return moerr.NewInternalError(ctx, "bad request")
}
rm := s.mo.GetRoutineManager()
resp.MigrateConnFromResponse = &query.MigrateConnFromResponse{}
if err := rm.MigrateConnectionFrom(req.MigrateConnFromRequest, resp.MigrateConnFromResponse); err != nil {
logutil.Errorf("failed to migrate conn from: %v", err)
return err
}
return nil
}

func (s *service) handleMigrateConnTo(
ctx context.Context, req *query.Request, resp *query.Response,
) error {
if req.MigrateConnToRequest == nil {
return moerr.NewInternalError(ctx, "bad request")
}
rm := s.mo.GetRoutineManager()
if err := rm.MigrateConnectionTo(req.MigrateConnToRequest); err != nil {
logutil.Errorf("failed to migrate conn to: %v", err)
return err
}
resp.MigrateConnToResponse = &query.MigrateConnToResponse{
Success: true,
}
return nil
}
12 changes: 12 additions & 0 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ func getPrepareStmtName(stmtID uint32) string {
return fmt.Sprintf("%s_%d", prefixPrepareStmtName, stmtID)
}

func parsePrepareStmtID(s string) uint32 {
if strings.HasPrefix(s, prefixPrepareStmtName) {
ss := strings.Split(s, "_")
v, err := strconv.ParseUint(ss[len(ss)-1], 10, 64)
if err != nil {
return 0
}
return uint32(v)
}
return 0
}

func GetPrepareStmtID(ctx context.Context, name string) (int, error) {
idx := len(prefixPrepareStmtName) + 1
if idx >= len(name) {
Expand Down
24 changes: 24 additions & 0 deletions pkg/frontend/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/util/metric"
"github.com/matrixorigin/matrixone/pkg/util/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -64,6 +65,8 @@ type Routine struct {
restricted atomic.Bool

printInfoOnce bool

migrateOnce sync.Once
}

func (rt *Routine) needPrintSessionInfo() bool {
Expand Down Expand Up @@ -379,6 +382,27 @@ func (rt *Routine) cleanup() {
})
}

func (rt *Routine) migrateConnectionTo(req *query.MigrateConnToRequest) error {
var err error
rt.migrateOnce.Do(func() {
ses := rt.getSession()
err = ses.Migrate(req)
})
return err
}

func (rt *Routine) migrateConnectionFrom(resp *query.MigrateConnFromResponse) error {
ses := rt.getSession()
resp.DB = ses.GetDatabaseName()
for _, st := range ses.GetPrepareStmts() {
resp.PrepareStmts = append(resp.PrepareStmts, &query.PrepareStmt{
Name: st.Name,
SQL: st.Sql,
})
}
return nil
}

func NewRoutine(ctx context.Context, protocol MysqlProtocol, executor CmdExecutor, parameters *config.FrontendParameters, rs goetty.IOSession) *Routine {
ctx = trace.Generate(ctx) // fill span{trace_id} in ctx
cancelRoutineCtx, cancelRoutineFunc := context.WithCancel(ctx)
Expand Down
66 changes: 51 additions & 15 deletions pkg/frontend/routine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/queryservice"
"github.com/matrixorigin/matrixone/pkg/util/metric"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
Expand All @@ -39,15 +40,17 @@ import (
)

type RoutineManager struct {
mu sync.RWMutex
ctx context.Context
clients map[goetty.IOSession]*Routine
pu *config.ParameterUnit
tlsConfig *tls.Config
aicm *defines.AutoIncrCacheManager
accountRoutine *AccountRoutineManager
baseService BaseService
sessionManager *queryservice.SessionManager
mu sync.RWMutex
ctx context.Context
clients map[goetty.IOSession]*Routine
// routinesByID keeps the routines by connection ID.
routinesByConnID map[uint32]*Routine
pu *config.ParameterUnit
tlsConfig *tls.Config
aicm *defines.AutoIncrCacheManager
accountRoutine *AccountRoutineManager
baseService BaseService
sessionManager *queryservice.SessionManager
}

type AccountRoutineManager struct {
Expand Down Expand Up @@ -167,10 +170,11 @@ func (rm *RoutineManager) getCtx() context.Context {
return rm.ctx
}

func (rm *RoutineManager) setRoutine(rs goetty.IOSession, r *Routine) {
func (rm *RoutineManager) setRoutine(rs goetty.IOSession, id uint32, r *Routine) {
rm.mu.Lock()
defer rm.mu.Unlock()
rm.clients[rs] = r
rm.routinesByConnID[id] = r
}

func (rm *RoutineManager) getRoutine(rs goetty.IOSession) *Routine {
Expand All @@ -179,6 +183,16 @@ func (rm *RoutineManager) getRoutine(rs goetty.IOSession) *Routine {
return rm.clients[rs]
}

func (rm *RoutineManager) getRoutineByConnID(id uint32) *Routine {
rm.mu.RLock()
defer rm.mu.RUnlock()
r, ok := rm.routinesByConnID[id]
if ok {
return r
}
return nil
}

func (rm *RoutineManager) deleteRoutine(rs goetty.IOSession) *Routine {
var rt *Routine
var ok bool
Expand All @@ -188,6 +202,11 @@ func (rm *RoutineManager) deleteRoutine(rs goetty.IOSession) *Routine {
if ok {
delete(rm.clients, rs)
}
connID := rt.getConnectionID()
_, ok = rm.routinesByConnID[connID]
if ok {
delete(rm.routinesByConnID, connID)
}
return rt
}

Expand Down Expand Up @@ -283,7 +302,7 @@ func (rm *RoutineManager) Created(rs goetty.IOSession) {
}

logDebugf(pro.GetDebugString(), "have sent handshake packet to connection %s", rs.RemoteAddress())
rm.setRoutine(rs, routine)
rm.setRoutine(rs, pro.connectionID, routine)
}

/*
Expand Down Expand Up @@ -607,6 +626,22 @@ func (rm *RoutineManager) KillRoutineConnections() {
rm.cleanKillQueue()
}

func (rm *RoutineManager) MigrateConnectionTo(req *query.MigrateConnToRequest) error {
routine := rm.getRoutineByConnID(req.ConnID)
if routine == nil {
return moerr.NewInternalError(rm.ctx, "cannot get routine to migrate connection %d", req.ConnID)
}
return routine.migrateConnectionTo(req)
}

func (rm *RoutineManager) MigrateConnectionFrom(req *query.MigrateConnFromRequest, resp *query.MigrateConnFromResponse) error {
routine, ok := rm.routinesByConnID[req.ConnID]
if !ok {
return moerr.NewInternalError(rm.ctx, "cannot get routine to migrate connection %d", req.ConnID)
}
return routine.migrateConnectionFrom(resp)
}

func NewRoutineManager(ctx context.Context, pu *config.ParameterUnit, aicm *defines.AutoIncrCacheManager) (*RoutineManager, error) {
accountRoutine := &AccountRoutineManager{
killQueueMu: sync.RWMutex{},
Expand All @@ -616,10 +651,11 @@ func NewRoutineManager(ctx context.Context, pu *config.ParameterUnit, aicm *defi
ctx: ctx,
}
rm := &RoutineManager{
ctx: ctx,
clients: make(map[goetty.IOSession]*Routine),
pu: pu,
accountRoutine: accountRoutine,
ctx: ctx,
clients: make(map[goetty.IOSession]*Routine),
routinesByConnID: make(map[uint32]*Routine),
pu: pu,
accountRoutine: accountRoutine,
}

rm.aicm = aicm
Expand Down
82 changes: 82 additions & 0 deletions pkg/frontend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/pb/status"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
Expand Down Expand Up @@ -1032,6 +1033,12 @@ func (ses *Session) GenNewStmtId() uint32 {
return ses.lastStmtId
}

func (ses *Session) SetLastStmtID(id uint32) {
ses.mu.Lock()
defer ses.mu.Unlock()
ses.lastStmtId = id
}

func (ses *Session) GetLastStmtId() uint32 {
ses.mu.Lock()
defer ses.mu.Unlock()
Expand Down Expand Up @@ -1243,6 +1250,16 @@ func (ses *Session) GetPrepareStmt(name string) (*PrepareStmt, error) {
return nil, moerr.NewInvalidState(ses.requestCtx, "prepared statement '%s' does not exist", name)
}

func (ses *Session) GetPrepareStmts() []*PrepareStmt {
ses.mu.Lock()
defer ses.mu.Unlock()
ret := make([]*PrepareStmt, 0, len(ses.prepareStmts))
for _, st := range ses.prepareStmts {
ret = append(ret, st)
}
return ret
}

func (ses *Session) RemovePrepareStmt(name string) {
ses.mu.Lock()
defer ses.mu.Unlock()
Expand Down Expand Up @@ -2380,3 +2397,68 @@ func checkPlanIsInsertValues(proc *process.Process,
}
return false, nil
}

type dbMigration struct {
db string
}

func (d *dbMigration) Migrate(ses *Session) error {
if d.db == "" {
return nil
}
return doUse(ses.requestCtx, ses, d.db)
}

type prepareStmtMigration struct {
name string
sql string
}

func (p *prepareStmtMigration) Migrate(ses *Session) error {
v, err := ses.GetGlobalVar("lower_case_table_names")
if err != nil {
return err
}
if !strings.HasPrefix(strings.ToLower(p.sql), "prepare") {
p.sql = fmt.Sprintf("prepare %s from %s", p.name, p.sql)
}
stmts, err := mysql.Parse(ses.requestCtx, p.sql, v.(int64))
if err != nil {
return err
}
if _, err = doPrepareStmt(ses.requestCtx, ses, stmts[0].(*tree.PrepareStmt), p.sql); err != nil {
return err
}
return nil
}

func (ses *Session) Migrate(req *query.MigrateConnToRequest) error {
dbm := dbMigration{
db: req.DB,
}
if err := dbm.Migrate(ses); err != nil {
return err
}

var maxStmtID uint32
for _, p := range req.PrepareStmts {
if p == nil {
continue
}
pm := prepareStmtMigration{
name: p.Name,
sql: p.SQL,
}
if err := pm.Migrate(ses); err != nil {
return err
}
id := parsePrepareStmtID(p.Name)
if id > maxStmtID {
maxStmtID = id
}
}
if maxStmtID > 0 {
ses.SetLastStmtID(maxStmtID)
}
return nil
}
Loading

0 comments on commit 78a5778

Please sign in to comment.