Skip to content

tymoor/EasyNetQ.Rx

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

52 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

EasyNetQ.Rx Logo

Build Status

NuGet status

#EasyNetQ.Rx

EasyNetQ.Rx is an extension for enabling reactive subscriptions on EasyNetQ

##Usage

###To connect to a RabbitMQ broker...

var bus = RabbitHutch
    .CreateBus("host=localhost");

###To subscribe to a message and connect the Hot observable...

var topic = bus
    .ToObservable<MyTestMessage>("my_topic_id");

topic
    .Subscribe((x) => { max = x.Value; });

topic
    .Connect();

###To stop the Hot observable...

var topic = bus
    .ToObservable<MyTestMessage>("my_topic_id");

var subscription = topic
    .Subscribe(x => Console.Write(x.Value));

topic
    .Connect();

topic
    .Dispose();

###Or with aggregations

var topic = bus
    .ToObservable<MyTestMessage>("my_topic_id");

var filtered = topic
    .Where(x => x.Value < 5);

filtered
   .Max(x => x.Value)
   .Subscribe(x => Console.Write("Max value is: " + x));

filtered
   .Min(x => x.Value)
   .Subscribe(x => Console.Write("Min value is: " + x));

filtered
   .Average(x => x.Value)
   .Subscribe(x => Console.Write("Avg value is: " + x));

topic
    .Connect();

###Buffering

var topic = bus
    .ToObservable<Order>("new-orders-topic");

topic
    .Where(order => order.Total > 100)
    .Buffer(10)
    .SelectMany(x => x)
    .Subscribe(x => Console.WriteLine("Order of total ${0} has arrived", x.Total));

topic
    .Connect();

Install

PM> Install-Package EasyNetQ.Rx

About

Reactive extensions for EasyNetQ

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • C# 100.0%