Skip to content

Commit

Permalink
Load history before making latest messages requests in order to get u…
Browse files Browse the repository at this point in the history
…p-to-date message reference.

Add fuzz interval to timestamp based `chathistory` requests.
Ignore messages received at/after `JOIN` when finding the latest message in a channel (replaces ignoring `RPL_TOPIC` and `RPL_TOPICWHOTIME` messages).
Ignore internal messages when finding the oldest/latest message in a channel.
  • Loading branch information
andymandias committed May 9, 2024
1 parent 8fc0cc2 commit 2d6f78c
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 117 deletions.
124 changes: 73 additions & 51 deletions data/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chrono::{DateTime, Utc};
use chrono::{DateTime, TimeDelta, Utc};
use futures::channel::mpsc;
use irc::proto::{self, command, Command};
use itertools::{Either, Itertools};
Expand Down Expand Up @@ -817,7 +817,7 @@ impl Client {

if let Some(state) = self.chanmap.get_mut(channel) {
// Sends WHO to get away state on users.
if self.isupport.get(&isupport::Kind::WHOX).is_some() {
if self.isupport.contains_key(&isupport::Kind::WHOX) {
let _ = self.handle.try_send(command!(
"WHO",
channel,
Expand All @@ -836,7 +836,7 @@ impl Client {

if self.supports_chathistory {
return Some(vec![Event::ChatHistoryCommand(
ChatHistorySubcommand::Latest,
ChatHistorySubcommand::Latest(server_time(&message)),
channel.clone(),
self.get_chathistory_message_reference_type(),
self.get_chathistory_limit(),
Expand Down Expand Up @@ -997,7 +997,7 @@ impl Client {
}
Command::TOPIC(channel, topic) => {
if let Some(channel) = self.chanmap.get_mut(channel) {
channel.topic.text = topic.to_owned();
topic.clone_into(&mut channel.topic.text);

channel.topic.who = message
.user()
Expand Down Expand Up @@ -1034,7 +1034,7 @@ impl Client {
// If the channel has not been joined but is in the configured channels,
// then interpret this numeric as ERR_NEEDREGGEDNICK (which has the
// same number as ERR_NOCHANMODES)
if self.chanmap.get(channel).is_none()
if self.chanmap.contains_key(channel)
&& self
.config
.channels
Expand Down Expand Up @@ -1125,40 +1125,35 @@ impl Client {
pub fn finish_chathistory_batch(&mut self, batch: &mut Batch) {
if let Some(chathistory_target) = &batch.chathistory_target {
if let Some(channel) = self.chanmap.get_mut(chathistory_target) {
let limited_latest_batch = if let Some(ChatHistoryRequest {
if let Some(ChatHistoryRequest {
subcommand,
message_reference,
limit,
}) = &channel.chathistory_request
}) = channel.chathistory_request.clone()
{
let subcommand_is_latest_since = *subcommand == ChatHistorySubcommand::Latest
&& !matches!(message_reference, MessageReference::None);

if matches!(*subcommand, ChatHistorySubcommand::After)
|| subcommand_is_latest_since
if matches!(subcommand, ChatHistorySubcommand::Latest(_))
&& !matches!(message_reference, MessageReference::None)
{
batch.events.reverse();
}

subcommand_is_latest_since && batch.events.len() == *limit as usize
} else {
false
};
if batch.events.len() == limit as usize {
let message_reference_type =
self.get_chathistory_message_reference_type().clone();
let limit = self.get_chathistory_limit();

if limited_latest_batch {
let message_reference_type =
self.get_chathistory_message_reference_type().clone();
let limit = self.get_chathistory_limit();
batch.events.push(Event::ChatHistoryCommand(
subcommand.clone(),
chathistory_target.to_string(),
message_reference_type,
limit,
));

batch.events.push(Event::ChatHistoryCommand(
ChatHistorySubcommand::Latest,
chathistory_target.to_string(),
message_reference_type,
limit,
));
} else {
channel.chathistory_request = None;
return;
}
}
}

channel.chathistory_request = None;
}
}
}
Expand Down Expand Up @@ -1251,7 +1246,7 @@ impl Client {
};

if let Some(request) = request {
if self.isupport.get(&isupport::Kind::WHOX).is_some() {
if self.isupport.contains_key(&isupport::Kind::WHOX) {
let _ = self.handle.try_send(command!(
"WHO",
channel,
Expand Down Expand Up @@ -1424,11 +1419,11 @@ impl Map {
if let Some(channel) = client.chanmap.get_mut(target) {
if client.supports_chathistory
&& (channel.chathistory_request.is_none()
|| (subcommand == ChatHistorySubcommand::Latest
|| (matches!(subcommand, ChatHistorySubcommand::Latest(_))
&& matches!(
channel.chathistory_request,
Some(ChatHistoryRequest {
subcommand: ChatHistorySubcommand::Latest,
subcommand: ChatHistorySubcommand::Latest(_),
..
})
)))
Expand All @@ -1440,43 +1435,70 @@ impl Map {
});

match subcommand {
ChatHistorySubcommand::Latest => {
log::debug!("[{server}] requesting {limit} latest messages in {target} since {message_reference}");

let _ = client.handle.try_send(command!(
"CHATHISTORY",
"LATEST",
target,
message_reference.to_string(),
limit.to_string()
));
}
ChatHistorySubcommand::After => {
if !matches!(message_reference, MessageReference::None) {
log::debug!(
"[{server}] requesting {limit} messages in {target} after {message_reference}"
);
ChatHistorySubcommand::Latest(_) => {
let command_message_reference = match message_reference {
MessageReference::Timestamp(timestamp) => {
if let Some(fuzzed_timestamp) =
TimeDelta::try_seconds(isupport::CHATHISTORY_FUZZ_SECONDS)
.and_then(|time_delta| {
timestamp.checked_sub_signed(time_delta)
})
{
MessageReference::Timestamp(fuzzed_timestamp)
} else {
message_reference
}
}
_ => message_reference,
};

log::debug!("[{server}] requesting {limit} latest messages in {target} since {command_message_reference}");

if matches!(command_message_reference, MessageReference::None) {
let _ = client.handle.try_send(command!(
"CHATHISTORY",
"LATEST",
target,
command_message_reference.to_string(),
limit.to_string()
));
} else {
let _ = client.handle.try_send(command!(
"CHATHISTORY",
"AFTER",
target,
message_reference.to_string(),
command_message_reference.to_string(),
limit.to_string()
));
}
}
ChatHistorySubcommand::Before => {
if !matches!(message_reference, MessageReference::None) {
let command_message_reference = match message_reference {
MessageReference::Timestamp(timestamp) => {
if let Some(fuzzed_timestamp) = TimeDelta::try_seconds(
isupport::CHATHISTORY_FUZZ_SECONDS,
)
.and_then(|time_delta| {
timestamp.checked_add_signed(time_delta)
}) {
MessageReference::Timestamp(fuzzed_timestamp)
} else {
message_reference
}
}
_ => message_reference,
};

log::debug!(
"[{server}] requesting {limit} messages in {target} before {message_reference}"
"[{server}] requesting {limit} messages in {target} before {command_message_reference}"
);

let _ = client.handle.try_send(command!(
"CHATHISTORY",
"BEFORE",
target,
message_reference.to_string(),
command_message_reference.to_string(),
limit.to_string()
));
}
Expand Down
62 changes: 34 additions & 28 deletions data/src/history.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use chrono::{DateTime, Utc};
use std::path::PathBuf;
use std::time::Duration;
use std::{fmt, io};
Expand Down Expand Up @@ -186,7 +187,7 @@ impl History {
..
} => {
let insert_position = match subcommand {
ChatHistorySubcommand::Latest | ChatHistorySubcommand::After => {
ChatHistorySubcommand::Latest(_) => {
if message.id.is_some() {
if messages
.iter()
Expand All @@ -201,9 +202,7 @@ impl History {
return;
}

if subcommand == ChatHistorySubcommand::Latest
&& matches!(message_reference, MessageReference::None)
{
if matches!(message_reference, MessageReference::None) {
Some(messages.len())
} else {
messages
Expand Down Expand Up @@ -249,7 +248,7 @@ impl History {
..
} => {
let insert_position = match subcommand {
ChatHistorySubcommand::Latest | ChatHistorySubcommand::After => {
ChatHistorySubcommand::Latest(_) => {
if message.id.is_some() {
if messages
.iter()
Expand All @@ -264,7 +263,7 @@ impl History {
return;
}

if subcommand == ChatHistorySubcommand::Latest
if matches!(subcommand, ChatHistorySubcommand::Latest(_))
&& matches!(message_reference, MessageReference::None)
{
Some(messages.len())
Expand Down Expand Up @@ -401,18 +400,32 @@ impl History {
fn get_latest_message(
&self,
message_reference_type: isupport::MessageReferenceType,
join_server_time: DateTime<Utc>,
) -> Option<&Message> {
match self {
History::Partial { messages, .. } | History::Full { messages, .. } => {
match message_reference_type {
isupport::MessageReferenceType::MessageId => messages
.iter()
.rev()
.find(|message| message.id.is_some() && !is_topic_message(message)),
isupport::MessageReferenceType::Timestamp => messages
.iter()
.rev()
.find(|message| !is_topic_message(message)),
isupport::MessageReferenceType::MessageId => {
messages.iter().rev().find(|message| {
message
.server_time
.signed_duration_since(join_server_time)
.num_seconds()
< 0
&& message.id.is_some()
&& !matches!(message.target.source(), message::Source::Internal(_))
})
}
isupport::MessageReferenceType::Timestamp => {
messages.iter().rev().find(|message| {
message
.server_time
.signed_duration_since(join_server_time)
.num_seconds()
< 0
&& !matches!(message.target.source(), message::Source::Internal(_))
})
}
}
}
}
Expand All @@ -425,26 +438,19 @@ impl History {
match self {
History::Partial { messages, .. } | History::Full { messages, .. } => {
match message_reference_type {
isupport::MessageReferenceType::MessageId => messages
.iter()
.find(|message| message.id.is_some() && !is_topic_message(message)),
isupport::MessageReferenceType::Timestamp => {
messages.iter().find(|message| !is_topic_message(message))
}
isupport::MessageReferenceType::MessageId => messages.iter().find(|message| {
message.id.is_some()
&& !matches!(message.target.source(), message::Source::Internal(_))
}),
isupport::MessageReferenceType::Timestamp => messages.iter().find(|message| {
!matches!(message.target.source(), message::Source::Internal(_))
}),
}
}
}
}
}

pub fn is_topic_message(message: &Message) -> bool {
if let message::Source::Server(Some(source)) = message.target.source() {
matches!(source.kind(), message::source::server::Kind::ReplyTopic)
} else {
false
}
}

#[derive(Debug)]
pub struct View<'a> {
pub total: usize,
Expand Down
14 changes: 12 additions & 2 deletions data/src/history/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use futures::{future, Future, FutureExt};
use itertools::Itertools;
use tokio::time::Instant;
use tokio::{self, time::Instant};

use crate::history::{self, History};
use crate::isupport::{ChatHistorySubcommand, MessageReference};
Expand Down Expand Up @@ -205,17 +205,27 @@ impl Manager {
);
}

#[tokio::main]
pub async fn load(&mut self, server: Server, kind: history::Kind) {
let loaded_messages = history::load(&server.clone(), &kind.clone())
.map(move |result| Message::Loaded(server, kind, result))
.await;

self.update(loaded_messages);
}

pub fn get_latest_message(
&self,
server: &Server,
kind: &history::Kind,
message_reference_type: isupport::MessageReferenceType,
join_server_time: DateTime<Utc>,
) -> Option<&crate::Message> {
self.data
.map
.get(server)
.and_then(|map| map.get(kind))
.map(|history| history.get_latest_message(message_reference_type))?
.map(|history| history.get_latest_message(message_reference_type, join_server_time))?
}

pub fn get_oldest_message(
Expand Down

0 comments on commit 2d6f78c

Please sign in to comment.