Skip to content

Commit

Permalink
[FLINK-8942][runtime] Pass heartbeat target ResourceID
Browse files Browse the repository at this point in the history
received payload field now volatile

Add HeartbeatMonitor#getHeartbeatTargetId

This closes apache#5699.
  • Loading branch information
zentol authored and sampath s committed Jul 26, 2018
1 parent 3922e80 commit d5a51db
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 15 deletions.
Expand Up @@ -57,7 +57,8 @@ public interface HeartbeatListener<I, O> {
* Retrieves the payload value for the next heartbeat message. Since the operation can happen
* asynchronously, the result is returned wrapped in a future.
*
* @param resourceID Resource ID identifying the receiver of the payload
* @return Future containing the next payload for heartbeats
*/
CompletableFuture<O> retrievePayload();
CompletableFuture<O> retrievePayload(ResourceID resourceID);
}
Expand Up @@ -106,7 +106,7 @@ HeartbeatListener<I, O> getHeartbeatListener() {
return heartbeatListener;
}

Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> getHeartbeatTargets() {
Collection<HeartbeatMonitor<O>> getHeartbeatTargets() {
return heartbeatTargets.values();
}

Expand Down Expand Up @@ -202,7 +202,7 @@ public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload)
heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
}

CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload();
CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload(requestOrigin);

if (futurePayload != null) {
CompletableFuture<Void> sendHeartbeatFuture = futurePayload.thenAcceptAsync(
Expand Down Expand Up @@ -289,6 +289,10 @@ HeartbeatTarget<O> getHeartbeatTarget() {
return heartbeatTarget;
}

ResourceID getHeartbeatTargetId() {
return resourceID;
}

public long getLastHeartbeat() {
return lastHeartbeat;
}
Expand Down
Expand Up @@ -63,7 +63,7 @@ public void run() {
if (!stopped) {
log.debug("Trigger heartbeat request.");
for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) {
CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload();
CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

if (futurePayload != null) {
Expand Down
Expand Up @@ -1527,7 +1527,7 @@ public void reportPayload(ResourceID resourceID, Void payload) {
}

@Override
public CompletableFuture<Void> retrievePayload() {
public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(null);
}
}
Expand All @@ -1551,7 +1551,7 @@ public void reportPayload(ResourceID resourceID, Void payload) {
}

@Override
public CompletableFuture<Void> retrievePayload() {
public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(null);
}
}
Expand Down
Expand Up @@ -1076,7 +1076,7 @@ public void run() {
}

@Override
public CompletableFuture<Void> retrievePayload() {
public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(null);
}
}
Expand Down Expand Up @@ -1109,7 +1109,7 @@ public void reportPayload(ResourceID resourceID, Void payload) {
}

@Override
public CompletableFuture<Void> retrievePayload() {
public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(null);
}
}
Expand Down
Expand Up @@ -1515,7 +1515,7 @@ public void reportPayload(ResourceID resourceID, Void payload) {
}

@Override
public CompletableFuture<Void> retrievePayload() {
public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(null);
}
}
Expand Down Expand Up @@ -1544,7 +1544,7 @@ public void reportPayload(ResourceID resourceID, Void payload) {
}

@Override
public CompletableFuture<SlotReport> retrievePayload() {
public CompletableFuture<SlotReport> retrievePayload(ResourceID resourceID) {
return callAsync(
() -> taskSlotTable.createSlotReport(getResourceID()),
taskManagerConfiguration.getTimeout());
Expand Down
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.heartbeat;

import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
Expand Down Expand Up @@ -75,7 +76,7 @@ public void testRegularHeartbeat() {

Object expectedObject = new Object();

when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject));
when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(expectedObject));

HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
heartbeatTimeout,
Expand All @@ -93,7 +94,7 @@ public void testRegularHeartbeat() {
heartbeatManager.requestHeartbeat(targetResourceID, expectedObject);

verify(heartbeatListener, times(1)).reportPayload(targetResourceID, expectedObject);
verify(heartbeatListener, times(1)).retrievePayload();
verify(heartbeatListener, times(1)).retrievePayload(any(ResourceID.class));
verify(heartbeatTarget, times(1)).receiveHeartbeat(ownResourceID, expectedObject);

heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject);
Expand All @@ -118,7 +119,7 @@ public void testHeartbeatMonitorUpdate() {

Object expectedObject = new Object();

when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject));
when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(expectedObject));

HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
heartbeatTimeout,
Expand Down Expand Up @@ -207,7 +208,7 @@ public void testHeartbeatCluster() throws Exception {
@SuppressWarnings("unchecked")
HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);

when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(object));
when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(object));

TestingHeartbeatListener heartbeatListener2 = new TestingHeartbeatListener(object2);

Expand Down Expand Up @@ -347,6 +348,162 @@ public void testLastHeartbeatFrom() {
}
}

/**
* Tests that the heartbeat target {@link ResourceID} is properly passed to the {@link HeartbeatListener} by the
* {@link HeartbeatManagerImpl}.
*/
@Test
public void testHeartbeatManagerTargetPayload() {
final long heartbeatTimeout = 100L;

final ResourceID someTargetId = ResourceID.generate();
final ResourceID specialTargetId = ResourceID.generate();
final TargetDependentHeartbeatReceiver someHeartbeatTarget = new TargetDependentHeartbeatReceiver();
final TargetDependentHeartbeatReceiver specialHeartbeatTarget = new TargetDependentHeartbeatReceiver();

final int defaultResponse = 0;
final int specialResponse = 1;

HeartbeatManager<?, Integer> heartbeatManager = new HeartbeatManagerImpl<>(
heartbeatTimeout,
ResourceID.generate(),
new TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse),
Executors.directExecutor(),
mock(ScheduledExecutor.class),
LOG);

try {
heartbeatManager.monitorTarget(someTargetId, someHeartbeatTarget);
heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget);

heartbeatManager.requestHeartbeat(someTargetId, null);
assertEquals(defaultResponse, someHeartbeatTarget.getLastReceivedHeartbeatPayload());

heartbeatManager.requestHeartbeat(specialTargetId, null);
assertEquals(specialResponse, specialHeartbeatTarget.getLastReceivedHeartbeatPayload());
} finally {
heartbeatManager.stop();
}
}

/**
* Tests that the heartbeat target {@link ResourceID} is properly passed to the {@link HeartbeatListener} by the
* {@link HeartbeatManagerSenderImpl}.
*/
@Test
public void testHeartbeatManagerSenderTargetPayload() throws Exception {
final long heartbeatTimeout = 100L;
final long heartbeatPeriod = 2000L;

final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);

final ResourceID someTargetId = ResourceID.generate();
final ResourceID specialTargetId = ResourceID.generate();

final OneShotLatch someTargetReceivedLatch = new OneShotLatch();
final OneShotLatch specialTargetReceivedLatch = new OneShotLatch();

final TargetDependentHeartbeatReceiver someHeartbeatTarget = new TargetDependentHeartbeatReceiver(someTargetReceivedLatch);
final TargetDependentHeartbeatReceiver specialHeartbeatTarget = new TargetDependentHeartbeatReceiver(specialTargetReceivedLatch);

final int defaultResponse = 0;
final int specialResponse = 1;

HeartbeatManager<?, Integer> heartbeatManager = new HeartbeatManagerSenderImpl<>(
heartbeatPeriod,
heartbeatTimeout,
ResourceID.generate(),
new TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse),
Executors.directExecutor(),
new ScheduledExecutorServiceAdapter(scheduledThreadPoolExecutor),
LOG);

try {
heartbeatManager.monitorTarget(someTargetId, someHeartbeatTarget);
heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget);

someTargetReceivedLatch.await(5, TimeUnit.SECONDS);
specialTargetReceivedLatch.await(5, TimeUnit.SECONDS);

assertEquals(defaultResponse, someHeartbeatTarget.getLastRequestedHeartbeatPayload());
assertEquals(specialResponse, specialHeartbeatTarget.getLastRequestedHeartbeatPayload());
} finally {
heartbeatManager.stop();
scheduledThreadPoolExecutor.shutdown();
}
}

/**
* Test {@link HeartbeatTarget} that exposes the last received payload.
*/
private static class TargetDependentHeartbeatReceiver implements HeartbeatTarget<Integer> {

private volatile int lastReceivedHeartbeatPayload = -1;
private volatile int lastRequestedHeartbeatPayload = -1;

private final OneShotLatch latch;

public TargetDependentHeartbeatReceiver() {
this(new OneShotLatch());
}

public TargetDependentHeartbeatReceiver(OneShotLatch latch) {
this.latch = latch;
}

@Override
public void receiveHeartbeat(ResourceID heartbeatOrigin, Integer heartbeatPayload) {
this.lastReceivedHeartbeatPayload = heartbeatPayload;
latch.trigger();
}

@Override
public void requestHeartbeat(ResourceID requestOrigin, Integer heartbeatPayload) {
this.lastRequestedHeartbeatPayload = heartbeatPayload;
latch.trigger();
}

public int getLastReceivedHeartbeatPayload() {
return lastReceivedHeartbeatPayload;
}

public int getLastRequestedHeartbeatPayload() {
return lastRequestedHeartbeatPayload;
}
}

/**
* Test {@link HeartbeatListener} that returns different payloads based on the target {@link ResourceID}.
*/
private static class TargetDependentHeartbeatSender implements HeartbeatListener<Object, Integer> {
private final ResourceID specialId;
private final int specialResponse;
private final int defaultResponse;

TargetDependentHeartbeatSender(ResourceID specialId, int specialResponse, int defaultResponse) {
this.specialId = specialId;
this.specialResponse = specialResponse;
this.defaultResponse = defaultResponse;
}

@Override
public void notifyHeartbeatTimeout(ResourceID resourceID) {
}

@Override
public void reportPayload(ResourceID resourceID, Object payload) {
}

@Override
public CompletableFuture<Integer> retrievePayload(ResourceID resourceID) {
if (resourceID.equals(specialId)) {
return CompletableFuture.completedFuture(specialResponse);
} else {
return CompletableFuture.completedFuture(defaultResponse);
}
}
}

static class TestingHeartbeatListener implements HeartbeatListener<Object, Object> {

private final CompletableFuture<ResourceID> future = new CompletableFuture<>();
Expand Down Expand Up @@ -378,7 +535,7 @@ public void reportPayload(ResourceID resourceID, Object payload) {
}

@Override
public CompletableFuture<Object> retrievePayload() {
public CompletableFuture<Object> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(payload);
}
}
Expand Down

0 comments on commit d5a51db

Please sign in to comment.