Skip to content

Commit

Permalink
Improve detection of websocket drained status.
Browse files Browse the repository at this point in the history
Will now work when you lose and regain network. Also removes the
unnecessary InitialMessageRetriever.
  • Loading branch information
greyson-signal committed Jul 21, 2020
1 parent 96ce42a commit 662f0b8
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.logging.PersistentLogger;
import org.thoughtcrime.securesms.logging.SignalUncaughtExceptionHandler;
import org.thoughtcrime.securesms.messages.InitialMessageRetriever;
import org.thoughtcrime.securesms.migrations.ApplicationMigrations;
import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.providers.BlobProvider;
Expand All @@ -61,7 +60,6 @@
import org.thoughtcrime.securesms.ringrtc.RingRtcLogger;
import org.thoughtcrime.securesms.service.DirectoryRefreshListener;
import org.thoughtcrime.securesms.service.ExpiringMessageManager;
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
import org.thoughtcrime.securesms.service.KeyCachingService;
import org.thoughtcrime.securesms.service.LocalBackupListener;
import org.thoughtcrime.securesms.service.RotateSenderCertificateListener;
Expand Down Expand Up @@ -97,7 +95,6 @@ public class ApplicationContext extends MultiDexApplication implements DefaultLi
private ViewOnceMessageManager viewOnceMessageManager;
private TypingStatusRepository typingStatusRepository;
private TypingStatusSender typingStatusSender;
private IncomingMessageObserver incomingMessageObserver;
private PersistentLogger persistentLogger;

private volatile boolean isAppVisible;
Expand Down Expand Up @@ -157,7 +154,6 @@ public void onStart(@NonNull LifecycleOwner owner) {
KeyCachingService.onAppForegrounded(this);
ApplicationDependencies.getFrameRateTracker().begin();
ApplicationDependencies.getMegaphoneRepository().onAppForegrounded();
catchUpOnMessages();
}

@Override
Expand Down Expand Up @@ -234,7 +230,7 @@ private void initializeApplicationMigrations() {
}

public void initializeMessageRetrieval() {
this.incomingMessageObserver = new IncomingMessageObserver(this);
ApplicationDependencies.getIncomingMessageObserver();
}

private void initializeAppDependencies() {
Expand Down Expand Up @@ -382,36 +378,6 @@ private void initializeCleanup() {
});
}

private void catchUpOnMessages() {
InitialMessageRetriever retriever = ApplicationDependencies.getInitialMessageRetriever();

if (retriever.isCaughtUp()) {
return;
}

SignalExecutors.UNBOUNDED.execute(() -> {
long startTime = System.currentTimeMillis();

switch (retriever.begin(TimeUnit.SECONDS.toMillis(60))) {
case SUCCESS:
Log.i(TAG, "Successfully caught up on messages. " + (System.currentTimeMillis() - startTime) + " ms");
break;
case FAILURE_TIMEOUT:
Log.w(TAG, "Did not finish catching up due to a timeout. " + (System.currentTimeMillis() - startTime) + " ms");
break;
case FAILURE_ERROR:
Log.w(TAG, "Did not finish catching up due to an error. " + (System.currentTimeMillis() - startTime) + " ms");
break;
case SKIPPED_ALREADY_CAUGHT_UP:
Log.i(TAG, "Already caught up. " + (System.currentTimeMillis() - startTime) + " ms");
break;
case SKIPPED_ALREADY_RUNNING:
Log.i(TAG, "Already in the process of catching up. " + (System.currentTimeMillis() - startTime) + " ms");
break;
}
});
}

@Override
protected void attachBaseContext(Context base) {
super.attachBaseContext(DynamicLanguageContextWrapper.updateContext(base, TextSecurePreferences.getLanguage(base)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import org.thoughtcrime.securesms.keyvalue.KeyValueStore;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
import org.thoughtcrime.securesms.messages.InitialMessageRetriever;
import org.thoughtcrime.securesms.notifications.DefaultMessageNotifier;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
import org.thoughtcrime.securesms.recipients.LiveRecipientCache;
Expand Down Expand Up @@ -48,6 +46,7 @@ public class ApplicationDependencies {
private static SignalServiceAccountManager accountManager;
private static SignalServiceMessageSender messageSender;
private static SignalServiceMessageReceiver messageReceiver;
private static IncomingMessageObserver incomingMessageObserver;
private static IncomingMessageProcessor incomingMessageProcessor;
private static BackgroundMessageRetriever backgroundMessageRetriever;
private static LiveRecipientCache recipientCache;
Expand All @@ -59,7 +58,6 @@ public class ApplicationDependencies {
private static GroupsV2StateProcessor groupsV2StateProcessor;
private static GroupsV2Operations groupsV2Operations;
private static EarlyMessageCache earlyMessageCache;
private static InitialMessageRetriever initialMessageRetriever;
private static MessageNotifier messageNotifier;

@MainThread
Expand Down Expand Up @@ -242,19 +240,19 @@ public static synchronized void resetSignalServiceMessageReceiver() {
return earlyMessageCache;
}

public static synchronized @NonNull InitialMessageRetriever getInitialMessageRetriever() {
public static synchronized @NonNull MessageNotifier getMessageNotifier() {
assertInitialization();

if (initialMessageRetriever == null) {
initialMessageRetriever = provider.provideInitialMessageRetriever();
}

return initialMessageRetriever;
return messageNotifier;
}

public static synchronized @NonNull MessageNotifier getMessageNotifier() {
public static synchronized @NonNull IncomingMessageObserver getIncomingMessageObserver() {
assertInitialization();
return messageNotifier;

if (incomingMessageObserver == null) {
incomingMessageObserver = provider.provideIncomingMessageObserver();
}

return incomingMessageObserver;
}

private static void assertInitialization() {
Expand All @@ -277,8 +275,8 @@ public interface Provider {
@NonNull KeyValueStore provideKeyValueStore();
@NonNull MegaphoneRepository provideMegaphoneRepository();
@NonNull EarlyMessageCache provideEarlyMessageCache();
@NonNull InitialMessageRetriever provideInitialMessageRetriever();
@NonNull MessageNotifier provideMessageNotifier();
@NonNull IncomingMessageObserver provideIncomingMessageObserver();
}

private static class UninitializedException extends IllegalStateException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.thoughtcrime.securesms.keyvalue.KeyValueStore;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
import org.thoughtcrime.securesms.messages.InitialMessageRetriever;
import org.thoughtcrime.securesms.notifications.DefaultMessageNotifier;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
import org.thoughtcrime.securesms.notifications.OptimizedMessageNotifier;
Expand All @@ -55,7 +54,6 @@
import org.whispersystems.signalservice.api.websocket.ConnectivityListener;

import java.util.UUID;
import java.util.concurrent.Executors;

/**
* Implementation of {@link ApplicationDependencies.Provider} that provides real app dependencies.
Expand Down Expand Up @@ -171,13 +169,13 @@ public ApplicationDependencyProvider(@NonNull Application context, @NonNull Sign
}

@Override
public @NonNull InitialMessageRetriever provideInitialMessageRetriever() {
return new InitialMessageRetriever();
public @NonNull MessageNotifier provideMessageNotifier() {
return new OptimizedMessageNotifier(new DefaultMessageNotifier());
}

@Override
public @NonNull MessageNotifier provideMessageNotifier() {
return new OptimizedMessageNotifier(new DefaultMessageNotifier());
public @NonNull IncomingMessageObserver provideIncomingMessageObserver() {
return new IncomingMessageObserver(context);
}

private static class DynamicCredentialsProvider implements CredentialsProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

/**
* A constraint that is met once we have pulled down all messages from the websocket during initial
* load. See {@link org.thoughtcrime.securesms.messages.InitialMessageRetriever}.
* load. See {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver}.
*/
public final class WebsocketDrainedConstraint implements Constraint {

Expand All @@ -21,7 +21,7 @@ private WebsocketDrainedConstraint() {

@Override
public boolean isMet() {
return ApplicationDependencies.getInitialMessageRetriever().isCaughtUp();
return ApplicationDependencies.getIncomingMessageObserver().isWebsocketDrained();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import org.thoughtcrime.securesms.jobmanager.ConstraintObserver;

/**
* An observer for {@link WebsocketDrainedConstraint}. Will fire when the
* {@link org.thoughtcrime.securesms.messages.InitialMessageRetriever} is caught up.
* An observer for {@link WebsocketDrainedConstraint}. Will fire when the websocket is drained
* (i.e. it has received an empty response).
*/
public class WebsocketDrainedConstraintObserver implements ConstraintObserver {

Expand All @@ -16,7 +16,7 @@ public class WebsocketDrainedConstraintObserver implements ConstraintObserver {
private volatile Notifier notifier;

public WebsocketDrainedConstraintObserver() {
ApplicationDependencies.getInitialMessageRetriever().addListener(() -> {
ApplicationDependencies.getIncomingMessageObserver().addWebsocketDrainedListener(() -> {
if (notifier != null) {
notifier.onConstraintMet(REASON);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class BackgroundMessageRetriever {

private static final Semaphore ACTIVE_LOCK = new Semaphore(2);

private static final long CATCHUP_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
private static final long NORMAL_TIMEOUT = TimeUnit.SECONDS.toMillis(10);

/**
Expand Down Expand Up @@ -64,21 +63,8 @@ public boolean retrieveMessages(@NonNull Context context, MessageRetrievalStrate
Log.w(TAG, "We may be operating in a constrained environment. Doze: " + doze + " Network: " + network);
}

if (ApplicationDependencies.getInitialMessageRetriever().isCaughtUp()) {
Log.i(TAG, "Performing normal message fetch.");
return executeBackgroundRetrieval(context, startTime, strategies);
} else {
Log.i(TAG, "Performing initial message fetch.");
InitialMessageRetriever.Result result = ApplicationDependencies.getInitialMessageRetriever().begin(CATCHUP_TIMEOUT);
if (result == InitialMessageRetriever.Result.SUCCESS) {
Log.i(TAG, "Initial message request was completed successfully. " + logSuffix(startTime));
TextSecurePreferences.setNeedsMessagePull(context, false);
return true;
} else {
Log.w(TAG, "Initial message fetch returned result " + result + ", so doing a normal message fetch.");
return executeBackgroundRetrieval(context, System.currentTimeMillis(), strategies);
}
}
Log.i(TAG, "Performing normal message fetch.");
return executeBackgroundRetrieval(context, startTime, strategies);
} finally {
WakeLockUtil.release(wakeLock, WAKE_LOCK_TAG);
ACTIVE_LOCK.release();
Expand Down

0 comments on commit 662f0b8

Please sign in to comment.