Skip to content

Commit

Permalink
Merge pull request #364 from mesos/ehancement/362-ReportIPAddress
Browse files Browse the repository at this point in the history
Executor IPAddress discovery. Executor will now send IP address over …
  • Loading branch information
Phil Winder committed Oct 21, 2015
2 parents 5b99cb8 + 26d7e2e commit e0f2e30
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 2 deletions.
2 changes: 2 additions & 0 deletions commons/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ task taskCopyFilesForDocker(type: Copy) {

dependencies {
compile "com.beust:jcommander:1.48"
compile 'commons-lang:commons-lang:2.6'
compile "log4j:log4j:1.2.16"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.apache.mesos.elasticsearch.common;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Obtains the first IP address from an adaptor
*/
public class AdaptorIPAddress {
private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(AdaptorIPAddress.class);
/**
* @return an InetAddress for the adaptor eth0 or en0 (so unit tests work on mac)
* @throws SocketException if adaptor doesn't exist or it doesn't have an IP Address
*/
public static InetAddress eth0() throws SocketException {
Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
ArrayList<NetworkInterface> netList = Collections.list(nets);
LOGGER.debug(netList.stream().map(NetworkInterface::getName).collect(Collectors.joining(", ")));
NetworkInterface eth0 = netList.stream().filter(s -> s.getName().matches("eth0|en0")).findFirst().get();
ArrayList<InetAddress> inetAddresses = Collections.list(eth0.getInetAddresses());
Optional<InetAddress> address = inetAddresses.stream().filter(inetAddress -> inetAddress.getHostAddress().matches("^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$")).findFirst();
LOGGER.debug(eth0.getDisplayName() + ": " + address.get().getHostAddress());
return address.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.apache.mesos.elasticsearch.common;

import org.apache.commons.lang.builder.HashCodeBuilder;

import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* IP address to be sent over a mesos framework message
*/
public class SerializableIPAddress implements Serializable {
public InetAddress getAddress() {
return address;
}

private final InetAddress address;

public SerializableIPAddress(InetAddress address) {
this.address = address;
}

public byte[] toBytes() {
return address.getAddress();
}

public static SerializableIPAddress fromBytes(byte[] data) throws UnknownHostException {
return new SerializableIPAddress(InetAddress.getByAddress(data));
}

@Override
public String toString() {
return address.getHostAddress();
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).
append(address).
toHashCode();
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != this.getClass()) {
return false;
}

SerializableIPAddress inetAddress = (SerializableIPAddress) obj;
return this.getAddress().equals(inetAddress.getAddress());
}
}
21 changes: 21 additions & 0 deletions commons/src/main/resources/log4j.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">

<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%p] %d %c %M - %m%n"/>
</layout>
</appender>

<logger name="org.apache.mesos.elasticsearch">
<level value="DEBUG"/>
</logger>

<root>
<level value="WARN"/>
<appender-ref ref="console" />
</root>

</log4j:configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.apache.mesos.elasticsearch.common;

import org.junit.Test;

import static org.junit.Assert.*;

/**
* @author philwinder
* @since 21/10/2015
*/
public class AdaptorIPAddressTest {

@Test
public void testEth0() throws Exception {
assertNotNull(AdaptorIPAddress.eth0());
assertTrue(AdaptorIPAddress.eth0().getHostAddress().contains("."));
assertFalse(AdaptorIPAddress.eth0().getHostAddress().contains(":"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.elasticsearch.common.AdaptorIPAddress;
import org.apache.mesos.elasticsearch.common.SerializableIPAddress;
import org.apache.mesos.elasticsearch.executor.Configuration;
import org.apache.mesos.elasticsearch.executor.elasticsearch.Launcher;
import org.apache.mesos.elasticsearch.executor.model.PortsModel;
Expand All @@ -13,11 +15,14 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.node.Node;

import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.URL;
import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

/**
* Executor for Elasticsearch.
Expand Down Expand Up @@ -101,6 +106,16 @@ public void run() {
}
}));

try {
InetAddress eth0 = AdaptorIPAddress.eth0();
LOGGER.debug("InetAddress: " + eth0);
SerializableIPAddress serializableIPAddress = new SerializableIPAddress(eth0);
driver.sendFrameworkMessage(serializableIPAddress.toBytes());
LOGGER.debug("Sent framework message: " + serializableIPAddress.toString());
} catch (NoSuchElementException | SocketException e) {
LOGGER.warn("Unable to obtain eth0 ip address", e);
}

// Send status update, running
driver.sendStatusUpdate(taskStatus.running());
} catch (InvalidParameterException | MalformedURLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import org.apache.mesos.elasticsearch.common.SerializableIPAddress;
import org.apache.mesos.elasticsearch.scheduler.state.*;

import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -124,6 +126,17 @@ public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) {
@Override
public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) {
LOGGER.info("Framework Message - Executor: " + executorId.getValue() + ", SlaveID: " + slaveId.getValue());
try {
SerializableIPAddress serializableIPAddress = SerializableIPAddress.fromBytes(data);
Protos.TaskInfo oldTask = clusterState.getTask(executorId);
Protos.TaskInfo newTask = Protos.TaskInfo.newBuilder().mergeFrom(oldTask)
.setData(taskInfoFactory.toData(serializableIPAddress.getAddress().getHostName(), serializableIPAddress.getAddress().getHostAddress(), new Clock().zonedNow()))
.build();
clusterState.removeTask(oldTask);
clusterState.addTask(newTask);
} catch (UnknownHostException e) {
LOGGER.warn("Unable to parse ip address discovery information");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ public Protos.TaskInfo createTask(Configuration configuration, FrameworkState fr

return Protos.TaskInfo.newBuilder()
.setName(configuration.getTaskName())
.setData(toData(offer.getHostname(), new InetSocketAddress(offer.getHostname(), 1).getAddress().getHostAddress(), clock.zonedNow()))
.setData(toData(offer.getHostname(), "UNKNOWN", clock.zonedNow()))
.setTaskId(Protos.TaskID.newBuilder().setValue(taskId(offer)))
.setSlaveId(offer.getSlaveId())
.addAllResources(acceptedResources)
.setDiscovery(discovery)
.setExecutor(newExecutorInfo(configuration)).build();
}

private ByteString toData(String hostname, String ipAddress, ZonedDateTime zonedDateTime) {
public ByteString toData(String hostname, String ipAddress, ZonedDateTime zonedDateTime) {
Properties data = new Properties();
data.put("hostname", hostname);
data.put("ipAddress", ipAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.apache.log4j.Logger;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.apache.mesos.elasticsearch.common.AdaptorIPAddress;
import org.apache.mesos.elasticsearch.common.SerializableIPAddress;
import org.apache.mesos.elasticsearch.scheduler.matcher.RequestMatcher;
import org.apache.mesos.elasticsearch.scheduler.state.ClusterState;
import org.apache.mesos.elasticsearch.scheduler.state.FrameworkState;
Expand All @@ -13,6 +15,7 @@
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

import java.net.SocketException;
import java.util.UUID;

import static java.util.Collections.singleton;
Expand Down Expand Up @@ -140,6 +143,15 @@ public void shouldRunWithCredentials() {
verify(configuration, atLeastOnce()).getFrameworkSecretPath();
}

@Test
public void shouldUpdateTaskInfoWhenIPAddressReceived() throws SocketException {
when(clusterState.getTask(ProtoTestUtil.getExecutorId())).thenReturn(ProtoTestUtil.getDefaultTaskInfo());
when(taskInfoFactory.toData(anyString(), anyString(), any())).thenCallRealMethod();
scheduler.frameworkMessage(driver, ProtoTestUtil.getExecutorId(), ProtoTestUtil.getSlaveId(), new SerializableIPAddress(AdaptorIPAddress.eth0()).toBytes());
verify(clusterState, times(1)).removeTask(ProtoTestUtil.getDefaultTaskInfo());
verify(clusterState, times(1)).addTask(any(Protos.TaskInfo.class));
}

private Protos.Offer.Builder newOffer(String hostname) {
return newOfferBuilder(UUID.randomUUID().toString(), hostname, UUID.randomUUID().toString(), frameworkID);
}
Expand Down

0 comments on commit e0f2e30

Please sign in to comment.