Skip to content

Commit

Permalink
Move the Job#onSubmit call to be outside of the JobController lock.
Browse files Browse the repository at this point in the history
  • Loading branch information
greyson-signal committed Mar 6, 2023
1 parent 6e8f3d1 commit 81fc997
Showing 1 changed file with 57 additions and 43 deletions.
Expand Up @@ -80,66 +80,80 @@ synchronized void wakeUp() {
}

@WorkerThread
synchronized void submitNewJobChain(@NonNull List<List<Job>> chain) {
chain = Stream.of(chain).filterNot(List::isEmpty).toList();
void submitNewJobChain(@NonNull List<List<Job>> chain) {
synchronized (this) {
chain = Stream.of(chain).filterNot(List::isEmpty).toList();

if (chain.isEmpty()) {
Log.w(TAG, "Tried to submit an empty job chain. Skipping.");
return;
}
if (chain.isEmpty()) {
Log.w(TAG, "Tried to submit an empty job chain. Skipping.");
return;
}

if (chainExceedsMaximumInstances(chain)) {
Job solo = chain.get(0).get(0);
jobTracker.onStateChange(solo, JobTracker.JobState.IGNORED);
Log.w(TAG, JobLogger.format(solo, "Already at the max instance count. Factory limit: " + solo.getParameters().getMaxInstancesForFactory() + ", Queue limit: " + solo.getParameters().getMaxInstancesForQueue() + ". Skipping."));
return;
if (chainExceedsMaximumInstances(chain)) {
Job solo = chain.get(0).get(0);
jobTracker.onStateChange(solo, JobTracker.JobState.IGNORED);
Log.w(TAG, JobLogger.format(solo, "Already at the max instance count. Factory limit: " + solo.getParameters().getMaxInstancesForFactory() + ", Queue limit: " + solo.getParameters().getMaxInstancesForQueue() + ". Skipping."));
return;
}

insertJobChain(chain);
scheduleJobs(chain.get(0));
}

insertJobChain(chain);
// We have no control over what happens in jobs' onSubmit method, so we drop our lock to reduce the possibility of a deadlock
triggerOnSubmit(chain);
notifyAll();
scheduleJobs(chain.get(0));

synchronized (this) {
notifyAll();
}
}

@WorkerThread
synchronized void submitJobWithExistingDependencies(@NonNull Job job, @NonNull Collection<String> dependsOn, @Nullable String dependsOnQueue) {
void submitJobWithExistingDependencies(@NonNull Job job, @NonNull Collection<String> dependsOn, @Nullable String dependsOnQueue) {
List<List<Job>> chain = Collections.singletonList(Collections.singletonList(job));

if (chainExceedsMaximumInstances(chain)) {
jobTracker.onStateChange(job, JobTracker.JobState.IGNORED);
Log.w(TAG, JobLogger.format(job, "Already at the max instance count. Factory limit: " + job.getParameters().getMaxInstancesForFactory() + ", Queue limit: " + job.getParameters().getMaxInstancesForQueue() + ". Skipping."));
return;
}
synchronized (this) {
if (chainExceedsMaximumInstances(chain)) {
jobTracker.onStateChange(job, JobTracker.JobState.IGNORED);
Log.w(TAG, JobLogger.format(job, "Already at the max instance count. Factory limit: " + job.getParameters().getMaxInstancesForFactory() + ", Queue limit: " + job.getParameters().getMaxInstancesForQueue() + ". Skipping."));
return;
}

Set<String> allDependsOn = new HashSet<>(dependsOn);
Set<String> aliveDependsOn = Stream.of(dependsOn)
.filter(id -> jobStorage.getJobSpec(id) != null)
.collect(Collectors.toSet());
Set<String> allDependsOn = new HashSet<>(dependsOn);
Set<String> aliveDependsOn = Stream.of(dependsOn)
.filter(id -> jobStorage.getJobSpec(id) != null)
.collect(Collectors.toSet());

if (dependsOnQueue != null) {
List<String> inQueue = Stream.of(jobStorage.getJobsInQueue(dependsOnQueue))
.map(JobSpec::getId)
.toList();
if (dependsOnQueue != null) {
List<String> inQueue = Stream.of(jobStorage.getJobsInQueue(dependsOnQueue))
.map(JobSpec::getId)
.toList();

allDependsOn.addAll(inQueue);
aliveDependsOn.addAll(inQueue);
}
allDependsOn.addAll(inQueue);
aliveDependsOn.addAll(inQueue);
}

if (jobTracker.haveAnyFailed(allDependsOn)) {
Log.w(TAG, "This job depends on a job that failed! Failing this job immediately.");
List<Job> dependents = onFailure(job);
job.setContext(application);
job.onFailure();
Stream.of(dependents).forEach(Job::onFailure);
return;
}
if (jobTracker.haveAnyFailed(allDependsOn)) {
Log.w(TAG, "This job depends on a job that failed! Failing this job immediately.");
List<Job> dependents = onFailure(job);
job.setContext(application);
job.onFailure();
Stream.of(dependents).forEach(Job::onFailure);
return;
}

FullSpec fullSpec = buildFullSpec(job, aliveDependsOn);
jobStorage.insertJobs(Collections.singletonList(fullSpec));

FullSpec fullSpec = buildFullSpec(job, aliveDependsOn);
jobStorage.insertJobs(Collections.singletonList(fullSpec));
scheduleJobs(Collections.singletonList(job));
}

scheduleJobs(Collections.singletonList(job));
// We have no control over what happens in jobs' onSubmit method, so we drop our lock to reduce the possibility of a deadlock
triggerOnSubmit(chain);
notifyAll();

synchronized (this) {
notifyAll();
}
}

@WorkerThread
Expand Down

0 comments on commit 81fc997

Please sign in to comment.