Skip to content

Commit

Permalink
Implement new insertion strategy for message history
Browse files Browse the repository at this point in the history
  • Loading branch information
tarkah committed May 17, 2024
1 parent cb9b86c commit 0cd37be
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 166 deletions.
329 changes: 182 additions & 147 deletions data/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tokio::fs;
use tokio::time::Instant;

pub use self::manager::{Manager, Resource};
use crate::isupport::{ChatHistorySubcommand, MessageReference};
use crate::time::Posix;
use crate::user::Nick;
use crate::{compression, environment, isupport, message, server, Message};
Expand Down Expand Up @@ -162,166 +161,35 @@ impl History {
}
}

fn add_message(&mut self, message: Message) {
match self {
History::Partial {
messages,
last_received_at,
unread_message_count,
..
} => {
if message.triggers_unread() {
*unread_message_count += 1;
}

messages.push(message);
*last_received_at = Some(Instant::now());
}
History::Full {
messages,
last_received_at,
..
} => {
messages.push(message);
*last_received_at = Some(Instant::now());
}
fn inc_unread_count(&mut self) {
if let History::Partial {
unread_message_count,
..
} = self
{
*unread_message_count += 1;
}
}

fn add_chathistory_message(
&mut self,
message: Message,
subcommand: ChatHistorySubcommand,
message_reference: MessageReference,
) {
fn add_message(&mut self, message: Message) {
if message.triggers_unread() {
self.inc_unread_count();
}

match self {
History::Partial {
messages,
last_received_at,
unread_message_count,
..
} => {
let insert_position = match subcommand {
ChatHistorySubcommand::Latest(_) => match message_reference {
MessageReference::None => 0,
_ => messages
.iter()
.rev()
.position(|existing_message| message_reference == *existing_message)
.map_or(0, |reference_position| messages.len() - reference_position),
},
ChatHistorySubcommand::Before => return,
};

let insert_position = if let Some(unreferenceable_messages_len) = messages
.iter()
.skip(insert_position)
.position(|existing_message| {
is_referenceable_message(existing_message, None)
|| existing_message
.server_time
.signed_duration_since(message.server_time)
.num_seconds()
> 0
}) {
insert_position + unreferenceable_messages_len
} else {
messages.len()
};

if message.triggers_unread() {
*unread_message_count += 1;
}

messages.insert(insert_position, message);
*last_received_at = Some(Instant::now());
}
History::Full {
| History::Full {
messages,
last_received_at,
..
} => {
let insert_position = match subcommand {
ChatHistorySubcommand::Latest(_) => {
if message.id.clone().is_some_and(|_| {
messages.iter().any(|existing_message| {
existing_message.server_time == message.server_time
&& existing_message.id == message.id
})
}) {
return;
}

if matches!(message_reference, MessageReference::None) {
Some(0)
} else {
messages
.iter()
.rev()
.position(|existing_message| message_reference == *existing_message)
.map(|reference_position| messages.len() - reference_position)
}
}
ChatHistorySubcommand::Before => {
if message.id.clone().is_some_and(|_| {
messages.iter().any(|existing_message| {
existing_message.server_time == message.server_time
&& existing_message.id == message.id
})
}) {
return;
}

messages
.iter()
.position(|existing_message| message_reference == *existing_message)
}
};

if let Some(insert_position) = insert_position {
let insert_position = match subcommand {
ChatHistorySubcommand::Latest(_) => {
if let Some(unreferenceable_messages_len) = messages
.iter()
.skip(insert_position)
.position(|existing_message| {
is_referenceable_message(existing_message, None)
|| existing_message
.server_time
.signed_duration_since(message.server_time)
.num_seconds()
> 0
})
{
insert_position + unreferenceable_messages_len
} else {
messages.len()
}
}
ChatHistorySubcommand::Before => {
if let Some(unreferenceable_messages_len) = messages
.iter()
.rev()
.skip(messages.len() - insert_position)
.position(|existing_message| {
is_referenceable_message(existing_message, None)
&& existing_message
.server_time
.signed_duration_since(message.server_time)
.num_seconds()
<= 0
})
{
insert_position - unreferenceable_messages_len
} else {
0
}
}
};
*last_received_at = Some(Instant::now());

messages.insert(insert_position, message);
*last_received_at = Some(Instant::now());
}
insert_message(messages, message);
}
}
}
Expand Down Expand Up @@ -495,6 +363,61 @@ fn is_referenceable_message(
}
}

/// Insert the incoming message into the provided vector, sorted
/// on server time
///
/// Deduplication is only checked +/- 1 second around the server time
/// of the incoming message. Either message IDs match, or server times
/// have an exact match + target & content.
fn insert_message(messages: &mut Vec<Message>, message: Message) {
#[allow(deprecated)]
const FUZZ_DURATION: chrono::Duration = chrono::Duration::seconds(1);

if messages.is_empty() {
messages.push(message);
return;
}

let start = message.server_time - FUZZ_DURATION;
let end = message.server_time + FUZZ_DURATION;

let start_index = match messages.binary_search_by(|stored| stored.server_time.cmp(&start)) {
Ok(i) => i,
Err(i) => i,
};
let end_index = match messages.binary_search_by(|stored| stored.server_time.cmp(&end)) {
Ok(i) => i,
Err(i) => i,
};

let mut current_index = start_index;
let mut insert_at = start_index;
let mut replace_at = None;

for stored in &messages[start_index..end_index] {
if (stored.id.is_some() && message.id.is_some() && stored.id == message.id)
|| (stored.server_time == message.server_time
&& stored.target == message.target
&& stored.text == message.text)
{
replace_at = Some(current_index);
break;
}

if message.server_time >= stored.server_time {
insert_at = current_index + 1;
}

current_index += 1;
}

if let Some(index) = replace_at {
messages[index] = message;
} else {
messages.insert(insert_at, message);
}
}

#[derive(Debug)]
pub struct View<'a> {
pub total: usize,
Expand All @@ -509,3 +432,115 @@ pub enum Error {
#[error(transparent)]
Io(#[from] io::Error),
}

#[cfg(test)]
mod test {
use rand::seq::SliceRandom;

use super::*;

#[test]
#[allow(clippy::needless_range_loop)]
fn test_insert_message() {
let mut messages = vec![];

insert_message(&mut messages, message(1, None, "one"));

assert_eq!(messages.len(), 1);

// Insert before single message
insert_message(&mut messages, message(0, None, "zero"));
assert_eq!(messages[0].text, "zero".to_string());
messages.remove(0);

// Insert after single message
insert_message(&mut messages, message(2, None, "two"));
assert_eq!(messages[1].text, "two".to_string());
messages.remove(1);

// Insert way before (search slice will be empty)
insert_message(&mut messages, message(-3_000_000_000, None, "past"));
assert_eq!(messages[0].text, "past".to_string());
messages.remove(0);

// Insert way after (search slice will be empty)
insert_message(&mut messages, message(3_000_000_000, None, "future"));
assert_eq!(messages[1].text, "future".to_string());
messages.remove(1);

// Insert in random order, assert messages are ordered
{
let mut rng = rand::thread_rng();
let mut tests = (0_i64..10_000).collect::<Vec<_>>();
tests.shuffle(&mut rng);

messages.clear();

for test in tests {
let millis = test * 1_000;
insert_message(
&mut messages,
message(millis, Some(&test.to_string()), millis),
);
}

assert_eq!(messages.len(), 10_000);

for i in 0usize..10_000 {
assert_eq!(messages[i].text, (i * 1000).to_string());
}
}

// REPLACE - id match within FUZZ duration (+-1 second)
for diff in [-999, 0, 999] {
let millis = 5_000_000 + diff;

insert_message(
&mut messages,
message(millis, Some(&5000.to_string()), diff),
);
assert_eq!(messages.len(), 10_000);
assert_eq!(messages[5000].text, diff.to_string());
}

// INSERT - id match outside FUZZ duration (1 second)
for (i, diff) in [-2000, 2000].iter().enumerate() {
let millis = 5_000_000 + diff;

insert_message(
&mut messages,
message(millis, Some(&5000.to_string()), diff),
);
assert_eq!(messages.len(), 10_000 + i + 1);
}
assert_eq!(messages.len(), 10_002);

let now = Posix::now();

// REPLACE - timestamp & content match
insert_message(&mut messages, message(0, None, 0));
assert_eq!(messages.len(), 10_002);
assert!(messages[0].id.is_none());
assert!(messages[0].received_at >= now);

// INSERT - timestamp matches but not content
insert_message(&mut messages, message(0, None, "BAR"));
assert_eq!(messages.len(), 10_003);
assert!(messages[1].id.is_none());
assert_eq!(messages[1].text, "BAR".to_string());
}

fn message(millis: i64, id: Option<&str>, text: impl ToString) -> Message {
Message {
received_at: Posix::now(),
server_time: DateTime::from_timestamp_millis(millis).unwrap(),
direction: message::Direction::Received,
target: message::Target::Channel {
channel: "test".to_string(),
source: message::Source::Server(None),
},
text: text.to_string(),
id: id.map(String::from),
}
}
}

0 comments on commit 0cd37be

Please sign in to comment.