Skip to content

Commit

Permalink
Improve WebSocket health monitoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
cody-signal committed Jul 27, 2021
1 parent fc6db45 commit 712b0c1
Show file tree
Hide file tree
Showing 20 changed files with 558 additions and 350 deletions.
1 change: 1 addition & 0 deletions app/build.gradle
Expand Up @@ -392,6 +392,7 @@ dependencies {
implementation 'androidx.lifecycle:lifecycle-extensions:2.1.0'
implementation 'androidx.lifecycle:lifecycle-viewmodel-savedstate:1.0.0-alpha05'
implementation 'androidx.lifecycle:lifecycle-common-java8:2.1.0'
implementation 'androidx.lifecycle:lifecycle-reactivestreams-ktx:2.1.0'
implementation "androidx.camera:camera-core:1.0.0-beta11"
implementation "androidx.camera:camera-camera2:1.0.0-beta11"
implementation "androidx.camera:camera-lifecycle:1.0.0-beta11"
Expand Down
Expand Up @@ -112,7 +112,6 @@
import org.thoughtcrime.securesms.megaphone.MegaphoneViewBuilder;
import org.thoughtcrime.securesms.megaphone.Megaphones;
import org.thoughtcrime.securesms.mms.GlideApp;
import org.thoughtcrime.securesms.net.PipeConnectivityListener;
import org.thoughtcrime.securesms.notifications.MarkReadReceiver;
import org.thoughtcrime.securesms.payments.preferences.PaymentsActivity;
import org.thoughtcrime.securesms.payments.preferences.details.PaymentDetailsFragmentArgs;
Expand Down Expand Up @@ -143,6 +142,7 @@
import org.thoughtcrime.securesms.util.task.SnackbarAsyncTask;
import org.thoughtcrime.securesms.util.views.Stub;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;

import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -968,19 +968,21 @@ void updateEmptyState(boolean isConversationEmpty) {
}
}

private void updateProxyStatus(@NonNull PipeConnectivityListener.State state) {
private void updateProxyStatus(@NonNull WebSocketConnectionState state) {
if (SignalStore.proxy().isProxyEnabled()) {
proxyStatus.setVisibility(View.VISIBLE);

switch (state) {
case CONNECTING:
case DISCONNECTING:
case DISCONNECTED:
proxyStatus.setImageResource(R.drawable.ic_proxy_connecting_24);
break;
case CONNECTED:
proxyStatus.setImageResource(R.drawable.ic_proxy_connected_24);
break;
case FAILURE:
case AUTHENTICATION_FAILED:
case FAILED:
proxyStatus.setImageResource(R.drawable.ic_proxy_failed_24);
break;
}
Expand Down
Expand Up @@ -5,6 +5,7 @@

import androidx.annotation.NonNull;
import androidx.lifecycle.LiveData;
import androidx.lifecycle.LiveDataReactiveStreams;
import androidx.lifecycle.MutableLiveData;
import androidx.lifecycle.ViewModel;
import androidx.lifecycle.ViewModelProvider;
Expand All @@ -14,7 +15,6 @@
import org.signal.paging.PagingConfig;
import org.signal.paging.PagingController;
import org.thoughtcrime.securesms.conversationlist.model.Conversation;
import org.thoughtcrime.securesms.search.SearchResult;
import org.thoughtcrime.securesms.conversationlist.model.UnreadPayments;
import org.thoughtcrime.securesms.conversationlist.model.UnreadPaymentsLiveData;
import org.thoughtcrime.securesms.database.DatabaseFactory;
Expand All @@ -23,17 +23,20 @@
import org.thoughtcrime.securesms.megaphone.Megaphone;
import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
import org.thoughtcrime.securesms.megaphone.Megaphones;
import org.thoughtcrime.securesms.net.PipeConnectivityListener;
import org.thoughtcrime.securesms.payments.UnreadPaymentsRepository;
import org.thoughtcrime.securesms.search.SearchRepository;
import org.thoughtcrime.securesms.search.SearchResult;
import org.thoughtcrime.securesms.util.Debouncer;
import org.thoughtcrime.securesms.util.ThrottledDebouncer;
import org.thoughtcrime.securesms.util.livedata.LiveDataUtil;
import org.thoughtcrime.securesms.util.paging.Invalidator;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;

import java.util.List;

import io.reactivex.rxjava3.core.BackpressureStrategy;

class ConversationListViewModel extends ViewModel {

private static final String TAG = Log.tag(ConversationListViewModel.class);
Expand Down Expand Up @@ -117,8 +120,8 @@ public LiveData<Boolean> hasNoConversations() {
return pagedData.getController();
}

@NonNull LiveData<PipeConnectivityListener.State> getPipeState() {
return ApplicationDependencies.getPipeListener().getState();
@NonNull LiveData<WebSocketConnectionState> getPipeState() {
return LiveDataReactiveStreams.fromPublisher(ApplicationDependencies.getSignalWebSocket().getWebSocketState().toFlowable(BackpressureStrategy.LATEST));
}

@NonNull LiveData<Optional<UnreadPayments>> getUnreadPaymentsLiveData() {
Expand Down
Expand Up @@ -19,7 +19,6 @@
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
import org.thoughtcrime.securesms.messages.IncomingMessageProcessor;
import org.thoughtcrime.securesms.net.PipeConnectivityListener;
import org.thoughtcrime.securesms.net.StandardUserAgentInterceptor;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
import org.thoughtcrime.securesms.payments.Payments;
Expand Down Expand Up @@ -112,10 +111,6 @@ public static void init(@NonNull Application application, @NonNull Provider prov
return application;
}

public static @NonNull PipeConnectivityListener getPipeListener() {
return provider.providePipeListener();
}

public static @NonNull SignalServiceAccountManager getSignalServiceAccountManager() {
SignalServiceAccountManager local = accountManager;

Expand Down Expand Up @@ -227,7 +222,6 @@ public static void closeConnections() {

public static void resetNetworkConnectionsAfterProxyChange() {
synchronized (LOCK) {
getPipeListener().reset();
closeConnections();
}
}
Expand Down Expand Up @@ -509,7 +503,6 @@ public static TypingStatusSender getTypingStatusSender() {
}

public interface Provider {
@NonNull PipeConnectivityListener providePipeListener();
@NonNull GroupsV2Operations provideGroupsV2Operations();
@NonNull SignalServiceAccountManager provideSignalServiceAccountManager();
@NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket signalWebSocket);
Expand Down
Expand Up @@ -6,14 +6,14 @@
import androidx.annotation.NonNull;

import org.signal.core.util.concurrent.SignalExecutors;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.BuildConfig;
import org.thoughtcrime.securesms.components.TypingStatusRepository;
import org.thoughtcrime.securesms.components.TypingStatusSender;
import org.thoughtcrime.securesms.crypto.ReentrantSessionLock;
import org.thoughtcrime.securesms.crypto.storage.SignalProtocolStoreImpl;
import org.thoughtcrime.securesms.database.DatabaseObserver;
import org.thoughtcrime.securesms.database.JobDatabase;
import org.thoughtcrime.securesms.database.PendingRetryReceiptCache;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.JobMigrator;
import org.thoughtcrime.securesms.jobmanager.impl.FactoryJobPredicate;
Expand All @@ -33,8 +33,7 @@
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
import org.thoughtcrime.securesms.messages.IncomingMessageProcessor;
import org.thoughtcrime.securesms.database.PendingRetryReceiptCache;
import org.thoughtcrime.securesms.net.PipeConnectivityListener;
import org.thoughtcrime.securesms.net.SignalWebSocketHealthMonitor;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
import org.thoughtcrime.securesms.notifications.OptimizedMessageNotifier;
import org.thoughtcrime.securesms.payments.MobileCoinConfig;
Expand Down Expand Up @@ -75,25 +74,16 @@
*/
public class ApplicationDependencyProvider implements ApplicationDependencies.Provider {

private static final String TAG = Log.tag(ApplicationDependencyProvider.class);

private final Application context;
private final PipeConnectivityListener pipeListener;
private final Application context;

public ApplicationDependencyProvider(@NonNull Application context) {
this.context = context;
this.pipeListener = new PipeConnectivityListener(context);
this.context = context;
}

private @NonNull ClientZkOperations provideClientZkOperations() {
return ClientZkOperations.create(provideSignalServiceNetworkAccess().getConfiguration(context));
}

@Override
public @NonNull PipeConnectivityListener providePipeListener() {
return pipeListener;
}

@Override
public @NonNull GroupsV2Operations provideGroupsV2Operations() {
return new GroupsV2Operations(provideClientZkOperations());
Expand Down Expand Up @@ -126,13 +116,9 @@ public ApplicationDependencyProvider(@NonNull Application context) {

@Override
public @NonNull SignalServiceMessageReceiver provideSignalServiceMessageReceiver() {
SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context)
: new UptimeSleepTimer();
return new SignalServiceMessageReceiver(provideSignalServiceNetworkAccess().getConfiguration(context),
new DynamicCredentialsProvider(context),
BuildConfig.SIGNAL_AGENT,
pipeListener,
sleepTimer,
provideClientZkOperations().getProfileOperations(),
FeatureFlags.okHttpAutomaticRetry());
}
Expand Down Expand Up @@ -265,35 +251,33 @@ public ApplicationDependencyProvider(@NonNull Application context) {

@Override
public @NonNull SignalWebSocket provideSignalWebSocket() {
return new SignalWebSocket(provideWebSocketFactory());
SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context) : new UptimeSleepTimer();
SignalWebSocketHealthMonitor healthMonitor = new SignalWebSocketHealthMonitor(context, sleepTimer);
SignalWebSocket signalWebSocket = new SignalWebSocket(provideWebSocketFactory(healthMonitor));

healthMonitor.monitor(signalWebSocket);

return signalWebSocket;
}

private @NonNull WebSocketFactory provideWebSocketFactory() {
private @NonNull WebSocketFactory provideWebSocketFactory(@NonNull SignalWebSocketHealthMonitor healthMonitor) {
return new WebSocketFactory() {
@Override
public WebSocketConnection createWebSocket() {
SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context)
: new UptimeSleepTimer();

return new WebSocketConnection("normal",
provideSignalServiceNetworkAccess().getConfiguration(context),
Optional.of(new DynamicCredentialsProvider(context)),
BuildConfig.SIGNAL_AGENT,
pipeListener,
sleepTimer);
healthMonitor);
}

@Override
public WebSocketConnection createUnidentifiedWebSocket() {
SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context)
: new UptimeSleepTimer();

return new WebSocketConnection("unidentified",
provideSignalServiceNetworkAccess().getConfiguration(context),
Optional.absent(),
BuildConfig.SIGNAL_AGENT,
pipeListener,
sleepTimer);
healthMonitor);
}
};
}
Expand Down
Expand Up @@ -14,10 +14,12 @@
import androidx.core.app.NotificationCompat;
import androidx.core.content.ContextCompat;

import org.signal.core.util.ThreadUtil;
import org.signal.core.util.concurrent.SignalExecutors;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.R;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.jobs.PushDecryptDrainedJob;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
Expand Down Expand Up @@ -50,6 +52,7 @@ public class IncomingMessageObserver {
private final Application context;
private final SignalServiceNetworkAccess networkAccess;
private final List<Runnable> decryptionDrainedListeners;
private final BroadcastReceiver connectionReceiver;

private boolean appVisible;

Expand Down Expand Up @@ -84,20 +87,22 @@ public void onBackground() {
}
});

context.registerReceiver(new BroadcastReceiver() {
connectionReceiver = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
synchronized (IncomingMessageObserver.this) {
if (!NetworkConstraint.isMet(context)) {
Log.w(TAG, "Lost network connection. Shutting down our websocket connections and resetting the drained state.");
networkDrained = false;
decryptionDrained = false;
shutdown();
disconnect();
}
IncomingMessageObserver.this.notifyAll();
}
}
}, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
};

context.registerReceiver(connectionReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
}

public synchronized void notifyRegistrationChanged() {
Expand Down Expand Up @@ -169,14 +174,16 @@ private synchronized void waitForConnectionNecessary() {
public void terminateAsync() {
INSTANCE_COUNT.decrementAndGet();

context.unregisterReceiver(connectionReceiver);

SignalExecutors.BOUNDED.execute(() -> {
Log.w(TAG, "Beginning termination.");
terminated = true;
shutdown();
disconnect();
});
}

private void shutdown() {
private void disconnect() {
ApplicationDependencies.getSignalWebSocket().disconnect();
}

Expand All @@ -190,8 +197,15 @@ private class MessageRetrievalThread extends Thread implements Thread.UncaughtEx

@Override
public void run() {
int attempts = 0;

while (!terminated) {
Log.i(TAG, "Waiting for websocket state change....");
if (attempts > 1) {
long backoff = BackoffUtil.exponentialBackoff(attempts, TimeUnit.SECONDS.toMillis(30));
Log.w(TAG, "Too many failed connection attempts, attempts: " + attempts + " backing off: " + backoff);
ThreadUtil.sleep(backoff);
}
waitForConnectionNecessary();

Log.i(TAG, "Making websocket connection....");
Expand All @@ -208,6 +222,7 @@ public void run() {
processor.processEnvelope(envelope);
}
});
attempts = 0;

if (!result.isPresent() && !networkDrained) {
Log.i(TAG, "Network was newly-drained. Enqueuing a job to listen for decryption draining.");
Expand All @@ -219,13 +234,15 @@ public void run() {
signalWebSocket.connect();
} catch (TimeoutException e) {
Log.w(TAG, "Application level read timeout...");
attempts = 0;
}
}
} catch (Throwable e) {
attempts++;
Log.w(TAG, e);
} finally {
Log.w(TAG, "Shutting down pipe...");
shutdown();
disconnect();
}

Log.i(TAG, "Looping...");
Expand Down
@@ -0,0 +1,44 @@
package org.thoughtcrime.securesms.net;

import java.util.Arrays;

/**
* Track error occurrences by time and indicate if too many occur within the
* time limit.
*/
public final class HttpErrorTracker {

private final long[] timestamps;
private final long errorTimeRange;

public HttpErrorTracker(int samples, long errorTimeRange) {
this.timestamps = new long[samples];
this.errorTimeRange = errorTimeRange;
}

public synchronized boolean addSample(long now) {
long errorsMustBeAfter = now - errorTimeRange;
int count = 1;
int minIndex = 0;

for (int i = 0; i < timestamps.length; i++) {
if (timestamps[i] < errorsMustBeAfter) {
timestamps[i] = 0;
} else if (timestamps[i] != 0) {
count++;
}

if (timestamps[i] < timestamps[minIndex]) {
minIndex = i;
}
}

timestamps[minIndex] = now;

if (count >= timestamps.length) {
Arrays.fill(timestamps, 0);
return true;
}
return false;
}
}

0 comments on commit 712b0c1

Please sign in to comment.