Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Add a reference implementation of a SidelineTrigger using Zookeeper watches #48

Merged
merged 22 commits into from Nov 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 14 additions & 5 deletions pom.xml
Expand Up @@ -148,14 +148,23 @@
<optional>true</optional>
</dependency>

<!-- For simple Json Serialization -->
<!-- Apache Curator Recipes - Optional Dependency -->
<!-- Only required if you make use of the ZookeeperWatchTrigger -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swapping this to Gson because Gson lets us deserialize directly to a class, where this does not.

<version>1.1.1</version>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curatorVersion}</version>
<optional>true</optional>
</dependency>

<!-- For JSON Serialization -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>

<!-- Testing Dependencies -->
<!-- Testing Dependencies -->

<!-- Deal with hamcrest incompatibilities...argh -->
<dependency>
Expand Down
Expand Up @@ -38,17 +38,25 @@ public class StaticMessageFilter implements FilterChainStep {
/**
* We need a way to make this instance unique from others, so we use a UUID.
*/
private final UUID uniqueId;
private final String id;

public StaticMessageFilter() {
this.uniqueId = UUID.randomUUID();
this.id = UUID.randomUUID().toString();
}

public StaticMessageFilter(final String id) {
this.id = id;
}

@Override
public boolean filter(Message message) {
return true;
}

public String getId() {
return id;
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand All @@ -60,18 +68,16 @@ public boolean equals(Object other) {

StaticMessageFilter that = (StaticMessageFilter) other;

return uniqueId.equals(that.uniqueId);
return id.equals(that.id);
}

@Override
public int hashCode() {
return uniqueId.hashCode();
return id.hashCode();
}

@Override
public String toString() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if toString should just return the id here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, probably not a big deal honestly as this is just a test class.

return "StaticMessageFilter{"
+ "uniqueId=" + uniqueId
+ '}';
return id;
}
}
Expand Up @@ -42,7 +42,7 @@
* Persistence layer implemented using Zookeeper.
* Why Zookeeper? Because its easy, and you most likely have it around.
*/
public class ZookeeperPersistenceAdapter implements PersistenceAdapter, Serializable {
public class ZookeeperPersistenceAdapter implements PersistenceAdapter {

/**
* Logger for logging logs.
Expand Down
Expand Up @@ -26,14 +26,16 @@
package com.salesforce.storm.spout.dynamic.persistence.zookeeper;

import com.google.common.base.Charsets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -52,6 +54,12 @@ public class CuratorHelper {
*/
private CuratorFramework curator;

/**
* JSON parser.
*/
private final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();


/**
* Helper methods for common tasks when working with Curator.
* @param curator curator instance.
Expand All @@ -66,25 +74,42 @@ public CuratorHelper(final CuratorFramework curator) {
* @param path node to write the JSON data into.
* @param data map representation of JSON data to write.
*/
public void writeJson(String path, Map data) {
public void writeJson(final String path, final Object data) {
logger.debug("Zookeeper Writing {} the data {}", path, data.toString());
writeBytes(path, JSONValue.toJSONString(data).getBytes(Charsets.UTF_8));
writeBytes(path, gson.toJson(data).getBytes(Charsets.UTF_8));
}

/**
* Internal method for reading JSON from a zookeeper node.
* Read data from Zookeeper that has been stored as JSON.
*
* This method will return a HashMap from the JSON. You should consider using {@link #readJson(String, Class)} instead as it
* will deserialize the JSON to a concrete class.
*
* @param path node containing JSON to read from.
* @param <K> key of the json field.
* @param <K> key to the json field.
* @param <V> value of the json field.
* @return map representing the JSON stored within the zookeeper node.
*/
public <K, V> Map<K, V> readJson(String path) {
public <K, V> Map<K, V> readJson(final String path) {
return readJson(path, HashMap.class);
}

/**
* Read data from Zookeeper that has been stored as JSON.
*
* This method will return the JSON deserialized to the provided class.
*
* @param path node containing JSON to read from.
* @param clazz class of the object the JSON should be deserialized to.
* @return map representing the JSON stored within the zookeeper node.
*/
public <T> T readJson(final String path, final Class<T> clazz) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be public or private?

try {
byte[] bytes = readBytes(path);
if (bytes == null) {
return null;
}
return (Map<K, V>) JSONValue.parse(new String(bytes, Charsets.UTF_8));
return gson.fromJson(new String(bytes, Charsets.UTF_8), clazz);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Expand Up @@ -53,7 +53,7 @@
* Persistence layer implemented using Zookeeper.
* Why Zookeeper? Because its easy, and you most likely have it around.
*/
public class ZookeeperPersistenceAdapter implements PersistenceAdapter, Serializable {
public class ZookeeperPersistenceAdapter implements PersistenceAdapter {

/**
* Logger for logging logs.
Expand Down Expand Up @@ -164,6 +164,7 @@ public SidelinePayload retrieveSidelineRequest(SidelineRequestIdentifier id, int

// Read!
final String path = getZkRequestStatePathForPartition(id.toString(), partitionId);
// TODO: We should make a real object for this and update readJson() to support a class declaration
Map<Object, Object> json = curatorHelper.readJson(path);
logger.debug("Read request state from Zookeeper at {}: {}", path, json);

Expand All @@ -177,15 +178,15 @@ public SidelinePayload retrieveSidelineRequest(SidelineRequestIdentifier id, int

final FilterChainStep step = parseJsonToFilterChainSteps(json);

final Long startingOffset = (Long) json.get("startingOffset");
final Long endingOffset = (Long) json.get("endingOffset");
final Double startingOffset = (Double) json.get("startingOffset");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels weird because we use long's everywhere else. I believe Long is the preferred way to store large integers, where Double can allow you to store LARGER numbers but with less precision (seems bad)

Copy link
Contributor

@Crim Crim Nov 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an artifact of using Gson? Perhaps configuring GSON with:

Gson gson = new GsonBuilder().
        registerTypeAdapter(Double.class,  new JsonSerializer<Double>() {   
       .....implementation returns a Long here...
});

Tho even that feels like a hack, so maybe there's a better way around this? Persisting an actual object instead of a map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an artifact of switching to Gson and serializing to a Map. I'd like to create a concrete class for this payload, but I think I'd prefer to do that in a separate PR if that's agreeable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create an issue for it so we can capture it and I'll +1 this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #54

final Double endingOffset = (Double) json.get("endingOffset");

return new SidelinePayload(
type,
id,
new SidelineRequest(id, step),
startingOffset,
endingOffset
startingOffset != null ? startingOffset.longValue() : null,
endingOffset != null ? endingOffset.longValue() : null
);
}

Expand Down
@@ -0,0 +1,111 @@
/**
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
* following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this list of conditions and the following
* disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
*
* * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.storm.spout.sideline.trigger.example;

import com.salesforce.storm.spout.dynamic.config.annotation.Documentation;

import java.util.List;

/**
* Configuration for the {@link ZookeeperWatchTrigger}.
*/
public class Config {

/**
* Prefix for this set of configuration directives.
*/
public static final String PREFIX = "sideline.zookeeper_watch_trigger.";

/**
* (String) Class name for the class of the {@link FilterChainStepBuilder} instance.
*/
@Documentation(
description = "Class name for the class of the FilterChainStepBuilder instance.",
type = String.class
)
public static final String FILTER_CHAIN_STEP_BUILDER_CLASS = "sideline.zookeeper_watch_trigger.filter_chain_step_builder_cass";

/**
* (List[String) Holds a list of Zookeeper server Hostnames + Ports in the following format:
* ["zkhost1:2181", "zkhost2:2181", ...]
*
* Optional - Only required if you use the Zookeeper persistence implementation.
*/
@Documentation(
description = "Holds a list of Zookeeper server Hostnames + Ports in the following format: "
+ "[\"zkhost1:2181\", \"zkhost2:2181\", ...]",
type = List.class
)
public static final String ZK_SERVERS = "sideline.zookeeper_watch_trigger.servers";

/**
* (String) Defines the root path to watch for events under.
* Example: "/consumer-state"
*
* Optional - Only required if you use the Zookeeper persistence implementation.
*/
@Documentation(
description = "Defines the root path to watch for events under. Example: \"/sideline-trigger\"",
type = String.class
)
public static final String ZK_ROOTS = "sideline.zookeeper_watch_trigger.roost";

/**
* (Integer) Zookeeper session timeout.
*/
@Documentation(
description = "Zookeeper session timeout.",
type = Integer.class
)
public static final String ZK_SESSION_TIMEOUT = "sideline.zookeeper_watch_trigger.session_timeout";

/**
* (Integer) Zookeeper connection timeout.
*/
@Documentation(
description = "Zookeeper connection timeout.",
type = Integer.class
)
public static final String ZK_CONNECTION_TIMEOUT = "sideline.zookeeper_watch_trigger.connection_timeout";

/**
* (Integer) Zookeeper retry attempts.
*/
@Documentation(
description = "Zookeeper retry attempts.",
type = Integer.class
)
public static final String ZK_RETRY_ATTEMPTS = "sideline.zookeeper_watch_trigger.retry_attempts";

/**
* (Integer) Zookeeper retry interval.
*/
@Documentation(
description = "Zookeeper retry interval.",
type = Integer.class
)
public static final String ZK_RETRY_INTERVAL = "sideline.zookeeper_watch_trigger.retry_interval";
}
@@ -0,0 +1,43 @@
/**
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
* following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this list of conditions and the following
* disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
*
* * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.storm.spout.sideline.trigger.example;

import com.salesforce.storm.spout.dynamic.filter.FilterChainStep;

import java.util.Map;

/**
* Given a map of a data from a {@link TriggerEvent} implementations of this generate {@link FilterChainStep} instances.
*/
public interface FilterChainStepBuilder {

/**
* Given a map of a data from a {@link TriggerEvent} implementations of this generate {@link FilterChainStep} instances.
* @param data data from a trigger event.
* @return filter chain step.
*/
FilterChainStep build(final Map<String,Object> data);
}