From e648a3eb1ae4735d180896c6213fffb2bb6a57f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 30 Jan 2024 17:21:36 +0100 Subject: [PATCH] sliding_sync: Refactor for code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Timo Kösters --- crates/matrix-sdk-base/src/client.rs | 19 ++- crates/matrix-sdk-base/src/rooms/normal.rs | 19 ++- .../src/room_list_service/room_list.rs | 17 +- crates/matrix-sdk/src/client/mod.rs | 2 +- .../src/tests/sliding_sync/room.rs | 160 +++++++++++++----- 5 files changed, 150 insertions(+), 67 deletions(-) diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 2ef8a5713a2..680057b2ec3 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -95,8 +95,10 @@ pub struct BaseClient { /// Observable of when a user is ignored/unignored. pub(crate) ignore_user_list_changes: SharedObservable<()>, + /// A sender that is used to communicate changes to room information. Each + /// event contains the room and a boolean whether this event should + /// trigger a room list update. pub(crate) roominfo_update_sender: broadcast::Sender, - pub(crate) roominfo_update_receiver: Arc>, } #[cfg(not(tarpaulin_include))] @@ -122,7 +124,7 @@ impl BaseClient { /// * `config` - An optional session if the user already has one from a /// previous login call. pub fn with_store_config(config: StoreConfig) -> Self { - let (roominfo_update_sender, roominfo_update_receiver) = + let (roominfo_update_sender, _roominfo_update_receiver) = tokio::sync::broadcast::channel(100); BaseClient { @@ -133,7 +135,6 @@ impl BaseClient { olm_machine: Default::default(), ignore_user_list_changes: Default::default(), roominfo_update_sender, - roominfo_update_receiver: Arc::new(roominfo_update_receiver), } } @@ -1356,11 +1357,13 @@ impl BaseClient { .collect() } - /// Returns a receiver that gets events for each room info update. To watch - /// for new events, use `receiver.resubscribe()`. Each event contains the - /// room and a boolean whether this event should trigger a room list update. - pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver { - &self.roominfo_update_receiver + /// Returns a new receiver that gets events for all future room info + /// updates. + /// + /// Each event contains the room and a boolean whether this event should + /// trigger a room list update. + pub fn roominfo_update_receiver(&self) -> broadcast::Receiver { + self.roominfo_update_sender.subscribe() } } diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index fef3beccbd6..39e84a05fbb 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -70,16 +70,17 @@ use crate::{ MinimalStateEvent, OriginalMinimalStateEvent, RoomMemberships, }; -/// Each time a room's RoomInfo is changed, the roominfo_update_receiver in the -/// BaseClient will receive this struct containing the update information. -/// The sender must decide if this update should cause a room list update. -/// If the sender knows that another action has already caused the room to -/// update, it should not send another update. +/// A summary of changes to room information. +/// +/// It also indicates whether this update should update the room list. #[derive(Debug, Clone)] pub struct RoomInfoUpdate { /// The room which was updated. pub room_id: OwnedRoomId, /// Whether this event should trigger the room list to update. + /// + /// If the change is minor or if another action already causes the room list + /// to update, this should be false to avoid duplicate updates. pub trigger_room_list_update: bool, } @@ -655,12 +656,14 @@ impl Room { self.inner.get() } - /// Update the summary with given RoomInfo. This also triggers an update for - /// the roominfo_update_recv. + /// Update the summary with given RoomInfo. + /// + /// This also triggers an update for room info observers if + /// `trigger_room_list_update` is true. pub fn set_room_info(&self, room_info: RoomInfo, trigger_room_list_update: bool) { self.inner.set(room_info); - // Ignore error if no receiver exists + // Ignore error if no receiver exists. let _ = self .roominfo_update_sender .send(RoomInfoUpdate { room_id: self.room_id.clone(), trigger_room_list_update }); diff --git a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs index abaafa33f21..49bc129b2ee 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs @@ -125,7 +125,7 @@ impl RoomList { pub fn entries_with_dynamic_adapters( &self, page_size: usize, - roominfo_update_recv: &broadcast::Receiver, + roominfo_update_recv: broadcast::Receiver, ) -> (impl Stream>>, RoomListDynamicEntriesController) { let list = self.sliding_sync_list.clone(); @@ -142,17 +142,15 @@ impl RoomList { list.maximum_number_of_rooms_stream(), ); - let roominfo_update_recv = roominfo_update_recv.resubscribe(); - let stream = stream! { loop { let filter_fn = filter_fn_cell.take().await; let (raw_values, raw_stream) = list.room_list_stream(); // Combine normal stream events with other updates from rooms - let raw_stream_with_recv = merge_stream_and_receiver(raw_values.clone(), raw_stream, roominfo_update_recv.resubscribe()); + let merged_stream = merge_stream_and_receiver(raw_values.clone(), raw_stream, roominfo_update_recv.resubscribe()); - let (values, stream) = (raw_values, raw_stream_with_recv) + let (values, stream) = (raw_values, merged_stream) .filter(filter_fn) .dynamic_limit_with_initial_value(page_size, limit_stream.clone()); @@ -175,15 +173,18 @@ fn merge_stream_and_receiver( raw_stream: impl Stream>>, mut roominfo_update_recv: broadcast::Receiver, ) -> impl Stream>> { - let raw_stream_with_recv = stream! { + stream! { pin_mut!(raw_stream); + loop { select! { biased; // Prefer manual updates for easier test code + Ok(update) = roominfo_update_recv.recv() => { if !update.trigger_room_list_update { continue; } + // Search list for the updated room for (index, room) in raw_current_values.iter().enumerate() { if let RoomListEntry::Filled(r) = room { @@ -195,6 +196,7 @@ fn merge_stream_and_receiver( } } } + v = raw_stream.next() => { if let Some(v) = v { for change in &v { @@ -208,8 +210,7 @@ fn merge_stream_and_receiver( } } } - }; - raw_stream_with_recv + } } /// The loading state of a [`RoomList`]. diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 6031273b8b7..e5381517fbe 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -447,7 +447,7 @@ impl Client { /// Returns a receiver that gets events for each room info update. To watch /// for new events, use `receiver.resubscribe()`. Each event contains the /// room and a boolean whether this event should trigger a room list update. - pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver { + pub fn roominfo_update_receiver(&self) -> broadcast::Receiver { self.base_client().roominfo_update_receiver() } diff --git a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs index b6c98f84541..83cb2cf5d35 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs @@ -4,6 +4,7 @@ use std::{ }; use anyhow::Result; +use eyeball_im::VectorDiff; use futures_util::{pin_mut, StreamExt as _}; use matrix_sdk::{ bytes::Bytes, @@ -551,34 +552,42 @@ async fn test_room_notification_count() -> Result<()> { Ok(()) } -static DROP_TODEVICE: StdMutex = StdMutex::new(true); -fn drop_todevice(response: &mut Bytes) { +/// Response preprocessor that drops to_device events +fn drop_todevice_events(response: &mut Bytes) { + // Looks for a json payload containing "extensions" with a "to_device" part. + // This should only match the sliding sync response. In all other cases, it + // makes no changes. let Ok(mut json) = serde_json::from_slice::(response) else { return; }; let Some(extensions) = json.get_mut("extensions").and_then(|e| e.as_object_mut()) else { return; }; + // Remove to_device field if it exists let Some(to_device) = extensions.remove("to_device") else { return; }; - if *DROP_TODEVICE.lock().unwrap() { - info!("Dropping to_device: {to_device}"); - *response = serde_json::to_vec(&json).unwrap().into(); - return; - } + + info!("Dropping to_device: {to_device}"); + *response = serde_json::to_vec(&json).unwrap().into(); } +/// Proxy between client and homeserver that can do arbitrary changes to the +/// payloads. +/// +/// It uses wiremock, but sends the actual server when it runs. struct CustomResponder { client: reqwest::Client, - response_preprocessor: fn(&mut Bytes), + drop_todevice: Arc>, } + impl CustomResponder { - fn new(response_preprocessor: fn(&mut Bytes)) -> Self { - Self { client: reqwest::Client::new(), response_preprocessor } + fn new() -> Self { + Self { client: reqwest::Client::new(), drop_todevice: Arc::new(StdMutex::new(true)) } } } -impl wiremock::Respond for CustomResponder { + +impl wiremock::Respond for &CustomResponder { fn respond(&self, request: &wiremock::Request) -> wiremock::ResponseTemplate { let mut req = self.client.request( request.method.to_string().parse().expect("All methods exist"), @@ -592,14 +601,15 @@ impl wiremock::Respond for CustomResponder { req = req.body(request.body.clone()); // Run await inside of non-async fn by spawning a new thread and creating a new - // runtime. Is there a better way? - let response_preprocessor = self.response_preprocessor; + // runtime. TODO: Is there a better way? + let drop_todevice = self.drop_todevice.clone(); std::thread::spawn(move || { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let response = timeout(Duration::from_secs(2), req.send()).await; if let Ok(Ok(response)) = response { + // Convert reqwest response to wiremock response let mut r = wiremock::ResponseTemplate::new(u16::from(response.status())); for header in response.headers() { if header.0 == "Content-Length" { @@ -609,14 +619,16 @@ impl wiremock::Respond for CustomResponder { } let mut bytes = response.bytes().await.unwrap_or_default(); - response_preprocessor(&mut bytes); - r = r.set_body_bytes(bytes); - r + // Manipulate the response + if *drop_todevice.lock().unwrap() { + drop_todevice_events(&mut bytes); + } + + r.set_body_bytes(bytes) } else { // Gateway timeout - let r = wiremock::ResponseTemplate::new(504); - r + wiremock::ResponseTemplate::new(504) } }) }) @@ -628,10 +640,10 @@ impl wiremock::Respond for CustomResponder { #[tokio::test] async fn test_delayed_decryption_latest_event() -> Result<()> { let server = MockServer::start().await; - // Setup mockserver that drops to-device messages if DROP_TODEVICE is true - server - .register(Mock::given(AnyMatcher).respond_with(CustomResponder::new(drop_todevice))) - .await; + // Setup mockserver that can drop to-device messages + let custom_responder: &'static CustomResponder = Box::leak(Box::new(CustomResponder::new())); + + server.register(Mock::given(AnyMatcher).respond_with(custom_responder)).await; let alice = TestClientBuilder::new("alice".to_owned()) .randomize_username() @@ -717,28 +729,43 @@ async fn test_delayed_decryption_latest_event() -> Result<()> { entries.set_filter(new_filter_all()); pin_mut!(stream); - // Send a message, but the keys won't arrive - bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?; + // Send a message, but the keys won't arrive because to-device events are stripped away from the server's response + let event = bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?; - // Stream only has the initial Reset entry + // Wait shortly so the manual roominfo update is triggered before we load the + // stream. sleep(Duration::from_secs(1)).await; - timeout(Duration::from_millis(100), stream.next()).await.unwrap(); - assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err()); + + // Stream only has the initial Reset entry. + assert_eq!( + timeout(Duration::from_millis(100), stream.next()).await, + Ok(Some(vec![VectorDiff::Reset { + values: vec![RoomListEntry::Filled(alice_room.room_id().to_owned())].into() + }])) + ); + assert_pending!(stream); // Latest event is not set yet - assert!(matches!(alice_room.latest_event(), None)); + assert!(alice_room.latest_event().is_none()); // Now we allow the key to come through - *DROP_TODEVICE.lock().unwrap() = false; + *custom_responder.drop_todevice.lock().unwrap() = false; + // Wait for next sync sleep(Duration::from_secs(3)).await; // Latest event is set now - alice_room.latest_event().unwrap(); + assert_eq!(alice_room.latest_event().unwrap().event_id(), Some(event.event_id)); // The stream has a single update - timeout(Duration::from_millis(100), stream.next()).await.unwrap(); - assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err()); + assert_eq!( + timeout(Duration::from_millis(100), stream.next()).await, + Ok(Some(vec![VectorDiff::Set { + index: 0, + value: RoomListEntry::Filled(alice_room.room_id().to_owned()) + }])) + ); + assert_pending!(stream); Ok(()) } @@ -809,18 +836,28 @@ async fn test_roominfo_update_deduplication() -> Result<()> { .await?; alice_room.enable_encryption().await.unwrap(); - sleep(Duration::from_secs(1)).await; let (stream, entries) = alice_sync_service .room_list_service() .all_rooms() .await .unwrap() .entries_with_dynamic_adapters(10, alice.roominfo_update_receiver()); + entries.set_filter(new_filter_all()); pin_mut!(stream); - // Initial reset event - timeout(Duration::from_millis(100), stream.next()).await.unwrap(); - assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err()); + + // Wait shortly so the manual roominfo update is triggered before we load the + // stream. + sleep(Duration::from_secs(1)).await; + + // Stream only has the initial Reset entry. + assert_eq!( + timeout(Duration::from_millis(100), stream.next()).await, + Ok(Some(vec![VectorDiff::Reset { + values: vec![RoomListEntry::Filled(alice_room.room_id().to_owned())].into() + }])) + ); + assert_pending!(stream); sleep(Duration::from_secs(1)).await; let alice_room = alice.get_room(alice_room.room_id()).unwrap(); @@ -832,17 +869,56 @@ async fn test_roominfo_update_deduplication() -> Result<()> { assert!(alice_room.is_encrypted().await.unwrap()); assert_eq!(bob_room.state(), RoomState::Joined); // Room update for join - timeout(Duration::from_millis(100), stream.next()).await.unwrap(); - assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err()); + assert_eq!( + timeout(Duration::from_millis(100), stream.next()).await, + Ok(Some(vec![VectorDiff::Set { + index: 0, + value: RoomListEntry::Filled(alice_room.room_id().to_owned()) + }])) + ); + assert_pending!(stream); // Send a message, it should arrive - bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?; + let event = bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?; sleep(Duration::from_secs(1)).await; - alice_room.latest_event().unwrap(); + + // Latest event is set now + assert_eq!(alice_room.latest_event().unwrap().event_id(), Some(event.event_id)); + // Stream has the room again, but no second event - timeout(Duration::from_millis(100), stream.next()).await.unwrap(); - assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err()); + // TODO: Synapse sometimes sends the same event two times. This is the + // workaround: + let updated_rooms = timeout(Duration::from_millis(100), stream.next()).await.unwrap().unwrap(); + assert!( + updated_rooms + == vec![VectorDiff::Set { + index: 0, + value: RoomListEntry::Filled(alice_room.room_id().to_owned()) + }] + || updated_rooms + == vec![ + VectorDiff::Set { + index: 0, + value: RoomListEntry::Filled(alice_room.room_id().to_owned()) + }, + VectorDiff::Set { + index: 0, + value: RoomListEntry::Filled(alice_room.room_id().to_owned()) + } + ] + ); + /* + assert_eq!( + updated_rooms, + vec![VectorDiff::Set { + index: 0, + value: RoomListEntry::Filled(alice_room.room_id().to_owned()) + }] + ); + */ + + assert_pending!(stream); Ok(()) }