Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

Commit

Permalink
simplify usage of KafkaSender
Browse files Browse the repository at this point in the history
- KafkaSender is never null in non-test code, so save some indents by
  not checking for null
- same loop for sending a list of events appears a few times
  • Loading branch information
mattnworb committed Dec 3, 2015
1 parent 12040ea commit 8c17809
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -147,28 +148,18 @@ public int compare(TaskStatusEvent arg0, TaskStatusEvent arg1) {
private final String name;
private final KafkaSender kafkaSender;

public ZooKeeperMasterModel(final ZooKeeperClientProvider provider)
throws IOException, InterruptedException {
this(provider, null);
}

public ZooKeeperMasterModel(final ZooKeeperClientProvider provider, @Nullable final String name) {
this(provider, name, null);
}

/**
* Constructor
* @param provider {@link ZooKeeperClientProvider}
* @param name The hostname of the machine running the {@link MasterModel}
* @param kafkaSender {@link KafkaSender}
*/
public ZooKeeperMasterModel(
final ZooKeeperClientProvider provider,
@Nullable final String name,
@Nullable final KafkaSender kafkaSender) {
this.provider = provider;
this.name = name;
this.kafkaSender = kafkaSender;
public ZooKeeperMasterModel(final ZooKeeperClientProvider provider,
final String name,
final KafkaSender kafkaSender) {
this.provider = Preconditions.checkNotNull(provider);
this.name = Preconditions.checkNotNull(name);
this.kafkaSender = Preconditions.checkNotNull(kafkaSender);
}

/**
Expand Down Expand Up @@ -425,13 +416,7 @@ public void updateDeploymentGroupHosts(final String name, final List<String> hos
}

client.transaction(ops);

if (kafkaSender != null) {
for (final Map<String, Object> event : events) {
kafkaSender.send(KafkaRecord.of(
DEPLOYMENT_GROUP_EVENTS_KAFKA_TOPIC, Json.asBytesUnchecked(event)));
}
}
emitEvents(DEPLOYMENT_GROUP_EVENTS_KAFKA_TOPIC, events);
}
} catch (NoNodeException e) {
throw new DeploymentGroupDoesNotExistException(name, e);
Expand Down Expand Up @@ -470,12 +455,7 @@ public void rollingUpdate(final DeploymentGroup deploymentGroup,
client.ensurePath(Paths.statusDeploymentGroupTasks(updated.getName()));
client.transaction(operations);

if (kafkaSender != null) {
for (final Map<String, Object> event : op.events()) {
kafkaSender.send(KafkaRecord.of(
DEPLOYMENT_GROUP_EVENTS_KAFKA_TOPIC, Json.asBytesUnchecked(event)));
}
}
emitEvents(DEPLOYMENT_GROUP_EVENTS_KAFKA_TOPIC, op.events());
log.info("finished rolling-update on deployment-group: name={}", deploymentGroup.getName());
} catch (final NoNodeException e) {
throw new DeploymentGroupDoesNotExistException(deploymentGroup.getName());
Expand Down Expand Up @@ -600,14 +580,7 @@ public void rollingUpdateStep() {
ops.addAll(op.operations());
try {
client.transaction(ops);

// Emit events
if (kafkaSender != null) {
for (final Map<String, Object> event : op.events()) {
kafkaSender.send(KafkaRecord.of(
DEPLOYMENT_GROUP_EVENTS_KAFKA_TOPIC, Json.asBytesUnchecked(event)));
}
}
emitEvents(DEPLOYMENT_GROUP_EVENTS_KAFKA_TOPIC, op.events());
} catch (KeeperException.BadVersionException e) {
// some other master beat us in processing this rolling update step. not exceptional.
// ideally we would check the path in the exception, but curator doesn't provide a path
Expand All @@ -623,6 +596,13 @@ public void rollingUpdateStep() {
}
}

private void emitEvents(final String topic, final List<Map<String, Object>> events) {
// Emit events
for (final Map<String, Object> event : events) {
kafkaSender.send(KafkaRecord.of(topic, Json.asBytesUnchecked(event)));
}
}

private RollingUpdateOp rollingUpdateTimedoutError(final RollingUpdateOpFactory opFactory,
final String host,
final JobId jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.spotify.helios.master.JobNotDeployedException;
import com.spotify.helios.master.JobStillDeployedException;
import com.spotify.helios.master.ZooKeeperMasterModel;
import com.spotify.helios.servicescommon.KafkaSender;
import com.spotify.helios.servicescommon.coordination.DefaultZooKeeperClient;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
Expand Down Expand Up @@ -62,6 +63,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

@RunWith(MockitoJUnitRunner.class)
public class ZooKeeperMasterModelIntegrationTest {
Expand Down Expand Up @@ -103,8 +105,12 @@ public void setup() throws Exception {
client.ensurePath(Paths.statusMasters());
client.ensurePath(Paths.historyJobs());

model = new ZooKeeperMasterModel(
new ZooKeeperClientProvider(client, ZooKeeperModelReporter.noop()));
final ZooKeeperClientProvider zkProvider =
new ZooKeeperClientProvider(client, ZooKeeperModelReporter.noop());

final KafkaSender kafkaSender = mock(KafkaSender.class);

model = new ZooKeeperMasterModel(zkProvider, getClass().getName(), kafkaSender);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.spotify.helios.common.descriptors.TaskStatus.State;
import com.spotify.helios.common.descriptors.TaskStatusEvent;
import com.spotify.helios.master.ZooKeeperMasterModel;
import com.spotify.helios.servicescommon.KafkaSender;
import com.spotify.helios.servicescommon.coordination.DefaultZooKeeperClient;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
Expand Down Expand Up @@ -90,8 +91,14 @@ public void setUp() throws Exception {

client = new DefaultZooKeeperClient(zk.curator());
makeWriter(client);
masterModel = new ZooKeeperMasterModel(new ZooKeeperClientProvider(client,
ZooKeeperModelReporter.noop()));

final ZooKeeperClientProvider zkProvider =
new ZooKeeperClientProvider(client, ZooKeeperModelReporter.noop());

final KafkaSender kafkaSender = mock(KafkaSender.class);

masterModel = new ZooKeeperMasterModel(zkProvider, "test", kafkaSender);

client.ensurePath(Paths.configJobs());
client.ensurePath(Paths.configJobRefs());
client.ensurePath(Paths.historyJobHostEvents(JOB_ID, HOSTNAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.spotify.helios.common.protocol.CreateJobResponse;
import com.spotify.helios.common.protocol.JobDeployResponse;
import com.spotify.helios.master.ZooKeeperMasterModel;
import com.spotify.helios.servicescommon.KafkaSender;
import com.spotify.helios.servicescommon.coordination.DefaultZooKeeperClient;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClientProvider;
Expand All @@ -47,6 +48,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

public class UndeployFilteringTest extends SystemTestBase {

Expand All @@ -70,10 +72,15 @@ public class UndeployFilteringTest extends SystemTestBase {
public void setUp() throws Exception {
// make zookeeper interfaces
curator = zk().curator();

zkcp = new ZooKeeperClientProvider(
new DefaultZooKeeperClient(curator), ZooKeeperModelReporter.noop());
zkMasterModel = new ZooKeeperMasterModel(zkcp);

final KafkaSender kafkaSender = mock(KafkaSender.class);

zkMasterModel = new ZooKeeperMasterModel(zkcp, getClass().getName(), kafkaSender);
startDefaultMaster();

agent = startDefaultAgent(TEST_HOST);
client = defaultClient();
awaitHostRegistered(client, TEST_HOST, LONG_WAIT_SECONDS, SECONDS);
Expand Down

0 comments on commit 8c17809

Please sign in to comment.