Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions src/dx/subscribe/event_engine/effect_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ use crate::{
lib::alloc::{string::String, vec::Vec},
};

use super::SubscribeEvent;

pub(crate) type HandshakeFunction = fn(
channels: Option<Vec<String>>,
channel_groups: Option<Vec<String>>,
channels: &Option<Vec<String>>,
channel_groups: &Option<Vec<String>>,
attempt: u8,
reason: Option<PubNubError>,
);
) -> Vec<SubscribeEvent>;

pub(crate) type ReceiveFunction = fn(
channels: Option<Vec<String>>,
channel_groups: Option<Vec<String>>,
cursor: SubscribeCursor,
channels: &Option<Vec<String>>,
channel_groups: &Option<Vec<String>>,
cursor: &SubscribeCursor,
attempt: u8,
reason: Option<PubNubError>,
);
) -> Vec<SubscribeEvent>;

/// Subscription effect handler.
///
Expand Down Expand Up @@ -52,6 +54,7 @@ impl EffectHandler<SubscribeEffectInvocation, SubscribeEffect> for SubscribeEffe
} => Some(SubscribeEffect::Handshake {
channels: channels.clone(),
channel_groups: channel_groups.clone(),
executor: self.handshake,
}),
SubscribeEffectInvocation::HandshakeReconnect {
channels,
Expand Down
47 changes: 47 additions & 0 deletions src/dx/subscribe/event_engine/effects/handshake.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::dx::subscribe::event_engine::{effect_handler::HandshakeFunction, SubscribeEvent};
use crate::lib::alloc::{string::String, vec::Vec};

pub(super) fn execute(
channels: &Option<Vec<String>>,
channel_groups: &Option<Vec<String>>,
executor: HandshakeFunction,
) -> Option<Vec<SubscribeEvent>> {
Some(executor(channels, channel_groups, 0, None))
}

#[cfg(test)]
mod should {
use super::*;
use crate::{core::PubNubError, dx::subscribe::SubscribeCursor};

#[test]
fn initialize_handshake_for_first_attempt() {
fn mock_handshake_function(
channels: &Option<Vec<String>>,
channel_groups: &Option<Vec<String>>,
attempt: u8,
reason: Option<PubNubError>,
) -> Vec<SubscribeEvent> {
assert_eq!(channels, &Some(vec!["ch1".to_string()]));
assert_eq!(channel_groups, &Some(vec!["cg1".to_string()]));
assert_eq!(attempt, 0);
assert_eq!(reason, None);

vec![SubscribeEvent::HandshakeSuccess {
cursor: SubscribeCursor {
timetoken: 0,
region: 0,
},
}]
}

let result = execute(
&Some(vec!["ch1".to_string()]),
&Some(vec!["cg1".to_string()]),
mock_handshake_function,
);

assert!(result.is_some());
assert!(!result.unwrap().is_empty())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use crate::{
lib::alloc::{string::String, vec::Vec},
};

use super::effect_handler::HandshakeFunction;

mod handshake;

/// Subscription state machine effects.
#[allow(dead_code)]
pub(crate) enum SubscribeEffect {
Expand All @@ -21,6 +25,11 @@ pub(crate) enum SubscribeEffect {
/// List of channel groups which will be source of real-time updates
/// after initial subscription completion.
channel_groups: Option<Vec<String>>,

/// Executor function.
///
/// Function which will be used to execute initial subscription.
executor: HandshakeFunction,
},

/// Retry initial subscribe effect invocation.
Expand Down Expand Up @@ -121,7 +130,19 @@ impl Effect for SubscribeEffect {
F: FnMut(Option<Vec<SubscribeEvent>>),
{
// TODO: Run actual effect implementation. Maybe Effect.run function need change something in arguments.
f(None);
let events = match self {
SubscribeEffect::Handshake {
channels,
channel_groups,
executor,
} => handshake::execute(channels, channel_groups, *executor),
_ => {
/* TODO: Implement other effects */
None
}
};

f(events);
}

fn cancel(&self) {
Expand Down
6 changes: 3 additions & 3 deletions src/dx/subscribe/event_engine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Subscribe Event Engine module

#[doc(inline)]
pub(crate) use effect::SubscribeEffect;
pub(crate) mod effect;
pub(crate) use effects::SubscribeEffect;
pub(crate) mod effects;

#[doc(inline)]
#[allow(unused_imports)]
pub(crate) use effect_handler::SubscribeEffectHandler;
pub(crate) use effect_handler::{HandshakeFunction, ReceiveFunction, SubscribeEffectHandler};
pub(crate) mod effect_handler;

#[doc(inline)]
Expand Down
16 changes: 9 additions & 7 deletions src/dx/subscribe/event_engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,22 +711,24 @@ mod should {
use test_case::test_case;

fn handshake_function(
_channels: Option<Vec<String>>,
_channel_groups: Option<Vec<String>>,
_channels: &Option<Vec<String>>,
_channel_groups: &Option<Vec<String>>,
_attempt: u8,
_reason: Option<PubNubError>,
) {
) -> Vec<SubscribeEvent> {
// Do nothing.
vec![]
}

fn receive_function(
_channels: Option<Vec<String>>,
_channel_groups: Option<Vec<String>>,
_cursor: SubscribeCursor,
_channels: &Option<Vec<String>>,
_channel_groups: &Option<Vec<String>>,
_cursor: &SubscribeCursor,
_attempt: u8,
_reason: Option<PubNubError>,
) {
) -> Vec<SubscribeEvent> {
// Do nothing.
vec![]
}

fn event_engine(
Expand Down
4 changes: 4 additions & 0 deletions src/dx/subscribe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ pub(crate) mod event_engine;

#[doc(inline)]
pub use types::{SubscribeCursor, SubscribeStatus};

pub mod types;

#[allow(dead_code)]
pub(crate) mod subscription;
43 changes: 43 additions & 0 deletions src/dx/subscribe/subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::{
core::{blocking::Transport, event_engine::EventEngine},
dx::subscribe::event_engine::{
effect_handler::{HandshakeFunction, ReceiveFunction, SubscribeEffectHandler},
SubscribeState,
},
lib::alloc::vec,
PubNubGenericClient,
};

use super::event_engine::{SubscribeEffect, SubscribeEffectInvocation};

type SubscribeEngine =
EventEngine<SubscribeState, SubscribeEffectHandler, SubscribeEffect, SubscribeEffectInvocation>;

/// Subscription that is responsible for getting messages from PubNub.
///
/// Subscription provides a way to get messages from PubNub. It is responsible
/// for handshake and receiving messages.
///
/// TODO: more description and examples
pub struct Subscription {
engine: SubscribeEngine,
}

impl Subscription {
pub(crate) fn subscribe<T>(_client: PubNubGenericClient<T>) -> Self
where
T: Transport,
{
// TODO: implementation is a part of the different task
let handshake: HandshakeFunction = |_, _, _, _| vec![];

let receive: ReceiveFunction = |&_, &_, &_, _, _| vec![];

Self {
engine: SubscribeEngine::new(
SubscribeEffectHandler::new(handshake, receive),
SubscribeState::Unsubscribed,
),
}
}
}