WebSocket.Rx

A .NET library for reactive WebSocket communication, built on R3 (a modern implementation of Reactive Extensions).

✨ Features

Client Features

  • 🔄 Automatic Reconnection - Built-in reconnection logic with configurable strategies
  • 📡 Reactive Streams - Observable sequences for messages, connections, and disconnections
  • 🧵 Thread-Safe - Safe concurrent message sending and receiving
  • 📦 Message Queuing - Automatic buffering with channel-based send queue
  • ⚡ High Performance - Built on System.Threading.Channels and ArrayPool<byte>
  • 🎯 Type-Safe - Strong typing with text/binary message support
  • 🔒 Proper Resource Management - Full IAsyncDisposable support with graceful shutdown

Server Features

  • 👥 Multi-Client Support - Handle multiple WebSocket connections simultaneously
  • 📊 Client Tracking - Built-in client metadata and connection management
  • 🔔 Event Streams - Observables for client connect/disconnect events
  • 🎯 Selective Messaging - Send to specific clients or broadcast to all
  • 🛡️ Robust Cleanup - Automatic client cleanup on disconnect

📦 Installation

dotnet add package WebSocket.Rx

Requirements: .NET 10.0 or higher

🚀 Quick Start

Client Example

using WebSocket.Rx;
using R3;

// Create and configure client
await using var client = new ReactiveWebSocketClient(new Uri("wss://echo.websocket.org"))
{
    IsReconnectionEnabled = true,
    KeepAliveInterval = TimeSpan.FromSeconds(30),
    IsTextMessageConversionEnabled = true
};

// Subscribe to messages
client.MessageReceived
    .Subscribe(msg => Console.WriteLine($"Received: {msg.Text}"));

// Subscribe to connection events
client.ConnectionHappened
    .Subscribe(info => Console.WriteLine($"Connected: {info.Reason}"));

client.DisconnectionHappened
    .Subscribe(info => Console.WriteLine($"Disconnected: {info.Reason}"));

// Connect and send messages
await client.StartAsync();
await client.SendAsTextAsync("Hello WebSocket!");

Server Example

using WebSocket.Rx;
using R3;

// Create and start server
await using var server = new ReactiveWebSocketServer("http://localhost:8080/")
{
    IsTextMessageConversionEnabled = true
};

// Subscribe to client events
server.ClientConnected
    .Subscribe(client => Console.WriteLine($"Client connected: {client.Metadata.Id}"));

server.Messages
    .SubscribeAwait(async (msg, ct) =>
    {
        Console.WriteLine($"From {msg.Metadata.Id}: {msg.Message.Text}");
        // Echo back to sender; the send is awaited rather than fire-and-forget
        await server.SendAsTextAsync(msg.Metadata.Id, $"Echo: {msg.Message.Text}");
    });

await server.StartAsync();
Console.WriteLine($"Server running with {server.ClientCount} clients");

🎓 Core Concepts

Observable Streams

WebSocket.Rx is built around reactive streams using R3:

// Filter and transform messages
client.MessageReceived
    .Where(msg => msg.MessageType == MessageType.Text)
    .Select(msg => msg.Text.ToUpper())
    .Subscribe(text => Console.WriteLine(text));

// Debounce reconnection events (R3 names this operator Debounce; Rx.NET calls it Throttle)
client.ConnectionHappened
    .Debounce(TimeSpan.FromSeconds(1))
    .Subscribe(info => Console.WriteLine("Stable connection established"));

Message Types

// Send text message (queued)
await client.SendAsTextAsync("Hello");

// Send binary message (queued)
await client.SendAsBinaryAsync(new byte[] { 0x01, 0x02 });

// Send instant (bypasses queue)
await client.SendInstantAsync("Urgent message");

// Try send (non-blocking)
bool sent = client.TrySendAsText("Optional message");
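
The difference between a queued send and a non-blocking "try" send can be pictured with a bounded channel from System.Threading.Channels. The sketch below is a minimal illustration under stated assumptions, not WebSocket.Rx's actual implementation: the class name SendQueueSketch and all of its members are hypothetical, and the real library may size, drain, or complete its queue differently.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical illustration only; not the library's internal send queue.
public sealed class SendQueueSketch
{
    private readonly Channel<string> _queue;

    public SendQueueSketch(int capacity) =>
        _queue = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
        {
            SingleReader = true,                    // one background loop drains the queue
            FullMode = BoundedChannelFullMode.Wait  // WriteAsync waits while the queue is full
        });

    // Queued send: completes once the queue has accepted the message.
    public ValueTask EnqueueAsync(string message, CancellationToken ct = default) =>
        _queue.Writer.WriteAsync(message, ct);

    // Non-blocking send: returns false immediately if the queue is full or completed.
    public bool TryEnqueue(string message) => _queue.Writer.TryWrite(message);

    // Signals that no more messages will be queued, letting the drain loop finish.
    public void Complete() => _queue.Writer.Complete();

    // Drain loop: forwards messages in FIFO order to the supplied sender delegate.
    public async Task DrainAsync(Func<string, Task> send, CancellationToken ct = default)
    {
        await foreach (var message in _queue.Reader.ReadAllAsync(ct))
        {
            await send(message);
        }
    }
}
```

In this sketch a full queue makes TryEnqueue return false, while EnqueueAsync waits for space. That is one plausible reading of "queued" versus "non-blocking" above, not a statement of how the library behaves.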

Connection Lifecycle

using System.Net.WebSockets; // needed for WebSocketCloseStatus

// Start connection
await client.StartAsync();

// Reconnect manually
await client.ReconnectAsync();

// Stop gracefully
await client.StopAsync(WebSocketCloseStatus.NormalClosure, "Goodbye");

// Dispose (automatic cleanup)
await client.DisposeAsync();
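
When reconnecting manually, callers often wrap the call in a retry loop with exponential backoff. The following is a minimal sketch of that pattern, not the library's built-in reconnection strategy: BackoffSketch, DelayFor, and RetryAsync are hypothetical names, and the sketch assumes the connect delegate throws when an attempt fails.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; WebSocket.Rx's own reconnection logic may work differently.
public static class BackoffSketch
{
    // Delay before retrying after the given 0-based attempt:
    // baseDelay * 2^attempt, capped at maxDelay.
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        var factor = Math.Pow(2, Math.Min(attempt, 30)); // clamp the exponent to keep values sane
        var ms = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(ms);
    }

    // Calls connect until it succeeds or maxAttempts is reached.
    // Returns true on success, false if every attempt threw.
    public static async Task<bool> RetryAsync(
        Func<Task> connect,
        int maxAttempts,
        TimeSpan baseDelay,
        TimeSpan maxDelay,
        CancellationToken ct = default)
    {
        for (var attempt = 0; attempt < maxAttempts; attempt++)
        {
            try
            {
                await connect();
                return true;
            }
            catch (Exception) when (!ct.IsCancellationRequested)
            {
                if (attempt == maxAttempts - 1) break; // no delay after the final attempt
                await Task.Delay(DelayFor(attempt, baseDelay, maxDelay), ct);
            }
        }
        return false;
    }
}
```

Assuming ReconnectAsync throws on failure, a call could look like: await BackoffSketch.RetryAsync(() => client.ReconnectAsync(), 5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30));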

🔧 Advanced Usage

Custom Configuration

using System.Text; // needed for Encoding.UTF8

var client = new ReactiveWebSocketClient(new Uri("wss://example.com"))
{
    // Connection settings
    ConnectTimeout = TimeSpan.FromSeconds(10),
    KeepAliveInterval = TimeSpan.FromSeconds(30),
    KeepAliveTimeout = TimeSpan.FromSeconds(10),
    
    // Reconnection
    IsReconnectionEnabled = true,
    
    // Message handling
    IsTextMessageConversionEnabled = true,
    MessageEncoding = Encoding.UTF8
};

Server Broadcasting

// Broadcast to all clients
foreach (var clientId in server.ConnectedClients.Keys)
{
    await server.SendAsTextAsync(clientId, "Broadcast message");
}

// Send to specific clients
var targetClients = server.ConnectedClients
    .Where(c => c.Value.CustomData?.Contains("premium") == true)
    .Select(c => c.Key);

foreach (var clientId in targetClients)
{
    await server.SendAsTextAsync(clientId, "Premium feature alert!");
}
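
The loops above send to one client at a time. If ordering across clients does not matter, the sends can run concurrently. The helper below is a minimal sketch, not part of WebSocket.Rx: BroadcastSketch and BroadcastAsync are hypothetical names, and the helper takes a plain delegate so it makes no assumptions about the server type beyond a send call that returns Task<bool>.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper; not an API provided by the library.
public static class BroadcastSketch
{
    // Sends to every client concurrently and returns how many sends reported success.
    public static async Task<int> BroadcastAsync(
        IEnumerable<Guid> clientIds,
        Func<Guid, Task<bool>> send)
    {
        // Snapshot the ids so clients connecting or disconnecting
        // mid-broadcast do not disturb the enumeration.
        var ids = clientIds.ToArray();

        // Start all sends, then wait for them together.
        // If any send throws, the awaited WhenAll rethrows.
        var results = await Task.WhenAll(ids.Select(send));
        return results.Count(ok => ok);
    }
}
```

With the server from the examples above, a call could look like: var delivered = await BroadcastSketch.BroadcastAsync(server.ConnectedClients.Keys, id => server.SendAsTextAsync(id, "Broadcast message"));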

Error Handling

client.DisconnectionHappened
    .Subscribe(info => 
    {
        Console.WriteLine($"Disconnect reason: {info.Reason}");
        if (info.Exception != null)
        {
            Console.WriteLine($"Error: {info.Exception.Message}");
        }
    });

Combining Observables

// Wait for connection before sending (the send is awaited rather than fire-and-forget)
client.ConnectionHappened
    .Take(1)
    .SubscribeAwait(async (_, ct) => await client.SendAsTextAsync("Connected!"));

// Process messages in batches (R3 names this operator Chunk; Rx.NET calls it Buffer)
client.MessageReceived
    .Chunk(TimeSpan.FromSeconds(1))
    .Where(batch => batch.Length > 0)
    .Subscribe(batch => Console.WriteLine($"Processed {batch.Length} messages"));

📚 API Reference

ReactiveWebSocketClient

| Property | Type | Description |
| --- | --- | --- |
| Url | Uri | WebSocket server URL |
| IsStarted | bool | Client started state |
| IsRunning | bool | Client running state |
| IsReconnectionEnabled | bool | Enable auto-reconnect |
| MessageReceived | Observable<ReceivedMessage> | Message stream |
| ConnectionHappened | Observable<Connected> | Connection stream |
| DisconnectionHappened | Observable<Disconnected> | Disconnection stream |

Key Methods:

  • Task StartAsync() - Start the client
  • Task StopAsync(status, description) - Stop gracefully
  • Task ReconnectAsync() - Manual reconnect
  • Task SendAsTextAsync(message) - Send text (queued)
  • Task SendAsBinaryAsync(data) - Send binary (queued)
  • ValueTask DisposeAsync() - Clean up resources

ReactiveWebSocketServer

| Property | Type | Description |
| --- | --- | --- |
| IsRunning | bool | Server running state |
| ClientCount | int | Number of connected clients |
| ConnectedClients | IReadOnlyDictionary<Guid, Metadata> | Client metadata |
| ClientConnected | Observable<ClientConnected> | Client connect stream |
| ClientDisconnected | Observable<ClientDisconnected> | Client disconnect stream |
| Messages | Observable<ServerReceivedMessage> | Server message stream |

Key Methods:

  • Task StartAsync() - Start the server
  • Task<bool> StopAsync(status, description) - Stop server
  • Task<bool> SendAsTextAsync(clientId, message) - Send to client
  • ValueTask DisposeAsync() - Clean up resources

💡 Inspiration

This library is inspired by and builds upon the excellent work of:

Websocket.Client by Marfusios

WebSocket.Rx takes inspiration from Websocket.Client's elegant reactive approach to WebSocket communication. Key influences include:

  • Reactive-First Design - Using observables for all events and messages
  • Automatic Reconnection - Built-in reconnection logic for robust connections
  • Clean API - Intuitive and easy-to-use interface

What's Different?

While honoring the spirit of Websocket.Client, WebSocket.Rx offers:

  • ✅ R3 Integration - Built on R3, Cysharp's modern reimplementation of Reactive Extensions
  • ✅ Server Support - Full-featured WebSocket server implementation
  • ✅ Modern .NET - Built for .NET 10+ with latest performance optimizations
  • ✅ IAsyncDisposable - Proper async resource cleanup
  • ✅ Channel-Based Queuing - High-performance message queue using System.Threading.Channels
  • ✅ Enhanced Memory Management - Uses ArrayPool<byte> and RecyclableMemoryStream

Both libraries share the same core philosophy: WebSocket communication should be simple, reactive, and reliable.

🀝 Contributing

Contributions are welcome! This library grows with the community's needs.

How to Contribute

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/amazing-feature
  3. Write tests for your changes
  4. Ensure all tests pass: dotnet test
  5. Submit a Pull Request

Guidelines

  • ✅ Follow existing code style and conventions
  • ✅ Include unit tests for new features
  • ✅ Update documentation for API changes
  • ✅ Keep PRs focused and atomic
  • ✅ Write meaningful commit messages

Development Setup

git clone https://github.com/st0o0/WebSocket.Rx.git
cd WebSocket.Rx
dotnet restore
dotnet build
dotnet test

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.


Built with ❤️ for the .NET community

Report Bug · Request Feature · Documentation
