Skip to content

Commit

Permalink
Add received message stream (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
fredr committed Oct 11, 2022
1 parent 08f3975 commit 423a559
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions pubsub/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use google_cloud_gax::cancel::CancellationToken;
use google_cloud_gax::grpc::codegen::futures_core::Stream;
use google_cloud_gax::grpc::{Code, Status};
use google_cloud_gax::retry::RetrySetting;
use google_cloud_googleapis::pubsub::v1::seek_request::Target;
Expand Down Expand Up @@ -103,6 +106,25 @@ impl From<SeekTo> for Target {
}
}

pub struct MessageStream {
queue: async_channel::Receiver<ReceivedMessage>,
cancel: CancellationToken,
}

impl Drop for MessageStream {
fn drop(&mut self) {
self.cancel.cancel();
}
}

impl Stream for MessageStream {
type Item = ReceivedMessage;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().queue).poll_next(cx)
}
}

/// Subscription is a reference to a PubSub subscription.
#[derive(Clone, Debug)]
pub struct Subscription {
Expand Down Expand Up @@ -312,6 +334,17 @@ impl Subscription {
.collect())
}

/// subscribe creates a `Stream` of `ReceivedMessage`
/// Terminates the underlying `Subscriber` when dropped.
pub async fn subscribe(&self, opt: Option<SubscriberConfig>) -> Result<MessageStream, Status> {
let (tx, rx) = async_channel::unbounded::<ReceivedMessage>();

let cancel = CancellationToken::new();
Subscriber::start(cancel.clone(), self.fqsn.clone(), self.subc.clone(), tx, opt);

Ok(MessageStream { queue: rx, cancel })
}

/// receive calls f with the outstanding messages from the subscription.
/// It blocks until cancellation token is cancelled, or the service returns a non-retryable error.
/// The standard way to terminate a receive is to use CancellationToken.
Expand Down

0 comments on commit 423a559

Please sign in to comment.