From a247360a76d3d63bdc329bfcca39b19baba76166 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Mon, 17 Nov 2025 18:25:23 +0200 Subject: [PATCH 1/6] smigrating/smigrated intro --- cluster_smigrating_test.go | 156 +++++++++ .../maintnotifications/logs/log_messages.go | 34 ++ maintnotifications/README.md | 10 +- maintnotifications/manager.go | 38 ++- maintnotifications/manager_test.go | 2 + .../push_notification_handler.go | 90 ++++- maintnotifications/smigrating_test.go | 309 ++++++++++++++++++ osscluster.go | 20 +- 8 files changed, 642 insertions(+), 17 deletions(-) create mode 100644 cluster_smigrating_test.go create mode 100644 maintnotifications/smigrating_test.go diff --git a/cluster_smigrating_test.go b/cluster_smigrating_test.go new file mode 100644 index 0000000000..ad0fee4718 --- /dev/null +++ b/cluster_smigrating_test.go @@ -0,0 +1,156 @@ +package redis + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/redis/go-redis/v9/maintnotifications" +) + +// TestClusterClientSMigratedCallback tests that ClusterClient sets up SMIGRATED callback on node clients +func TestClusterClientSMigratedCallback(t *testing.T) { + t.Run("CallbackSetupWithMaintNotifications", func(t *testing.T) { + // Track if state reload was called + var reloadCalled atomic.Bool + + // Create cluster options with maintnotifications enabled + opt := &ClusterOptions{ + Addrs: []string{"localhost:7000"}, // Dummy address + MaintNotificationsConfig: &maintnotifications.Config{ + Mode: maintnotifications.ModeEnabled, + }, + // Use custom NewClient to track when nodes are created + NewClient: func(opt *Options) *Client { + client := NewClient(opt) + return client + }, + } + + // Create cluster client + cluster := NewClusterClient(opt) + defer cluster.Close() + + // Manually trigger node creation by calling GetOrCreate + // This simulates what happens during normal cluster operations + node, err := cluster.nodes.GetOrCreate("localhost:7000") + if err != nil { + t.Fatalf("Failed to create node: %v", err) + } + + // Get the maintnotifications manager from the node client + manager := node.Client.GetMaintNotificationsManager() + if manager == nil { + t.Skip("MaintNotifications manager not initialized (expected if not connected to real Redis)") + return + } + + // Temporarily replace the cluster state reload with our test version + var receivedSlot int + originalCallback := manager + manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) { + reloadCalled.Store(true) + receivedSlot = slot + }) + + // Trigger the callback (this is what SMIGRATED notification would do) + ctx := context.Background() + testSlot := 1234 + manager.TriggerClusterStateReload(ctx, testSlot) + + // Verify callback was called + if !reloadCalled.Load() { + t.Error("Cluster state reload callback should have been called") + } + + // Verify slot was passed correctly + if receivedSlot != testSlot { + t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot) + } + _ = originalCallback + }) + + t.Run("NoCallbackWithoutMaintNotifications", func(t *testing.T) { + // Create cluster options WITHOUT maintnotifications + opt := &ClusterOptions{ + Addrs: []string{"localhost:7000"}, // Dummy address + // MaintNotificationsConfig is nil + } + + // Create cluster client + cluster := NewClusterClient(opt) + defer cluster.Close() + + // The OnNewNode callback should not be registered when MaintNotificationsConfig is nil + // This test just verifies that the cluster client doesn't panic + }) +} + +// TestClusterClientSMigratedIntegration tests SMIGRATED notification handling in cluster context +func TestClusterClientSMigratedIntegration(t *testing.T) { + t.Run("SMigratedTriggersStateReload", func(t *testing.T) { + // This test verifies the integration between SMIGRATED notification and cluster state reload + // We verify that the callback is properly set up to call cluster.state.LazyReload() + + // Create cluster options with maintnotifications enabled + opt := &ClusterOptions{ + Addrs: []string{"localhost:7000"}, + MaintNotificationsConfig: &maintnotifications.Config{ + Mode: maintnotifications.ModeEnabled, + }, + } + + // Create cluster client + cluster := NewClusterClient(opt) + defer cluster.Close() + + // Create a node + node, err := cluster.nodes.GetOrCreate("localhost:7000") + if err != nil { + t.Fatalf("Failed to create node: %v", err) + } + + // Get the maintnotifications manager + manager := node.Client.GetMaintNotificationsManager() + if manager == nil { + t.Skip("MaintNotifications manager not initialized (expected if not connected to real Redis)") + return + } + + // Verify that the callback is set by checking it's not nil + // We can't directly test LazyReload being called without a real cluster, + // but we can verify the callback mechanism works + var callbackWorks atomic.Bool + var receivedSlot int + manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) { + callbackWorks.Store(true) + receivedSlot = slot + }) + + ctx := context.Background() + testSlot := 5678 + manager.TriggerClusterStateReload(ctx, testSlot) + + if !callbackWorks.Load() { + t.Error("Callback mechanism should work") + } + + if receivedSlot != testSlot { + t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot) + } + }) +} + +// TestSMigratingAndSMigratedConstants verifies the SMIGRATING and SMIGRATED constants are exported +func TestSMigratingAndSMigratedConstants(t *testing.T) { + // This test verifies that the SMIGRATING and SMIGRATED constants are properly defined + // and accessible from the maintnotifications package + if maintnotifications.NotificationSMigrating != "SMIGRATING" { + t.Errorf("Expected NotificationSMigrating to be 'SMIGRATING', got: %s", maintnotifications.NotificationSMigrating) + } + + if maintnotifications.NotificationSMigrated != "SMIGRATED" { + t.Errorf("Expected NotificationSMigrated to be 'SMIGRATED', got: %s", maintnotifications.NotificationSMigrated) + } +} + diff --git a/internal/maintnotifications/logs/log_messages.go b/internal/maintnotifications/logs/log_messages.go index 34cb1692d9..d938dfa88e 100644 --- a/internal/maintnotifications/logs/log_messages.go +++ b/internal/maintnotifications/logs/log_messages.go @@ -121,6 +121,10 @@ const ( UnrelaxedTimeoutMessage = "clearing relaxed timeout" ManagerNotInitializedMessage = "manager not initialized" FailedToMarkForHandoffMessage = "failed to mark connection for handoff" + InvalidSlotInSMigratingNotificationMessage = "invalid slot in SMIGRATING notification" + InvalidSlotInSMigratedNotificationMessage = "invalid slot in SMIGRATED notification" + SlotMigratingMessage = "slot is migrating, applying relaxed timeout" + SlotMigratedMessage = "slot has migrated, triggering cluster state reload" // ======================================== // used in pool/conn @@ -623,3 +627,33 @@ func ExtractDataFromLogMessage(logMessage string) map[string]interface{} { // If JSON parsing fails, return empty map return result } + +// Cluster notification functions +func InvalidSlotInSMigratingNotification(slot interface{}) string { + message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratingNotificationMessage, slot) + return appendJSONIfDebug(message, map[string]interface{}{ + "slot": fmt.Sprintf("%v", slot), + }) +} + +func InvalidSlotInSMigratedNotification(slot interface{}) string { + message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratedNotificationMessage, slot) + return appendJSONIfDebug(message, map[string]interface{}{ + "slot": fmt.Sprintf("%v", slot), + }) +} + +func SlotMigrating(connID uint64, slot int64) string { + message := fmt.Sprintf("conn[%d] %s %d", connID, SlotMigratingMessage, slot) + return appendJSONIfDebug(message, map[string]interface{}{ + "connID": connID, + "slot": slot, + }) +} + +func SlotMigrated(slot int64) string { + message := fmt.Sprintf("%s %d", SlotMigratedMessage, slot) + return appendJSONIfDebug(message, map[string]interface{}{ + "slot": slot, + }) +} diff --git a/maintnotifications/README.md b/maintnotifications/README.md index 2ac6b9cb1d..c9617db503 100644 --- a/maintnotifications/README.md +++ b/maintnotifications/README.md @@ -2,8 +2,14 @@ Seamless Redis connection handoffs during cluster maintenance operations without dropping connections. -## ⚠️ **Important Note** -**Maintenance notifications are currently supported only in standalone Redis clients.** Cluster clients (ClusterClient, FailoverClient, etc.) do not yet support this functionality. +## Cluster Support + +**Cluster notifications are now supported for ClusterClient!** + +- **SMIGRATING**: Relaxes timeouts when a slot is being migrated +- **SMIGRATED**: Reloads cluster state when a slot migration completes + +**Note:** Other maintenance notifications (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are supported only in standalone Redis clients. Cluster clients support SMIGRATING and SMIGRATED for cluster-specific slot migration handling. ## Quick Start diff --git a/maintnotifications/manager.go b/maintnotifications/manager.go index 775c163e14..5a638ca958 100644 --- a/maintnotifications/manager.go +++ b/maintnotifications/manager.go @@ -18,11 +18,13 @@ import ( // Push notification type constants for maintenance const ( - NotificationMoving = "MOVING" - NotificationMigrating = "MIGRATING" - NotificationMigrated = "MIGRATED" - NotificationFailingOver = "FAILING_OVER" - NotificationFailedOver = "FAILED_OVER" + NotificationMoving = "MOVING" // Per-connection handoff notification + NotificationMigrating = "MIGRATING" // Per-connection migration start notification - relaxes timeouts + NotificationMigrated = "MIGRATED" // Per-connection migration complete notification - clears relaxed timeouts + NotificationFailingOver = "FAILING_OVER" // Per-connection failover start notification - relaxes timeouts + NotificationFailedOver = "FAILED_OVER" // Per-connection failover complete notification - clears relaxed timeouts + NotificationSMigrating = "SMIGRATING" // Cluster slot migrating notification - relaxes timeouts + NotificationSMigrated = "SMIGRATED" // Cluster slot migrated notification - triggers cluster state reload ) // maintenanceNotificationTypes contains all notification types that maintenance handles @@ -32,6 +34,8 @@ var maintenanceNotificationTypes = []string{ NotificationMigrated, NotificationFailingOver, NotificationFailedOver, + NotificationSMigrating, + NotificationSMigrated, } // NotificationHook is called before and after notification processing @@ -73,6 +77,9 @@ type Manager struct { hooks []NotificationHook hooksMu sync.RWMutex // Protects hooks slice poolHooksRef *PoolHook + + // Cluster state reload callback for SMIGRATED notifications + clusterStateReloadCallback ClusterStateReloadCallback } // MovingOperation tracks an active MOVING operation. @@ -83,6 +90,13 @@ type MovingOperation struct { Deadline time.Time } +// ClusterStateReloadCallback is a callback function that triggers cluster state reload. +// This is used by node clients to notify their parent ClusterClient about SMIGRATED notifications. +// The slot parameter indicates which slot has migrated (0-16383). +// Currently, implementations typically reload the entire cluster state, but in the future +// this could be optimized to reload only the specific slot. +type ClusterStateReloadCallback func(ctx context.Context, slot int) + // NewManager creates a new simplified manager. func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error) { if client == nil { @@ -318,3 +332,17 @@ func (hm *Manager) AddNotificationHook(notificationHook NotificationHook) { defer hm.hooksMu.Unlock() hm.hooks = append(hm.hooks, notificationHook) } + +// SetClusterStateReloadCallback sets the callback function that will be called when a SMOVED notification is received. +// This allows node clients to notify their parent ClusterClient to reload cluster state. +func (hm *Manager) SetClusterStateReloadCallback(callback ClusterStateReloadCallback) { + hm.clusterStateReloadCallback = callback +} + +// TriggerClusterStateReload calls the cluster state reload callback if it's set. +// This is called when a SMOVED notification is received. +func (hm *Manager) TriggerClusterStateReload(ctx context.Context, slot int) { + if hm.clusterStateReloadCallback != nil { + hm.clusterStateReloadCallback(ctx, slot) + } +} diff --git a/maintnotifications/manager_test.go b/maintnotifications/manager_test.go index 35dc4a32ab..aed0c9f7d5 100644 --- a/maintnotifications/manager_test.go +++ b/maintnotifications/manager_test.go @@ -217,6 +217,8 @@ func TestManagerRefactoring(t *testing.T) { NotificationMigrated, NotificationFailingOver, NotificationFailedOver, + NotificationSMigrating, + NotificationSMigrated, } if len(maintenanceNotificationTypes) != len(expectedTypes) { diff --git a/maintnotifications/push_notification_handler.go b/maintnotifications/push_notification_handler.go index 937b4ae82e..524da6e90f 100644 --- a/maintnotifications/push_notification_handler.go +++ b/maintnotifications/push_notification_handler.go @@ -49,6 +49,10 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand err = snh.handleFailingOver(ctx, handlerCtx, modifiedNotification) case NotificationFailedOver: err = snh.handleFailedOver(ctx, handlerCtx, modifiedNotification) + case NotificationSMigrating: + err = snh.handleSMigrating(ctx, handlerCtx, modifiedNotification) + case NotificationSMigrated: + err = snh.handleSMigrated(ctx, handlerCtx, modifiedNotification) default: // Ignore other notification types (e.g., pub/sub messages) err = nil @@ -61,7 +65,9 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand } // handleMoving processes MOVING notifications. -// ["MOVING", seqNum, timeS, endpoint] - per-connection handoff +// MOVING indicates that a connection should be handed off to a new endpoint. +// This is a per-connection notification that triggers connection handoff. +// Expected format: ["MOVING", seqNum, timeS, endpoint] func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { if len(notification) < 3 { internal.Logger.Printf(ctx, logs.InvalidNotification("MOVING", notification)) @@ -167,9 +173,10 @@ func (snh *NotificationHandler) markConnForHandoff(conn *pool.Conn, newEndpoint } // handleMigrating processes MIGRATING notifications. +// MIGRATING indicates that a connection migration is starting. +// This is a per-connection notification that applies relaxed timeouts. +// Expected format: ["MIGRATING", ...] func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { - // MIGRATING notifications indicate that a connection is about to be migrated - // Apply relaxed timeouts to the specific connection that received this notification if len(notification) < 2 { internal.Logger.Printf(ctx, logs.InvalidNotification("MIGRATING", notification)) return ErrInvalidNotification @@ -195,9 +202,10 @@ func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx } // handleMigrated processes MIGRATED notifications. +// MIGRATED indicates that a connection migration has completed. +// This is a per-connection notification that clears relaxed timeouts. +// Expected format: ["MIGRATED", ...] func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { - // MIGRATED notifications indicate that a connection migration has completed - // Restore normal timeouts for the specific connection that received this notification if len(notification) < 2 { internal.Logger.Printf(ctx, logs.InvalidNotification("MIGRATED", notification)) return ErrInvalidNotification @@ -224,9 +232,10 @@ func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx p } // handleFailingOver processes FAILING_OVER notifications. +// FAILING_OVER indicates that a failover is starting. +// This is a per-connection notification that applies relaxed timeouts. +// Expected format: ["FAILING_OVER", ...] func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { - // FAILING_OVER notifications indicate that a connection is about to failover - // Apply relaxed timeouts to the specific connection that received this notification if len(notification) < 2 { internal.Logger.Printf(ctx, logs.InvalidNotification("FAILING_OVER", notification)) return ErrInvalidNotification @@ -253,9 +262,10 @@ func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCt } // handleFailedOver processes FAILED_OVER notifications. +// FAILED_OVER indicates that a failover has completed. +// This is a per-connection notification that clears relaxed timeouts. +// Expected format: ["FAILED_OVER", ...] func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { - // FAILED_OVER notifications indicate that a connection failover has completed - // Restore normal timeouts for the specific connection that received this notification if len(notification) < 2 { internal.Logger.Printf(ctx, logs.InvalidNotification("FAILED_OVER", notification)) return ErrInvalidNotification @@ -280,3 +290,65 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx conn.ClearRelaxedTimeout() return nil } + +// handleSMigrating processes SMIGRATING notifications. +// SMIGRATING indicates that a cluster slot is in the process of migrating to a different node. +// This is a per-connection notification that applies relaxed timeouts during slot migration. +// Expected format: ["SMIGRATING", slot, ...] +func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { + if len(notification) < 2 { + internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATING", notification)) + return ErrInvalidNotification + } + + slot, ok := notification[1].(int64) + if !ok { + internal.Logger.Printf(ctx, logs.InvalidSlotInSMigratingNotification(notification[1])) + return ErrInvalidNotification + } + + if handlerCtx.Conn == nil { + internal.Logger.Printf(ctx, logs.NoConnectionInHandlerContext("SMIGRATING")) + return ErrInvalidNotification + } + + conn, ok := handlerCtx.Conn.(*pool.Conn) + if !ok { + internal.Logger.Printf(ctx, logs.InvalidConnectionTypeInHandlerContext("SMIGRATING", handlerCtx.Conn, handlerCtx)) + return ErrInvalidNotification + } + + // Apply relaxed timeout to this specific connection + if internal.LogLevel.InfoOrAbove() { + internal.Logger.Printf(ctx, logs.SlotMigrating(conn.GetID(), slot)) + } + conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout) + return nil +} + +// handleSMigrated processes SMIGRATED notifications. +// SMIGRATED indicates that a cluster slot has finished migrating to a different node. +// This is a cluster-level notification that triggers cluster state reload. +// Expected format: ["SMIGRATED", slot, ...] +func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { + if len(notification) < 2 { + internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATED", notification)) + return ErrInvalidNotification + } + + slot, ok := notification[1].(int64) + if !ok { + internal.Logger.Printf(ctx, logs.InvalidSlotInSMigratedNotification(notification[1])) + return ErrInvalidNotification + } + + if internal.LogLevel.InfoOrAbove() { + internal.Logger.Printf(ctx, logs.SlotMigrated(slot)) + } + + // Trigger cluster state reload via callback, passing the slot ID + // This allows for future optimization of partial slot reloads + snh.manager.TriggerClusterStateReload(ctx, int(slot)) + + return nil +} diff --git a/maintnotifications/smigrating_test.go b/maintnotifications/smigrating_test.go new file mode 100644 index 0000000000..2e62b117e8 --- /dev/null +++ b/maintnotifications/smigrating_test.go @@ -0,0 +1,309 @@ +package maintnotifications + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/push" +) + +// createMockConnection creates a mock connection for testing +// Uses the mockNetConn from pool_hook_test.go +func createMockConnection() *pool.Conn { + mockNetConn := &mockNetConn{} + return pool.NewConn(mockNetConn) +} + +// TestSMigratingNotificationHandler tests the SMIGRATING notification handler +func TestSMigratingNotificationHandler(t *testing.T) { + t.Run("ValidSMigratingNotification", func(t *testing.T) { + // Create a mock manager with config + config := DefaultConfig() + manager := &Manager{ + config: config, + } + + // Create notification handler + handler := &NotificationHandler{ + manager: manager, + operationsManager: manager, + } + + // Create a mock connection + conn := createMockConnection() + + // Create SMIGRATING notification: ["SMIGRATING", slot] + notification := []interface{}{"SMIGRATING", int64(1234)} + + ctx := context.Background() + handlerCtx := push.NotificationHandlerContext{ + Conn: conn, + } + + // Handle the notification + err := handler.handleSMigrating(ctx, handlerCtx, notification) + if err != nil { + t.Errorf("handleSMigrating should not error: %v", err) + } + + // Verify relaxed timeout was applied + if !conn.HasRelaxedTimeout() { + t.Error("Relaxed timeout should have been set on the connection") + } + }) + + t.Run("InvalidSMigratingNotification_TooShort", func(t *testing.T) { + config := DefaultConfig() + manager := &Manager{ + config: config, + } + handler := &NotificationHandler{ + manager: manager, + operationsManager: manager, + } + + // Invalid notification - too short + notification := []interface{}{"SMIGRATING"} + + ctx := context.Background() + handlerCtx := push.NotificationHandlerContext{} + + err := handler.handleSMigrating(ctx, handlerCtx, notification) + if err != ErrInvalidNotification { + t.Errorf("Expected ErrInvalidNotification, got: %v", err) + } + }) + + t.Run("InvalidSMigratingNotification_InvalidSlot", func(t *testing.T) { + config := DefaultConfig() + manager := &Manager{ + config: config, + } + handler := &NotificationHandler{ + manager: manager, + operationsManager: manager, + } + + // Invalid notification - slot is not int64 + notification := []interface{}{"SMIGRATING", "not-a-number"} + + ctx := context.Background() + handlerCtx := push.NotificationHandlerContext{} + + err := handler.handleSMigrating(ctx, handlerCtx, notification) + if err != ErrInvalidNotification { + t.Errorf("Expected ErrInvalidNotification, got: %v", err) + } + }) + + t.Run("SMigratingNotification_NoConnection", func(t *testing.T) { + config := DefaultConfig() + manager := &Manager{ + config: config, + } + handler := &NotificationHandler{ + manager: manager, + operationsManager: manager, + } + + notification := []interface{}{"SMIGRATING", int64(1234)} + + ctx := context.Background() + handlerCtx := push.NotificationHandlerContext{} // No connection + + err := handler.handleSMigrating(ctx, handlerCtx, notification) + if err != ErrInvalidNotification { + t.Errorf("Expected ErrInvalidNotification when no connection, got: %v", err) + } + }) +} + +// TestSMigratingNotificationRegistration tests that SMIGRATING is registered in the notification types +func TestSMigratingNotificationRegistration(t *testing.T) { + found := false + for _, notifType := range maintenanceNotificationTypes { + if notifType == NotificationSMigrating { + found = true + break + } + } + + if !found { + t.Error("SMIGRATING should be registered in maintenanceNotificationTypes") + } +} + +// TestSMigratingConstant tests that the SMIGRATING constant is defined correctly +func TestSMigratingConstant(t *testing.T) { + if NotificationSMigrating != "SMIGRATING" { + t.Errorf("NotificationSMigrating constant should be 'SMIGRATING', got: %s", NotificationSMigrating) + } +} + +// TestSMigratedNotificationHandler tests the SMIGRATED notification handler +func TestSMigratedNotificationHandler(t *testing.T) { + t.Run("ValidSMigratedNotification", func(t *testing.T) { + // Track if callback was called + var callbackCalled atomic.Bool + var receivedSlot int + + // Create a mock manager with callback + manager := &Manager{ + clusterStateReloadCallback: func(ctx context.Context, slot int) { + callbackCalled.Store(true) + receivedSlot = slot + }, + } + + // Create notification handler + handler := &NotificationHandler{ + manager: manager, + operationsManager: manager, + } + + // Create SMIGRATED notification: ["SMIGRATED", slot] + notification := []interface{}{"SMIGRATED", int64(1234)} + + ctx := context.Background() + handlerCtx := push.NotificationHandlerContext{} + + // Handle the notification + err := handler.handleSMigrated(ctx, handlerCtx, notification) + if err != nil { + t.Errorf("handleSMigrated should not error: %v", err) + } + + // Verify callback was called + if !callbackCalled.Load() { + t.Error("Cluster state reload callback should have been called") + } + + // Verify slot was passed correctly + if receivedSlot != 1234 { + t.Errorf("Expected slot 1234, got %d", receivedSlot) + } + }) + + t.Run("InvalidSMigratedNotification_TooShort", func(t *testing.T) { + manager := &Manager{} + handler := &NotificationHandler{ + manager: manager, + operationsManager: manager, + } + + // Invalid notification - too short + notification := []interface{}{"SMIGRATED"} + + ctx := context.Background() + handlerCtx := push.NotificationHandlerContext{} + + err := handler.handleSMigrated(ctx, handlerCtx, notification) + if err != ErrInvalidNotification { + t.Errorf("Expected ErrInvalidNotification, got: %v", err) + } + }) + + t.Run("InvalidSMigratedNotification_InvalidSlot", func(t *testing.T) { + manager := &Manager{} + handler := &NotificationHandler{ + manager: manager, + operationsManager: manager, + } + + // Invalid notification - slot is not int64 + notification := []interface{}{"SMIGRATED", "not-a-number"} + + ctx := context.Background() + handlerCtx := push.NotificationHandlerContext{} + + err := handler.handleSMigrated(ctx, handlerCtx, notification) + if err != ErrInvalidNotification { + t.Errorf("Expected ErrInvalidNotification, got: %v", err) + } + }) + + t.Run("SMigratedNotification_NoCallback", func(t *testing.T) { + // Manager without callback should not panic + manager := &Manager{} + handler := &NotificationHandler{ + manager: manager, + operationsManager: manager, + } + + notification := []interface{}{"SMIGRATED", int64(1234)} + + ctx := context.Background() + handlerCtx := push.NotificationHandlerContext{} + + err := handler.handleSMigrated(ctx, handlerCtx, notification) + if err != nil { + t.Errorf("handleSMigrated should not error even without callback: %v", err) + } + }) +} + +// TestSMigratedNotificationRegistration tests that SMIGRATED is registered in the notification types +func TestSMigratedNotificationRegistration(t *testing.T) { + found := false + for _, notifType := range maintenanceNotificationTypes { + if notifType == NotificationSMigrated { + found = true + break + } + } + + if !found { + t.Error("SMIGRATED should be registered in maintenanceNotificationTypes") + } +} + +// TestSMigratedConstant tests that the SMIGRATED constant is defined correctly +func TestSMigratedConstant(t *testing.T) { + if NotificationSMigrated != "SMIGRATED" { + t.Errorf("NotificationSMigrated constant should be 'SMIGRATED', got: %s", NotificationSMigrated) + } +} + +// TestClusterStateReloadCallback tests the callback setter and trigger +func TestClusterStateReloadCallback(t *testing.T) { + t.Run("SetAndTriggerCallback", func(t *testing.T) { + var callbackCalled atomic.Bool + var receivedCtx context.Context + var receivedSlot int + + manager := &Manager{} + callback := func(ctx context.Context, slot int) { + callbackCalled.Store(true) + receivedCtx = ctx + receivedSlot = slot + } + + manager.SetClusterStateReloadCallback(callback) + + ctx := context.Background() + testSlot := 1234 + manager.TriggerClusterStateReload(ctx, testSlot) + + if !callbackCalled.Load() { + t.Error("Callback should have been called") + } + + if receivedCtx != ctx { + t.Error("Callback should receive the correct context") + } + + if receivedSlot != testSlot { + t.Errorf("Callback should receive the correct slot, got %d, want %d", receivedSlot, testSlot) + } + }) + + t.Run("TriggerWithoutCallback", func(t *testing.T) { + manager := &Manager{} + // Should not panic + ctx := context.Background() + manager.TriggerClusterStateReload(ctx, 1234) + }) +} + diff --git a/osscluster.go b/osscluster.go index 7925d2c603..623a1df62e 100644 --- a/osscluster.go +++ b/osscluster.go @@ -146,7 +146,8 @@ type ClusterOptions struct { // cluster upgrade notifications gracefully and manage connection/pool state // transitions seamlessly. Requires Protocol: 3 (RESP3) for push notifications. // If nil, maintnotifications upgrades are in "auto" mode and will be enabled if the server supports it. - // The ClusterClient does not directly work with maintnotifications, it is up to the clients in the Nodes map to work with maintnotifications. + // The ClusterClient supports SMOVING notifications for cluster state management. + // Individual node clients handle other maintenance notifications (MOVING, MIGRATING, etc.). MaintNotificationsConfig *maintnotifications.Config } @@ -1038,6 +1039,23 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { txPipeline: c.processTxPipeline, }) + // Set up SMOVING notification handling for cluster state reload + // When a node client receives a SMOVING notification, it should trigger + // cluster state reload on the parent ClusterClient + if opt.MaintNotificationsConfig != nil { + c.nodes.OnNewNode(func(nodeClient *Client) { + manager := nodeClient.GetMaintNotificationsManager() + if manager != nil { + manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) { + // Currently we reload the entire cluster state + // In the future, this could be optimized to reload only the specific slot + _ = slot // slot parameter available for future optimization + c.state.LazyReload() + }) + } + }) + } + return c } From 4631320c9403df27f046ce2ae227d3af8dba745d Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Mon, 17 Nov 2025 18:55:34 +0200 Subject: [PATCH 2/6] proper notification format --- cluster_smigrating_test.go | 49 ++++++---- .../maintnotifications/logs/log_messages.go | 45 +++++---- maintnotifications/README.md | 4 +- maintnotifications/manager.go | 13 +-- .../push_notification_handler.go | 59 +++++++++--- maintnotifications/smigrating_test.go | 93 +++++++++++++------ osscluster.go | 15 +-- 7 files changed, 190 insertions(+), 88 deletions(-) diff --git a/cluster_smigrating_test.go b/cluster_smigrating_test.go index ad0fee4718..c0fe17e718 100644 --- a/cluster_smigrating_test.go +++ b/cluster_smigrating_test.go @@ -45,29 +45,35 @@ func TestClusterClientSMigratedCallback(t *testing.T) { return } - // Temporarily replace the cluster state reload with our test version - var receivedSlot int - originalCallback := manager - manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) { + // Set up cluster state reload callback for testing + var receivedHostPort string + var receivedSlotRanges []string + manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) { reloadCalled.Store(true) - receivedSlot = slot + receivedHostPort = hostPort + receivedSlotRanges = slotRanges }) // Trigger the callback (this is what SMIGRATED notification would do) ctx := context.Background() - testSlot := 1234 - manager.TriggerClusterStateReload(ctx, testSlot) + testHostPort := "127.0.0.1:6379" + testSlotRanges := []string{"1234", "5000-6000"} + manager.TriggerClusterStateReload(ctx, testHostPort, testSlotRanges) // Verify callback was called if !reloadCalled.Load() { t.Error("Cluster state reload callback should have been called") } - // Verify slot was passed correctly - if receivedSlot != testSlot { - t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot) + // Verify host:port was passed correctly + if receivedHostPort != testHostPort { + t.Errorf("Expected host:port %s, got %s", testHostPort, receivedHostPort) + } + + // Verify slot ranges were passed correctly + if len(receivedSlotRanges) != len(testSlotRanges) { + t.Errorf("Expected %d slot ranges, got %d", len(testSlotRanges), len(receivedSlotRanges)) } - _ = originalCallback }) t.Run("NoCallbackWithoutMaintNotifications", func(t *testing.T) { @@ -121,22 +127,29 @@ func TestClusterClientSMigratedIntegration(t *testing.T) { // We can't directly test LazyReload being called without a real cluster, // but we can verify the callback mechanism works var callbackWorks atomic.Bool - var receivedSlot int - manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) { + var receivedHostPort string + var receivedSlotRanges []string + manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) { callbackWorks.Store(true) - receivedSlot = slot + receivedHostPort = hostPort + receivedSlotRanges = slotRanges }) ctx := context.Background() - testSlot := 5678 - manager.TriggerClusterStateReload(ctx, testSlot) + testHostPort := "127.0.0.1:7000" + testSlotRanges := []string{"5678"} + manager.TriggerClusterStateReload(ctx, testHostPort, testSlotRanges) if !callbackWorks.Load() { t.Error("Callback mechanism should work") } - if receivedSlot != testSlot { - t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot) + if receivedHostPort != testHostPort { + t.Errorf("Expected host:port %s, got %s", testHostPort, receivedHostPort) + } + + if len(receivedSlotRanges) != 1 || receivedSlotRanges[0] != "5678" { + t.Errorf("Expected slot ranges [5678], got %v", receivedSlotRanges) } }) } diff --git a/internal/maintnotifications/logs/log_messages.go b/internal/maintnotifications/logs/log_messages.go index d938dfa88e..7418d9f652 100644 --- a/internal/maintnotifications/logs/log_messages.go +++ b/internal/maintnotifications/logs/log_messages.go @@ -121,10 +121,11 @@ const ( UnrelaxedTimeoutMessage = "clearing relaxed timeout" ManagerNotInitializedMessage = "manager not initialized" FailedToMarkForHandoffMessage = "failed to mark connection for handoff" - InvalidSlotInSMigratingNotificationMessage = "invalid slot in SMIGRATING notification" - InvalidSlotInSMigratedNotificationMessage = "invalid slot in SMIGRATED notification" - SlotMigratingMessage = "slot is migrating, applying relaxed timeout" - SlotMigratedMessage = "slot has migrated, triggering cluster state reload" + InvalidSeqIDInSMigratingNotificationMessage = "invalid SeqID in SMIGRATING notification" + InvalidSeqIDInSMigratedNotificationMessage = "invalid SeqID in SMIGRATED notification" + InvalidHostPortInSMigratedNotificationMessage = "invalid host:port in SMIGRATED notification" + SlotMigratingMessage = "slots migrating, applying relaxed timeout" + SlotMigratedMessage = "slots migrated, triggering cluster state reload" // ======================================== // used in pool/conn @@ -629,31 +630,41 @@ func ExtractDataFromLogMessage(logMessage string) map[string]interface{} { } // Cluster notification functions -func InvalidSlotInSMigratingNotification(slot interface{}) string { - message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratingNotificationMessage, slot) +func InvalidSeqIDInSMigratingNotification(seqID interface{}) string { + message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratingNotificationMessage, seqID) return appendJSONIfDebug(message, map[string]interface{}{ - "slot": fmt.Sprintf("%v", slot), + "seqID": fmt.Sprintf("%v", seqID), }) } -func InvalidSlotInSMigratedNotification(slot interface{}) string { - message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratedNotificationMessage, slot) +func InvalidSeqIDInSMigratedNotification(seqID interface{}) string { + message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratedNotificationMessage, seqID) return appendJSONIfDebug(message, map[string]interface{}{ - "slot": fmt.Sprintf("%v", slot), + "seqID": fmt.Sprintf("%v", seqID), }) } -func SlotMigrating(connID uint64, slot int64) string { - message := fmt.Sprintf("conn[%d] %s %d", connID, SlotMigratingMessage, slot) +func InvalidHostPortInSMigratedNotification(hostPort interface{}) string { + message := fmt.Sprintf("%s: %v", InvalidHostPortInSMigratedNotificationMessage, hostPort) return appendJSONIfDebug(message, map[string]interface{}{ - "connID": connID, - "slot": slot, + "hostPort": fmt.Sprintf("%v", hostPort), + }) +} + +func SlotMigrating(connID uint64, seqID int64, slotRanges []string) string { + message := fmt.Sprintf("conn[%d] %s seqID=%d slots=%v", connID, SlotMigratingMessage, seqID, slotRanges) + return appendJSONIfDebug(message, map[string]interface{}{ + "connID": connID, + "seqID": seqID, + "slotRanges": slotRanges, }) } -func SlotMigrated(slot int64) string { - message := fmt.Sprintf("%s %d", SlotMigratedMessage, slot) +func SlotMigrated(seqID int64, hostPort string, slotRanges []string) string { + message := fmt.Sprintf("%s seqID=%d host:port=%s slots=%v", SlotMigratedMessage, seqID, hostPort, slotRanges) return appendJSONIfDebug(message, map[string]interface{}{ - "slot": slot, + "seqID": seqID, + "hostPort": hostPort, + "slotRanges": slotRanges, }) } diff --git a/maintnotifications/README.md b/maintnotifications/README.md index c9617db503..c931e61f8f 100644 --- a/maintnotifications/README.md +++ b/maintnotifications/README.md @@ -6,8 +6,8 @@ Seamless Redis connection handoffs during cluster maintenance operations without **Cluster notifications are now supported for ClusterClient!** -- **SMIGRATING**: Relaxes timeouts when a slot is being migrated -- **SMIGRATED**: Reloads cluster state when a slot migration completes +- **SMIGRATING**: `["SMIGRATING", SeqID, slot/range, ...]` - Relaxes timeouts when slots are being migrated +- **SMIGRATED**: `["SMIGRATED", SeqID, host:port, slot/range, ...]` - Reloads cluster state when slot migration completes **Note:** Other maintenance notifications (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are supported only in standalone Redis clients. Cluster clients support SMIGRATING and SMIGRATED for cluster-specific slot migration handling. diff --git a/maintnotifications/manager.go b/maintnotifications/manager.go index 5a638ca958..6f43f1f590 100644 --- a/maintnotifications/manager.go +++ b/maintnotifications/manager.go @@ -92,10 +92,11 @@ type MovingOperation struct { // ClusterStateReloadCallback is a callback function that triggers cluster state reload. // This is used by node clients to notify their parent ClusterClient about SMIGRATED notifications. -// The slot parameter indicates which slot has migrated (0-16383). +// The hostPort parameter indicates the destination node (e.g., "127.0.0.1:6379"). +// The slotRanges parameter contains the migrated slots (e.g., ["1234", "5000-6000"]). // Currently, implementations typically reload the entire cluster state, but in the future -// this could be optimized to reload only the specific slot. -type ClusterStateReloadCallback func(ctx context.Context, slot int) +// this could be optimized to reload only the specific slots. +type ClusterStateReloadCallback func(ctx context.Context, hostPort string, slotRanges []string) // NewManager creates a new simplified manager. func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error) { @@ -340,9 +341,9 @@ func (hm *Manager) SetClusterStateReloadCallback(callback ClusterStateReloadCall } // TriggerClusterStateReload calls the cluster state reload callback if it's set. -// This is called when a SMOVED notification is received. -func (hm *Manager) TriggerClusterStateReload(ctx context.Context, slot int) { +// This is called when a SMIGRATED notification is received. +func (hm *Manager) TriggerClusterStateReload(ctx context.Context, hostPort string, slotRanges []string) { if hm.clusterStateReloadCallback != nil { - hm.clusterStateReloadCallback(ctx, slot) + hm.clusterStateReloadCallback(ctx, hostPort, slotRanges) } } diff --git a/maintnotifications/push_notification_handler.go b/maintnotifications/push_notification_handler.go index 524da6e90f..d9c91291d8 100644 --- a/maintnotifications/push_notification_handler.go +++ b/maintnotifications/push_notification_handler.go @@ -294,19 +294,30 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx // handleSMigrating processes SMIGRATING notifications. // SMIGRATING indicates that a cluster slot is in the process of migrating to a different node. // This is a per-connection notification that applies relaxed timeouts during slot migration. -// Expected format: ["SMIGRATING", slot, ...] +// Expected format: ["SMIGRATING", SeqID, slot/range1-range2, ...] func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { - if len(notification) < 2 { + if len(notification) < 3 { internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATING", notification)) return ErrInvalidNotification } - slot, ok := notification[1].(int64) + // Extract SeqID (position 1) + seqID, ok := notification[1].(int64) if !ok { - internal.Logger.Printf(ctx, logs.InvalidSlotInSMigratingNotification(notification[1])) + internal.Logger.Printf(ctx, logs.InvalidSeqIDInSMigratingNotification(notification[1])) return ErrInvalidNotification } + // Extract slot ranges (position 2+) + // For now, we just extract them for logging + // Format can be: single slot "1234" or range "100-200" + var slotRanges []string + for i := 2; i < len(notification); i++ { + if slotRange, ok := notification[i].(string); ok { + slotRanges = append(slotRanges, slotRange) + } + } + if handlerCtx.Conn == nil { internal.Logger.Printf(ctx, logs.NoConnectionInHandlerContext("SMIGRATING")) return ErrInvalidNotification @@ -320,7 +331,7 @@ func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx // Apply relaxed timeout to this specific connection if internal.LogLevel.InfoOrAbove() { - internal.Logger.Printf(ctx, logs.SlotMigrating(conn.GetID(), slot)) + internal.Logger.Printf(ctx, logs.SlotMigrating(conn.GetID(), seqID, slotRanges)) } conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout) return nil @@ -329,26 +340,48 @@ func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx // handleSMigrated processes SMIGRATED notifications. // SMIGRATED indicates that a cluster slot has finished migrating to a different node. // This is a cluster-level notification that triggers cluster state reload. -// Expected format: ["SMIGRATED", slot, ...] +// Expected format: ["SMIGRATED", SeqID, host:port, slot1/range1-range2, ...] func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { - if len(notification) < 2 { + if len(notification) < 4 { internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATED", notification)) return ErrInvalidNotification } - slot, ok := notification[1].(int64) + // Extract SeqID (position 1) + seqID, ok := notification[1].(int64) + if !ok { + internal.Logger.Printf(ctx, logs.InvalidSeqIDInSMigratedNotification(notification[1])) + return ErrInvalidNotification + } + + // Extract host:port (position 2) + hostPort, ok := notification[2].(string) if !ok { - internal.Logger.Printf(ctx, logs.InvalidSlotInSMigratedNotification(notification[1])) + internal.Logger.Printf(ctx, logs.InvalidHostPortInSMigratedNotification(notification[2])) return ErrInvalidNotification } + // Extract slot ranges (position 3+) + // For now, we just extract them for logging + // Format can be: single slot "1234" or range "100-200" + var slotRanges []string + for i := 3; i < len(notification); i++ { + if slotRange, ok := notification[i].(string); ok { + slotRanges = append(slotRanges, slotRange) + } + } + if internal.LogLevel.InfoOrAbove() { - internal.Logger.Printf(ctx, logs.SlotMigrated(slot)) + internal.Logger.Printf(ctx, logs.SlotMigrated(seqID, hostPort, slotRanges)) } - // Trigger cluster state reload via callback, passing the slot ID - // This allows for future optimization of partial slot reloads - snh.manager.TriggerClusterStateReload(ctx, int(slot)) + // Trigger cluster state reload via callback, passing host:port and slot ranges + // For now, implementations just log these and trigger a full reload + // In the future, this could be optimized to reload only the specific slots + snh.manager.TriggerClusterStateReload(ctx, hostPort, slotRanges) + + // TODO: Should we also clear the relaxed timeout here (like MIGRATED does)? + // Currently we only trigger state reload, but the timeout stays relaxed return nil } diff --git a/maintnotifications/smigrating_test.go b/maintnotifications/smigrating_test.go index 2e62b117e8..80e55afe36 100644 --- a/maintnotifications/smigrating_test.go +++ b/maintnotifications/smigrating_test.go @@ -34,8 +34,8 @@ func TestSMigratingNotificationHandler(t *testing.T) { // Create a mock connection conn := createMockConnection() - // Create SMIGRATING notification: ["SMIGRATING", slot] - notification := []interface{}{"SMIGRATING", int64(1234)} + // Create SMIGRATING notification: ["SMIGRATING", SeqID, slot/range, ...] + notification := []interface{}{"SMIGRATING", int64(123), "1234", "5000-6000"} ctx := context.Background() handlerCtx := push.NotificationHandlerContext{ @@ -76,7 +76,7 @@ func TestSMigratingNotificationHandler(t *testing.T) { } }) - t.Run("InvalidSMigratingNotification_InvalidSlot", func(t *testing.T) { + t.Run("InvalidSMigratingNotification_InvalidSeqID", func(t *testing.T) { config := DefaultConfig() manager := &Manager{ config: config, @@ -86,8 +86,8 @@ func TestSMigratingNotificationHandler(t *testing.T) { operationsManager: manager, } - // Invalid notification - slot is not int64 - notification := []interface{}{"SMIGRATING", "not-a-number"} + // Invalid notification - SeqID is not int64 + notification := []interface{}{"SMIGRATING", "not-a-number", "1234"} ctx := context.Background() handlerCtx := push.NotificationHandlerContext{} @@ -108,7 +108,7 @@ func TestSMigratingNotificationHandler(t *testing.T) { operationsManager: manager, } - notification := []interface{}{"SMIGRATING", int64(1234)} + notification := []interface{}{"SMIGRATING", int64(123), "1234"} ctx := context.Background() handlerCtx := push.NotificationHandlerContext{} // No connection @@ -147,13 +147,15 @@ func TestSMigratedNotificationHandler(t *testing.T) { t.Run("ValidSMigratedNotification", func(t *testing.T) { // Track if callback was called var callbackCalled atomic.Bool - var receivedSlot int + var receivedHostPort string + var receivedSlotRanges []string // Create a mock manager with callback manager := &Manager{ - clusterStateReloadCallback: func(ctx context.Context, slot int) { + clusterStateReloadCallback: func(ctx context.Context, hostPort string, slotRanges []string) { callbackCalled.Store(true) - receivedSlot = slot + receivedHostPort = hostPort + receivedSlotRanges = slotRanges }, } @@ -163,8 +165,8 @@ func TestSMigratedNotificationHandler(t *testing.T) { operationsManager: manager, } - // Create SMIGRATED notification: ["SMIGRATED", slot] - notification := []interface{}{"SMIGRATED", int64(1234)} + // Create SMIGRATED notification: ["SMIGRATED", SeqID, host:port, slot/range, ...] + notification := []interface{}{"SMIGRATED", int64(123), "127.0.0.1:6379", "1234", "5000-6000"} ctx := context.Background() handlerCtx := push.NotificationHandlerContext{} @@ -180,9 +182,22 @@ func TestSMigratedNotificationHandler(t *testing.T) { t.Error("Cluster state reload callback should have been called") } - // Verify slot was passed correctly - if receivedSlot != 1234 { - t.Errorf("Expected slot 1234, got %d", receivedSlot) + // Verify host:port was passed correctly + if receivedHostPort != "127.0.0.1:6379" { + t.Errorf("Expected host:port '127.0.0.1:6379', got '%s'", receivedHostPort) + } + + // Verify slot ranges were passed correctly + if len(receivedSlotRanges) != 2 { + t.Errorf("Expected 2 slot ranges, got %d", len(receivedSlotRanges)) + } + if len(receivedSlotRanges) >= 2 { + if receivedSlotRanges[0] != "1234" { + t.Errorf("Expected first slot range '1234', got '%s'", receivedSlotRanges[0]) + } + if receivedSlotRanges[1] != "5000-6000" { + t.Errorf("Expected second slot range '5000-6000', got '%s'", receivedSlotRanges[1]) + } } }) @@ -205,15 +220,34 @@ func TestSMigratedNotificationHandler(t *testing.T) { } }) - t.Run("InvalidSMigratedNotification_InvalidSlot", func(t *testing.T) { + t.Run("InvalidSMigratedNotification_InvalidSeqID", func(t *testing.T) { manager := &Manager{} handler := &NotificationHandler{ manager: manager, operationsManager: manager, } - // Invalid notification - slot is not int64 - notification := []interface{}{"SMIGRATED", "not-a-number"} + // Invalid notification - SeqID is not int64 + notification := []interface{}{"SMIGRATED", "not-a-number", "127.0.0.1:6379", "1234"} + + ctx := context.Background() + handlerCtx := push.NotificationHandlerContext{} + + err := handler.handleSMigrated(ctx, handlerCtx, notification) + if err != ErrInvalidNotification { + t.Errorf("Expected ErrInvalidNotification, got: %v", err) + } + }) + + t.Run("InvalidSMigratedNotification_InvalidHostPort", func(t *testing.T) { + manager := &Manager{} + handler := &NotificationHandler{ + manager: manager, + operationsManager: manager, + } + + // Invalid notification - host:port is not string + notification := []interface{}{"SMIGRATED", int64(123), int64(999), "1234"} ctx := context.Background() handlerCtx := push.NotificationHandlerContext{} @@ -232,7 +266,7 @@ func TestSMigratedNotificationHandler(t *testing.T) { operationsManager: manager, } - notification := []interface{}{"SMIGRATED", int64(1234)} + notification := []interface{}{"SMIGRATED", int64(123), "127.0.0.1:6379", "1234"} ctx := context.Background() handlerCtx := push.NotificationHandlerContext{} @@ -271,20 +305,23 @@ func TestClusterStateReloadCallback(t *testing.T) { t.Run("SetAndTriggerCallback", func(t *testing.T) { var callbackCalled atomic.Bool var receivedCtx context.Context - var receivedSlot int + var receivedHostPort string + var receivedSlotRanges []string manager := &Manager{} - callback := func(ctx context.Context, slot int) { + callback := func(ctx context.Context, hostPort string, slotRanges []string) { callbackCalled.Store(true) receivedCtx = ctx - receivedSlot = slot + receivedHostPort = hostPort + receivedSlotRanges = slotRanges } manager.SetClusterStateReloadCallback(callback) ctx := context.Background() - testSlot := 1234 - manager.TriggerClusterStateReload(ctx, testSlot) + testHostPort := "127.0.0.1:6379" + testSlotRanges := []string{"1234", "5000-6000"} + manager.TriggerClusterStateReload(ctx, testHostPort, testSlotRanges) if !callbackCalled.Load() { t.Error("Callback should have been called") @@ -294,8 +331,12 @@ func TestClusterStateReloadCallback(t *testing.T) { t.Error("Callback should receive the correct context") } - if receivedSlot != testSlot { - t.Errorf("Callback should receive the correct slot, got %d, want %d", receivedSlot, testSlot) + if receivedHostPort != testHostPort { + t.Errorf("Callback should receive the correct host:port, got %s, want %s", receivedHostPort, testHostPort) + } + + if len(receivedSlotRanges) != len(testSlotRanges) { + t.Errorf("Callback should receive the correct slot ranges, got %v, want %v", receivedSlotRanges, testSlotRanges) } }) @@ -303,7 +344,7 @@ func TestClusterStateReloadCallback(t *testing.T) { manager := &Manager{} // Should not panic ctx := context.Background() - manager.TriggerClusterStateReload(ctx, 1234) + manager.TriggerClusterStateReload(ctx, "127.0.0.1:6379", []string{"1234"}) }) } diff --git a/osscluster.go b/osscluster.go index 623a1df62e..adbb73ba79 100644 --- a/osscluster.go +++ b/osscluster.go @@ -146,7 +146,7 @@ type ClusterOptions struct { // cluster upgrade notifications gracefully and manage connection/pool state // transitions seamlessly. Requires Protocol: 3 (RESP3) for push notifications. // If nil, maintnotifications upgrades are in "auto" mode and will be enabled if the server supports it. - // The ClusterClient supports SMOVING notifications for cluster state management. + // The ClusterClient supports SMIGRATING and SMIGRATED notifications for cluster state management. // Individual node clients handle other maintenance notifications (MOVING, MIGRATING, etc.). MaintNotificationsConfig *maintnotifications.Config } @@ -1039,17 +1039,20 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { txPipeline: c.processTxPipeline, }) - // Set up SMOVING notification handling for cluster state reload - // When a node client receives a SMOVING notification, it should trigger + // Set up SMIGRATED notification handling for cluster state reload + // When a node client receives a SMIGRATED notification, it should trigger // cluster state reload on the parent ClusterClient if opt.MaintNotificationsConfig != nil { c.nodes.OnNewNode(func(nodeClient *Client) { manager := nodeClient.GetMaintNotificationsManager() if manager != nil { - manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) { + manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) { + // Log the migration details for now + if internal.LogLevel.InfoOrAbove() { + internal.Logger.Printf(ctx, "cluster: slots %v migrated to %s, reloading cluster state", slotRanges, hostPort) + } // Currently we reload the entire cluster state - // In the future, this could be optimized to reload only the specific slot - _ = slot // slot parameter available for future optimization + // In the future, this could be optimized to reload only the specific slots c.state.LazyReload() }) } From b3a3bdde70e1607b4069bdf01eb41ba39e88c1b4 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com> Date: Mon, 17 Nov 2025 23:14:38 +0200 Subject: [PATCH 3/6] Update maintnotifications/manager.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- maintnotifications/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maintnotifications/manager.go b/maintnotifications/manager.go index 6f43f1f590..3b17c93f54 100644 --- a/maintnotifications/manager.go +++ b/maintnotifications/manager.go @@ -334,7 +334,7 @@ func (hm *Manager) AddNotificationHook(notificationHook NotificationHook) { hm.hooks = append(hm.hooks, notificationHook) } -// SetClusterStateReloadCallback sets the callback function that will be called when a SMOVED notification is received. +// SetClusterStateReloadCallback sets the callback function that will be called when a SMIGRATED notification is received. // This allows node clients to notify their parent ClusterClient to reload cluster state. func (hm *Manager) SetClusterStateReloadCallback(callback ClusterStateReloadCallback) { hm.clusterStateReloadCallback = callback From d7a246b9a93d80e1736175aa3ef4c9d3ee8d058d Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Tue, 18 Nov 2025 17:35:36 +0200 Subject: [PATCH 4/6] cascading smigrated will trigger multiple reloads --- osscluster.go | 34 ++++-- osscluster_lazy_reload_test.go | 202 +++++++++++++++++++++++++++++++++ 2 files changed, 229 insertions(+), 7 deletions(-) create mode 100644 osscluster_lazy_reload_test.go diff --git a/osscluster.go b/osscluster.go index adbb73ba79..2470a213e8 100644 --- a/osscluster.go +++ b/osscluster.go @@ -946,8 +946,9 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode { type clusterStateHolder struct { load func(ctx context.Context) (*clusterState, error) - state atomic.Value - reloading uint32 // atomic + state atomic.Value + reloading uint32 // atomic + reloadPending uint32 // atomic - set to 1 when reload is requested during active reload } func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder { @@ -966,17 +967,36 @@ func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) } func (c *clusterStateHolder) LazyReload() { + // If already reloading, mark that another reload is pending if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { + atomic.StoreUint32(&c.reloadPending, 1) return } + go func() { - defer atomic.StoreUint32(&c.reloading, 0) + for { + _, err := c.Reload(context.Background()) + if err != nil { + atomic.StoreUint32(&c.reloading, 0) + return + } - _, err := c.Reload(context.Background()) - if err != nil { - return + // Clear pending flag after reload completes, before cooldown + // This captures notifications that arrived during the reload + atomic.StoreUint32(&c.reloadPending, 0) + + // Wait cooldown period + time.Sleep(200 * time.Millisecond) + + // Check if another reload was requested during cooldown + if atomic.LoadUint32(&c.reloadPending) == 0 { + // No pending reload, we're done + atomic.StoreUint32(&c.reloading, 0) + return + } + + // Pending reload requested, loop to reload again } - time.Sleep(200 * time.Millisecond) }() } diff --git a/osscluster_lazy_reload_test.go b/osscluster_lazy_reload_test.go new file mode 100644 index 0000000000..308a316e93 --- /dev/null +++ b/osscluster_lazy_reload_test.go @@ -0,0 +1,202 @@ +package redis + +import ( + "context" + "sync/atomic" + "testing" + "time" +) + +// TestLazyReloadQueueBehavior tests that LazyReload properly queues reload requests +func TestLazyReloadQueueBehavior(t *testing.T) { + t.Run("SingleReload", func(t *testing.T) { + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(50 * time.Millisecond) // Simulate reload work + return &clusterState{}, nil + }) + + // Trigger one reload + holder.LazyReload() + + // Wait for reload to complete + time.Sleep(300 * time.Millisecond) + + if count := reloadCount.Load(); count != 1 { + t.Errorf("Expected 1 reload, got %d", count) + } + }) + + t.Run("ConcurrentReloadsDeduplication", func(t *testing.T) { + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(50 * time.Millisecond) // Simulate reload work + return &clusterState{}, nil + }) + + // Trigger multiple reloads concurrently + for i := 0; i < 10; i++ { + go holder.LazyReload() + } + + // Wait for all to complete + time.Sleep(100 * time.Millisecond) + + // Should only reload once (all concurrent calls deduplicated) + if count := reloadCount.Load(); count != 1 { + t.Errorf("Expected 1 reload (deduplication), got %d", count) + } + }) + + t.Run("PendingReloadDuringCooldown", func(t *testing.T) { + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(10 * time.Millisecond) // Simulate reload work + return &clusterState{}, nil + }) + + // Trigger first reload + holder.LazyReload() + + // Wait for reload to complete but still in cooldown + time.Sleep(50 * time.Millisecond) + + // Trigger second reload during cooldown period + holder.LazyReload() + + // Wait for second reload to complete + time.Sleep(300 * time.Millisecond) + + // Should have reloaded twice (second request queued and executed) + if count := reloadCount.Load(); count != 2 { + t.Errorf("Expected 2 reloads (queued during cooldown), got %d", count) + } + }) + + t.Run("MultiplePendingReloadsCollapsed", func(t *testing.T) { + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(10 * time.Millisecond) // Simulate reload work + return &clusterState{}, nil + }) + + // Trigger first reload + holder.LazyReload() + + // Wait for reload to start + time.Sleep(5 * time.Millisecond) + + // Trigger multiple reloads during active reload + cooldown + for i := 0; i < 10; i++ { + holder.LazyReload() + time.Sleep(5 * time.Millisecond) + } + + // Wait for all to complete + time.Sleep(400 * time.Millisecond) + + // Should have reloaded exactly twice: + // 1. Initial reload + // 2. One more reload for all the pending requests (collapsed into one) + if count := reloadCount.Load(); count != 2 { + t.Errorf("Expected 2 reloads (initial + collapsed pending), got %d", count) + } + }) + + t.Run("ReloadAfterCooldownPeriod", func(t *testing.T) { + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(10 * time.Millisecond) // Simulate reload work + return &clusterState{}, nil + }) + + // Trigger first reload + holder.LazyReload() + + // Wait for reload + cooldown to complete + time.Sleep(300 * time.Millisecond) + + // Trigger second reload after cooldown + holder.LazyReload() + + // Wait for second reload to complete + time.Sleep(300 * time.Millisecond) + + // Should have reloaded twice (separate reload cycles) + if count := reloadCount.Load(); count != 2 { + t.Errorf("Expected 2 reloads (separate cycles), got %d", count) + } + }) + + t.Run("ErrorDuringReload", func(t *testing.T) { + var reloadCount atomic.Int32 + var shouldFail atomic.Bool + shouldFail.Store(true) + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + if shouldFail.Load() { + return nil, context.DeadlineExceeded + } + return &clusterState{}, nil + }) + + // Trigger reload that will fail + holder.LazyReload() + + // Wait for failed reload + time.Sleep(50 * time.Millisecond) + + // Trigger another reload (should succeed now) + shouldFail.Store(false) + holder.LazyReload() + + // Wait for successful reload + time.Sleep(300 * time.Millisecond) + + // Should have attempted reload twice (first failed, second succeeded) + if count := reloadCount.Load(); count != 2 { + t.Errorf("Expected 2 reload attempts, got %d", count) + } + }) + + t.Run("CascadingSMigratedScenario", func(t *testing.T) { + // Simulate the real-world scenario: multiple SMIGRATED notifications + // arriving in quick succession from different node clients + var reloadCount atomic.Int32 + + holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) { + reloadCount.Add(1) + time.Sleep(20 * time.Millisecond) // Simulate realistic reload time + return &clusterState{}, nil + }) + + // Simulate 5 SMIGRATED notifications arriving within 100ms + for i := 0; i < 5; i++ { + go holder.LazyReload() + time.Sleep(20 * time.Millisecond) + } + + // Wait for all reloads to complete + time.Sleep(500 * time.Millisecond) + + // Should reload at most 2 times: + // 1. First notification triggers reload + // 2. Notifications 2-5 collapse into one pending reload + count := reloadCount.Load() + if count < 1 || count > 2 { + t.Errorf("Expected 1-2 reloads for cascading scenario, got %d", count) + } + }) +} + From 391cee9ea321f7cb0c578a3e8684cc4d826db066 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Wed, 19 Nov 2025 00:24:57 +0200 Subject: [PATCH 5/6] process once per client / seqid --- maintnotifications/manager.go | 13 +++++ .../push_notification_handler.go | 10 ++++ maintnotifications/smigrating_test.go | 58 +++++++++++++++++++ 3 files changed, 81 insertions(+) diff --git a/maintnotifications/manager.go b/maintnotifications/manager.go index 3b17c93f54..0cd75c1f40 100644 --- a/maintnotifications/manager.go +++ b/maintnotifications/manager.go @@ -69,6 +69,10 @@ type Manager struct { // MOVING operation tracking - using sync.Map for better concurrent performance activeMovingOps sync.Map // map[MovingOperationKey]*MovingOperation + // SMIGRATED notification deduplication - tracks processed SeqIDs + // Multiple connections may receive the same SMIGRATED notification + processedSMigratedSeqIDs sync.Map // map[int64]bool + // Atomic state tracking - no locks needed for state queries activeOperationCount atomic.Int64 // Number of active operations closed atomic.Bool // Manager closed state @@ -238,6 +242,15 @@ func (hm *Manager) GetActiveOperationCount() int64 { return hm.activeOperationCount.Load() } +// MarkSMigratedSeqIDProcessed attempts to mark a SMIGRATED SeqID as processed. +// Returns true if this is the first time processing this SeqID (should process), +// false if it was already processed (should skip). +// This prevents duplicate processing when multiple connections receive the same notification. +func (hm *Manager) MarkSMigratedSeqIDProcessed(seqID int64) bool { + _, alreadyProcessed := hm.processedSMigratedSeqIDs.LoadOrStore(seqID, true) + return !alreadyProcessed // Return true if NOT already processed +} + // Close closes the manager. func (hm *Manager) Close() error { // Use atomic operation for thread-safe close check diff --git a/maintnotifications/push_notification_handler.go b/maintnotifications/push_notification_handler.go index d9c91291d8..c1296f2f69 100644 --- a/maintnotifications/push_notification_handler.go +++ b/maintnotifications/push_notification_handler.go @@ -341,6 +341,7 @@ func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx // SMIGRATED indicates that a cluster slot has finished migrating to a different node. // This is a cluster-level notification that triggers cluster state reload. // Expected format: ["SMIGRATED", SeqID, host:port, slot1/range1-range2, ...] +// Note: Multiple connections may receive the same notification, so we deduplicate by SeqID. func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { if len(notification) < 4 { internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATED", notification)) @@ -354,6 +355,15 @@ func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx return ErrInvalidNotification } + // Deduplicate by SeqID - multiple connections may receive the same notification + if !snh.manager.MarkSMigratedSeqIDProcessed(seqID) { + // Already processed this SeqID, skip + if internal.LogLevel.DebugOrAbove() { + internal.Logger.Printf(ctx, "cluster: SMIGRATED notification with SeqID %d already processed, skipping", seqID) + } + return nil + } + // Extract host:port (position 2) hostPort, ok := notification[2].(string) if !ok { diff --git a/maintnotifications/smigrating_test.go b/maintnotifications/smigrating_test.go index 80e55afe36..0a54add225 100644 --- a/maintnotifications/smigrating_test.go +++ b/maintnotifications/smigrating_test.go @@ -201,6 +201,64 @@ func TestSMigratedNotificationHandler(t *testing.T) { } }) +t.Run("SMigratedNotification_Deduplication", func(t *testing.T) { + // Track callback invocations + var callbackCount atomic.Int32 + + // Create a mock manager with callback + manager := &Manager{ + clusterStateReloadCallback: func(ctx context.Context, hostPort string, slotRanges []string) { + callbackCount.Add(1) + }, + } + + // Create notification handler + handler := &NotificationHandler{ + manager: manager, + operationsManager: manager, + } + + // Create SMIGRATED notification with SeqID 456 + notification := []interface{}{"SMIGRATED", int64(456), "127.0.0.1:6379", "1234"} + + ctx := context.Background() + handlerCtx := push.NotificationHandlerContext{} + + // Handle the notification first time + err := handler.handleSMigrated(ctx, handlerCtx, notification) + if err != nil { + t.Errorf("handleSMigrated should not error on first call: %v", err) + } + + // Verify callback was called once + if callbackCount.Load() != 1 { + t.Errorf("Expected callback to be called once, got %d", callbackCount.Load()) + } + + // Handle the same notification again (simulating multiple connections) + err = handler.handleSMigrated(ctx, handlerCtx, notification) + if err != nil { + t.Errorf("handleSMigrated should not error on second call: %v", err) + } + + // Verify callback was NOT called again (still 1) + if callbackCount.Load() != 1 { + t.Errorf("Expected callback to be called only once (deduplication), got %d", callbackCount.Load()) + } + + // Handle a different notification with different SeqID + notification2 := []interface{}{"SMIGRATED", int64(789), "127.0.0.1:6380", "5678"} + err = handler.handleSMigrated(ctx, handlerCtx, notification2) + if err != nil { + t.Errorf("handleSMigrated should not error on third call: %v", err) + } + + // Verify callback was called again (now 2) + if callbackCount.Load() != 2 { + t.Errorf("Expected callback to be called twice (different SeqID), got %d", callbackCount.Load()) + } +}) + t.Run("InvalidSMigratedNotification_TooShort", func(t *testing.T) { manager := &Manager{} handler := &NotificationHandler{ From cf2d5d3aff20f4db03ad1c565367c729b03814b4 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Wed, 19 Nov 2025 10:58:17 +0200 Subject: [PATCH 6/6] fix flaky tests --- commands_test.go | 20 +++++++++++++++----- maintnotifications/pool_hook_test.go | 21 +++++++++++++++++++-- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/commands_test.go b/commands_test.go index edbae4e7a3..24640c23c8 100644 --- a/commands_test.go +++ b/commands_test.go @@ -8905,27 +8905,37 @@ var _ = Describe("Commands", func() { const key = "latency-monitor-threshold" old := client.ConfigGet(ctx, key).Val() - client.ConfigSet(ctx, key, "1") + // Use a higher threshold (100ms) to avoid capturing normal operations + // that could cause flakiness due to timing variations + client.ConfigSet(ctx, key, "100") defer client.ConfigSet(ctx, key, old[key]) result, err := client.Latency(ctx).Result() Expect(err).NotTo(HaveOccurred()) Expect(len(result)).Should(Equal(0)) - err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err() + // Use a longer sleep (150ms) to ensure it exceeds the 100ms threshold + err = client.Do(ctx, "DEBUG", "SLEEP", 0.15).Err() Expect(err).NotTo(HaveOccurred()) result, err = client.Latency(ctx).Result() Expect(err).NotTo(HaveOccurred()) - Expect(len(result)).Should(Equal(1)) + Expect(len(result)).Should(BeNumerically(">=", 1)) // reset latency by event name - err = client.LatencyReset(ctx, result[0].Name).Err() + eventName := result[0].Name + err = client.LatencyReset(ctx, eventName).Err() Expect(err).NotTo(HaveOccurred()) + // Verify the specific event was reset (not that all events are gone) + // This avoids flakiness from other operations triggering latency events result, err = client.Latency(ctx).Result() Expect(err).NotTo(HaveOccurred()) - Expect(len(result)).Should(Equal(0)) + for _, event := range result { + if event.Name == eventName { + Fail("Event " + eventName + " should have been reset") + } + } }) }) }) diff --git a/maintnotifications/pool_hook_test.go b/maintnotifications/pool_hook_test.go index 6ec61eeda0..01fa35a2c4 100644 --- a/maintnotifications/pool_hook_test.go +++ b/maintnotifications/pool_hook_test.go @@ -700,8 +700,25 @@ func TestConnectionHook(t *testing.T) { t.Errorf("Connection should be pooled after handoff (shouldPool=%v, shouldRemove=%v)", shouldPool, shouldRemove) } - // Wait for handoff to complete - time.Sleep(50 * time.Millisecond) + // Wait for handoff to complete with polling instead of fixed sleep + // This avoids flakiness on slow CI runners where 50ms may not be enough + maxWait := 500 * time.Millisecond + pollInterval := 10 * time.Millisecond + deadline := time.Now().Add(maxWait) + + handoffCompleted := false + for time.Now().Before(deadline) { + if conn.IsUsable() && !processor.IsHandoffPending(conn) { + handoffCompleted = true + break + } + time.Sleep(pollInterval) + } + + if !handoffCompleted { + t.Fatalf("Handoff did not complete within %v (IsUsable=%v, IsHandoffPending=%v)", + maxWait, conn.IsUsable(), processor.IsHandoffPending(conn)) + } // After handoff completion, connection should be usable again if !conn.IsUsable() {