Skip to content

Commit

Permalink
extract resultset package out of server package
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Jul 11, 2023
1 parent 3443e52 commit b78344b
Show file tree
Hide file tree
Showing 12 changed files with 322 additions and 224 deletions.
2 changes: 2 additions & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_library(
"//server/internal/dump",
"//server/internal/handshake",
"//server/internal/parse",
"//server/internal/resultset",
"//server/internal/util",
"//server/metrics",
"//session",
Expand Down Expand Up @@ -185,6 +186,7 @@ go_test(
"//server/internal/column",
"//server/internal/handshake",
"//server/internal/parse",
"//server/internal/resultset",
"//server/internal/util",
"//session",
"//sessionctx",
Expand Down
11 changes: 6 additions & 5 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import (
"github.com/pingcap/tidb/server/internal/dump"
"github.com/pingcap/tidb/server/internal/handshake"
"github.com/pingcap/tidb/server/internal/parse"
"github.com/pingcap/tidb/server/internal/resultset"
util2 "github.com/pingcap/tidb/server/internal/util"
server_metrics "github.com/pingcap/tidb/server/metrics"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -2117,7 +2118,7 @@ func (cc *clientConn) handleFieldList(ctx context.Context, sql string) (err erro
// retryable indicates whether the call of writeResultSet has no side effect and can be retried to correct error. The call
// has side effect in cursor mode or once data has been sent to client. Currently retryable is used to fallback to TiKV when
// TiFlash is down.
func (cc *clientConn) writeResultSet(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16, fetchSize int) (retryable bool, runErr error) {
func (cc *clientConn) writeResultSet(ctx context.Context, rs resultset.ResultSet, binary bool, serverStatus uint16, fetchSize int) (retryable bool, runErr error) {
defer func() {
// close ResultSet when cursor doesn't exist
r := recover()
Expand All @@ -2134,7 +2135,7 @@ func (cc *clientConn) writeResultSet(ctx context.Context, rs ResultSet, binary b
cc.initResultEncoder(ctx)
defer cc.rsEncoder.Clean()
if mysql.HasCursorExistsFlag(serverStatus) {
crs, ok := rs.(cursorResultSet)
crs, ok := rs.(resultset.CursorResultSet)
if !ok {
// this branch is actually unreachable
return false, errors.New("this cursor is not a resultSet")
Expand Down Expand Up @@ -2171,7 +2172,7 @@ func (cc *clientConn) writeColumnInfo(columns []*column.Info) error {
// binary specifies the way to dump data. It throws any error while dumping data.
// serverStatus, a flag bit represents server information
// The first return value indicates whether error occurs at the first call of ResultSet.Next.
func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16) (bool, error) {
func (cc *clientConn) writeChunks(ctx context.Context, rs resultset.ResultSet, binary bool, serverStatus uint16) (bool, error) {
data := cc.alloc.AllocWithLen(4, 1024)
req := rs.NewChunk(cc.chunkAlloc)
gotColumnInfo := false
Expand Down Expand Up @@ -2273,7 +2274,7 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
// binary specifies the way to dump data. It throws any error while dumping data.
// serverStatus, a flag bit represents server information.
// fetchSize, the desired number of rows to be fetched each time when client uses cursor.
func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs cursorResultSet, serverStatus uint16, fetchSize int) error {
func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs resultset.CursorResultSet, serverStatus uint16, fetchSize int) error {
var (
stmtDetail *execdetails.StmtExecDetails
err error
Expand Down Expand Up @@ -2321,7 +2322,7 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs cursorRes
stmtDetail.WriteSQLRespDuration += time.Since(start)
}

if cl, ok := rs.(fetchNotifier); ok {
if cl, ok := rs.(resultset.FetchNotifier); ok {
cl.OnFetchReturned()
}

Expand Down
13 changes: 5 additions & 8 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/server/internal/dump"
"github.com/pingcap/tidb/server/internal/parse"
"github.com/pingcap/tidb/server/internal/resultset"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
storeerr "github.com/pingcap/tidb/store/driver/error"
Expand Down Expand Up @@ -321,19 +322,15 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
}
return false, cc.writeOK(ctx)
}
// since there are multiple implementations of ResultSet (the rs might be wrapped), we have to unwrap the rs before
// casting it to *tidbResultSet.
if result, ok := rs.(*tidbResultSet); ok {
if planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt); ok {
result.preparedStmt = planCacheStmt
}
if planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt); ok {
rs.SetPreparedStmt(planCacheStmt)
}

// if the client wants to use cursor
// we should hold the ResultSet in PreparedStatement for next stmt_fetch, and only send back ColumnInfo.
// Tell the client cursor exists in server by setting proper serverStatus.
if useCursor {
crs := wrapWithCursor(rs)
crs := resultset.WrapWithCursor(rs)

cc.initResultEncoder(ctx)
defer cc.rsEncoder.Clean()
Expand Down Expand Up @@ -392,7 +389,7 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
crs.StoreRowContainerReader(reader)
stmt.StoreResultSet(crs)
stmt.StoreRowContainer(rowContainer)
if cl, ok := crs.(fetchNotifier); ok {
if cl, ok := crs.(resultset.FetchNotifier); ok {
cl.OnFetchReturned()
}
stmt.SetCursorActive(true)
Expand Down
41 changes: 4 additions & 37 deletions server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/extension"
"github.com/pingcap/tidb/server/internal/column"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/server/internal/resultset"
"github.com/pingcap/tidb/util/chunk"
)

Expand All @@ -37,7 +36,7 @@ type PreparedStatement interface {
ID() int

// Execute executes the statement.
Execute(context.Context, []expression.Expression) (ResultSet, error)
Execute(context.Context, []expression.Expression) (resultset.ResultSet, error)

// AppendParam appends parameter to the statement.
AppendParam(paramID int, data []byte) error
Expand All @@ -55,10 +54,10 @@ type PreparedStatement interface {
GetParamsType() []byte

// StoreResultSet stores ResultSet for subsequent stmt fetching
StoreResultSet(rs cursorResultSet)
StoreResultSet(rs resultset.CursorResultSet)

// GetResultSet gets ResultSet associated this statement
GetResultSet() cursorResultSet
GetResultSet() resultset.CursorResultSet

// Reset removes all bound parameters and opened resultSet/rowContainer.
Reset() error
Expand All @@ -79,35 +78,3 @@ type PreparedStatement interface {
// GetRowContainer returns the row container of the statement
GetRowContainer() *chunk.RowContainer
}

// ResultSet is the result set of an query.
type ResultSet interface {
Columns() []*column.Info
NewChunk(chunk.Allocator) *chunk.Chunk
Next(context.Context, *chunk.Chunk) error
Close() error
// IsClosed checks whether the result set is closed.
IsClosed() bool
FieldTypes() []*types.FieldType
}

// cursorResultSet extends the `ResultSet` to provide the ability to store an iterator
type cursorResultSet interface {
ResultSet

StoreRowContainerReader(reader chunk.RowContainerReader)
GetRowContainerReader() chunk.RowContainerReader
}

// fetchNotifier represents notifier will be called in COM_FETCH.
type fetchNotifier interface {
// OnFetchReturned be called when COM_FETCH returns.
// it will be used in server-side cursor.
OnFetchReturned()
}

func wrapWithCursor(rs ResultSet) cursorResultSet {
return &tidbCursorResultSet{
rs, nil,
}
}

0 comments on commit b78344b

Please sign in to comment.