Skip to content

Commit

Permalink
feat(sliding-sync): add configuration to inform about room changes wi…
Browse files Browse the repository at this point in the history
…thout positional update
  • Loading branch information
gnunicorn committed Jan 23, 2023
1 parent a632355 commit 27d33f5
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 37 deletions.
3 changes: 3 additions & 0 deletions bindings/matrix-sdk-ffi/src/api.udl
Expand Up @@ -104,6 +104,9 @@ interface SlidingSyncViewBuilder {
[Self=ByArc]
SlidingSyncViewBuilder sync_mode(SlidingSyncMode mode);

[Self=ByArc]
SlidingSyncViewBuilder send_updates_for_items(boolean enable);

[Throws=ClientError, Self=ByArc]
SlidingSyncView build();
};
Expand Down
12 changes: 8 additions & 4 deletions bindings/matrix-sdk-ffi/src/sliding_sync.rs
Expand Up @@ -378,6 +378,12 @@ impl SlidingSyncViewBuilder {
Arc::new(builder)
}

pub fn send_updates_for_items(self: Arc<Self>, enable: bool) -> Arc<Self> {
let mut builder = unwrap_or_clone_arc(self);
builder.inner = builder.inner.send_updates_for_items(enable);
Arc::new(builder)
}

pub fn ranges(self: Arc<Self>, ranges: Vec<(u32, u32)>) -> Arc<Self> {
let mut builder = unwrap_or_clone_arc(self);
builder.inner = builder.inner.ranges(ranges);
Expand Down Expand Up @@ -499,10 +505,8 @@ impl SlidingSyncView {
) -> Arc<StoppableSpawn> {
let mut room_list = self.inner.rooms_list.signal_vec_cloned().to_stream();
Arc::new(StoppableSpawn::with_handle(RUNTIME.spawn(async move {
loop {
if let Some(diff) = room_list.next().await {
observer.did_receive_update(diff.into());
}
if let Some(diff) = room_list.next().await {
observer.did_receive_update(diff.into());
}
})))
}
Expand Down
207 changes: 174 additions & 33 deletions crates/matrix-sdk/src/sliding_sync.rs
Expand Up @@ -441,10 +441,6 @@ impl SlidingSyncConfig {

trace!(len = rooms_found.len(), "rooms unfrozen");
let rooms = Arc::new(MutableBTreeMap::with_values(rooms_found));
// map the roomsmap into the views:
for v in &mut views {
v.rooms = rooms.clone();
}

let views = Arc::new(MutableVec::new_with_values(views));

Expand All @@ -455,6 +451,7 @@ impl SlidingSyncConfig {

views,
rooms,

extensions: Mutex::new(extensions).into(),
sent_extensions: Mutex::new(None).into(),
failure_count: Default::default(),
Expand Down Expand Up @@ -795,28 +792,6 @@ impl SlidingSync {
debug!("main client processed.");
self.pos.replace(Some(resp.pos));
let update = {
let mut updated_views = Vec::new();
if resp.lists.len() != views.len() {
return Err(Error::BadViewsCount {
found: resp.lists.len(),
expected: views.len(),
}
.into());
}

for (((view, generator), ranges), updates) in std::iter::zip(
std::iter::zip(std::iter::zip(views, generators), ranges),
&resp.lists,
) {
let count: u32 =
updates.count.try_into().expect("the list total count convertible into u32");
trace!("view {:?} update: {:?}", view.name, !updates.ops.is_empty());
if view.handle_response(count, &updates.ops, ranges)? {
updated_views.push(view.name.clone());
generator.update_state(count, ranges);
}
}

let mut rooms = Vec::new();
let mut rooms_map = self.rooms.lock_mut();
for (id, mut room_data) in resp.rooms.into_iter() {
Expand All @@ -841,6 +816,28 @@ impl SlidingSync {
}
}

let mut updated_views = Vec::new();
if resp.lists.len() != views.len() {
return Err(Error::BadViewsCount {
found: resp.lists.len(),
expected: views.len(),
}
.into());
}

for (((view, generator), ranges), updates) in std::iter::zip(
std::iter::zip(std::iter::zip(views, generators), ranges),
&resp.lists,
) {
let count: u32 =
updates.count.try_into().expect("the list total count convertible into u32");
trace!("view {:?} update: {:?}", view.name, !updates.ops.is_empty());
if view.handle_response(count, &updates.ops, ranges, &rooms)? {
updated_views.push(view.name.clone());
generator.update_state(count, ranges);
}
}

// Update the `to-device` next-batch if found.
if let Some(to_device_since) = resp.extensions.to_device.map(|t| t.next_batch) {
self.update_to_device_since(to_device_since)
Expand Down Expand Up @@ -1030,7 +1027,12 @@ pub struct SlidingSyncView {
#[builder(default = "20")]
batch_size: u32,

/// How many rooms request at a time when doing a full-sync catch up
/// Whether the view should send `UpdatedAt`-Diff signals for rooms
/// that have changed
#[builder(default = "false")]
send_updates_for_items: bool,

/// How many rooms request a total hen doing a full-sync catch up
#[builder(setter(into), default)]
limit: Option<u32>,

Expand Down Expand Up @@ -1142,7 +1144,6 @@ impl SlidingSyncViewBuilder {
self.finish_build()
}

// defaults
fn default_sort() -> Vec<String> {
vec!["by_recency".to_owned(), "by_name".to_owned()]
}
Expand Down Expand Up @@ -1593,6 +1594,61 @@ impl SlidingSyncView {
.collect()
}

/// Find the current valid position of the room in the vies room_list.
///
/// Only matches against the current ranges and only against filled items.
/// Invalid items are ignore. Return the total position the item was
/// found in the room_list, return None otherwise.
pub fn find_room_in_view(&self, room_id: &RoomId) -> Option<usize> {
let ranges = self.ranges.lock_ref();
let listing = self.rooms_list.lock_ref();
for (start_uint, end_uint) in ranges.iter() {
let mut cur_pos: usize = (*start_uint).try_into().unwrap();
let end: usize = (*end_uint).try_into().unwrap();
let mut iterator = listing.iter().skip(cur_pos);
while let Some(n) = iterator.next() {
if let RoomListEntry::Filled(r) = n {
if room_id == r {
return Some(cur_pos);
}
}
if cur_pos == end {
break;
}
cur_pos += 1;
}
}
None
}

/// Find the current valid position of the rooms in the views room_list.
///
/// Only matches against the current ranges and only against filled items.
/// Invalid items are ignore. Return the total position the items that were
/// found in the room_list, will skip any room not found in the rooms_list.
pub fn find_rooms_in_view(&self, room_ids: &Vec<OwnedRoomId>) -> Vec<(usize, OwnedRoomId)> {
let ranges = self.ranges.lock_ref();
let listing = self.rooms_list.lock_ref();
let mut rooms_found = Vec::new();
for (start_uint, end_uint) in ranges.iter() {
let mut cur_pos: usize = (*start_uint).try_into().unwrap();
let end: usize = (*end_uint).try_into().unwrap();
let mut iterator = listing.iter().skip(cur_pos);
while let Some(n) = iterator.next() {
if let RoomListEntry::Filled(r) = n {
if room_ids.contains(r) {
rooms_found.push((cur_pos, r.clone()));
}
}
if cur_pos == end {
break;
}
cur_pos += 1;
}
}
rooms_found
}

/// Return the room_id at the given index
pub fn get_room_id(&self, index: usize) -> Option<OwnedRoomId> {
self.rooms_list.lock_ref().get(index).and_then(|e| e.as_room_id().map(ToOwned::to_owned))
Expand All @@ -1604,6 +1660,7 @@ impl SlidingSyncView {
rooms_count: u32,
ops: &Vec<v4::SyncOp>,
ranges: &Vec<(usize, usize)>,
rooms: &Vec<OwnedRoomId>,
) -> Result<bool, Error> {
let current_rooms_count = self.rooms_count.get();
if current_rooms_count.is_none()
Expand Down Expand Up @@ -1634,19 +1691,34 @@ impl SlidingSyncView {
changed = true;
}

let mut rooms_list = self.rooms_list.lock_mut();
let _rooms_map = self.rooms.lock_mut();
{
// keep the lock scoped so that the later find_rooms_in_view doesn't deadlock
let mut rooms_list = self.rooms_list.lock_mut();
let _rooms_map = self.rooms.lock_mut();

if !ops.is_empty() {
room_ops(&mut rooms_list, ops, ranges)?;
changed = true;
if !ops.is_empty() {
room_ops(&mut rooms_list, ops, ranges)?;
changed = true;
}
}

if self.rooms_count.get() != Some(rooms_count) {
self.rooms_count.set(Some(rooms_count));
changed = true;
}

if self.send_updates_for_items && !rooms.is_empty() {
let found_views = self.find_rooms_in_view(&rooms);
if !found_views.is_empty() {
let mut rooms_list = self.rooms_list.lock_mut();
for (pos, room_id) in found_views {
// trigger an `UpdatedAt` update
rooms_list.set_cloned(pos, RoomListEntry::Filled(room_id));
changed = true;
}
}
}

if changed {
if let Err(e) = self.rooms_updated_signal.send(()) {
warn!("Could not inform about rooms updated: {:?}", e);
Expand Down Expand Up @@ -1686,3 +1758,72 @@ impl Client {
Ok(response)
}
}

#[cfg(test)]
mod test {
use ruma::room_id;
use serde_json::json;

use super::*;

#[tokio::test]
async fn check_find_room_in_view() -> crate::Result<()> {
let view = SlidingSyncViewBuilder::default()
.name("testview")
.add_range(0u32, 9u32)
.build()
.unwrap();
let full_window_update: v4::SyncOp = serde_json::from_value(json! ({
"op": "SYNC",
"range": [0, 9],
"room_ids": [
"!A00000:matrix.example",
"!A00001:matrix.example",
"!A00002:matrix.example",
"!A00003:matrix.example",
"!A00004:matrix.example",
"!A00005:matrix.example",
"!A00006:matrix.example",
"!A00007:matrix.example",
"!A00008:matrix.example",
"!A00009:matrix.example"
],
}))
.unwrap();

view.handle_response(10u32, &vec![full_window_update], &vec![(0, 9)], &vec![]).unwrap();

let a02 = room_id!("!A00002:matrix.example").to_owned();
let a05 = room_id!("!A00005:matrix.example").to_owned();
let a09 = room_id!("!A00009:matrix.example").to_owned();

assert_eq!(view.find_room_in_view(&a02), Some(2));
assert_eq!(view.find_room_in_view(&a05), Some(5));
assert_eq!(view.find_room_in_view(&a09), Some(9));

assert_eq!(
view.find_rooms_in_view(&vec![a02.clone(), a05.clone(), a09.clone()]),
vec![(2, a02.clone()), (5, a05.clone()), (9, a09.clone())]
);

// we invalidate a few in the center
let update: v4::SyncOp = serde_json::from_value(json! ({
"op": "INVALIDATE",
"range": [4, 7],
}))
.unwrap();

view.handle_response(10u32, &vec![update], &vec![(0, 3), (8, 9)], &vec![]).unwrap();

assert_eq!(view.find_room_in_view(room_id!("!A00002:matrix.example")), Some(2));
assert_eq!(view.find_room_in_view(room_id!("!A00005:matrix.example")), None);
assert_eq!(view.find_room_in_view(room_id!("!A00009:matrix.example")), Some(9));

assert_eq!(
view.find_rooms_in_view(&vec![a02.clone(), a05.clone(), a09.clone()]),
vec![(2, a02.clone()), (9, a09.clone())]
);

Ok(())
}
}

0 comments on commit 27d33f5

Please sign in to comment.