Skip to content

Commit

Permalink
[FLINK-1634] Fix "Could not build up connection to JobManager" issue …
Browse files Browse the repository at this point in the history
…on some systems
  • Loading branch information
viduranga committed Mar 3, 2015
1 parent ce0c290 commit b7da22a
Show file tree
Hide file tree
Showing 31 changed files with 64 additions and 46 deletions.
10 changes: 5 additions & 5 deletions docs/streaming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class StreamingWordCount {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.socketTextStream(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 9999)
.flatMap(new Splitter())
.groupBy(0)
.sum(1);
Expand Down Expand Up @@ -718,7 +718,7 @@ Example:
~~~java
DataStream<String> stream = env
.addSource(new FlumeSource<String>("localhost", 41414, new SimpleStringSchema()))
.addSource(new FlumeSource<String>(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 41414, new SimpleStringSchema()))
.print();
~~~
Expand All @@ -734,7 +734,7 @@ The followings have to be provided for the `FlumeSink(…)` constructor in order
Example:
~~~java
stream.addSink(new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
stream.addSink(new FlumeSink<String>(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 42424, new StringToByteSerializer()));
~~~
##### Configuration file<a name="config_file"></a>
Expand Down Expand Up @@ -791,7 +791,7 @@ Example:
~~~java
DataStream<String> stream = env
.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
.addSource(new RMQSource<String>(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, "hello", new SimpleStringSchema()))
.print();
~~~
Expand All @@ -807,7 +807,7 @@ The followings have to be provided for the `RMQSink(…)` constructor in order:
Example:
~~~java
stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
stream.addSink(new RMQSink<String>(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, "hello", new StringToByteSerializer()));
~~~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void setUp() throws Exception {

final int freePort = NetUtils.getAvailablePort();
config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort);
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);

Expand All @@ -108,7 +108,7 @@ public void setUp() throws Exception {
when(generatorMock.compileJobGraph(optimizedPlanMock)).thenReturn(jobGraph);

try {
Tuple2<String, Object> address = new Tuple2<String, Object>("localhost", freePort);
Tuple2<String, Object> address = new Tuple2<String, Object>(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, freePort);
jobManagerSystem = AkkaUtils.createActorSystem(config, new Some<Tuple2<String, Object>>(address));
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public final class ConfigConstants {
*/
public static final String JOB_MANAGER_IPC_ADDRESS_KEY = "jobmanager.rpc.address";

/**
* The config parameter defining the value of the JOB_MANAGER_IPC_ADDRESS
*/
public static final String JOB_MANAGER_IPC_ADDRESS_VALUE = GlobalConfiguration.getString(JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");

/**
* The config parameter defining the network port to connect to
* for communication with the job manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.ObjectOutputStream;
import java.io.OutputStreamWriter;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -184,7 +185,7 @@ private FileInputSplit createTempFile(String contents) throws IOException {
wrt.write(contents);
wrt.close();

return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}

protected static final class MyTextInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat<Record> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.util.Arrays;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -582,7 +583,7 @@ private FileInputSplit createTempFile(String content) throws IOException {
dos.writeBytes(content);
dos.close();

return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}

private final Value[] createIntValues(int num) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.LocatableInputSplit;

import org.junit.Test;


Expand All @@ -41,7 +41,7 @@ public void testSerialSplitAssignmentWithNullHost() {
try {
final int NUM_SPLITS = 50;
final String[][] hosts = new String[][] {
new String[] { "localhost" },
new String[] { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE },
new String[0],
null
};
Expand Down Expand Up @@ -298,7 +298,7 @@ public void testConcurrentSplitAssignmentNullHost() {
final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;

final String[][] hosts = new String[][] {
new String[] { "localhost" },
new String[] { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE },
new String[0],
null
};
Expand Down Expand Up @@ -518,11 +518,11 @@ public void testAssignmentOfManySplitsRandomly() {
final Random rand = new Random(seed);

for (int i = 0; i < splitHosts.length; i++) {
splitHosts[i] = "localHost" + i;
splitHosts[i] = ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE + i;
}
for (int i = 0; i < requestingHosts.length; i++) {
if (i % 2 == 0) {
requestingHosts[i] = "localHost" + i;
requestingHosts[i] = ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE + i;
} else {
requestingHosts[i] = "remoteHost" + i;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.rmi.registry.Registry;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

/**
Expand Down Expand Up @@ -75,7 +76,7 @@ public class RemoteCollectorOutputFormat<T> implements OutputFormat<T> {
* @see RemoteCollectorOutputFormat#PORT
*/
public RemoteCollectorOutputFormat() {
this("localhost", 8888, null);
this(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 8888, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -618,7 +619,7 @@ private FileInputSplit createTempFile(String content) throws IOException {
wrt.write(content);
wrt.close();

return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.io.IOException;

import org.junit.Test;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -183,7 +183,7 @@ private FileInputSplit createInputSplit(String content) throws IOException {
wrt.write(content);
wrt.close();

return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.io.OutputStreamWriter;

import org.junit.Assert;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileInputSplit;
Expand Down Expand Up @@ -330,7 +330,7 @@ private FileInputSplit createTempFile(String content) throws IOException {
dos.writeBytes(content);
dos.close();

return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -177,7 +178,7 @@ private FileInputSplit createTempFile(int[] contents) throws IOException {

dos.close();

return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE});
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
// Construction
// --------------------------------------------------------------------------

// NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
// NOTE: THIS MUST BE getByName(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE), which is 127.0.0.1 and
// not getLocalHost(), which may be 127.0.1.1
val HOSTNAME = InetAddress.getByName("localhost").getHostAddress()
val HOSTNAME = InetAddress.getByName(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE).getHostAddress()

val timeout = AkkaUtils.getTimeout(userConfiguration)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
if (blobPort > 0) {

val address = new InetSocketAddress(
currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"),
currentJobManager.flatMap(_.path.address.host).getOrElse(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE),
blobPort)

log.info("Determined BLOB server address to be {}.", address)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.blob;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.net.InetSocketAddress;
import java.security.MessageDigest;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobID;
import org.junit.AfterClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class MockInputSplitProvider implements InputSplitProvider {
public void addInputSplits(final String path, final int noSplits) {

final InputSplit[] tmp = new InputSplit[noSplits];
final String[] hosts = { "localhost" };
final String[] hosts = { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE };

final String localPath;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
val config = new Configuration()
config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "1 second")

val tm = TestingUtils.startTestingTaskManagerWithConfiguration("localhost",
val tm = TestingUtils.startTestingTaskManagerWithConfiguration(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE,
self.path.toString, config, _system)

watch(tm)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea

override def generateConfiguration(userConfig: Configuration): Configuration = {
val cfg = new Configuration()
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE)
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, NetUtils.getAvailablePort())
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object TestingUtils {
val config = new Configuration()

val (tmConfig, netConfig, connectionInfo, _) =
TaskManager.parseTaskManagerConfiguration(config, "localhost", true, true)
TaskManager.parseTaskManagerConfiguration(config, ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, true, true)

val tmProps = Props(classOf[TestingTaskManager], connectionInfo, jmURL, tmConfig, netConfig)
system.actorOf(tmProps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public void testExternalProgram() {
String testData = getClass().getResource(TEST_DATA_FILE).toString();

PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });

Client c = new Client(new InetSocketAddress("localhost", testMiniCluster.getJobManagerRPCPort()),
Client c = new Client(new InetSocketAddress(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, testMiniCluster.getJobManagerRPCPort()),
new Configuration(), program.getUserCodeClassLoader());
c.run(program, 4, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public static void main(String[] args) throws Exception {
// This will execute the word-count embedded in a local context. replace this line by the commented
// succeeding line to send the job to a local installation or to a cluster for execution
LocalExecutor.execute(plan);
// PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
// PlanExecutor ex = new RemoteExecutor(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
// ex.executePlan(plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public static void main(String[] args) throws Exception {
// This will execute the word-count embedded in a local context. replace this line by the commented
// succeeding line to send the job to a local installation or to a cluster for execution
LocalExecutor.execute(plan);
// PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
// PlanExecutor ex = new RemoteExecutor(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
// ex.executePlan(plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.flink.configuration.ConfigConstants;

import net.spy.memcached.MemcachedClient;

/**
Expand All @@ -36,7 +38,7 @@ public class MemcachedState<V> implements DBState<String, V> {

public MemcachedState() {
try {
memcached = new MemcachedClient(new InetSocketAddress("localhost", 11211));
memcached = new MemcachedClient(new InetSocketAddress(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE, 11211));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Iterator;
import java.util.Set;

import org.apache.flink.configuration.ConfigConstants;

import redis.clients.jedis.Jedis;

/**
Expand All @@ -40,7 +42,7 @@ public class RedisState<K extends Serializable, V extends Serializable> extends

public RedisState(DBSerializer<K> keySerializer, DBSerializer<V> valueSerializer) {
super(keySerializer, valueSerializer);
jedis = new Jedis("localhost");
jedis = new Jedis(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_VALUE);
}

public RedisState() {
Expand Down

0 comments on commit b7da22a

Please sign in to comment.