Skip to content

Commit

Permalink
Store keys in a system keyspace
Browse files Browse the repository at this point in the history
Added the idea of complex operations (multi-row operations)
Added system keyspaces
StructuredSetOperation stores each of the keys
Scan can now be filtered by keyspace (inefficiently!)
  • Loading branch information
justinsb committed Dec 17, 2013
1 parent 4977965 commit 2d7bcb2
Show file tree
Hide file tree
Showing 32 changed files with 353 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ public Value get(long storeId, Keyspace keyspace, ByteString key) {
return keyValueStore.get(keyspace.mapToKey(key).asReadOnlyByteBuffer());
}

public BtreeQuery scan(long storeId) {
public BtreeQuery scan(long storeId, Keyspace keyspace) {
KeyValueStore keyValueStore = getKeyValueStore(storeId);
return keyValueStore.buildQuery();
return keyValueStore.buildQuery(keyspace);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import com.cloudata.btree.Btree;
import com.cloudata.btree.BtreeQuery;
import com.cloudata.btree.Keyspace;
import com.cloudata.btree.MmapPageStore;
import com.cloudata.btree.PageStore;
import com.cloudata.btree.ReadOnlyTransaction;
Expand Down Expand Up @@ -44,8 +45,8 @@ public Value get(final ByteBuffer key) {
}
}

public BtreeQuery buildQuery() {
return new BtreeQuery(btree);
public BtreeQuery buildQuery(Keyspace keyspace) {
return new BtreeQuery(btree, keyspace);
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.cloudata.keyvalue.operation;

import com.cloudata.btree.BtreeOperation;
import com.cloudata.btree.operation.RowOperation;
import com.cloudata.keyvalue.KeyValueProto.KvEntry;

public interface KeyOperation<V> extends BtreeOperation<V> {
public interface KeyOperation<V> extends RowOperation<V> {

public abstract KvEntry.Builder serialize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.Future;

import java.net.SocketAddress;

Expand Down Expand Up @@ -65,8 +67,11 @@ protected void initChannel(SocketChannel ch) throws Exception {
}

public void stop() throws InterruptedException {
group.shutdownGracefully();
Future<?> f1 = group.shutdownGracefully();

serverChannel.close().sync();
ChannelFuture f2 = serverChannel.close();

f1.sync();
f2.sync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public RedisResponse execute(RedisServer server, RedisSession session, RedisRequ
return new ErrorRedisReponse("invalid DB index");
}

session.setKeyspace(Keyspace.build(Ints.checkedCast(keyspaceId)));
session.setKeyspace(Keyspace.user(Ints.checkedCast(keyspaceId)));
return StatusRedisResponse.OK;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import com.cloudata.btree.BtreeQuery;
import com.cloudata.btree.Keyspace;
import com.cloudata.keyvalue.KeyValueProto.KvAction;
import com.cloudata.keyvalue.KeyValueStateMachine;
import com.cloudata.keyvalue.operation.DeleteOperation;
import com.cloudata.keyvalue.operation.IncrementOperation;
import com.cloudata.keyvalue.operation.SetOperation;
import com.cloudata.keyvalue.KeyValueStateMachine;
import com.cloudata.values.Value;
import com.google.common.io.BaseEncoding;
import com.google.common.io.ByteStreams;
Expand Down Expand Up @@ -63,7 +63,7 @@ public Response get(@PathParam("key") String key) throws IOException {
@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response query() throws IOException {
BtreeQuery query = stateMachine.scan(storeId);
BtreeQuery query = stateMachine.scan(storeId, getKeyspace());

query.setFormat(MediaType.APPLICATION_OCTET_STREAM_TYPE);
return Response.ok(query).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudata.btree.operation.RowOperation;
import com.cloudata.util.Hex;

public class BranchPage extends Page {
Expand Down Expand Up @@ -45,7 +46,8 @@ private int findPos(ByteBuffer find) {

ByteBuffer midKey = getKey(mid);
int comparison = ByteBuffers.compare(midKey, find);
log.info("comparison: @{} {} vs {}: {}", mid, Hex.forDebug(midKey), Hex.forDebug(find), comparison);
// log.info("comparison: @{} {} vs {}: {}", mid, Hex.forDebug(midKey), Hex.forDebug(find),
// comparison);

if (comparison < 0) {
min = mid + 1;
Expand Down Expand Up @@ -316,7 +318,7 @@ private int findPos(ByteBuffer find) {

ByteBuffer midKey = getKey(mid);
int comparison = ByteBuffers.compare(midKey, find);
log.info("comparison: @{} {} vs {}: {}", mid, Hex.forDebug(midKey), Hex.forDebug(find), comparison);
// log.info("comparison: @{} {} vs {}: {}", mid, Hex.forDebug(midKey), Hex.forDebug(find), comparison);

if (comparison < 0) {
min = mid + 1;
Expand Down Expand Up @@ -411,7 +413,7 @@ Mutable getMutable() {
}

@Override
public <V> void doAction(Transaction txn, ByteBuffer key, BtreeOperation<V> operation) {
public <V> void doAction(Transaction txn, ByteBuffer key, RowOperation<V> operation) {
int pos = findPos(key);
if (pos < 0) {
pos = 0;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ public class BtreeQuery {
private final Btree btree;
private final ByteBuffer start;
private MediaType format;
private final Keyspace keyspace;

public BtreeQuery(Btree btree) {
public BtreeQuery(Btree btree, Keyspace keyspace) {
this.btree = btree;
this.keyspace = keyspace;
this.start = null;
}

Expand Down Expand Up @@ -41,6 +43,9 @@ public KeyValueResultset(ReadOnlyTransaction txn) {
}

public void walk(EntryListener entryListener) {
if (keyspace != null) {
entryListener = new KeyspaceFilter(keyspace, entryListener);
}
txn.walk(btree, start, entryListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,10 @@

public interface EntryListener {

/*
* Called for each entry.
*
* Return false to terminate the walk.
*/
public boolean found(ByteBuffer key, Value value);
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
package com.cloudata.btree;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudata.util.Hex;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;

public class Keyspace {
public static final Keyspace ZERO = new Keyspace(0);

private static final Logger log = LoggerFactory.getLogger(Keyspace.class);

public static final Keyspace ZERO = Keyspace.user(0);
private static final int SYSTEM_START = 65536;

final int keyspaceId;
final ByteString keyspaceIdPrefix;

public Keyspace(int keyspaceId) {
private Keyspace(int keyspaceId) {
Preconditions.checkArgument(keyspaceId >= 0);
this.keyspaceId = keyspaceId;

Expand Down Expand Up @@ -44,11 +53,38 @@ public ByteString mapToKey(ByteString key) {
return keyspaceIdPrefix.concat(key);
}

public static Keyspace build(int keyspaceId) {
public static Keyspace user(int keyspaceId) {
if (keyspaceId > SYSTEM_START) {
throw new IllegalArgumentException();
}
return new Keyspace(keyspaceId);
}

public ByteString mapToKey(byte[] key) {
return mapToKey(ByteString.copyFrom(key));
}

public static Keyspace system(int i) {
return new Keyspace(SYSTEM_START + i);
}

public boolean contains(ByteBuffer buffer) {
if (buffer.remaining() < keyspaceIdPrefix.size()) {
return false;
}
for (int i = 0; i < keyspaceIdPrefix.size(); i++) {
if (buffer.get(buffer.position() + i) != keyspaceIdPrefix.byteAt(i)) {
log.debug("Mismatch: {} vs {}", Hex.forDebug(buffer), Hex.forDebug(keyspaceIdPrefix));
return false;
}
}
return true;
}

public ByteBuffer getSuffix(ByteBuffer buffer) {
assert contains(buffer);
ByteBuffer dup = buffer.duplicate();
dup.position(dup.position() + keyspaceIdPrefix.size());
return dup;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.cloudata.btree;

import java.nio.ByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudata.values.Value;

public class KeyspaceFilter implements EntryListener {
private static final Logger log = LoggerFactory.getLogger(KeyspaceFilter.class);

private final Keyspace keyspace;
private final EntryListener inner;

public KeyspaceFilter(Keyspace keyspace, EntryListener inner) {
this.keyspace = keyspace;
this.inner = inner;
}

@Override
public boolean found(ByteBuffer key, Value value) {
log.warn("KeyspaceFilter is stupid");
if (keyspace.contains(key)) {
return inner.found(key, value);
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudata.btree.operation.RowOperation;
import com.cloudata.util.Hex;
import com.cloudata.values.Value;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -91,7 +92,7 @@ int firstGTE(ByteBuffer find) {
return pos;
}

<V> void doAction(ByteBuffer key, BtreeOperation<V> operation) {
<V> void doAction(ByteBuffer key, RowOperation<V> operation) {
int position = firstGTE(key);

Value oldValue = null;
Expand Down Expand Up @@ -340,6 +341,12 @@ public Entry(ByteBuffer key, ByteBuffer value) {
this.key = key;
this.value = value;
}

@Override
public String toString() {
return "Entry [key=" + Hex.forDebug(key) + ", value=" + Hex.forDebug(value) + "]";
}

}

private int firstGTE(ByteBuffer find) {
Expand Down Expand Up @@ -466,7 +473,7 @@ Mutable getMutable() {
}

@Override
public <V> void doAction(Transaction txn, ByteBuffer key, BtreeOperation<V> operation) {
public <V> void doAction(Transaction txn, ByteBuffer key, RowOperation<V> operation) {
getMutable().doAction(key, operation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.nio.ByteBuffer;
import java.util.List;

import com.cloudata.btree.operation.RowOperation;

public abstract class Page {
protected final Page parent;
protected final ByteBuffer buffer;
Expand All @@ -20,7 +22,7 @@ protected Page(Page parent, int pageNumber, ByteBuffer buffer) {

public abstract boolean walk(Transaction txn, ByteBuffer from, EntryListener listener);

public abstract <V> void doAction(Transaction txn, ByteBuffer key, BtreeOperation<V> operation);
public abstract <V> void doAction(Transaction txn, ByteBuffer key, RowOperation<V> operation);

public abstract ByteBuffer getKeyLbound();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Collections;
import java.util.List;

import com.cloudata.btree.operation.RowOperation;
import com.cloudata.freemap.SpaceMapEntry;
import com.google.common.collect.Lists;

Expand Down Expand Up @@ -181,7 +182,7 @@ public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) {
}

@Override
public <V> void doAction(Transaction txn, ByteBuffer key, BtreeOperation<V> operation) {
public <V> void doAction(Transaction txn, ByteBuffer key, RowOperation<V> operation) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.slf4j.LoggerFactory;

import com.cloudata.btree.PageStore.PageRecord;
import com.cloudata.btree.operation.BtreeOperation;
import com.cloudata.btree.operation.ComplexOperation;
import com.cloudata.btree.operation.RowOperation;
import com.cloudata.freemap.SpaceMapEntry;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -175,7 +178,13 @@ public void commit() {
}

public <V> void doAction(Btree btree, ByteBuffer key, BtreeOperation<V> operation) {
getRootPage(btree, true).doAction(this, key, operation);
if (operation instanceof RowOperation) {
getRootPage(btree, true).doAction(this, key, (RowOperation<V>) operation);
} else if (operation instanceof ComplexOperation) {
((ComplexOperation) operation).doAction(btree, this, key);
} else {
throw new UnsupportedOperationException();
}
}

int createdPageCount;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.cloudata.btree.operation;

public interface BtreeOperation<V> {
public abstract V getResult();
}
Loading

0 comments on commit 2d7bcb2

Please sign in to comment.