Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/logging"
)

// KeepTTL is a Redis KEEPTTL option to keep existing TTL, it requires your redis-server version >= 6.0,
Expand All @@ -28,23 +29,15 @@ func usePrecise(dur time.Duration) bool {

func formatMs(ctx context.Context, dur time.Duration) int64 {
if dur > 0 && dur < time.Millisecond {
internal.Logger.Printf(
ctx,
"specified duration is %s, but minimal supported value is %s - truncating to 1ms",
dur, time.Millisecond,
)
logging.LoggerWithLevel().Infof(ctx, "specified duration is %s, but minimal supported value is %s - truncating to 1ms", dur, time.Millisecond)
return 1
}
return int64(dur / time.Millisecond)
}

func formatSec(ctx context.Context, dur time.Duration) int64 {
if dur > 0 && dur < time.Second {
internal.Logger.Printf(
ctx,
"specified duration is %s, but minimal supported value is %s - truncating to 1s",
dur, time.Second,
)
logging.LoggerWithLevel().Infof(ctx, "specified duration is %s, but minimal supported value is %s - truncating to 1s", dur, time.Second)
return 1
}
return int64(dur / time.Second)
Expand Down
4 changes: 2 additions & 2 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"net"
"strings"

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/hashtag"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/logging"
)

func (c *baseClient) Pool() pool.Pooler {
Expand Down Expand Up @@ -87,7 +87,7 @@ func (c *clusterState) IsConsistent(ctx context.Context) bool {
func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) []string {
addrs, err := c.Replicas(ctx, name).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
logging.LoggerWithLevel().Errorf(ctx, "sentinel: Replicas name=%q failed: %s",
name, err)
return []string{}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/auth/streaming/pool_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/logging"
)

// ReAuthPoolHook is a pool hook that manages background re-authentication of connections
Expand Down Expand Up @@ -166,7 +167,7 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool,
defer func() {
if rec := recover(); rec != nil {
// once again - safety first
internal.Logger.Printf(context.Background(), "panic in reauth worker: %v", rec)
logging.LoggerWithLevel().Errorf(context.Background(), "panic in reauth worker: %v", rec)
}
r.scheduledLock.Lock()
delete(r.scheduledReAuth, connID)
Expand Down
3 changes: 2 additions & 1 deletion internal/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type DefaultLogger struct {
}

func (l *DefaultLogger) Printf(ctx context.Context, format string, v ...interface{}) {
_ = l.log.Output(2, fmt.Sprintf(format, v...))
_ = l.log.Output(4, fmt.Sprintf(format, v...))
}

func NewDefaultLogger() Logging {
Expand All @@ -38,6 +38,7 @@ var LogLevel LogLevelT = LogLevelError
type LogLevelT int

// Log level constants for the entire go-redis library
// TODO(ndyakov): In v10 align those levels with slog.Level
const (
LogLevelError LogLevelT = iota // 0 - errors only
LogLevelWarn // 1 - warnings and errors
Expand Down
6 changes: 3 additions & 3 deletions internal/maintnotifications/logs/log_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,11 @@ func RemovingConnectionFromPool(connID uint64, reason error) string {
})
}

func NoPoolProvidedCannotRemove(connID uint64, reason error) string {
message := fmt.Sprintf("conn[%d] %s due to: %v", connID, NoPoolProvidedMessageCannotRemoveMessage, reason)
func NoPoolProvidedCannotRemove(connID uint64) string {
message := fmt.Sprintf("conn[%d] %s", connID, NoPoolProvidedMessageCannotRemoveMessage)
return appendJSONIfDebug(message, map[string]interface{}{
"connID": connID,
"reason": reason.Error(),
"reason": nil,
})
}

Expand Down
6 changes: 3 additions & 3 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"sync/atomic"
"time"

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/maintnotifications/logs"
"github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/logging"
)

var noDeadline = time.Time{}
Expand Down Expand Up @@ -508,7 +508,7 @@ func (cn *Conn) getEffectiveReadTimeout(normalTimeout time.Duration) time.Durati
// Deadline has passed, clear relaxed timeouts atomically and use normal timeout
newCount := cn.relaxedCounter.Add(-1)
if newCount <= 0 {
internal.Logger.Printf(context.Background(), logs.UnrelaxedTimeoutAfterDeadline(cn.GetID()))
logging.LoggerWithLevel().Infof(context.Background(), logs.UnrelaxedTimeoutAfterDeadline(cn.GetID()))
cn.clearRelaxedTimeout()
}
return normalTimeout
Expand Down Expand Up @@ -542,7 +542,7 @@ func (cn *Conn) getEffectiveWriteTimeout(normalTimeout time.Duration) time.Durat
// Deadline has passed, clear relaxed timeouts atomically and use normal timeout
newCount := cn.relaxedCounter.Add(-1)
if newCount <= 0 {
internal.Logger.Printf(context.Background(), logs.UnrelaxedTimeoutAfterDeadline(cn.GetID()))
logging.LoggerWithLevel().Infof(context.Background(), logs.UnrelaxedTimeoutAfterDeadline(cn.GetID()))
cn.clearRelaxedTimeout()
}
return normalTimeout
Expand Down
35 changes: 23 additions & 12 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/util"
"github.com/redis/go-redis/v9/logging"
)

var (
Expand Down Expand Up @@ -119,6 +120,9 @@ type Options struct {
// DialerRetryTimeout is the backoff duration between retry attempts.
// Default: 100ms
DialerRetryTimeout time.Duration

// Optional logger for connection pool operations.
Logger logging.Lgr
}

type lastDialErrorWrap struct {
Expand Down Expand Up @@ -254,7 +258,7 @@ func (p *ConnPool) checkMinIdleConns() {
p.idleConnsLen.Add(-1)

p.freeTurn()
internal.Logger.Printf(context.Background(), "addIdleConn panic: %+v", err)
p.logger().Errorf(context.Background(), "addIdleConn panic: %+v", err)
}
}()

Expand Down Expand Up @@ -416,7 +420,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
return cn, nil
}

internal.Logger.Printf(ctx, "redis: connection pool: failed to dial after %d attempts: %v", attempt, lastErr)
p.logger().Errorf(ctx, "redis: connection pool: failed to dial after %d attempts: %v", attempt, lastErr)
// All retries failed - handle error tracking
p.setLastDialError(lastErr)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) {
Expand Down Expand Up @@ -510,10 +514,10 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
acceptConn, err := hookManager.ProcessOnGet(ctx, cn, false)
if err != nil || !acceptConn {
if err != nil {
internal.Logger.Printf(ctx, "redis: connection pool: failed to process idle connection by hook: %v", err)
p.logger().Errorf(ctx, "redis: connection pool: failed to process idle connection by hook: %v", err)
_ = p.CloseConn(cn)
} else {
internal.Logger.Printf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
p.logger().Errorf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
// Return connection to pool without freeing the turn that this Get() call holds.
// We use putConnWithoutTurn() to run all the Put hooks and logic without freeing a turn.
p.putConnWithoutTurn(ctx, cn)
Expand Down Expand Up @@ -541,7 +545,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
// this should not happen with a new connection, but we handle it gracefully
if err != nil || !acceptConn {
// Failed to process connection, discard it
internal.Logger.Printf(ctx, "redis: connection pool: failed to process new connection conn[%d] by hook: accept=%v, err=%v", newcn.GetID(), acceptConn, err)
p.logger().Errorf(ctx, "redis: connection pool: failed to process new connection conn[%d] by hook: accept=%v, err=%v", newcn.GetID(), acceptConn, err)
_ = p.CloseConn(newcn)
return nil, err
}
Expand Down Expand Up @@ -584,7 +588,7 @@ func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {
if !freeTurnCalled {
p.freeTurn()
}
internal.Logger.Printf(context.Background(), "queuedNewConn panic: %+v", err)
p.logger().Errorf(ctx, "queuedNewConn panic: %+v", err)
}
}()

Expand Down Expand Up @@ -728,7 +732,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {

// If we exhausted all attempts without finding a usable connection, return nil
if attempts > 1 && attempts >= maxAttempts && int32(attempts) >= p.poolSize.Load() {
internal.Logger.Printf(context.Background(), "redis: connection pool: failed to get a usable connection after %d attempts", attempts)
p.logger().Errorf(context.Background(), "redis: connection pool: failed to get a usable connection after %d attempts", attempts)
return nil, nil
}

Expand Down Expand Up @@ -757,7 +761,7 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
// Peek at the reply type to check if it's a push notification
if replyType, err := cn.PeekReplyTypeSafe(); err != nil || replyType != proto.RespPush {
// Not a push notification or error peeking, remove connection
internal.Logger.Printf(ctx, "Conn has unread data (not push notification), removing it")
p.logger().Errorf(ctx, "Conn has unread data (not push notification), removing it")
p.removeConnInternal(ctx, cn, err, freeTurn)
return
}
Expand All @@ -770,7 +774,7 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
if hookManager != nil {
shouldPool, shouldRemove, err = hookManager.ProcessOnPut(ctx, cn)
if err != nil {
internal.Logger.Printf(ctx, "Connection hook error: %v", err)
p.logger().Errorf(ctx, "Connection hook error: %v", err)
p.removeConnInternal(ctx, cn, err, freeTurn)
return
}
Expand Down Expand Up @@ -803,12 +807,12 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
case StateUnusable:
// expected state, don't log it
case StateClosed:
internal.Logger.Printf(ctx, "Unexpected conn[%d] state changed by hook to %v, closing it", cn.GetID(), currentState)
p.logger().Errorf(ctx, "Unexpected conn[%d] state changed by hook to %v, closing it", cn.GetID(), currentState)
shouldCloseConn = true
p.removeConnWithLock(cn)
default:
// Pool as-is
internal.Logger.Printf(ctx, "Unexpected conn[%d] state changed by hook to %v, pooling as-is", cn.GetID(), currentState)
p.logger().Warnf(ctx, "Unexpected conn[%d] state changed by hook to %v, pooling as-is", cn.GetID(), currentState)
}
}

Expand Down Expand Up @@ -1022,7 +1026,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn, nowNs int64) bool {
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
// For RESP3 connections with push notifications, we allow some buffered data
// The client will process these notifications before using the connection
internal.Logger.Printf(
p.logger().Infof(
context.Background(),
"push: conn[%d] has buffered data, likely push notifications - will be processed by client",
cn.GetID(),
Expand All @@ -1045,3 +1049,10 @@ func (p *ConnPool) isHealthyConn(cn *Conn, nowNs int64) bool {
cn.SetUsedAtNs(nowNs)
return true
}

func (p *ConnPool) logger() logging.Lgr {
if p.cfg != nil && p.cfg.Logger != nil {
return p.cfg.Logger
}
return logging.LoggerWithLevel()
}
Loading
Loading