Skip to content

Chat application using Reactive Extennsions (RX)

vtortola edited this page Sep 3, 2014 · 1 revision

Rx WebSocket Chat is an example of how to use Reactive Extensions (RX) to create a very simple chat server. The app also uses dynamic objects (System.Dynamic) and JSON serialization.

The approach is very simple. Using RX the application listen for new clients, and when connected it creates a ChatSession object that contains an "In" stream for incoming messages and an "Out" stream for outcoming messages:

Observable.FromAsync(server.AcceptWebSocketAsync)
          .Select(ws => new ChatSession(ws) 
                  { 
                      In = Observable.FromAsync<dynamic>(ws.ReadDynamicAsync)
                                     .DoWhile(() => ws.IsConnected)
                                     .Where(msg => msg != null), 
        
                      Out = Observer.Create<dynamic>(ws.WriteDynamic) 
                  })
          .DoWhile(() => server.IsStarted && !cancellation.IsCancellationRequested)
          .Subscribe(chatSessionObserver);

ReadDynamicAsync is an extension method for vtortola.WebSockets.WebSocket that allows to read a message serialized in JSON as a dynamic object asynchronously.

public static async Task<dynamic> ReadDynamicAsync(this WebSocket ws, CancellationToken cancel)
{
    var message = await ws.ReadMessageAsync(cancel);
    if (message != null)
    {
        using (var sr = new StreamReader(message, Encoding.UTF8))
            return (dynamic)JObject.Load(new JsonTextReader(sr));
    }
    else
        return null;
}

The ChatSessionObserver is subscribed to new chat sessions, and is responsible of attaching the observers that define how the server reacts to incoming messages:

  public void OnNext(ChatSession chat)
  {
      var published = chat.In.Publish().RefCount();
      
      published.Where(msgIn => msgIn.cls != null && msgIn.cls == "join" && msgIn.room != null)
               .Subscribe(new ChatJoinMessageHandler(_chatRoomManager, chat));

      published.Where(msgIn => msgIn.cls != null && msgIn.cls == "msg" && msgIn.message != null && msgIn.room != null)
               .Subscribe(new ChatMessageHandler(_chatRoomManager, chat));
  }

ChatJoinMessageHandler handles joins and leaves, and ChatMessageHandler broadcasts chat messages to the right rooms.

Other examples

You may like to take a look on the Terminal Server sample