Skip to content

Commit

Permalink
Added JobTracker.
Browse files Browse the repository at this point in the history
  • Loading branch information
greyson-signal committed Oct 18, 2019
1 parent 5b1d910 commit ccb18cd
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 26 deletions.
47 changes: 28 additions & 19 deletions src/org/thoughtcrime/securesms/jobmanager/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -30,7 +31,6 @@ public abstract class Job {

private final Parameters parameters;

private String id;
private int runAttempt;
private long nextRunAttemptTime;

Expand All @@ -41,7 +41,7 @@ public Job(@NonNull Parameters parameters) {
}

public final String getId() {
return id;
return parameters.getId();
}

public final @NonNull Parameters getParameters() {
Expand All @@ -64,11 +64,6 @@ public final void setContext(@NonNull Context context) {
this.context = context;
}

/** Should only be invoked by {@link JobController} */
final void setId(@NonNull String id) {
this.id = id;
}

/** Should only be invoked by {@link JobController} */
final void setRunAttempt(int runAttempt) {
this.runAttempt = runAttempt;
Expand Down Expand Up @@ -203,6 +198,7 @@ public static final class Parameters {
public static final int IMMORTAL = -1;
public static final int UNLIMITED = -1;

private final String id;
private final long createTime;
private final long lifespan;
private final int maxAttempts;
Expand All @@ -211,14 +207,16 @@ public static final class Parameters {
private final String queue;
private final List<String> constraintKeys;

private Parameters(long createTime,
private Parameters(@NonNull String id,
long createTime,
long lifespan,
int maxAttempts,
long maxBackoff,
int maxInstances,
@Nullable String queue,
@NonNull List<String> constraintKeys)
{
this.id = id;
this.createTime = createTime;
this.lifespan = lifespan;
this.maxAttempts = maxAttempts;
Expand All @@ -228,41 +226,46 @@ private Parameters(long createTime,
this.constraintKeys = constraintKeys;
}

public long getCreateTime() {
@NonNull String getId() {
return id;
}

long getCreateTime() {
return createTime;
}

public long getLifespan() {
long getLifespan() {
return lifespan;
}

public int getMaxAttempts() {
int getMaxAttempts() {
return maxAttempts;
}

public long getMaxBackoff() {
long getMaxBackoff() {
return maxBackoff;
}

public int getMaxInstances() {
int getMaxInstances() {
return maxInstances;
}

public @Nullable String getQueue() {
@Nullable String getQueue() {
return queue;
}

public List<String> getConstraintKeys() {
List<String> getConstraintKeys() {
return constraintKeys;
}

public Builder toBuilder() {
return new Builder(createTime, maxBackoff, lifespan, maxAttempts, maxInstances, queue, constraintKeys);
return new Builder(id, createTime, maxBackoff, lifespan, maxAttempts, maxInstances, queue, constraintKeys);
}


public static final class Builder {

private String id;
private long createTime;
private long maxBackoff;
private long lifespan;
Expand All @@ -272,17 +275,23 @@ public static final class Builder {
private List<String> constraintKeys;

public Builder() {
this(System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(30), IMMORTAL, 1, UNLIMITED, null, new LinkedList<>());
this(UUID.randomUUID().toString());
}

Builder(@NonNull String id) {
this(id, System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(30), IMMORTAL, 1, UNLIMITED, null, new LinkedList<>());
}

private Builder(long createTime,
private Builder(@NonNull String id,
long createTime,
long maxBackoff,
long lifespan,
int maxAttempts,
int maxInstances,
@Nullable String queue,
@NonNull List<String> constraintKeys)
{
this.id = id;
this.createTime = createTime;
this.maxBackoff = maxBackoff;
this.lifespan = lifespan;
Expand Down Expand Up @@ -368,7 +377,7 @@ Builder setCreateTime(long createTime) {
}

public @NonNull Parameters build() {
return new Parameters(createTime, lifespan, maxAttempts, maxBackoff, maxInstances, queue, constraintKeys);
return new Parameters(id, createTime, lifespan, maxAttempts, maxBackoff, maxInstances, queue, constraintKeys);
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions src/org/thoughtcrime/securesms/jobmanager/JobController.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class JobController {
private final JobInstantiator jobInstantiator;
private final ConstraintInstantiator constraintInstantiator;
private final Data.Serializer dataSerializer;
private final JobTracker jobTracker;
private final Scheduler scheduler;
private final Debouncer debouncer;
private final Callback callback;
Expand All @@ -46,6 +47,7 @@ class JobController {
@NonNull JobInstantiator jobInstantiator,
@NonNull ConstraintInstantiator constraintInstantiator,
@NonNull Data.Serializer dataSerializer,
@NonNull JobTracker jobTracker,
@NonNull Scheduler scheduler,
@NonNull Debouncer debouncer,
@NonNull Callback callback)
Expand All @@ -55,6 +57,7 @@ class JobController {
this.jobInstantiator = jobInstantiator;
this.constraintInstantiator = constraintInstantiator;
this.dataSerializer = dataSerializer;
this.jobTracker = jobTracker;
this.scheduler = scheduler;
this.debouncer = debouncer;
this.callback = callback;
Expand Down Expand Up @@ -82,6 +85,7 @@ synchronized void submitNewJobChain(@NonNull List<List<Job>> chain) {

if (chainExceedsMaximumInstances(chain)) {
Job solo = chain.get(0).get(0);
jobTracker.onStateChange(solo.getId(), JobTracker.JobState.IGNORED);
Log.w(TAG, JobLogger.format(solo, "Already at the max instance count of " + solo.getParameters().getMaxInstances() + ". Skipping."));
return;
}
Expand All @@ -98,6 +102,7 @@ synchronized void onRetry(@NonNull Job job) {
long nextRunAttemptTime = calculateNextRunAttemptTime(System.currentTimeMillis(), nextRunAttempt, job.getParameters().getMaxBackoff());

jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime);
jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING);

List<Constraint> constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId()))
.map(ConstraintSpec::getFactoryKey)
Expand All @@ -120,6 +125,7 @@ synchronized void onJobFinished(@NonNull Job job) {
@WorkerThread
synchronized void onSuccess(@NonNull Job job) {
jobStorage.deleteJob(job.getId());
jobTracker.onStateChange(job.getId(), JobTracker.JobState.SUCCESS);
notifyAll();
}

Expand All @@ -143,6 +149,7 @@ synchronized void onSuccess(@NonNull Job job) {
all.addAll(dependents);

jobStorage.deleteJobs(Stream.of(all).map(Job::getId).toList());
Stream.of(all).forEach(j -> jobTracker.onStateChange(j.getId(), JobTracker.JobState.FAILURE));

return dependents;
}
Expand Down Expand Up @@ -170,6 +177,7 @@ synchronized void onSuccess(@NonNull Job job) {

jobStorage.updateJobRunningState(job.getId(), true);
runningJobs.add(job.getId());
jobTracker.onStateChange(job.getId(), JobTracker.JobState.RUNNING);

return job;
} catch (InterruptedException e) {
Expand Down Expand Up @@ -253,9 +261,6 @@ private void insertJobChain(@NonNull List<List<Job>> chain) {

@WorkerThread
private @NonNull FullSpec buildFullSpec(@NonNull Job job, @NonNull List<Job> dependsOn) {
String id = UUID.randomUUID().toString();

job.setId(id);
job.setRunAttempt(0);

JobSpec jobSpec = new JobSpec(job.getId(),
Expand Down Expand Up @@ -319,7 +324,6 @@ private void scheduleJobs(@NonNull List<Job> jobs) {
Data data = dataSerializer.deserialize(jobSpec.getSerializedData());
Job job = jobInstantiator.instantiate(jobSpec.getFactoryKey(), parameters, data);

job.setId(jobSpec.getId());
job.setRunAttempt(jobSpec.getRunAttempt());
job.setNextRunAttemptTime(jobSpec.getNextRunAttemptTime());
job.setContext(application);
Expand All @@ -328,7 +332,7 @@ private void scheduleJobs(@NonNull List<Job> jobs) {
}

private @NonNull Job.Parameters buildJobParameters(@NonNull JobSpec jobSpec, @NonNull List<ConstraintSpec> constraintSpecs) {
return new Job.Parameters.Builder()
return new Job.Parameters.Builder(jobSpec.getId())
.setCreateTime(jobSpec.getCreateTime())
.setLifespan(jobSpec.getLifespan())
.setMaxAttempts(jobSpec.getMaxAttempts())
Expand Down
40 changes: 38 additions & 2 deletions src/org/thoughtcrime/securesms/jobmanager/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import android.os.Build;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import androidx.lifecycle.LiveData;

import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory;
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer;
Expand Down Expand Up @@ -39,18 +40,21 @@ public class JobManager implements ConstraintObserver.Notifier {
private final Configuration configuration;
private final ExecutorService executor;
private final JobController jobController;
private final JobTracker jobTracker;

private final Set<EmptyQueueListener> emptyQueueListeners = new CopyOnWriteArraySet<>();

public JobManager(@NonNull Application application, @NonNull Configuration configuration) {
this.application = application;
this.configuration = configuration;
this.executor = configuration.getExecutorFactory().newSingleThreadExecutor("signal-JobManager");
this.jobTracker = configuration.getJobTracker();
this.jobController = new JobController(application,
configuration.getJobStorage(),
configuration.getJobInstantiator(),
configuration.getConstraintFactories(),
configuration.getDataSerializer(),
configuration.getJobTracker(),
Build.VERSION.SDK_INT < 26 ? new AlarmManagerScheduler(application)
: new CompositeScheduler(new InAppScheduler(this), new JobSchedulerScheduler(application)),
new Debouncer(500),
Expand Down Expand Up @@ -92,6 +96,23 @@ public void beginJobLoop() {
});
}

/**
* Add a listener to subscribe to job state updates. Listeners will be invoked on an arbitrary
* background thread. You must eventually call {@link #removeListener(JobTracker.JobListener)} to avoid
* memory leaks.
*/
public void addListener(@NonNull String id, @NonNull JobTracker.JobListener listener) {
jobTracker.addListener(id, listener);
}

/**
* Unsubscribe the provided listener from all job updates.
*/
public void removeListener(@NonNull JobTracker.JobListener listener) {
jobTracker.removeListener(listener);
}


/**
* Enqueues a single job to be run.
*/
Expand Down Expand Up @@ -160,6 +181,12 @@ void wakeUp() {
}

private void enqueueChain(@NonNull Chain chain) {
for (List<Job> jobList : chain.getJobListChain()) {
for (Job job : jobList) {
jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING);
}
}

executor.execute(() -> {
jobController.submitNewJobChain(chain.getJobListChain());
wakeUp();
Expand Down Expand Up @@ -225,6 +252,7 @@ public static class Configuration {
private final Data.Serializer dataSerializer;
private final JobStorage jobStorage;
private final JobMigrator jobMigrator;
private final JobTracker jobTracker;

private Configuration(int jobThreadCount,
@NonNull ExecutorFactory executorFactory,
Expand All @@ -233,7 +261,8 @@ private Configuration(int jobThreadCount,
@NonNull List<ConstraintObserver> constraintObservers,
@NonNull Data.Serializer dataSerializer,
@NonNull JobStorage jobStorage,
@NonNull JobMigrator jobMigrator)
@NonNull JobMigrator jobMigrator,
@NonNull JobTracker jobTracker)
{
this.executorFactory = executorFactory;
this.jobThreadCount = jobThreadCount;
Expand All @@ -243,6 +272,7 @@ private Configuration(int jobThreadCount,
this.dataSerializer = dataSerializer;
this.jobStorage = jobStorage;
this.jobMigrator = jobMigrator;
this.jobTracker = jobTracker;
}

int getJobThreadCount() {
Expand Down Expand Up @@ -278,6 +308,10 @@ ConstraintInstantiator getConstraintFactories() {
return jobMigrator;
}

@NonNull JobTracker getJobTracker() {
return jobTracker;
}

public static class Builder {

private ExecutorFactory executorFactory = new DefaultExecutorFactory();
Expand All @@ -288,6 +322,7 @@ public static class Builder {
private Data.Serializer dataSerializer = new JsonDataSerializer();
private JobStorage jobStorage = null;
private JobMigrator jobMigrator = null;
private JobTracker jobTracker = new JobTracker();

public @NonNull Builder setJobThreadCount(int jobThreadCount) {
this.jobThreadCount = jobThreadCount;
Expand Down Expand Up @@ -337,7 +372,8 @@ public static class Builder {
new ArrayList<>(constraintObservers),
dataSerializer,
jobStorage,
jobMigrator);
jobMigrator,
jobTracker);
}
}
}
Expand Down

0 comments on commit ccb18cd

Please sign in to comment.