Skip to content

Commit

Permalink
[FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher
Browse files Browse the repository at this point in the history
Implement SubmittedJobGraphListener interface in Dispatcher

Call start() on SubmittedJobGraphStore with Dispatcher as listener. To enable
this, the dispatcher must implement the SubmittedJobGraphListener interface. Add
simple unit tests for the new methods. Refactor DispatcherTest to remove
redundancy.

[FLINK-8176][flip6] Make InMemorySubmittedJobGraphStore thread-safe

[FLINK-8176][flip6] Add method isStarted() to TestingLeaderElectionService

[FLINK-8176][flip6] Return same RunningJobsRegistry instance from TestingHighAvailabilityServices

[FLINK-8176][flip6] Fix race conditions in Dispatcher and DispatcherTest

Check if jobManagerRunner exists before submitting job.
Replace JobManagerRunner mock used in tests with real instance.
Do not run job graph recovery in actor main thread when job graph is recovered
from SubmittedJobGraphListener#onAddedJobGraph(JobID).

[FLINK-8176][flip6] Rename variables in DispatcherTest

[FLINK-8176][flip6] Remove injectMocks in DispatcherTest

[FLINK-8176][flip6] Update Dispatcher's SubmittedJobGraphListener callbacks

Always attempt the job submission if onAddedJobGraph or onRemovedJobGraph are
called. The checks in submitJob and removeJob are sufficient.

This closes apache#5107.
  • Loading branch information
GJL authored and tillrohrmann committed Dec 11, 2017
1 parent 1cfc6a3 commit 08e18af
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 100 deletions.
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.dispatcher;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
Expand Down Expand Up @@ -63,6 +64,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -73,7 +75,8 @@
* the jobs and to recover them in case of a master failure. Furthermore, it knows
* about the state of the Flink session cluster.
*/
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements DispatcherGateway, LeaderContender {
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {

public static final String DISPATCHER_NAME = "dispatcher";

Expand Down Expand Up @@ -173,6 +176,7 @@ public void postStop() throws Exception {
public void start() throws Exception {
super.start();

submittedJobGraphStore.start(this);
leaderElectionService.start(this);
}

Expand All @@ -197,7 +201,8 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
new JobSubmissionException(jobId, "Could not retrieve the job status.", e));
}

if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING &&
!jobManagerRunners.containsKey(jobId)) {
try {
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
} catch (Exception e) {
Expand Down Expand Up @@ -248,7 +253,8 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)

@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
return CompletableFuture.completedFuture(jobManagerRunners.keySet());
return CompletableFuture.completedFuture(
Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
}

@Override
Expand Down Expand Up @@ -399,7 +405,8 @@ private void clearState() throws Exception {
/**
* Recovers all jobs persisted via the submitted job graph store.
*/
private void recoverJobs() {
@VisibleForTesting
void recoverJobs() {
log.info("Recovering all persisted jobs.");

getRpcService().execute(
Expand Down Expand Up @@ -507,6 +514,37 @@ public void handleError(final Exception exception) {
onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception));
}

//------------------------------------------------------
// SubmittedJobGraphListener
//------------------------------------------------------

@Override
public void onAddedJobGraph(final JobID jobId) {
getRpcService().execute(() -> {
final SubmittedJobGraph submittedJobGraph;
try {
submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
} catch (final Exception e) {
log.error("Could not recover job graph for job {}.", jobId, e);
return;
}
runAsync(() -> {
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT);
});
});
}

@Override
public void onRemovedJobGraph(final JobID jobId) {
runAsync(() -> {
try {
removeJob(jobId, false);
} catch (final Exception e) {
log.error("Could not remove job {}.", jobId, e);
}
});
}

//------------------------------------------------------
// Utility classes
//------------------------------------------------------
Expand Down

0 comments on commit 08e18af

Please sign in to comment.