Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {

let subscription = client
.subscribe()
.channels(
[
"my_channel".into(),
"other_channel".into(),
"channel-test-history".into(),
]
.to_vec(),
)
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.heartbeat(10)
.filter_expression("some_filter")
.execute()?;
Expand Down
3 changes: 2 additions & 1 deletion src/core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ pub enum PubNubError {
details: String,
},

///this error is returned when REST API request can't be handled by service.
///this error is returned when REST API request can't be handled by
/// service.
#[snafu(display("REST API error: {message}"))]
API {
/// Operation status (HTTP) code.
Expand Down
3 changes: 2 additions & 1 deletion src/dx/presence/builders/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ pub struct HeartbeatRequest<T, D> {
/// ```
#[builder(
field(vis = "pub(in crate::dx::presence)"),
setter(custom, strip_option)
setter(custom, strip_option),
default = "None"
)]
pub(in crate::dx::presence) state: Option<Vec<u8>>,

Expand Down
4 changes: 2 additions & 2 deletions src/dx/presence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl<T, D> PubNubClientInstance<T, D> {
/// `user_id` on channels.
///
/// Instance of [`LeaveRequestBuilder`] returned.
pub(in crate::dx::presence) fn leave(&self) -> LeaveRequestBuilder<T, D> {
pub(crate) fn leave(&self) -> LeaveRequestBuilder<T, D> {
LeaveRequestBuilder {
pubnub_client: Some(self.clone()),
user_id: Some(self.config.user_id.clone().to_string()),
Expand Down Expand Up @@ -358,7 +358,7 @@ where
/// Prepare presence event engine instance which will be used for `user_id`
/// presence announcement and management.
#[cfg(feature = "std")]
pub(crate) fn presence_event_engine(&self) -> Arc<PresenceEventEngine> {
fn presence_event_engine(&self) -> Arc<PresenceEventEngine> {
let channel_bound = 3;
let (cancel_tx, cancel_rx) = async_channel::bounded::<String>(channel_bound);
let delayed_heartbeat_cancel_rx = cancel_rx.clone();
Expand Down
42 changes: 27 additions & 15 deletions src/dx/presence/presence_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! presence / heartbeat module components.

use crate::{
dx::presence::event_engine::{PresenceEvent, PresenceEventEngine},
dx::presence::event_engine::PresenceEventEngine,
lib::{
alloc::sync::Arc,
core::{
Expand Down Expand Up @@ -88,29 +88,41 @@ pub(crate) struct PresenceManagerRef {

impl PresenceManagerRef {
/// Announce `join` for `user_id` on provided channels and groups.
#[allow(dead_code)]
pub(crate) fn announce_join(
&self,
channels: Option<Vec<String>>,
channel_groups: Option<Vec<String>>,
_channels: Option<Vec<String>>,
_channel_groups: Option<Vec<String>>,
) {
self.event_engine.process(&PresenceEvent::Joined {
channels,
channel_groups,
})
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Joined {
// channels,
// channel_groups,
// })
}

/// Announce `leave` for `user_id` on provided channels and groups.
#[allow(dead_code)]
pub(crate) fn announce_left(
&self,
channels: Option<Vec<String>>,
channel_groups: Option<Vec<String>>,
_channels: Option<Vec<String>>,
_channel_groups: Option<Vec<String>>,
) {
self.event_engine.process(&PresenceEvent::Left {
channels,
channel_groups,
})
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Left {
// channels,
// channel_groups,
// })
}

/// Announce `leave` while client disconnected.
pub(crate) fn disconnect(&self) {
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Disconnect);
}

/// Announce `join` upon client connection.
pub(crate) fn reconnect(&self) {
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Reconnect);
}
}

Expand Down
29 changes: 18 additions & 11 deletions src/dx/pubnub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ pub type PubNubGenericClient<T, D> = PubNubClientInstance<PubNubMiddleware<T>, D
/// You must provide a valid [`Keyset`] with pub/sub keys and a string User ID
/// to identify the client.
///
/// To see available methods, please refer to the [`PubNubClientInstance`] documentation.
/// To see available methods, please refer to the [`PubNubClientInstance`]
/// documentation.
///
/// # Examples
/// ```
Expand Down Expand Up @@ -216,7 +217,8 @@ pub type PubNubClient = PubNubGenericClient<TransportReqwest, DeserializerSerde>
/// PubNub client raw instance.
///
/// This struct contains the actual client state.
/// It shouldn't be used directly. Use [`PubNubGenericClient`] or [`PubNubClient`] instead.
/// It shouldn't be used directly. Use [`PubNubGenericClient`] or
/// [`PubNubClient`] instead.
#[derive(Debug)]
pub struct PubNubClientInstance<T, D> {
pub(crate) inner: Arc<PubNubClientRef<T, D>>,
Expand Down Expand Up @@ -592,7 +594,8 @@ pub struct PubNubClientBuilder;
impl PubNubClientBuilder {
/// Set the transport layer for the client.
///
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled `features` following can be set:
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled
/// `features` following can be set:
/// * runtime environment
/// * API ket set to access [`PubNub API`].
///
Expand Down Expand Up @@ -641,7 +644,8 @@ impl PubNubClientBuilder {

/// Set the transport layer for the client.
///
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set:
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled
/// `features` following can be set:
/// * [`PubNub API`] response deserializer
/// * API ket set to access [`PubNub API`].
///
Expand Down Expand Up @@ -690,7 +694,8 @@ impl PubNubClientBuilder {

/// Set the blocking transport layer for the client.
///
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled `features` following can be set:
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled
/// `features` following can be set:
/// * runtime environment
/// * API ket set to access [`PubNub API`].
///
Expand Down Expand Up @@ -742,7 +747,8 @@ impl PubNubClientBuilder {

/// Set the blocking transport layer for the client.
///
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set:
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled
/// `features` following can be set:
/// * [`PubNub API`] response deserializer
/// * API ket set to access [`PubNub API`].
///
Expand Down Expand Up @@ -795,8 +801,8 @@ impl PubNubClientBuilder {

/// PubNub builder for [`PubNubClient`] to set API keys.
///
/// The builder provides methods to set the [`PubNub API`] keys set and returns the next
/// step of the builder with the remaining parameters.
/// The builder provides methods to set the [`PubNub API`] keys set and returns
/// the next step of the builder with the remaining parameters.
///
/// See [`PubNubClient`] for more information.
///
Expand Down Expand Up @@ -862,7 +868,8 @@ impl<T, D> PubNubClientKeySetBuilder<T, D> {
/// Runtime will be used for detached tasks spawning and delayed task execution.
///
/// Depending from enabled `features` methods may return:
/// * [`PubNubClientDeserializerBuilder`] to set custom [`PubNub API`] deserializer
/// * [`PubNubClientDeserializerBuilder`] to set custom [`PubNub API`]
/// deserializer
/// * [`PubNubClientKeySetBuilder`] to set API keys set to access [`PubNub API`]
/// * [`PubNubClientUserIdBuilder`] to set user id for the client.
///
Expand All @@ -877,7 +884,8 @@ pub struct PubNubClientRuntimeBuilder<T> {
impl<T> PubNubClientRuntimeBuilder<T> {
/// Set runtime environment.
///
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set:
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled
/// `features` following can be set:
/// * [`PubNub API`] response deserializer
/// * API ket set to access [`PubNub API`].
///
Expand Down Expand Up @@ -1244,7 +1252,6 @@ where
/// secret_key: Some("sec-c-abc123"),
/// };
/// ```
///
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Keyset<S>
where
Expand Down
2 changes: 1 addition & 1 deletion src/dx/subscribe/builders/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
///
/// It should not be created directly, but via [`PubNubClient::subscribe`]
/// and wrapped in [`Subscription`] struct.
#[derive(Debug, Builder)]
#[derive(Builder)]
#[builder(
pattern = "owned",
name = "RawSubscriptionBuilder",
Expand Down
2 changes: 1 addition & 1 deletion src/dx/subscribe/builders/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{core::event_engine::cancel::CancellationTask, lib::alloc::sync::Arc}
/// from the [`PubNub`] network.
///
/// [`PubNub`]:https://www.pubnub.com/
#[derive(Debug, Builder)]
#[derive(Builder)]
#[builder(
pattern = "owned",
build_fn(vis = "pub(in crate::dx::subscribe)", validate = "Self::validate"),
Expand Down
34 changes: 34 additions & 0 deletions src/dx/subscribe/builders/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ pub struct SubscriptionStreamRef<D> {
///
/// Handler used each time when new data available for a stream listener.
waker: RwLock<Option<Waker>>,

/// Whether stream still valid or not.
is_valid: bool,
}

/// Subscription that is responsible for getting messages from PubNub.
Expand Down Expand Up @@ -505,6 +508,27 @@ impl Subscription {
let subscription = &update.subscription();
self.input.contains_channel(subscription) || self.input.contains_channel_group(subscription)
}

/// Invalidate all streams.
pub(crate) fn invalidate(&mut self) {
let mut stream_slot = self.stream.write();
if let Some(mut stream) = stream_slot.clone() {
stream.invalidate()
}
*stream_slot = None;

let mut stream_slot = self.status_stream.write();
if let Some(mut stream) = stream_slot.clone() {
stream.invalidate()
}
*stream_slot = None;

let mut stream_slot = self.updates_stream.write();
if let Some(mut stream) = stream_slot.clone() {
stream.invalidate()
}
*stream_slot = None;
}
}

impl<D> SubscriptionStream<D> {
Expand All @@ -516,10 +540,16 @@ impl<D> SubscriptionStream<D> {
inner: Arc::new(SubscriptionStreamRef {
updates: RwLock::new(stream_updates),
waker: RwLock::new(None),
is_valid: true,
}),
}
}

pub(crate) fn invalidate(&mut self) {
self.is_valid = false;
self.wake_task();
}

fn wake_task(&self) {
if let Some(waker) = self.waker.write().take() {
waker.wake();
Expand All @@ -531,6 +561,10 @@ impl<D> Stream for SubscriptionStream<D> {
type Item = D;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.is_valid {
return Poll::Ready(None);
}

let mut waker_slot = self.waker.write();
*waker_slot = Some(cx.waker().clone());

Expand Down
4 changes: 4 additions & 0 deletions src/dx/subscribe/event_engine/effects/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pub(super) async fn execute(
input.channel_groups().unwrap_or(Vec::new())
);

if input.is_empty {
return vec![SubscribeEvent::UnsubscribeAll];
}

executor(SubscriptionParams {
channels: &input.channels(),
channel_groups: &input.channel_groups(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub(super) async fn execute(
input.channel_groups().unwrap_or(Vec::new())
);

if input.is_empty {
return vec![SubscribeEvent::UnsubscribeAll];
}

executor(SubscriptionParams {
channels: &input.channels(),
channel_groups: &input.channel_groups(),
Expand Down
10 changes: 9 additions & 1 deletion src/dx/subscribe/event_engine/effects/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures::TryFutureExt;
use log::info;

use crate::{
core::PubNubError,
dx::subscribe::{
event_engine::{
effects::SubscribeEffectExecutor, types::SubscriptionParams, SubscribeEvent,
Expand All @@ -25,6 +26,10 @@ pub(crate) async fn execute(
input.channel_groups().unwrap_or(Vec::new())
);

if input.is_empty {
return vec![SubscribeEvent::UnsubscribeAll];
}

executor(SubscriptionParams {
channels: &input.channels(),
channel_groups: &input.channel_groups(),
Expand All @@ -36,7 +41,10 @@ pub(crate) async fn execute(
.map_ok_or_else(
|error| {
log::error!("Receive error: {:?}", error);
vec![SubscribeEvent::ReceiveFailure { reason: error }]

(!matches!(error, PubNubError::EffectCanceled))
.then(|| vec![SubscribeEvent::ReceiveFailure { reason: error }])
.unwrap_or(vec![])
},
|subscribe_result| {
vec![SubscribeEvent::ReceiveSuccess {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub(crate) async fn execute(
input.channel_groups().unwrap_or(Vec::new())
);

if input.is_empty {
return vec![SubscribeEvent::UnsubscribeAll];
}

executor(SubscriptionParams {
channels: &input.channels(),
channel_groups: &input.channel_groups(),
Expand Down
6 changes: 6 additions & 0 deletions src/dx/subscribe/event_engine/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ impl Add for SubscribeInput {
}
}

impl Default for SubscribeInput {
fn default() -> Self {
SubscribeInput::new(&None, &None)
}
}

impl AddAssign for SubscribeInput {
fn add_assign(&mut self, rhs: Self) {
let channel_groups = self.join_sets(&self.channel_groups, &rhs.channel_groups);
Expand Down
Loading