Skip to content

Commit

Permalink
brianfrankcoopergh-54 Add mapkeeper driver (m1ch1)
Browse files Browse the repository at this point in the history
  • Loading branch information
Michi Mutsuzaki committed Dec 2, 2011
1 parent e6880f6 commit d5120fb
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 0 deletions.
5 changes: 5 additions & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
<antcall target="dbcompile"/>
</target>

<target name="dbcompile-mapkeeper" depends="compile">
<property name="db.dir" value="db/mapkeeper"/>
<antcall target="dbcompile"/>
</target>

<target name="compile">
<mkdir dir="${classes.dir}"/>
<javac includeantruntime="false" srcdir="${src.dir}" destdir="${classes.dir}" classpathref="build.classpath" deprecation="on">
Expand Down
1 change: 1 addition & 0 deletions changes
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* gh-50 Switch unordered key generation from FNV32 to FNV64 (sears)
* gh-51 Improved Random Number Generation Performance and add Exponential distribution support (sears)
* gh-52 Mongo db fix (sears)
* gh-54 Add mapkeeper driver (m1ch1)

0.1.3
* Voldemort binding (rsumbaly)
Expand Down
1 change: 1 addition & 0 deletions db/mapkeeper/lib/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This directory should contain jars for building and running MapKeeper
196 changes: 196 additions & 0 deletions db/mapkeeper/src/com/yahoo/ycsb/db/MapKeeperClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package com.yahoo.ycsb.db;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import com.yahoo.mapkeeper.BinaryResponse;
import com.yahoo.mapkeeper.MapKeeper;
import com.yahoo.mapkeeper.Record;
import com.yahoo.mapkeeper.RecordListResponse;
import com.yahoo.mapkeeper.ResponseCode;
import com.yahoo.mapkeeper.ScanOrder;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.workloads.CoreWorkload;

public class MapKeeperClient extends DB {
MapKeeper.Client c;
boolean writeallfields;
static boolean initteddb = false;
private synchronized static void initDB(Properties p, MapKeeper.Client c) throws TException {
if(!initteddb) {
initteddb = true;
c.addMap(p.getProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT));
}
}

public void init() {
TTransport tr = new TFramedTransport(new TSocket("localhost", 9090));
TProtocol proto = new TBinaryProtocol(tr);
c = new MapKeeper.Client(proto);
try {
tr.open();
initDB(getProperties(), c);
} catch(TException e) {
throw new RuntimeException(e);
}
writeallfields = Boolean.parseBoolean(getProperties().getProperty(CoreWorkload.WRITE_ALL_FIELDS_PROPERTY,
CoreWorkload.WRITE_ALL_FIELDS_PROPERTY_DEFAULT));
}

ByteBuffer encode(HashMap<String, ByteIterator> values) {
int len = 0;
for(String k : values.keySet()) {
len += (k.length() + 1 + values.get(k).bytesLeft() + 1);
}
byte[] array = new byte[len];
int i = 0;
for(String k : values.keySet()) {
for(int j = 0; j < k.length(); j++) {
array[i] = (byte)k.charAt(j);
i++;
}
array[i] = '\t'; // XXX would like to use sane delimiter (null, 254, 255, ...) but java makes this nearly impossible
i++;
ByteIterator v = values.get(k);
i = v.nextBuf(array, i);
array[i] = '\t';
i++;
}
array[array.length-1] = 0;
ByteBuffer buf = ByteBuffer.wrap(array);
buf.rewind();
return buf;
}
void decode(Set<String> fields, String tups, HashMap<String, ByteIterator> tup) {
String[] tok = tups.split("\\t");
if(tok.length == 0) { throw new IllegalStateException("split returned empty array!"); }
for(int i = 0; i < tok.length; i+=2) {
if(fields == null || fields.contains(tok[i])) {
if(tok.length < i+2) { throw new IllegalStateException("Couldn't parse tuple <" + tups + "> at index " + i); }
if(tok[i] == null || tok[i+1] == null) throw new NullPointerException("Key is " + tok[i] + " val is + " + tok[i+1]);
tup.put(tok[i], new StringByteIterator(tok[i+1]));
}
}
if(tok.length == 0) {
System.err.println("Empty tuple: " + tups);
}
}

int ycsbThriftRet(BinaryResponse succ, ResponseCode zero, ResponseCode one) {
return ycsbThriftRet(succ.responseCode, zero, one);
}
int ycsbThriftRet(ResponseCode rc, ResponseCode zero, ResponseCode one) {
return
rc == zero ? 0 :
rc == one ? 1 : 2;
}
ByteBuffer bufStr(String str) {
ByteBuffer buf = ByteBuffer.wrap(str.getBytes());
return buf;
}
String strResponse(BinaryResponse buf) {
return new String(buf.value.array());
}

@Override
public int read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
try {
ByteBuffer buf = bufStr(key);

BinaryResponse succ = c.get(table, buf);

int ret = ycsbThriftRet(
succ,
ResponseCode.RecordExists,
ResponseCode.RecordNotFound);

if(ret == 0) {
decode(fields, strResponse(succ), result);
}
return ret;
} catch(TException e) {
e.printStackTrace();
return 2;
}
}

@Override
public int scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try {
//XXX what to pass in for nulls / zeros?
RecordListResponse res = c.scan(table, ScanOrder.Ascending, bufStr(startkey), true, null, false, recordcount, 0);
int ret = ycsbThriftRet(res.responseCode, ResponseCode.Success, ResponseCode.ScanEnded);
if(ret == 0) {
for(Record r : res.records) {
HashMap<String, ByteIterator> tuple = new HashMap<String, ByteIterator>();
// Note: r.getKey() and r.getValue() call special helper methods that trim the buffer
// to an appropriate length, and memcpy it to a byte[]. Trying to manipulate the ByteBuffer
// directly leads to trouble.
tuple.put("key", new StringByteIterator(new String(r.getKey())));
decode(fields, new String(r.getValue())/*strBuf(r.bufferForValue())*/, tuple);
result.add(tuple);
}
}
return ret;
} catch(TException e) {
e.printStackTrace();
return 2;
}
}

@Override
public int update(String table, String key,
HashMap<String, ByteIterator> values) {
try {
if(!writeallfields) {
HashMap<String, ByteIterator> oldval = new HashMap<String, ByteIterator>();
read(table, key, null, oldval);
for(String k: values.keySet()) {
oldval.put(k, values.get(k));
}
values = oldval;
}
ResponseCode succ = c.update(table, bufStr(key), encode(values));
return ycsbThriftRet(succ, ResponseCode.RecordExists, ResponseCode.RecordNotFound);
} catch(TException e) {
e.printStackTrace();
return 2;
}
}

@Override
public int insert(String table, String key,
HashMap<String, ByteIterator> values) {
try {
int ret = ycsbThriftRet(c.insert(table, bufStr(key), encode(values)), ResponseCode.Success, ResponseCode.RecordExists);
return ret;
} catch(TException e) {
e.printStackTrace();
return 2;
}
}

@Override
public int delete(String table, String key) {
try {
return ycsbThriftRet(c.remove(table, bufStr(key)), ResponseCode.Success, ResponseCode.RecordExists);
} catch(TException e) {
e.printStackTrace();
return 2;
}
}
}

0 comments on commit d5120fb

Please sign in to comment.