Skip to content

Commit

Permalink
Fix discarding of error values
Browse files Browse the repository at this point in the history
When discarding the responses from previous commands that weren't
successfully read (e.g. if their context had a deadline that was
exceeded), RESP error values get treated the same as connection errors
and cause any reading from the connection to stop. This can cause a
response to not be read off of the connection or marked for discarding,
and therefore leave the connection out of sync.

This fix changes the discarding process to ignore RESP error values.
  • Loading branch information
woodsbury authored and Brian Picciano committed Jul 14, 2022
1 parent 4bca982 commit 248804f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 1 deletion.
7 changes: 6 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,12 @@ func (c *conn) reader(ctx context.Context) {
discardMU := el.Value.(connMarshalerUnmarshaler)

if err = resp3.Unmarshal(c.br, discardMU.unmarshalInto, c.rOpts); err != nil {
break
// Ignore RESP errors.
if !errors.As(err, &resp3.SimpleError{}) && !errors.As(err, &resp3.BlobError{}) {
break
}

err = nil
}

discardList.Remove(el)
Expand Down
90 changes: 90 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,96 @@ func TestConnDeadlineExceeded(t *T) {
t.Logf("successes:%d timeouts:%d closed:%d", numSuccesses, numTimeouts, numClosed)
})

// The "error" subtest shares one conn between two groups of goroutines: pe
// goroutines issue an EVAL that replies with a RESP error under a shrinking
// timeout, and ps goroutines issue an EVAL that replies with the status 'OK'.
// It checks that the second group never observes a RESP error, and that at
// least one timeout was counted in the first group.
t.Run("error", func(t *T) {
const pe, ps, n = 5, 5, 100
const initTimeout = time.Second
conn := dial()
defer conn.Close()
ctx := testCtx(t)

var wg sync.WaitGroup
wg.Add(pe + ps)

var numTimeouts, numClosed uint64

// Error-producing goroutines: each performs up to n calls, halving its
// timeout value after every call.
for i := 0; i < pe; i++ {
go func() {
timeout := initTimeout
defer wg.Done()
for i := 0; i < n; i++ {
if err := ctx.Err(); err != nil {
panic(err)
}

// NOTE(review): mkTimeout is defined elsewhere; presumably it derives
// the actual deadline duration from timeout -- confirm.
innerCtx, cancel := context.WithTimeout(ctx, mkTimeout(timeout))
into := ""
err := conn.Do(innerCtx, Cmd(&into, "EVAL", "return redis.error_reply('NOTOK')", "0"))
cancel()

// We want to see a timeout that happens while the next goroutine
// in the other loop is already running.
timeout /= 2

// The script always replies with an error, so a nil err is a failure.
if !assert.NotNil(t, err) {
return
}

// Anything other than a RESP error must be a timeout or a closed
// connection; it is counted and then ends this goroutine.
if !errors.As(err, &resp3.SimpleError{}) {
isTimeout := errors.Is(err, context.DeadlineExceeded)
isClosed := errors.Is(err, proc.ErrClosed) ||
errors.Is(err, net.ErrClosed)
if !assert.True(t, isTimeout || isClosed, "err:%v", err) {
return
}
if isTimeout {
atomic.AddUint64(&numTimeouts, 1)
}
if isClosed {
atomic.AddUint64(&numClosed, 1)
}
return
}
}
}()
}

// Success-producing goroutines: each performs up to n calls using the
// outer ctx directly (no per-call timeout) and expects "OK" back.
for i := 0; i < ps; i++ {
go func() {
defer wg.Done()
for i := 0; i < n; i++ {
if err := ctx.Err(); err != nil {
panic(err)
}

into := ""
err := conn.Do(ctx, Cmd(&into, "EVAL", "return redis.status_reply('OK')", "0"))

if err != nil {
// If we see a RESP error value then the results from
// this command and the previous one have been mixed
// up.
if !assert.False(t, errors.As(err, &resp3.SimpleError{})) {
return
}

// The only other errors tolerated here are a timeout or a closed
// connection.
isTimeout := errors.Is(err, context.DeadlineExceeded)
isClosed := errors.Is(err, proc.ErrClosed) ||
errors.Is(err, net.ErrClosed)
if !assert.True(t, isTimeout || isClosed, "err:%v", err) {
return
}
} else if !assert.Equal(t, "OK", into) {
return
}
}
}()
}

wg.Wait()
// The scenario under test requires that at least one timeout was counted.
assert.NotZero(t, numTimeouts, "number of timeouts")
t.Logf("timeouts:%d closed:%d", numTimeouts, numClosed)
})

t.Run("pubsub", func(t *T) {
const n = 100

Expand Down

0 comments on commit 248804f

Please sign in to comment.