Skip to content

Commit

Permalink
sliding_sync: More documentation for roominfo sender/receiver
Browse files Browse the repository at this point in the history
Signed-off-by: Timo Kösters <timo@koesters.xyz>
  • Loading branch information
timokoesters committed Jan 23, 2024
1 parent cc2363b commit 83957f6
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 11 deletions.
4 changes: 3 additions & 1 deletion crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl BaseClient {
/// This method panics if it is called twice.
pub async fn set_session_meta(&self, session_meta: SessionMeta) -> Result<()> {
debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Restoring login");
self.store.set_session_meta(session_meta.clone(), self).await?;
self.store.set_session_meta(session_meta.clone(), &self.roominfo_update_sender).await?;

#[cfg(feature = "e2e-encryption")]
self.regenerate_olm().await?;
Expand Down Expand Up @@ -1323,6 +1323,8 @@ impl BaseClient {
.collect()
}

/// Returns a receiver that gets events for each room info update. To watch
/// for new events, use `receiver.resubscribe()`.
pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver<OwnedRoomId> {
&self.roominfo_update_receiver
}
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-base/src/rooms/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ impl Room {
pub async fn update_summary(&self, summary: RoomInfo) {
self.inner.set(summary);

// Ignore error if receiver is down
// Ignore error if no receiver exists
let _ = self.roominfo_update_sender.send(self.room_id.clone());
}

Expand Down
6 changes: 3 additions & 3 deletions crates/matrix-sdk-base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub type BoxStream<T> = Pin<Box<dyn futures_util::Stream<Item = T> + Send>>;

use crate::{
rooms::{RoomInfo, RoomState},
BaseClient, MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
};

pub(crate) mod ambiguity_map;
Expand Down Expand Up @@ -177,14 +177,14 @@ impl Store {
pub async fn set_session_meta(
&self,
session_meta: SessionMeta,
client: &BaseClient,
roominfo_update_sender: &broadcast::Sender<OwnedRoomId>,
) -> Result<()> {
for info in self.inner.get_room_infos().await? {
let room = Room::restore(
&session_meta.user_id,
self.inner.clone(),
info,
client.roominfo_update_sender.clone(),
roominfo_update_sender.clone(),
);
self.rooms.write().unwrap().insert(room.room_id().to_owned(), room);
}
Expand Down
5 changes: 3 additions & 2 deletions crates/matrix-sdk-ui/src/room_list_service/room_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ impl RoomList {
}
}

/// This function remembers the current state of the unfiltered room list, so it knows where all rooms are.
/// When the receiver is triggered, a Set operation for the room position is inserted to the stream.
/// This function remembers the current state of the unfiltered room list, so it
/// knows where all rooms are. When the receiver is triggered, a Set operation
/// for the room position is inserted to the stream.
fn merge_stream_and_receiver(
mut raw_current_values: Vector<RoomListEntry>,
raw_stream: impl Stream<Item = Vec<VectorDiff<RoomListEntry>>>,
Expand Down
6 changes: 4 additions & 2 deletions crates/matrix-sdk-ui/tests/integration/room_list_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,8 @@ async fn test_dynamic_entries_stream_manual_update() -> Result<(), Error> {
end;
};

// Variation 1: Send manual update after reading stream, !r0 should be at new pos 1
// Variation 1: Send manual update after reading stream, !r0 should be at new
// pos 1
let room = client.get_room(room_id!("!r0:bar.org")).unwrap();
room.update_summary(room.clone_info()).await;

Expand Down Expand Up @@ -2064,7 +2065,8 @@ async fn test_dynamic_entries_stream_manual_update() -> Result<(), Error> {
},
};

// Variation 2: Send manual update before reading stream, !r0 should still be at previous pos 1
// Variation 2: Send manual update before reading stream, !r0 should still be at
// previous pos 1
let room = client.get_room(room_id!("!r0:bar.org")).unwrap();
room.update_summary(room.clone_info()).await;

Expand Down
2 changes: 2 additions & 0 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ impl Client {
self.base_client().session_meta()
}

/// Returns a receiver that gets events for each room info update. To watch
/// for new events, use `receiver.resubscribe()`.
pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver<OwnedRoomId> {
self.base_client().roominfo_update_receiver()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ impl wiremock::Respond for CustomResponder {
}
req = req.body(request.body.clone());

// Run await inside of non-async fn by spawning a new thread and creating a new runtime.
// Is there a better way?
// Run await inside of non-async fn by spawning a new thread and creating a new
// runtime. Is there a better way?
let response_preprocessor = self.response_preprocessor;
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
Expand Down Expand Up @@ -282,6 +282,7 @@ impl wiremock::Respond for CustomResponder {
#[tokio::test]
async fn test_delayed_decryption_latest_event() -> Result<()> {
let server = MockServer::start().await;
// Setup mockserver that drops to-device messages if DROP_TODEVICE is true
server
.register(Mock::given(AnyMatcher).respond_with(CustomResponder::new(drop_todevice)))
.await;
Expand Down Expand Up @@ -355,6 +356,7 @@ async fn test_delayed_decryption_latest_event() -> Result<()> {
let alice_room = alice.get_room(alice_room.room_id()).unwrap();
let bob_room = bob.get_room(alice_room.room_id()).unwrap();
bob_room.join().await.unwrap();
// Send a message, but the keys won't arrive
bob_room.send(RoomMessageEventContent::text_plain("hello world")).await?;

sleep(Duration::from_secs(1)).await;
Expand All @@ -370,13 +372,22 @@ async fn test_delayed_decryption_latest_event() -> Result<()> {
.entries_with_dynamic_adapters(10, alice.roominfo_update_receiver());
entries.set_filter(new_filter_all());
pin_mut!(stream);

// Stream only has the initial Reset entry
timeout(Duration::from_millis(100), stream.next()).await.unwrap();
assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err());

// Latest event is not set yet
assert!(matches!(alice_room.latest_event(), None));

// Now we allow the key to come through
*DROP_TODEVICE.lock().unwrap() = false;
sleep(Duration::from_secs(1)).await;

// Latest event is set now
alice_room.latest_event().unwrap();

// The stream has a single update
timeout(Duration::from_millis(100), stream.next()).await.unwrap();
assert!(timeout(Duration::from_millis(100), stream.next()).await.is_err());

Expand Down

0 comments on commit 83957f6

Please sign in to comment.