diff --git a/src/dx/subscribe/event_engine/effect_handler.rs b/src/dx/subscribe/event_engine/effect_handler.rs index 065cadad..ba5e2a7b 100644 --- a/src/dx/subscribe/event_engine/effect_handler.rs +++ b/src/dx/subscribe/event_engine/effect_handler.rs @@ -7,20 +7,22 @@ use crate::{ lib::alloc::{string::String, vec::Vec}, }; +use super::SubscribeEvent; + pub(crate) type HandshakeFunction = fn( - channels: Option>, - channel_groups: Option>, + channels: &Option>, + channel_groups: &Option>, attempt: u8, reason: Option, -); +) -> Vec; pub(crate) type ReceiveFunction = fn( - channels: Option>, - channel_groups: Option>, - cursor: SubscribeCursor, + channels: &Option>, + channel_groups: &Option>, + cursor: &SubscribeCursor, attempt: u8, reason: Option, -); +) -> Vec; /// Subscription effect handler. /// @@ -52,6 +54,7 @@ impl EffectHandler for SubscribeEffe } => Some(SubscribeEffect::Handshake { channels: channels.clone(), channel_groups: channel_groups.clone(), + executor: self.handshake, }), SubscribeEffectInvocation::HandshakeReconnect { channels, diff --git a/src/dx/subscribe/event_engine/effects/handshake.rs b/src/dx/subscribe/event_engine/effects/handshake.rs new file mode 100644 index 00000000..c62fcd64 --- /dev/null +++ b/src/dx/subscribe/event_engine/effects/handshake.rs @@ -0,0 +1,47 @@ +use crate::dx::subscribe::event_engine::{effect_handler::HandshakeFunction, SubscribeEvent}; +use crate::lib::alloc::{string::String, vec::Vec}; + +pub(super) fn execute( + channels: &Option>, + channel_groups: &Option>, + executor: HandshakeFunction, +) -> Option> { + Some(executor(channels, channel_groups, 0, None)) +} + +#[cfg(test)] +mod should { + use super::*; + use crate::{core::PubNubError, dx::subscribe::SubscribeCursor}; + + #[test] + fn initialize_handshake_for_first_attempt() { + fn mock_handshake_function( + channels: &Option>, + channel_groups: &Option>, + attempt: u8, + reason: Option, + ) -> Vec { + assert_eq!(channels, &Some(vec!["ch1".to_string()])); + assert_eq!(channel_groups, &Some(vec!["cg1".to_string()])); + assert_eq!(attempt, 0); + assert_eq!(reason, None); + + vec![SubscribeEvent::HandshakeSuccess { + cursor: SubscribeCursor { + timetoken: 0, + region: 0, + }, + }] + } + + let result = execute( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + mock_handshake_function, + ); + + assert!(result.is_some()); + assert!(!result.unwrap().is_empty()) + } +} diff --git a/src/dx/subscribe/event_engine/effect.rs b/src/dx/subscribe/event_engine/effects/mod.rs similarity index 88% rename from src/dx/subscribe/event_engine/effect.rs rename to src/dx/subscribe/event_engine/effects/mod.rs index 6ede8cf0..b36a55cd 100644 --- a/src/dx/subscribe/event_engine/effect.rs +++ b/src/dx/subscribe/event_engine/effects/mod.rs @@ -5,6 +5,10 @@ use crate::{ lib::alloc::{string::String, vec::Vec}, }; +use super::effect_handler::HandshakeFunction; + +mod handshake; + /// Subscription state machine effects. #[allow(dead_code)] pub(crate) enum SubscribeEffect { @@ -21,6 +25,11 @@ pub(crate) enum SubscribeEffect { /// List of channel groups which will be source of real-time updates /// after initial subscription completion. channel_groups: Option>, + + /// Executor function. + /// + /// Function which will be used to execute initial subscription. + executor: HandshakeFunction, }, /// Retry initial subscribe effect invocation. @@ -121,7 +130,19 @@ impl Effect for SubscribeEffect { F: FnMut(Option>), { // TODO: Run actual effect implementation. Maybe Effect.run function need change something in arguments. - f(None); + let events = match self { + SubscribeEffect::Handshake { + channels, + channel_groups, + executor, + } => handshake::execute(channels, channel_groups, *executor), + _ => { + /* TODO: Implement other effects */ + None + } + }; + + f(events); } fn cancel(&self) { diff --git a/src/dx/subscribe/event_engine/mod.rs b/src/dx/subscribe/event_engine/mod.rs index 70510aef..87c70d6a 100644 --- a/src/dx/subscribe/event_engine/mod.rs +++ b/src/dx/subscribe/event_engine/mod.rs @@ -1,12 +1,12 @@ //! Subscribe Event Engine module #[doc(inline)] -pub(crate) use effect::SubscribeEffect; -pub(crate) mod effect; +pub(crate) use effects::SubscribeEffect; +pub(crate) mod effects; #[doc(inline)] #[allow(unused_imports)] -pub(crate) use effect_handler::SubscribeEffectHandler; +pub(crate) use effect_handler::{HandshakeFunction, ReceiveFunction, SubscribeEffectHandler}; pub(crate) mod effect_handler; #[doc(inline)] diff --git a/src/dx/subscribe/event_engine/state.rs b/src/dx/subscribe/event_engine/state.rs index 13903a47..5860a18f 100644 --- a/src/dx/subscribe/event_engine/state.rs +++ b/src/dx/subscribe/event_engine/state.rs @@ -711,22 +711,24 @@ mod should { use test_case::test_case; fn handshake_function( - _channels: Option>, - _channel_groups: Option>, + _channels: &Option>, + _channel_groups: &Option>, _attempt: u8, _reason: Option, - ) { + ) -> Vec { // Do nothing. + vec![] } fn receive_function( - _channels: Option>, - _channel_groups: Option>, - _cursor: SubscribeCursor, + _channels: &Option>, + _channel_groups: &Option>, + _cursor: &SubscribeCursor, _attempt: u8, _reason: Option, - ) { + ) -> Vec { // Do nothing. + vec![] } fn event_engine( diff --git a/src/dx/subscribe/mod.rs b/src/dx/subscribe/mod.rs index bce9f661..4f4fa4e0 100644 --- a/src/dx/subscribe/mod.rs +++ b/src/dx/subscribe/mod.rs @@ -6,4 +6,8 @@ pub(crate) mod event_engine; #[doc(inline)] pub use types::{SubscribeCursor, SubscribeStatus}; + pub mod types; + +#[allow(dead_code)] +pub(crate) mod subscription; diff --git a/src/dx/subscribe/subscription.rs b/src/dx/subscribe/subscription.rs new file mode 100644 index 00000000..2e03f8c8 --- /dev/null +++ b/src/dx/subscribe/subscription.rs @@ -0,0 +1,43 @@ +use crate::{ + core::{blocking::Transport, event_engine::EventEngine}, + dx::subscribe::event_engine::{ + effect_handler::{HandshakeFunction, ReceiveFunction, SubscribeEffectHandler}, + SubscribeState, + }, + lib::alloc::vec, + PubNubGenericClient, +}; + +use super::event_engine::{SubscribeEffect, SubscribeEffectInvocation}; + +type SubscribeEngine = + EventEngine; + +/// Subscription that is responsible for getting messages from PubNub. +/// +/// Subscription provides a way to get messages from PubNub. It is responsible +/// for handshake and receiving messages. +/// +/// TODO: more description and examples +pub struct Subscription { + engine: SubscribeEngine, +} + +impl Subscription { + pub(crate) fn subscribe(_client: PubNubGenericClient) -> Self + where + T: Transport, + { + // TODO: implementation is a part of the different task + let handshake: HandshakeFunction = |_, _, _, _| vec![]; + + let receive: ReceiveFunction = |&_, &_, &_, _, _| vec![]; + + Self { + engine: SubscribeEngine::new( + SubscribeEffectHandler::new(handshake, receive), + SubscribeState::Unsubscribed, + ), + } + } +}