Skip to content

Commit

Permalink
Lazily clone structs only when collector filters are actually matched (
Browse files Browse the repository at this point in the history
…#1419)

This adds a new internal `LazyArc` type that is used to delay cloning `Message`s,
`Reaction`s, and `Interaction`s until it's determined whether any filters
are actually matched or not.
  • Loading branch information
sbrocket committed Jul 4, 2021
1 parent 46ace1f commit 695bbef
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 58 deletions.
58 changes: 20 additions & 38 deletions src/client/bridge/gateway/shard_runner.rs
Expand Up @@ -21,7 +21,7 @@ use crate::client::{EventHandler, RawEventHandler};
#[cfg(all(feature = "unstable_discord_api", feature = "collector"))]
use crate::collector::ComponentInteractionFilter;
#[cfg(feature = "collector")]
use crate::collector::{MessageFilter, ReactionAction, ReactionFilter};
use crate::collector::{LazyArc, LazyReactionAction, MessageFilter, ReactionFilter};
#[cfg(feature = "framework")]
use crate::framework::Framework;
use crate::gateway::{GatewayError, InterMessage, ReconnectType, Shard, ShardAction};
Expand Down Expand Up @@ -214,47 +214,29 @@ impl ShardRunner {
}
}

// Avoid the clone if there is no message filter.
if !self.message_filters.is_empty() {
if let Event::MessageCreate(ref msg_event) = &event {
let msg = Arc::new(msg_event.message.clone());

retain(&mut self.message_filters, |f| f.send_message(&msg));
}
}

// Avoid the clone if there is no reaction filter.
if !self.reaction_filters.is_empty() {
match &event {
Event::ReactionAdd(ref reaction_event) => {
let reaction =
Arc::new(ReactionAction::Added(Arc::new(reaction_event.reaction.clone())));

retain(&mut self.reaction_filters, |f| f.send_reaction(&reaction));
},
Event::ReactionRemove(ref reaction_event) => {
let reaction = Arc::new(ReactionAction::Removed(Arc::new(
reaction_event.reaction.clone(),
)));

retain(&mut self.reaction_filters, |f| f.send_reaction(&reaction));
},
_ => {},
}
}

// Avoid the clone if there is no interaction filter.
#[cfg(all(feature = "unstable_discord_api", feature = "collector"))]
if !self.component_interaction_filters.is_empty() {
if let Event::InteractionCreate(ref interaction_event) = &event {
match &event {
Event::MessageCreate(ref msg_event) => {
let mut msg = LazyArc::new(&msg_event.message);
retain(&mut self.message_filters, |f| f.send_message(&mut msg));
},
Event::ReactionAdd(ref reaction_event) => {
let mut reaction = LazyReactionAction::new(&reaction_event.reaction, true);
retain(&mut self.reaction_filters, |f| f.send_reaction(&mut reaction));
},
Event::ReactionRemove(ref reaction_event) => {
let mut reaction = LazyReactionAction::new(&reaction_event.reaction, false);
retain(&mut self.reaction_filters, |f| f.send_reaction(&mut reaction));
},
#[cfg(all(feature = "unstable_discord_api", feature = "collector"))]
Event::InteractionCreate(ref interaction_event) => {
if interaction_event.interaction.kind == InteractionType::MessageComponent {
let interaction = Arc::new(interaction_event.interaction.clone());

let mut interaction = LazyArc::new(&interaction_event.interaction);
retain(&mut self.component_interaction_filters, |f| {
f.send_interaction(&interaction)
f.send_interaction(&mut interaction)
});
}
}
},
_ => {},
}
}

Expand Down
12 changes: 7 additions & 5 deletions src/collector/component_interaction_collector.rs
Expand Up @@ -22,6 +22,7 @@ use tokio::time::{delay_for as sleep, Delay as Sleep};
use tokio::time::{sleep, Sleep};

use crate::client::bridge::gateway::ShardMessenger;
use crate::collector::LazyArc;
use crate::model::interactions::Interaction;

macro_rules! impl_component_interaction_collector {
Expand Down Expand Up @@ -129,11 +130,11 @@ impl ComponentInteractionFilter {

/// Sends an `interaction` to the consuming collector if the `interaction` conforms
/// to the constraints and the limits are not reached yet.
pub(crate) fn send_interaction(&mut self, interaction: &Arc<Interaction>) -> bool {
if self.is_passing_constraints(&interaction) {
pub(crate) fn send_interaction(&mut self, interaction: &mut LazyArc<'_, Interaction>) -> bool {
if self.is_passing_constraints(interaction) {
self.collected += 1;

if self.sender.send(Arc::clone(interaction)).is_err() {
if self.sender.send(interaction.as_arc()).is_err() {
return false;
}
}
Expand All @@ -146,7 +147,8 @@ impl ComponentInteractionFilter {
/// Checks if the `interaction` passes set constraints.
/// Constraints are optional, as it is possible to limit interactions to
/// be sent by a specific author or in a specifc guild.
fn is_passing_constraints(&self, interaction: &Arc<Interaction>) -> bool {
fn is_passing_constraints(&self, interaction: &mut LazyArc<'_, Interaction>) -> bool {
// TODO: On next branch, switch filter arg to &T so this as_arc() call can be removed.
self.options.guild_id.map_or(true, |id| Some(id) == interaction.guild_id.map(|g| g.0))
&& self.options.message_id.map_or(true, |id| {
interaction.message.as_ref().expect("expected message id").id().0 == id
Expand All @@ -155,7 +157,7 @@ impl ComponentInteractionFilter {
id == interaction.channel_id.as_ref().expect("expected channel id").0
})
&& self.options.author_id.map_or(true, |id| id == interaction.user.id.0)
&& self.options.filter.as_ref().map_or(true, |f| f(&interaction))
&& self.options.filter.as_ref().map_or(true, |f| f(&interaction.as_arc()))
}

/// Checks if the filter is within set receive and collect limits.
Expand Down
13 changes: 7 additions & 6 deletions src/collector/message_collector.rs
Expand Up @@ -21,7 +21,7 @@ use tokio::time::{delay_for as sleep, Delay as Sleep};
#[cfg(feature = "tokio")]
use tokio::time::{sleep, Sleep};

use crate::{client::bridge::gateway::ShardMessenger, model::channel::Message};
use crate::{client::bridge::gateway::ShardMessenger, collector::LazyArc, model::channel::Message};

macro_rules! impl_message_collector {
($($name:ident;)*) => {
Expand Down Expand Up @@ -111,12 +111,13 @@ impl MessageFilter {

/// Sends a `message` to the consuming collector if the `message` conforms
/// to the constraints and the limits are not reached yet.
pub(crate) fn send_message(&mut self, message: &Arc<Message>) -> bool {
if self.is_passing_constraints(&message) {
if self.options.filter.as_ref().map_or(true, |f| f(&message)) {
pub(crate) fn send_message(&mut self, message: &mut LazyArc<'_, Message>) -> bool {
if self.is_passing_constraints(message) {
// TODO: On next branch, switch filter arg to &T so this as_arc() call can be removed.
if self.options.filter.as_ref().map_or(true, |f| f(&message.as_arc())) {
self.collected += 1;

if let Err(_) = self.sender.send(Arc::clone(message)) {
if let Err(_) = self.sender.send(message.as_arc()) {
return false;
}
}
Expand All @@ -130,7 +131,7 @@ impl MessageFilter {
/// Checks if the `message` passes set constraints.
/// Constraints are optional, as it is possible to limit messages to
/// be sent by a specific author or in a specifc guild.
fn is_passing_constraints(&self, message: &Arc<Message>) -> bool {
fn is_passing_constraints(&self, message: &Message) -> bool {
self.options.guild_id.map_or(true, |g| Some(g) == message.guild_id.map(|g| g.0))
&& self.options.channel_id.map_or(true, |g| g == message.channel_id.0)
&& self.options.author_id.map_or(true, |g| g == message.author.id.0)
Expand Down
32 changes: 32 additions & 0 deletions src/collector/mod.rs
Expand Up @@ -6,7 +6,39 @@ pub mod component_interaction_collector;
pub mod message_collector;
pub mod reaction_collector;

use std::sync::Arc;

#[cfg(feature = "unstable_discord_api")]
pub use component_interaction_collector::*;
pub use message_collector::*;
pub use reaction_collector::*;

/// Wraps a &T and clones the value into an Arc<T> lazily. Used with collectors to allow inspecting
/// the value in filters while only cloning values that actually match.
#[derive(Debug)]
pub(crate) struct LazyArc<'a, T> {
value: &'a T,
arc: Option<Arc<T>>,
}

impl<'a, T: Clone> LazyArc<'a, T> {
pub fn new(value: &'a T) -> Self {
LazyArc {
value,
arc: None,
}
}

pub fn as_arc(&mut self) -> Arc<T> {
let value = self.value;
self.arc.get_or_insert_with(|| Arc::new(value.clone())).clone()
}
}

impl<'a, T> std::ops::Deref for LazyArc<'a, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.value
}
}
55 changes: 46 additions & 9 deletions src/collector/reaction_collector.rs
Expand Up @@ -21,7 +21,12 @@ use tokio::time::{delay_for as sleep, Delay as Sleep};
#[cfg(feature = "tokio")]
use tokio::time::{sleep, Sleep};

use crate::{client::bridge::gateway::ShardMessenger, model::channel::Reaction, model::id::UserId};
use crate::{
client::bridge::gateway::ShardMessenger,
collector::LazyArc,
model::channel::Reaction,
model::id::UserId,
};

macro_rules! impl_reaction_collector {
($($name:ident;)*) => {
Expand Down Expand Up @@ -152,6 +157,37 @@ impl ReactionAction {
}
}

#[derive(Debug)]
pub(crate) struct LazyReactionAction<'a> {
reaction: LazyArc<'a, Reaction>,
added: bool,
arc: Option<Arc<ReactionAction>>,
}

impl<'a> LazyReactionAction<'a> {
pub fn new(reaction: &'a Reaction, added: bool) -> Self {
Self {
reaction: LazyArc::new(reaction),
added,
arc: None,
}
}

pub fn as_arc(&mut self) -> Arc<ReactionAction> {
let added = self.added;
let reaction = &mut self.reaction;
self.arc
.get_or_insert_with(|| {
if added {
Arc::new(ReactionAction::Added(reaction.as_arc()))
} else {
Arc::new(ReactionAction::Removed(reaction.as_arc()))
}
})
.clone()
}
}

/// Filters events on the shard's end and sends them to the collector.
#[derive(Clone, Debug)]
pub struct ReactionFilter {
Expand All @@ -178,11 +214,11 @@ impl ReactionFilter {

/// Sends a `reaction` to the consuming collector if the `reaction` conforms
/// to the constraints and the limits are not reached yet.
pub(crate) fn send_reaction(&mut self, reaction: &Arc<ReactionAction>) -> bool {
if self.is_passing_constraints(&reaction) {
pub(crate) fn send_reaction(&mut self, reaction: &mut LazyReactionAction<'_>) -> bool {
if self.is_passing_constraints(reaction) {
self.collected += 1;

if self.sender.send(Arc::clone(reaction)).is_err() {
if self.sender.send(reaction.as_arc()).is_err() {
return false;
}
}
Expand All @@ -195,16 +231,16 @@ impl ReactionFilter {
/// Checks if the `reaction` passes set constraints.
/// Constraints are optional, as it is possible to limit reactions to
/// be sent by a specific author or in a specifc guild.
fn is_passing_constraints(&self, reaction: &Arc<ReactionAction>) -> bool {
let reaction = match **reaction {
ReactionAction::Added(ref reaction) => {
fn is_passing_constraints(&self, reaction: &mut LazyReactionAction<'_>) -> bool {
let reaction = match (reaction.added, &mut reaction.reaction) {
(true, reaction) => {
if self.options.accept_added {
reaction
} else {
return false;
}
},
ReactionAction::Removed(ref reaction) => {
(false, reaction) => {
if self.options.accept_removed {
reaction
} else {
Expand All @@ -213,14 +249,15 @@ impl ReactionFilter {
},
};

// TODO: On next branch, switch filter arg to &T so this as_arc() call can be removed.
self.options.guild_id.map_or(true, |id| Some(id) == reaction.guild_id.map(|g| g.0))
&& self.options.message_id.map_or(true, |id| id == reaction.message_id.0)
&& self.options.channel_id.map_or(true, |id| id == reaction.channel_id.0)
&& self
.options
.author_id
.map_or(true, |id| id == reaction.user_id.unwrap_or(UserId(0)).0)
&& self.options.filter.as_ref().map_or(true, |f| f(&reaction))
&& self.options.filter.as_ref().map_or(true, |f| f(&reaction.as_arc()))
}

/// Checks if the filter is within set receive and collect limits.
Expand Down

0 comments on commit 695bbef

Please sign in to comment.