Question
Per the documentation of XREAD:
In order to avoid polling at a fixed or adaptive interval the command is able to block if it could not return any data, according to the specified streams and IDs, and automatically unblock once one of the requested keys accept data.
In other words, it's recommended to use the blocking mode while reading from Redis streams.
When using go-redis, we may write the following code:
package main

import (
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
		// Only need 1 connection for reading.
		PoolSize: 1,
	})

	var waitGroup sync.WaitGroup
	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()

		lastID := "0-0"
		for {
			streams, err := client.XRead(&redis.XReadArgs{
				Streams: []string{"mystream", lastID},
				Block:   0, // Wait for new messages without a timeout.
			}).Result()
			if err != nil {
				log.Printf("err: %+v\n", err)
				return
			}
			log.Printf("received streams: %+v\n", streams)

			messages := streams[0].Messages
			lastID = messages[len(messages)-1].ID
		}
	}()

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	select {
	case <-c:
		// Question: how to gracefully close the blocking connection here?
		client.Close()
		waitGroup.Wait()
	}
}

Then there is a question: how to gracefully close the blocking connection?
The problem with client.Close()
In the example code, if we use client.Close() to close the connection, it produces an error log:
err: read tcp 127.0.0.1:55520->127.0.0.1:6379: use of closed network connection
What happens behind client.Close() is as follows:
1. client.Close() closes the connection pool
2. the only connection in the pool is then actually closed
3. client.XRead(), which is blocked in cn.WithReader(), is unblocked and returns the error "use of closed network connection"
4. c.releaseConn() is then called; the error is judged to be a bad one, so the connection is closed again
The real problem is:
- when the connection is closed at step 2, its file descriptor is also released
- when the connection is closed again at step 4, the original file descriptor may have been reused by another valid connection, which would then be closed by mistake.
There was a discussion about the same issue on StackOverflow.
Possible solution
Use TCPConn.CloseRead or UnixConn.CloseRead, which are Go's equivalents of Unix's shutdown().
To do that, we would need to add a function CloseRead() to redis.Client.
Any other thoughts?