/
cmd_processor.go
116 lines (102 loc) · 2.99 KB
/
cmd_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copyright 2023 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0
package backend
import (
"encoding/binary"
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
"go.uber.org/zap"
)
// Status flags tracked by CmdProcessor. Each constant occupies its own bit
// (1, 2, 4, 8) so that several flags can be combined in a single uint32.
const (
// StatusInTrans is set while the latest server status carries
// pnet.ServerStatusInTrans and cleared otherwise (see updateTxnStatus).
StatusInTrans uint32 = 1 << iota
// StatusQuit makes finishedTxn report false while it is set.
// NOTE(review): nothing in the visible code sets this flag — presumably it
// is set elsewhere when the session is quitting; confirm.
StatusQuit
// StatusPrepareWaitExecute is recorded for a prepared statement after a
// ComStmtSendLongData request.
StatusPrepareWaitExecute
// StatusPrepareWaitFetch is recorded for a prepared statement after a
// ComStmtExecute that left a cursor open, or a ComStmtFetch that has not
// yet sent the last row.
StatusPrepareWaitFetch
)
// CmdProcessor maintains the transaction and prepared statement status and decides whether the session can be redirected.
type CmdProcessor struct {
// preparedStmtStatus maps a prepared statement ID to its pending status
// (StatusPrepareWaitExecute or StatusPrepareWaitFetch). Each prepared
// statement has an independent status; a statement with nothing pending is
// removed from the map rather than stored with a zero value.
preparedStmtStatus map[int]uint32
// capability is neither read nor written by the methods visible here.
// NOTE(review): presumably the negotiated connection capability flags — confirm.
capability pnet.Capability
// serverStatus only includes the in_trans (StatusInTrans) or quit
// (StatusQuit) status.
serverStatus uint32
// logger is stored by NewCmdProcessor; the methods visible here do not use it.
logger *zap.Logger
}
// NewCmdProcessor creates a CmdProcessor that keeps the given logger. The
// returned processor has no status flags set and an empty, ready-to-write
// prepared statement table.
func NewCmdProcessor(logger *zap.Logger) *CmdProcessor {
	cp := new(CmdProcessor) // serverStatus starts at its zero value
	cp.logger = logger
	cp.preparedStmtStatus = map[int]uint32{}
	return cp
}
// handleOKPacket extracts the server status from an OK response, feeds it
// (together with the request that triggered it) into the transaction and
// prepared statement tracking, and returns the parsed status.
func (cp *CmdProcessor) handleOKPacket(request, response []byte) uint16 {
	serverStatus := pnet.ParseOKPacket(response)
	cp.updateServerStatus(request, serverStatus)
	return serverStatus
}
// handleErrorPacket parses an error response and returns the result of
// pnet.ParseErrorPacket for it. It does not modify the processor's state.
func (cp *CmdProcessor) handleErrorPacket(data []byte) error {
return pnet.ParseErrorPacket(data)
}
// handleEOFPacket reads the server status from an EOF response, feeds it
// (together with the request that triggered it) into the transaction and
// prepared statement tracking, and returns the status.
//
// The response must be at least 5 bytes long; a shorter slice panics.
func (cp *CmdProcessor) handleEOFPacket(request, response []byte) uint16 {
	// The status is the little-endian uint16 at offset 3.
	// NOTE(review): in the MySQL EOF layout this follows the header byte and
	// the 2-byte warning count — confirm against the packet definition.
	statusField := response[3:]
	status := binary.LittleEndian.Uint16(statusField)
	cp.updateServerStatus(request, status)
	return status
}
// updateServerStatus applies a server status received in reply to request:
// first the transaction flag is refreshed from serverStatus, then the
// prepared statement table is updated according to the request's command.
func (cp *CmdProcessor) updateServerStatus(request []byte, serverStatus uint16) {
cp.updateTxnStatus(serverStatus)
cp.updatePrepStmtStatus(request, serverStatus)
}
// updateTxnStatus mirrors the in-transaction bit of serverStatus into
// cp.serverStatus: StatusInTrans is set when pnet.ServerStatusInTrans is
// present and cleared when it is absent. Other flags are left untouched.
func (cp *CmdProcessor) updateTxnStatus(serverStatus uint16) {
	inTxn := serverStatus&pnet.ServerStatusInTrans != 0
	if !inTxn {
		cp.serverStatus &^= StatusInTrans
		return
	}
	cp.serverStatus |= StatusInTrans
}
// updatePrepStmtStatus updates the per-statement pending status according to
// the command in request and the server status that followed it.
//
//   - ComResetConnection / ComChangeUser: every tracked statement is dropped.
//   - ComStmtSendLongData: the statement is marked StatusPrepareWaitExecute.
//   - ComStmtExecute: the statement is marked StatusPrepareWaitFetch when the
//     server reports an open cursor; otherwise its entry is removed.
//   - ComStmtFetch: the statement stays StatusPrepareWaitFetch until the server
//     reports that the last row was sent; then its entry is removed.
//   - ComStmtReset / ComStmtClose: the statement's entry is removed.
//   - Any other command: nothing changes.
//
// A request that is empty, or a statement command too short to contain the
// 4-byte statement ID, is ignored instead of panicking on the slice access;
// the tracked state is left as it was.
func (cp *CmdProcessor) updatePrepStmtStatus(request []byte, serverStatus uint16) {
	// Without a command byte there is nothing to dispatch on.
	if len(request) == 0 {
		return
	}

	cmd := pnet.Command(request[0])
	switch cmd {
	case pnet.ComStmtSendLongData, pnet.ComStmtExecute, pnet.ComStmtFetch, pnet.ComStmtReset, pnet.ComStmtClose:
		// Statement-scoped commands are handled below.
	case pnet.ComResetConnection, pnet.ComChangeUser:
		cp.preparedStmtStatus = make(map[int]uint32)
		return
	default:
		return
	}

	// The statement ID is the little-endian uint32 right after the command
	// byte. A truncated packet cannot name a statement, so skip it.
	if len(request) < 5 {
		return
	}
	stmtID := int(binary.LittleEndian.Uint32(request[1:5]))

	var prepStmtStatus uint32
	switch cmd {
	case pnet.ComStmtSendLongData:
		prepStmtStatus = StatusPrepareWaitExecute
	case pnet.ComStmtExecute:
		if serverStatus&pnet.ServerStatusCursorExists > 0 {
			prepStmtStatus = StatusPrepareWaitFetch
		}
	case pnet.ComStmtFetch:
		if serverStatus&pnet.ServerStatusLastRowSend == 0 {
			prepStmtStatus = StatusPrepareWaitFetch
		}
	}

	// Only statements with something pending are kept in the map.
	if prepStmtStatus > 0 {
		cp.preparedStmtStatus[stmtID] = prepStmtStatus
	} else {
		delete(cp.preparedStmtStatus, stmtID)
	}
}
// finishedTxn reports whether nothing is outstanding on the session: neither
// StatusInTrans nor StatusQuit is set, and no prepared statement has a
// pending status.
func (cp *CmdProcessor) finishedTxn() bool {
	const blocking = StatusInTrans | StatusQuit
	if cp.serverStatus&blocking != 0 {
		return false
	}
	// If any result of the prepared statements is not fetched, we should wait.
	pending := cp.hasPendingPreparedStmts()
	return !pending
}
// hasPendingPreparedStmts reports whether at least one tracked prepared
// statement has a non-zero pending status.
func (cp *CmdProcessor) hasPendingPreparedStmts() bool {
	for _, stmtStatus := range cp.preparedStmtStatus {
		if stmtStatus == 0 {
			continue
		}
		return true
	}
	return false
}