From 83957f679f10ce502df096e8042040eb803e3899 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 23 Jan 2024 16:04:56 +0100 Subject: [PATCH] sliding_sync: More documentation for roominfo sender/receiver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Timo Kösters --- crates/matrix-sdk-base/src/client.rs | 4 +++- crates/matrix-sdk-base/src/rooms/normal.rs | 2 +- crates/matrix-sdk-base/src/store/mod.rs | 6 +++--- .../src/room_list_service/room_list.rs | 5 +++-- .../tests/integration/room_list_service.rs | 6 ++++-- crates/matrix-sdk/src/client/mod.rs | 2 ++ .../src/tests/sliding_sync/room.rs | 15 +++++++++++++-- 7 files changed, 29 insertions(+), 11 deletions(-) diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 3d5357b8e3e..18568fff9dd 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -199,7 +199,7 @@ impl BaseClient { /// This method panics if it is called twice. pub async fn set_session_meta(&self, session_meta: SessionMeta) -> Result<()> { debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Restoring login"); - self.store.set_session_meta(session_meta.clone(), self).await?; + self.store.set_session_meta(session_meta.clone(), &self.roominfo_update_sender).await?; #[cfg(feature = "e2e-encryption")] self.regenerate_olm().await?; @@ -1323,6 +1323,8 @@ impl BaseClient { .collect() } + /// Returns a receiver that gets events for each room info update. To watch + /// for new events, use `receiver.resubscribe()`. pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver { &self.roominfo_update_receiver } diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index 09c0384f351..012df80a37b 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -607,7 +607,7 @@ impl Room { pub async fn update_summary(&self, summary: RoomInfo) { self.inner.set(summary); - // Ignore error if receiver is down + // Ignore error if no receiver exists let _ = self.roominfo_update_sender.send(self.room_id.clone()); } diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index bef28da0837..98f8b8093f3 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -58,7 +58,7 @@ pub type BoxStream = Pin + Send>>; use crate::{ rooms::{RoomInfo, RoomState}, - BaseClient, MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta, + MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta, }; pub(crate) mod ambiguity_map; @@ -177,14 +177,14 @@ impl Store { pub async fn set_session_meta( &self, session_meta: SessionMeta, - client: &BaseClient, + roominfo_update_sender: &broadcast::Sender, ) -> Result<()> { for info in self.inner.get_room_infos().await? { let room = Room::restore( &session_meta.user_id, self.inner.clone(), info, - client.roominfo_update_sender.clone(), + roominfo_update_sender.clone(), ); self.rooms.write().unwrap().insert(room.room_id().to_owned(), room); } diff --git a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs index c567c71f6a7..330c558da36 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs @@ -167,8 +167,9 @@ impl RoomList { } } -/// This function remembers the current state of the unfiltered room list, so it knows where all rooms are. -/// When the receiver is triggered, a Set operation for the room position is inserted to the stream. +/// This function remembers the current state of the unfiltered room list, so it +/// knows where all rooms are. When the receiver is triggered, a Set operation +/// for the room position is inserted to the stream. fn merge_stream_and_receiver( mut raw_current_values: Vector, raw_stream: impl Stream>>, diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index dd9937b5ebf..dac8a42a6bc 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -2009,7 +2009,8 @@ async fn test_dynamic_entries_stream_manual_update() -> Result<(), Error> { end; }; - // Variation 1: Send manual update after reading stream, !r0 should be at new pos 1 + // Variation 1: Send manual update after reading stream, !r0 should be at new + // pos 1 let room = client.get_room(room_id!("!r0:bar.org")).unwrap(); room.update_summary(room.clone_info()).await; @@ -2064,7 +2065,8 @@ async fn test_dynamic_entries_stream_manual_update() -> Result<(), Error> { }, }; - // Variation 2: Send manual update before reading stream, !r0 should still be at previous pos 1 + // Variation 2: Send manual update before reading stream, !r0 should still be at + // previous pos 1 let room = client.get_room(room_id!("!r0:bar.org")).unwrap(); room.update_summary(room.clone_info()).await; diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 61b7859248e..61cdca9585d 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -442,6 +442,8 @@ impl Client { self.base_client().session_meta() } + /// Returns a receiver that gets events for each room info update. To watch + /// for new events, use `receiver.resubscribe()`. pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver { self.base_client().roominfo_update_receiver() } diff --git a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs index 8a2cdd779a9..64679b9c50d 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/room.rs @@ -245,8 +245,8 @@ impl wiremock::Respond for CustomResponder { } req = req.body(request.body.clone()); - // Run await inside of non-async fn by spawning a new thread and creating a new runtime. - // Is there a better way? + // Run await inside of non-async fn by spawning a new thread and creating a new + // runtime. Is there a better way? let response_preprocessor = self.response_preprocessor; std::thread::spawn(move || { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -282,6 +282,7 @@ impl wiremock::Respond for CustomResponder { #[tokio::test] async fn test_delayed_decryption_latest_event() -> Result<()> { let server = MockServer::start().await; + // Setup mockserver that drops to-device messages if DROP_TODEVICE is true server .register(Mock::given(AnyMatcher).respond_with(CustomResponder::new(drop_todevice))) .await; @@ -355,6 +356,7 @@ async fn test_delayed_decryption_latest_event() -> Result<()> { let alice_room = alice.get_room(alice_room.room_id()).unwrap(); let bob_room = bob.get_room(alice_room.room_id()).unwrap(); bob_room.join().await.unwrap(); + // Send a message, but the keys won't arrive bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?; sleep(Duration::from_secs(1)).await; @@ -370,13 +372,22 @@ async fn test_delayed_decryption_latest_event() -> Result<()> { .entries_with_dynamic_adapters(10, alice.roominfo_update_receiver()); entries.set_filter(new_filter_all()); pin_mut!(stream); + + // Stream only has the initial Reset entry timeout(Duration::from_millis(100), stream.next()).await.unwrap(); assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err()); + + // Latest event is not set yet assert!(matches!(alice_room.latest_event(), None)); + // Now we allow the key to come through *DROP_TODEVICE.lock().unwrap() = false; sleep(Duration::from_secs(1)).await; + + // Latest event is set now alice_room.latest_event().unwrap(); + + // The stream has a single update timeout(Duration::from_millis(100), stream.next()).await.unwrap(); assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err());