Skip to content

Commit

Permalink
Fix cursor reset issue and cancelled effects spawning (#184)
Browse files Browse the repository at this point in the history
feat(subscription): add more types to create `SubscriptionCursor` from

Make it possible to create `SubscriptionCursor` from the string slice.

feat(subscription): add methods to manipulate subscriptions in set

Add `add_subscriptions(..)` and `sub_subscriptions(..)` to `SubscriptionSet` to make it possible in
addition to sets manipulation, use a list of subscriptions.

feat(subscribe): add subscription list changed status

fix(subscribe): fix `cursor` reset issue

Fix issue because of which `cursor` is not reset on `Subscription` and `SubscriptionSet`
on unsubscribe.

fix(subscription): fix issue with cancelled effects

Fix issue because of which cancelled effects still asynchronously spawned for processing.

refactor(examples): refactor `subscribe` example

Separate `subscribe` example into two to show separately `subscribe` feature and `presence state`
maintenance with subscribe.

refactor(subscribe): add subscription token validation

Added a method to validate the provided subscription token to conform to PubNub time token
requirements with precision.

refactor(subscription): split `subscribe` method

Split the `subscribe` method into two: `subscribe` and `subscribe_with_timetoken`.

docs(inline): change `client` to `pubnub` in inline docs

---------

Co-authored-by: Mateusz Wiktor <mateusz.wiktor@pubnub.com>
  • Loading branch information
parfeon and techwritermat committed Feb 8, 2024
1 parent c3b921f commit dab433d
Show file tree
Hide file tree
Showing 28 changed files with 1,162 additions and 332 deletions.
21 changes: 20 additions & 1 deletion .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
name: rust
version: 0.5.0
version: 0.6.0
schema: 1
scm: github.com/pubnub/rust
files: []
changelog:
- date: 2024-02-07
version: 0.6.0
changes:
- type: feature
text: "Make it possible to create `SubscriptionCursor` from the string slice."
- type: feature
text: "Add `add_subscriptions(..)` and `sub_subscriptions(..)` to `SubscriptionSet` to make it possible in addition to sets manipulation use list of subscriptions."
- type: bug
text: "Fix issue because of which `cursor` is not reset on `Subscription` and `SubscriptionSet` on unsubscribe."
- type: bug
text: "Fix issue because of which cancelled effects still asynchronously spawned for processing."
- type: improvement
text: "Change `client` to `pubnub` in inline docs."
- type: improvement
text: "Add subscription token validation."
- type: improvement
text: "Added a method to validate the provided subscription token to conform to PubNub time token requirements with precision."
- type: improvement
text: "Separate `subscribe` example into two to show separately `subscribe` feature and `presence state` maintenance with subscribe."
- date: 2024-01-25
version: 0.5.0
changes:
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pubnub"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
license-file = "LICENSE"
authors = ["PubNub <support@pubnub.com>"]
Expand Down Expand Up @@ -165,6 +165,10 @@ required-features = ["default"]

[[example]]
name = "subscribe"
required-features = ["default", "subscribe"]

[[example]]
name = "subscribe_with_presence_state"
required-features = ["default", "subscribe", "presence"]

[[example]]
Expand Down
56 changes: 44 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,32 @@ Add `pubnub` to your Rust project in the `Cargo.toml` file:
```toml
# default features
[dependencies]
pubnub = "0.5.0"
pubnub = "0.6.0"

# all features
[dependencies]
pubnub = { version = "0.5.0", features = ["full"] }
pubnub = { version = "0.6.0", features = ["full"] }
```

### Example

Try the following sample code to get up and running quickly!

```rust
use pubnub::{Keyset, PubNubClientBuilder};
use pubnub::dx::subscribe::{SubscribeStreamEvent, Update};
use pubnub::subscribe::Subscriber;
use futures::StreamExt;
use tokio::time::sleep;
use std::time::Duration;
use serde_json;

use pubnub::{
dx::subscribe::Update,
subscribe::EventSubscriber,
Keyset, PubNubClientBuilder,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use pubnub::subscribe::{EventEmitter, SubscriptionParams};
let publish_key = "my_publish_key";
let publish_key = "my_publish_key";
let subscribe_key = "my_subscribe_key";
let client = PubNubClientBuilder::with_reqwest_transport()
.with_keyset(Keyset {
Expand All @@ -68,6 +71,7 @@ let publish_key = "my_publish_key";
})
.with_user_id("user_id")
.build()?;

println!("PubNub instance created");

let subscription = client.subscription(SubscriptionParams {
Expand All @@ -76,7 +80,13 @@ let publish_key = "my_publish_key";
options: None
});

println!("Subscribed to channel");
let channel_entity = client.channel("my_channel_2");
let channel_entity_subscription = channel_entity.subscription(None);

subscription.subscribe();
channel_entity_subscription.subscribe();

println!("Subscribed to channels");

// Launch a new task to print out each received message
tokio::spawn(client.status_stream().for_each(|status| async move {
Expand Down Expand Up @@ -107,7 +117,21 @@ let publish_key = "my_publish_key";
}
}));

sleep(Duration::from_secs(1)).await;
// Explicitly listen only for real-time `message` updates.
tokio::spawn(
channel_entity_subscription
.messages_stream()
.for_each(|message| async move {
if let Ok(utf8_message) = String::from_utf8(message.data.clone()) {
if let Ok(cleaned) = serde_json::from_str::<String>(&utf8_message) {
println!("message: {}", cleaned);
}
}
}),
);

sleep(Duration::from_secs(2)).await;

// Send a message to the channel
client
.publish_message("hello world!")
Expand All @@ -116,7 +140,15 @@ let publish_key = "my_publish_key";
.execute()
.await?;

sleep(Duration::from_secs(10)).await;
// Send a message to another channel
client
.publish_message("hello world on the other channel!")
.channel("my_channel_2")
.r#type("text-message")
.execute()
.await?;

sleep(Duration::from_secs(15)).await;

Ok(())
}
Expand All @@ -132,11 +164,11 @@ disable them in the `Cargo.toml` file, like so:
```toml
# only blocking and access + default features
[dependencies]
pubnub = { version = "0.5.0", features = ["blocking", "access"] }
pubnub = { version = "0.6.0", features = ["blocking", "access"] }

# only parse_token + default features
[dependencies]
pubnub = { version = "0.5.0", features = ["parse_token"] }
pubnub = { version = "0.6.0", features = ["parse_token"] }
```

### Available features
Expand Down Expand Up @@ -175,7 +207,7 @@ you need, for example:

```toml
[dependencies]
pubnub = { version = "0.5.0", default-features = false, features = ["serde", "publish",
pubnub = { version = "0.6.0", default-features = false, features = ["serde", "publish",
"blocking"] }
```

Expand Down
69 changes: 27 additions & 42 deletions examples/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use futures::StreamExt;
use serde::Deserialize;
use std::env;
Expand All @@ -26,7 +24,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
let publish_key = env::var("SDK_PUB_KEY")?;
let subscribe_key = env::var("SDK_SUB_KEY")?;

let client = PubNubClientBuilder::with_reqwest_transport()
let pubnub = PubNubClientBuilder::with_reqwest_transport()
.with_keyset(Keyset {
subscribe_key,
publish_key: Some(publish_key),
Expand All @@ -40,36 +38,25 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {

println!("running!");

client
.set_presence_state(HashMap::<String, String>::from([
(
"is_doing".to_string(),
"Nothing... Just hanging around...".to_string(),
),
("flag".to_string(), "false".to_string()),
]))
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.user_id("user_id")
.execute()
.await?;

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

let subscription = client.subscription(SubscriptionParams {
let subscription = pubnub.subscription(SubscriptionParams {
channels: Some(&["my_channel", "other_channel"]),
channel_groups: None,
options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
});
subscription.subscribe(None);
subscription.subscribe();
let subscription_clone = subscription.clone_empty();

// Attach connection status to the PubNub client instance.
tokio::spawn(
client
pubnub
.status_stream()
.for_each(|status| async move { println!("\nstatus: {:?}", status) }),
);

// Example of the "global" listener for multiplexed subscription object from
// PubNub client.
tokio::spawn(subscription.stream().for_each(|event| async move {
match event {
Update::Message(message) | Update::Signal(message) => {
Expand All @@ -96,48 +83,44 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
}
}));

tokio::spawn(subscription_clone.stream().for_each(|event| async move {
match event {
Update::Message(message) | Update::Signal(message) => {
// Explicitly listen only for real-time `message` updates.
tokio::spawn(
subscription_clone
.messages_stream()
.for_each(|message| async move {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("(b) defined message: {:?}", message),
Err(_) => {
println!("(b) other message: {:?}", String::from_utf8(message.data))
}
}
}
Update::Presence(presence) => {
println!("(b) presence: {:?}", presence)
}
Update::AppContext(object) => {
println!("(b) object: {:?}", object)
}
Update::MessageAction(action) => {
println!("(b) message action: {:?}", action)
}
Update::File(file) => {
println!("(b) file: {:?}", file)
}
}
}));
}),
);

// Explicitly listen only for real-time `file` updates.
tokio::spawn(
subscription_clone
.files_stream()
.for_each(|file| async move { println!("(b) file: {:?}", file) }),
);

// Sleep for a minute. Now you can send messages to the channels
// "my_channel" and "other_channel" and see them printed in the console.
// You can use the publish example or [PubNub console](https://www.pubnub.com/docs/console/)
// You can use the publishing example or [PubNub console](https://www.pubnub.com/docs/console/)
// to send messages.
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;

// You can also cancel the subscription at any time.
// subscription.unsubscribe();

println!("\nDisconnect from the real-time data stream");
client.disconnect();
pubnub.disconnect();

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

println!("\nReconnect to the real-time data stream");
client.reconnect(None);
pubnub.reconnect(None);

// Let event engine process unsubscribe request
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
Expand All @@ -147,9 +130,11 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
// drop(subscription_clone);

println!(
"\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." );
"\nUnsubscribe from all data streams. To restore call `subscription.subscribe()` or \
`subscription.subscribe_with_timetoken(Some(<timetoken>)) call."
);
// Clean up before complete work with PubNub client instance.
client.unsubscribe_all();
pubnub.unsubscribe_all();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

Ok(())
Expand Down
Loading

0 comments on commit dab433d

Please sign in to comment.