@jbrinkman
Collaborator

Description

This PR implements all supported PubSub commands for both standalone and cluster clients, addressing issue #55.

Changes

New Interfaces

  • IPubSubCommands: Common PubSub operations available in both standalone and cluster modes

    • PubSubChannelsAsync() - List active channels
    • PubSubChannelsAsync(pattern) - List channels matching pattern
    • PubSubNumSubAsync(channels) - Get subscriber counts for channels
    • PubSubNumPatAsync() - Get active pattern subscription count
  • IPubSubStandaloneCommands: Standalone-specific operations

    • PublishAsync(channel, message) - Publish message to channel
  • IPubSubClusterCommands: Cluster-specific operations

    • PublishAsync(channel, message) - Publish message to channel
    • PublishAsync(channel, message, sharded) - Publish with optional sharded mode
    • PubSubShardChannelsAsync() - List active sharded channels
    • PubSubShardChannelsAsync(pattern) - List sharded channels matching pattern
    • PubSubShardNumSubAsync(channels) - Get subscriber counts for sharded channels

Implementation Files

  • sources/Valkey.Glide/Commands/IPubSubCommands.cs
  • sources/Valkey.Glide/Commands/IPubSubStandaloneCommands.cs
  • sources/Valkey.Glide/Commands/IPubSubClusterCommands.cs
  • sources/Valkey.Glide/GlideClient.PubSubCommands.cs
  • sources/Valkey.Glide/GlideClusterClient.PubSubCommands.cs
  • sources/Valkey.Glide/Internals/Request.PubSubCommands.cs

Test Coverage

  • Unit Tests (tests/Valkey.Glide.UnitTests/PubSubCommandTests.cs):

    • Command building and argument validation
    • Response conversion testing
    • Special character and Unicode handling
    • 30+ unit tests covering all command variations
  • Integration Tests:

    • tests/Valkey.Glide.IntegrationTests/PubSubCommandTests.cs - 11 standalone tests
    • tests/Valkey.Glide.IntegrationTests/PubSubClusterCommandTests.cs - 16 cluster tests
    • Updated existing callback and queue tests to use new API methods

Other Changes

  • Updated GlideClient and GlideClusterClient to be partial classes
  • Updated existing integration tests to use PublishAsync() instead of CustomCommand
  • All tests pass (275 unit tests, 45 PubSub integration tests)

Commands Implemented

  • PUBLISH - Publish message to channel
  • SPUBLISH - Publish message to sharded channel (cluster only)
  • PUBSUB CHANNELS - List active channels
  • PUBSUB NUMPAT - Get pattern subscription count
  • PUBSUB NUMSUB - Get subscriber counts for channels
  • PUBSUB SHARDCHANNELS - List active sharded channels (cluster only)
  • PUBSUB SHARDNUMSUB - Get subscriber counts for sharded channels (cluster only)

Testing

All tests pass successfully:

# Unit tests
dotnet test tests/Valkey.Glide.UnitTests/ --framework net8.0
# Result: 275 tests passed

# PubSub integration tests
dotnet test tests/Valkey.Glide.IntegrationTests/ --framework net8.0 --filter "FullyQualifiedName~PubSub"
# Result: 45 tests passed

Notes

  • Subscription commands (SUBSCRIBE, PSUBSCRIBE, SSUBSCRIBE, etc.) are not implemented because GLIDE Core handles them during client creation
  • Cluster mode PubSub introspection commands are node-specific, so tests account for this behavior
  • All commands include comprehensive XML documentation with examples

Closes #55

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

Implements tasks 1-7 of PubSub support specification:

Task 1: Core PubSub message types and data structures
- Add PubSubMessage class with channel, pattern, and payload support
- Add PubSubMessageHandler delegate for callback handling
- Add PubSubSubscriptionConfig for subscription configuration

Task 2: Message queue implementation
- Add PubSubMessageQueue with thread-safe operations
- Implement async message retrieval with cancellation support
- Add capacity management and overflow handling

Task 3: Callback management system
- Add PubSubCallbackManager for FFI callback coordination
- Implement callback registration and cleanup
- Add thread-safe callback invocation

Task 4: FFI integration layer
- Extend FFI structs with PubSub message and callback types
- Add FFI methods for PubSub operations (subscribe, unsubscribe, publish)
- Implement callback marshaling and memory management

Task 5: Configuration integration
- Extend ConnectionConfiguration with PubSub callback support
- Add validation for PubSub configuration parameters
- Integrate callback setup in connection builder

Task 6: BaseClient PubSub foundation
- Add PubSub infrastructure to BaseClient
- Implement callback manager initialization
- Add foundation for PubSub command integration

Task 7: Comprehensive unit test coverage
- Add tests for all PubSub message types and operations
- Add FFI integration and workflow tests
- Add configuration and callback management tests
- Achieve comprehensive test coverage for core functionality

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

# Conflicts:
#	sources/Valkey.Glide/ConnectionConfiguration.cs
…mory management

Add missing Rust FFI implementations for PubSub functionality:

- Add PubSubMessageInfo struct for FFI message data
- Add PubSubCallback type for callback function pointers
- Add register_pubsub_callback() function for callback registration
- Add free_pubsub_message() function for memory cleanup
- Include placeholder implementations with proper safety documentation

These functions provide the Rust-side implementation for the C# FFI
declarations added in the previous commit, enabling proper interop
between C# PubSub infrastructure and the Rust glide-core library.

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

Replace placeholder PubSub FFI implementation with working callback system:

- Add pubsub_callback field to Client struct with thread-safe Mutex protection
- Implement proper callback registration in register_pubsub_callback()
- Add invoke_pubsub_callback() helper for glide-core integration
- Add create_pubsub_message() helper for message creation from Rust strings
- Use Arc reference counting for safe client pointer access
- Maintain proper memory management and thread safety

This provides a complete callback infrastructure ready for integration
with glide-core PubSub functionality when it becomes available.
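The callback storage described in this commit could look roughly like the sketch below. It is not the code from the PR: the callback signature is a simplified, hypothetical closure type (the real FFI callback takes raw pointers), and the method bodies are assumptions that only illustrate the Mutex-protected `Option` pattern.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical, simplified callback type (assumption: the real one is an FFI function pointer).
type PubSubCallback = Box<dyn Fn(&str, &[u8]) + Send>;

/// Simplified stand-in for the Client struct; only the PubSub field is shown.
struct Client {
    pubsub_callback: Mutex<Option<PubSubCallback>>,
}

impl Client {
    /// Clients are shared through Arc so several threads can hold a reference safely.
    fn new() -> Arc<Self> {
        Arc::new(Client { pubsub_callback: Mutex::new(None) })
    }

    /// Store (or replace) the callback while holding the mutex.
    fn register_pubsub_callback(&self, cb: PubSubCallback) {
        *self.pubsub_callback.lock().unwrap() = Some(cb);
    }

    /// Invoke the callback if one is registered; returns whether it ran.
    fn invoke_pubsub_callback(&self, channel: &str, payload: &[u8]) -> bool {
        let guard = self.pubsub_callback.lock().unwrap();
        match guard.as_ref() {
            Some(cb) => {
                cb(channel, payload);
                true
            }
            None => false,
        }
    }
}
```

Holding the lock while the callback runs keeps registration and invocation from interleaving, at the cost of serializing callbacks; whether the PR makes the same trade-off is not stated in the commit message.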

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Implement PubSubFFICallbackIntegrationTests with 11 comprehensive test methods
- Add ClientRegistry for thread-safe client management with weak references
- Test end-to-end message flow from FFI callbacks to message handlers
- Test client registry operations under concurrent access with 10 threads
- Test callback error handling and recovery with exception isolation
- Test memory management and cleanup of marshaled data
- Test async message processing without blocking FFI thread pool
- Test performance monitoring and logging of callback execution times
- Add test collection attributes to prevent parallel execution conflicts
- Ensure proper cleanup and disposal of client registry entries

Covers requirements 1.1, 1.2, 2.1-2.4, 7.1, 7.4-7.5, 8.1-8.2 from PubSub spec.
All 1,781 tests pass successfully with zero failures.

Addresses task 7.6: Write integration tests for FFI PubSub callback flow

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Simplify test code by removing unnecessary imports
- Use collection initializers for lists and arrays
- Suppress unnecessary variable assignments with discard operator
- Enhance readability of test setup and configuration
- Improve error handling and message processing assertions
- Remove commented-out code and unnecessary placeholders

Refactors existing PubSub FFI callback integration tests to improve code quality, readability, and maintainability while preserving core testing functionality.

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Refactor PubSub callback system from static to instance-based approach
- Remove `PubSubCallbackManager` and `ClientRegistry` static infrastructure
- Update Rust FFI layer to support direct instance callback registration
- Modify C# FFI methods and delegates to match new callback signature
- Simplify BaseClient PubSub callback handling and lifecycle management
- Improve performance by eliminating callback routing and lookup overhead
- Align PubSub callback pattern with existing success/failure callback mechanisms
- Remove unnecessary client ID tracking and static registration methods

Motivation:
- Eliminate potential race conditions in callback registration
- Reduce code complexity and improve maintainability
- Provide a more direct and performant message routing mechanism

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

This commit addresses a critical memory leak in the FFI layer where Rust-allocated
memory was not properly freed after C# marshaling during PubSub message processing.

Key Changes:

Rust FFI Layer (rust/src/lib.rs):
- Replaced std::mem::forget() with scoped lifetime management in process_push_notification
- Vec<u8> instances now stay alive for the duration of the callback and are dropped automatically when the scope exits
- Added comprehensive message structure validation based on PushKind type
- Implemented proper error logging for invalid message formats and unexpected value types
- Enhanced validation ensures message structure matches expected format for each PushKind
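A minimal sketch of the scoped-lifetime idea follows. The function name `process_push_notification` comes from the commit message, but its parameters and the callback signature here are assumptions made for illustration, not the signatures in rust/src/lib.rs.

```rust
/// Hypothetical C-style callback: receives borrowed pointer/length pairs that are
/// valid only for the duration of the call, so the receiver must copy what it keeps.
type PubSubCallback = extern "C" fn(
    channel: *const u8,
    channel_len: usize,
    message: *const u8,
    message_len: usize,
);

/// Assumed shape: the function owns both buffers for the whole call.
fn process_push_notification(channel: Vec<u8>, message: Vec<u8>, callback: PubSubCallback) {
    // The Vecs are owned by this scope, so the pointers stay valid while the callback runs.
    callback(channel.as_ptr(), channel.len(), message.as_ptr(), message.len());
    // No std::mem::forget here: both Vecs are dropped when the function returns,
    // which frees the Rust-side allocations without any action from the caller.
}
```

With this shape the managed side has to copy the bytes before the callback returns, since the pointers dangle afterwards.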

Memory Leak Detection Tests:
- Added PubSubFFIMemoryLeakTests.cs with comprehensive memory leak detection
- Tests process 100,000+ messages and verify memory usage remains bounded
- Includes tests for various message sizes, GC pressure, concurrent access, and extended duration
- Added PubSubMemoryLeakFixValidationTests.cs for simple validation scenarios

Test Cleanup:
- Removed tests dependent on deleted PubSubCallbackManager and ClientRegistry classes
- Deleted PubSubCallbackIntegrationTests.cs (tested removed PubSubCallbackManager)
- Deleted PubSubFFIWorkflowTests.cs (tested removed PubSubCallbackManager)
- Deleted ClientRegistryTests.cs (tested removed ClientRegistry)
- Updated PubSubFFIIntegrationTests.cs to remove tests using removed infrastructure
- Updated PubSubFFICallbackIntegrationTests.cs to remove ClientRegistry tests
- Changed lock fields from the Lock type to object for .NET 8 compatibility
- Changed explicit type declarations to var to avoid type resolution issues

All unit tests (242) now pass successfully.

Addresses requirements 1.1-1.6 and 9.1 from pubsub-critical-fixes spec.

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Add volatile modifier to _pubSubHandler field for memory barrier guarantees
- Add _pubSubLock object for coordinating thread-safe access
- Implement lock-based handler access in HandlePubSubMessage() to prevent race conditions
- Update InitializePubSubHandler() with thread-safe initialization
- Enhance CleanupPubSubResources() with proper synchronization and timeout-based cleanup
- Add thread-safe access to PubSubQueue property

Add comprehensive thread safety tests:
- Test concurrent message processing from 100+ threads
- Test disposal during active message processing
- Add stress test with 100 iterations of concurrent operations
- Test concurrent access to PubSubQueue and HasPubSubSubscriptions properties
- Test rapid create/dispose cycles for memory leak detection
- Test disposal timeout handling

All 1,771 tests pass (250 unit + 1,521 integration tests).

Addresses requirements 2.1-2.6 from pubsub-critical-fixes spec.

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Implement bounded channel configuration with PubSubPerformanceConfig
- Add configurable capacity (default 1000), backpressure strategies, and shutdown timeout
- Replace per-message Task.Run() calls with single dedicated background processing task
- Use System.Threading.Channels.Channel<PubSubMessage> for bounded message queuing
- Implement non-blocking TryWrite() in PubSubCallback for backpressure handling
- Add graceful shutdown with cancellation token support
- Create comprehensive performance validation tests covering:
  - High throughput (10,000+ msg/sec)
  - Allocation pressure and GC impact
  - Concurrent message handling
  - Burst traffic patterns
  - Long-running stability
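The bounded queue with a non-blocking write can be illustrated with the standard library alone. The sketch below is an analogy in Rust, not the C# implementation: it uses `std::sync::mpsc::sync_channel` in place of `System.Threading.Channels`, and it assumes a drop-newest policy when the queue is full, which is only one of the possible backpressure strategies. The names `Offer`, `offer`, and `drain` are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Outcome of offering a message to the bounded queue.
#[derive(Debug, PartialEq)]
enum Offer {
    Queued,
    DroppedFull,
    Closed,
}

/// Non-blocking enqueue: the producer (the FFI callback in the PR) never waits.
fn offer(tx: &SyncSender<String>, msg: String) -> Offer {
    match tx.try_send(msg) {
        Ok(()) => Offer::Queued,
        // Assumed policy: drop the newest message when the queue is at capacity.
        Err(TrySendError::Full(_)) => Offer::DroppedFull,
        Err(TrySendError::Disconnected(_)) => Offer::Closed,
    }
}

/// Drain everything currently queued, as the single consumer task would.
fn drain(rx: &Receiver<String>) -> Vec<String> {
    rx.try_iter().collect()
}
```

For example, with `sync_channel::<String>(2)` the third consecutive `offer` reports `DroppedFull` until the consumer drains the queue.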

Benefits:
- Single dedicated background task instead of a Task.Run call per message
- Reduced allocation pressure and GC impact
- Predictable performance characteristics
- Better resource utilization without thread pool starvation
- Configurable performance options for different scenarios

All tests pass: 255 unit tests, 1520 integration tests

Addresses requirements 3.1-3.6 and 6.1-6.4 from pubsub-critical-fixes spec

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

…nd C#

- Add graceful shutdown signaling using tokio::sync::oneshot::channel in Rust
- Implement tokio::select! for coordinated task termination in PubSub processing
- Store shutdown sender and task handle in Client struct with Mutex for thread safety
- Add timeout-based task completion waiting (5 seconds) in close_client
- Implement CancellationTokenSource for C# message processing coordination
- Add configurable shutdown timeout from PubSubPerformanceConfig
- Ensure proper cleanup of channels, tasks, and handlers during disposal
- Add comprehensive logging for shutdown process (Debug, Info, Warn levels)
- Add unit tests for graceful shutdown coordination
- Optimize global usings in test project for cleaner code

Validates Requirements: 5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 9.2

Test Results:
- All 262 unit tests pass
- All 1,772 integration tests pass (1,774 total, 2 skipped)
- No regressions introduced

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

…egration tests

- Add PubSubQueue property to BaseClient for queue-based message access
- Enhance PubSubMessageHandler.GetQueue() to prevent mixing callback/queue modes
- Add 11 comprehensive integration tests for queue-based PubSub functionality
- Rename PubSubFFICallbackIntegrationTests to PubSubCallbackIntegrationTests
- Update unit tests to validate callback/queue mode separation
- Add testresults/ and reports/ to .gitignore
- Remove obsolete design documentation files

Tests cover:
- Standalone and cluster client queue retrieval
- Pattern subscriptions with queue mode
- Message ordering preservation
- Async operations with cancellation
- High volume message handling
- Multiple channel subscriptions
- Unicode and special character handling
- Error handling for mixed callback/queue modes

All 1,797 tests passing with 67.7% line coverage.

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Remove unused PubSubConfigurationExtensions class and all extension methods
- Remove unused EnableMetrics and MetricsInterval properties from PubSubPerformanceConfig
- Simplify Validate() method to only check used properties
- Keep core functionality: ChannelCapacity, FullMode, and ShutdownTimeout

The extension methods were never integrated into the codebase and the metrics
properties were not being used. This cleanup reduces code complexity and
maintenance burden while preserving all actively used functionality.

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Remove unused using statements from PubSubMessageHandler.cs
- Update lock object to use C# 13 Lock type
- Apply IDE auto-formatting

No functional changes, formatting only.

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

…ge handling

- Replace `Lock` with standard `object` for synchronization
- Remove unnecessary blank lines in namespace declarations

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Populate all PushKind enum values in IsMessageNotification switch
- Remove unnecessary using directives (System, System.Collections.Generic)
- Remove unused private fields (_callbackExceptions, _receivedMessages)
- Fix xUnit2022: Use Assert.False instead of Assert.True with negation
- Keep System.Diagnostics using for Stopwatch (not in implicit usings for Lint config)

All lint errors resolved. Build passes with --configuration Lint.

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Fix Rust FFI logic to create push notification channel when PubSub subscriptions are configured
- Previously required both subscriptions AND callback, preventing queue-based message retrieval
- Now creates channel whenever subscriptions exist, spawns callback task only when callback provided
- Update cluster pattern subscription test to account for cluster mode behavior
- Increase subscription establishment wait time from 1s to 5s for cluster propagation
- Remove unreliable assertion on PUBLISH return value in cluster mode
- Pattern subscriptions may be on different nodes than where channel is hashed

This fixes the ClusterPatternSubscription_WithServerPublish_ReceivesMatchingMessages test
and enables both callback-based and queue-based PubSub patterns in cluster mode.
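The corrected decision logic can be sketched as below. This is an assumption-laden illustration using std threads and channels rather than the tokio primitives the FFI layer uses; `PushSetup`, `setup_push`, and `Callback` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical callback type for delivered messages.
type Callback = Box<dyn Fn(String) + Send + 'static>;

/// What gets created for a client, depending on its configuration.
struct PushSetup {
    sender: Option<Sender<String>>,
    receiver: Option<Receiver<String>>, // kept for queue-based retrieval
    callback_task: Option<JoinHandle<()>>,
}

fn setup_push(has_subscriptions: bool, callback: Option<Callback>) -> PushSetup {
    // No subscriptions: no push channel at all.
    if !has_subscriptions {
        return PushSetup { sender: None, receiver: None, callback_task: None };
    }
    // Subscriptions exist: always create the channel (the behavior after the fix).
    let (tx, rx) = channel::<String>();
    match callback {
        // Callback provided: spawn a task that forwards each message to it.
        Some(cb) => {
            let task = thread::spawn(move || {
                for msg in rx {
                    cb(msg);
                }
            });
            PushSetup { sender: Some(tx), receiver: None, callback_task: Some(task) }
        }
        // No callback: leave the receiver available so messages can be polled.
        None => PushSetup { sender: Some(tx), receiver: Some(rx), callback_task: None },
    }
}
```

Before the fix, the equivalent of the `None` arm would have created nothing, which is why queue-based retrieval received no messages.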

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Remove SimpleConfigTest.cs (trivial test covered elsewhere)
- Remove 2 weak FFI marshaling tests that only verified no exceptions
- Remove 1 duplicate Dispose_MultipleCalls_DoesNotThrow test
- Remove 2 redundant concurrent processing tests in PubSubThreadSafetyTests
- Remove 2 generic timeout tests not specific to PubSub
- Consolidate 2 async tests into single comprehensive test
- Rename test to accurately reflect behavior (disposal completion vs logging)

These changes improve test suite maintainability by removing 8 redundant
or misleading tests while maintaining comprehensive coverage (85%+ for
PubSub components). All 132 remaining PubSub tests pass successfully.

Test Results:
- Unit tests: 252 total, 119 PubSub-specific
- Integration tests: 1,777 total, 13 PubSub-specific
- Combined coverage: 72.7% line, 49.9% branch
- PubSub component coverage: 81.8-100% across all classes

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Add IPubSubCommands interface with common PubSub operations
- Add IPubSubStandaloneCommands interface for standalone-specific operations
- Add IPubSubClusterCommands interface for cluster-specific operations
- Implement PublishAsync, PubSubChannelsAsync, PubSubNumSubAsync, PubSubNumPatAsync
- Implement cluster-specific PubSubShardChannelsAsync and PubSubShardNumSubAsync
- Add Request.PubSubCommands.cs with all PubSub command builders
- Add comprehensive unit tests for command building and validation
- Add integration tests for standalone and cluster PubSub functionality
- Update existing tests to use PublishAsync instead of CustomCommand
- Mark GlideClient and GlideClusterClient as partial classes

Addresses GitHub issue #55

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

@jbrinkman requested a review from a team as a code owner October 21, 2025 15:06

- Replace .ToArray() with collection expressions [..]
- Replace new() with collection expressions []
- Addresses IDE0305 and IDE0028 linting rules

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Add Assert.SkipWhen checks for sharded PubSub tests
- Sharded PubSub (SPUBLISH, PUBSUB SHARDCHANNELS, PUBSUB SHARDNUMSUB) requires server version 7.0.0+
- Tests are skipped on servers older than 7.0.0

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

- Add IsVersionLessThan(string) and IsVersionLessThan(int, int, int) helper methods
- Simplifies version checks from 'TestConfiguration.SERVER_VERSION < new Version("7.0.0")'
  to 'TestConfiguration.IsVersionLessThan("7.0.0")'
- Update all sharded PubSub tests to use new helper method
- Centralizes version comparison logic for easier maintenance
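The comparison the helper centralizes is a component-wise numeric check. The Rust sketch below is only an analogy for the C# helper: the function names are hypothetical, and treating missing or non-numeric components as 0 is an assumption, not documented behavior of `IsVersionLessThan`.

```rust
/// Parse "major.minor.patch" into a tuple; missing or invalid parts count as 0 (assumption).
fn parse_version(s: &str) -> (u32, u32, u32) {
    let mut parts = s.split('.').map(|p| p.parse::<u32>().unwrap_or(0));
    (
        parts.next().unwrap_or(0),
        parts.next().unwrap_or(0),
        parts.next().unwrap_or(0),
    )
}

/// True when `server` is strictly older than `minimum`.
/// Tuples compare lexicographically, so major is checked first, then minor, then patch.
fn is_version_less_than(server: &str, minimum: &str) -> bool {
    parse_version(server) < parse_version(minimum)
}
```

Comparing parsed numbers rather than raw strings matters once a component reaches two digits: "10.0.0" sorts before "9.9.9" as text but not as a version.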

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>

@yipin-chen requested review from alexr-bq and jduo October 21, 2025 17:14

// Take ownership of task handle and wait for completion with timeout
if let Ok(mut guard) = client.pubsub_task.lock() {
    if let Some(task_handle) = guard.take() {
        let timeout = std::time::Duration::from_secs(5);

Should this be passed down from PubSubPerformanceConfig.ShutdownTimeout instead of hard-coded?
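One way to read this suggestion is to thread the timeout through as a parameter instead of a constant. The sketch below assumes a hypothetical `PubSubShutdownConfig` struct mirroring the C# `ShutdownTimeout` value, and it swaps the tokio task for a std thread so it stays self-contained; it is not the `close_client` code from the PR.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical Rust-side mirror of PubSubPerformanceConfig.ShutdownTimeout.
struct PubSubShutdownConfig {
    shutdown_timeout: Duration,
}

/// Run `work` on a background thread and wait up to the configured timeout.
/// Returns true if the work finished in time, false on timeout or panic.
fn wait_for_task<F>(work: F, config: &PubSubShutdownConfig) -> bool
where
    F: FnOnce() + Send + 'static,
{
    let (done_tx, done_rx) = channel::<()>();
    thread::spawn(move || {
        work();
        // Signal completion; the receiver may already have given up, so ignore errors.
        let _ = done_tx.send(());
    });
    match done_rx.recv_timeout(config.shutdown_timeout) {
        Ok(()) => true,
        // Timeout elapsed, or the worker panicked and dropped the sender.
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => false,
    }
}
```

How the value would cross the FFI boundary (for example as a field in the connection request) is left open here, since the PR text does not say.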

}
string message = System.Text.Encoding.UTF8.GetString(messageBytes);

if (string.IsNullOrEmpty(message))

There are a few places in the code that say a message cannot be null or empty. The protocol doc says "The empty string's encoding is: $0\r\n\r\n". Does that mean empty messages are permitted and we should guard only against nulls?
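For reference, the distinction can be made concrete with a small encoder. This is an illustrative sketch of RESP2 bulk-string framing, with a hypothetical function name; it is not code from the client.

```rust
/// Encode an optional payload as a RESP2 bulk string.
/// `None` models a null value; `Some(&[])` models a valid, empty message.
fn encode_bulk_string(payload: Option<&[u8]>) -> Vec<u8> {
    match payload {
        // RESP2 null bulk string: length -1 and no body.
        None => b"$-1\r\n".to_vec(),
        Some(bytes) => {
            // "$<length>\r\n<bytes>\r\n"; a zero length is a legal frame.
            let mut out = format!("${}\r\n", bytes.len()).into_bytes();
            out.extend_from_slice(bytes);
            out.extend_from_slice(b"\r\n");
            out
        }
    }
}
```

An empty payload and a null therefore have different wire encodings, which supports treating only null as invalid.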

/// Indicates whether this client has PubSub subscriptions configured.
/// Uses volatile read for thread-safe access without locking.
/// </summary>
public bool HasPubSubSubscriptions => _pubSubHandler != null;

Should this use the same locking logic as PubSubQueue above to prevent race conditions, or should it just carry a comment noting that HasPubSubSubscriptions can return true while PubSubQueue is null?

Development

Successfully merging this pull request may close these issues.

C#: Pubsub Commands

3 participants