Skip to content

Commit

Permalink
Provide proper implementation for streaming results of RiakClient#map…
Browse files Browse the repository at this point in the history
…Reduce and RiakClient#listKeys. If you forget to close a connection, the connection is closed properly upon garbage collection.
  • Loading branch information
krestenkrab committed Jun 24, 2010
1 parent 8d1370c commit a5d72e4
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 62 deletions.
89 changes: 89 additions & 0 deletions src/com/trifork/riak/KeySource.java
@@ -0,0 +1,89 @@
/**
* This file is part of riak-java-pb-client
*
* Copyright (c) 2010 by Trifork
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package com.trifork.riak;

import java.io.IOException;
import java.util.NoSuchElementException;

import com.google.protobuf.ByteString;
import com.trifork.riak.RPB.RpbListKeysResp;

public class KeySource extends RiakStreamClient<ByteString> {

private RpbListKeysResp r;
private int i;

public KeySource(RiakClient client, RiakConnection conn) throws IOException {
super(client, conn);
get_next_response();
}

public boolean hasNext() throws IOException {
if (isClosed()) {
return false;
}

if (r_is_exhausted()) {
get_next_response();
}

return !isClosed();
}

private boolean r_is_exhausted() {
return i == r.getKeysCount();
}

public ByteString next() throws IOException {
if (!hasNext())
throw new NoSuchElementException();
return r.getKeys(i++);
}

private void get_next_response() throws IOException {
if (isClosed())
return;

// either we're in the first call (r == null)
// or we got here because we ran out of keys.
assert r == null || r_is_exhausted();

do {

if (r != null) {
if (r.hasDone() && r.getDone()) {
close();
return;
}
}

byte[] data = conn.receive(RiakMessageCodes.MSG_ListKeysResp);
if (data == null) {
close();
throw new IOException("received empty response");
}

r = RPB.RpbListKeysResp.parseFrom(data);
i = 0;

// did we got an empty chunk? get another one.
} while (r.getKeysCount() == 0);
}

}
89 changes: 89 additions & 0 deletions src/com/trifork/riak/MapReduceResponseSource.java
@@ -0,0 +1,89 @@
/**
* This file is part of riak-java-pb-client
*
* Copyright (c) 2010 by Trifork
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package com.trifork.riak;

import java.io.IOException;
import java.util.NoSuchElementException;

import com.google.protobuf.ByteString;
import com.trifork.riak.RPB.RpbMapRedResp;
import com.trifork.riak.mapreduce.MapReduceResponse;

public class MapReduceResponseSource extends
RiakStreamClient<MapReduceResponse> {

private RpbMapRedResp r;
private boolean is_given;
private final ByteString contentType;

protected MapReduceResponseSource(RiakClient client, RiakConnection conn,
ByteString contentType) throws IOException {
super(client, conn);
this.contentType = contentType;
get_next_response();
}

@Override
public boolean hasNext() throws IOException {
if (isClosed()) {
return false;
}

if (is_given) {
get_next_response();
}

return !isClosed();
}

@Override
public MapReduceResponse next() throws IOException {
if (!hasNext())
throw new NoSuchElementException();
is_given = true;
return new MapReduceResponse(r, contentType);
}

private void get_next_response() throws IOException {
if (isClosed())
return;

// either we're in the first call (r == null)
// or we got here because we gave the reply away.
assert r == null || is_given;

if (r != null && is_given) {
if (r.hasDone() && r.getDone()) {
close();
return;
}
}

byte[] data = conn.receive(RiakMessageCodes.MSG_MapRedResp);
if (data == null) {
close();
throw new IOException("received empty response");
}

r = RPB.RpbMapRedResp.parseFrom(data);
is_given = false;

}

}
85 changes: 31 additions & 54 deletions src/com/trifork/riak/RiakClient.java
Expand Up @@ -61,7 +61,10 @@ public class RiakClient implements RiakMessageCodes {
private InetAddress addr; private InetAddress addr;
private int port; private int port;


/** if this has been set (or gotten) then it will be applied to new connections */ /**
* if this has been set (or gotten) then it will be applied to new
* connections
*/
private ByteString clientID; private ByteString clientID;


public RiakClient(String host) throws IOException { public RiakClient(String host) throws IOException {
Expand All @@ -83,7 +86,7 @@ RiakConnection getConnection() throws IOException {
RiakConnection c = connections.get(); RiakConnection c = connections.get();
if (c == null || !c.endIdleAndCheckValid()) { if (c == null || !c.endIdleAndCheckValid()) {
c = new RiakConnection(addr, port); c = new RiakConnection(addr, port);

if (this.clientID != null) { if (this.clientID != null) {
setClientID(clientID); setClientID(clientID);
} }
Expand All @@ -95,8 +98,13 @@ RiakConnection getConnection() throws IOException {
} }


void release(RiakConnection c) { void release(RiakConnection c) {
c.beginIdle(); RiakConnection cc = connections.get();
connections.set(c); if (cc == null) {
c.beginIdle();
connections.set(c);
} else {
c.close();
}
} }


/** /**
Expand Down Expand Up @@ -155,7 +163,7 @@ public void setClientID(ByteString id) throws IOException {
} finally { } finally {
release(c); release(c);
} }

this.clientID = id; this.clientID = id;
} }


Expand Down Expand Up @@ -337,7 +345,9 @@ public void run() {
byte[] data = c.receive(MSG_PutResp); byte[] data = c.receive(MSG_PutResp);
if (data != null) { if (data != null) {
RpbPutResp resp = RPB.RpbPutResp.parseFrom(data); RpbPutResp resp = RPB.RpbPutResp.parseFrom(data);
vclocks[i] = resp.getVclock(); if (resp.hasVclock()) {
vclocks[i] = resp.getVclock();
}
} }
} }
} catch (IOException e) { } catch (IOException e) {
Expand Down Expand Up @@ -494,72 +504,39 @@ public void setBucketProperties(ByteString bucket, BucketProperties props)


// ///////////////////// // /////////////////////


public ByteString[] listKeys(ByteString bucket) throws IOException { public KeySource listKeys(ByteString bucket) throws IOException {


List<ByteString> keys = new ArrayList<ByteString>(); List<ByteString> keys = new ArrayList<ByteString>();


RiakConnection c = getConnection(); RiakConnection c = getConnection();
try { c.send(MSG_ListKeysReq, RPB.RpbListKeysReq.newBuilder().setBucket(
c.send(MSG_ListKeysReq, RPB.RpbListKeysReq.newBuilder().setBucket( bucket).build());
bucket).build());

RpbListKeysResp r;
do {
byte[] data = c.receive(MSG_ListKeysResp);
if (data == null) {
return NO_BYTE_STRINGS;
}
r = RPB.RpbListKeysResp.parseFrom(data);

for (int i = 0; i < r.getKeysCount(); i++) {
keys.add(r.getKeys(i));
}


} while (!r.hasDone() || r.getDone() == false); return new KeySource(this, c);
} finally {
release(c);
}

return keys.toArray(new ByteString[keys.size()]);
} }


// ///////////////////// // /////////////////////


MapReduceResponse[] mapreduce(JSONObject obj) throws IOException { MapReduceResponseSource mapreduce(JSONObject obj) throws IOException {
return mapreduce(ByteString.copyFromUtf8(obj.toString()), return mapreduce(ByteString.copyFromUtf8(obj.toString()),
new RequestMeta().contentType("application/json")); new RequestMeta().contentType("application/json"));
} }


public MapReduceResponse[] mapreduce(ByteString request, RequestMeta meta) public MapReduceResponseSource mapreduce(ByteString request,
throws IOException { RequestMeta meta) throws IOException {
List<MapReduceResponse> out = new ArrayList<MapReduceResponse>(); List<MapReduceResponse> out = new ArrayList<MapReduceResponse>();
RiakConnection c = getConnection(); RiakConnection c = getConnection();
try {
ByteString contentType = meta.getContentType();
if (contentType == null) {
throw new IllegalArgumentException("no content type");
}
RpbMapRedReq req = RPB.RpbMapRedReq.newBuilder()
.setRequest(request).setContentType(meta.getContentType())
.build();

c.send(MSG_MapRedReq, req);
byte[] data = c.receive(MSG_MapRedResp);
if (data == null) {
return NO_MAP_REDUCE_RESPONSES;
}

RpbMapRedResp resp;
do {
resp = RPB.RpbMapRedResp.parseFrom(data);
out.add(new MapReduceResponse(resp, contentType));


} while (!resp.hasDone() || resp.getDone() == false); ByteString contentType = meta.getContentType();
} finally { if (contentType == null) {
release(c); throw new IllegalArgumentException("no content type");
} }
RpbMapRedReq req = RPB.RpbMapRedReq.newBuilder().setRequest(request)
.setContentType(meta.getContentType()).build();


return out.toArray(new MapReduceResponse[out.size()]); c.send(MSG_MapRedReq, req);

return new MapReduceResponseSource(this, c, contentType);
} }


} }
19 changes: 15 additions & 4 deletions src/com/trifork/riak/RiakConnection.java
Expand Up @@ -31,7 +31,7 @@
import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLite;
import com.trifork.riak.RPB.RpbErrorResp; import com.trifork.riak.RPB.RpbErrorResp;


public class RiakConnection { class RiakConnection {


static final int DEFAULT_RIAK_PB_PORT = 8087; static final int DEFAULT_RIAK_PB_PORT = 8087;


Expand Down Expand Up @@ -108,7 +108,7 @@ void receive_code(int code) throws IOException, RiakError {
} }
} }


static Timer idle_timer = new Timer(); static Timer timer = new Timer();
TimerTask idle_timeout; TimerTask idle_timeout;


public void beginIdle() { public void beginIdle() {
Expand All @@ -120,7 +120,7 @@ public void run() {
} }
}; };


idle_timer.schedule(idle_timeout, 1000); timer.schedule(idle_timeout, 1000);
} }


synchronized void timer_fired(TimerTask fired_timer) { synchronized void timer_fired(TimerTask fired_timer) {
Expand All @@ -129,6 +129,13 @@ synchronized void timer_fired(TimerTask fired_timer) {
return; return;
} }


close();
}

void close() {
if (isClosed())
return;

try { try {
sock.close(); sock.close();
din = null; din = null;
Expand All @@ -145,7 +152,7 @@ synchronized boolean endIdleAndCheckValid() {
if (tt != null) { tt.cancel(); } if (tt != null) { tt.cancel(); }
idle_timeout = null; idle_timeout = null;


if (sock == null) { if (isClosed()) {
return false; return false;
} else { } else {
return true; return true;
Expand All @@ -155,6 +162,10 @@ synchronized boolean endIdleAndCheckValid() {
public DataOutputStream getOutputStream() { public DataOutputStream getOutputStream() {
return dout; return dout;
} }

public boolean isClosed() {
return sock == null || sock.isClosed();
}




} }

0 comments on commit a5d72e4

Please sign in to comment.