Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various smaller sliding sync fixes #1372

Merged
merged 11 commits into from Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions bindings/matrix-sdk-ffi/src/api.udl
Expand Up @@ -104,6 +104,9 @@ interface SlidingSyncViewBuilder {
[Self=ByArc]
SlidingSyncViewBuilder sync_mode(SlidingSyncMode mode);

[Self=ByArc]
SlidingSyncViewBuilder send_updates_for_items(boolean enable);

[Throws=ClientError, Self=ByArc]
SlidingSyncView build();
};
Expand Down
92 changes: 56 additions & 36 deletions bindings/matrix-sdk-ffi/src/sliding_sync.rs
@@ -1,4 +1,7 @@
use std::sync::{Arc, RwLock};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
};

use futures_signals::{
signal::SignalExt,
Expand All @@ -18,7 +21,7 @@ pub use matrix_sdk::{
SlidingSyncBuilder as MatrixSlidingSyncBuilder, SlidingSyncMode, SlidingSyncState,
};
use tokio::task::JoinHandle;
use tracing::{debug, error, warn};
use tracing::{debug, error, trace, warn};

use super::{Client, Room, RUNTIME};
use crate::{
Expand All @@ -29,13 +32,16 @@ use crate::{
type StoppableSpawnCallback = Box<dyn FnOnce() + Send + Sync>;

pub struct StoppableSpawn {
handle: JoinHandle<()>,
handle: Option<JoinHandle<()>>,
callback: RwLock<Option<StoppableSpawnCallback>>,
}

impl StoppableSpawn {
fn with_handle(handle: JoinHandle<()>) -> StoppableSpawn {
StoppableSpawn { handle, callback: Default::default() }
StoppableSpawn { handle: Some(handle), callback: Default::default() }
}
fn with_callback(callback: StoppableSpawnCallback) -> StoppableSpawn {
StoppableSpawn { handle: Default::default(), callback: RwLock::new(Some(callback)) }
}

fn set_callback(&mut self, f: StoppableSpawnCallback) {
Expand All @@ -53,13 +59,15 @@ impl From<JoinHandle<()>> for StoppableSpawn {
impl StoppableSpawn {
pub fn cancel(&self) {
debug!("stoppable.cancel() called");
self.handle.abort();
if let Some(handle) = &self.handle {
handle.abort();
}
if let Some(callback) = self.callback.write().unwrap().take() {
callback();
}
}
pub fn is_finished(&self) -> bool {
self.handle.is_finished()
self.handle.as_ref().map(|h| h.is_finished()).unwrap_or_default()
}
}

Expand Down Expand Up @@ -370,6 +378,12 @@ impl SlidingSyncViewBuilder {
Arc::new(builder)
}

pub fn send_updates_for_items(self: Arc<Self>, enable: bool) -> Arc<Self> {
let mut builder = unwrap_or_clone_arc(self);
builder.inner = builder.inner.send_updates_for_items(enable);
Arc::new(builder)
}

pub fn ranges(self: Arc<Self>, ranges: Vec<(u32, u32)>) -> Arc<Self> {
let mut builder = unwrap_or_clone_arc(self);
builder.inner = builder.inner.ranges(ranges);
Expand Down Expand Up @@ -491,10 +505,8 @@ impl SlidingSyncView {
) -> Arc<StoppableSpawn> {
let mut room_list = self.inner.rooms_list.signal_vec_cloned().to_stream();
Arc::new(StoppableSpawn::with_handle(RUNTIME.spawn(async move {
loop {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grrr... leftover from a bad refactor that wasn't fully reverted: we still need this loop { here ...caused stupid bug...

if let Some(diff) = room_list.next().await {
observer.did_receive_update(diff.into());
}
if let Some(diff) = room_list.next().await {
observer.did_receive_update(diff.into());
}
})))
}
Expand Down Expand Up @@ -652,35 +664,43 @@ impl SlidingSync {
let inner = self.inner.clone();
let client = self.client.clone();
let observer = self.observer.clone();
let stop_loop = Arc::new(AtomicBool::new(false));
let remote_stopper = stop_loop.clone();

Arc::new(
RUNTIME
.spawn(async move {
let stream = inner.stream().await.unwrap();
pin_mut!(stream);
loop {
let update = match stream.next().await {
Some(Ok(u)) => u,
Some(Err(e)) => {
if client.process_sync_error(e) == LoopCtrl::Break {
warn!("loop was stopped by client error processing");
break;
} else {
continue;
}
}
None => {
warn!("Inner streaming loop ended unexpectedly");
break;
}
};
if let Some(ref observer) = *observer.read().unwrap() {
observer.did_receive_sync_update(update.into());
let stoppable = Arc::new(StoppableSpawn::with_callback(Box::new(move || {
remote_stopper.store(true, Ordering::Relaxed);
gnunicorn marked this conversation as resolved.
Show resolved Hide resolved
})));

RUNTIME.spawn(async move {
let stream = inner.stream().await.unwrap();
pin_mut!(stream);
loop {
let update = match stream.next().await {
Some(Ok(u)) => u,
Some(Err(e)) => {
if client.process_sync_error(e) == LoopCtrl::Break {
warn!("loop was stopped by client error processing");
break;
} else {
continue;
}
}
})
.into(),
)
None => {
warn!("Inner streaming loop ended unexpectedly");
break;
}
};
if let Some(ref observer) = *observer.read().unwrap() {
observer.did_receive_sync_update(update.into());
}
if stop_loop.load(Ordering::Relaxed) {
trace!("stopped sync loop after cancellation");
break;
}
}
});

stoppable
}
}

Expand Down
14 changes: 12 additions & 2 deletions crates/matrix-sdk-base/src/sliding_sync.rs
Expand Up @@ -4,6 +4,7 @@ use std::ops::Deref;
use ruma::api::client::sync::sync_events::{v3, v4};
#[cfg(feature = "e2e-encryption")]
use ruma::UserId;
use tracing::{debug, info, instrument};

use super::BaseClient;
use crate::{
Expand All @@ -21,6 +22,7 @@ impl BaseClient {
///
/// * `response` - The response that we received after a successful sliding
/// sync.
#[instrument(skip_all, level = "trace")]
pub async fn process_sliding_sync(&self, response: v4::Response) -> Result<SyncResponse> {
#[allow(unused_variables)]
let v4::Response {
Expand All @@ -34,6 +36,7 @@ impl BaseClient {
//presence,
..
} = response;
info!(rooms = rooms.len(), lists = lists.len(), extensions = !extensions.is_empty());

if rooms.is_empty() && extensions.is_empty() {
// we received a room reshuffling event only, there won't be anything for us to
Expand All @@ -59,6 +62,13 @@ impl BaseClient {
})
.unwrap_or_default();

info!(
to_device_events = to_device_events.len(),
device_one_time_keys_count = device_one_time_keys_count.len(),
device_unused_fallback_key_types =
device_unused_fallback_key_types.as_ref().map(|v| v.len())
);

// Process the to-device events and other related e2ee data. This returns a list
// of all the to-device events that were passed in but encrypted ones
// were replaced with their decrypted version.
Expand Down Expand Up @@ -226,11 +236,11 @@ impl BaseClient {

changes.ambiguity_maps = ambiguity_cache.cache;

tracing::debug!("ready to submit changes to store");
debug!("ready to submit changes to store");

store.save_changes(&changes).await?;
self.apply_changes(&changes).await;
tracing::debug!("applied changes");
debug!("applied changes");

let device_one_time_keys_count =
device_one_time_keys_count.into_iter().map(|(k, v)| (k, v.into())).collect();
Expand Down