Permalink
Browse files

init

  • Loading branch information...
boneill42 committed Aug 1, 2012
1 parent c870183 commit e89bb3cc144ac6808fbe813e0bf9b7a0f954c8dd
View
@@ -1,4 +1,12 @@
-zookeeper-distributed-queue-example
-===================================
+Example using Zookeeper for a Distributed Queue via Curator
+============================================================
+
+
+This example uses Netflix's Curator recipe for Distributed Queueing backed by Zookeeper. To use the example, simply run a local zookeeper. Then compile.
+
+This JUnit test uses the queue:
+./src/test/java/com/github/boneill42/ZkDqTest.java
+
+For details of the setup, look at the constructor in:
+./src/main/java/com/github/boneill42/ZkDqQueuer.java
-Example using Zookeeper for a Distributed Queue via Curator
View
70 pom.xml
@@ -0,0 +1,70 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <name> Curator : Distributed Queue : Example</name>
+ <description>Reactor</description>
+ <url>https://github.com/boneill42/curator-distributed-queue-example</url>
+ <groupId>com.github.boneill42</groupId>
+ <artifactId>curator-distributed-queue</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <scm>
+ <connection>scm:git:git@github.com:boneill42/curator-distributed-queue-example.git</connection>
+ <developerConnection>scm:git:git@github.com:boneill42/curator-distributed-queue-example.git</developerConnection>
+ <url>git@github.com:boneill42/curator-distributed-queue-example.git</url>
+ </scm>
+
+ <developers>
+ <developer>
+ <id>boneill42</id>
+ <name>Brian O'Neill</name>
+ <email>bone@alumni.brown.edu</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.netflix.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>1.1.15</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.5.1</version>
+ <configuration>
+ <source>1.5</source>
+ <target>1.5</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.1.2</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.8.1</version>
+ </plugin>
+ </plugins>
+ </build>
+</project>
@@ -0,0 +1,19 @@
+package com.github.boneill42;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.queue.QueueConsumer;
+import com.netflix.curator.framework.state.ConnectionState;
+
+public class ZkDqConsumer implements QueueConsumer<ZkDqWork> {
+
+ public void stateChanged(CuratorFramework framework, ConnectionState state) {
+ System.out.println("State [" + state + "]");
+
+ }
+
+ public void consumeMessage(ZkDqWork work) throws Exception {
+ System.out.println("Consuming (" + work + ")");
+ }
+
+
+}
@@ -0,0 +1,38 @@
+package com.github.boneill42;
+
+import com.netflix.curator.ensemble.fixed.FixedEnsembleProvider;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.recipes.queue.DistributedQueue;
+import com.netflix.curator.framework.recipes.queue.QueueBuilder;
+import com.netflix.curator.retry.RetryOneTime;
+
+public class ZkDqQueuer {
+ DistributedQueue<ZkDqWork> queue;
+
+ public ZkDqQueuer() throws Exception {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .retryPolicy(new RetryOneTime(10)).namespace("ZkDqTest")
+ .ensembleProvider(new FixedEnsembleProvider("localhost:2181"))
+ .build();
+ client.start();
+
+ ZkDqConsumer consumer = new ZkDqConsumer();
+ ZkDqSerializer serializer = new ZkDqSerializer();
+ QueueBuilder<ZkDqWork> builder = QueueBuilder.builder(client,
+ consumer, serializer, "/com/zq/test");
+ queue = builder.buildQueue();
+ queue.start();
+
+ }
+
+ public void queueMessages() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ ZkDqWork work = new ZkDqWork("testWork [" + i + "]");
+ this.queue.put(work);
+ System.out.println("Queued [" + i + "]");
+ }
+ Thread.sleep(5000);
+ }
+
+}
@@ -0,0 +1,16 @@
+package com.github.boneill42;
+
+import com.netflix.curator.framework.recipes.queue.QueueSerializer;
+
+public class ZkDqSerializer implements QueueSerializer<ZkDqWork> {
+
+ public ZkDqWork deserialize(byte[] buffer) {
+ String work = new String(buffer);
+ return new ZkDqWork(work);
+ }
+
+ public byte[] serialize(ZkDqWork work) {
+ return work.toString().getBytes();
+ }
+
+}
@@ -0,0 +1,13 @@
+package com.github.boneill42;
+
+public class ZkDqWork {
+ private String work;
+
+ public ZkDqWork(String work){
+ this.work = work;
+ }
+
+ public String toString(){
+ return this.work;
+ }
+}
@@ -0,0 +1,18 @@
+create keyspace Indexing;
+
+use Indexing;
+
+create column family Configuration
+ with comparator = 'UTF8Type'
+ and default_validation_class = 'UTF8Type'
+ and key_validation_class = 'UTF8Type';
+
+create column family Indexes
+ with comparator = 'UTF8Type'
+ and default_validation_class = 'UTF8Type'
+ and key_validation_class = 'UTF8Type';
+
+create column family CommitLog
+ with comparator = 'UTF8Type'
+ and default_validation_class = 'UTF8Type'
+ and key_validation_class = 'UTF8Type';
@@ -0,0 +1,15 @@
+package com.github.boneill42;
+
+import org.junit.Test;
+
+import com.github.boneill42.ZkDqQueuer;
+
+public class ZkDqTest {
+
+ @Test
+ public void testDistributedQueue() throws Throwable {
+ ZkDqQueuer queuer = new ZkDqQueuer();
+ queuer.queueMessages();
+ }
+
+}

0 comments on commit e89bb3c

Please sign in to comment.