Skip to content

Commit

Permalink
sliding_sync: Refactor for code review
Browse files Browse the repository at this point in the history
Signed-off-by: Timo Kösters <timo@koesters.xyz>
  • Loading branch information
timokoesters committed Feb 12, 2024
1 parent 9ea4cf2 commit e648a3e
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 67 deletions.
19 changes: 11 additions & 8 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ pub struct BaseClient {
/// Observable of when a user is ignored/unignored.
pub(crate) ignore_user_list_changes: SharedObservable<()>,

/// A sender that is used to communicate changes to room information. Each
/// event contains the room and a boolean whether this event should
/// trigger a room list update.
pub(crate) roominfo_update_sender: broadcast::Sender<RoomInfoUpdate>,
pub(crate) roominfo_update_receiver: Arc<broadcast::Receiver<RoomInfoUpdate>>,
}

#[cfg(not(tarpaulin_include))]
Expand All @@ -122,7 +124,7 @@ impl BaseClient {
/// * `config` - An optional session if the user already has one from a
/// previous login call.
pub fn with_store_config(config: StoreConfig) -> Self {
let (roominfo_update_sender, roominfo_update_receiver) =
let (roominfo_update_sender, _roominfo_update_receiver) =
tokio::sync::broadcast::channel(100);

BaseClient {
Expand All @@ -133,7 +135,6 @@ impl BaseClient {
olm_machine: Default::default(),
ignore_user_list_changes: Default::default(),
roominfo_update_sender,
roominfo_update_receiver: Arc::new(roominfo_update_receiver),
}
}

Expand Down Expand Up @@ -1356,11 +1357,13 @@ impl BaseClient {
.collect()
}

/// Returns a receiver that gets events for each room info update. To watch
/// for new events, use `receiver.resubscribe()`. Each event contains the
/// room and a boolean whether this event should trigger a room list update.
pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver<RoomInfoUpdate> {
&self.roominfo_update_receiver
/// Returns a new receiver that gets events for all future room info
/// updates.
///
/// Each event contains the room and a boolean whether this event should
/// trigger a room list update.
pub fn roominfo_update_receiver(&self) -> broadcast::Receiver<RoomInfoUpdate> {
self.roominfo_update_sender.subscribe()
}
}

Expand Down
19 changes: 11 additions & 8 deletions crates/matrix-sdk-base/src/rooms/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,17 @@ use crate::{
MinimalStateEvent, OriginalMinimalStateEvent, RoomMemberships,
};

/// Each time a room's RoomInfo is changed, the roominfo_update_receiver in the
/// BaseClient will receive this struct containing the update information.
/// The sender must decide if this update should cause a room list update.
/// If the sender knows that another action has already caused the room to
/// update, it should not send another update.
/// A summary of changes to room information.
///
/// It also indicates whether this update should update the room list.
#[derive(Debug, Clone)]
pub struct RoomInfoUpdate {
/// The room which was updated.
pub room_id: OwnedRoomId,
/// Whether this event should trigger the room list to update.
///
/// If the change is minor or if another action already causes the room list
/// to update, this should be false to avoid duplicate updates.
pub trigger_room_list_update: bool,
}

Expand Down Expand Up @@ -655,12 +656,14 @@ impl Room {
self.inner.get()
}

/// Update the summary with given RoomInfo. This also triggers an update for
/// the roominfo_update_recv.
/// Update the summary with given RoomInfo.
///
/// This also triggers an update for room info observers if
/// `trigger_room_list_update` is true.
pub fn set_room_info(&self, room_info: RoomInfo, trigger_room_list_update: bool) {
self.inner.set(room_info);

// Ignore error if no receiver exists
// Ignore error if no receiver exists.
let _ = self
.roominfo_update_sender
.send(RoomInfoUpdate { room_id: self.room_id.clone(), trigger_room_list_update });
Expand Down
17 changes: 9 additions & 8 deletions crates/matrix-sdk-ui/src/room_list_service/room_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl RoomList {
pub fn entries_with_dynamic_adapters(
&self,
page_size: usize,
roominfo_update_recv: &broadcast::Receiver<RoomInfoUpdate>,
roominfo_update_recv: broadcast::Receiver<RoomInfoUpdate>,
) -> (impl Stream<Item = Vec<VectorDiff<RoomListEntry>>>, RoomListDynamicEntriesController)
{
let list = self.sliding_sync_list.clone();
Expand All @@ -142,17 +142,15 @@ impl RoomList {
list.maximum_number_of_rooms_stream(),
);

let roominfo_update_recv = roominfo_update_recv.resubscribe();

let stream = stream! {
loop {
let filter_fn = filter_fn_cell.take().await;
let (raw_values, raw_stream) = list.room_list_stream();

// Combine normal stream events with other updates from rooms
let raw_stream_with_recv = merge_stream_and_receiver(raw_values.clone(), raw_stream, roominfo_update_recv.resubscribe());
let merged_stream = merge_stream_and_receiver(raw_values.clone(), raw_stream, roominfo_update_recv.resubscribe());

let (values, stream) = (raw_values, raw_stream_with_recv)
let (values, stream) = (raw_values, merged_stream)
.filter(filter_fn)
.dynamic_limit_with_initial_value(page_size, limit_stream.clone());

Expand All @@ -175,15 +173,18 @@ fn merge_stream_and_receiver(
raw_stream: impl Stream<Item = Vec<VectorDiff<RoomListEntry>>>,
mut roominfo_update_recv: broadcast::Receiver<RoomInfoUpdate>,
) -> impl Stream<Item = Vec<VectorDiff<RoomListEntry>>> {
let raw_stream_with_recv = stream! {
stream! {
pin_mut!(raw_stream);

loop {
select! {
biased; // Prefer manual updates for easier test code

Ok(update) = roominfo_update_recv.recv() => {
if !update.trigger_room_list_update {
continue;
}

// Search list for the updated room
for (index, room) in raw_current_values.iter().enumerate() {
if let RoomListEntry::Filled(r) = room {
Expand All @@ -195,6 +196,7 @@ fn merge_stream_and_receiver(
}
}
}

v = raw_stream.next() => {
if let Some(v) = v {
for change in &v {
Expand All @@ -208,8 +210,7 @@ fn merge_stream_and_receiver(
}
}
}
};
raw_stream_with_recv
}
}

/// The loading state of a [`RoomList`].
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ impl Client {
/// Returns a receiver that gets events for each room info update. To watch
/// for new events, use `receiver.resubscribe()`. Each event contains the
/// room and a boolean whether this event should trigger a room list update.
pub fn roominfo_update_receiver(&self) -> &broadcast::Receiver<RoomInfoUpdate> {
pub fn roominfo_update_receiver(&self) -> broadcast::Receiver<RoomInfoUpdate> {
self.base_client().roominfo_update_receiver()
}

Expand Down

0 comments on commit e648a3e

Please sign in to comment.