Skip to content

Commit

Permalink
Merge pull request #367 from mwl/hotfix/367-parsing-startedAt
Browse files Browse the repository at this point in the history
Error parsing startedAt field from executor
  • Loading branch information
Phil Winder committed Oct 27, 2015
2 parents 2d3e437 + d94be04 commit 81c11da
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.mesos.elasticsearch.scheduler;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;

Expand All @@ -15,4 +16,8 @@ public Date now() {
public ZonedDateTime zonedNow() {
return ZonedDateTime.now();
}

public ZonedDateTime nowUTC() {
return ZonedDateTime.now(ZoneOffset.UTC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ public void run(String[] args) {
configuration.getZookeeperCLI().getZookeeperMesosTimeout(),
TimeUnit.MILLISECONDS,
"/" + configuration.getFrameworkName() + "/" + configuration.getElasticsearchCLI().getElasticsearchClusterName()));
final FrameworkState frameworkState = new FrameworkState(zookeeperStateDriver);
final ClusterState clusterState = new ClusterState(zookeeperStateDriver, frameworkState);
final TaskInfoFactory taskInfoFactory = new TaskInfoFactory();
final FrameworkState frameworkState = new FrameworkState(zookeeperStateDriver, taskInfoFactory);
final ClusterState clusterState = new ClusterState(zookeeperStateDriver, frameworkState, taskInfoFactory);

final ElasticsearchScheduler scheduler = new ElasticsearchScheduler(
configuration,
frameworkState,
clusterState,
new TaskInfoFactory(),
taskInfoFactory,
new OfferStrategy(configuration, clusterState),
zookeeperStateDriver
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package org.apache.mesos.elasticsearch.scheduler;

import org.apache.mesos.Protos;
import org.apache.mesos.elasticsearch.common.Discovery;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.ZonedDateTime;
import java.util.Properties;

/**
* Task on a host.
Expand All @@ -21,6 +18,7 @@ public class Task {
private ZonedDateTime startedAt;
private InetSocketAddress clientAddress;
private InetSocketAddress transportAddress;

public Task(String hostname, String taskId, Protos.TaskState state, ZonedDateTime startedAt, InetSocketAddress clientInterface, InetSocketAddress transportAddress) {
this.hostname = hostname;
this.taskId = taskId;
Expand Down Expand Up @@ -57,30 +55,4 @@ public InetSocketAddress getClientAddress() {
public InetSocketAddress getTransportAddress() {
return transportAddress;
}

public static Task from(Protos.TaskInfo taskInfo, Protos.TaskStatus taskStatus) {
Properties data = new Properties();
try {
data.load(taskInfo.getData().newInput());
} catch (IOException e) {
throw new RuntimeException("Failed to parse properties", e);
}
String hostName = data.getProperty("hostname", "UNKNOWN");
String ipAddress = data.getProperty("ipAddress", hostName);
ZonedDateTime startedAt = ZonedDateTime.parse(data.getProperty("startedAt", ZonedDateTime.now().toString()));
Protos.TaskState taskState = null;
if (taskStatus == null) {
taskState = Protos.TaskState.TASK_STAGING;
} else {
taskState = taskStatus.getState();
}
return new Task(
hostName,
taskInfo.getTaskId().getValue(),
taskState,
startedAt,
new InetSocketAddress(ipAddress, taskInfo.getDiscovery().getPorts().getPorts(Discovery.CLIENT_PORT_INDEX).getNumber()),
new InetSocketAddress(ipAddress, taskInfo.getDiscovery().getPorts().getPorts(Discovery.TRANSPORT_PORT_INDEX).getNumber())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@
import org.apache.mesos.elasticsearch.scheduler.configuration.ExecutorEnvironmentalVariables;
import org.apache.mesos.elasticsearch.scheduler.state.FrameworkState;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.*;

import static java.util.Arrays.asList;

Expand Down Expand Up @@ -64,7 +63,7 @@ public Protos.TaskInfo createTask(Configuration configuration, FrameworkState fr

return Protos.TaskInfo.newBuilder()
.setName(configuration.getTaskName())
.setData(toData(offer.getHostname(), "UNKNOWN", clock.zonedNow()))
.setData(toData(offer.getHostname(), "UNKNOWN", clock.nowUTC()))
.setTaskId(Protos.TaskID.newBuilder().setValue(taskId(offer)))
.setSlaveId(offer.getSlaveId())
.addAllResources(acceptedResources)
Expand All @@ -79,7 +78,11 @@ public ByteString toData(String hostname, String ipAddress, ZonedDateTime zonedD
data.put("startedAt", zonedDateTime.toString());

StringWriter writer = new StringWriter();
data.list(new PrintWriter(writer));
try {
data.store(new PrintWriter(writer), "Task metadata");
} catch (IOException e) {
throw new RuntimeException("Failed to write task metadata", e);
}
return ByteString.copyFromUtf8(writer.getBuffer().toString());
}

Expand Down Expand Up @@ -150,4 +153,29 @@ private String taskId(Protos.Offer offer) {
return String.format("elasticsearch_%s_%s", offer.getHostname(), date);
}

public Task parse(Protos.TaskInfo taskInfo, Protos.TaskStatus taskStatus) {
Properties data = new Properties();
try {
data.load(taskInfo.getData().newInput());
} catch (IOException e) {
throw new RuntimeException("Failed to parse properties", e);
}
String hostName = data.getProperty("hostname", "UNKNOWN");
String ipAddress = data.getProperty("ipAddress", hostName);

final ZonedDateTime startedAt = Optional.ofNullable(data.getProperty("startedAt"))
.map(s -> s.endsWith("...") ? s.substring(0, 29) : s) //We're convert dates that was capped with Properties.list() method, see https://github.com/mesos/elasticsearch/pull/367
.map(ZonedDateTime::parse)
.orElseGet(clock::nowUTC)
.withZoneSameInstant(ZoneOffset.UTC);

return new Task(
hostName,
taskInfo.getTaskId().getValue(),
taskStatus == null ? Protos.TaskState.TASK_STAGING : taskStatus.getState(),
startedAt,
new InetSocketAddress(ipAddress, taskInfo.getDiscovery().getPorts().getPorts(Discovery.CLIENT_PORT_INDEX).getNumber()),
new InetSocketAddress(ipAddress, taskInfo.getDiscovery().getPorts().getPorts(Discovery.TRANSPORT_PORT_INDEX).getNumber())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.elasticsearch.scheduler.Task;
import org.apache.mesos.elasticsearch.scheduler.TaskInfoFactory;

import javax.validation.constraints.NotNull;
import java.io.IOException;
Expand All @@ -21,13 +22,15 @@ public class ClusterState {
public static final String STATE_LIST = "stateList";
private SerializableState zooKeeperStateDriver;
private FrameworkState frameworkState;
private TaskInfoFactory taskInfoFactory;

public ClusterState(@NotNull SerializableState zooKeeperStateDriver, @NotNull FrameworkState frameworkState) {
public ClusterState(@NotNull SerializableState zooKeeperStateDriver, @NotNull FrameworkState frameworkState, @NotNull TaskInfoFactory taskInfoFactory) {
if (zooKeeperStateDriver == null || frameworkState == null) {
throw new NullPointerException();
}
this.zooKeeperStateDriver = zooKeeperStateDriver;
this.frameworkState = frameworkState;
this.taskInfoFactory = taskInfoFactory;
frameworkState.onStatusUpdate(this::updateTask);
}

Expand All @@ -51,7 +54,7 @@ public List<TaskInfo> getTaskList() {
*/
public Map<String, Task> getGuiTaskList() {
Map<String, Task> tasks = new HashMap<>();
getTaskList().forEach(taskInfo -> tasks.put(taskInfo.getTaskId().getValue(), Task.from(taskInfo, getStatus(taskInfo.getTaskId()).getStatus())));
getTaskList().forEach(taskInfo -> tasks.put(taskInfo.getTaskId().getValue(), taskInfoFactory.parse(taskInfo, getStatus(taskInfo.getTaskId()).getStatus())));
return tasks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.log4j.Logger;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.apache.mesos.elasticsearch.scheduler.TaskInfoFactory;

import java.io.IOException;
import java.util.List;
Expand All @@ -25,9 +26,11 @@ public class FrameworkState {
private final SerializableState zookeeperStateDriver;
private final StatePath statePath;
private SchedulerDriver driver;
private TaskInfoFactory taskInfoFactory;

public FrameworkState(SerializableState zookeeperStateDriver) {
public FrameworkState(SerializableState zookeeperStateDriver, TaskInfoFactory taskInfoFactory) {
this.zookeeperStateDriver = zookeeperStateDriver;
this.taskInfoFactory = taskInfoFactory;
statePath = new StatePath(zookeeperStateDriver);
}

Expand Down Expand Up @@ -56,7 +59,8 @@ public void markRegistered(Protos.FrameworkID frameworkId, SchedulerDriver drive
LOGGER.error("Unable to store framework ID in zookeeper", e);
}
this.driver = driver;
final ClusterState clusterState = new ClusterState(zookeeperStateDriver, this);

final ClusterState clusterState = new ClusterState(zookeeperStateDriver, this, taskInfoFactory);
registeredListeners.forEach(listener -> listener.accept(clusterState));
}

Expand Down

0 comments on commit 81c11da

Please sign in to comment.