Remove Kafka functionality from QueueingHistoryWriter
Have the caller handle sending events to Kafka so that
QueueingHistoryWriter is only responsible for writing to
ZooKeeper. This simplifies things.
davidxia committed Jul 22, 2015
1 parent 64e75ca commit 687bda5
Showing 6 changed files with 116 additions and 86 deletions.
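
In short: QueueingHistoryWriter subclasses no longer implement a Kafka hook, and the caller publishes events itself after handing them to the history writer. A minimal sketch of the new caller-side pattern, adapted directly from the ZooKeeperAgentModel changes below (the surrounding class and the kafkaProvider argument are as in the diff; this is an illustration, not the full class):

    // Sketch (adapted from the ZooKeeperAgentModel diff below): the history
    // writer now needs only a ZooKeeper client and a backing file, while Kafka
    // publishing is wired up separately by the caller.
    this.historyWriter = new TaskHistoryWriter(
        host, client, stateDirectory.resolve(TASK_HISTORY_FILENAME));
    this.kafkaSender = new KafkaSender(
        kafkaProvider.getProducer(new StringSerializer(), new ByteArraySerializer()));

    // Later, when a task status changes, the caller writes history to
    // ZooKeeper and sends the same event to Kafka itself:
    historyWriter.saveHistoryItem(status);
    final TaskStatusEvent event = new TaskStatusEvent(status, System.currentTimeMillis(), agent);
    kafkaSender.send(KafkaRecord.of(TaskStatusEvent.KAFKA_TOPIC, event.toJsonBytes()));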
TaskStatusEvent.java
@@ -39,7 +39,10 @@
  * </pre>
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class TaskStatusEvent {
+public class TaskStatusEvent extends Descriptor {
+
+  public static final String KAFKA_TOPIC = "HeliosTaskStatusEvents";
+
   private final TaskStatus status;
   private final long timestamp;
   private final String host;
TaskHistoryWriter.java
@@ -38,8 +38,6 @@
  */
 public class TaskHistoryWriter extends QueueingHistoryWriter<TaskStatusEvent> {
 
-  private static final String KAFKA_TOPIC = "HeliosEvents";
-
   private final String hostname;
 
   @Override
@@ -57,21 +55,15 @@ protected byte[] toBytes(final TaskStatusEvent event) {
     return event.getStatus().toJsonBytes();
   }
 
-  @Override
-  protected String getKafkaTopic() {
-    return KAFKA_TOPIC;
-  }
-
   @Override
   protected String getZkEventsPath(TaskStatusEvent event) {
     final JobId jobId = event.getStatus().getJob().getId();
     return Paths.historyJobHostEvents(jobId, hostname);
   }
 
   public TaskHistoryWriter(final String hostname, final ZooKeeperClient client,
-                           final KafkaClientProvider kafkaProvider,
                            final Path backingFile) throws IOException, InterruptedException {
-    super(client, backingFile, kafkaProvider);
+    super(client, backingFile);
     this.hostname = hostname;
   }
 
ZooKeeperAgentModel.java
@@ -29,13 +29,18 @@
 import com.spotify.helios.common.descriptors.JobId;
 import com.spotify.helios.common.descriptors.Task;
 import com.spotify.helios.common.descriptors.TaskStatus;
+import com.spotify.helios.common.descriptors.TaskStatusEvent;
+import com.spotify.helios.servicescommon.KafkaRecord;
+import com.spotify.helios.servicescommon.KafkaSender;
 import com.spotify.helios.servicescommon.coordination.Paths;
 import com.spotify.helios.servicescommon.coordination.PersistentPathChildrenCache;
 import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
 import com.spotify.helios.servicescommon.coordination.ZooKeeperClientProvider;
 import com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory;
 
 import org.apache.curator.framework.state.ConnectionState;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +69,7 @@ public class ZooKeeperAgentModel extends AbstractIdleService implements AgentModel {
   private final PersistentPathChildrenCache<Task> tasks;
   private final ZooKeeperUpdatingPersistentDirectory taskStatuses;
   private final TaskHistoryWriter historyWriter;
+  private final KafkaSender kafkaSender;
 
   private final String agent;
   private final CopyOnWriteArrayList<AgentModel.Listener> listeners = new CopyOnWriteArrayList<>();
@@ -86,8 +92,11 @@ public ZooKeeperAgentModel(final ZooKeeperClientProvider provider,
         provider,
         taskStatusFile,
         Paths.statusHostJobs(host));
-    this.historyWriter = new TaskHistoryWriter(host, client, kafkaProvider,
-                                               stateDirectory.resolve(TASK_HISTORY_FILENAME));
+    this.historyWriter = new TaskHistoryWriter(
+        host, client, stateDirectory.resolve(TASK_HISTORY_FILENAME));
+
+    this.kafkaSender = new KafkaSender(
+        kafkaProvider.getProducer(new StringSerializer(), new ByteArraySerializer()));
   }
 
   @Override
@@ -149,6 +158,8 @@ public void setTaskStatus(final JobId jobId, final TaskStatus status)
     log.debug("setting task status: {}", status);
     taskStatuses.put(jobId.toString(), status.toJsonBytes());
     historyWriter.saveHistoryItem(status);
+    final TaskStatusEvent event = new TaskStatusEvent(status, System.currentTimeMillis(), agent);
+    kafkaSender.send(KafkaRecord.of(TaskStatusEvent.KAFKA_TOPIC, event.toJsonBytes()));
   }
 
   /**
DeploymentGroupHistoryWriter.java (new file)
@@ -0,0 +1,89 @@
/*
 * Copyright (c) 2014 Spotify AB.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.spotify.helios.rollingupdate;

import com.google.common.annotations.VisibleForTesting;

import com.spotify.helios.common.descriptors.DeploymentGroupEvent;
import com.spotify.helios.servicescommon.QueueingHistoryWriter;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

/**
 * Writes rolling update history to ZK.
 */
public class DeploymentGroupHistoryWriter extends QueueingHistoryWriter<DeploymentGroupEvent> {

  private static final Logger log = LoggerFactory.getLogger(DeploymentGroupHistoryWriter.class);

  @Override
  protected String getKey(final DeploymentGroupEvent event) {
    return event.getDeploymentGroup().getName();
  }

  @Override
  protected long getTimestamp(final DeploymentGroupEvent event) {
    return event.getTimestamp();
  }

  @Override
  protected String getZkEventsPath(final DeploymentGroupEvent event) {
    return Paths.historyDeploymentGroup(event.getDeploymentGroup());
  }

  @Override
  protected byte[] toBytes(final DeploymentGroupEvent deploymentGroupEvent) {
    return deploymentGroupEvent.toJsonBytes();
  }

  public DeploymentGroupHistoryWriter(final ZooKeeperClient client,
                                      final Path backingFile)
      throws IOException, InterruptedException {
    super(client, backingFile);
  }

  public void saveHistoryItem(final DeploymentGroupEvent event) {
    try {
      add(event);
    } catch (InterruptedException e) {
      log.error("error saving deployment group event: {} - {}", event, e);
    }
  }

  public void saveHistoryItems(final List<DeploymentGroupEvent> events) {
    for (final DeploymentGroupEvent e : events) {
      saveHistoryItem(e);
    }
  }

  @Override
  @VisibleForTesting
  protected void startUp() throws Exception {
    super.startUp();
  }
}
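
For orientation, a hypothetical wiring of this new writer; the commit itself adds no caller, so the client, stateDirectory, and events variables below are placeholders, and startAsync() is assumed from the Guava Service lifecycle the writer inherits (the test changes below use the matching stopAsync()):

    // Hypothetical usage; `client`, `stateDirectory`, and `events` are
    // placeholders, not part of this commit.
    final DeploymentGroupHistoryWriter historyWriter = new DeploymentGroupHistoryWriter(
        client, stateDirectory.resolve("deployment-group-history.json"));
    historyWriter.startAsync().awaitRunning();

    // Events are queued (and persisted to the backing file if ZooKeeper is
    // down), then written under Paths.historyDeploymentGroup(...).
    historyWriter.saveHistoryItems(events);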
QueueingHistoryWriter.java
@@ -22,7 +22,6 @@
 package com.spotify.helios.servicescommon;
 
 import com.google.common.base.Function;
-import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -32,16 +31,10 @@
 import com.google.common.util.concurrent.MoreExecutors;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.spotify.helios.agent.KafkaClientProvider;
 import com.spotify.helios.servicescommon.coordination.PathFactory;
 import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
 
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -57,21 +50,18 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
- * Writes historical events to ZooKeeper and sends them to Kafka. We attempt to gracefully handle
+ * Writes historical events to ZooKeeper. We attempt to gracefully handle
  * the case where ZK is down by persisting events in a backing file.
  *
  * Theory of operation:
@@ -105,8 +95,6 @@ public abstract class QueueingHistoryWriter<TEvent>
   private final ZooKeeperClient client;
   private final PersistentAtomicReference<ConcurrentMap<String, Deque<TEvent>>> backingStore;
 
-  private final Optional<KafkaProducer<String, TEvent>> kafkaProducer;
-
   /**
    * Get the key associated with an event.
    * @param event
@@ -121,12 +109,6 @@ public abstract class QueueingHistoryWriter<TEvent>
    */
   protected abstract long getTimestamp(TEvent event);
 
-  /**
-   * Get the Kafka topic for events sent to Kafka.
-   * @return Topic string.
-   */
-  protected abstract String getKafkaTopic();
-
   /**
    * Get the path at which events should be stored. Generally the path will differ based on
    * some parameters of the event. For example, all events associated with a particular host
@@ -153,8 +135,7 @@ protected int getMaxQueueSize() {
     return DEFAULT_MAX_QUEUE_SIZE;
   }
 
-  public QueueingHistoryWriter(final ZooKeeperClient client, final Path backingFile,
-                               final KafkaClientProvider kafkaProvider)
+  public QueueingHistoryWriter(final ZooKeeperClient client, final Path backingFile)
       throws IOException, InterruptedException {
     this.client = checkNotNull(client, "client");
     this.backingStore = PersistentAtomicReference.create(
@@ -167,14 +148,6 @@ public QueueingHistoryWriter(final ZooKeeperClient client, final Path backingFile)
         });
     this.events = backingStore.get();
 
-    if (kafkaProvider != null) {
-      // Get the Kafka Producer suitable for TaskStatus events.
-      this.kafkaProducer = kafkaProvider.getProducer(
-          new StringSerializer(), new KafkaEventSerializer());
-    } else {
-      this.kafkaProducer = Optional.absent();
-    }
-
     // Clean out any errant null values. Normally shouldn't have any, but we did have a few
    // where it happened, and this will make sure we can get out of a bad state if we get into it.
     final ImmutableSet<String> curKeys = ImmutableSet.copyOf(this.events.keySet());
@@ -200,15 +173,10 @@ protected void startUp() throws Exception {
   protected void shutDown() throws Exception {
     zkWriterExecutor.shutdownNow();
     zkWriterExecutor.awaitTermination(1, TimeUnit.MINUTES);
-
-    if (kafkaProducer.isPresent()) {
-      // Otherwise it enters an infinite loop for some reason.
-      kafkaProducer.get().close();
-    }
   }
 
   /**
-   * Add an event to the queue to be written to ZooKeeper and optionally sent to Kafka.
+   * Add an event to the queue to be written to ZooKeeper.
    * @param event
    * @throws InterruptedException
    */
@@ -346,13 +314,7 @@ public void run() {
         return;
       }
 
-      if (tryWriteToZooKeeper(event)) {
-        // managed to write to ZK. also send to kafka if we need to. we do it like this
-        // to avoid sending duplicate events to kafka in case ZK write fails.
-        if (kafkaProducer.isPresent()) {
-          sendToKafka(event);
-        }
-      } else {
+      if (!tryWriteToZooKeeper(event)) {
         putBack(event);
       }
     }
@@ -407,17 +369,6 @@ private boolean tryWriteToZooKeeper(TEvent event) {
     return true;
   }
 
-  private void sendToKafka(TEvent event) {
-    try {
-      final Future<RecordMetadata> future = kafkaProducer.get().send(
-          new ProducerRecord<String, TEvent>(getKafkaTopic(), event));
-      final RecordMetadata metadata = future.get(5, TimeUnit.SECONDS);
-      log.debug("Sent an event to Kafka, meta: {}", metadata);
-    } catch (ExecutionException | InterruptedException | TimeoutException e) {
-      log.error("Unable to send an event to Kafka", e);
-    }
-  }
-
   private void trimStatusEvents(final List<String> events, final String eventsPath) {
     // All this to sort numerically instead of lexically....
     final List<Long> eventsAsLongs = Lists.newArrayList(Iterables.transform(events,
@@ -438,17 +389,4 @@ public Long apply(String name) {
       }
     }
   }
-
-  public class KafkaEventSerializer implements Serializer<TEvent> {
-    @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) { }
-
-    @Override
-    public byte[] serialize(final String topic, final TEvent value) {
-      return toBytes(value);
-    }
-
-    @Override
-    public void close() { }
-  }
 }
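
With getKafkaTopic() removed, a subclass now only tells the writer how events are keyed, timestamped, serialized, and located in ZooKeeper. A hypothetical minimal subclass for illustration (ExampleEvent, its accessors, and the ZK path are invented; they stand in for a Descriptor-like event type):

    // Hypothetical subclass after this commit; the four abstract methods
    // below are all that remain to implement.
    public class ExampleHistoryWriter extends QueueingHistoryWriter<ExampleEvent> {

      public ExampleHistoryWriter(final ZooKeeperClient client, final Path backingFile)
          throws IOException, InterruptedException {
        super(client, backingFile);  // no Kafka provider any more
      }

      @Override
      protected String getKey(final ExampleEvent event) {
        return event.getName();  // groups events into per-key queues
      }

      @Override
      protected long getTimestamp(final ExampleEvent event) {
        return event.getTimestamp();  // names the event's ZK node
      }

      @Override
      protected String getZkEventsPath(final ExampleEvent event) {
        return "/history/example/" + event.getName();  // invented path
      }

      @Override
      protected byte[] toBytes(final ExampleEvent event) {
        return event.toJsonBytes();  // payload written to ZooKeeper
      }
    }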
TaskHistoryWriterTest.java
@@ -83,7 +83,6 @@ public class TaskHistoryWriterTest {
 
   private ZooKeeperTestManager zk;
   private DefaultZooKeeperClient client;
-  private KafkaClientProvider kafkaProvider;
   private TaskHistoryWriter writer;
   private ZooKeeperMasterModel masterModel;
   private Path agentStateDirs;
@@ -94,8 +93,7 @@ public void setUp() throws Exception {
     agentStateDirs = Files.createTempDirectory("helios-agents");
 
     client = new DefaultZooKeeperClient(zk.curator());
-    kafkaProvider = KafkaClientProvider.getTestingProvider();
-    makeWriter(client, kafkaProvider);
+    makeWriter(client);
     masterModel = new ZooKeeperMasterModel(new ZooKeeperClientProvider(client,
                                                                        ZooKeeperModelReporter.noop()));
     client.ensurePath(Paths.configJobs());
@@ -111,17 +109,16 @@ public void tearDown() throws Exception {
     zk.stop();
   }
 
-  private void makeWriter(ZooKeeperClient client, KafkaClientProvider kafkaProvider)
+  private void makeWriter(final ZooKeeperClient client)
       throws Exception {
-    writer = new TaskHistoryWriter(HOSTNAME, client, kafkaProvider,
-                                   agentStateDirs.resolve("task-history.json"));
+    writer = new TaskHistoryWriter(HOSTNAME, client, agentStateDirs.resolve("task-history.json"));
     writer.startUp();
   }
 
   @Test
   public void testZooKeeperErrorDoesntLoseItemsReally() throws Exception {
     final ZooKeeperClient mockClient = mock(ZooKeeperClient.class, delegatesTo(client));
-    makeWriter(mockClient, kafkaProvider);
+    makeWriter(mockClient);
     final String path = Paths.historyJobHostEventsTimestamp(JOB_ID, HOSTNAME, TIMESTAMP);
     final KeeperException exc = new ConnectionLossException();
     // make save operations fail
@@ -178,7 +175,7 @@ public void testWriteWithZooKeeperDownAndInterveningCrash() throws Exception {
     writer.saveHistoryItem(TASK_STATUS, TIMESTAMP);
     // simulate a crash by recreating the writer
     writer.stopAsync().awaitTerminated();
-    makeWriter(client, kafkaProvider);
+    makeWriter(client);
     zk.start();
     final TaskStatusEvent historyItem = Iterables.getOnlyElement(awaitHistoryItems());
     assertEquals(JOB_ID, historyItem.getStatus().getJob().getId());
