timfish/double_decker

double_decker

A simple unbounded multi-producer multi-subscriber event bus built with crossbeam channels.

Why double_decker?

Unlike the Bus from the bus crate, double_decker::Bus is unbounded and everyone knows that double-decker buses carry more passengers than a regular bus 🤷‍♂️.

Unlike bus::Bus, double_decker::Bus implements a cheap Clone, which I've found useful.

It sounds like double-decker buses are better than regular buses. Does this imply that double_decker::Bus is better than bus::Bus?

No.

The bus crate is mature and completely lock-free. This implementation is neither!

Design

T must implement Clone so it can be passed to all consumers.

When you call add_rx(), a Sender/Receiver pair is created and the Sender is stored in a HashMap behind a RwLock.

broadcast() uses shared read access of the RwLock and sends out events to each Receiver in the order they were added.

Lock contention can only occur when the number of subscribers changes, as this requires write access to the RwLock. This happens when you call add_rx(), or when you call broadcast() and one or more Senders return SendError because they have become disconnected.
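
The design above can be sketched roughly as follows. This is an illustration only, not the crate's source: SketchBus, Inner and subscriber_count are hypothetical names, and std::sync::mpsc stands in for crossbeam channels so the snippet needs no dependencies. Delivery order is not modelled, since iteration order over a std HashMap is unspecified.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};

// Hypothetical state: one Sender per subscriber, keyed by a counter.
struct Inner<T> {
    next_id: usize,
    senders: HashMap<usize, Sender<T>>,
}

// Hypothetical stand-in for the bus; all state sits behind one RwLock.
pub struct SketchBus<T> {
    inner: Arc<RwLock<Inner<T>>>,
}

// Cloning only bumps the Arc reference count, so it is cheap.
impl<T> Clone for SketchBus<T> {
    fn clone(&self) -> Self {
        SketchBus { inner: Arc::clone(&self.inner) }
    }
}

impl<T: Clone> SketchBus<T> {
    pub fn new() -> Self {
        let inner = Inner { next_id: 0, senders: HashMap::new() };
        SketchBus { inner: Arc::new(RwLock::new(inner)) }
    }

    // Adding a subscriber changes the map, so it takes the write lock.
    pub fn add_rx(&self) -> Receiver<T> {
        let (tx, rx) = channel();
        let mut inner = self.inner.write().unwrap();
        let id = inner.next_id;
        inner.next_id += 1;
        inner.senders.insert(id, tx);
        rx
    }

    // Broadcasting takes the read lock; the write lock is only taken
    // afterwards, and only if some receiver turned out to be disconnected.
    pub fn broadcast(&self, event: T) {
        let mut dead = Vec::new();
        {
            let inner = self.inner.read().unwrap();
            for (id, tx) in inner.senders.iter() {
                if tx.send(event.clone()).is_err() {
                    dead.push(*id);
                }
            }
        } // read guard is released here
        if !dead.is_empty() {
            let mut inner = self.inner.write().unwrap();
            for id in dead {
                inner.senders.remove(&id);
            }
        }
    }

    // Hypothetical helper, included only to make the cleanup observable.
    pub fn subscriber_count(&self) -> usize {
        self.inner.read().unwrap().senders.len()
    }
}
```

In this sketch a dropped Receiver is only noticed on the next broadcast(), which is when its Sender is removed from the map.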

Examples plagiarised from bus crate

Single-send, multi-consumer example

use double_decker::Bus;

let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();

bus.broadcast("Hello");
assert_eq!(rx1.recv(), Ok("Hello"));
assert_eq!(rx2.recv(), Ok("Hello"));

Multi-send, multi-consumer example

use double_decker::Bus;
use std::thread;

let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();

// start a thread that sends 1..100
let j = thread::spawn(move || {
    for i in 1..100 {
        bus.broadcast(i);
    }
});

// every value should be received by both receivers
for i in 1..100 {
    // rx1
    assert_eq!(rx1.recv(), Ok(i));
    // and rx2
    assert_eq!(rx2.recv(), Ok(i));
}

j.join().unwrap();

Also included are subscribe and subscribe_on_thread which allow you to subscribe to broadcast events with a closure that is called on every broadcast. subscribe is blocking whereas subscribe_on_thread calls the closure from another thread.

subscribe_on_thread returns a Subscription which you should hang on to as the thread terminates when this is dropped.
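
One way a handle like this could stop its thread on drop is sketched below. This is an assumption about the general pattern, not a description of how double_decker implements it: SketchSubscription and subscribe_on_thread_sketch are hypothetical names, std::sync::mpsc stands in for crossbeam channels, and the 10 ms polling interval is arbitrary.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical handle: dropping it stops and joins the worker thread.
pub struct SketchSubscription {
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl Drop for SketchSubscription {
    fn drop(&mut self) {
        // Ask the worker to stop, then wait for it to finish.
        self.stop.store(true, Ordering::SeqCst);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

// Hypothetical helper: runs `callback` on a new thread for every event
// that arrives on `rx`, until the returned handle is dropped.
pub fn subscribe_on_thread_sketch<T, F>(rx: Receiver<T>, mut callback: F) -> SketchSubscription
where
    T: Send + 'static,
    F: FnMut(T) + Send + 'static,
{
    let stop = Arc::new(AtomicBool::new(false));
    let stop_in_thread = Arc::clone(&stop);
    let handle = thread::spawn(move || {
        while !stop_in_thread.load(Ordering::SeqCst) {
            // Poll with a timeout so the stop flag is checked regularly.
            match rx.recv_timeout(Duration::from_millis(10)) {
                Ok(event) => callback(event),
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
    });
    SketchSubscription { stop, handle: Some(handle) }
}
```

Binding the result to `_` instead of a named variable such as `_subscription` would drop the handle immediately, and in this sketch that would end the thread before any event is delivered.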

use double_decker::{Bus, SubscribeToReader};

let bus = Bus::<i32>::new();

// This would block
// bus.subscribe(Box::new(move |_event| {
//     // This closure is called on every broadcast
// }));

let _subscription = bus.subscribe_on_thread(Box::new(move |_event| {
    // This closure is called on every broadcast
}));

bus.broadcast(5);

License: MIT
