Skip to content

Commit

Permalink
Add MessagesIter and messages_iter (#942)
Browse files Browse the repository at this point in the history
Closes #800
  • Loading branch information
nextonesfaster committed Aug 25, 2020
1 parent 13bf356 commit a7e2f74
Showing 1 changed file with 155 additions and 0 deletions.
155 changes: 155 additions & 0 deletions src/model/channel/channel_id.rs
Expand Up @@ -19,6 +19,7 @@ use crate::utils;
use crate::http::{Http, CacheHttp};
#[cfg(feature = "model")]
use serde_json::json;
use futures::stream::Stream;
#[cfg(feature = "collector")]
use crate::client::bridge::gateway::ShardMessenger;
#[cfg(feature = "collector")]
Expand Down Expand Up @@ -409,6 +410,45 @@ impl ChannelId {
})
}

/// Streams over all the messages in a channel.
///
/// This is accomplished and equivalent to repeated calls to [`messages`].
/// A buffer of at most 100 messages is used to reduce the number of calls.
/// necessary.
///
/// The stream returns the oldest message first, followed by newer messages.
///
/// # Examples
///
/// ```rust,no_run
/// # use serenity::model::id::ChannelId;
/// # use serenity::http::Http;
/// #
/// # async fn run() {
/// # let channel_id = ChannelId::default();
/// # let ctx = Http::default();
/// use serenity::model::channel::MessagesIter;
/// use serenity::futures::StreamExt;
///
/// let mut messages = channel_id.messages_iter(&ctx).boxed();
/// while let Some(message_result) = messages.next().await {
/// match message_result {
/// Ok(message) => println!(
/// "{} said \"{}\".",
/// message.author.name,
/// message.content,
/// ),
/// Err(error) => eprintln!("Uh oh! Error: {}", error),
/// }
/// }
/// # }
/// ```
///
/// [`messages`]: ../id/struct.ChannelId.html#method.messages
pub fn messages_iter<H: AsRef<Http>>(self, http: H) -> impl Stream<Item=Result<Message>> {
MessagesIter::<H>::stream(http, self)
}

/// Returns the name of whatever channel this id holds.
#[cfg(feature = "cache")]
pub async fn name(self, cache: impl AsRef<Cache>) -> Option<String> {
Expand Down Expand Up @@ -715,3 +755,118 @@ impl<'a> From<&'a GuildChannel> for ChannelId {
/// Gets the Id of a guild channel.
fn from(public_channel: &GuildChannel) -> ChannelId { public_channel.id }
}

/// A helper class returned by [`ChannelId::messages_iter`]
///
/// [`ChannelId::messages_iter`]: ../id/struct.ChannelId.html#method.messages_iter
#[derive(Clone, Debug)]
#[cfg(feature = "model")]
pub struct MessagesIter<H: AsRef<Http>> {
channel_id: ChannelId,
http: H,
buffer: Vec<Message>,
after: Option<MessageId>,
tried_fetch: bool,
}

#[cfg(feature = "model")]
impl<H: AsRef<Http>> MessagesIter<H> {
fn new(channel_id: ChannelId, http: H) -> MessagesIter<H> {
MessagesIter {
channel_id,
http,
buffer: Vec::new(),
after: None,
tried_fetch: false,
}
}

/// Fills the `self.buffer` cache of Messages.
///
/// This drops any messages that were currently in the buffer, so it should
/// only be called when `self.buffer` is empty. Additionally, this updates
/// `self.after` so that the next call does not return duplicate items.
/// If there are no more messages to be fetched, then this marks
/// `self.after` as None, indicating that no more calls ought to be made.
///
/// If this method is called with `self.after` as None, the first 100
/// (or lower) messages sent in the channel are added in the buffer.
///
/// The messages are sorted such that the newest message is the first
/// element of the buffer and the oldest message is the last.
async fn refresh(&mut self) -> Result<()> {
// Number of messages to fetch.
let grab_size = 100;

// If `self.after` is not set yet, we can use the channel ID to fetch
// messages sent after the first message. It also includes the first
// message sent.
self.buffer = self.channel_id
.messages(&self.http, |b|
b.after(self.after.map_or(self.channel_id.0, |m| m.0))
.limit(grab_size)
)
.await?;

// The messages received are in reverse order, that is, newest first
// and oldest last. Therefore, the "last" message is actually the first
// one in the buffer.
self.after = self.buffer.get(0)
.map(|message| message.id);

self.tried_fetch = true;

Ok(())
}

/// Streams over all the messages in a channel.
///
/// This is accomplished and equivalent to repeated calls to [`messages`].
/// A buffer of at most 100 messages is used to reduce the number of calls.
/// necessary.
///
/// The stream returns the oldest message first, followed by newer messages.
///
/// # Examples
///
/// ```rust,no_run
/// # use serenity::model::id::ChannelId;
/// # use serenity::http::Http;
/// #
/// # async fn run() {
/// # let channel_id = ChannelId::default();
/// # let ctx = Http::default();
/// use serenity::model::channel::MessagesIter;
/// use serenity::futures::StreamExt;
///
/// let mut messages = MessagesIter::<Http>::stream(&ctx, channel_id).boxed();
/// while let Some(message_result) = messages.next().await {
/// match message_result {
/// Ok(message) => println!(
/// "{} said \"{}\"",
/// message.author.name,
/// message.content,
/// ),
/// Err(error) => eprintln!("Uh oh! Error: {}", error),
/// }
/// }
/// # }
/// ```
///
/// [`messages`]: ../id/struct.ChannelId.html#method.messages
pub fn stream(http: impl AsRef<Http>, channel_id: ChannelId) -> impl Stream<Item=Result<Message>> {
let init_state = MessagesIter::new(channel_id, http);

futures::stream::unfold(init_state, |mut state| async {
if state.buffer.is_empty() && state.after.is_some() || !state.tried_fetch {
if let Err(error) = state.refresh().await {
return Some((Err(error), state));
}
}

// `pop()` returns the last element which is actually the "first"
// message. Thus, the resultant stream goes from oldest to newest.
state.buffer.pop().map(|entry| (Ok(entry), state))
})
}
}

0 comments on commit a7e2f74

Please sign in to comment.