Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Solr exporter http posts are now asynchronous

  • Loading branch information...
commit 6e7647131be86fa5cfa0655d9f9ac5acc5bf0d02 1 parent b284270
Mike Wiederhold authored
5 .classpath
@@ -4,5 +4,10 @@
4 4 <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
5 5 <classpathentry kind="lib" path="lib/slf4j-api-1.6.1.jar"/>
6 6 <classpathentry kind="lib" path="lib/slf4j-simple-1.6.1.jar"/>
  7 + <classpathentry kind="lib" path="lib/httpclient-4.1.jar"/>
  8 + <classpathentry kind="lib" path="lib/httpasyncclient-4.0-alpha1.jar"/>
  9 + <classpathentry kind="lib" path="lib/httpcore-nio-4.1.jar"/>
  10 + <classpathentry kind="lib" path="lib/httpcore-4.1.jar"/>
  11 + <classpathentry kind="lib" path="lib/commons-logging-api-1.1.jar"/>
7 12 <classpathentry kind="output" path="bin"/>
8 13 </classpath>
BIN  lib/commons-logging-api-1.1.jar
Binary file not shown
BIN  lib/httpasyncclient-4.0-alpha1.jar
Binary file not shown
BIN  lib/httpclient-4.1.jar
Binary file not shown
BIN  lib/httpcore-4.1.jar
Binary file not shown
BIN  lib/httpcore-nio-4.1.jar
Binary file not shown
27 src/com/membase/jtap/TapRunner.java
... ... @@ -1,7 +1,10 @@
1 1 package com.membase.jtap;
2 2
  3 +import org.apache.http.nio.reactor.IOReactorException;
  4 +
3 5 import com.membase.jtap.exporter.Exporter;
4 6 import com.membase.jtap.exporter.FileExporter;
  7 +import com.membase.jtap.exporter.SolrExporter;
5 8 import com.membase.jtap.exporter.TextExporter;
6 9 import com.membase.jtap.ops.BackfillStream;
7 10 import com.membase.jtap.ops.CustomStream;
@@ -14,19 +17,35 @@
14 17 public static void main(String args[]) {
15 18 int[] vbucketlist = {1, 0};
16 19 //Exporter exporter = new FileExporter("results");
17   - Exporter exporter = new TextExporter();
  20 + //Exporter exporter = new TextExporter();
  21 + Exporter exporter = null;
  22 + try {
  23 + exporter = new SolrExporter("http://10.2.1.11:8983");
  24 + } catch (IOReactorException e) {
  25 + // TODO Auto-generated catch block
  26 + e.printStackTrace();
  27 + }
18 28 //TapStream tapListener = new DumpStream(exporter, "node1");
19 29 //TapStream tapListener = new BackfillStream(exporter, "node1", null);
20   -
21   -
  30 + /*try {
  31 + exporter.write("key1");
  32 + exporter.write("key2");
  33 + Thread.sleep(10000);
  34 + } catch (Exception e) {
  35 + // TODO Auto-generated catch block
  36 + e.printStackTrace();
  37 + }
  38 + System.exit(0);*/
22 39 CustomStream tapListener = new CustomStream(exporter, "node1");
23   - tapListener.keysOnly();
  40 + //tapListener.keysOnly();
24 41 tapListener.doDump();
25 42 //tapListener.specifyVbuckets(vbucketlist);
26 43 TapStreamClient client = new TapStreamClient("10.2.1.11", 11210, "saslbucket", "password");
27 44 //TapStreamClient client = new TapStreamClient("10.2.1.11", 11210, "default", null);
28 45 client.start(tapListener);
29 46
  47 + // No arrays
  48 + // Don't pass tapStream in start
30 49
31 50 /*try {
32 51 Thread.sleep(10000);
17 src/com/membase/jtap/TapStreamClient.java
@@ -38,7 +38,7 @@
38 38 private String password;
39 39 private int port;
40 40 private SocketChannel channel;
41   - public BlockingQueue<Response> rQueue;
  41 + private BlockingQueue<Response> rQueue;
42 42 private BlockingQueue<Response> wQueue;
43 43 private SASLAuthenticator sasl;
44 44 private Thread mbuilder;
@@ -157,6 +157,8 @@ public MessageBuilder(Thread reader, BlockingQueue<Response> rQueue, TapStream t
157 157 public void run() {
158 158 bodylen = 0;
159 159 mpos = 0;
  160 + double tot = 0;
  161 + double count = 0;
160 162 boolean headerparsed = false;
161 163
162 164 while (reader.getState() != Thread.State.TERMINATED || rQueue.size() > 0) {
@@ -228,6 +230,7 @@ private void getNextResponse() {
228 230
229 231 class SocketReader implements Runnable {
230 232 private static final Logger LOG = LoggerFactory.getLogger(SocketReader.class);
  233 + private static final int MAX_RQUEUE_SIZE = 20480;
231 234 private static final int BUFFER_SIZE = 256;
232 235
233 236 BlockingQueue<Response> rQueue;
@@ -243,6 +246,8 @@ public void run() {
243 246 ByteBuffer rbuf;
244 247 int bytesRead = 0;
245 248 while (bytesRead >= 0) {
  249 + if (rQueue.size() > MAX_RQUEUE_SIZE)
  250 + backoff();
246 251 rbuf = ByteBuffer.allocateDirect(BUFFER_SIZE);
247 252 try {
248 253 rbuf.clear();
@@ -269,6 +274,16 @@ public void run() {
269 274 }
270 275 LOG.info("SocketReader terminating");
271 276 }
  277 +
  278 + private void backoff() {
  279 + try {
  280 + while (rQueue.size() > MAX_RQUEUE_SIZE) {
  281 + Thread.sleep(1000);
  282 + }
  283 + } catch (InterruptedException e) {
  284 + LOG.info("SocketReader backoff stopped");
  285 + }
  286 + }
272 287 }
273 288
274 289 class SocketWriter implements Runnable {
97 src/com/membase/jtap/exporter/SolrExporter.java
... ... @@ -0,0 +1,97 @@
  1 +package com.membase.jtap.exporter;
  2 +
  3 +import org.apache.http.HttpEntity;
  4 +import org.apache.http.HttpResponse;
  5 +import org.apache.http.client.methods.HttpPost;
  6 +import org.apache.http.entity.StringEntity;
  7 +import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
  8 +import org.apache.http.nio.client.HttpAsyncClient;
  9 +import org.apache.http.nio.concurrent.FutureCallback;
  10 +import org.apache.http.nio.reactor.IOReactorException;
  11 +
  12 +import com.membase.jtap.TapStreamClient;
  13 +
  14 +public class SolrExporter implements Exporter {
  15 + private HttpAsyncClient httpclient;
  16 + private int posts;
  17 + private int totalposts;
  18 + private int completed;
  19 + private String host;
  20 +
  21 + public SolrExporter(String host) throws IOReactorException {
  22 + this.posts = 0;
  23 + this.totalposts = 0;
  24 + this.completed = 0;
  25 + this.host = host;
  26 + this.httpclient = new DefaultHttpAsyncClient();
  27 + httpclient.start();
  28 + }
  29 +
  30 + @Override
  31 + public void write(String key) {
  32 + String xml = "<add><doc><field name=\"id\">" + key
  33 + + "</field></doc></add>";
  34 + try {
  35 + postData(xml);
  36 + } catch (Exception e) {
  37 + // TODO Auto-generated catch block
  38 + e.printStackTrace();
  39 + System.exit(0);
  40 + }
  41 + }
  42 +
  43 + @Override
  44 + public void write(String key, String value) {
  45 + String xml = "<add><doc><field name=\"id\">" + key
  46 + + "</field><field name=\"name\">" + value
  47 + + "</field></doc></add>";
  48 + try {
  49 + postData(xml);
  50 + } catch (Exception e) {
  51 + // TODO Auto-generated catch block
  52 + e.printStackTrace();
  53 + System.exit(0);
  54 + }
  55 + }
  56 +
  57 + @Override
  58 + public void close() {
  59 + // TODO Auto-generated method stub
  60 + }
  61 +
  62 + public void postData(String content) throws Exception {
  63 + HttpPost request;
  64 + HttpEntity entity = new StringEntity(content);
  65 + if (posts < 10000) {
  66 + request = new HttpPost(host + "/solr/update");
  67 + request.setEntity(entity);
  68 + posts++;
  69 + } else {
  70 + totalposts += 10000;
  71 + System.out.println("Posts : " + totalposts);
  72 + System.out.println("Completed Posts: " + completed);
  73 + request = new HttpPost(host + "/solr/update");
  74 + request.setEntity(new StringEntity("<commit/>"));
  75 + posts = 0;
  76 + }
  77 + httpclient.execute(request, new FutureCallback<HttpResponse>() {
  78 + @Override
  79 + public void cancelled() {
  80 + System.out.println("Cancelled");
  81 + }
  82 +
  83 + @Override
  84 + public void completed(HttpResponse response) {
  85 + if (response.getStatusLine().getStatusCode() != 200)
  86 + System.out.println(response.getStatusLine());
  87 + completed++;
  88 + }
  89 +
  90 + @Override
  91 + public void failed(Exception e) {
  92 + System.out.println("Failed: " + e.getMessage());
  93 + System.exit(0);
  94 + }
  95 + });
  96 + }
  97 +}
4 src/com/membase/jtap/message/RequestMessage.java
@@ -41,11 +41,11 @@ public void setFlags(Flag f) {
41 41 public void setBackfill(Date date) {
42 42 backfilldate = new byte[BACKFILL_DATE_FIELD_LENGTH];
43 43 if (date == null) {
44   - for (int i = 0; i < 8; i++)
  44 + for (int i = 0; i < 8; i++)
45 45 backfilldate[i] = -1;
46 46 } else {
47 47 Util.valueToField(backfilldate, 0, BACKFILL_DATE_FIELD_LENGTH, date.getTime());
48   - }
  48 + }
49 49 encode();
50 50 }
51 51
7 src/com/membase/jtap/ops/CustomStream.java
@@ -81,10 +81,6 @@ public void receive(ResponseMessage streamMessage) {
81 81 // Ignore
82 82 } else {
83 83 String key = streamMessage.getKey();
84   - if (key.equals("")) {
85   - streamMessage.printMessage();
86   - System.exit(0);
87   - }
88 84 if (!keysonly) {
89 85 try {
90 86 String value = streamMessage.getValue();
@@ -94,8 +90,9 @@ public void receive(ResponseMessage streamMessage) {
94 90 }
95 91 } else {
96 92 exporter.write(key);
97   - count++;
98 93 }
  94 + count++;
  95 + //System.out.println(count);
99 96 }
100 97 }
101 98

0 comments on commit 6e76471

Please sign in to comment.
Something went wrong with that request. Please try again.