Skip to content

Commit

Permalink
[FLINK-8957][tests] Port JMXJobManagerMetricTest to flip6
Browse files Browse the repository at this point in the history
This closes apache#5720.
  • Loading branch information
zentol committed Mar 23, 2018
1 parent d2f1155 commit 8588bb4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 31 deletions.
6 changes: 6 additions & 0 deletions flink-metrics/flink-metrics-jmx/pom.xml
Expand Up @@ -85,5 +85,11 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Expand Up @@ -18,63 +18,72 @@

package org.apache.flink.runtime.jobmanager;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.MiniClusterResource;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

import static org.junit.Assert.assertEquals;

/**
* Tests to verify JMX reporter functionality on the JobManager.
*/
public class JMXJobManagerMetricTest {

/**
* Tests that metrics registered on the JobManager are actually accessible via JMX.
*/
@Test
public void testJobManagerJMXMetricAccess() throws Exception {
Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfiguration(),
1,
1),
true);

private static Configuration getConfiguration() {
Configuration flinkConfiguration = new Configuration();

flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075");

flinkConfiguration.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");

TestingCluster flink = new TestingCluster(flinkConfiguration);
return flinkConfiguration;
}

try {
flink.start();
/**
* Tests that metrics registered on the JobManager are actually accessible via JMX.
*/
@Test
public void testJobManagerJMXMetricAccess() throws Exception {
Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));

try {
JobVertex sourceJobVertex = new JobVertex("Source");
sourceJobVertex.setInvokableClass(BlockingInvokable.class);

Expand All @@ -92,28 +101,26 @@ public void testJobManagerJMXMetricAccess() throws Exception {
true),
null));

flink.waitForActorsToBeAlive();

flink.submitJobDetached(jobGraph);
ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
client.setDetached(true);
client.submitJob(jobGraph, JMXJobManagerMetricTest.class.getClassLoader());

Future<Object> jobRunning = flink.getLeaderGateway(deadline.timeLeft())
.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), deadline.timeLeft());
Await.ready(jobRunning, deadline.timeLeft());
FutureUtils.retrySuccesfulWithDelay(
() -> client.getJobStatus(jobGraph.getJobID()),
Time.milliseconds(10),
deadline,
status -> status == JobStatus.RUNNING,
TestingUtils.defaultScheduledExecutor()
).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
Set<ObjectName> nameSet = mBeanServer.queryNames(new ObjectName("org.apache.flink.jobmanager.job.lastCheckpointSize:job_name=TestingJob,*"), null);
Assert.assertEquals(1, nameSet.size());
assertEquals(-1L, mBeanServer.getAttribute(nameSet.iterator().next(), "Value"));

Future<Object> jobFinished = flink.getLeaderGateway(deadline.timeLeft())
.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());

BlockingInvokable.unblock();

// wait til the job has finished
Await.ready(jobFinished, deadline.timeLeft());
} finally {
flink.stop();
BlockingInvokable.unblock();
}
}

Expand Down

0 comments on commit 8588bb4

Please sign in to comment.