Skip to content

Commit

Permalink
feat: Implement shared subject for 'BehaviorSubject'
Browse files Browse the repository at this point in the history
  • Loading branch information
utilForever committed Sep 3, 2021
1 parent 63f6e06 commit 462bb12
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions src/subject/shared_subject.rs
Expand Up @@ -7,6 +7,9 @@ type SharedPublishers<Item, Err> =
pub type SharedSubject<Item, Err> =
Subject<SharedPublishers<Item, Err>, SharedSubscription>;

pub type SharedBehaviorSubject<Item, Err> =
BehaviorSubject<SharedPublishers<Item, Err>, SharedSubscription, Item>;

impl<Item, Err> SharedSubject<Item, Err> {
#[inline]
pub fn new() -> Self
Expand Down Expand Up @@ -46,6 +49,55 @@ impl<Item, Err> SharedObservable for SharedSubject<Item, Err> {
}
}

impl<Item, Err> Observable for SharedBehaviorSubject<Item, Err> {
type Item = Item;
type Err = Err;
}

impl<Item, Err> SharedObservable for SharedBehaviorSubject<Item, Err> {
type Unsub = SharedSubscription;
fn actual_subscribe<
O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
>(
self,
subscriber: Subscriber<O, SharedSubscription>,
) -> Self::Unsub {
let subscription = subscriber.subscription.clone();
self.subscription.add(subscription.clone());

self
.observers
.observers
.lock()
.unwrap()
.push(Box::new(subscriber));

if !subscription.is_closed() {
self.observers.observers.lock().unwrap().last_mut().unwrap().next(self.value);
}

subscription
}
}

impl<Item, Err> SharedBehaviorSubject<Item, Err> {
#[inline]
pub fn new(value: Item) -> Self
where
Self: Default,
{
SharedBehaviorSubject {
observers: Default::default(),
subscription: Default::default(),
value,
}
}
#[inline]
pub fn subscribed_size(&self) -> usize {
self.observers.observers.lock().unwrap().len()
}
}

#[test]
fn smoke() {
let test_code = Arc::new(Mutex::new("".to_owned()));
Expand Down

0 comments on commit 462bb12

Please sign in to comment.