Skip to content

Commit

Permalink
Simple return-all-entries query
Browse files Browse the repository at this point in the history
  • Loading branch information
justinsb committed Dec 15, 2013
1 parent 1177867 commit 4b281eb
Show file tree
Hide file tree
Showing 11 changed files with 355 additions and 20 deletions.
Original file line number Original file line Diff line number Diff line change
@@ -1,14 +1,20 @@
package com.cloudata.keyvalue; package com.cloudata.keyvalue;


import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;


import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;


import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.cloudata.util.Hex; import com.cloudata.util.Hex;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
Expand Down Expand Up @@ -128,10 +134,129 @@ public KeyValueEntry read(long storeId, ByteString key) throws IOException {
} }
} }


public KeyValueRecordset query(long storeId) throws IOException {
ClientResponse response = CLIENT.resource(url).path(toUrlPath(storeId)).get(ClientResponse.class);

try {
int status = response.getStatus();

switch (status) {
case 200:
break;

default:
throw new IllegalStateException("Unexpected status: " + status);
}

KeyValueRecordset records = new KeyValueRecordset(response);
response = null;
return records;
} finally {
if (response != null) {
response.close();
}
}
}

static class KeyValueRecordset implements AutoCloseable, Iterable<KeyValueEntry> {
final ClientResponse response;

boolean read;

public KeyValueRecordset(ClientResponse response) {
this.response = response;
}

@Override
public void close() {
response.close();
}

@Override
public Iterator<KeyValueEntry> iterator() {
if (read) {
throw new IllegalStateException();
}
read = true;
InputStream is = response.getEntityInputStream();

final DataInputStream dis = new DataInputStream(is);

return new Iterator<KeyValueClient.KeyValueEntry>() {
KeyValueEntry next;
boolean done;

@Override
public void remove() {
throw new UnsupportedOperationException();
}

@Override
public KeyValueEntry next() {
ensureHaveNext();

if (next != null) {
KeyValueEntry ret = next;
next = null;
return ret;
} else {
throw new NoSuchElementException();
}
}

@Override
public boolean hasNext() {
ensureHaveNext();

return next != null;
}

private void ensureHaveNext() {
if (next == null) {
if (!done) {
try {
next = read();
} catch (IOException e) {
throw Throwables.propagate(e);
}
if (next == null) {
done = true;
}
}
}
}

KeyValueEntry read() throws IOException {
int keyLength = dis.readInt();
if (keyLength == -1) {
return null;
}

ByteString key = ByteString.readFrom(ByteStreams.limit(dis, keyLength));
if (key.size() != keyLength) {
throw new EOFException();
}

int valueLength = dis.readInt();
ByteString value = ByteString.readFrom(ByteStreams.limit(dis, valueLength));
if (value.size() != valueLength) {
throw new EOFException();
}

return new KeyValueEntry(key, value);
}
};
}
}

private String toUrlPath(long storeId, ByteString key) { private String toUrlPath(long storeId, ByteString key) {
return Long.toString(storeId) + "/" + Hex.toHex(key); return Long.toString(storeId) + "/" + Hex.toHex(key);
} }


private String toUrlPath(long storeId) {
return Long.toString(storeId);
}

public void delete(long storeId, ByteString key) { public void delete(long storeId, ByteString key) {
ClientResponse response = CLIENT.resource(url).path(toUrlPath(storeId, key)).delete(ClientResponse.class); ClientResponse response = CLIENT.resource(url).path(toUrlPath(storeId, key)).delete(ClientResponse.class);


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.cloudata.keyvalue.btree.operation.IncrementOperation; import com.cloudata.keyvalue.btree.operation.IncrementOperation;
import com.cloudata.keyvalue.btree.operation.KeyOperation; import com.cloudata.keyvalue.btree.operation.KeyOperation;
import com.cloudata.keyvalue.btree.operation.SetOperation; import com.cloudata.keyvalue.btree.operation.SetOperation;
import com.cloudata.keyvalue.web.KeyValueQuery;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
Expand Down Expand Up @@ -159,7 +160,12 @@ public KeyValueStore load(@Nonnull Long id) throws Exception {


public ByteBuffer get(long storeId, ByteBuffer key) { public ByteBuffer get(long storeId, ByteBuffer key) {
KeyValueStore keyValueStore = getKeyValueStore(storeId); KeyValueStore keyValueStore = getKeyValueStore(storeId);
return keyValueStore.get(key.asReadOnlyBuffer()); return keyValueStore.get(key);
}

public KeyValueQuery scan(long storeId) {
KeyValueStore keyValueStore = getKeyValueStore(storeId);
return keyValueStore.buildQuery();
} }


} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.cloudata.keyvalue.btree.ReadOnlyTransaction; import com.cloudata.keyvalue.btree.ReadOnlyTransaction;
import com.cloudata.keyvalue.btree.WriteTransaction; import com.cloudata.keyvalue.btree.WriteTransaction;
import com.cloudata.keyvalue.btree.operation.KeyOperation; import com.cloudata.keyvalue.btree.operation.KeyOperation;
import com.cloudata.keyvalue.web.KeyValueQuery;


public class KeyValueStore { public class KeyValueStore {


Expand Down Expand Up @@ -76,4 +77,8 @@ public ByteBuffer get(final ByteBuffer key) {
} }
} }


public KeyValueQuery buildQuery() {
return new KeyValueQuery(btree);
}

} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public int getEntryCount() {


public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) { public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) {
int n = getEntryCount(); int n = getEntryCount();
int pos = findPos(from); int pos = from != null ? findPos(from) : 0;
if (pos < 0) { if (pos < 0) {
pos = 0; pos = 0;
} }
Expand Down Expand Up @@ -355,7 +355,7 @@ public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) {
} }


int n = getEntryCount(); int n = getEntryCount();
int pos = findPos(from); int pos = from != null ? findPos(from) : 0;
if (pos < 0) { if (pos < 0) {
pos = 0; pos = 0;
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


import java.nio.ByteBuffer; import java.nio.ByteBuffer;


import com.google.protobuf.ByteString;

public class ByteBuffers { public class ByteBuffers {


public static int compare(ByteBuffer l, ByteBuffer r) { public static int compare(ByteBuffer l, ByteBuffer r) {
Expand Down Expand Up @@ -87,4 +89,11 @@ public static ByteBuffer clone(ByteBuffer b) {
buff.flip(); buff.flip();
return buff; return buff;
} }

public static ByteBuffer asReadOnlyBuffer(ByteString b) {
if (b == null) {
return null;
}
return b.asReadOnlyByteBuffer();
}
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public int getEntryCount() {


public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) { public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) {
int n = getEntryCount(); int n = getEntryCount();
int pos = firstGTE(from); int pos = from != null ? firstGTE(from) : 0;
while (pos < n) { while (pos < n) {
ByteBuffer key = getKey(pos); ByteBuffer key = getKey(pos);
ByteBuffer value = getValue(pos); ByteBuffer value = getValue(pos);
Expand Down Expand Up @@ -381,7 +381,7 @@ public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) {
} }


int n = getEntryCount(); int n = getEntryCount();
int pos = firstGTE(from); int pos = from != null ? firstGTE(from) : 0;
while (pos < n) { while (pos < n) {
ByteBuffer key = getKey(pos); ByteBuffer key = getKey(pos);
ByteBuffer value = getValue(pos); ByteBuffer value = getValue(pos);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ public Response get(@PathParam("key") String key) throws IOException {
return Response.ok(v).build(); return Response.ok(v).build();
} }


@GET
// @Consumes(MediaType.APPLICATION_OCTET_STREAM)
public Response query() throws IOException {
KeyValueQuery query = stateMachine.scan(storeId);

return Response.ok(query).build();
}

enum PostAction { enum PostAction {
SET, INCREMENT SET, INCREMENT
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.cloudata.keyvalue.web;

import java.nio.ByteBuffer;

import com.cloudata.keyvalue.btree.Btree;
import com.cloudata.keyvalue.btree.EntryListener;
import com.cloudata.keyvalue.btree.ReadOnlyTransaction;

public class KeyValueQuery {

private final Btree btree;
private final ByteBuffer start;

public KeyValueQuery(Btree btree) {
this.btree = btree;
this.start = null;
}

public KeyValueResultset execute() {
ReadOnlyTransaction txn = null;
KeyValueResultset cursor = null;

try {
txn = btree.beginReadOnly();

cursor = new KeyValueResultset(txn);
txn = null;
} finally {
if (txn != null) {
txn.close();
}
}

return cursor;
}

public class KeyValueResultset implements AutoCloseable {
private final ReadOnlyTransaction txn;

public KeyValueResultset(ReadOnlyTransaction txn) {
this.txn = txn;
}

public void walk(EntryListener entryListener) {
txn.walk(btree, start, entryListener);
}

@Override
public void close() {
txn.close();
}

}

}
Loading

0 comments on commit 4b281eb

Please sign in to comment.