Skip to content

Commit

Permalink
Prevent FCM bottlenecking.
Browse files Browse the repository at this point in the history
  • Loading branch information
greyson-signal committed May 29, 2020
1 parent 4cda267 commit fe25d94
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 66 deletions.
4 changes: 3 additions & 1 deletion app/src/main/AndroidManifest.xml
Expand Up @@ -531,7 +531,9 @@

<service android:name=".service.GenericForegroundService"/>

<service android:name=".gcm.FcmService">
<service android:name=".gcm.FcmFetchService" />

<service android:name=".gcm.FcmReceiveService">
<intent-filter>
<action android:name="com.google.firebase.MESSAGING_EVENT" />
</intent-filter>
Expand Down
@@ -0,0 +1,92 @@
package org.thoughtcrime.securesms.gcm;

import android.app.Service;
import android.content.Intent;
import android.os.Build;
import android.os.IBinder;

import androidx.annotation.Nullable;

import com.google.firebase.messaging.RemoteMessage;

import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor;
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;

import java.util.concurrent.atomic.AtomicInteger;

/**
* This service does the actual network fetch in response to an FCM message.
*
* Our goals with FCM processing are as follows:
* (1) Ensure some service is active for the duration of the fetch and processing stages.
* (2) Do not make unnecessary network requests.
*
* To fulfill goal 1, this service will not call {@link #stopSelf()} until there is no more running
* requests.
*
* To fulfill goal 2, this service will not enqueue a fetch if there are already 2 active fetches
* (or rather, 1 active and 1 waiting, since we use a single thread executor).
*
* Unfortunately we can't do this all in {@link FcmReceiveService} because it won't let us process
* the next FCM message until {@link FcmReceiveService#onMessageReceived(RemoteMessage)} returns,
* but as soon as that method returns, it could also destroy the service. By not letting us control
* when the service is destroyed, we can't accomplish both goals within that service.
*/
public class FcmFetchService extends Service {

private static final String TAG = Log.tag(FcmFetchService.class);

private static final SerialMonoLifoExecutor EXECUTOR = new SerialMonoLifoExecutor(SignalExecutors.UNBOUNDED);

private final AtomicInteger activeCount = new AtomicInteger(0);

@Override
public int onStartCommand(Intent intent, int flags, int startId) {
boolean performedReplace = EXECUTOR.enqueue(this::fetch);

if (performedReplace) {
Log.i(TAG, "Already have one running and one enqueued. Ignoring.");
} else {
int count = activeCount.incrementAndGet();
Log.i(TAG, "Incrementing active count to " + count);
}

return START_NOT_STICKY;
}

@Override
public void onDestroy() {
Log.i(TAG, "onDestroy()");
}

@Override
public @Nullable IBinder onBind(Intent intent) {
return null;
}

private void fetch() {
MessageRetriever retriever = ApplicationDependencies.getMessageRetriever();
boolean success = retriever.retrieveMessages(this, new RestStrategy(), new RestStrategy());

if (success) {
Log.i(TAG, "Successfully retrieved messages.");
} else {
if (Build.VERSION.SDK_INT >= 26) {
Log.w(TAG, "Failed to retrieve messages. Scheduling on the system JobScheduler (API " + Build.VERSION.SDK_INT + ").");
FcmJobService.schedule(this);
} else {
Log.w(TAG, "Failed to retrieve messages. Scheduling on JobManager (API " + Build.VERSION.SDK_INT + ").");
ApplicationDependencies.getJobManager().add(new PushNotificationReceiveJob(this));
}
}

if (activeCount.decrementAndGet() == 0) {
Log.e(TAG, "stopping");
stopSelf();
}
}
}

Expand Up @@ -9,16 +9,14 @@
import androidx.annotation.NonNull;
import androidx.annotation.RequiresApi;

import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
import org.thoughtcrime.securesms.util.ServiceUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;

/**
* Pulls down messages. Used when we fail to pull down messages in {@link FcmService}.
* Pulls down messages. Used when we fail to pull down messages in {@link FcmReceiveService}.
*/
@RequiresApi(26)
public class FcmJobService extends JobService {
Expand Down
@@ -1,25 +1,22 @@
package org.thoughtcrime.securesms.gcm;

import android.content.Context;
import android.os.Build;
import android.content.Intent;

import androidx.annotation.NonNull;

import com.google.firebase.messaging.FirebaseMessagingService;
import com.google.firebase.messaging.RemoteMessage;

import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobs.FcmRefreshJob;
import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.registration.PushChallengeRequest;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;

public class FcmService extends FirebaseMessagingService {
public class FcmReceiveService extends FirebaseMessagingService {

private static final String TAG = FcmService.class.getSimpleName();
private static final String TAG = FcmReceiveService.class.getSimpleName();

@Override
public void onMessageReceived(RemoteMessage remoteMessage) {
Expand All @@ -46,27 +43,12 @@ public void onNewToken(String token) {
}

private static void handleReceivedNotification(Context context) {
MessageRetriever retriever = ApplicationDependencies.getMessageRetriever();
boolean success = retriever.retrieveMessages(context, new RestStrategy(), new RestStrategy());

if (success) {
Log.i(TAG, "Successfully retrieved messages.");
} else {
if (Build.VERSION.SDK_INT >= 26) {
Log.w(TAG, "Failed to retrieve messages. Scheduling on the system JobScheduler (API " + Build.VERSION.SDK_INT + ").");
FcmJobService.schedule(context);
} else {
Log.w(TAG, "Failed to retrieve messages. Scheduling on JobManager (API " + Build.VERSION.SDK_INT + ").");
ApplicationDependencies.getJobManager().add(new PushNotificationReceiveJob(context));
}
}

Log.i(TAG, "Processing complete.");
context.startService(new Intent(context, FcmFetchService.class));
}

private static void handlePushChallenge(@NonNull String challenge) {
Log.d(TAG, String.format("Got a push challenge \"%s\"", challenge));

PushChallengeRequest.postChallengeResponse(challenge);
}
}
}
@@ -0,0 +1,64 @@
package org.thoughtcrime.securesms.util.concurrent;

import androidx.annotation.NonNull;

import java.util.concurrent.Executor;

/**
* Wraps another executor to make a new executor that only keeps around two tasks:
* - The actively running task
* - A single enqueued task
*
* If multiple tasks are enqueued while one is running, only the latest task is kept. The rest are
* dropped.
*
* This is useful when you want to enqueue a bunch of tasks at unknown intervals, but only the most
* recent one is relevant. For instance, running a query in response to changing user input.
*
* Based on SerialExecutor
* https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
*/
public final class SerialMonoLifoExecutor implements Executor {
private final Executor executor;

private Runnable next;
private Runnable active;

public SerialMonoLifoExecutor(@NonNull Executor executor) {
this.executor = executor;
}

@Override
public synchronized void execute(@NonNull Runnable command) {
enqueue(command);
}

/**
* @return True if a pending task was replaced by this one, otherwise false.
*/
public synchronized boolean enqueue(@NonNull Runnable command) {
boolean performedReplace = next != null;

next = () -> {
try {
command.run();
} finally {
scheduleNext();
}
};

if (active == null) {
scheduleNext();
}

return performedReplace;
}

private synchronized void scheduleNext() {
active = next;
next = null;
if (active != null) {
executor.execute(active);
}
}
}
Expand Up @@ -7,6 +7,7 @@

import com.annimon.stream.function.Predicate;

import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor;
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
import org.whispersystems.libsignal.util.guava.Function;

Expand Down Expand Up @@ -57,7 +58,7 @@ public static <A, B> LiveData<B> mapAsync(@NonNull LiveData<A> source, @NonNull
*/
public static <A, B> LiveData<B> mapAsync(@NonNull Executor executor, @NonNull LiveData<A> source, @NonNull Function<A, B> backgroundFunction) {
MediatorLiveData<B> outputLiveData = new MediatorLiveData<>();
Executor liveDataExecutor = new SerialLiveDataExecutor(executor);
Executor liveDataExecutor = new SerialMonoLifoExecutor(executor);

outputLiveData.addSource(source, currentValue -> liveDataExecutor.execute(() -> outputLiveData.postValue(backgroundFunction.apply(currentValue))));

Expand Down Expand Up @@ -119,42 +120,4 @@ private static final class CombineLiveData<A, B, R> extends MediatorLiveData<R>
}
}
}

/**
* Executor decorator that runs serially but enqueues just the latest task, dropping any pending task.
* <p>
* Based on SerialExecutor https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
* but modified to represent a queue of size one which is replaced by the latest call to {@link #execute(Runnable)}.
*/
private static final class SerialLiveDataExecutor implements Executor {
private final Executor executor;
private Runnable next;
private Runnable active;

SerialLiveDataExecutor(@NonNull Executor executor) {
this.executor = executor;
}

public synchronized void execute(@NonNull Runnable command) {
next = () -> {
try {
command.run();
} finally {
scheduleNext();
}
};

if (active == null) {
scheduleNext();
}
}

private synchronized void scheduleNext() {
active = next;
next = null;
if (active != null) {
executor.execute(active);
}
}
}
}

0 comments on commit fe25d94

Please sign in to comment.