Skip to content
Browse files

Per-thread connection handling; set client-id on connections coming f…

…rom same RiakClient.
  • Loading branch information...
1 parent 569ef19 commit f2cfbb25df68f4533f2628aa6b46e8466a2a5dd3 @krestenkrab committed
View
10 NOTICE
@@ -21,7 +21,7 @@
This project includes the following 3rd party pieces
- Protocol buffers spec for Riak client API
+ Misc. files from Riak and Riak's riak-java-client
** riakclient.proto: Protocol buffers for riak
**
@@ -42,8 +42,12 @@ This project includes the following 3rd party pieces
** under the License.
-
-
+ Base64 Encoder
+
+ //Copyright 2003-2010 Christian d'Heureuse, Inventec Informatik AG, Zurich, Switzerland
+ //www.source-code.biz, www.inventec.ch/chdh
+
+
Protocol buffers support library
protobuf-java-2.3.0.jar
View
514 src/com/trifork/riak/RiakClient.java
@@ -18,13 +18,9 @@
package com.trifork.riak;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.Socket;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
@@ -39,9 +35,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.MessageLite;
import com.trifork.riak.RPB.RpbDelReq;
-import com.trifork.riak.RPB.RpbErrorResp;
import com.trifork.riak.RPB.RpbGetClientIdResp;
import com.trifork.riak.RPB.RpbGetReq;
import com.trifork.riak.RPB.RpbGetResp;
@@ -53,49 +47,25 @@
import com.trifork.riak.RPB.RpbPutReq;
import com.trifork.riak.RPB.RpbPutResp;
import com.trifork.riak.RPB.RpbSetClientIdReq;
-import com.trifork.riak.RPB.RpbMapRedReq.Builder;
import com.trifork.riak.mapreduce.MapReduceResponse;
-public class RiakClient {
-
- public static final int MSG_ErrorResp = 0;
- public static final int MSG_PingReq = 1;
- public static final int MSG_PingResp = 2;
- public static final int MSG_GetClientIdReq = 3;
- public static final int MSG_GetClientIdResp = 4;
- public static final int MSG_SetClientIdReq = 5;
- public static final int MSG_SetClientIdResp = 6;
- public static final int MSG_GetServerInfoReq = 7;
- public static final int MSG_GetServerInfoResp = 8;
- public static final int MSG_GetReq = 9;
- public static final int MSG_GetResp = 10;
- public static final int MSG_PutReq = 11;
- public static final int MSG_PutResp = 12;
- public static final int MSG_DelReq = 13;
- public static final int MSG_DelResp = 14;
- public static final int MSG_ListBucketsReq = 15;
- public static final int MSG_ListBucketsResp = 16;
- public static final int MSG_ListKeysReq = 17;
- public static final int MSG_ListKeysResp = 18;
- public static final int MSG_GetBucketReq = 19;
- public static final int MSG_GetBucketResp = 20;
- public static final int MSG_SetBucketReq = 21;
- public static final int MSG_SetBucketResp = 22;
- public static final int MSG_MapRedReq = 23;
- public static final int MSG_MapRedResp = 24;
-
- private static final int DEFAULT_RIAK_PB_PORT = 8087;
+public class RiakClient implements RiakMessageCodes {
+
private static final RiakObject[] NO_RIAK_OBJECTS = new RiakObject[0];
private static final ByteString[] NO_BYTE_STRINGS = new ByteString[0];
+ private static final String[] NO_STRINGS = new String[0];
private static final MapReduceResponse[] NO_MAP_REDUCE_RESPONSES = new MapReduceResponse[0];
- private Socket sock;
- private DataOutputStream dout;
- private DataInputStream din;
+
private String node;
private String serverVersion;
+ private InetAddress addr;
+ private int port;
+
+ /** if this has been set (or gotten) then it will be applied to new connections */
+ private ByteString clientID;
public RiakClient(String host) throws IOException {
- this(host, DEFAULT_RIAK_PB_PORT);
+ this(host, RiakConnection.DEFAULT_RIAK_PB_PORT);
}
public RiakClient(String host, int port) throws IOException {
@@ -103,20 +73,30 @@ public RiakClient(String host, int port) throws IOException {
}
public RiakClient(InetAddress addr, int port) throws IOException {
- sock = new Socket(addr, port);
-
- sock.setSendBufferSize(1024 * 200);
-
- dout = new DataOutputStream(new BufferedOutputStream(sock
- .getOutputStream(), 1024 * 200));
- din = new DataInputStream(
- new BufferedInputStream(sock.getInputStream(), 1024 * 200));
+ this.addr = addr;
+ this.port = port;
+ }
- ping();
+ private ThreadLocal<RiakConnection> connections = new ThreadLocal<RiakConnection>();
- // prepareClientID();
+ RiakConnection getConnection() throws IOException {
+ RiakConnection c = connections.get();
+ if (c == null || !c.endIdleAndCheckValid()) {
+ c = new RiakConnection(addr, port);
+
+ if (this.clientID != null) {
+ setClientID(clientID);
+ }
+ } else {
+ // we're fine! //
+ }
+ connections.set(null);
+ return c;
+ }
- getServerInfo();
+ void release(RiakConnection c) {
+ c.beginIdle();
+ connections.set(c);
}
/**
@@ -150,8 +130,13 @@ public void prepareClientID() throws IOException {
}
public void ping() throws IOException {
- send(MSG_PingReq);
- receive_code(MSG_PingResp);
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_PingReq);
+ c.receive_code(MSG_PingResp);
+ } finally {
+ release(c);
+ }
}
public void setClientID(String id) throws IOException {
@@ -163,36 +148,54 @@ public void setClientID(String id) throws IOException {
public void setClientID(ByteString id) throws IOException {
RpbSetClientIdReq req = RPB.RpbSetClientIdReq.newBuilder().setClientId(
id).build();
- send(MSG_SetClientIdReq, req);
- receive_code(MSG_SetClientIdResp);
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_SetClientIdReq, req);
+ c.receive_code(MSG_SetClientIdResp);
+ } finally {
+ release(c);
+ }
+
+ this.clientID = id;
}
public String getClientID() throws IOException {
- send(MSG_GetClientIdReq);
- byte[] data = receive(MSG_GetClientIdResp);
- if (data == null)
- return null;
- RpbGetClientIdResp res = RPB.RpbGetClientIdResp.parseFrom(data);
- return res.getClientId().toStringUtf8();
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_GetClientIdReq);
+ byte[] data = c.receive(MSG_GetClientIdResp);
+ if (data == null)
+ return null;
+ RpbGetClientIdResp res = RPB.RpbGetClientIdResp.parseFrom(data);
+ clientID = res.getClientId();
+ return clientID.toStringUtf8();
+ } finally {
+ release(c);
+ }
}
public Map<String, String> getServerInfo() throws IOException {
- send(MSG_GetServerInfoReq);
- byte[] data = receive(MSG_GetServerInfoResp);
- if (data == null)
- return Collections.emptyMap();
-
- RpbGetServerInfoResp res = RPB.RpbGetServerInfoResp.parseFrom(data);
- if (res.hasNode()) {
- this.node = res.getNode().toStringUtf8();
- }
- if (res.hasServerVersion()) {
- this.serverVersion = res.getServerVersion().toStringUtf8();
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_GetServerInfoReq);
+ byte[] data = c.receive(MSG_GetServerInfoResp);
+ if (data == null)
+ return Collections.emptyMap();
+
+ RpbGetServerInfoResp res = RPB.RpbGetServerInfoResp.parseFrom(data);
+ if (res.hasNode()) {
+ this.node = res.getNode().toStringUtf8();
+ }
+ if (res.hasServerVersion()) {
+ this.serverVersion = res.getServerVersion().toStringUtf8();
+ }
+ Map<String, String> result = new HashMap<String, String>();
+ result.put("node", node);
+ result.put("server_version", serverVersion);
+ return result;
+ } finally {
+ release(c);
}
- Map<String, String> result = new HashMap<String, String>();
- result.put("node", node);
- result.put("server_version", serverVersion);
- return result;
}
// /////////////////////
@@ -208,8 +211,13 @@ public String getClientID() throws IOException {
RpbGetReq req = RPB.RpbGetReq.newBuilder().setBucket(bucket)
.setKey(key).setR(readQuorum).build();
- send(MSG_GetReq, req);
- return process_fetch_reply(bucket, key);
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_GetReq, req);
+ return process_fetch_reply(c, bucket, key);
+ } finally {
+ release(c);
+ }
}
@@ -223,13 +231,19 @@ public String getClientID() throws IOException {
RpbGetReq req = RPB.RpbGetReq.newBuilder().setBucket(bucket)
.setKey(key).build();
- send(MSG_GetReq, req);
- return process_fetch_reply(bucket, key);
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_GetReq, req);
+ return process_fetch_reply(c, bucket, key);
+ } finally {
+ release(c);
+ }
}
- private RiakObject[] process_fetch_reply(ByteString bucket, ByteString key)
- throws IOException, InvalidProtocolBufferException {
- byte[] rep = receive(MSG_GetResp);
+ private RiakObject[] process_fetch_reply(RiakConnection c,
+ ByteString bucket, ByteString key) throws IOException,
+ InvalidProtocolBufferException {
+ byte[] rep = c.receive(MSG_GetResp);
if (rep == null) {
return NO_RIAK_OBJECTS;
@@ -250,58 +264,68 @@ public String getClientID() throws IOException {
public ByteString[] store(RiakObject[] values, RequestMeta meta)
throws IOException {
- BulkReader reader = new BulkReader(values.length);
- Thread worker = new Thread(reader);
- worker.start();
+ RiakConnection c = getConnection();
+ try {
+ BulkReader reader = new BulkReader(c, values.length);
+ Thread worker = new Thread(reader);
+ worker.start();
- for (int i = 0; i < values.length; i++) {
- RiakObject value = values[i];
+ DataOutputStream dout = c.getOutputStream();
- RPB.RpbPutReq.Builder builder = RPB.RpbPutReq.newBuilder()
- .setBucket(value.getBucket()).setKey(value.getKey())
- .setContent(value.buildContent());
+ for (int i = 0; i < values.length; i++) {
+ RiakObject value = values[i];
- if (value.getVclock() != null) {
- builder.setVclock(value.getVclock());
- }
+ RPB.RpbPutReq.Builder builder = RPB.RpbPutReq.newBuilder()
+ .setBucket(value.getBucketBS())
+ .setKey(value.getKeyBS()).setContent(
+ value.buildContent());
- if (meta != null) {
+ if (value.getVclock() != null) {
+ builder.setVclock(value.getVclock());
+ }
- builder.setReturnBody(false);
+ if (meta != null) {
- if (meta.writeQuorum != null) {
- builder.setW(meta.writeQuorum.intValue());
- }
+ builder.setReturnBody(false);
+
+ if (meta.writeQuorum != null) {
+ builder.setW(meta.writeQuorum.intValue());
+ }
- if (meta.durableWriteQuorum != null) {
- builder.setDw(meta.durableWriteQuorum.intValue());
+ if (meta.durableWriteQuorum != null) {
+ builder.setDw(meta.durableWriteQuorum.intValue());
+ }
}
- }
- RpbPutReq req = builder.build();
+ RpbPutReq req = builder.build();
- int len = req.getSerializedSize();
- dout.writeInt(len + 1);
- dout.write(MSG_PutReq);
- req.writeTo(dout);
- }
+ int len = req.getSerializedSize();
+ dout.writeInt(len + 1);
+ dout.write(MSG_PutReq);
+ req.writeTo(dout);
+ }
- dout.flush();
+ dout.flush();
- try {
- worker.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ try {
+ worker.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
- return reader.vclocks;
+ return reader.vclocks;
+ } finally {
+ release(c);
+ }
}
class BulkReader implements Runnable {
private ByteString[] vclocks;
+ private final RiakConnection c;
- public BulkReader(int count) {
+ public BulkReader(RiakConnection c, int count) {
+ this.c = c;
this.vclocks = new ByteString[count];
}
@@ -310,7 +334,7 @@ public void run() {
try {
for (int i = 0; i < vclocks.length; i++) {
- byte[] data = receive(MSG_PutResp);
+ byte[] data = c.receive(MSG_PutResp);
if (data != null) {
RpbPutResp resp = RPB.RpbPutResp.parseFrom(data);
vclocks[i] = resp.getVclock();
@@ -333,7 +357,7 @@ public void store(RiakObject value) throws IOException {
throws IOException {
RPB.RpbPutReq.Builder builder = RPB.RpbPutReq.newBuilder().setBucket(
- value.getBucket()).setKey(value.getKey()).setContent(
+ value.getBucketBS()).setKey(value.getKeyBS()).setContent(
value.buildContent());
if (value.getVclock() != null) {
@@ -344,24 +368,29 @@ public void store(RiakObject value) throws IOException {
meta.preparePut(builder);
}
- send(MSG_PutReq, builder.build());
- byte[] r = receive(MSG_PutResp);
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_PutReq, builder.build());
+ byte[] r = c.receive(MSG_PutResp);
+
+ if (r == null) {
+ return NO_RIAK_OBJECTS;
+ }
- if (r == null) {
- return NO_RIAK_OBJECTS;
- }
-
- RpbPutResp resp = RPB.RpbPutResp.parseFrom(r);
+ RpbPutResp resp = RPB.RpbPutResp.parseFrom(r);
- RiakObject[] res = new RiakObject[resp.getContentsCount()];
- ByteString vclock = resp.getVclock();
+ RiakObject[] res = new RiakObject[resp.getContentsCount()];
+ ByteString vclock = resp.getVclock();
- for (int i = 0; i < res.length; i++) {
- res[i] = new RiakObject(vclock, value.getBucket(), value.getKey(),
- resp.getContents(i));
- }
+ for (int i = 0; i < res.length; i++) {
+ res[i] = new RiakObject(vclock, value.getBucketBS(), value
+ .getKeyBS(), resp.getContents(i));
+ }
- return res;
+ return res;
+ } finally {
+ release(c);
+ }
}
// /////////////////////
@@ -376,8 +405,14 @@ public void delete(ByteString bucket, ByteString key, int rw)
RpbDelReq req = RPB.RpbDelReq.newBuilder().setBucket(bucket)
.setKey(key).setRw(rw).build();
- send(MSG_DelReq, req);
- receive_code(MSG_DelResp);
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_DelReq, req);
+ c.receive_code(MSG_DelResp);
+ } finally {
+ release(c);
+ }
+
}
void delete(String bucket, String key) throws IOException {
@@ -388,17 +423,28 @@ public void delete(ByteString bucket, ByteString key) throws IOException {
RpbDelReq req = RPB.RpbDelReq.newBuilder().setBucket(bucket)
.setKey(key).build();
- send(MSG_DelReq, req);
- receive_code(MSG_DelResp);
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_DelReq, req);
+ c.receive_code(MSG_DelResp);
+ } finally {
+ release(c);
+ }
}
public ByteString[] listBuckets() throws IOException {
- send(MSG_ListBucketsReq);
+ byte[] data;
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_ListBucketsReq);
- byte[] data = receive(MSG_ListBucketsResp);
- if (data == null) {
- return NO_BYTE_STRINGS;
+ data = c.receive(MSG_ListBucketsResp);
+ if (data == null) {
+ return NO_BYTE_STRINGS;
+ }
+ } finally {
+ release(c);
}
RpbListBucketsResp resp = RPB.RpbListBucketsResp.parseFrom(data);
@@ -409,143 +455,111 @@ public void delete(ByteString bucket, ByteString key) throws IOException {
return out;
}
- public BucketProperties getBucketProperties(ByteString bucket) throws IOException {
-
- send(MSG_GetBucketReq,
- RPB.RpbGetBucketReq.newBuilder()
- .setBucket(bucket).build());
-
- byte[] data = receive(MSG_GetBucketResp);
- BucketProperties bp = new BucketProperties();
- if (data == null) {
+ public BucketProperties getBucketProperties(ByteString bucket)
+ throws IOException {
+
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_GetBucketReq, RPB.RpbGetBucketReq.newBuilder()
+ .setBucket(bucket).build());
+
+ byte[] data = c.receive(MSG_GetBucketResp);
+ BucketProperties bp = new BucketProperties();
+ if (data == null) {
+ return bp;
+ }
+
+ bp.init(RPB.RpbGetBucketResp.parseFrom(data));
return bp;
- }
+ } finally {
+ release(c);
+ }
- bp.init(RPB.RpbGetBucketResp.parseFrom(data));
- return bp;
}
-
- public void setBucketProperties(ByteString bucket, BucketProperties props) throws IOException {
-
- RPB.RpbSetBucketReq req = RPB.RpbSetBucketReq.newBuilder()
- .setBucket(bucket)
- .setProps( props.build() ).build();
-
- send(MSG_SetBucketReq, req);
- receive_code(MSG_SetBucketResp);
+
+ public void setBucketProperties(ByteString bucket, BucketProperties props)
+ throws IOException {
+
+ RPB.RpbSetBucketReq req = RPB.RpbSetBucketReq.newBuilder().setBucket(
+ bucket).setProps(props.build()).build();
+
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_SetBucketReq, req);
+ c.receive_code(MSG_SetBucketResp);
+ } finally {
+ release(c);
+ }
}
-
-
+
// /////////////////////
public ByteString[] listKeys(ByteString bucket) throws IOException {
- send(MSG_ListKeysReq, RPB.RpbListKeysReq.newBuilder().setBucket(bucket)
- .build());
-
List<ByteString> keys = new ArrayList<ByteString>();
- RpbListKeysResp r;
- do {
- byte[] data = receive(MSG_ListKeysResp);
- if (data == null) {
- return NO_BYTE_STRINGS;
- }
- r = RPB.RpbListKeysResp.parseFrom(data);
+ RiakConnection c = getConnection();
+ try {
+ c.send(MSG_ListKeysReq, RPB.RpbListKeysReq.newBuilder().setBucket(
+ bucket).build());
+
+ RpbListKeysResp r;
+ do {
+ byte[] data = c.receive(MSG_ListKeysResp);
+ if (data == null) {
+ return NO_BYTE_STRINGS;
+ }
+ r = RPB.RpbListKeysResp.parseFrom(data);
- for (int i = 0; i < r.getKeysCount(); i++) {
- keys.add(r.getKeys(i));
- }
+ for (int i = 0; i < r.getKeysCount(); i++) {
+ keys.add(r.getKeys(i));
+ }
- } while (!r.hasDone() || r.getDone() == false);
+ } while (!r.hasDone() || r.getDone() == false);
+ } finally {
+ release(c);
+ }
return keys.toArray(new ByteString[keys.size()]);
}
-
-
- ///////////////////////
+ // /////////////////////
MapReduceResponse[] mapreduce(JSONObject obj) throws IOException {
- return mapreduce(ByteString.copyFromUtf8(obj.toString()),
+ return mapreduce(ByteString.copyFromUtf8(obj.toString()),
new RequestMeta().contentType("application/json"));
}
- public MapReduceResponse[] mapreduce(ByteString request, RequestMeta meta) throws IOException {
-
- ByteString contentType = meta.getContentType();
- if (contentType == null) {
- throw new IllegalArgumentException("no content type");
- }
- RpbMapRedReq req = RPB.RpbMapRedReq.newBuilder()
- .setRequest(request)
- .setContentType(meta.getContentType()).build();
-
- send(MSG_MapRedReq, req);
- byte[] data = receive(MSG_MapRedResp);
- if (data == null) {
- return NO_MAP_REDUCE_RESPONSES;
- }
-
+ public MapReduceResponse[] mapreduce(ByteString request, RequestMeta meta)
+ throws IOException {
List<MapReduceResponse> out = new ArrayList<MapReduceResponse>();
- RpbMapRedResp resp;
- do {
- resp = RPB.RpbMapRedResp.parseFrom(data);
- out.add(new MapReduceResponse(resp, contentType));
-
- } while (!resp.hasDone() || resp.getDone() == false);
-
- return out.toArray(new MapReduceResponse[out.size()]);
- }
-
-
- ///////////////////////
-
- private void send(int code, MessageLite req) throws IOException {
- int len = req.getSerializedSize();
- dout.writeInt(len + 1);
- dout.write(code);
- req.writeTo(dout);
- dout.flush();
- }
-
- private void send(int code) throws IOException {
- dout.writeInt(1);
- dout.write(code);
- dout.flush();
- }
-
- private byte[] receive(int code) throws IOException {
- int len = din.readInt();
- int get_code = din.read();
+ RiakConnection c = getConnection();
+ try {
+ ByteString contentType = meta.getContentType();
+ if (contentType == null) {
+ throw new IllegalArgumentException("no content type");
+ }
+ RpbMapRedReq req = RPB.RpbMapRedReq.newBuilder()
+ .setRequest(request).setContentType(meta.getContentType())
+ .build();
- if (code == MSG_ErrorResp) {
- RpbErrorResp err = com.trifork.riak.RPB.RpbErrorResp.parseFrom(din);
- throw new RiakError(err);
- }
+ c.send(MSG_MapRedReq, req);
+ byte[] data = c.receive(MSG_MapRedResp);
+ if (data == null) {
+ return NO_MAP_REDUCE_RESPONSES;
+ }
- byte[] data = null;
- if (len > 1) {
- data = new byte[len - 1];
- din.readFully(data);
- }
+ RpbMapRedResp resp;
+ do {
+ resp = RPB.RpbMapRedResp.parseFrom(data);
+ out.add(new MapReduceResponse(resp, contentType));
- if (code != get_code) {
- throw new IOException("bad message code");
+ } while (!resp.hasDone() || resp.getDone() == false);
+ } finally {
+ release(c);
}
- return data;
+ return out.toArray(new MapReduceResponse[out.size()]);
}
- private void receive_code(int code) throws IOException, RiakError {
- int len = din.readInt();
- int get_code = din.read();
- if (code == MSG_ErrorResp) {
- RpbErrorResp err = com.trifork.riak.RPB.RpbErrorResp.parseFrom(din);
- throw new RiakError(err);
- }
- if (len != 1 || code != get_code) {
- throw new IOException("bad message code");
- }
- }
}
View
160 src/com/trifork/riak/RiakConnection.java
@@ -0,0 +1,160 @@
+/**
+ * This file is part of riak-java-pb-client
+ *
+ * Copyright (c) 2010 by Trifork
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package com.trifork.riak;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import com.google.protobuf.MessageLite;
+import com.trifork.riak.RPB.RpbErrorResp;
+
+public class RiakConnection {
+
+ static final int DEFAULT_RIAK_PB_PORT = 8087;
+
+ private Socket sock;
+ private DataOutputStream dout;
+ private DataInputStream din;
+
+ public RiakConnection(String host) throws IOException {
+ this(host, DEFAULT_RIAK_PB_PORT);
+ }
+
+ public RiakConnection(String host, int port) throws IOException {
+ this(InetAddress.getByName(host), port);
+ }
+
+ public RiakConnection(InetAddress addr, int port) throws IOException {
+ sock = new Socket(addr, port);
+
+ sock.setSendBufferSize(1024 * 200);
+
+ dout = new DataOutputStream(new BufferedOutputStream(sock
+ .getOutputStream(), 1024 * 200));
+ din = new DataInputStream(
+ new BufferedInputStream(sock.getInputStream(), 1024 * 200));
+ }
+
+ ///////////////////////
+
+ void send(int code, MessageLite req) throws IOException {
+ int len = req.getSerializedSize();
+ dout.writeInt(len + 1);
+ dout.write(code);
+ req.writeTo(dout);
+ dout.flush();
+ }
+
+ void send(int code) throws IOException {
+ dout.writeInt(1);
+ dout.write(code);
+ dout.flush();
+ }
+
+ byte[] receive(int code) throws IOException {
+ int len = din.readInt();
+ int get_code = din.read();
+
+ if (code == RiakClient.MSG_ErrorResp) {
+ RpbErrorResp err = com.trifork.riak.RPB.RpbErrorResp.parseFrom(din);
+ throw new RiakError(err);
+ }
+
+ byte[] data = null;
+ if (len > 1) {
+ data = new byte[len - 1];
+ din.readFully(data);
+ }
+
+ if (code != get_code) {
+ throw new IOException("bad message code");
+ }
+
+ return data;
+ }
+
+ void receive_code(int code) throws IOException, RiakError {
+ int len = din.readInt();
+ int get_code = din.read();
+ if (code == RiakClient.MSG_ErrorResp) {
+ RpbErrorResp err = com.trifork.riak.RPB.RpbErrorResp.parseFrom(din);
+ throw new RiakError(err);
+ }
+ if (len != 1 || code != get_code) {
+ throw new IOException("bad message code");
+ }
+ }
+
+ static Timer idle_timer = new Timer();
+ TimerTask idle_timeout;
+
+ public void beginIdle() {
+ idle_timeout = new TimerTask() {
+
+ @Override
+ public void run() {
+ RiakConnection.this.timer_fired(this);
+ }
+ };
+
+ idle_timer.schedule(idle_timeout, 1000);
+ }
+
+ synchronized void timer_fired(TimerTask fired_timer) {
+ if (idle_timeout != fired_timer) {
+ // if it is not our current timer, then ignore
+ return;
+ }
+
+ try {
+ sock.close();
+ din = null;
+ dout = null;
+ sock = null;
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ synchronized boolean endIdleAndCheckValid() {
+ TimerTask tt = idle_timeout;
+ if (tt != null) { tt.cancel(); }
+ idle_timeout = null;
+
+ if (sock == null) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ public DataOutputStream getOutputStream() {
+ return dout;
+ }
+
+
+}
View
1 src/com/trifork/riak/RiakError.java
@@ -22,6 +22,7 @@
import com.trifork.riak.RPB.RpbErrorResp;
+@SuppressWarnings("serial")
public class RiakError extends IOException {
public RiakError(RpbErrorResp err) {
View
48 src/com/trifork/riak/RiakMessageCodes.java
@@ -0,0 +1,48 @@
+/**
+ * This file is part of riak-java-pb-client
+ *
+ * Copyright (c) 2010 by Trifork
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package com.trifork.riak;
+
+interface RiakMessageCodes {
+ public static final int MSG_ErrorResp = 0;
+ public static final int MSG_PingReq = 1;
+ public static final int MSG_PingResp = 2;
+ public static final int MSG_GetClientIdReq = 3;
+ public static final int MSG_GetClientIdResp = 4;
+ public static final int MSG_SetClientIdReq = 5;
+ public static final int MSG_SetClientIdResp = 6;
+ public static final int MSG_GetServerInfoReq = 7;
+ public static final int MSG_GetServerInfoResp = 8;
+ public static final int MSG_GetReq = 9;
+ public static final int MSG_GetResp = 10;
+ public static final int MSG_PutReq = 11;
+ public static final int MSG_PutResp = 12;
+ public static final int MSG_DelReq = 13;
+ public static final int MSG_DelResp = 14;
+ public static final int MSG_ListBucketsReq = 15;
+ public static final int MSG_ListBucketsResp = 16;
+ public static final int MSG_ListKeysReq = 17;
+ public static final int MSG_ListKeysResp = 18;
+ public static final int MSG_GetBucketReq = 19;
+ public static final int MSG_GetBucketResp = 20;
+ public static final int MSG_SetBucketReq = 21;
+ public static final int MSG_SetBucketResp = 22;
+ public static final int MSG_MapRedReq = 23;
+ public static final int MSG_MapRedResp = 24;
+
+}
View
22 src/com/trifork/riak/RiakObject.java
@@ -81,25 +81,40 @@ public RiakObject(ByteString bucket, ByteString key, ByteString content) {
this.value = content;
}
+ public RiakObject(String bucket, String key, String content) {
+ this.bucket = ByteString.copyFromUtf8(bucket);
+ this.key = ByteString.copyFromUtf8(key);
+ this.value = ByteString.copyFromUtf8(content);
+ }
+
private String str(ByteString str) {
if (str == null) return null;
return str.toStringUtf8();
}
- public ByteString getBucket() {
+ public ByteString getBucketBS() {
return bucket;
}
- public ByteString getKey() {
+ public String getBucket() {
+ return bucket.toStringUtf8();
+ }
+
+
+ public ByteString getKeyBS() {
return key;
}
+ public String getKey() {
+ return key.toStringUtf8();
+ }
+
public ByteString getVclock() {
return vclock;
}
- public RpbContent buildContent() {
+ RpbContent buildContent() {
Builder b =
RpbContent.newBuilder()
.setValue(value);
@@ -135,7 +150,6 @@ public RpbContent buildContent() {
}
if (userMeta != null && !userMeta.isEmpty()) {
- int i = 0;
for (Map.Entry<String, String> ent : userMeta.entrySet()) {
ByteString key = ByteString.copyFromUtf8(ent.getKey());
com.trifork.riak.RPB.RpbPair.Builder pb = RPB.RpbPair.newBuilder().setKey(key);

0 comments on commit f2cfbb2

Please sign in to comment.
Something went wrong with that request. Please try again.