diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index 5eb124540fc..e3dab158205 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -46,6 +46,7 @@ import org.whispersystems.signalservice.api.websocket.ConnectivityListener; import java.util.UUID; +import java.util.concurrent.Executors; /** * Implementation of {@link ApplicationDependencies.Provider} that provides real app dependencies. @@ -91,7 +92,7 @@ public ApplicationDependencyProvider(@NonNull Application context, @NonNull Sign Optional.fromNullable(IncomingMessageObserver.getUnidentifiedPipe()), Optional.of(new SecurityEventListener(context)), provideClientZkOperations().getProfileOperations(), - SignalExecutors.UNBOUNDED); + SignalExecutors.newCachedBoundedExecutor("signal-messages", 1, 16)); } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index 635f56ef4e6..ca5b354b717 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; /** @@ -150,6 +151,14 @@ synchronized void cancelJob(@NonNull String id) { } } + @WorkerThread + synchronized void cancelAllInQueue(@NonNull String queue) { + Stream.of(runningJobs.values()) + .filter(j -> Objects.equals(j.getParameters().getQueue(), queue)) + .map(Job::getId) + .forEach(this::cancelJob); + } + @WorkerThread synchronized void onRetry(@NonNull Job job) { int nextRunAttempt = job.getRunAttempt() + 1; diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java index dc12ad04770..071522cdce3 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -198,6 +198,13 @@ public void cancel(@NonNull String id) { executor.execute(() -> jobController.cancelJob(id)); } + /** + * Cancels all jobs in the specified queue. See {@link #cancel(String)} for details. + */ + public void cancelAllInQueue(@NonNull String queue) { + executor.execute(() -> jobController.cancelAllInQueue(queue)); + } + /** * Runs the specified job synchronously. Beware: All normal dependencies are respected, meaning * you must take great care where you call this. It could take a very long time to complete! diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java index 9b878519a23..dd77dab414b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java @@ -151,6 +151,9 @@ public void onPushSend() List existingNetworkFailures = message.getNetworkFailures(); List existingIdentityMismatches = message.getIdentityKeyMismatches(); + long threadId = DatabaseFactory.getThreadDatabase(context).getThreadIdFor(message.getRecipient()); + ApplicationDependencies.getJobManager().cancelAllInQueue(TypingSendJob.getQueue(threadId)); + if (database.isSent(messageId)) { log(TAG, "Message " + messageId + " was already sent. Ignoring."); return; diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/TypingSendJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/TypingSendJob.java index f9e68a985e1..974ace882b9 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/TypingSendJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/TypingSendJob.java @@ -15,6 +15,7 @@ import org.thoughtcrime.securesms.recipients.RecipientUtil; import org.thoughtcrime.securesms.util.TextSecurePreferences; import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.signalservice.api.CancelationException; import org.whispersystems.signalservice.api.SignalServiceMessageSender; import org.whispersystems.signalservice.api.crypto.UnidentifiedAccessPair; import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage; @@ -39,7 +40,7 @@ public class TypingSendJob extends BaseJob { public TypingSendJob(long threadId, boolean typing) { this(new Job.Parameters.Builder() - .setQueue("TYPING_" + threadId) + .setQueue(getQueue(threadId)) .setMaxAttempts(1) .setLifespan(TimeUnit.SECONDS.toMillis(5)) .build(), @@ -47,6 +48,10 @@ public TypingSendJob(long threadId, boolean typing) { typing); } + public static String getQueue(long threadId) { + return "TYPING_" + threadId; + } + private TypingSendJob(@NonNull Job.Parameters parameters, long threadId, boolean typing) { super(parameters); @@ -101,7 +106,16 @@ public void onRun() throws Exception { List> unidentifiedAccess = Stream.of(recipients).map(r -> UnidentifiedAccessUtil.getAccessFor(context, r)).toList(); SignalServiceTypingMessage typingMessage = new SignalServiceTypingMessage(typing ? Action.STARTED : Action.STOPPED, System.currentTimeMillis(), groupId); - messageSender.sendTyping(addresses, unidentifiedAccess, typingMessage); + if (isCanceled()) { + Log.w(TAG, "Canceled before send!"); + return; + } + + try { + messageSender.sendTyping(addresses, unidentifiedAccess, typingMessage, this::isCanceled); + } catch (CancelationException e) { + Log.w(TAG, "Canceled during send!"); + } } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java b/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java index c92ae6accdd..bcbff01d4fe 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java +++ b/app/src/main/java/org/thoughtcrime/securesms/util/concurrent/SignalExecutors.java @@ -6,9 +6,13 @@ import org.thoughtcrime.securesms.util.LinkedBlockingLifoQueue; +import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -26,6 +30,44 @@ public static ExecutorService newCachedSingleThreadExecutor(final String name) { return executor; } + /** + * ThreadPoolExecutor will only create a new thread if the provided queue returns false from + * offer(). That means if you give it an unbounded queue, it'll only ever create 1 thread, no + * matter how long the queue gets. + * + * But if you bound the queue and submit more runnables than there are threads, your task is + * rejected and throws an exception. + * + * So we make a queue that will always return false if it's non-empty to ensure new threads get + * created. Then, if a task gets rejected, we simply add it to the queue. + */ + public static ExecutorService newCachedBoundedExecutor(final String name, int minThreads, int maxThreads) { + ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minThreads, + maxThreads, + 30, + TimeUnit.SECONDS, + new LinkedBlockingQueue() { + @Override + public boolean offer(Runnable runnable) { + if (isEmpty()) { + return super.offer(runnable); + } else { + return false; + } + } + }, new NumberedThreadFactory(name)); + + threadPool.setRejectedExecutionHandler((runnable, executor) -> { + try { + executor.getQueue().put(runnable); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + return threadPool; + } + /** * Returns an executor that prioritizes newer work. This is the opposite of a traditional executor, * which processor work in FIFO order. diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/CancelationException.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/CancelationException.java new file mode 100644 index 00000000000..221433ae9a0 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/CancelationException.java @@ -0,0 +1,6 @@ +package org.whispersystems.signalservice.api; + +import java.io.IOException; + +public class CancelationException extends IOException { +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java index bd6a8b5e452..10964fd910b 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java @@ -80,6 +80,7 @@ import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException; import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException; import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory; +import org.whispersystems.signalservice.internal.push.http.CancelationSignal; import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; import org.whispersystems.signalservice.internal.util.StaticCredentialsProvider; import org.whispersystems.signalservice.internal.util.Util; @@ -191,7 +192,7 @@ public void sendReceipt(SignalServiceAddress recipient, { byte[] content = createReceiptContent(message); - sendMessage(recipient, getTargetUnidentifiedAccess(unidentifiedAccess), message.getWhen(), content, false); + sendMessage(recipient, getTargetUnidentifiedAccess(unidentifiedAccess), message.getWhen(), content, false, null); } /** @@ -209,16 +210,17 @@ public void sendTyping(SignalServiceAddress recipient, { byte[] content = createTypingContent(message); - sendMessage(recipient, getTargetUnidentifiedAccess(unidentifiedAccess), message.getTimestamp(), content, true); + sendMessage(recipient, getTargetUnidentifiedAccess(unidentifiedAccess), message.getTimestamp(), content, true, null); } public void sendTyping(List recipients, List> unidentifiedAccess, - SignalServiceTypingMessage message) + SignalServiceTypingMessage message, + CancelationSignal cancelationSignal) throws IOException { byte[] content = createTypingContent(message); - sendMessage(recipients, getTargetUnidentifiedAccess(unidentifiedAccess), message.getTimestamp(), content, true); + sendMessage(recipients, getTargetUnidentifiedAccess(unidentifiedAccess), message.getTimestamp(), content, true, cancelationSignal); } @@ -235,7 +237,7 @@ public void sendCallMessage(SignalServiceAddress recipient, throws IOException, UntrustedIdentityException { byte[] content = createCallContent(message); - sendMessage(recipient, getTargetUnidentifiedAccess(unidentifiedAccess), System.currentTimeMillis(), content, false); + sendMessage(recipient, getTargetUnidentifiedAccess(unidentifiedAccess), System.currentTimeMillis(), content, false, null); } /** @@ -253,11 +255,11 @@ public SendMessageResult sendMessage(SignalServiceAddress recipient, { byte[] content = createMessageContent(message); long timestamp = message.getTimestamp(); - SendMessageResult result = sendMessage(recipient, getTargetUnidentifiedAccess(unidentifiedAccess), timestamp, content, false); + SendMessageResult result = sendMessage(recipient, getTargetUnidentifiedAccess(unidentifiedAccess), timestamp, content, false, null); if (result.getSuccess() != null && result.getSuccess().isNeedsSync()) { byte[] syncMessage = createMultiDeviceSentTranscriptContent(content, Optional.of(recipient), timestamp, Collections.singletonList(result), false); - sendMessage(localAddress, Optional.absent(), timestamp, syncMessage, false); + sendMessage(localAddress, Optional.absent(), timestamp, syncMessage, false, null); } if (message.isEndSession()) { @@ -291,7 +293,7 @@ public List sendMessage(List { byte[] content = createMessageContent(message); long timestamp = message.getTimestamp(); - List results = sendMessage(recipients, getTargetUnidentifiedAccess(unidentifiedAccess), timestamp, content, false); + List results = sendMessage(recipients, getTargetUnidentifiedAccess(unidentifiedAccess), timestamp, content, false, null); boolean needsSyncInResults = false; for (SendMessageResult result : results) { @@ -303,7 +305,7 @@ public List sendMessage(List if (needsSyncInResults || isMultiDevice.get()) { byte[] syncMessage = createMultiDeviceSentTranscriptContent(content, Optional.absent(), timestamp, results, isRecipientUpdate); - sendMessage(localAddress, Optional.absent(), timestamp, syncMessage, false); + sendMessage(localAddress, Optional.absent(), timestamp, syncMessage, false, null); } return results; @@ -347,7 +349,7 @@ public void sendMessage(SignalServiceSyncMessage message, Optionalabsent(), timestamp, content, false); + sendMessage(localAddress, Optional.absent(), timestamp, content, false, null); } public void setSoTimeoutMillis(long soTimeoutMillis) { @@ -478,11 +480,11 @@ private void sendMessage(VerifiedMessage message, Optionalabsent(), message.getTimestamp(), syncMessage, false); + sendMessage(localAddress, Optional.absent(), message.getTimestamp(), syncMessage, false, null); } } @@ -1200,7 +1202,8 @@ private List sendMessage(List r List> unidentifiedAccess, long timestamp, byte[] content, - boolean online) + boolean online, + CancelationSignal cancelationSignal) throws IOException { long startTime = System.currentTimeMillis(); @@ -1211,7 +1214,7 @@ private List sendMessage(List r while (recipientIterator.hasNext()) { SignalServiceAddress recipient = recipientIterator.next(); Optional access = unidentifiedAccessIterator.next(); - futureResults.add(executor.submit(() -> sendMessage(recipient, access, timestamp, content, online))); + futureResults.add(executor.submit(() -> sendMessage(recipient, access, timestamp, content, online, cancelationSignal))); } List results = new ArrayList<>(futureResults.size()); @@ -1247,14 +1250,24 @@ private SendMessageResult sendMessage(SignalServiceAddress recipient, Optional unidentifiedAccess, long timestamp, byte[] content, - boolean online) + boolean online, + CancelationSignal cancelationSignal) throws UntrustedIdentityException, IOException { long startTime = System.currentTimeMillis(); for (int i = 0; i < RETRY_COUNT; i++) { + if (cancelationSignal != null && cancelationSignal.isCanceled()) { + throw new CancelationException(); + } + try { - OutgoingPushMessageList messages = getEncryptedMessages(socket, recipient, unidentifiedAccess, timestamp, content, online); + OutgoingPushMessageList messages = getEncryptedMessages(socket, recipient, unidentifiedAccess, timestamp, content, online); + + if (cancelationSignal != null && cancelationSignal.isCanceled()) { + throw new CancelationException(); + } + Optional pipe = this.pipe.get(); Optional unidentifiedPipe = this.unidentifiedPipe.get(); @@ -1278,6 +1291,10 @@ private SendMessageResult sendMessage(SignalServiceAddress recipient, } } + if (cancelationSignal != null && cancelationSignal.isCanceled()) { + throw new CancelationException(); + } + SendMessageResponse response = socket.sendMessage(messages, unidentifiedAccess); Log.d(TAG, "[sendMessage] Completed over REST in " + (System.currentTimeMillis() - startTime) + " ms and " + (i + 1) + " attempt(s)");