diff --git a/Cargo.toml b/Cargo.toml index 86451a99..2b200170 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,16 +14,7 @@ build = "build.rs" [features] # Enables all non-conflicting features -full = [ - "publish", - "access", - "serde", - "reqwest", - "aescbc", - "parse_token", - "blocking", - "std" -] +full = ["publish", "access", "serde", "reqwest", "aescbc", "parse_token", "blocking", "std"] # Enables all default features default = ["publish", "serde", "reqwest", "aescbc", "std", "blocking"] @@ -54,23 +45,7 @@ reqwest = ["dep:reqwest", "dep:bytes"] blocking = ["reqwest?/blocking"] ## Enables std library -std = [ - "derive_builder/std", - "log/std", - "uuid/std", - "base64/std", - "spin/std", - "snafu/std", - "hmac/std", - "sha2/std", - "time/std", - "bytes?/std", - "getrandom/std", - "rand/default", - "serde?/std", - "serde_json?/std", - "ciborium?/std" -] +std = ["derive_builder/std", "log/std", "uuid/std", "base64/std", "spin/std", "snafu/std", "hmac/std", "sha2/std", "time/std", "bytes?/std", "getrandom/std", "rand/default", "serde?/std", "serde_json?/std", "ciborium?/std"] ## Enables very specific implementations for different platforms. ## @@ -96,6 +71,7 @@ async-trait = "0.1" log = "0.4" hashbrown = "0.13" spin = "0.9" +phantom-type = { vestion = "0.4.2", default-features = false } percent-encoding = { version = "2.1", default-features = false } base64 = { version = "0.21", features = ["alloc"], default-features = false } derive_builder = {version = "0.12", default-features = false } diff --git a/src/core/error.rs b/src/core/error.rs index 3804f8c5..e29c9e08 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -28,7 +28,7 @@ use snafu::Snafu; /// ``` /// /// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html -#[derive(Snafu, Debug, Clone)] +#[derive(Snafu, Debug, Clone, PartialEq)] pub enum PubNubError { /// this error is returned when the transport layer fails #[snafu(display("Transport error: {details}"))] diff --git a/src/core/event_engine/effect.rs b/src/core/event_engine/effect.rs new file mode 100644 index 00000000..97ebe5c7 --- /dev/null +++ b/src/core/event_engine/effect.rs @@ -0,0 +1,19 @@ +use crate::{ + core::event_engine::EffectInvocation, + lib::alloc::{string::String, vec::Vec}, +}; + +pub(crate) trait Effect { + type Invocation: EffectInvocation; + + /// Unique effect identifier. + fn id(&self) -> String; + + /// Run work associated with effect. + fn run(&self, f: F) + where + F: FnMut(Option::Event>>); + + /// Cancel any ongoing effect's work. + fn cancel(&self); +} diff --git a/src/core/event_engine/effect_dispatcher.rs b/src/core/event_engine/effect_dispatcher.rs new file mode 100644 index 00000000..e411db14 --- /dev/null +++ b/src/core/event_engine/effect_dispatcher.rs @@ -0,0 +1,260 @@ +use crate::{ + core::event_engine::{Effect, EffectHandler, EffectInvocation}, + lib::alloc::{rc::Rc, vec, vec::Vec}, +}; +use phantom_type::PhantomType; +use spin::rwlock::RwLock; + +/// State machine effects dispatcher. +#[allow(dead_code)] +pub(crate) struct EffectDispatcher +where + EI: EffectInvocation, + EH: EffectHandler, + EF: Effect, +{ + /// Effect invocation handler. + /// + /// Handler responsible for providing actual implementation of + handler: EH, + + /// Dispatched effects managed by dispatcher. + /// + /// There are effects whose lifetime should be managed by the dispatcher. + /// State machines may have some effects that are exclusive and can only run + /// one type of them at once. The dispatcher handles such effects + /// and cancels them when required. + managed: RwLock>>, + + _invocation: PhantomType, +} + +impl EffectDispatcher +where + EI: EffectInvocation, + EH: EffectHandler, + EF: Effect, +{ + /// Create new effects dispatcher. + pub fn new(handler: EH) -> Self { + EffectDispatcher { + handler, + managed: RwLock::new(vec![]), + _invocation: Default::default(), + } + } + + /// Dispatch effect associated with `invocation`. + pub fn dispatch(&self, invocation: &EI, mut f: F) + where + F: FnMut(Option>), + { + if let Some(effect) = self.handler.create(invocation) { + let effect = Rc::new(effect); + + if invocation.managed() { + let mut managed = self.managed.write(); + managed.push(effect.clone()); + } + + // Placeholder for effect invocation. + effect.run(|events| { + // Try remove effect from list of managed. + self.remove_managed_effect(&effect); + + // Notify about effect run completion. + // Placeholder for effect events processing (pass to effects handler). + f(events); + }); + } else if invocation.cancelling() { + self.cancel_effect(invocation); + + // Placeholder for effect events processing (pass to effects handler). + f(None); + } + } + + /// Handle effect cancellation. + /// + /// Effects with managed lifecycle can be cancelled by corresponding effect + /// invocations. + fn cancel_effect(&self, invocation: &EI) { + let mut managed = self.managed.write(); + if let Some(position) = managed.iter().position(|e| invocation.cancelling_effect(e)) { + managed.remove(position).cancel(); + } + } + + /// Remove managed effect. + fn remove_managed_effect(&self, effect: &EF) { + let mut managed = self.managed.write(); + if let Some(position) = managed.iter().position(|ef| ef.id() == effect.id()) { + managed.remove(position); + } + } +} + +#[cfg(test)] +mod should { + use super::*; + use crate::core::event_engine::Event; + + enum TestEvent {} + + impl Event for TestEvent { + fn id(&self) -> &str { + "no_id" + } + } + + enum TestEffect { + One, + Two, + Three, + } + + impl Effect for TestEffect { + type Invocation = TestInvocation; + + fn id(&self) -> String { + match self { + Self::One => "EFFECT_ONE".into(), + Self::Two => "EFFECT_TWO".into(), + Self::Three => "EFFECT_THREE".into(), + } + } + + fn run(&self, mut f: F) + where + F: FnMut(Option>), + { + match self { + Self::Three => {} + _ => f(None), + } + } + + fn cancel(&self) { + // Do nothing. + } + } + + enum TestInvocation { + One, + Two, + Three, + CancelThree, + } + + impl EffectInvocation for TestInvocation { + type Effect = TestEffect; + type Event = TestEvent; + + fn id(&self) -> &str { + match self { + Self::One => "EFFECT_ONE_INVOCATION", + Self::Two => "EFFECT_TWO_INVOCATION", + Self::Three => "EFFECT_THREE_INVOCATION", + Self::CancelThree => "EFFECT_THREE_CANCEL_INVOCATION", + } + } + + fn managed(&self) -> bool { + matches!(self, Self::Two | Self::Three) + } + + fn cancelling(&self) -> bool { + matches!(self, Self::CancelThree) + } + + fn cancelling_effect(&self, effect: &Self::Effect) -> bool { + match self { + TestInvocation::CancelThree => matches!(effect, TestEffect::Three), + _ => false, + } + } + } + + struct TestEffectHandler {} + + impl EffectHandler for TestEffectHandler { + fn create(&self, invocation: &TestInvocation) -> Option { + match invocation { + TestInvocation::One => Some(TestEffect::One), + TestInvocation::Two => Some(TestEffect::Two), + TestInvocation::Three => Some(TestEffect::Three), + _ => None, + } + } + } + + #[test] + fn run_not_managed_effect() { + let mut called = false; + let dispatcher = EffectDispatcher::new(TestEffectHandler {}); + dispatcher.dispatch(&TestInvocation::One, |_| { + called = true; + }); + + assert!(called, "Expected to call effect for TestInvocation::One"); + assert_eq!( + dispatcher.managed.read().len(), + 0, + "Non managed effects shouldn't be stored" + ); + } + + #[test] + fn run_managed_effect() { + let mut called = false; + let dispatcher = EffectDispatcher::new(TestEffectHandler {}); + dispatcher.dispatch(&TestInvocation::Two, |_| { + called = true; + }); + + assert!(called, "Expected to call effect for TestInvocation::Two"); + assert_eq!( + dispatcher.managed.read().len(), + 0, + "Managed effect should be removed on completion" + ); + } + + #[test] + fn cancel_managed_effect() { + let mut called_managed = false; + let mut cancelled_managed = false; + let dispatcher = EffectDispatcher::new(TestEffectHandler {}); + dispatcher.dispatch(&TestInvocation::Three, |_| { + called_managed = true; + }); + + assert!( + !called_managed, + "Expected that effect for TestInvocation::Three won't be called" + ); + assert_eq!( + dispatcher.managed.read().len(), + 1, + "Managed effect shouldn't complete run because doesn't have completion call" + ); + + dispatcher.dispatch(&TestInvocation::CancelThree, |_| { + cancelled_managed = true; + }); + + assert!( + cancelled_managed, + "Expected to call effect for TestInvocation::CancelThree" + ); + assert!( + !called_managed, + "Expected that effect for TestInvocation::Three won't be called" + ); + assert_eq!( + dispatcher.managed.read().len(), + 0, + "Managed effect should be cancelled" + ); + } +} diff --git a/src/core/event_engine/effect_handler.rs b/src/core/event_engine/effect_handler.rs new file mode 100644 index 00000000..fbf1d0fa --- /dev/null +++ b/src/core/event_engine/effect_handler.rs @@ -0,0 +1,10 @@ +use crate::core::event_engine::{Effect, EffectInvocation}; + +pub(crate) trait EffectHandler +where + I: EffectInvocation, + EF: Effect, +{ + /// Create effect using information of effect `invocation`. + fn create(&self, invocation: &I) -> Option; +} diff --git a/src/core/event_engine/effect_invocation.rs b/src/core/event_engine/effect_invocation.rs new file mode 100644 index 00000000..bf377aca --- /dev/null +++ b/src/core/event_engine/effect_invocation.rs @@ -0,0 +1,22 @@ +use crate::core::event_engine::{Effect, Event}; + +/// Effect invocation trait. +/// +/// Invocation is an intention to run an effect. Effect dispatcher uses intents +/// to schedule actual effect invocation. +pub(crate) trait EffectInvocation { + type Effect: Effect; + type Event: Event; + + /// Unique effect invocation identifier. + fn id(&self) -> &str; + + /// Whether invoked effect lifetime should be managed by dispatcher or not. + fn managed(&self) -> bool; + + /// Whether effect invocation cancels managed effect or not. + fn cancelling(&self) -> bool; + + /// Whether effect invocation cancels specific managed effect or not. + fn cancelling_effect(&self, effect: &Self::Effect) -> bool; +} diff --git a/src/core/event_engine/event.rs b/src/core/event_engine/event.rs new file mode 100644 index 00000000..a92fa00d --- /dev/null +++ b/src/core/event_engine/event.rs @@ -0,0 +1,10 @@ +/// Event engine external event. +/// +/// State machine uses events to calculate transition path and list of effects +/// invocations. +/// +/// Types which are expected to be used as events should implement this trait. +pub(crate) trait Event { + /// Event identifier. + fn id(&self) -> &str; +} diff --git a/src/core/event_engine/mod.rs b/src/core/event_engine/mod.rs new file mode 100644 index 00000000..2f01568d --- /dev/null +++ b/src/core/event_engine/mod.rs @@ -0,0 +1,309 @@ +//! Event Engine module + +use spin::rwlock::RwLock; + +#[doc(inline)] +pub(crate) use effect::Effect; +pub(crate) mod effect; + +#[doc(inline)] +pub(crate) use effect_dispatcher::EffectDispatcher; +pub(crate) mod effect_dispatcher; + +#[doc(inline)] +pub(crate) use effect_handler::EffectHandler; +pub(crate) mod effect_handler; + +#[doc(inline)] +pub(crate) use effect_invocation::EffectInvocation; +pub(crate) mod effect_invocation; + +#[doc(inline)] +pub(crate) use event::Event; +pub(crate) mod event; + +#[doc(inline)] +pub(crate) use state::State; +pub(crate) mod state; + +#[doc(inline)] +pub(crate) use transition::Transition; +pub(crate) mod transition; + +/// State machine's event engine. +/// +/// [`EventEngine`] is the core of state machines used in PubNub client and +/// manages current system state and handles external events. +#[allow(dead_code)] +pub(crate) struct EventEngine +where + EI: EffectInvocation, + EH: EffectHandler, + EF: Effect, + S: State, +{ + /// Effects dispatcher. + /// + /// Dispatcher responsible for effects invocation processing. + effect_dispatcher: EffectDispatcher, + + /// Current event engine state. + current_state: RwLock, +} + +impl EventEngine +where + EI: EffectInvocation, + EH: EffectHandler, + EF: Effect, + S: State, +{ + /// Create [`EventEngine`] with initial state for state machine. + #[allow(dead_code)] + pub fn new(handler: EH, state: S) -> Self { + EventEngine { + effect_dispatcher: EffectDispatcher::new(handler), + current_state: RwLock::new(state), + } + } + + /// Retrieve current engine state. + #[allow(dead_code)] + pub fn current_state(&self) -> S { + (*self.current_state.read()).clone() + } + + /// Process external event. + /// + /// Process event passed to the system and perform required transitions to + /// new state if required. + #[allow(dead_code)] + pub fn process(&self, event: &EI::Event) { + let state = self.current_state.read(); + if let Some(transition) = state.transition(event) { + drop(state); + self.process_transition(transition); + } + } + + /// Process transition. + /// + /// This method is responsible for transition maintenance: + /// * update current state + /// * call effects dispatcher to process effect invocation + fn process_transition(&self, transition: Transition) { + { + let mut writable_state = self.current_state.write(); + *writable_state = transition.state; + } + + transition.invocations.iter().for_each(|invocation| { + self.effect_dispatcher.dispatch(invocation, |events| { + if let Some(events) = events { + events.iter().for_each(|event| self.process(event)); + } + }); + }); + } +} + +#[cfg(test)] +mod should { + use super::*; + use crate::lib::alloc::{vec, vec::Vec}; + + #[derive(Debug, Clone, PartialEq)] + enum TestState { + NotStarted, + Started, + InProgress, + Completed, + } + + impl State for TestState { + type State = Self; + type Invocation = TestInvocation; + type Event = TestEvent; + + fn enter(&self) -> Option> { + Some(vec![TestInvocation::One]) + } + + fn exit(&self) -> Option> { + Some(vec![TestInvocation::Two]) + } + + fn transition( + &self, + event: &<::Invocation as EffectInvocation>::Event, + ) -> Option> { + match event { + TestEvent::One => { + if matches!(self, Self::NotStarted) { + Some(self.transition_to(Self::Started, None)) + } else if matches!(self, Self::Completed) { + Some( + self.transition_to(Self::NotStarted, Some(vec![TestInvocation::Three])), + ) + } else { + None + } + } + TestEvent::Two => matches!(self, Self::Started) + .then(|| self.transition_to(Self::InProgress, None)), + TestEvent::Three => matches!(self, Self::InProgress) + .then(|| self.transition_to(Self::Completed, Some(vec![TestInvocation::One]))), + } + } + + fn transition_to( + &self, + state: Self::State, + invocations: Option>, + ) -> Transition { + Transition { + invocations: self + .exit() + .unwrap_or(vec![]) + .into_iter() + .chain(invocations.unwrap_or(vec![]).into_iter()) + .chain(state.enter().unwrap_or(vec![]).into_iter()) + .collect(), + state, + } + } + } + + enum TestEvent { + One, + Two, + Three, + } + + impl Event for TestEvent { + fn id(&self) -> &str { + match self { + TestEvent::One => "EVENT_ONE", + TestEvent::Two => "EVENT_TWO", + TestEvent::Three => "EVENT_THREE", + } + } + } + + enum TestEffect { + One, + Two, + Three, + } + + impl Effect for TestEffect { + type Invocation = TestInvocation; + + fn id(&self) -> String { + match self { + Self::One => "EFFECT_ONE".into(), + Self::Two => "EFFECT_TWO".into(), + Self::Three => "EFFECT_THREE".into(), + } + } + + fn run(&self, mut f: F) + where + F: FnMut(Option>), + { + f(None) + } + + fn cancel(&self) { + // Do nothing. + } + } + + enum TestInvocation { + One, + Two, + Three, + } + + impl EffectInvocation for TestInvocation { + type Effect = TestEffect; + type Event = TestEvent; + + fn id(&self) -> &str { + match self { + Self::One => "EFFECT_ONE_INVOCATION", + Self::Two => "EFFECT_TWO_INVOCATION", + Self::Three => "EFFECT_THREE_INVOCATION", + } + } + + fn managed(&self) -> bool { + matches!(self, Self::Two | Self::Three) + } + + fn cancelling(&self) -> bool { + false + } + + fn cancelling_effect(&self, _effect: &Self::Effect) -> bool { + false + } + } + + struct TestEffectHandler {} + + impl EffectHandler for TestEffectHandler { + fn create(&self, invocation: &TestInvocation) -> Option { + match invocation { + TestInvocation::One => Some(TestEffect::One), + TestInvocation::Two => Some(TestEffect::Two), + TestInvocation::Three => Some(TestEffect::Three), + } + } + } + + #[test] + fn set_initial_state() { + let engine = EventEngine::new(TestEffectHandler {}, TestState::NotStarted); + assert!(matches!(engine.current_state(), TestState::NotStarted)); + } + + #[test] + fn transit_to_new_state() { + let engine = EventEngine::new(TestEffectHandler {}, TestState::NotStarted); + engine.process(&TestEvent::One); + assert!(matches!(engine.current_state(), TestState::Started)); + } + + #[test] + fn transit_between_states() { + let engine = EventEngine::new(TestEffectHandler {}, TestState::NotStarted); + + engine.process(&TestEvent::One); + assert!(matches!(engine.current_state(), TestState::Started)); + + engine.process(&TestEvent::Two); + assert!(matches!(engine.current_state(), TestState::InProgress)); + + engine.process(&TestEvent::Three); + assert!(matches!(*engine.current_state.read(), TestState::Completed)); + + engine.process(&TestEvent::One); + assert!(matches!( + *engine.current_state.read(), + TestState::NotStarted + )); + } + + #[test] + fn not_transit_for_unexpected_event() { + let engine = EventEngine::new(TestEffectHandler {}, TestState::NotStarted); + + engine.process(&TestEvent::One); + assert!(matches!(engine.current_state(), TestState::Started)); + + engine.process(&TestEvent::Three); + assert!(!matches!(engine.current_state(), TestState::Completed)); + assert!(matches!(engine.current_state(), TestState::Started)); + } +} diff --git a/src/core/event_engine/state.rs b/src/core/event_engine/state.rs new file mode 100644 index 00000000..aa3ceaf4 --- /dev/null +++ b/src/core/event_engine/state.rs @@ -0,0 +1,51 @@ +use crate::{ + core::event_engine::{EffectInvocation, Event, Transition}, + lib::alloc::vec::Vec, +}; + +/// State machine state trait. +/// +/// For transition, the state machine needs to know which effects should be +/// dispatched during transition to target state in response to a specific +/// event. +/// +/// Types which are expected to be used as states should implement the trait. +pub(crate) trait State: Clone + PartialEq { + type State: State; + type Invocation: EffectInvocation; + type Event: Event; + + /// State enter effects invocations. + /// + /// The list of effect invocations that should be called when the event + /// engine enters state. + fn enter(&self) -> Option>; + + /// State exit effects invocations. + /// + /// The list of effect invocations that should be called when the event + /// engine leaves state. + fn exit(&self) -> Option>; + + /// System event handler. + /// + /// State has information about the next state into which the state machine + /// should switch and a list of effects invocations which should be + /// scheduled. + fn transition( + &self, + event: &<::Invocation as EffectInvocation>::Event, + ) -> Option>; + + /// [`Transition`] build helper. + /// + /// Transition to a new state is a composite operation which includes + /// dispatching of [`exit`] effect invocations of receiver, followed by + /// dispatch of provided transition effect `invocations` and [`enter`] + /// effect invocations of target state. + fn transition_to( + &self, + state: Self::State, + invocations: Option>, + ) -> Transition; +} diff --git a/src/core/event_engine/transition.rs b/src/core/event_engine/transition.rs new file mode 100644 index 00000000..e1ca80ca --- /dev/null +++ b/src/core/event_engine/transition.rs @@ -0,0 +1,21 @@ +use crate::{ + core::event_engine::{EffectInvocation, State}, + lib::alloc::vec::Vec, +}; + +/// State machine transition type. +/// +/// State transition with information about target state and list of effect +/// invocations. +#[allow(dead_code)] +pub(crate) struct Transition +where + S: State, + I: EffectInvocation, +{ + /// Target state machine state. + pub state: S, + + /// List of effect invocation which should be scheduled during transition. + pub invocations: Vec, +} diff --git a/src/core/mod.rs b/src/core/mod.rs index ed620b50..abc15f07 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -54,4 +54,5 @@ pub mod serializer; pub use cryptor::Cryptor; pub mod cryptor; +pub(crate) mod event_engine; pub(crate) mod metadata; diff --git a/src/dx/mod.rs b/src/dx/mod.rs index 0daea659..7ed9e5e2 100644 --- a/src/dx/mod.rs +++ b/src/dx/mod.rs @@ -12,6 +12,8 @@ pub mod access; #[cfg(feature = "publish")] pub mod publish; +pub mod subscribe; + #[cfg(all(feature = "parse_token", feature = "serde"))] pub use parse_token::parse_token; #[cfg(feature = "parse_token")] diff --git a/src/dx/subscribe/event_engine/effect.rs b/src/dx/subscribe/event_engine/effect.rs new file mode 100644 index 00000000..6ede8cf0 --- /dev/null +++ b/src/dx/subscribe/event_engine/effect.rs @@ -0,0 +1,130 @@ +use crate::dx::subscribe::event_engine::{SubscribeEffectInvocation, SubscribeEvent}; +use crate::{ + core::{event_engine::Effect, PubNubError}, + dx::subscribe::{SubscribeCursor, SubscribeStatus}, + lib::alloc::{string::String, vec::Vec}, +}; + +/// Subscription state machine effects. +#[allow(dead_code)] +pub(crate) enum SubscribeEffect { + /// Initial subscribe effect invocation. + Handshake { + /// Optional list of channels. + /// + /// List of channels which will be source of real-time updates after + /// initial subscription completion. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which will be source of real-time updates + /// after initial subscription completion. + channel_groups: Option>, + }, + + /// Retry initial subscribe effect invocation. + HandshakeReconnect { + /// Optional list of channels. + /// + /// List of channels which has been used during recently failed initial + /// subscription. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which has been used during recently failed + /// initial subscription. + channel_groups: Option>, + + /// Current initial subscribe retry attempt. + /// + /// Used to track overall number of initial subscription retry attempts. + attempts: u8, + + /// Initial subscribe attempt failure reason. + reason: PubNubError, + }, + + /// Receive updates effect invocation. + Receive { + /// Optional list of channels. + /// + /// List of channels for which real-time updates will be delivered. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups for which real-time updates will be + /// delivered. + channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: SubscribeCursor, + }, + + /// Retry receive updates effect invocation. + ReceiveReconnect { + /// Optional list of channels. + /// + /// List of channels which has been used during recently failed receive + /// updates. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which has been used during recently failed + /// receive updates. + channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: SubscribeCursor, + + /// Current receive retry attempt. + /// + /// Used to track overall number of receive updates retry attempts. + attempts: u8, + + /// Receive updates attempt failure reason. + reason: PubNubError, + }, + + /// Status change notification effect invocation. + EmitStatus(SubscribeStatus), + + /// Received updates notification effect invocation. + EmitMessages(Vec), +} + +impl Effect for SubscribeEffect { + type Invocation = SubscribeEffectInvocation; + + fn id(&self) -> String { + // TODO: Identifiers need to be unique, so we won't cancel wrong effect + match self { + SubscribeEffect::Handshake { .. } => "HANDSHAKE_EFFECT".into(), + SubscribeEffect::HandshakeReconnect { .. } => "HANDSHAKE_RECONNECT_EFFECT".into(), + SubscribeEffect::Receive { .. } => "RECEIVE_EFFECT".into(), + SubscribeEffect::ReceiveReconnect { .. } => "RECEIVE_RECONNECT_EFFECT".into(), + SubscribeEffect::EmitStatus(_) => "EMIT_STATUS_EFFECT".into(), + SubscribeEffect::EmitMessages(_) => "EMIT_MESSAGES_EFFECT".into(), + } + } + fn run(&self, mut f: F) + where + F: FnMut(Option>), + { + // TODO: Run actual effect implementation. Maybe Effect.run function need change something in arguments. + f(None); + } + + fn cancel(&self) { + // TODO: Cancellation required for corresponding SubscribeEffect variants. + } +} diff --git a/src/dx/subscribe/event_engine/effect_handler.rs b/src/dx/subscribe/event_engine/effect_handler.rs new file mode 100644 index 00000000..065cadad --- /dev/null +++ b/src/dx/subscribe/event_engine/effect_handler.rs @@ -0,0 +1,100 @@ +use crate::{ + core::{event_engine::EffectHandler, PubNubError}, + dx::subscribe::{ + event_engine::{SubscribeEffect, SubscribeEffectInvocation}, + SubscribeCursor, + }, + lib::alloc::{string::String, vec::Vec}, +}; + +pub(crate) type HandshakeFunction = fn( + channels: Option>, + channel_groups: Option>, + attempt: u8, + reason: Option, +); + +pub(crate) type ReceiveFunction = fn( + channels: Option>, + channel_groups: Option>, + cursor: SubscribeCursor, + attempt: u8, + reason: Option, +); + +/// Subscription effect handler. +/// +/// Handler responsible for effects implementation and creation in response on +/// effect invocation. +#[allow(dead_code)] +pub(crate) struct SubscribeEffectHandler { + /// Handshake function pointer. + handshake: HandshakeFunction, + + /// Receive updates function pointer. + receive: ReceiveFunction, +} + +impl SubscribeEffectHandler { + /// Create subscribe event handler. + #[allow(dead_code)] + pub fn new(handshake: HandshakeFunction, receive: ReceiveFunction) -> Self { + SubscribeEffectHandler { handshake, receive } + } +} + +impl EffectHandler for SubscribeEffectHandler { + fn create(&self, invocation: &SubscribeEffectInvocation) -> Option { + match invocation { + SubscribeEffectInvocation::Handshake { + channels, + channel_groups, + } => Some(SubscribeEffect::Handshake { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + }), + SubscribeEffectInvocation::HandshakeReconnect { + channels, + channel_groups, + attempts, + reason, + } => Some(SubscribeEffect::HandshakeReconnect { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + attempts: *attempts, + reason: reason.clone(), + }), + SubscribeEffectInvocation::Receive { + channels, + channel_groups, + cursor, + } => Some(SubscribeEffect::Receive { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + }), + SubscribeEffectInvocation::ReceiveReconnect { + channels, + channel_groups, + cursor, + attempts, + reason, + } => Some(SubscribeEffect::ReceiveReconnect { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + attempts: *attempts, + reason: reason.clone(), + }), + SubscribeEffectInvocation::EmitStatus(status) => { + // TODO: Provide emit status effect + Some(SubscribeEffect::EmitStatus(*status)) + } + SubscribeEffectInvocation::EmitMessages(messages) => { + // TODO: Provide emit messages effect + Some(SubscribeEffect::EmitMessages(messages.clone())) + } + _ => None, + } + } +} diff --git a/src/dx/subscribe/event_engine/event.rs b/src/dx/subscribe/event_engine/event.rs new file mode 100644 index 00000000..46bd3cf1 --- /dev/null +++ b/src/dx/subscribe/event_engine/event.rs @@ -0,0 +1,153 @@ +use crate::{ + core::{event_engine::Event, PubNubError}, + dx::subscribe::SubscribeCursor, + lib::alloc::{string::String, vec::Vec}, +}; + +/// Subscription events. +/// +/// Subscribe state machine behaviour depends from external events which it +/// receives. +#[allow(dead_code)] +pub(crate) enum SubscribeEvent { + /// Current list of channels / groups has been changed. + /// + /// Emitted when updates list of channels / groups has been passed for + /// subscription. + SubscriptionChanged { + channels: Option>, + channel_groups: Option>, + }, + + /// Catching up on updates. + /// + /// Emitted when subscription has been called with timetoken (cursor) + /// starting from which updates should be received. + SubscriptionRestored { + channels: Option>, + channel_groups: Option>, + cursor: SubscribeCursor, + }, + + /// Handshake completed successfully. + /// + /// Emitted when [`PubNub`] network returned timetoken (cursor) which will + /// be used for subscription loop. + /// + /// [`PubNub`]: https://www.pubnub.com/ + HandshakeSuccess { cursor: SubscribeCursor }, + + /// Handshake completed with error. + /// + /// Emitted when handshake effect was unable to receive response from + /// [`PubNub`] network (network issues or permissions). + /// + /// [`PubNub`]: https://www.pubnub.com/ + HandshakeFailure { reason: PubNubError }, + + /// Handshake reconnect completed successfully. + /// + /// Emitted when another handshake attempt was successful and [`PubNub`] + /// network returned timetoken (cursor) which will be used for subscription + /// loop. + /// + /// [`PubNub`]: https://www.pubnub.com/ + HandshakeReconnectSuccess { cursor: SubscribeCursor }, + + /// Handshake reconnect completed with error. + /// + /// Emitted when another handshake effect attempt was unable to receive + /// response from [`PubNub`] network (network issues or permissions). + /// + /// [`PubNub`]: https://www.pubnub.com/ + HandshakeReconnectFailure { reason: PubNubError }, + + /// All handshake attempts was unsuccessful. + /// + /// Emitted when handshake reconnect attempts reached maximum allowed count + /// (according to retry / reconnection policy) and all following attempts + /// should be stopped. + HandshakeReconnectGiveUp { reason: PubNubError }, + + /// Receive updates completed successfully. + /// + /// Emitted when [`PubNub`] network returned list of real-time updates along + /// with timetoken (cursor) which will be used for next subscription loop. + /// + /// [`PubNub`]: https://www.pubnub.com/ + ReceiveSuccess { + cursor: SubscribeCursor, + messages: Vec, + }, + + /// Receive updates completed with error. + /// + /// Emitted when receive updates effect was unable to receive response from + /// [`PubNub`] network (network issues or revoked / expired permissions). + /// + /// [`PubNub`]: https://www.pubnub.com/ + ReceiveFailure { reason: PubNubError }, + + /// Receive updates reconnect completed successfully. + /// + /// Emitted when another receive updates attempt was successful and + /// [`PubNub`] network returned list of real-time updates along + /// timetoken (cursor) which will be used for subscription loop. + /// + /// [`PubNub`]: https://www.pubnub.com/ + ReceiveReconnectSuccess { + cursor: SubscribeCursor, + messages: Vec, + }, + + /// Receive updates reconnect completed with error. + /// + /// Emitted when another receive updates effect attempt was unable to + /// receive response from [`PubNub`] network (network issues or + /// revoked permissions). + /// + /// [`PubNub`]: https://www.pubnub.com/ + ReceiveReconnectFailure { reason: PubNubError }, + + /// All receive updates attempts was unsuccessful. + /// + /// Emitted when receive updates reconnect attempts reached maximum allowed + /// count (according to retry / reconnection policy) and all following + /// attempts should be stopped. + ReceiveReconnectGiveUp { reason: PubNubError }, + + /// Disconnect from [`PubNub`] network. + /// + /// Emitted when explicitly requested to stop receiving real-time updates. + /// + /// [`PubNub`]: https://www.pubnub.com/ + Disconnect, + + /// Reconnect to [`PubNub`] network. + /// + /// Emitted when explicitly requested to restore real-time updates receive. + /// + /// [`PubNub`]: https://www.pubnub.com/ + Reconnect, +} + +impl Event for SubscribeEvent { + fn id(&self) -> &str { + match self { + SubscribeEvent::SubscriptionChanged { .. } => "SUBSCRIPTION_CHANGED", + SubscribeEvent::SubscriptionRestored { .. } => "SUBSCRIPTION_RESTORED", + SubscribeEvent::HandshakeSuccess { .. } => "HANDSHAKE_SUCCESS", + SubscribeEvent::HandshakeFailure { .. } => "HANDSHAKE_FAILURE", + SubscribeEvent::HandshakeReconnectSuccess { .. } => "HANDSHAKE_RECONNECT_SUCCESS", + SubscribeEvent::HandshakeReconnectFailure { .. } => "HANDSHAKE_RECONNECT_FAILURE", + SubscribeEvent::HandshakeReconnectGiveUp { .. } => "HANDSHAKE_RECONNECT_GIVEUP", + SubscribeEvent::ReceiveSuccess { .. } => "RECEIVE_SUCCESS", + SubscribeEvent::ReceiveFailure { .. } => "RECEIVE_FAILURE", + SubscribeEvent::ReceiveReconnectSuccess { .. } => "RECEIVE_RECONNECT_SUCCESS", + SubscribeEvent::ReceiveReconnectFailure { .. } => "RECEIVE_RECONNECT_FAILURE", + SubscribeEvent::ReceiveReconnectGiveUp { .. } => "RECEIVE_RECONNECT_GIVEUP", + SubscribeEvent::Disconnect => "DISCONNECT", + SubscribeEvent::Reconnect => "RECONNECT", + } + } +} diff --git a/src/dx/subscribe/event_engine/invocation.rs b/src/dx/subscribe/event_engine/invocation.rs new file mode 100644 index 00000000..09ebfec9 --- /dev/null +++ b/src/dx/subscribe/event_engine/invocation.rs @@ -0,0 +1,190 @@ +use crate::dx::subscribe::event_engine::SubscribeEvent; +use crate::{ + core::{event_engine::EffectInvocation, PubNubError}, + dx::subscribe::{event_engine::SubscribeEffect, SubscribeCursor, SubscribeStatus}, + lib::{ + alloc::{string::String, vec::Vec}, + core::fmt::{Formatter, Result}, + }, +}; + +/// Subscribe effect invocations +/// +/// Invocation is form of intention to call some action without any information +/// about it's implementation. +#[derive(Debug)] +#[allow(dead_code)] +pub(crate) enum SubscribeEffectInvocation { + /// Initial subscribe effect invocation. + Handshake { + /// Optional list of channels. + /// + /// List of channels which will be source of real-time updates after + /// initial subscription completion. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which will be source of real-time updates + /// after initial subscription completion. + channel_groups: Option>, + }, + + /// Cancel initial subscribe effect invocation. + CancelHandshake, + + /// Retry initial subscribe effect invocation. + HandshakeReconnect { + /// Optional list of channels. + /// + /// List of channels which has been used during recently failed initial + /// subscription. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which has been used during recently failed + /// initial subscription. + channel_groups: Option>, + + /// Current initial subscribe retry attempt. + /// + /// Used to track overall number of initial subscription retry attempts. + attempts: u8, + + /// Initial subscribe attempt failure reason. + reason: PubNubError, + }, + + /// Cancel initial subscribe retry effect invocation. + CancelHandshakeReconnect, + + /// Receive updates effect invocation. + Receive { + /// Optional list of channels. + /// + /// List of channels for which real-time updates will be delivered. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups for which real-time updates will be + /// delivered. + channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: SubscribeCursor, + }, + + /// Cancel receive updates effect invocation. + CancelReceive, + + /// Retry receive updates effect invocation. + ReceiveReconnect { + /// Optional list of channels. + /// + /// List of channels which has been used during recently failed receive + /// updates. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which has been used during recently failed + /// receive updates. + channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: SubscribeCursor, + + /// Current receive retry attempt. + /// + /// Used to track overall number of receive updates retry attempts. + attempts: u8, + + /// Receive updates attempt failure reason. + reason: PubNubError, + }, + + /// Cancel receive updates retry effect invocation. + CancelReceiveReconnect, + + /// Status change notification effect invocation. + EmitStatus(SubscribeStatus), + + /// Received updates notification effect invocation. + EmitMessages(Vec), +} + +impl EffectInvocation for SubscribeEffectInvocation { + type Effect = SubscribeEffect; + type Event = SubscribeEvent; + + fn id(&self) -> &str { + match self { + Self::Handshake { .. } => "Handshake", + Self::CancelHandshake => "CancelHandshake", + Self::HandshakeReconnect { .. } => "HandshakeReconnect", + Self::CancelHandshakeReconnect => "CancelHandshakeReconnect", + Self::Receive { .. } => "Receive", + Self::CancelReceive { .. } => "CancelReceive", + Self::ReceiveReconnect { .. } => "ReceiveReconnect", + Self::CancelReceiveReconnect { .. } => "CancelReceiveReconnect", + Self::EmitStatus(_status) => "EmitStatus", + Self::EmitMessages(_messages) => "EmitMessages", + } + } + + fn managed(&self) -> bool { + matches!( + self, + Self::Handshake { .. } + | Self::HandshakeReconnect { .. } + | Self::Receive { .. } + | Self::ReceiveReconnect { .. } + ) + } + + fn cancelling(&self) -> bool { + matches!( + self, + Self::CancelHandshake + | Self::CancelHandshakeReconnect + | Self::CancelReceive + | Self::CancelReceiveReconnect + ) + } + + fn cancelling_effect(&self, effect: &Self::Effect) -> bool { + (matches!(effect, SubscribeEffect::Handshake { .. }) + && matches!(self, Self::CancelHandshake { .. })) + || (matches!(effect, SubscribeEffect::HandshakeReconnect { .. }) + && matches!(self, Self::CancelHandshakeReconnect { .. })) + || (matches!(effect, SubscribeEffect::Receive { .. }) + && matches!(self, Self::CancelReceive { .. })) + || (matches!(effect, SubscribeEffect::ReceiveReconnect { .. }) + && matches!(self, Self::CancelReceiveReconnect { .. })) + } +} + +impl core::fmt::Display for SubscribeEffectInvocation { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + match self { + Self::Handshake { .. } => write!(f, "Handshake"), + Self::CancelHandshake => write!(f, "CancelHandshake"), + Self::HandshakeReconnect { .. } => write!(f, "HandshakeReconnect"), + Self::CancelHandshakeReconnect => write!(f, "CancelHandshakeReconnect"), + Self::Receive { .. } => write!(f, "Receive"), + Self::CancelReceive { .. } => write!(f, "CancelReceive"), + Self::ReceiveReconnect { .. } => write!(f, "ReceiveReconnect"), + Self::CancelReceiveReconnect { .. } => write!(f, "CancelReceiveReconnect"), + Self::EmitStatus(status) => write!(f, "EmitStatus({})", status), + Self::EmitMessages(messages) => write!(f, "EmitMessages({:?})", messages), + } + } +} diff --git a/src/dx/subscribe/event_engine/mod.rs b/src/dx/subscribe/event_engine/mod.rs new file mode 100644 index 00000000..70510aef --- /dev/null +++ b/src/dx/subscribe/event_engine/mod.rs @@ -0,0 +1,23 @@ +//! Subscribe Event Engine module + +#[doc(inline)] +pub(crate) use effect::SubscribeEffect; +pub(crate) mod effect; + +#[doc(inline)] +#[allow(unused_imports)] +pub(crate) use effect_handler::SubscribeEffectHandler; +pub(crate) mod effect_handler; + +#[doc(inline)] +pub(crate) use invocation::SubscribeEffectInvocation; +pub(crate) mod invocation; + +#[doc(inline)] +pub(crate) use event::SubscribeEvent; +pub(crate) mod event; + +#[doc(inline)] +#[allow(unused_imports)] +pub(crate) use state::SubscribeState; +pub(crate) mod state; diff --git a/src/dx/subscribe/event_engine/state.rs b/src/dx/subscribe/event_engine/state.rs new file mode 100644 index 00000000..13903a47 --- /dev/null +++ b/src/dx/subscribe/event_engine/state.rs @@ -0,0 +1,1509 @@ +use crate::{ + core::{ + event_engine::{State, Transition}, + PubNubError, + }, + dx::subscribe::{ + event_engine::{ + SubscribeEffectInvocation::{ + self, CancelHandshake, CancelHandshakeReconnect, CancelReceive, + CancelReceiveReconnect, EmitMessages, EmitStatus, Handshake, HandshakeReconnect, + Receive, ReceiveReconnect, + }, + SubscribeEvent, + }, + SubscribeCursor, SubscribeStatus, + }, + lib::alloc::{string::String, vec, vec::Vec}, +}; + +/// States of subscribe state machine. +#[derive(Debug, Clone, PartialEq)] +#[allow(dead_code)] +pub(crate) enum SubscribeState { + /// Unsubscribed state. + /// + /// The initial state has no information about channels or groups from which + /// events should be retrieved in real-time. + Unsubscribed, + + /// Subscription initiation state. + /// + /// Retrieve the information that will be used to start the subscription + /// loop. + Handshaking { + /// Optional list of channels. + /// + /// List of channels which will be source of real-time updates after + /// initial subscription completion. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which will be source of real-time updates + /// after initial subscription completion. + channel_groups: Option>, + }, + + /// Subscription recover state. + /// + /// The system is recovering after the initial subscription attempt failed. + HandshakeReconnecting { + /// Optional list of channels. + /// + /// List of channels which has been used during recently failed initial + /// subscription. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which has been used during recently failed + /// initial subscription. + channel_groups: Option>, + + /// Current initial subscribe retry attempt. + /// + /// Used to track overall number of initial subscription retry attempts. + attempts: u8, + + /// Initial subscribe attempt failure reason. + reason: PubNubError, + }, + + /// Initial subscription stopped state. + /// + /// Subscription state machine state, which is set when + /// [`SubscribeEvent::Disconnect`] event sent while in + /// [`SubscribeState::Handshaking`] or + /// [`SubscribeState::HandshakeReconnecting`] state. + HandshakeStopped { + /// Optional list of channels. + /// + /// List of channels for which initial subscription stopped. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups for which initial subscription stopped. + channel_groups: Option>, + }, + + /// Initial subscription failure state. + /// + /// System wasn't able to perform successful initial subscription after + /// fixed number of attempts. + HandshakeFailed { + /// Optional list of channels. + /// + /// List of channels which has been used during recently failed initial + /// subscription. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which has been used during recently failed + /// initial subscription. + channel_groups: Option>, + + /// Initial subscribe attempt failure reason. + reason: PubNubError, + }, + + /// Receiving updates state. + /// + /// Subscription state machine is in state where it receive real-time + /// updates from [`PubNub`] network. + /// + /// [`PubNub`]:https://www.pubnub.com/ + Receiving { + /// Optional list of channels. + /// + /// List of channels for which real-time updates will be delivered. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups for which real-time updates will be + /// delivered. + channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: SubscribeCursor, + }, + + /// Subscription recover state. + /// + /// The system is recovering after the updates receiving attempt failed. + ReceiveReconnecting { + /// Optional list of channels. + /// + /// List of channels which has been used during recently failed receive + /// updates. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which has been used during recently failed + /// receive updates. + channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: SubscribeCursor, + + /// Current receive retry attempt. + /// + /// Used to track overall number of receive updates retry attempts. + attempts: u8, + + /// Receive updates attempt failure reason. + reason: PubNubError, + }, + + /// Updates receiving stopped state. + /// + /// Subscription state machine state, which is set when + /// [`SubscribeEvent::Disconnect`] event sent while in + /// [`SubscribeState::Handshaking`] or + /// [`SubscribeState::HandshakeReconnecting`] state. + ReceiveStopped { + /// Optional list of channels. + /// + /// List of channels for which updates receive stopped. + channels: Option>, + + /// Optional list of channels. + /// + /// List of channel groups for which updates receive stopped. + channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: SubscribeCursor, + }, + + /// Updates receiving failure state. + /// + /// System wasn't able to receive updates after fixed number of attempts. + ReceiveFailed { + /// Optional list of channels. + /// + /// List of channels which has been used during recently failed receive + /// updates. + channels: Option>, + + /// Optional list of channel groups. + /// + /// List of channel groups which has been used during recently failed + /// receive updates. + channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: SubscribeCursor, + + /// Receive updates attempt failure reason. + reason: PubNubError, + }, +} + +impl SubscribeState { + /// Handle channels / groups list change event. + fn subscription_changed_transition( + &self, + channels: &Option>, + channel_groups: &Option>, + ) -> Option> { + match self { + Self::Unsubscribed + | Self::Handshaking { .. } + | Self::HandshakeReconnecting { .. } + | Self::HandshakeFailed { .. } => Some(self.transition_to( + Self::Handshaking { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + }, + None, + )), + Self::Receiving { cursor, .. } + | Self::ReceiveReconnecting { cursor, .. } + | Self::ReceiveFailed { cursor, .. } => Some(self.transition_to( + Self::Receiving { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + }, + None, + )), + _ => None, + } + } + + /// Handle catchup event. + /// + /// Event is sent each time during attempt to subscribe with specific + /// `cursor`. + fn subscription_restored_transition( + &self, + channels: &Option>, + channel_groups: &Option>, + cursor: &SubscribeCursor, + ) -> Option> { + match self { + Self::Unsubscribed + | Self::Handshaking { .. } + | Self::HandshakeReconnecting { .. } + | Self::HandshakeFailed { .. } + | Self::Receiving { .. } + | Self::ReceiveReconnecting { .. } + | Self::ReceiveFailed { .. } => Some(self.transition_to( + Self::Receiving { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + }, + None, + )), + _ => None, + } + } + + /// Handle initial (reconnect) handshake success event. + /// + /// Event is sent when provided set of channels and groups has been used for + /// first time. + fn handshake_success_transition( + &self, + cursor: &SubscribeCursor, + ) -> Option> { + match self { + Self::Handshaking { + channels, + channel_groups, + } + | Self::HandshakeReconnecting { + channels, + channel_groups, + .. + } => Some(self.transition_to( + Self::Receiving { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + }, + Some(vec![EmitStatus(SubscribeStatus::Connected)]), + )), + _ => None, + } + } + + /// Handle initial handshake failure event. + fn handshake_failure_transition( + &self, + reason: &PubNubError, + ) -> Option> { + match self { + Self::Handshaking { + channels, + channel_groups, + } => Some(self.transition_to( + Self::HandshakeReconnecting { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + attempts: 0, + reason: reason.clone(), + }, + None, + )), + _ => None, + } + } + + /// Handle handshake reconnect failure event. + /// + /// Event is sent if handshake reconnect effect failed due to any network + /// issues. + fn handshake_reconnect_failure_transition( + &self, + reason: &PubNubError, + ) -> Option> { + match self { + Self::HandshakeReconnecting { + channels, + channel_groups, + attempts, + .. + } => Some(self.transition_to( + Self::HandshakeReconnecting { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + attempts: attempts + 1, + reason: reason.clone(), + }, + None, + )), + _ => None, + } + } + + /// Handle handshake reconnection limit event. + /// + /// Event is sent if handshake reconnect reached maximum number of reconnect + /// attempts. + fn handshake_reconnect_give_up_transition( + &self, + reason: &PubNubError, + ) -> Option> { + match self { + Self::HandshakeReconnecting { + channels, + channel_groups, + .. + } => Some(self.transition_to( + Self::HandshakeFailed { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + reason: reason.clone(), + }, + None, + )), + _ => None, + } + } + + /// Handle updates receive (reconnect) success event. + /// + /// Event is sent when real-time updates received for previously subscribed + /// channels / groups. + fn receive_success_transition( + &self, + cursor: &SubscribeCursor, + messages: &[String], + ) -> Option> { + match self { + Self::Receiving { + channels, + channel_groups, + .. + } + | Self::ReceiveReconnecting { + channels, + channel_groups, + .. + } => Some(self.transition_to( + Self::Receiving { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + }, + Some(vec![ + EmitMessages(messages.to_vec()), + EmitStatus(SubscribeStatus::Connected), + ]), + )), + _ => None, + } + } + + /// Handle updates receive failure event. + fn receive_failure_transition( + &self, + reason: &PubNubError, + ) -> Option> { + match self { + Self::Receiving { + channels, + channel_groups, + cursor, + .. + } => Some(self.transition_to( + Self::ReceiveReconnecting { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + attempts: 0, + reason: reason.clone(), + }, + None, + )), + _ => None, + } + } + + /// Handle updates receive failure event. + /// + /// Event is sent if updates receive effect failed due to any network + /// issues. + fn receive_reconnect_failure_transition( + &self, + reason: &PubNubError, + ) -> Option> { + match self { + Self::ReceiveReconnecting { + channels, + channel_groups, + attempts, + cursor, + .. + } => Some(self.transition_to( + Self::ReceiveReconnecting { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + attempts: attempts + 1, + reason: reason.clone(), + }, + None, + )), + _ => None, + } + } + + /// Handle receive updates reconnection limit event. + /// + /// Event is sent if receive updates reconnect reached maximum number of + /// reconnect attempts. + fn receive_reconnect_give_up_transition( + &self, + reason: &PubNubError, + ) -> Option> { + match self { + Self::ReceiveReconnecting { + channels, + channel_groups, + cursor, + .. + } => Some(self.transition_to( + Self::ReceiveFailed { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + reason: reason.clone(), + }, + Some(vec![EmitStatus(SubscribeStatus::Disconnected)]), + )), + _ => None, + } + } + + /// Handle disconnect event. + /// + /// Event is sent each time when client asked to unsubscribe all + /// channels / groups or temporally stop any activity. + fn disconnect_transition(&self) -> Option> { + match self { + Self::Handshaking { + channels, + channel_groups, + } + | Self::HandshakeReconnecting { + channels, + channel_groups, + .. + } => Some(self.transition_to( + Self::HandshakeStopped { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + }, + None, + )), + Self::Receiving { + channels, + channel_groups, + cursor, + } + | Self::ReceiveReconnecting { + channels, + channel_groups, + cursor, + .. + } => Some(self.transition_to( + Self::ReceiveStopped { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + }, + Some(vec![EmitStatus(SubscribeStatus::Disconnected)]), + )), + _ => None, + } + } + + /// Handle reconnect event. + /// + /// Event is sent each time when client asked to restore activity for + /// channels / groups after which previously temporally stopped or restore + /// after reconnection failures. + fn reconnect_transition(&self) -> Option> { + match self { + Self::HandshakeStopped { + channels, + channel_groups, + .. + } + | Self::HandshakeFailed { + channels, + channel_groups, + .. + } => Some(self.transition_to( + Self::Handshaking { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + }, + None, + )), + Self::ReceiveStopped { + channels, + channel_groups, + cursor, + } + | Self::ReceiveFailed { + channels, + channel_groups, + cursor, + .. + } => Some(self.transition_to( + Self::Receiving { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + }, + None, + )), + _ => None, + } + } +} + +impl State for SubscribeState { + type State = Self; + type Invocation = SubscribeEffectInvocation; + type Event = SubscribeEvent; + + fn enter(&self) -> Option> { + match self { + SubscribeState::Handshaking { + channels, + channel_groups, + } => Some(vec![Handshake { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + }]), + Self::HandshakeReconnecting { + channels, + channel_groups, + attempts, + reason, + } => Some(vec![HandshakeReconnect { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + attempts: *attempts, + reason: reason.clone(), + }]), + Self::Receiving { + channels, + channel_groups, + cursor, + } => Some(vec![Receive { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + }]), + Self::ReceiveReconnecting { + channels, + channel_groups, + cursor, + attempts, + reason, + } => Some(vec![ReceiveReconnect { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: *cursor, + attempts: *attempts, + reason: reason.clone(), + }]), + _ => None, + } + } + + fn exit(&self) -> Option> { + match self { + Self::Handshaking { .. } => Some(vec![CancelHandshake]), + Self::HandshakeReconnecting { .. } => Some(vec![CancelHandshakeReconnect]), + Self::Receiving { .. } => Some(vec![CancelReceive]), + Self::ReceiveReconnecting { .. } => Some(vec![CancelReceiveReconnect]), + _ => None, + } + } + + fn transition(&self, event: &Self::Event) -> Option> { + match event { + SubscribeEvent::SubscriptionChanged { + channels, + channel_groups, + } => self.subscription_changed_transition(channels, channel_groups), + SubscribeEvent::SubscriptionRestored { + channels, + channel_groups, + cursor, + } => self.subscription_restored_transition(channels, channel_groups, cursor), + SubscribeEvent::HandshakeSuccess { cursor } + | SubscribeEvent::HandshakeReconnectSuccess { cursor } => { + self.handshake_success_transition(cursor) + } + SubscribeEvent::HandshakeFailure { reason } => { + self.handshake_failure_transition(reason) + } + SubscribeEvent::HandshakeReconnectFailure { reason } => { + self.handshake_reconnect_failure_transition(reason) + } + SubscribeEvent::HandshakeReconnectGiveUp { reason } => { + self.handshake_reconnect_give_up_transition(reason) + } + SubscribeEvent::ReceiveSuccess { cursor, messages } + | SubscribeEvent::ReceiveReconnectSuccess { cursor, messages } => { + self.receive_success_transition(cursor, messages) + } + SubscribeEvent::ReceiveFailure { reason } => self.receive_failure_transition(reason), + SubscribeEvent::ReceiveReconnectFailure { reason } => { + self.receive_reconnect_failure_transition(reason) + } + SubscribeEvent::ReceiveReconnectGiveUp { reason } => { + self.receive_reconnect_give_up_transition(reason) + } + SubscribeEvent::Disconnect => self.disconnect_transition(), + SubscribeEvent::Reconnect => self.reconnect_transition(), + } + } + + fn transition_to( + &self, + state: Self::State, + invocations: Option>, + ) -> Transition { + Transition { + invocations: self + .exit() + .unwrap_or(vec![]) + .into_iter() + .chain(invocations.unwrap_or(vec![]).into_iter()) + .chain(state.enter().unwrap_or(vec![]).into_iter()) + .collect(), + state, + } + } +} + +#[cfg(test)] +mod should { + use super::*; + use crate::core::event_engine::EventEngine; + use crate::dx::subscribe::event_engine::{SubscribeEffect, SubscribeEffectHandler}; + use test_case::test_case; + + fn handshake_function( + _channels: Option>, + _channel_groups: Option>, + _attempt: u8, + _reason: Option, + ) { + // Do nothing. + } + + fn receive_function( + _channels: Option>, + _channel_groups: Option>, + _cursor: SubscribeCursor, + _attempt: u8, + _reason: Option, + ) { + // Do nothing. + } + + fn event_engine( + start_state: SubscribeState, + ) -> EventEngine< + SubscribeState, + SubscribeEffectHandler, + SubscribeEffect, + SubscribeEffectInvocation, + > { + EventEngine::new( + SubscribeEffectHandler::new(handshake_function, receive_function), + start_state, + ) + } + + #[test_case( + SubscribeState::Unsubscribed, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]) + }; + "to handshaking on subscription changed" + )] + #[test_case( + SubscribeState::Unsubscribed, + SubscribeEvent::SubscriptionRestored { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }, + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }; + "to handshaking on subscription restored" + )] + #[test_case( + SubscribeState::Unsubscribed, + SubscribeEvent::ReceiveFailure { + reason: PubNubError::Transport { details: "Test".to_string(), } + }, + SubscribeState::Unsubscribed; + "to not change on unexpected event" + )] + fn transition_for_unsubscribed_state( + init_state: SubscribeState, + event: SubscribeEvent, + target_state: SubscribeState, + ) { + let engine = event_engine(init_state.clone()); + assert_eq!(engine.current_state(), init_state); + + // Process event. + engine.process(&event); + + assert_eq!(engine.current_state(), target_state); + } + + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]) + }, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]) + }; + "to handshaking on subscription changed" + )] + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]) + }, + SubscribeEvent::HandshakeFailure { + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + attempts: 0, + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }; + "to handshake reconnect on handshake failure" + )] + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]) + }, + SubscribeEvent::Disconnect, + SubscribeState::HandshakeStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + }; + "to handshake stopped on disconnect" + )] + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]) + }, + SubscribeEvent::HandshakeSuccess { cursor: SubscribeCursor { timetoken: 10, region: 1 } }, + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }; + "to receiving on handshake success" + )] + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]) + }, + SubscribeEvent::SubscriptionRestored { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }, + SubscribeState::Receiving { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }; + "to receiving on subscription restored" + )] + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]) + }, + SubscribeEvent::HandshakeReconnectGiveUp { + reason: PubNubError::Transport { details: "Test reason".to_string(), } + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]) + }; + "to not change on unexpected event" + )] + fn transition_handshaking_state( + init_state: SubscribeState, + event: SubscribeEvent, + target_state: SubscribeState, + ) { + let engine = event_engine(init_state.clone()); + assert_eq!(engine.current_state(), init_state); + + engine.process(&event); + + assert_eq!(engine.current_state(), target_state); + } + + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + attempts: 0, + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::HandshakeReconnectFailure { + reason: PubNubError::Transport { details: "Test reason on error".to_string() }, + }, + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + attempts: 1, + reason: PubNubError::Transport { details: "Test reason on error".to_string() }, + }; + "to handshake reconnecting on reconnect failure" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + attempts: 0, + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }; + "to handshaking on subscription change" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + attempts: 0, + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::Disconnect, + SubscribeState::HandshakeStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + }; + "to handshake stopped on disconnect" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + attempts: 0, + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::HandshakeReconnectGiveUp { + reason: PubNubError::Transport { details: "Test give up reason".to_string() } + }, + SubscribeState::HandshakeFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + reason: PubNubError::Transport { details: "Test give up reason".to_string() } + }; + "to handshake failed on give up" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + attempts: 0, + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::HandshakeReconnectSuccess { + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }, + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }; + "to receiving on reconnect success" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + attempts: 0, + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::SubscriptionRestored { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }, + SubscribeState::Receiving { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }; + "to receiving on subscription restored" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + attempts: 0, + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::ReceiveSuccess { + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + messages: vec![] + }, + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + attempts: 0, + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }; + "to not change on unexpected event" + )] + fn transition_handshake_reconnecting_state( + init_state: SubscribeState, + event: SubscribeEvent, + target_state: SubscribeState, + ) { + let engine = event_engine(init_state.clone()); + assert_eq!(engine.current_state(), init_state); + + engine.process(&event); + + assert_eq!(engine.current_state(), target_state); + } + + #[test_case( + SubscribeState::HandshakeFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }; + "to handshaking on subscription changed" + )] + #[test_case( + SubscribeState::HandshakeFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::Reconnect, + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + }; + "to handshaking on reconnect" + )] + #[test_case( + SubscribeState::HandshakeFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::SubscriptionRestored { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }, + SubscribeState::Receiving { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 } + }; + "to receiving on subscription restored" + )] + #[test_case( + SubscribeState::HandshakeFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }, + SubscribeEvent::ReceiveSuccess { + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + messages: vec![] + }, + SubscribeState::HandshakeFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + reason: PubNubError::Transport { details: "Test reason".to_string() }, + }; + "to not change on unexpected event" + )] + fn transition_handshake_failed_state( + init_state: SubscribeState, + event: SubscribeEvent, + target_state: SubscribeState, + ) { + let engine = event_engine(init_state.clone()); + assert_eq!(engine.current_state(), init_state); + + engine.process(&event); + + assert_eq!(engine.current_state(), target_state); + } + + #[test_case( + SubscribeState::HandshakeStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + }, + SubscribeEvent::Reconnect, + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + }; + "to handshaking on reconnect" + )] + #[test_case( + SubscribeState::HandshakeStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + }, + SubscribeEvent::ReceiveSuccess { + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + messages: vec![] + }, + SubscribeState::HandshakeStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + }; + "to not change on unexpected event" + )] + fn transition_handshake_stopped_state( + init_state: SubscribeState, + event: SubscribeEvent, + target_state: SubscribeState, + ) { + let engine = event_engine(init_state.clone()); + assert_eq!(engine.current_state(), init_state); + + engine.process(&event); + + assert_eq!(engine.current_state(), target_state); + } + + #[test_case( + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }, + SubscribeState::Receiving { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }; + "to receiving on subscription changed" + )] + #[test_case( + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }, + SubscribeEvent::SubscriptionRestored { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 100, region: 1 }, + }, + SubscribeState::Receiving { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 100, region: 1 }, + }; + "to receiving on subscription restored" + )] + #[test_case( + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }, + SubscribeEvent::ReceiveSuccess { + cursor: SubscribeCursor { timetoken: 100, region: 1 }, + messages: vec![] + }, + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 100, region: 1 }, + }; + "to receiving on receive success" + )] + #[test_case( + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }, + SubscribeEvent::ReceiveFailure { + reason: PubNubError::Transport { details: "Test reason".to_string() } + }, + SubscribeState::ReceiveReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + attempts: 0, + reason: PubNubError::Transport { details: "Test reason".to_string() } + }; + "to receive reconnecting on receive failure" + )] + #[test_case( + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }, + SubscribeEvent::Disconnect, + SubscribeState::ReceiveStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }; + "to receive stopped on disconnect" + )] + #[test_case( + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }, + SubscribeEvent::HandshakeSuccess { + cursor: SubscribeCursor { timetoken: 100, region: 1 }, + }, + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }; + "to not change on unexpected event" + )] + fn transition_receiving_state( + init_state: SubscribeState, + event: SubscribeEvent, + target_state: SubscribeState, + ) { + let engine = event_engine(init_state.clone()); + assert_eq!(engine.current_state(), init_state); + + engine.process(&event); + + assert_eq!(engine.current_state(), target_state); + } + + #[test_case( + SubscribeState::ReceiveReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + attempts: 0, + reason: PubNubError::Transport { details: "Test error".to_string() } + }, + SubscribeEvent::ReceiveReconnectFailure { + reason: PubNubError::Transport { details: "Test reconnect error".to_string() } + }, + SubscribeState::ReceiveReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + attempts: 1, + reason: PubNubError::Transport { details: "Test reconnect error".to_string() } + }; + "to receive reconnecting on reconnect failure" + )] + #[test_case( + SubscribeState::ReceiveReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + attempts: 0, + reason: PubNubError::Transport { details: "Test error".to_string() } + }, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }, + SubscribeState::Receiving { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }; + "to receiving on subscription changed" + )] + #[test_case( + SubscribeState::ReceiveReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + attempts: 0, + reason: PubNubError::Transport { details: "Test error".to_string() } + }, + SubscribeEvent::SubscriptionRestored { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 100, region: 1 }, + }, + SubscribeState::Receiving { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 100, region: 1 }, + }; + "to receiving on subscription restored" + )] + #[test_case( + SubscribeState::ReceiveReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + attempts: 0, + reason: PubNubError::Transport { details: "Test error".to_string() } + }, + SubscribeEvent::Disconnect, + SubscribeState::ReceiveStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }; + "to receive stopped on disconnect" + )] + #[test_case( + SubscribeState::ReceiveReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + attempts: 0, + reason: PubNubError::Transport { details: "Test error".to_string() } + }, + SubscribeEvent::ReceiveReconnectGiveUp { + reason: PubNubError::Transport { details: "Test give up error".to_string() } + }, + SubscribeState::ReceiveFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + reason: PubNubError::Transport { details: "Test give up error".to_string() } + }; + "to receive failed on give up" + )] + #[test_case( + SubscribeState::ReceiveReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + attempts: 0, + reason: PubNubError::Transport { details: "Test error".to_string() } + }, + SubscribeEvent::HandshakeSuccess { + cursor: SubscribeCursor { timetoken: 100, region: 1 }, + }, + SubscribeState::ReceiveReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + attempts: 0, + reason: PubNubError::Transport { details: "Test error".to_string() } + }; + "to not change on unexpected event" + )] + fn transition_receiving_reconnecting_state( + init_state: SubscribeState, + event: SubscribeEvent, + target_state: SubscribeState, + ) { + let engine = event_engine(init_state.clone()); + assert_eq!(engine.current_state(), init_state); + + engine.process(&event); + + assert_eq!(engine.current_state(), target_state); + } + + #[test_case( + SubscribeState::ReceiveFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + reason: PubNubError::Transport { details: "Test error".to_string() } + }, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }, + SubscribeState::Receiving { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }; + "to receiving on subscription changed" + )] + #[test_case( + SubscribeState::ReceiveFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + reason: PubNubError::Transport { details: "Test error".to_string() } + }, + SubscribeEvent::SubscriptionRestored { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 100, region: 1 }, + }, + SubscribeState::Receiving { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: 100, region: 1 }, + }; + "to receiving on subscription restored" + )] + #[test_case( + SubscribeState::ReceiveFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + reason: PubNubError::Transport { details: "Test error".to_string() } + }, + SubscribeEvent::Reconnect, + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }; + "to receiving on reconnect" + )] + #[test_case( + SubscribeState::ReceiveFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + reason: PubNubError::Transport { details: "Test error".to_string() } + }, + SubscribeEvent::HandshakeSuccess { + cursor: SubscribeCursor { timetoken: 100, region: 1 } + }, + SubscribeState::ReceiveFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + reason: PubNubError::Transport { details: "Test error".to_string() } + }; + "to not change on unexpected event" + )] + fn transition_receive_failed_state( + init_state: SubscribeState, + event: SubscribeEvent, + target_state: SubscribeState, + ) { + let engine = event_engine(init_state.clone()); + assert_eq!(engine.current_state(), init_state); + + engine.process(&event); + + assert_eq!(engine.current_state(), target_state); + } + #[test_case( + SubscribeState::ReceiveStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }, + SubscribeEvent::Reconnect, + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }; + "to receiving on reconnect" + )] + #[test_case( + SubscribeState::ReceiveStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }, + SubscribeEvent::HandshakeSuccess { + cursor: SubscribeCursor { timetoken: 100, region: 1 } + }, + SubscribeState::ReceiveStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: 10, region: 1 }, + }; + "to not change on unexpected event" + )] + fn transition_receive_stopped_state( + init_state: SubscribeState, + event: SubscribeEvent, + target_state: SubscribeState, + ) { + let engine = event_engine(init_state.clone()); + assert_eq!(engine.current_state(), init_state); + + engine.process(&event); + + assert_eq!(engine.current_state(), target_state); + } +} diff --git a/src/dx/subscribe/mod.rs b/src/dx/subscribe/mod.rs new file mode 100644 index 00000000..bce9f661 --- /dev/null +++ b/src/dx/subscribe/mod.rs @@ -0,0 +1,9 @@ +//! Subscribe module. +//! +//! Allows subscribe to real-time updates from channels and groups. + +pub(crate) mod event_engine; + +#[doc(inline)] +pub use types::{SubscribeCursor, SubscribeStatus}; +pub mod types; diff --git a/src/dx/subscribe/types.rs b/src/dx/subscribe/types.rs new file mode 100644 index 00000000..79afa5a4 --- /dev/null +++ b/src/dx/subscribe/types.rs @@ -0,0 +1,44 @@ +//! Subscription types module. + +use crate::lib::core::fmt::{Formatter, Result}; + +/// Time cursor. +/// +/// Cursor used by subscription loop to identify point in time after +/// which updates will be delivered. +#[derive(Debug, Copy, Clone, PartialEq)] +#[allow(dead_code)] +pub struct SubscribeCursor { + /// PubNub high-precision timestamp. + /// + /// Aside of specifying exact time of receiving data / event this token used + /// to catchup / follow on real-time updates. + pub timetoken: u64, + + /// Data center region for which `timetoken` has been generated. + pub region: u32, +} + +/// Subscription statuses. +#[derive(Debug, Copy, Clone)] +pub enum SubscribeStatus { + /// Successfully connected and receiving real-time updates. + Connected, + + /// Successfully reconnected after real-time updates received has been + /// stopped. + Reconnected, + + /// Real-time updates receive stopped. + Disconnected, +} + +impl core::fmt::Display for SubscribeStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + match self { + Self::Connected => write!(f, "Connected"), + Self::Reconnected => write!(f, "Reconnected"), + Self::Disconnected => write!(f, "Disconnected"), + } + } +}