Skip to content

Commit

Permalink
[FLINK-14316] Properly manage rpcConnection in JobManagerLeaderListen…
Browse files Browse the repository at this point in the history
…er under leader change

This commit changes how the rpcConnection is managed in JobManagerLeaderListener under leader change.
This component clears now the fields rpcConnection and currentJobMasterId if the leader loses leadership.
Moreover, it only restarts a connection attempt if the leader session id is new.

This closes apache#11603.
  • Loading branch information
tillrohrmann authored and leonardBang committed Apr 10, 2020
1 parent 78d3aa3 commit 4451cc3
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 52 deletions.
Expand Up @@ -271,9 +271,7 @@ public void stop() {
if (!stopped) {
stopped = true;

if (rpcConnection != null) {
rpcConnection.close();
}
closeRpcConnection();
}
}
}
Expand Down Expand Up @@ -310,37 +308,16 @@ public void notifyLeaderAddress(@Nullable final String leaderAddress, @Nullable

if (leaderAddress == null || leaderAddress.isEmpty()) {
// the leader lost leadership but there is no other leader yet.
if (rpcConnection != null) {
rpcConnection.close();
rpcConnection = null;
}

jobManagerLostLeadership = Optional.ofNullable(currentJobMasterId);
currentJobMasterId = jobMasterId;
closeRpcConnection();
} else {
currentJobMasterId = jobMasterId;

if (rpcConnection != null) {
// check if we are already trying to connect to this leader
if (!Objects.equals(jobMasterId, rpcConnection.getTargetLeaderId())) {
rpcConnection.close();

rpcConnection = new JobManagerRegisteredRpcConnection(
LOG,
leaderAddress,
jobMasterId,
rpcService.getExecutor());
}
// check whether we are already connecting to this leader
if (Objects.equals(jobMasterId, currentJobMasterId)) {
LOG.debug("Ongoing attempt to connect to leader of job {}. Ignoring duplicate leader information.", jobId);
} else {
rpcConnection = new JobManagerRegisteredRpcConnection(
LOG,
leaderAddress,
jobMasterId,
rpcService.getExecutor());
closeRpcConnection();
openRpcConnectionTo(leaderAddress, jobMasterId);
}

LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, leaderId);
rpcConnection.start();
}
}
}
Expand All @@ -349,6 +326,32 @@ public void notifyLeaderAddress(@Nullable final String leaderAddress, @Nullable
jobManagerLostLeadership.ifPresent(oldJobMasterId -> jobLeaderListener.jobManagerLostLeadership(jobId, oldJobMasterId));
}

@GuardedBy("lock")
private void openRpcConnectionTo(String leaderAddress, JobMasterId jobMasterId) {
Preconditions.checkState(
currentJobMasterId == null && rpcConnection == null,
"Cannot open a new rpc connection if the previous connection has not been closed.");

currentJobMasterId = jobMasterId;
rpcConnection = new JobManagerRegisteredRpcConnection(
LOG,
leaderAddress,
jobMasterId,
rpcService.getExecutor());

LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, jobMasterId.toUUID());
rpcConnection.start();
}

@GuardedBy("lock")
private void closeRpcConnection() {
if (rpcConnection != null) {
rpcConnection.close();
rpcConnection = null;
currentJobMasterId = null;
}
}

@Override
public void handleError(Exception exception) {
if (stopped) {
Expand Down
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
Expand All @@ -41,6 +42,10 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

/**
* Tests for the {@link JobLeaderService}.
Expand Down Expand Up @@ -106,34 +111,25 @@ public void go() throws Exception {
*/
@Test
public void doesNotReconnectAfterTargetLostLeadership() throws Exception {
final JobLeaderService jobLeaderService = new JobLeaderService(
new LocalUnresolvedTaskManagerLocation(),
RetryingRegistrationConfiguration.defaultConfiguration());

final JobID jobId = new JobID();

final SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
.setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService)
.build();
final TestingJobMasterGateway jobMasterGateway = registerJobMaster();

final String jmAddress = "foobar";
final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
rpcServiceResource.getTestingRpcService().registerGateway(jmAddress, jobMasterGateway);
final OneShotLatch jobManagerGainedLeadership = new OneShotLatch();
final TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener(ignored -> jobManagerGainedLeadership.trigger());

final TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener();
jobLeaderService.start(
"foobar",
rpcServiceResource.getTestingRpcService(),
haServices,
testingJobLeaderListener);
final JobLeaderService jobLeaderService = createAndStartJobLeaderService(haServices, testingJobLeaderListener);

try {
jobLeaderService.addJob(jobId, jmAddress);
jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());

leaderRetrievalService.notifyListener(jmAddress, UUID.randomUUID());
leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), UUID.randomUUID());

testingJobLeaderListener.waitUntilJobManagerGainedLeadership();
jobManagerGainedLeadership.await();

// revoke the leadership
leaderRetrievalService.notifyListener(null, null);
Expand All @@ -145,14 +141,86 @@ public void doesNotReconnectAfterTargetLostLeadership() throws Exception {
}
}

/**
* Tests that the JobLeaderService can reconnect to an old leader which seemed
* to have lost the leadership in between. See FLINK-14316.
*/
@Test
public void canReconnectToOldLeaderWithSameLeaderAddress() throws Exception {
final JobID jobId = new JobID();

final SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
.setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService)
.build();

final TestingJobMasterGateway jobMasterGateway = registerJobMaster();

final BlockingQueue<JobID> leadershipQueue = new ArrayBlockingQueue<>(1);
final TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener(leadershipQueue::offer);

final JobLeaderService jobLeaderService = createAndStartJobLeaderService(haServices, testingJobLeaderListener);

try {
jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());

final UUID leaderSessionId = UUID.randomUUID();
leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), leaderSessionId);

// wait for the first leadership
assertThat(leadershipQueue.take(), is(jobId));

// revoke the leadership
leaderRetrievalService.notifyListener(null, null);

testingJobLeaderListener.waitUntilJobManagerLostLeadership();

leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), leaderSessionId);

// check that we obtain the leadership a second time
assertThat(leadershipQueue.take(), is(jobId));
} finally {
jobLeaderService.stop();
}
}

private JobLeaderService createAndStartJobLeaderService(HighAvailabilityServices haServices, JobLeaderListener testingJobLeaderListener) {
final JobLeaderService jobLeaderService = new JobLeaderService(
new LocalUnresolvedTaskManagerLocation(),
RetryingRegistrationConfiguration.defaultConfiguration());

jobLeaderService.start(
"foobar",
rpcServiceResource.getTestingRpcService(),
haServices,
testingJobLeaderListener);
return jobLeaderService;
}

private TestingJobMasterGateway registerJobMaster() {
final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
rpcServiceResource.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

return jobMasterGateway;
}

private static final class TestingJobLeaderListener implements JobLeaderListener {

private final CountDownLatch jobManagerGainedLeadership = new CountDownLatch(1);
private final CountDownLatch jobManagerLostLeadership = new CountDownLatch(1);

private final Consumer<JobID> jobManagerGainedLeadership;

private TestingJobLeaderListener() {
this(ignored -> {});
}

private TestingJobLeaderListener(Consumer<JobID> jobManagerGainedLeadership) {
this.jobManagerGainedLeadership = jobManagerGainedLeadership;
}

@Override
public void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess registrationMessage) {
jobManagerGainedLeadership.countDown();
jobManagerGainedLeadership.accept(jobId);
}

@Override
Expand All @@ -165,10 +233,6 @@ public void handleError(Throwable throwable) {
// ignored
}

private void waitUntilJobManagerGainedLeadership() throws InterruptedException {
jobManagerGainedLeadership.await();
}

private void waitUntilJobManagerLostLeadership() throws InterruptedException {
jobManagerLostLeadership.await();
}
Expand Down

0 comments on commit 4451cc3

Please sign in to comment.