Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

let NSQProducer no longer bind with topic #13

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions .classpath
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
Expand All @@ -22,12 +17,12 @@
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.5">
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
Expand Down
66 changes: 54 additions & 12 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,16 +1,58 @@
*.class
# Node #
######################
lib-cov
*.seed
*.log
*.csv
*.dat
*.out
*.pid
*.gz

# Package Files #
*.jar
*.war
*.ear
pids
logs
results

# Eclipse settings
.settings
npm-debug.log
node_modules

# IDEA settings
*.iml
.idea
# Logs and databases #
######################
*.log
*.sql
*.sqlite

# Maven output
/target
# OS generated files #
######################
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

# Editor #
######################
.idea/
iron.iml
*.sublime-workspace
*.sublime-project
iron.iml
/nbproject/
.tm_properties

/kdt.zip
.tags
.tags_sorted_by_file

.buildpath
.project
.classpath
.settings/
koala-config.json

.externalToolBuilders/
.gitignore

target/
19 changes: 16 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ly.bit</groupId>
<artifactId>nsq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<version>0.0.4-SNAPSHOT</version>
<name>Java NSQ Client</name>
<dependencies>
<dependency>
Expand Down Expand Up @@ -57,10 +57,23 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>

<distributionManagement>
<repository>
<id>releases</id>
<name>Nexus Releases Repository</name>
<url>http://192.168.66.204:8081/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>Nexus Snapshots Repository</name>
<url>http://192.168.66.204:8081/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
</project>
1 change: 0 additions & 1 deletion src/main/java/ly/bit/nsq/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
package ly.bit.nsq;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;

import ly.bit.nsq.exceptions.NSQException;
import ly.bit.nsq.util.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BasicConnection extends Connection {
private static final Logger log = LoggerFactory.getLogger(BasicConnection.class);
public class DefaultConnection extends Connection {
private static final Logger log = LoggerFactory.getLogger(DefaultConnection.class);

private Socket sock;
private InputStream inputStream;
Expand Down
119 changes: 87 additions & 32 deletions src/main/java/ly/bit/nsq/NSQProducer.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package ly.bit.nsq;

import ly.bit.nsq.exceptions.NSQException;
import ly.bit.nsq.lookupd.DefaultLookup;
import ly.bit.nsq.util.StringUtils;

import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.params.ClientPNames;
import org.apache.http.client.params.CookiePolicy;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
Expand All @@ -21,7 +23,10 @@

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
Expand All @@ -33,29 +38,37 @@ public class NSQProducer {
private static final String PUT_URL = "/put?topic=";
private static final int DEFAULT_SOCKET_TIMEOUT = 2000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 2000;

private String url;
private String topic;
private static final int MAX_PER_ROUTE_CONNECTIONS = 32;
private static final int MAX_CONNECTIONS = 64;
private static final int MAX_RETRY_COUNT = 3;

private String defaultNsqdAddr;
private DefaultLookup lookup;
private ConcurrentHashMap<String, List<String>> hostIndex;
private ConcurrentHashMap<String, Integer> reTryCountMap;
protected ExecutorService executor = Executors.newCachedThreadPool();


protected HttpClient httpclient;
protected PoolingClientConnectionManager cm;
// TODO add timeout config / allow setting any httpclient param via getHtttpClient

// Convenience constructor assuming local nsqd on standard port
public NSQProducer(String topic) {
this("http://127.0.0.1:4151", topic);
public NSQProducer(String defaultNsqdAddr, String lookupAddr) {
this(lookupAddr);
this.defaultNsqdAddr = StringUtils.trimRight("/", defaultNsqdAddr);
}

public NSQProducer(String url, String topic) {
this.topic = topic;
this.url = url + PUT_URL + topic;
public NSQProducer(String lookupAddr) {
this.lookup = new DefaultLookup(lookupAddr);
this.hostIndex = new ConcurrentHashMap<String, List<String>>();
this.reTryCountMap = new ConcurrentHashMap<String, Integer>();

SchemeRegistry schemeRegistry = new SchemeRegistry();
schemeRegistry.register(
new Scheme("http", 80, PlainSocketFactory.getSocketFactory()));

ClientConnectionManager cm = new PoolingClientConnectionManager(schemeRegistry);
cm = new PoolingClientConnectionManager(schemeRegistry);
cm.setDefaultMaxPerRoute(MAX_PER_ROUTE_CONNECTIONS);
cm.setMaxTotal(MAX_CONNECTIONS);

this.httpclient = new DefaultHttpClient(cm);
this.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT);
Expand All @@ -78,9 +91,13 @@ public void run(){
* @param message
* @throws NSQException
*/
public void put(String message) throws NSQException {
public void put(String message, String topic) throws NSQException {
HttpPost post = null;
try {
String url = getUrl(topic);
if (url == null) {
throw new NSQException("can't get topic:("+topic+") http producer");
}
post = new HttpPost(url);
post.setEntity(new StringEntity(message));
HttpResponse response = this.httpclient.execute(post);
Expand All @@ -90,12 +107,25 @@ public void put(String message) throws NSQException {
if (response.getEntity() != null) {
EntityUtils.consume(response.getEntity());
}
reTryCountMap.put(topic, 0);
} catch (UnsupportedEncodingException e) {
throw new NSQException(e);
} catch (ClientProtocolException e) {
throw new NSQException(e);
} catch (IOException e) {
throw new NSQException(e);
Integer reTryCount = reTryCountMap.get(topic);
if (reTryCount != null && reTryCount.intValue() < MAX_RETRY_COUNT) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
throw new NSQException(e1);
}
reTryCountMap.put(topic, MAX_RETRY_COUNT);
hostIndex.remove(topic);
put(message, topic);// retry
} else {
throw new NSQException(e);
}
} finally {
if (post != null) {
post.releaseConnection();
Expand All @@ -109,22 +139,24 @@ public void put(String message) throws NSQException {
* @param message
* @return
*/
public FutureTask<Void> putAsync(String message) {
FutureTask task = new FutureTask<Void>(new NSQAsyncWriter(message));
public FutureTask<Void> putAsync(String message, String topic) {
FutureTask<Void> task = new FutureTask<Void>(new NSQAsyncWriter(message, topic));
executor.execute(task);
return task;

}

public class NSQAsyncWriter implements Callable<Void> {
private String message = null;
private String topic = null;

NSQAsyncWriter(String message) {
NSQAsyncWriter(String message, String topic) {
this.message = message;
this.topic = topic;
}
public Void call() throws NSQException {
try {
NSQProducer.this.put(message);
NSQProducer.this.put(message, topic);
} catch (NSQException e) {
// Log the error here since caller probably won't ever check the future.
log.error("Error posting NSQ message:", e);
Expand All @@ -144,19 +176,26 @@ public void shutdown() {
}
}

public String toString(){
return "Writer<" + this.url + ">";
}
public String getUrl() {
return url;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
public String getUrl(String topic) {
List<String> urls = hostIndex.get(topic);
if (urls == null) {
if (StringUtils.isBlank(lookup.getLookupAddr()) && !StringUtils.isBlank(defaultNsqdAddr)) {
return new StringBuffer(defaultNsqdAddr).append(PUT_URL).append(topic).toString();
} else {
List<String> httpAddrs = lookup.getHttpAddrs(topic);
if (httpAddrs == null) httpAddrs = lookup.getHttpAddrs();
if (httpAddrs == null) return null;
urls = new ArrayList<String>(httpAddrs.size());
for (String httpAddr : httpAddrs) {
String url = new StringBuffer(httpAddr).append(PUT_URL).append(topic).toString();
urls.add(url);
}
hostIndex.put(topic, urls);
reTryCountMap.put(topic, 0);
}
}

return urls.get((int)(Math.random()*100) % urls.size());
}

/**
Expand All @@ -178,5 +217,21 @@ public void setSocketTimeout(int timeout) {
public void setConnectionTimeout(int timeout) {
this.httpclient.getParams().setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, timeout);
}

/**
* default 2
* @param max
*/
public void setDefaultMaxPerRoute(int max) {
this.cm.setDefaultMaxPerRoute(max);
}

/**
* default 20
* @param max
*/
public void setMaxTotal(int max) {
this.cm.setMaxTotal(max);
}

}
Loading