Permalink
Browse files

Initial Import

  • Loading branch information...
0 parents commit 67edbaab2827394282e34cb0f2732cad03273fe9 @krestenkrab committed Jun 24, 2010
9 .classpath
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src"/>
+ <classpathentry kind="src" path="gen"/>
+ <classpathentry kind="src" path="test"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry kind="lib" path="lib/protobuf-java-2.3.0.jar" sourcepath="/Users/krab/Projects/protobuf-2.3.0/java/src"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
2 .gitignore
@@ -0,0 +1,2 @@
+bin
+*.class
17 .project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>riak-java-pb-client</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
7,979 gen/com/trifork/riak/RPB.java
7,979 additions, 0 deletions not shown because the diff is too large. Please use a local Git client to view these changes.
BIN lib/protobuf-java-2.3.0.jar
Binary file not shown.
307 src/com/trifork/riak/Base64Coder.java
@@ -0,0 +1,307 @@
+package com.trifork.riak;
+
+//Copyright 2003-2010 Christian d'Heureuse, Inventec Informatik AG, Zurich, Switzerland
+//www.source-code.biz, www.inventec.ch/chdh
+//
+//This module is multi-licensed and may be used under the terms
+//of any of the following licenses:
+//
+//EPL, Eclipse Public License, http://www.eclipse.org/legal
+//LGPL, GNU Lesser General Public License, http://www.gnu.org/licenses/lgpl.html
+//AL, Apache License, http://www.apache.org/licenses
+//BSD, BSD License, http://www.opensource.org/licenses/bsd-license.php
+//
+//Please contact the author if you need another license.
+//This module is provided "as is", without warranties of any kind.
+
+/**
+ * A Base64 encoder/decoder.
+ *
+ * <p>
+ * This class is used to encode and decode data in Base64 format as described in
+ * RFC 1521.
+ *
+ * <p>
+ * Project home page: <a
+ * href="http://www.source-code.biz/base64coder/java/">www.
+ * source-code.biz/base64coder/java</a><br>
+ * Author: Christian d'Heureuse, Inventec Informatik AG, Zurich, Switzerland<br>
+ * Multi-licensed: EPL / LGPL / AL / BSD.
+ */
+class Base64Coder {
+
+ // The line separator string of the operating system.
+ private static final String systemLineSeparator = System
+ .getProperty("line.separator");
+
+ // Mapping table from 6-bit nibbles to Base64 characters.
+ private static char[] map1 = new char[64];
+ static {
+ int i = 0;
+ for (char c = 'A'; c <= 'Z'; c++)
+ map1[i++] = c;
+ for (char c = 'a'; c <= 'z'; c++)
+ map1[i++] = c;
+ for (char c = '0'; c <= '9'; c++)
+ map1[i++] = c;
+ map1[i++] = '+';
+ map1[i++] = '/';
+ }
+
+ // Mapping table from Base64 characters to 6-bit nibbles.
+ private static byte[] map2 = new byte[128];
+ static {
+ for (int i = 0; i < map2.length; i++)
+ map2[i] = -1;
+ for (int i = 0; i < 64; i++)
+ map2[map1[i]] = (byte) i;
+ }
+
+ /**
+ * Encodes a string into Base64 format. No blanks or line breaks are
+ * inserted.
+ *
+ * @param s
+ * A String to be encoded.
+ * @return A String containing the Base64 encoded data.
+ */
+ public static String encodeString(String s) {
+ return new String(encode(s.getBytes()));
+ }
+
+ /**
+ * Encodes a byte array into Base 64 format and breaks the output into lines
+ * of 76 characters. This method is compatible with
+ * <code>sun.misc.BASE64Encoder.encodeBuffer(byte[])</code>.
+ *
+ * @param in
+ * An array containing the data bytes to be encoded.
+ * @return A String containing the Base64 encoded data, broken into lines.
+ */
+ public static String encodeLines(byte[] in) {
+ return encodeLines(in, 0, in.length, 76, systemLineSeparator);
+ }
+
+ /**
+ * Encodes a byte array into Base 64 format and breaks the output into
+ * lines.
+ *
+ * @param in
+ * An array containing the data bytes to be encoded.
+ * @param iOff
+ * Offset of the first byte in <code>in</code> to be processed.
+ * @param iLen
+ * Number of bytes to be processed in <code>in</code>, starting
+ * at <code>iOff</code>.
+ * @param lineLen
+ * Line length for the output data. Should be a multiple of 4.
+ * @param lineSeparator
+ * The line separator to be used to separate the output lines.
+ * @return A String containing the Base64 encoded data, broken into lines.
+ */
+ public static String encodeLines(byte[] in, int iOff, int iLen,
+ int lineLen, String lineSeparator) {
+ int blockLen = (lineLen * 3) / 4;
+ if (blockLen <= 0)
+ throw new IllegalArgumentException();
+ int lines = (iLen + blockLen - 1) / blockLen;
+ int bufLen = ((iLen + 2) / 3) * 4 + lines * lineSeparator.length();
+ StringBuilder buf = new StringBuilder(bufLen);
+ int ip = 0;
+ while (ip < iLen) {
+ int l = Math.min(iLen - ip, blockLen);
+ buf.append(encode(in, iOff + ip, l));
+ buf.append(lineSeparator);
+ ip += l;
+ }
+ return buf.toString();
+ }
+
+ /**
+ * Encodes a byte array into Base64 format. No blanks or line breaks are
+ * inserted in the output.
+ *
+ * @param in
+ * An array containing the data bytes to be encoded.
+ * @return A character array containing the Base64 encoded data.
+ */
+ public static char[] encode(byte[] in) {
+ return encode(in, 0, in.length);
+ }
+
+ /**
+ * Encodes a byte array into Base64 format. No blanks or line breaks are
+ * inserted in the output.
+ *
+ * @param in
+ * An array containing the data bytes to be encoded.
+ * @param iLen
+ * Number of bytes to process in <code>in</code>.
+ * @return A character array containing the Base64 encoded data.
+ */
+ public static char[] encode(byte[] in, int iLen) {
+ return encode(in, 0, iLen);
+ }
+
+ /**
+ * Encodes a byte array into Base64 format. No blanks or line breaks are
+ * inserted in the output.
+ *
+ * @param in
+ * An array containing the data bytes to be encoded.
+ * @param iOff
+ * Offset of the first byte in <code>in</code> to be processed.
+ * @param iLen
+ * Number of bytes to process in <code>in</code>, starting at
+ * <code>iOff</code>.
+ * @return A character array containing the Base64 encoded data.
+ */
+ public static char[] encode(byte[] in, int iOff, int iLen) {
+ int oDataLen = (iLen * 4 + 2) / 3; // output length without padding
+ int oLen = ((iLen + 2) / 3) * 4; // output length including padding
+ char[] out = new char[oLen];
+ int ip = iOff;
+ int iEnd = iOff + iLen;
+ int op = 0;
+ while (ip < iEnd) {
+ int i0 = in[ip++] & 0xff;
+ int i1 = ip < iEnd ? in[ip++] & 0xff : 0;
+ int i2 = ip < iEnd ? in[ip++] & 0xff : 0;
+ int o0 = i0 >>> 2;
+ int o1 = ((i0 & 3) << 4) | (i1 >>> 4);
+ int o2 = ((i1 & 0xf) << 2) | (i2 >>> 6);
+ int o3 = i2 & 0x3F;
+ out[op++] = map1[o0];
+ out[op++] = map1[o1];
+ out[op] = op < oDataLen ? map1[o2] : '=';
+ op++;
+ out[op] = op < oDataLen ? map1[o3] : '=';
+ op++;
+ }
+ return out;
+ }
+
+ /**
+ * Decodes a string from Base64 format. No blanks or line breaks are allowed
+ * within the Base64 encoded input data.
+ *
+ * @param s
+ * A Base64 String to be decoded.
+ * @return A String containing the decoded data.
+ * @throws IllegalArgumentException
+ * If the input is not valid Base64 encoded data.
+ */
+ public static String decodeString(String s) {
+ return new String(decode(s));
+ }
+
+ /**
+ * Decodes a byte array from Base64 format and ignores line separators, tabs
+ * and blanks. CR, LF, Tab and Space characters are ignored in the input
+ * data. This method is compatible with
+ * <code>sun.misc.BASE64Decoder.decodeBuffer(String)</code>.
+ *
+ * @param s
+ * A Base64 String to be decoded.
+ * @return An array containing the decoded data bytes.
+ * @throws IllegalArgumentException
+ * If the input is not valid Base64 encoded data.
+ */
+ public static byte[] decodeLines(String s) {
+ char[] buf = new char[s.length()];
+ int p = 0;
+ for (int ip = 0; ip < s.length(); ip++) {
+ char c = s.charAt(ip);
+ if (c != ' ' && c != '\r' && c != '\n' && c != '\t')
+ buf[p++] = c;
+ }
+ return decode(buf, 0, p);
+ }
+
+ /**
+ * Decodes a byte array from Base64 format. No blanks or line breaks are
+ * allowed within the Base64 encoded input data.
+ *
+ * @param s
+ * A Base64 String to be decoded.
+ * @return An array containing the decoded data bytes.
+ * @throws IllegalArgumentException
+ * If the input is not valid Base64 encoded data.
+ */
+ public static byte[] decode(String s) {
+ return decode(s.toCharArray());
+ }
+
+ /**
+ * Decodes a byte array from Base64 format. No blanks or line breaks are
+ * allowed within the Base64 encoded input data.
+ *
+ * @param in
+ * A character array containing the Base64 encoded data.
+ * @return An array containing the decoded data bytes.
+ * @throws IllegalArgumentException
+ * If the input is not valid Base64 encoded data.
+ */
+ public static byte[] decode(char[] in) {
+ return decode(in, 0, in.length);
+ }
+
+ /**
+ * Decodes a byte array from Base64 format. No blanks or line breaks are
+ * allowed within the Base64 encoded input data.
+ *
+ * @param in
+ * A character array containing the Base64 encoded data.
+ * @param iOff
+ * Offset of the first character in <code>in</code> to be
+ * processed.
+ * @param iLen
+ * Number of characters to process in <code>in</code>, starting
+ * at <code>iOff</code>.
+ * @return An array containing the decoded data bytes.
+ * @throws IllegalArgumentException
+ * If the input is not valid Base64 encoded data.
+ */
+ public static byte[] decode(char[] in, int iOff, int iLen) {
+ if (iLen % 4 != 0)
+ throw new IllegalArgumentException(
+ "Length of Base64 encoded input string is not a multiple of 4.");
+ while (iLen > 0 && in[iOff + iLen - 1] == '=')
+ iLen--;
+ int oLen = (iLen * 3) / 4;
+ byte[] out = new byte[oLen];
+ int ip = iOff;
+ int iEnd = iOff + iLen;
+ int op = 0;
+ while (ip < iEnd) {
+ int i0 = in[ip++];
+ int i1 = in[ip++];
+ int i2 = ip < iEnd ? in[ip++] : 'A';
+ int i3 = ip < iEnd ? in[ip++] : 'A';
+ if (i0 > 127 || i1 > 127 || i2 > 127 || i3 > 127)
+ throw new IllegalArgumentException(
+ "Illegal character in Base64 encoded data.");
+ int b0 = map2[i0];
+ int b1 = map2[i1];
+ int b2 = map2[i2];
+ int b3 = map2[i3];
+ if (b0 < 0 || b1 < 0 || b2 < 0 || b3 < 0)
+ throw new IllegalArgumentException(
+ "Illegal character in Base64 encoded data.");
+ int o0 = (b0 << 2) | (b1 >>> 4);
+ int o1 = ((b1 & 0xf) << 4) | (b2 >>> 2);
+ int o2 = ((b2 & 3) << 6) | b3;
+ out[op++] = (byte) o0;
+ if (op < oLen)
+ out[op++] = (byte) o1;
+ if (op < oLen)
+ out[op++] = (byte) o2;
+ }
+ return out;
+ }
+
+ // Dummy constructor.
+ private Base64Coder() {
+ }
+
+} // end class Base64Coder
41 src/com/trifork/riak/RequestMeta.java
@@ -0,0 +1,41 @@
+package com.trifork.riak;
+
+public class RequestMeta {
+
+ Boolean returnBody;
+ Integer writeQuorum;
+ Integer durableWriteQuorum;
+
+ public RequestMeta() {
+ }
+
+ public void preparePut(RPB.RpbPutReq.Builder builder) {
+
+ if (returnBody != null) {
+ builder.setReturnBody(returnBody.booleanValue());
+ }
+
+ if (writeQuorum != null) {
+ builder.setW(writeQuorum.intValue());
+ }
+
+ if (durableWriteQuorum != null) {
+ builder.setDw(durableWriteQuorum.intValue());
+ }
+ }
+
+ RequestMeta returnBody(boolean ret) {
+ returnBody = Boolean.valueOf(ret);
+ return this;
+ }
+
+ RequestMeta w(int w) {
+ w = new Integer(w);
+ return this;
+ }
+
+ RequestMeta dw(int dw) {
+ dw = new Integer(dw);
+ return this;
+ }
+}
463 src/com/trifork/riak/RiakClient.java
@@ -0,0 +1,463 @@
+package com.trifork.riak;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.prefs.BackingStoreException;
+import java.util.prefs.Preferences;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MessageLite;
+import com.trifork.riak.RPB.RpbDelReq;
+import com.trifork.riak.RPB.RpbErrorResp;
+import com.trifork.riak.RPB.RpbGetClientIdResp;
+import com.trifork.riak.RPB.RpbGetReq;
+import com.trifork.riak.RPB.RpbGetResp;
+import com.trifork.riak.RPB.RpbGetServerInfoResp;
+import com.trifork.riak.RPB.RpbListBucketsResp;
+import com.trifork.riak.RPB.RpbListKeysResp;
+import com.trifork.riak.RPB.RpbPutReq;
+import com.trifork.riak.RPB.RpbPutResp;
+import com.trifork.riak.RPB.RpbSetClientIdReq;
+import com.trifork.riak.RPB.RpbPutReq.Builder;
+
+public class RiakClient {
+
+ public static final int MSG_ErrorResp = 0;
+ public static final int MSG_PingReq = 1;
+ public static final int MSG_PingResp = 2;
+ public static final int MSG_GetClientIdReq = 3;
+ public static final int MSG_GetClientIdResp = 4;
+ public static final int MSG_SetClientIdReq = 5;
+ public static final int MSG_SetClientIdResp = 6;
+ public static final int MSG_GetServerInfoReq = 7;
+ public static final int MSG_GetServerInfoResp = 8;
+ public static final int MSG_GetReq = 9;
+ public static final int MSG_GetResp = 10;
+ public static final int MSG_PutReq = 11;
+ public static final int MSG_PutResp = 12;
+ public static final int MSG_DelReq = 13;
+ public static final int MSG_DelResp = 14;
+ public static final int MSG_ListBucketsReq = 15;
+ public static final int MSG_ListBucketsResp = 16;
+ public static final int MSG_ListKeysReq = 17;
+ public static final int MSG_ListKeysResp = 18;
+ public static final int MSG_GetBucketReq = 19;
+ public static final int MSG_GetBucketResp = 20;
+ public static final int MSG_SetBucketReq = 21;
+ public static final int MSG_SetBucketResp = 22;
+ public static final int MSG_MapRedReq = 23;
+ public static final int MSG_MapRedResp = 24;
+
+ private static final int DEFAULT_RIAK_PB_PORT = 8087;
+ private static final RiakObject[] NO_RIAK_OBJECTS = new RiakObject[0];
+ private static final ByteString[] NO_BYTE_STRINGS = new ByteString[0];
+ private Socket sock;
+ private DataOutputStream dout;
+ private DataInputStream din;
+ private String node;
+ private String serverVersion;
+
+ public RiakClient(String host) throws IOException {
+ this(host, DEFAULT_RIAK_PB_PORT);
+ }
+
+ public RiakClient(String host, int port) throws IOException {
+ this(InetAddress.getByName(host), port);
+ }
+
+ public RiakClient(InetAddress addr, int port) throws IOException {
+ sock = new Socket(addr, port);
+
+ sock.setSendBufferSize(1024 * 200);
+
+ dout = new DataOutputStream(new BufferedOutputStream(sock
+ .getOutputStream(), 1024 * 200));
+ din = new DataInputStream(
+ new BufferedInputStream(sock.getInputStream(), 1024 * 200));
+
+ ping();
+
+ // prepareClientID();
+
+ getServerInfo();
+ }
+
+ /**
+ * helper method to use a reasonable default client id
+ *
+ * @throws IOException
+ */
+ private void prepareClientID() throws IOException {
+ Preferences prefs = Preferences.userNodeForPackage(RiakClient.class);
+
+ String clid = prefs.get("client_id", null);
+ if (clid == null) {
+ SecureRandom sr;
+ try {
+ sr = SecureRandom.getInstance("SHA1PRNG");
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ byte[] data = new byte[6];
+ sr.nextBytes(data);
+ clid = Base64Coder.encodeLines(data);
+ prefs.put("client_id", clid);
+ try {
+ prefs.flush();
+ } catch (BackingStoreException e) {
+ throw new IOException(e);
+ }
+ }
+
+ setClientID(clid);
+ }
+
+ public void ping() throws IOException {
+ send(MSG_PingReq);
+ receive_code(MSG_PingResp);
+ }
+
+ public void setClientID(String id) throws IOException {
+ setClientID(ByteString.copyFromUtf8(id));
+ }
+
+ // /////////////////////
+
+ public void setClientID(ByteString id) throws IOException {
+ RpbSetClientIdReq req = RPB.RpbSetClientIdReq.newBuilder().setClientId(
+ id).build();
+ send(MSG_SetClientIdReq, req);
+ receive_code(MSG_SetClientIdResp);
+ }
+
+ public String getClientID() throws IOException {
+ send(MSG_GetClientIdReq);
+ byte[] data = receive(MSG_GetClientIdResp);
+ if (data == null)
+ return null;
+ RpbGetClientIdResp res = RPB.RpbGetClientIdResp.parseFrom(data);
+ return res.getClientId().toStringUtf8();
+ }
+
+ public Map<String, String> getServerInfo() throws IOException {
+ send(MSG_GetServerInfoReq);
+ byte[] data = receive(MSG_GetServerInfoResp);
+ if (data == null)
+ return Collections.emptyMap();
+
+ RpbGetServerInfoResp res = RPB.RpbGetServerInfoResp.parseFrom(data);
+ if (res.hasNode()) {
+ this.node = res.getNode().toStringUtf8();
+ }
+ if (res.hasServerVersion()) {
+ this.serverVersion = res.getServerVersion().toStringUtf8();
+ }
+ Map<String, String> result = new HashMap<String, String>();
+ result.put("node", node);
+ result.put("server_version", serverVersion);
+ return result;
+ }
+
+ // /////////////////////
+
+ public RiakObject[] fetch(String bucket, String key, int readQuorum)
+ throws IOException {
+ return fetch(ByteString.copyFromUtf8(bucket), ByteString
+ .copyFromUtf8(key), readQuorum);
+ }
+
+ public RiakObject[] fetch(ByteString bucket, ByteString key, int readQuorum)
+ throws IOException {
+ RpbGetReq req = RPB.RpbGetReq.newBuilder().setBucket(bucket)
+ .setKey(key).setR(readQuorum).build();
+
+ send(MSG_GetReq, req);
+ return process_fetch_reply(bucket, key);
+
+ }
+
+ public RiakObject[] fetch(String bucket, String key) throws IOException {
+ return fetch(ByteString.copyFromUtf8(bucket), ByteString
+ .copyFromUtf8(key));
+ }
+
+ public RiakObject[] fetch(ByteString bucket, ByteString key)
+ throws IOException {
+ RpbGetReq req = RPB.RpbGetReq.newBuilder().setBucket(bucket)
+ .setKey(key).build();
+
+ send(MSG_GetReq, req);
+ return process_fetch_reply(bucket, key);
+ }
+
+ private RiakObject[] process_fetch_reply(ByteString bucket, ByteString key)
+ throws IOException, InvalidProtocolBufferException {
+ byte[] rep = receive(MSG_GetResp);
+
+ if (rep == null) {
+ return NO_RIAK_OBJECTS;
+ }
+
+ RpbGetResp resp = RPB.RpbGetResp.parseFrom(rep);
+ int count = resp.getContentCount();
+ RiakObject[] out = new RiakObject[count];
+ ByteString vclock = resp.getVclock();
+ for (int i = 0; i < count; i++) {
+ out[i] = new RiakObject(vclock, bucket, key, resp.getContent(i));
+ }
+ return out;
+ }
+
+ // /////////////////////
+
+ public ByteString[] store(RiakObject[] values, RequestMeta meta)
+ throws IOException {
+
+ BulkReader reader = new BulkReader(values.length);
+ Thread worker = new Thread(reader);
+ worker.start();
+
+ for (int i = 0; i < values.length; i++) {
+ RiakObject value = values[i];
+
+ RPB.RpbPutReq.Builder builder = RPB.RpbPutReq.newBuilder()
+ .setBucket(value.getBucket()).setKey(value.getKey())
+ .setContent(value.buildContent());
+
+ if (value.getVclock() != null) {
+ builder.setVclock(value.getVclock());
+ }
+
+ if (meta != null) {
+
+ builder.setReturnBody(false);
+
+ if (meta.writeQuorum != null) {
+ builder.setW(meta.writeQuorum.intValue());
+ }
+
+ if (meta.durableWriteQuorum != null) {
+ builder.setDw(meta.durableWriteQuorum.intValue());
+ }
+ }
+
+ RpbPutReq req = builder.build();
+
+ int len = req.getSerializedSize();
+ dout.writeInt(len + 1);
+ dout.write(MSG_PutReq);
+ req.writeTo(dout);
+ }
+
+ dout.flush();
+
+ try {
+ worker.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return reader.vclocks;
+ }
+
+ class BulkReader implements Runnable {
+
+ private ByteString[] vclocks;
+
+ public BulkReader(int count) {
+ this.vclocks = new ByteString[count];
+ }
+
+ @Override
+ public void run() {
+
+ try {
+ for (int i = 0; i < vclocks.length; i++) {
+ byte[] data = receive(MSG_PutResp);
+ if (data != null) {
+ RpbPutResp resp = RPB.RpbPutResp.parseFrom(data);
+ vclocks[i] = resp.getVclock();
+ }
+ }
+ } catch (IOException e) {
+ // TODO
+ e.printStackTrace();
+ }
+
+ }
+
+ }
+
+ public void store(RiakObject value) throws IOException {
+ store(value, null);
+ }
+
+ public RiakObject[] store(RiakObject value, RequestMeta meta)
+ throws IOException {
+
+ RPB.RpbPutReq.Builder builder = RPB.RpbPutReq.newBuilder().setBucket(
+ value.getBucket()).setKey(value.getKey()).setContent(
+ value.buildContent());
+
+ if (value.getVclock() != null) {
+ builder.setVclock(value.getVclock());
+ }
+
+ if (meta != null) {
+ meta.preparePut(builder);
+ }
+
+ send(MSG_PutReq, builder.build());
+ byte[] r = receive(MSG_PutResp);
+
+ if (r == null) {
+ return NO_RIAK_OBJECTS;
+ }
+
+ RpbPutResp resp = RPB.RpbPutResp.parseFrom(r);
+
+ RiakObject[] res = new RiakObject[resp.getContentsCount()];
+ ByteString vclock = resp.getVclock();
+
+ for (int i = 0; i < res.length; i++) {
+ res[i] = new RiakObject(vclock, value.getBucket(), value.getKey(),
+ resp.getContents(i));
+ }
+
+ return res;
+ }
+
+ // /////////////////////
+
+ void delete(String bucket, String key, int rw) throws IOException {
+ delete(ByteString.copyFromUtf8(bucket), ByteString.copyFromUtf8(key),
+ rw);
+ }
+
+ public void delete(ByteString bucket, ByteString key, int rw)
+ throws IOException {
+ RpbDelReq req = RPB.RpbDelReq.newBuilder().setBucket(bucket)
+ .setKey(key).setRw(rw).build();
+
+ send(MSG_DelReq, req);
+ receive_code(MSG_DelResp);
+ }
+
+ void delete(String bucket, String key) throws IOException {
+ delete(ByteString.copyFromUtf8(bucket), ByteString.copyFromUtf8(key));
+ }
+
+ public void delete(ByteString bucket, ByteString key) throws IOException {
+ RpbDelReq req = RPB.RpbDelReq.newBuilder().setBucket(bucket)
+ .setKey(key).build();
+
+ send(MSG_DelReq, req);
+ receive_code(MSG_DelResp);
+ }
+
+ public ByteString[] listBuckets() throws IOException {
+
+ send(MSG_ListBucketsReq);
+
+ byte[] data = receive(MSG_ListBucketsResp);
+ if (data == null) {
+ return NO_BYTE_STRINGS;
+ }
+
+ RpbListBucketsResp resp = RPB.RpbListBucketsResp.parseFrom(data);
+ ByteString[] out = new ByteString[resp.getBucketsCount()];
+ for (int i = 0; i < out.length; i++) {
+ out[i] = resp.getBuckets(i);
+ }
+ return out;
+ }
+
+ // /////////////////////
+
+ public ByteString[] listKeys(ByteString bucket) throws IOException {
+
+ send(MSG_ListKeysReq, RPB.RpbListKeysReq.newBuilder().setBucket(bucket)
+ .build());
+
+ List<ByteString> keys = new ArrayList<ByteString>();
+
+ RpbListKeysResp r;
+ do {
+ byte[] data = receive(MSG_ListKeysResp);
+ if (data == null) {
+ return NO_BYTE_STRINGS;
+ }
+ r = RPB.RpbListKeysResp.parseFrom(data);
+
+ for (int i = 0; i < r.getKeysCount(); i++) {
+ keys.add(r.getKeys(i));
+ }
+
+ } while (!r.hasDone() || r.getDone() == false);
+
+ return keys.toArray(new ByteString[keys.size()]);
+ }
+
+ // /////////////////////
+
+ private void send(int code, MessageLite req) throws IOException {
+ int len = req.getSerializedSize();
+ dout.writeInt(len + 1);
+ dout.write(code);
+ req.writeTo(dout);
+ dout.flush();
+ }
+
+ private void send(int code) throws IOException {
+ dout.writeInt(1);
+ dout.write(code);
+ dout.flush();
+ }
+
+ private byte[] receive(int code) throws IOException {
+ int len = din.readInt();
+ int get_code = din.read();
+
+ if (code == MSG_ErrorResp) {
+ RpbErrorResp err = com.trifork.riak.RPB.RpbErrorResp.parseFrom(din);
+ throw new RiakError(err);
+ }
+
+ byte[] data = null;
+ if (len > 1) {
+ data = new byte[len - 1];
+ din.readFully(data);
+ }
+
+ if (code != get_code) {
+ throw new IOException("bad message code");
+ }
+
+ return data;
+ }
+
+ private void receive_code(int code) throws IOException, RiakError {
+ int len = din.readInt();
+ int get_code = din.read();
+ if (code == MSG_ErrorResp) {
+ RpbErrorResp err = com.trifork.riak.RPB.RpbErrorResp.parseFrom(din);
+ throw new RiakError(err);
+ }
+ if (len != 1 || code != get_code) {
+ throw new IOException("bad message code");
+ }
+ }
+}
13 src/com/trifork/riak/RiakError.java
@@ -0,0 +1,13 @@
+package com.trifork.riak;
+
+import java.io.IOException;
+
+import com.trifork.riak.RPB.RpbErrorResp;
+
+public class RiakError extends IOException {
+
+ public RiakError(RpbErrorResp err) {
+ super(err.getErrmsg().toStringUtf8());
+ }
+
+}
44 src/com/trifork/riak/RiakLink.java
@@ -0,0 +1,44 @@
+package com.trifork.riak;
+
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import com.trifork.riak.RPB.RpbLink;
+import com.trifork.riak.RPB.RpbLink.Builder;
+
+public class RiakLink {
+
+ private ByteString bucket;
+ private ByteString key;
+ private ByteString tag;
+
+ RiakLink(RpbLink rpbLink) {
+ this.bucket = rpbLink.getBucket();
+ this.key = rpbLink.getKey();
+ this.tag = rpbLink.getTag();
+ }
+
+ public static RiakLink[] decode(List<RpbLink> list) {
+ RiakLink[] res = new RiakLink[list.size()];
+ for (int i = 0; i < res.length; i++) {
+ res[i] = new RiakLink(list.get(i));
+ }
+ return res;
+ }
+
+ public RpbLink build() {
+ Builder b = RpbLink.newBuilder();
+
+ if (bucket != null)
+ b.setBucket(bucket);
+
+ if (key != null)
+ b.setKey(key);
+
+ if (tag != null)
+ b.setTag(tag);
+
+ return b.build();
+ }
+
+}
134 src/com/trifork/riak/RiakObject.java
@@ -0,0 +1,134 @@
+package com.trifork.riak;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.protobuf.ByteString;
+import com.trifork.riak.RPB.RpbContent;
+import com.trifork.riak.RPB.RpbPair;
+import com.trifork.riak.RPB.RpbContent.Builder;
+
+public class RiakObject {
+
+ private ByteString vclock;
+ private ByteString bucket;
+ private ByteString key;
+
+ private ByteString value;
+
+ private String contentType;
+ private RiakLink[] links;
+ private String vtag;
+ private String contentEncoding;
+ private String charset;
+ private Map<String,String> userMeta;
+ private Integer lastModified;
+ private Integer lastModifiedUsec;
+
+
+ RiakObject(ByteString vclock, ByteString bucket, ByteString key, RpbContent content) {
+ this.vclock = vclock;
+ this.bucket = bucket;
+ this.key = key;
+ this.value = content.getValue();
+ this.contentType = str(content.getContentType());
+ this.charset = str(content.getCharset());
+ this.contentEncoding = str(content.getContentEncoding());
+ this.vtag = str(content.getVtag());
+ this.links = content.getLinksCount() == 0
+ ? null
+ : RiakLink.decode(content.getLinksList());
+
+ if (content.hasLastMod()) {
+ this.lastModified = new Integer(content.getLastMod());
+ this.lastModifiedUsec = new Integer(content.getLastModUsecs());
+ }
+
+ if (content.getUsermetaCount() == 0) {
+ userMeta = Collections.emptyMap();
+ } else {
+ userMeta = new HashMap<String, String>();
+ for (int i = 0; i < content.getUsermetaCount(); i++) {
+ RpbPair um = content.getUsermeta(i);
+ userMeta.put(um.getKey().toStringUtf8(),
+ str(um.getValue()));
+ }
+ }
+ }
+
+ public RiakObject(ByteString bucket, ByteString key, ByteString content) {
+ this.bucket = bucket;
+ this.key = key;
+ this.value = content;
+ }
+
+ private String str(ByteString str) {
+ if (str == null) return null;
+ return str.toStringUtf8();
+ }
+
+ public ByteString getBucket() {
+ return bucket;
+ }
+
+ public ByteString getKey() {
+ return key;
+ }
+
+ public ByteString getVclock() {
+ return vclock;
+ }
+
+
+ public RpbContent buildContent() {
+ Builder b =
+ RpbContent.newBuilder()
+ .setValue(value);
+
+ if (contentType != null) {
+ b.setContentType(ByteString.copyFromUtf8(contentType));
+ }
+
+ if (charset != null) {
+ b.setCharset(ByteString.copyFromUtf8(charset));
+ }
+
+ if (contentEncoding != null) {
+ b.setContentEncoding(ByteString.copyFromUtf8(contentEncoding));
+ }
+
+ if (vtag != null) {
+ b.setVtag(ByteString.copyFromUtf8(vtag));
+ }
+
+ if (links != null && links.length != 0) {
+ for (int i = 0; i < links.length; i++) {
+ b.addLinks(links[i].build());
+ }
+ }
+
+ if (lastModified != null) {
+ b.setLastMod(lastModified);
+ }
+
+ if (lastModifiedUsec != null) {
+ b.setLastModUsecs(lastModifiedUsec);
+ }
+
+ if (userMeta != null && !userMeta.isEmpty()) {
+ int i = 0;
+ for (Map.Entry<String, String> ent : userMeta.entrySet()) {
+ ByteString key = ByteString.copyFromUtf8(ent.getKey());
+ com.trifork.riak.RPB.RpbPair.Builder pb = RPB.RpbPair.newBuilder().setKey(key);
+ if (ent.getValue() != null) {
+ pb.setValue(ByteString.copyFromUtf8(ent.getValue()));
+ }
+ b.addUsermeta(pb);
+ }
+ }
+
+ return b.build();
+ }
+
+}
249 src/riakclient.proto
@@ -0,0 +1,249 @@
+/* -------------------------------------------------------------------
+**
+** riakclient.proto: Protocol buffers for riak
+**
+** Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+**
+** This file is provided to you under the Apache License,
+** Version 2.0 (the "License"); you may not use this file
+** except in compliance with the License. You may obtain
+** a copy of the License at
+**
+** http://www.apache.org/licenses/LICENSE-2.0
+**
+** Unless required by applicable law or agreed to in writing,
+** software distributed under the License is distributed on an
+** "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+** KIND, either express or implied. See the License for the
+** specific language governing permissions and limitations
+** under the License.
+**
+** -------------------------------------------------------------------
+*/
+/*
+** Revision: 1.1
+**
+** Lowest Common Denominator Protocol Buffers Client
+** - no ENUM (protobuffs_erlang does not support)
+**
+** Protocol
+**
+** The protocol encodes requests and responses as protocol buffer messages.
+** Each request message results in one or more response messages.
+** As message type and length are not encoded by PB they are sent
+** on the wire as
+**
+** <length:32> <msg_code:8> <pbmsg>
+**
+** length is the length of msg_code (1 byte) plus the message length
+** in bytes encoded in network order (big endian)
+**
+** msg_code indicates what is encoded as pbmsg
+**
+** pbmsg is the encoded protocol buffer message
+**
+** On connect, the client can make requests and will receive responses.
+** For each request message there is a corresponding response message,
+** or the server will respond with an error message if something has
+** gone wrong.
+**
+** The client should be prepared to handle messages without any pbmsg
+** (i.e. length==1) for requests like ping or a put without return_body set.
+**
+** RpbGetClientIdReq -> RpbGetClientIdResp
+** RpbSetClientIdReq -> RpbSetClientIdResp
+** RpbGetServerInfoReq -> RpbGetServerInfoResp
+** RpbPingReq -> RpbPingResp
+** RpbGetReq -> RpbErrorResp | RbpGetResp
+** RpbPutReq -> RpbErrorResp | RpbPutResp
+** RpbDelReq -> RpbErrorResp | RpbDelResp
+** RpbListBucketsReq -> RpbErrorResp | RpbListBucketsResp
+** RpbListKeysReq -> RpbErrorResp | RpbListKeysResp{1,}
+** RpbGetBucketReq -> RpbErrorResp | RpbGetBucketResp
+**
+**
+** Message Codes
+** 0 - RpbErrorResp
+** 1 - RpbPingReq - 0 length
+** 2 - RpbPingResp (pong) - 0 length
+** 3 - RpbGetClientIdReq
+** 4 - RpbGetClientIdResp
+** 5 - RpbSetClientIdReq
+** 6 - RpbSetClientIdResp
+** 7 - RpbGetServerInfoReq
+** 8 - RpbGetServerInfoResp
+** 9 - RpbGetReq
+** 10 - RpbGetResp
+** 11 - RpbPutReq
+** 12 - RpbPutResp - 0 length
+** 13 - RpbDelReq
+** 14 - RpbDelResp
+** 15 - RpbListBucketsReq
+** 16 - RpbListBucketsResp
+** 17 - RpbListKeysReq
+** 18 - RpbListKeysResp{1,}
+** 19 - RpbGetBucketReq
+** 20 - RpbGetBucketResp
+** 21 - RpbSetBucketReq
+** 22 - RpbSetBucketResp
+** 23 - RpbMapRedReq
+** 24 - RpbMapRedResp{1,}
+**
+*/
+
+option java_package = "com.trifork.riak";
+option java_outer_classname = "RPB";
+
+// Error response - may be generated for any Req
+message RpbErrorResp {
+ required bytes errmsg = 1;
+ required uint32 errcode = 2;
+}
+
+// Get ClientId Request - no message defined, just send RpbGetClientIdReq message code
+message RpbGetClientIdResp {
+ required bytes client_id = 1; // Client id in use for this connection
+}
+
+message RpbSetClientIdReq {
+ required bytes client_id = 1; // Client id to use for this connection
+}
+// Set ClientId Request - no message defined, just send RpbSetClientIdReq message code
+
+// Get server info request - no message defined, just send RpbGetServerInfoReq message code
+
+message RpbGetServerInfoResp {
+ optional bytes node = 1;
+ optional bytes server_version = 2;
+}
+
+
+// Get Request - retrieve bucket/key
+message RpbGetReq {
+ required bytes bucket = 1;
+ required bytes key = 2;
+ optional uint32 r = 3;
+}
+
+// Get Response - if the record was not found there will be no content/vclock
+message RpbGetResp {
+ repeated RpbContent content = 1;
+ optional bytes vclock = 2; // the opaque vector clock for the object
+}
+
+
+// Put request - if options.return_body is set then the updated metadata/data for
+// the key will be returned.
+message RpbPutReq {
+ required bytes bucket = 1;
+ required bytes key = 2;
+ optional bytes vclock = 3;
+ required RpbContent content = 4;
+ optional uint32 w = 5;
+ optional uint32 dw = 6;
+ optional bool return_body = 7;
+}
+
+// Put response - same as get response
+message RpbPutResp {
+ repeated RpbContent contents = 1;
+ optional bytes vclock = 2; // the opaque vector clock for the object
+}
+
+
+// Delete request
+message RpbDelReq {
+ required bytes bucket = 1;
+ required bytes key = 2;
+ optional uint32 rw = 3;
+}
+
+// Delete response - not defined, will return a RpbDelResp on success or RpbErrorResp on failure
+
+// List buckets request - no message defined, just send RpbListBucketsReq
+
+// List buckets response
+message RpbListBucketsResp {
+ repeated bytes buckets = 1;
+}
+
+
+// List keys in bucket request
+message RpbListKeysReq {
+ required bytes bucket = 1;
+}
+
+// List keys in bucket response - one or more of these packets will be sent
+// the last one will have done set true (and may not have any keys in it)
+message RpbListKeysResp {
+ repeated bytes keys = 1;
+ optional bool done = 2;
+}
+
+// Get bucket properties request
+message RpbGetBucketReq {
+ required bytes bucket = 1;
+}
+
+// Get bucket properties response
+message RpbGetBucketResp {
+ required RpbBucketProps props = 1;
+}
+
+// Set bucket properties request
+message RpbSetBucketReq {
+ required bytes bucket = 1;
+ required RpbBucketProps props = 2;
+}
+
+
+// Set bucket properties response - no message defined, just send RpbSetBucketResp
+
+
+// Map/Reduce request
+message RpbMapRedReq {
+ required bytes request = 1;
+ required bytes content_type = 2;
+}
+
+// Map/Reduce response
+// one or more of these packets will be sent the last one will have done set
+// true (and may not have phase/data in it)
+message RpbMapRedResp {
+ optional uint32 phase = 1;
+ optional bytes response = 2;
+ optional bool done = 3;
+}
+
+// Content message included in get/put responses
+// Holds the value and associated metadata
+message RpbContent {
+ required bytes value = 1;
+ optional bytes content_type = 2; // the media type/format
+ optional bytes charset = 3;
+ optional bytes content_encoding = 4;
+ optional bytes vtag = 5;
+ repeated RpbLink links = 6; // links to other resources
+ optional uint32 last_mod = 7;
+ optional uint32 last_mod_usecs = 8;
+ repeated RpbPair usermeta = 9; // user metadata stored with the object
+}
+
+// Key/value pair - used for user metadata
+message RpbPair {
+ required bytes key = 1;
+ optional bytes value = 2;
+}
+
+// Link metadata
+message RpbLink {
+ optional bytes bucket = 1;
+ optional bytes key = 2;
+ optional bytes tag = 3;
+}
+
+// Bucket properties
+message RpbBucketProps {
+ optional uint32 n_val = 1;
+ optional bool allow_mult = 2;
+}
53 test/com/trifork/riak/BatchPushLotsOfData.java
@@ -0,0 +1,53 @@
+package com.trifork.riak;
+
+import java.io.IOException;
+
+import com.google.protobuf.ByteString;
+
+public class BatchPushLotsOfData {
+
+
+
+ public static void main(String[] args) throws IOException {
+
+ RiakClient riak = new RiakClient("127.0.0.1");
+
+ long before = System.currentTimeMillis();
+
+
+ byte[] val = new byte[419];
+ ByteString value = ByteString.copyFrom(val);
+
+
+ for (int i = 0; i < 100; i++) {
+
+ long before2 = System.currentTimeMillis();
+
+ String cpr = "" + (1000000000+i);
+ ByteString cpr_key = ByteString.copyFromUtf8(cpr);
+
+ RiakObject[] many = new RiakObject[1000];
+
+ for (int rid = 0; rid < 1000; rid++) {
+
+ ByteString rid_key = ByteString.copyFromUtf8("" + (200000+i));
+
+ many[rid] = new RiakObject(cpr_key, rid_key, value);
+
+ }
+
+ riak.store(many, null);
+
+ long after2 = System.currentTimeMillis();
+
+ System.out.println(""+i+" x 1000 INSERTS: "+(after2-before2)+"ms");
+
+ }
+
+
+ long after = System.currentTimeMillis();
+
+ System.out.println("TOTAL TIME: "+(1.0*(after-before)/1000.0)+"s");
+ }
+
+}
40 test/com/trifork/riak/ListAllContents.java
@@ -0,0 +1,40 @@
+package com.trifork.riak;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.protobuf.ByteString;
+
+public class ListAllContents {
+
+ public static void main(String[] args) throws IOException {
+
+ RiakClient client = new RiakClient("127.0.0.1");
+ Map<String, String> si = client.getServerInfo();
+
+ System.out.println("connected to: "+si);
+
+ ByteString[] buckets = client.listBuckets();
+ for (ByteString bucket : buckets) {
+
+ System.out.println("=bucket "+bucket.toStringUtf8());
+
+ ByteString[] keys = client.listKeys(bucket);
+ for (ByteString key : keys) {
+
+ System.out.println("==key "+key.toStringUtf8());
+
+ RiakObject[] ros = client.fetch(bucket, key);
+ for (RiakObject ro : ros) {
+
+ System.out.println("==="+ro.toString());
+
+ }
+ }
+ }
+
+
+ }
+
+
+}
50 test/com/trifork/riak/PushLotsOfData.java
@@ -0,0 +1,50 @@
+package com.trifork.riak;
+
+import java.io.IOException;
+
+import com.google.protobuf.ByteString;
+
+public class PushLotsOfData {
+
+
+
+ public static void main(String[] args) throws IOException {
+
+ RiakClient riak = new RiakClient("127.0.0.1");
+
+ long before = System.currentTimeMillis();
+
+
+ byte[] val = new byte[419];
+ ByteString value = ByteString.copyFrom(val);
+
+
+ for (int i = 0; i < 100; i++) {
+
+ long before2 = System.currentTimeMillis();
+
+ String cpr = "" + (1000000000+i);
+ ByteString cpr_key = ByteString.copyFromUtf8(cpr);
+
+ for (int rid = 0; rid < 1000; rid++) {
+
+ ByteString rid_key = ByteString.copyFromUtf8("" + (200000+i));
+
+ RiakObject ro = new RiakObject(cpr_key, rid_key, value);
+ riak.store(ro);
+
+ }
+
+ long after2 = System.currentTimeMillis();
+
+ System.out.println(""+i+" x 1000 INSERTS: "+(after2-before2)+"ms");
+
+ }
+
+
+ long after = System.currentTimeMillis();
+
+ System.out.println("TOTAL TIME: "+(1.0*(after-before)/1000.0)+"s");
+ }
+
+}

0 comments on commit 67edbaa

Please sign in to comment.