Skip to content
This repository has been archived by the owner on Sep 29, 2021. It is now read-only.

Commit

Permalink
Fail softly when agent cannot save task history
Browse files: browse the repository at this point in the history
Bad data can get into the agent's task history backing store.
Fail softly so that a failed history save does not disrupt the important Helios job operations.
  • Loading branch information
davidxia committed Jul 30, 2015
1 parent 26c7a54 commit 4233263
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 10 deletions.
Expand Up @@ -157,7 +157,14 @@ public void setTaskStatus(final JobId jobId, final TaskStatus status)
throws InterruptedException {
log.debug("setting task status: {}", status);
taskStatuses.put(jobId.toString(), status.toJsonBytes());
historyWriter.saveHistoryItem(status);
try {
historyWriter.saveHistoryItem(status);
} catch (Exception e) {
// Log error here and keep going as saving task history is not critical.
// This is to prevent bad data in the queue from screwing up the actually important Helios
// agent operations.
log.error("Error saving task status {} to ZooKeeper: {}", status, e);
}
final TaskStatusEvent event = new TaskStatusEvent(status, System.currentTimeMillis(), agent);
kafkaSender.send(KafkaRecord.of(TaskStatusEvent.KAFKA_TOPIC, event.toJsonBytes()));
}
Expand Down
Expand Up @@ -46,6 +46,7 @@
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -94,14 +95,14 @@ public abstract class QueueingHistoryWriter<TEvent>

/**
* Get the key associated with an event.
* @param event
* @param event Event to save to ZooKeeper.
* @return Key for the event.
*/
protected abstract String getKey(TEvent event);

/**
* Get the Unix timestamp for an event.
* @param event
* @param event Event to save to ZooKeeper.
* @return Timestamp for the event.
*/
protected abstract long getTimestamp(TEvent event);
Expand All @@ -113,7 +114,7 @@ public abstract class QueueingHistoryWriter<TEvent>
*
* All events will be stored as children of the returned path.
*
* @param event
* @param event Event to save to ZooKeeper.
* @return A ZooKeeper path.
*/
protected abstract String getZkEventsPath(TEvent event);
Expand Down Expand Up @@ -148,7 +149,7 @@ public QueueingHistoryWriter(final ZooKeeperClient client, final Path backingFil
// Clean out any errant null values. Normally shouldn't have any, but we did have a few
// where it happened, and this will make sure we can get out of a bad state if we get into it.
final ImmutableSet<String> curKeys = ImmutableSet.copyOf(this.events.keySet());
for (Object key : curKeys) {
for (final String key : curKeys) {
if (this.events.get(key) == null) {
this.events.remove(key);
}
Expand All @@ -174,7 +175,7 @@ protected void shutDown() throws Exception {

/**
* Add an event to the queue to be written to ZooKeeper.
* @param event
* @param event Event to save to ZooKeeper.
* @throws InterruptedException
*/
protected void add(TEvent event) throws InterruptedException {
Expand Down Expand Up @@ -210,7 +211,7 @@ private Deque<TEvent> getDeque(final String key) {
final Deque<TEvent> deque = events.get(key);
if (deque == null) { // try more assertively to get a deque
final ConcurrentLinkedDeque<TEvent> newDeque =
new ConcurrentLinkedDeque<TEvent>();
new ConcurrentLinkedDeque<>();
events.put(key, newDeque);
return newDeque;
}
Expand Down Expand Up @@ -287,13 +288,19 @@ private TEvent findEldestEvent() {
// within the same job id. Whether this is the best strategy (as opposed to fullest deque)
// is arguable.
TEvent current = null;
for (Deque<TEvent> queue : events.values()) {
for (final Map.Entry<String, Deque<TEvent>> entry : events.entrySet()) {
final Deque<TEvent> queue = entry.getValue();
if (queue == null) {
continue;
}
final TEvent event = queue.peek();
if (current == null || (getTimestamp(event) < getTimestamp(current))) {
current = event;
try {
if (current == null || (getTimestamp(event) < getTimestamp(current))) {
current = event;
}
} catch (ClassCastException e) {
// There was bad data. Remove it from the events Map.
events.remove(entry.getKey());
}
}
return current;
Expand Down

0 comments on commit 4233263

Please sign in to comment.