Skip to content

Commit

Permalink
expression: seperate BuildContext and EvalContext
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Apr 2, 2024
1 parent 3cfea6a commit 2bdaf6f
Show file tree
Hide file tree
Showing 105 changed files with 440 additions and 418 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1970,7 +1970,7 @@ func generateOriginDefaultValue(col *model.ColumnInfo, ctx sessionctx.Context) (
if ctx == nil {
t = time.Now()
} else {
t, _ = expression.GetStmtTimestamp(ctx.GetExprCtx())
t, _ = expression.GetStmtTimestamp(ctx.GetExprCtx().GetEvalCtx())
}
if col.GetType() == mysql.TypeTimestamp {
odValue = types.NewTime(types.FromGoTime(t.UTC()), col.GetType(), col.GetDecimal()).String()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3391,7 +3391,7 @@ func parseAndEvalBoolExpr(ctx expression.BuildContext, l, r string, colInfo *mod
return false, err
}
e.SetCharsetAndCollation(colInfo.GetCharset(), colInfo.GetCollate())
res, _, err1 := e.EvalInt(ctx, chunk.Row{})
res, _, err1 := e.EvalInt(ctx.GetEvalCtx(), chunk.Row{})
if err1 != nil {
return false, err1
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ func getPartitionIntervalFromTable(ctx expression.BuildContext, tbInfo *model.Ta
if err != nil {
return nil
}
val, isNull, err := expr.EvalInt(ctx, chunk.Row{})
val, isNull, err := expr.EvalInt(ctx.GetEvalCtx(), chunk.Row{})
if isNull || err != nil || val < 1 {
// If NULL, error or interval < 1 then cannot be an INTERVAL partitioned table
return nil
Expand All @@ -748,7 +748,7 @@ func getPartitionIntervalFromTable(ctx expression.BuildContext, tbInfo *model.Ta
if err != nil {
return nil
}
val, isNull, err := expr.EvalInt(ctx, chunk.Row{})
val, isNull, err := expr.EvalInt(ctx.GetEvalCtx(), chunk.Row{})
if isNull || err != nil || val < 1 {
// If NULL, error or interval < 1 then cannot be an INTERVAL partitioned table
return nil
Expand Down Expand Up @@ -1763,7 +1763,7 @@ func formatListPartitionValue(ctx expression.BuildContext, tblInfo *model.TableI
if err != nil {
return nil, errors.Trace(err)
}
eval, err := expr.Eval(ctx, chunk.Row{})
eval, err := expr.Eval(ctx.GetEvalCtx(), chunk.Row{})
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1803,7 +1803,7 @@ func getRangeValue(ctx expression.BuildContext, str string, unsigned bool) (any,
if err1 != nil {
return 0, false, err1
}
res, isNull, err2 := e.EvalInt(ctx, chunk.Row{})
res, isNull, err2 := e.EvalInt(ctx.GetEvalCtx(), chunk.Row{})
if err2 == nil && !isNull {
return uint64(res), true, nil
}
Expand All @@ -1819,7 +1819,7 @@ func getRangeValue(ctx expression.BuildContext, str string, unsigned bool) (any,
if err1 != nil {
return 0, false, err1
}
res, isNull, err2 := e.EvalInt(ctx, chunk.Row{})
res, isNull, err2 := e.EvalInt(ctx.GetEvalCtx(), chunk.Row{})
if err2 == nil && !isNull {
return res, true, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (e *RecoverIndexExec) buildIndexedValues(row chunk.Row, idxVals []types.Dat
sctx := e.Ctx()
for i, col := range e.index.Meta().Columns {
if e.table.Meta().Columns[col.Offset].IsGenerated() {
val, err := e.cols[col.Offset].EvalVirtualColumn(sctx.GetExprCtx(), row)
val, err := e.cols[col.Offset].EvalVirtualColumn(sctx.GetExprCtx().GetEvalCtx(), row)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/aggfuncs/aggfunc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func buildAggTesterWithFieldType(funcName string, ft *types.FieldType, numRows i
return pt
}

func testMultiArgsMergePartialResult(t *testing.T, ctx expression.BuildContext, p multiArgsAggTest) {
func testMultiArgsMergePartialResult(t *testing.T, ctx *mock.Context, p multiArgsAggTest) {
srcChk := p.genSrcChk()
iter := chunk.NewIterator4Chunk(srcChk)

Expand Down Expand Up @@ -666,7 +666,7 @@ func testAggMemFunc(t *testing.T, p aggMemTest) {
}
}

func testMultiArgsAggFunc(t *testing.T, ctx expression.BuildContext, p multiArgsAggTest) {
func testMultiArgsAggFunc(t *testing.T, ctx *mock.Context, p multiArgsAggTest) {
srcChk := p.genSrcChk()

args := make([]expression.Expression, len(p.dataTypes))
Expand Down Expand Up @@ -790,7 +790,7 @@ func testMultiArgsAggMemFunc(t *testing.T, p multiArgsAggMemTest) {
}
}

func benchmarkAggFunc(b *testing.B, ctx expression.BuildContext, p aggTest) {
func benchmarkAggFunc(b *testing.B, ctx *mock.Context, p aggTest) {
srcChk := chunk.NewChunkWithCapacity([]*types.FieldType{p.dataType}, p.numRows)
for i := 0; i < p.numRows; i++ {
dt := p.dataGen(i)
Expand Down Expand Up @@ -838,7 +838,7 @@ func benchmarkAggFunc(b *testing.B, ctx expression.BuildContext, p aggTest) {
})
}

func benchmarkMultiArgsAggFunc(b *testing.B, ctx expression.BuildContext, p multiArgsAggTest) {
func benchmarkMultiArgsAggFunc(b *testing.B, ctx *mock.Context, p multiArgsAggTest) {
srcChk := chunk.NewChunkWithCapacity(p.dataTypes, p.numRows)
for i := 0; i < p.numRows; i++ {
for j := 0; j < len(p.dataGens); j++ {
Expand Down Expand Up @@ -892,7 +892,7 @@ func benchmarkMultiArgsAggFunc(b *testing.B, ctx expression.BuildContext, p mult
})
}

func baseBenchmarkAggFunc(b *testing.B, ctx expression.BuildContext, finalFunc aggfuncs.AggFunc, input []chunk.Row, output *chunk.Chunk) {
func baseBenchmarkAggFunc(b *testing.B, ctx aggfuncs.AggFuncUpdateContext, finalFunc aggfuncs.AggFunc, input []chunk.Row, output *chunk.Chunk) {
finalPr, _ := finalFunc.AllocPartialResult()
output.Reset()
b.ResetTimer()
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func buildApproxPercentile(sctx AggFuncBuildContext, aggFuncDesc *aggregation.Ag
}

// Checked while building descriptor
percent, _, err := aggFuncDesc.Args[1].EvalInt(sctx, chunk.Row{})
percent, _, err := aggFuncDesc.Args[1].EvalInt(sctx.GetEvalCtx(), chunk.Row{})
if err != nil {
// Should not reach here
logutil.BgLogger().Error("Error happened when buildApproxPercentile", zap.Error(err))
Expand Down Expand Up @@ -472,7 +472,7 @@ func buildGroupConcat(ctx AggFuncBuildContext, aggFuncDesc *aggregation.AggFuncD
default:
// The last arg is promised to be a not-null string constant, so the error can be ignored.
c, _ := aggFuncDesc.Args[len(aggFuncDesc.Args)-1].(*expression.Constant)
sep, _, err := c.EvalString(ctx, chunk.Row{})
sep, _, err := c.EvalString(ctx.GetEvalCtx(), chunk.Row{})
// This err should never happen.
if err != nil {
panic(fmt.Sprintf("Error happened when buildGroupConcat: %s", err.Error()))
Expand Down Expand Up @@ -689,7 +689,7 @@ func buildNthValue(ctx AggFuncBuildContext, aggFuncDesc *aggregation.AggFuncDesc
ordinal: ordinal,
}
// Already checked when building the function description.
nth, _, _ := expression.GetUint64FromConstant(ctx, aggFuncDesc.Args[1])
nth, _, _ := expression.GetUint64FromConstant(ctx.GetEvalCtx(), aggFuncDesc.Args[1])
return &nthValue{baseAggFunc: base, tp: aggFuncDesc.RetTp, nth: nth}
}

Expand All @@ -698,7 +698,7 @@ func buildNtile(ctx AggFuncBuildContext, aggFuncDes *aggregation.AggFuncDesc, or
args: aggFuncDes.Args,
ordinal: ordinal,
}
n, _, _ := expression.GetUint64FromConstant(ctx, aggFuncDes.Args[0])
n, _, _ := expression.GetUint64FromConstant(ctx.GetEvalCtx(), aggFuncDes.Args[0])
return &ntile{baseAggFunc: base, n: n}
}

Expand All @@ -712,7 +712,7 @@ func buildPercentRank(ordinal int, orderByCols []*expression.Column) AggFunc {
func buildLeadLag(ctx AggFuncBuildContext, aggFuncDesc *aggregation.AggFuncDesc, ordinal int) baseLeadLag {
offset := uint64(1)
if len(aggFuncDesc.Args) >= 2 {
offset, _, _ = expression.GetUint64FromConstant(ctx, aggFuncDesc.Args[1])
offset, _, _ = expression.GetUint64FromConstant(ctx.GetEvalCtx(), aggFuncDesc.Args[1])
}
var defaultExpr expression.Expression
defaultExpr = expression.NewNull()
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/aggfuncs/func_group_concat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func groupConcatOrderMultiArgsUpdateMemDeltaGens(ctx sessionctx.Context, srcChk
}
memDelta := int64(buffer.Cap() - oldMemSize)
for _, byItem := range byItems {
fdt, _ := byItem.Expr.Eval(ctx.GetExprCtx(), row)
fdt, _ := byItem.Expr.Eval(ctx.GetExprCtx().GetEvalCtx(), row)
datumMem := aggfuncs.GetDatumMemSize(&fdt)
memDelta += datumMem
}
Expand Down Expand Up @@ -202,7 +202,7 @@ func groupConcatDistinctOrderMultiArgsUpdateMemDeltaGens(ctx sessionctx.Context,
valSet.Insert(joinedVal)
memDelta := int64(len(joinedVal) + (valsBuf.Cap() + cap(encodeBytesBuffer) - oldMemSize))
for _, byItem := range byItems {
fdt, _ := byItem.Expr.Eval(ctx.GetExprCtx(), row)
fdt, _ := byItem.Expr.Eval(ctx.GetExprCtx().GetEvalCtx(), row)
datumMem := aggfuncs.GetDatumMemSize(&fdt)
memDelta += datumMem
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/aggregate/agg_hash_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro
chk.SetNumVirtualRows(chk.NumRows() + 1)
}
for i, af := range e.PartialAggFuncs {
if err := af.AppendFinalResult2Chunk(exprCtx, partialResults[i], chk); err != nil {
if err := af.AppendFinalResult2Chunk(exprCtx.GetEvalCtx(), partialResults[i], chk); err != nil {
return err
}
}
Expand Down Expand Up @@ -740,7 +740,7 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) {
partialResults := e.getPartialResults(groupKey)
for i, af := range e.PartialAggFuncs {
tmpBuf[0] = e.childResult.GetRow(j)
memDelta, err := af.UpdatePartialResult(exprCtx, tmpBuf[:], partialResults[i])
memDelta, err := af.UpdatePartialResult(exprCtx.GetEvalCtx(), tmpBuf[:], partialResults[i])
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/aggregate/agg_hash_final_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (w *HashAggFinalWorker) mergeInputIntoResultMap(sctx sessionctx.Context, in
}

for j, af := range w.aggFuncs {
memDelta, err := af.MergePartialResult(exprCtx, value[j], dstVal[j])
memDelta, err := af.MergePartialResult(exprCtx.GetEvalCtx(), value[j], dstVal[j])
if err != nil {
return err
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func (w *HashAggFinalWorker) generateResultAndSend(sctx sessionctx.Context, resu
exprCtx := sctx.GetExprCtx()
for _, results := range w.partialResultMap {
for j, af := range w.aggFuncs {
if err := af.AppendFinalResult2Chunk(exprCtx, results[j], result); err != nil {
if err := af.AppendFinalResult2Chunk(exprCtx.GetEvalCtx(), results[j], result); err != nil {
logutil.BgLogger().Error("HashAggFinalWorker failed to append final result to Chunk", zap.Error(err))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/aggregate/agg_hash_partial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, chk *
partialResult := partialResultOfEachRow[i]
rows[0] = chk.GetRow(i)
for j, af := range w.aggFuncs {
memDelta, err := af.UpdatePartialResult(exprCtx, rows, partialResult[j])
memDelta, err := af.UpdatePartialResult(exprCtx.GetEvalCtx(), rows, partialResult[j])
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/aggregate/agg_spill.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (p *parallelHashAggSpillHelper) processRow(context *processRowContext) (tot
exprCtx := context.ctx.GetExprCtx()
// The key has appeared before, merge results.
for aggPos := 0; aggPos < context.aggFuncNum; aggPos++ {
memDelta, err := p.aggFuncsForRestoring[aggPos].MergePartialResult(exprCtx, context.partialResultsRestored[aggPos][context.rowPos], prs[aggPos])
memDelta, err := p.aggFuncsForRestoring[aggPos].MergePartialResult(exprCtx.GetEvalCtx(), context.partialResultsRestored[aggPos][context.rowPos], prs[aggPos])
if err != nil {
return totalMemDelta, 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/aggregate/agg_stream_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (e *StreamAggExec) consumeGroupRows() error {
allMemDelta := int64(0)
exprCtx := e.Ctx().GetExprCtx()
for i, aggFunc := range e.AggFuncs {
memDelta, err := aggFunc.UpdatePartialResult(exprCtx, e.groupRows, e.partialResults[i])
memDelta, err := aggFunc.UpdatePartialResult(exprCtx.GetEvalCtx(), e.groupRows, e.partialResults[i])
if err != nil {
return err
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func (e *StreamAggExec) consumeCurGroupRowsAndFetchChild(ctx context.Context, ch
func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) error {
exprCtx := e.Ctx().GetExprCtx()
for i, aggFunc := range e.AggFuncs {
err := aggFunc.AppendFinalResult2Chunk(exprCtx, e.partialResults[i], chk)
err := aggFunc.AppendFinalResult2Chunk(exprCtx.GetEvalCtx(), e.partialResults[i], chk)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/aggregate/agg_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func GetGroupKey(ctx sessionctx.Context, input *chunk.Chunk, groupKey [][]byte,
tp = &newTp
}

if err := expression.EvalExpr(exprCtx, ctx.GetSessionVars().EnableVectorizedExpression, item, tp.EvalType(), input, buf); err != nil {
if err := expression.EvalExpr(exprCtx.GetEvalCtx(), ctx.GetSessionVars().EnableVectorizedExpression, item, tp.EvalType(), input, buf); err != nil {
expression.PutColumn(buf)
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func getOldRow(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction,
// only the virtual column needs fill back.
// Insert doesn't fill the generated columns at non-public state.
if !col.GeneratedStored {
val, err := genExprs[gIdx].Eval(sctx.GetExprCtx(), chunk.MutRowFromDatums(oldRow).ToRow())
val, err := genExprs[gIdx].Eval(sctx.GetExprCtx().GetEvalCtx(), chunk.MutRowFromDatums(oldRow).ToRow())
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,7 +1712,7 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) exec.
exprCtx := b.ctx.GetExprCtx()
e := &aggregate.StreamAggExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), src),
GroupChecker: vecgroupchecker.NewVecGroupChecker(exprCtx, b.ctx.GetSessionVars().EnableVectorizedExpression, v.GroupByItems),
GroupChecker: vecgroupchecker.NewVecGroupChecker(exprCtx.GetEvalCtx(), b.ctx.GetSessionVars().EnableVectorizedExpression, v.GroupByItems),
AggFuncs: make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)),
}

Expand Down Expand Up @@ -4677,7 +4677,7 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) exec.Execut
if b.ctx.GetSessionVars().EnablePipelinedWindowExec {
exec := &PipelinedWindowExec{
BaseExecutor: base,
groupChecker: vecgroupchecker.NewVecGroupChecker(b.ctx.GetExprCtx(), b.ctx.GetSessionVars().EnableVectorizedExpression, groupByItems),
groupChecker: vecgroupchecker.NewVecGroupChecker(b.ctx.GetExprCtx().GetEvalCtx(), b.ctx.GetSessionVars().EnableVectorizedExpression, groupByItems),
numWindowFuncs: len(v.WindowFuncDescs),
}

Expand Down Expand Up @@ -4755,7 +4755,7 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) exec.Execut
}
return &WindowExec{BaseExecutor: base,
processor: processor,
groupChecker: vecgroupchecker.NewVecGroupChecker(b.ctx.GetExprCtx(), b.ctx.GetSessionVars().EnableVectorizedExpression, groupByItems),
groupChecker: vecgroupchecker.NewVecGroupChecker(b.ctx.GetExprCtx().GetEvalCtx(), b.ctx.GetSessionVars().EnableVectorizedExpression, groupByItems),
numWindowFuncs: len(v.WindowFuncDescs),
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
if err != nil {
return err
}
pbConditions, err := expression.ExpressionsToPBList(e.Ctx().GetExprCtx(), []expression.Expression{inCondition}, e.Ctx().GetClient())
pbConditions, err := expression.ExpressionsToPBList(e.Ctx().GetExprCtx().GetEvalCtx(), []expression.Expression{inCondition}, e.Ctx().GetClient())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1621,7 +1621,7 @@ func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.childResult.NumRows() == 0 {
return nil
}
e.selected, err = expression.VectorizedFilter(e.Ctx().GetExprCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.filters, e.inputIter, e.selected)
e.selected, err = expression.VectorizedFilter(e.Ctx().GetExprCtx().GetEvalCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.filters, e.inputIter, e.selected)
if err != nil {
return err
}
Expand All @@ -1636,7 +1636,7 @@ func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) err
exprCtx := e.Ctx().GetExprCtx()
for {
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
selected, _, err := expression.EvalBool(exprCtx, e.filters, e.inputRow)
selected, _, err := expression.EvalBool(exprCtx.GetEvalCtx(), e.filters, e.inputRow)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option
if opt.Value.GetType().GetType() != mysql.TypeVarString {
return "", exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
val, isNull, err2 := opt.Value.EvalString(seCtx.GetExprCtx(), chunk.Row{})
val, isNull, err2 := opt.Value.EvalString(seCtx.GetExprCtx().GetEvalCtx(), chunk.Row{})
if err2 != nil || isNull {
return "", exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
Expand All @@ -604,7 +604,7 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option
if opt.Value.GetType().GetType() != mysql.TypeLonglong || mysql.HasIsBooleanFlag(opt.Value.GetType().GetFlag()) {
return 0, exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
val, isNull, err2 := opt.Value.EvalInt(seCtx.GetExprCtx(), chunk.Row{})
val, isNull, err2 := opt.Value.EvalInt(seCtx.GetExprCtx().GetEvalCtx(), chunk.Row{})
if err2 != nil || isNull {
return 0, exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID i
}
for i := 0; i < len(en.columnAssignments); i++ {
// eval expression of `SET` clause
d, err := en.columnAssignments[i].Eval(en.SessionCtx.GetExprCtx(), chunk.Row{})
d, err := en.columnAssignments[i].Eval(en.SessionCtx.GetExprCtx().GetEvalCtx(), chunk.Row{})
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
chk := task.outerResult.GetChunk(i)
outerMatch := make([]bool, 0, chk.NumRows())
task.memTracker.Consume(int64(cap(outerMatch)))
task.outerMatch[i], err = expression.VectorizedFilter(exprCtx, ow.ctx.GetSessionVars().EnableVectorizedExpression, ow.filter, chunk.NewIterator4Chunk(chk), outerMatch)
task.outerMatch[i], err = expression.VectorizedFilter(exprCtx.GetEvalCtx(), ow.ctx.GetSessionVars().EnableVectorizedExpression, ow.filter, chunk.NewIterator4Chunk(chk), outerMatch)
if err != nil {
return task, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo
for i := 0; i < numOuterChks; i++ {
chk := task.outerResult.GetChunk(i)
task.outerMatch[i] = make([]bool, chk.NumRows())
task.outerMatch[i], err = expression.VectorizedFilter(exprCtx, imw.ctx.GetSessionVars().EnableVectorizedExpression, imw.outerMergeCtx.filter, chunk.NewIterator4Chunk(chk), task.outerMatch[i])
task.outerMatch[i], err = expression.VectorizedFilter(exprCtx.GetEvalCtx(), imw.ctx.GetSessionVars().EnableVectorizedExpression, imw.outerMergeCtx.filter, chunk.NewIterator4Chunk(chk), task.outerMatch[i])
if err != nil {
return err
}
Expand Down Expand Up @@ -457,7 +457,7 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo
var err error
for _, keyOff := range imw.keyOff2KeyOffOrderByIdx {
joinKey := imw.outerMergeCtx.joinKeys[keyOff]
c, _, err = imw.outerMergeCtx.compareFuncs[keyOff](exprCtx, joinKey, joinKey, rowI, rowJ)
c, _, err = imw.outerMergeCtx.compareFuncs[keyOff](exprCtx.GetEvalCtx(), joinKey, joinKey, rowI, rowJ)
terror.Log(err)
if c != 0 {
break
Expand Down Expand Up @@ -632,7 +632,7 @@ func (imw *innerMergeWorker) fetchInnerRowsWithSameKey(ctx context.Context, task
func (imw *innerMergeWorker) compare(outerRow, innerRow chunk.Row) (int, error) {
exprCtx := imw.ctx.GetExprCtx()
for _, keyOff := range imw.innerMergeCtx.keyOff2KeyOffOrderByIdx {
cmp, _, err := imw.innerMergeCtx.compareFuncs[keyOff](exprCtx, imw.outerMergeCtx.joinKeys[keyOff], imw.innerMergeCtx.joinKeys[keyOff], outerRow, innerRow)
cmp, _, err := imw.innerMergeCtx.compareFuncs[keyOff](exprCtx.GetEvalCtx(), imw.outerMergeCtx.joinKeys[keyOff], imw.innerMergeCtx.joinKeys[keyOff], outerRow, innerRow)
if err != nil || cmp != 0 {
return int(cmp), err
}
Expand Down

0 comments on commit 2bdaf6f

Please sign in to comment.