diff --git a/src/com/trifork/riak/KeySource.java b/src/com/trifork/riak/KeySource.java new file mode 100644 index 0000000..570f633 --- /dev/null +++ b/src/com/trifork/riak/KeySource.java @@ -0,0 +1,89 @@ +/** + * This file is part of riak-java-pb-client + * + * Copyright (c) 2010 by Trifork + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package com.trifork.riak; + +import java.io.IOException; +import java.util.NoSuchElementException; + +import com.google.protobuf.ByteString; +import com.trifork.riak.RPB.RpbListKeysResp; + +public class KeySource extends RiakStreamClient { + + private RpbListKeysResp r; + private int i; + + public KeySource(RiakClient client, RiakConnection conn) throws IOException { + super(client, conn); + get_next_response(); + } + + public boolean hasNext() throws IOException { + if (isClosed()) { + return false; + } + + if (r_is_exhausted()) { + get_next_response(); + } + + return !isClosed(); + } + + private boolean r_is_exhausted() { + return i == r.getKeysCount(); + } + + public ByteString next() throws IOException { + if (!hasNext()) + throw new NoSuchElementException(); + return r.getKeys(i++); + } + + private void get_next_response() throws IOException { + if (isClosed()) + return; + + // either we're in the first call (r == null) + // or we got here because we ran out of keys. + assert r == null || r_is_exhausted(); + + do { + + if (r != null) { + if (r.hasDone() && r.getDone()) { + close(); + return; + } + } + + byte[] data = conn.receive(RiakMessageCodes.MSG_ListKeysResp); + if (data == null) { + close(); + throw new IOException("received empty response"); + } + + r = RPB.RpbListKeysResp.parseFrom(data); + i = 0; + + // did we got an empty chunk? get another one. + } while (r.getKeysCount() == 0); + } + +} diff --git a/src/com/trifork/riak/MapReduceResponseSource.java b/src/com/trifork/riak/MapReduceResponseSource.java new file mode 100644 index 0000000..673be0a --- /dev/null +++ b/src/com/trifork/riak/MapReduceResponseSource.java @@ -0,0 +1,89 @@ +/** + * This file is part of riak-java-pb-client + * + * Copyright (c) 2010 by Trifork + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package com.trifork.riak; + +import java.io.IOException; +import java.util.NoSuchElementException; + +import com.google.protobuf.ByteString; +import com.trifork.riak.RPB.RpbMapRedResp; +import com.trifork.riak.mapreduce.MapReduceResponse; + +public class MapReduceResponseSource extends + RiakStreamClient { + + private RpbMapRedResp r; + private boolean is_given; + private final ByteString contentType; + + protected MapReduceResponseSource(RiakClient client, RiakConnection conn, + ByteString contentType) throws IOException { + super(client, conn); + this.contentType = contentType; + get_next_response(); + } + + @Override + public boolean hasNext() throws IOException { + if (isClosed()) { + return false; + } + + if (is_given) { + get_next_response(); + } + + return !isClosed(); + } + + @Override + public MapReduceResponse next() throws IOException { + if (!hasNext()) + throw new NoSuchElementException(); + is_given = true; + return new MapReduceResponse(r, contentType); + } + + private void get_next_response() throws IOException { + if (isClosed()) + return; + + // either we're in the first call (r == null) + // or we got here because we gave the reply away. + assert r == null || is_given; + + if (r != null && is_given) { + if (r.hasDone() && r.getDone()) { + close(); + return; + } + } + + byte[] data = conn.receive(RiakMessageCodes.MSG_MapRedResp); + if (data == null) { + close(); + throw new IOException("received empty response"); + } + + r = RPB.RpbMapRedResp.parseFrom(data); + is_given = false; + + } + +} diff --git a/src/com/trifork/riak/RiakClient.java b/src/com/trifork/riak/RiakClient.java index 89a0e7d..2e6eed1 100644 --- a/src/com/trifork/riak/RiakClient.java +++ b/src/com/trifork/riak/RiakClient.java @@ -61,7 +61,10 @@ public class RiakClient implements RiakMessageCodes { private InetAddress addr; private int port; - /** if this has been set (or gotten) then it will be applied to new connections */ + /** + * if this has been set (or gotten) then it will be applied to new + * connections + */ private ByteString clientID; public RiakClient(String host) throws IOException { @@ -83,7 +86,7 @@ RiakConnection getConnection() throws IOException { RiakConnection c = connections.get(); if (c == null || !c.endIdleAndCheckValid()) { c = new RiakConnection(addr, port); - + if (this.clientID != null) { setClientID(clientID); } @@ -95,8 +98,13 @@ RiakConnection getConnection() throws IOException { } void release(RiakConnection c) { - c.beginIdle(); - connections.set(c); + RiakConnection cc = connections.get(); + if (cc == null) { + c.beginIdle(); + connections.set(c); + } else { + c.close(); + } } /** @@ -155,7 +163,7 @@ public void setClientID(ByteString id) throws IOException { } finally { release(c); } - + this.clientID = id; } @@ -337,7 +345,9 @@ public void run() { byte[] data = c.receive(MSG_PutResp); if (data != null) { RpbPutResp resp = RPB.RpbPutResp.parseFrom(data); - vclocks[i] = resp.getVclock(); + if (resp.hasVclock()) { + vclocks[i] = resp.getVclock(); + } } } } catch (IOException e) { @@ -494,72 +504,39 @@ public void setBucketProperties(ByteString bucket, BucketProperties props) // ///////////////////// - public ByteString[] listKeys(ByteString bucket) throws IOException { + public KeySource listKeys(ByteString bucket) throws IOException { List keys = new ArrayList(); RiakConnection c = getConnection(); - try { - c.send(MSG_ListKeysReq, RPB.RpbListKeysReq.newBuilder().setBucket( - bucket).build()); - - RpbListKeysResp r; - do { - byte[] data = c.receive(MSG_ListKeysResp); - if (data == null) { - return NO_BYTE_STRINGS; - } - r = RPB.RpbListKeysResp.parseFrom(data); - - for (int i = 0; i < r.getKeysCount(); i++) { - keys.add(r.getKeys(i)); - } + c.send(MSG_ListKeysReq, RPB.RpbListKeysReq.newBuilder().setBucket( + bucket).build()); - } while (!r.hasDone() || r.getDone() == false); - } finally { - release(c); - } - - return keys.toArray(new ByteString[keys.size()]); + return new KeySource(this, c); } // ///////////////////// - MapReduceResponse[] mapreduce(JSONObject obj) throws IOException { + MapReduceResponseSource mapreduce(JSONObject obj) throws IOException { return mapreduce(ByteString.copyFromUtf8(obj.toString()), new RequestMeta().contentType("application/json")); } - public MapReduceResponse[] mapreduce(ByteString request, RequestMeta meta) - throws IOException { + public MapReduceResponseSource mapreduce(ByteString request, + RequestMeta meta) throws IOException { List out = new ArrayList(); RiakConnection c = getConnection(); - try { - ByteString contentType = meta.getContentType(); - if (contentType == null) { - throw new IllegalArgumentException("no content type"); - } - RpbMapRedReq req = RPB.RpbMapRedReq.newBuilder() - .setRequest(request).setContentType(meta.getContentType()) - .build(); - - c.send(MSG_MapRedReq, req); - byte[] data = c.receive(MSG_MapRedResp); - if (data == null) { - return NO_MAP_REDUCE_RESPONSES; - } - - RpbMapRedResp resp; - do { - resp = RPB.RpbMapRedResp.parseFrom(data); - out.add(new MapReduceResponse(resp, contentType)); - } while (!resp.hasDone() || resp.getDone() == false); - } finally { - release(c); + ByteString contentType = meta.getContentType(); + if (contentType == null) { + throw new IllegalArgumentException("no content type"); } + RpbMapRedReq req = RPB.RpbMapRedReq.newBuilder().setRequest(request) + .setContentType(meta.getContentType()).build(); - return out.toArray(new MapReduceResponse[out.size()]); + c.send(MSG_MapRedReq, req); + + return new MapReduceResponseSource(this, c, contentType); } } diff --git a/src/com/trifork/riak/RiakConnection.java b/src/com/trifork/riak/RiakConnection.java index b37f0ad..dadd5f4 100644 --- a/src/com/trifork/riak/RiakConnection.java +++ b/src/com/trifork/riak/RiakConnection.java @@ -31,7 +31,7 @@ import com.google.protobuf.MessageLite; import com.trifork.riak.RPB.RpbErrorResp; -public class RiakConnection { +class RiakConnection { static final int DEFAULT_RIAK_PB_PORT = 8087; @@ -108,7 +108,7 @@ void receive_code(int code) throws IOException, RiakError { } } - static Timer idle_timer = new Timer(); + static Timer timer = new Timer(); TimerTask idle_timeout; public void beginIdle() { @@ -120,7 +120,7 @@ public void run() { } }; - idle_timer.schedule(idle_timeout, 1000); + timer.schedule(idle_timeout, 1000); } synchronized void timer_fired(TimerTask fired_timer) { @@ -129,6 +129,13 @@ synchronized void timer_fired(TimerTask fired_timer) { return; } + close(); + } + + void close() { + if (isClosed()) + return; + try { sock.close(); din = null; @@ -145,7 +152,7 @@ synchronized boolean endIdleAndCheckValid() { if (tt != null) { tt.cancel(); } idle_timeout = null; - if (sock == null) { + if (isClosed()) { return false; } else { return true; @@ -155,6 +162,10 @@ synchronized boolean endIdleAndCheckValid() { public DataOutputStream getOutputStream() { return dout; } + + public boolean isClosed() { + return sock == null || sock.isClosed(); + } } diff --git a/src/com/trifork/riak/RiakStreamClient.java b/src/com/trifork/riak/RiakStreamClient.java new file mode 100644 index 0000000..e3aee17 --- /dev/null +++ b/src/com/trifork/riak/RiakStreamClient.java @@ -0,0 +1,114 @@ +/** + * This file is part of riak-java-pb-client + * + * Copyright (c) 2010 by Trifork + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package com.trifork.riak; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.util.Iterator; +import java.util.TimerTask; + +abstract class RiakStreamClient implements Iterable { + + private RiakClient client; + protected RiakConnection conn; + private ReaperTask reaper; + + protected RiakStreamClient(RiakClient client, RiakConnection conn) { + this.client = client; + this.conn = conn; + this.reaper = new ReaperTask(this, conn); + } + + static class ReaperTask extends TimerTask { + + private final RiakConnection conn; + private WeakReference ref; + + ReaperTask (Object holder, RiakConnection conn) { + this.conn = conn; + this.ref = new WeakReference(holder); + RiakConnection.timer.scheduleAtFixedRate(this, 1000, 1000); + } + + @Override + public synchronized void run() { + if (ref == null) { + // do nothing; we were explicitly cancelled // + } else if (ref.get() == null) { + + // the reference was lost; cancel this timer and + // close the connection + cancel(); + conn.close(); + } else if (conn.isClosed()) { + cancel(); + } + } + + @Override + public synchronized boolean cancel() { + ref = null; + return super.cancel(); + } + } + + public synchronized void close() { + if (!isClosed()) { + reaper.cancel(); + client.release(conn); + conn = null; + } + } + + public boolean isClosed() { + return conn == null; + } + + public abstract boolean hasNext() throws IOException; + public abstract T next() throws IOException; + + @Override + public Iterator iterator() { + return new Iterator() { + + @Override + public boolean hasNext() { + try { + return RiakStreamClient.this.hasNext(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public T next() { + try { + return RiakStreamClient.this.next(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +} diff --git a/src/com/trifork/riak/mapreduce/MapReduceBuilder.java b/src/com/trifork/riak/mapreduce/MapReduceBuilder.java index e1972b9..ea6483f 100644 --- a/src/com/trifork/riak/mapreduce/MapReduceBuilder.java +++ b/src/com/trifork/riak/mapreduce/MapReduceBuilder.java @@ -27,6 +27,7 @@ import org.json.JSONObject; import com.google.protobuf.ByteString; +import com.trifork.riak.MapReduceResponseSource; import com.trifork.riak.RequestMeta; import com.trifork.riak.RiakClient; import com.trifork.riak.RiakObject; @@ -313,13 +314,13 @@ public MapReduceBuilder link(String bucket, String tag, boolean keep) { * @throws RiakResponseRuntimeException * If the Riak server returns a malformed response. */ - public MapReduceResponse[] submit(RequestMeta meta) throws IOException { + public MapReduceResponseSource submit(RequestMeta meta) throws IOException { if (riak == null) throw new IllegalStateException("Cannot perform map reduce without a RiakClient"); return riak.mapreduce(ByteString.copyFromUtf8( toJSON().toString() ), meta); } - public MapReduceResponse[] submit() throws JSONException, IOException { + public MapReduceResponseSource submit() throws JSONException, IOException { return submit(null); } diff --git a/test/com/trifork/riak/ListAllContents.java b/test/com/trifork/riak/ListAllContents.java index aa95d2f..b3b80a0 100644 --- a/test/com/trifork/riak/ListAllContents.java +++ b/test/com/trifork/riak/ListAllContents.java @@ -22,9 +22,9 @@ public static void main(String[] args) throws IOException { System.out.println("=bucket "+bucket.toStringUtf8()); - ByteString[] keys = client.listKeys(bucket); + KeySource keys = client.listKeys(bucket); for (ByteString key : keys) { - + System.out.println("==key "+key.toStringUtf8()); RiakObject[] ros = client.fetch(bucket, key);