diff --git a/go/vt/vtgate/engine/join.go b/go/vt/vtgate/engine/join.go index c47b523b9b1..d04b205f88e 100644 --- a/go/vt/vtgate/engine/join.go +++ b/go/vt/vtgate/engine/join.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "sync" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" @@ -96,22 +97,31 @@ func (jn *Join) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[st // TryStreamExecute performs a streaming exec. func (jn *Join) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { - var fieldNeeded sync2.AtomicBool - fieldNeeded.Set(wantfields) - err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, fieldNeeded.Get(), func(lresult *sqltypes.Result) error { + var mu sync.Mutex + // We need to use this atomic since we're also reading this + // value outside of it being locked with the mu lock. + // This is still racy, but worst case it means that we may + // retrieve the right hand side fields twice instead of once. + var fieldsSent sync2.AtomicBool + fieldsSent.Set(!wantfields) + err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, wantfields, func(lresult *sqltypes.Result) error { joinVars := make(map[string]*querypb.BindVariable) for _, lrow := range lresult.Rows { for k, col := range jn.Vars { joinVars[k] = sqltypes.ValueBindVariable(lrow[col]) } var rowSent sync2.AtomicBool - err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), fieldNeeded.Get(), func(rresult *sqltypes.Result) error { + err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), !fieldsSent.Get(), func(rresult *sqltypes.Result) error { + // This needs to be locking since it's not safe to just use + // fieldsSent. This is because we can't have a race between + // checking fieldsSent and then actually calling the callback + // and in parallel another goroutine doing the same. That + // can lead to out of order execution of the callback. So the callback + // itself and the check need to be covered by the same lock. + mu.Lock() + defer mu.Unlock() result := &sqltypes.Result{} - if fieldNeeded.Get() { - // This code is currently unreachable because the first result - // will always be just the field info, which will cause the outer - // wantfields code path to be executed. But this may change in the future. - fieldNeeded.Set(false) + if fieldsSent.CompareAndSwap(false, true) { result.Fields = joinFields(lresult.Fields, rresult.Fields, jn.Cols) } for _, rrow := range rresult.Rows { @@ -135,8 +145,15 @@ func (jn *Join) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars return callback(result) } } - if fieldNeeded.Get() { - fieldNeeded.Set(false) + // This needs to be locking since it's not safe to just use + // fieldsSent. This is because we can't have a race between + // checking fieldsSent and then actually calling the callback + // and in parallel another goroutine doing the same. That + // can lead to out of order execution of the callback. So the callback + // itself and the check need to be covered by the same lock. + mu.Lock() + defer mu.Unlock() + if fieldsSent.CompareAndSwap(false, true) { for k := range jn.Vars { joinVars[k] = sqltypes.NullBindVariable } diff --git a/go/vt/vtgate/engine/scalar_aggregation.go b/go/vt/vtgate/engine/scalar_aggregation.go index a1a76091689..b7c51bc012d 100644 --- a/go/vt/vtgate/engine/scalar_aggregation.go +++ b/go/vt/vtgate/engine/scalar_aggregation.go @@ -133,8 +133,8 @@ func (sa *ScalarAggregate) TryStreamExecute(ctx context.Context, vcursor VCursor var current []sqltypes.Value var curDistincts []sqltypes.Value var fields []*querypb.Field - fieldsSent := false var mu sync.Mutex + fieldsSent := !wantfields err := vcursor.StreamExecutePrimitive(ctx, sa.Input, bindVars, wantfields, func(result *sqltypes.Result) error { // as the underlying primitive call is not sync