Skip to content

Commit

Permalink
Add support for canceling Jobs.
Browse files Browse the repository at this point in the history
  • Loading branch information
greyson-signal committed Jan 24, 2020
1 parent b10ce08 commit 38597ae
Show file tree
Hide file tree
Showing 66 changed files with 137 additions and 79 deletions.
25 changes: 21 additions & 4 deletions app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ public abstract class Job {

private final Parameters parameters;

private int runAttempt;
private long nextRunAttemptTime;
private int runAttempt;
private long nextRunAttemptTime;

private volatile boolean canceled;

protected Context context;

Expand Down Expand Up @@ -75,12 +77,27 @@ final void setNextRunAttemptTime(long nextRunAttemptTime) {
this.nextRunAttemptTime = nextRunAttemptTime;
}

/** Should only be invoked by {@link JobController} */
final void cancel() {
this.canceled = true;
}

@WorkerThread
final void onSubmit() {
Log.i(TAG, JobLogger.format(this, "onSubmit()"));
onAdded();
}

/**
* @return True if your job has been marked as canceled while it was running, otherwise false.
* If a job sees that it has been canceled, it should make a best-effort attempt at
* stopping it's work. This job will have {@link #onFailure()} called after {@link #run()}
* has finished.
*/
public final boolean isCanceled() {
return canceled;
}

/**
* Called when the job is first submitted to the {@link JobManager}.
*/
Expand Down Expand Up @@ -112,10 +129,10 @@ public void onRetry() {
public abstract @NonNull Result run();

/**
* Called when your job has completely failed.
* Called when your job has completely failed and will not be run again.
*/
@WorkerThread
public abstract void onCanceled();
public abstract void onFailure();

public interface Factory<T extends Job> {
@NonNull T create(@NonNull Parameters parameters, @NonNull Data data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/**
* Manages the queue of jobs. This is the only class that should write to {@link JobStorage} to
Expand All @@ -40,7 +41,7 @@ class JobController {
private final Scheduler scheduler;
private final Debouncer debouncer;
private final Callback callback;
private final Set<String> runningJobs;
private final Map<String, Job> runningJobs;

JobController(@NonNull Application application,
@NonNull JobStorage jobStorage,
Expand All @@ -61,7 +62,7 @@ class JobController {
this.scheduler = scheduler;
this.debouncer = debouncer;
this.callback = callback;
this.runningJobs = new HashSet<>();
this.runningJobs = new HashMap<>();
}

@WorkerThread
Expand Down Expand Up @@ -96,6 +97,29 @@ synchronized void submitNewJobChain(@NonNull List<List<Job>> chain) {
notifyAll();
}

@WorkerThread
synchronized void cancelJob(@NonNull String id) {
Job runningJob = runningJobs.get(id);

if (runningJob != null) {
Log.w(TAG, JobLogger.format(runningJob, "Canceling while running."));
runningJob.cancel();
} else {
JobSpec jobSpec = jobStorage.getJobSpec(id);

if (jobSpec != null) {
Job job = createJob(jobSpec, jobStorage.getConstraintSpecs(id));
Log.w(TAG, JobLogger.format(job, "Canceling while inactive."));
Log.w(TAG, JobLogger.format(job, "Job failed."));

job.onFailure();
onFailure(job);
} else {
Log.w(TAG, "Tried to cancel JOB::" + id + ", but it could not be found.");
}
}
}

@WorkerThread
synchronized void onRetry(@NonNull Job job) {
int nextRunAttempt = job.getRunAttempt() + 1;
Expand Down Expand Up @@ -177,7 +201,7 @@ synchronized void onSuccess(@NonNull Job job) {
}

jobStorage.updateJobRunningState(job.getId(), true);
runningJobs.add(job.getId());
runningJobs.put(job.getId(), job);
jobTracker.onStateChange(job.getId(), JobTracker.JobState.RUNNING);

return job;
Expand Down Expand Up @@ -333,7 +357,7 @@ private void scheduleJobs(@NonNull List<Job> jobs) {

return job;
} catch (RuntimeException e) {
Log.e(TAG, "Failed to instantiate job! Failing it and its dependencies without calling Job#onCanceled. Crash imminent.");
Log.e(TAG, "Failed to instantiate job! Failing it and its dependencies without calling Job#onFailure. Crash imminent.");

List<String> failIds = Stream.of(jobStorage.getDependencySpecsThatDependOnJob(jobSpec.getId()))
.map(DependencySpec::getJobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ public void removeListener(@NonNull JobTracker.JobListener listener) {
jobTracker.removeListener(listener);
}


/**
* Enqueues a single job to be run.
*/
Expand All @@ -136,9 +135,22 @@ public Chain startChain(@NonNull List<? extends Job> jobs) {
return new Chain(this, jobs);
}

/**
* Attempts to cancel a job. This is best-effort and may not actually prevent a job from
* completing if it was already running. If this job is running, this can only stop jobs that
* bother to check {@link Job#isCanceled()}.
*
* When a job is canceled, {@link Job#onFailure()} will be triggered at the earliest possible
* moment. Just like a normal failure, all later jobs in the same chain will also be failed.
*/
public void cancel(@NonNull String id) {
executor.execute(() -> jobController.cancelJob(id));
}

/**
* Retrieves a string representing the state of the job queue. Intended for debugging.
*/
@WorkerThread
public @NonNull String getDebugInfo() {
Future<String> result = executor.submit(jobController::getDebugInfo);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public synchronized void run() {
job.onRetry();
} else if (result.isFailure()) {
List<Job> dependents = jobController.onFailure(job);
job.onCanceled();
Stream.of(dependents).forEach(Job::onCanceled);
job.onFailure();
Stream.of(dependents).forEach(Job::onFailure);

if (result.getException() != null) {
throw result.getException();
Expand All @@ -80,6 +80,11 @@ private Job.Result run(@NonNull Job job) {
try {
wakeLock = WakeLockUtil.acquire(application, PowerManager.PARTIAL_WAKE_LOCK, WAKE_LOCK_TIMEOUT, job.getId());
result = job.run();

if (job.isCanceled()) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing because the job was canceled."));
result = Job.Result.failure();
}
} catch (Exception e) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing due to an unexpected exception."), e);
return Job.Result.failure();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ protected boolean onShouldRetry(@NonNull Exception e) {
}

@Override
public void onCanceled() {

}
public void onFailure() {}

private static abstract class Argon2RuntimeException extends RuntimeException {
private Argon2RuntimeException(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void onRun() throws Exception {
}

@Override
public void onCanceled() { }
public void onFailure() { }

@Override
protected boolean onShouldRetry(@NonNull Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected boolean onShouldRetry(@NonNull Exception e) {
}

@Override
public void onCanceled() { }
public void onFailure() { }

public static final class Factory implements Job.Factory<AttachmentCopyJob> {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.thoughtcrime.securesms.events.PartProgressEvent;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobLogger;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.mms.MmsException;
Expand Down Expand Up @@ -138,8 +139,8 @@ public void doWork() throws IOException {
}

@Override
public void onCanceled() {
Log.w(TAG, "onCanceled() messageId: " + messageId + " partRowId: " + partRowId + " partUniqueId: " + partUniqueId + " manual: " + manual);
public void onFailure() {
Log.w(TAG, JobLogger.format(this, "onFailure() messageId: " + messageId + " partRowId: " + partRowId + " partUniqueId: " + partUniqueId + " manual: " + manual));

final AttachmentId attachmentId = new AttachmentId(partRowId, partUniqueId);
markFailed(messageId, attachmentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void onRun() throws Exception {
}

@Override
public void onCanceled() { }
public void onFailure() { }

@Override
protected boolean onShouldRetry(@NonNull Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void onRun() throws IOException {
}

@Override
public void onCanceled() {}
public void onFailure() {}

@Override
public boolean onShouldRetry(@NonNull Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public boolean onShouldRetry(@NonNull Exception throwable) {
}

@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Failed to execute clean signed prekeys task.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void onRun() throws IOException {
}

@Override
public void onCanceled() {}
public void onFailure() {}

@Override
public boolean onShouldRetry(@NonNull Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public boolean onShouldRetry(@NonNull Exception exception) {
}

@Override
public void onCanceled() {}
public void onFailure() {}

public static final class Factory implements Job.Factory<DirectoryRefreshJob> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public String getFactoryKey() {
}

@Override
public void onCanceled() {
public void onFailure() {
}

public static final class Factory implements Job.Factory<FailingJob> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void onRun() throws Exception {
}

@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "GCM reregistration failed after retry attempt exhaustion!");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public boolean onShouldRetry(@NonNull Exception e) {
}

@Override
public void onCanceled() {
public void onFailure() {
}

public static class Factory implements Job.Factory<LocalBackupJob> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected boolean onShouldRetry(@NonNull Exception e) {
}

@Override
public void onCanceled() {
public void onFailure() {
}

public static final class Factory implements Job.Factory<MarkerJob> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void onRun() {
}

@Override
public void onCanceled() {
public void onFailure() {
MmsDatabase database = DatabaseFactory.getMmsDatabase(context);
database.markDownloadState(messageId, MmsDatabase.Status.DOWNLOAD_SOFT_FAILURE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void onRun() {
}

@Override
public void onCanceled() {
public void onFailure() {
// TODO
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.thoughtcrime.securesms.database.ThreadDatabase;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobLogger;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.Log;
Expand Down Expand Up @@ -152,8 +153,8 @@ public boolean onShouldRetry(@NonNull Exception exception) {
}

@Override
public void onCanceled() {
Log.i(TAG, "onCanceled() messageId: " + messageId);
public void onFailure() {
Log.i(TAG, JobLogger.format(this, "onFailure() messageId: " + messageId));
DatabaseFactory.getMmsDatabase(context).markAsSentFailed(messageId);
notifyMediaMessageDeliveryFailed(context, messageId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public boolean onShouldRetry(@NonNull Exception exception) {
}

@Override
public void onCanceled() {
public void onFailure() {
}

public static final class Factory implements Job.Factory<MultiDeviceBlockedUpdateJob> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public boolean onShouldRetry(@NonNull Exception e) {
}

@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "**** Failed to synchronize read receipts state!");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public boolean onShouldRetry(@NonNull Exception exception) {
}

@Override
public void onCanceled() {
public void onFailure() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public boolean onShouldRetry(@NonNull Exception exception) {
}

@Override
public void onCanceled() {
public void onFailure() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public boolean onShouldRetry(@NonNull Exception e) {
}

@Override
public void onCanceled() {
public void onFailure() {
}

public static final class Factory implements Job.Factory<MultiDeviceKeysUpdateJob> {
Expand Down

0 comments on commit 38597ae

Please sign in to comment.