Permalink
Browse files

added memcached storage engine.

  • Loading branch information...
1 parent 0013aac commit 6aed715acc6a8017f614e2aaa72baf43efafe6ba @sunsuk7tp committed Jan 19, 2012
@@ -1,4 +1,4 @@
-/*
+/*
* Copyright 2011 Shunsuke Nakamura, and contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -31,13 +31,22 @@
public static final int HSMYSQL = 5;
public static final int MONGODB = 6;
public static final int KYOTOCABINET = 7;
-
- // label name specified in cassandra.yaml
- public static final String[] storageLabels = {"Bigtable", "Redis", "MySQL", "RangeMySQL", "HSMySQL", "MongoDB", "KyotoCabinet"};
+ public static final int MEMCACHED = 8;
+
+ // label name specified in cassandra.yaml
+ public static final String[] storageLabels = {
+ "Bigtable",
+ "Redis",
+ "MySQL",
+ "RangeMySQL",
+ "HSMySQL",
+ "MongoDB",
+ "KyotoCabinet",
+ "Memcached"};
public static final Map<Integer, EngineInfo> enginesInfo = new HashMap<Integer, EngineInfo>(storageLabels.length);
public static Map<String, Integer> engineKSMap = new HashMap<String, Integer>();
-
+
// schema used se number
public static final int[] schemaUsedTypes = {MYSQL, RANGEMYSQL, HSMYSQL};
public static final int[] needSetupTypes = {KYOTOCABINET};
@@ -156,7 +165,7 @@ public int getStorageTypeForKS(String ksName)
if(ksName == null)
return getStorageType();
return storageKSMap.containsKey(ksName) ? storageKSMap.get(ksName) : getStorageType();
-
+
}
// init setup and return the instance specified in storageType
@@ -196,10 +205,13 @@ public static StorageEngine getEngine(int storageType, String tableName, String
engine = new MongoInstance(tableName, cfName);
break;
case KYOTOCABINET:
- engine = kcDBClass != null
+ engine = kcDBClass != null
? new KyotoCabinetInstance(tableName, cfName, kcDBClass)
: new KyotoCabinetInstance(tableName, cfName);
break;
+ case MEMCACHED:
+ engine = new MemcacheInstance(tableName, cfName);
+ break;
}
return engine;
@@ -0,0 +1,132 @@
+package org.apache.cassandra.db.engine;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.OperationFuture;
+
+public class MemcacheInstance extends DBSchemalessInstance
+{
+ MemcachedClient cli;
+
+ private static final String KEYSEPARATOR = ":";
+ private static final int timeout = 5;
+
+ public MemcacheInstance (String ksName, String cfName)
+ {
+ setConfiguration();
+ try {
+ cli = new MemcachedClient(new InetSocketAddress(host, port));
+ } catch (IOException e) {
+ super.errorMsg(String.format("memcached connection failed, %s, %s", host, port), e);
+ System.exit(1);
+ }
+ engineName = "Memcached";
+ this.ksName = ksName;
+ this.cfName = cfName;
+ }
+
+ @Override
+ public int insert(String rowKey, ColumnFamily cf)
+ {
+ return doInsert(makeRowKey(rowKey),cf.toBytes());
+ }
+
+ @Override
+ public int update(String rowKey, ColumnFamily newcf)
+ {
+ return doUpdate(makeRowKey(rowKey), newcf.toBytes());
+ }
+
+ @Override
+ public byte[] select(String rowKey) {
+ return (byte[]) cli.get(makeRowKey(rowKey));
+ }
+
+ @Override
+ public Map<ByteBuffer, ColumnFamily> getRangeSlice(DecoratedKey startWith,
+ DecoratedKey stopAt, int maxResults) {
+ // RangeSlice is not supported in memcached.
+ return null;
+ }
+
+ @Override
+ public int truncate() {
+ // not implemented
+ return 0;
+ }
+
+ @Override
+ public int dropTable() {
+ // not implemented
+ return 0;
+ }
+
+ @Override
+ public int dropDB() {
+ // not implemented
+ return 0;
+ }
+
+ @Override
+ public int delete(String rowKey) {
+ return doDelete(makeRowKey(rowKey));
+ }
+
+ synchronized private int doUpdate(String rowKey, byte[] cfValue)
+ {
+ OperationFuture<Boolean> f = cli.replace(rowKey, -1, cfValue);
+ try
+ {
+ boolean isUpdate = f.get(timeout, TimeUnit.SECONDS);
+ return isUpdate ? 1 : -1;
+ }
+ catch (Exception e)
+ {
+ f.cancel(false);
+ return -1;
+ }
+ }
+
+ synchronized private int doInsert(String rowKey, byte[] cfValue)
+ {
+ OperationFuture<Boolean> f = cli.add(rowKey, -1, cfValue);
+ try
+ {
+ boolean isInsert = f.get(timeout, TimeUnit.SECONDS);
+ return isInsert ? 1 : -1;
+ }
+ catch (Exception e)
+ {
+ f.cancel(false);
+ return -1;
+ }
+ }
+
+ synchronized private int doDelete(String rowKey)
+ {
+ OperationFuture<Boolean> f = cli.delete(rowKey);
+ try
+ {
+ boolean isDelete = f.get(timeout, TimeUnit.SECONDS);
+ return isDelete ? 1 : -1;
+ }
+ catch (Exception e)
+ {
+ f.cancel(false);
+ return -1;
+ }
+ }
+
+ private String makeRowKey(String rowKey)
+ {
+ return ksName + KEYSEPARATOR + cfName + KEYSEPARATOR + rowKey;
+ }
+}
@@ -1,4 +1,4 @@
-/*
+/*
* Copyright 2011 Shunsuke Nakamura, and contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -29,8 +29,8 @@
{
BinaryJedis conn;
- final String KEYSEPARATOR = ":";
- final int timeout = 300;
+ private static final String KEYSEPARATOR = ":";
+ private static final int timeout = 300;
public RedisInstance(String ksName, String cfName, int dbIndex)
{
@@ -63,11 +63,13 @@ public boolean auth()
return status.equals("OK") ? true : false;
}
+ @Override
public int insert(String rowKey, ColumnFamily cf)
{
return doInsert(makeRowKey(rowKey), cf.toBytes());
}
+ @Override
public int update(String rowKey, ColumnFamily newcf)
{
return doInsert(makeRowKey(rowKey), newcf.toBytes());
@@ -78,32 +80,38 @@ public int update(String rowKey, ColumnFamily newcf)
* If you want not to use 'synchronized' for performance with concurrent processing,
* you should create a instance by an operation and adjust the max file discriptor on your environments.
*/
+ @Override
synchronized public byte[] select(String rowKey)
{
return conn.get(makeRowKey(rowKey));
}
+ @Override
public Map<ByteBuffer, ColumnFamily> getRangeSlice(DecoratedKey startWith, DecoratedKey stopAt, int maxResults)
{
return null;
}
-
+
+ @Override
synchronized public int truncate()
{
return FAILURE;
}
+ @Override
synchronized public int dropTable()
{
return FAILURE;
}
+ @Override
synchronized public int dropDB()
{
conn.flushDB();
return SUCCESS;
}
+ @Override
synchronized public int delete(String rowKey)
{
conn.del(makeRowKey(rowKey));
@@ -116,7 +124,7 @@ synchronized private int doInsert(byte[] rowKey, byte[] cfValue)
return SUCCESS;
}
- public byte[] makeRowKey(String rowKey)
+ private byte[] makeRowKey(String rowKey)
{
try
{

0 comments on commit 6aed715

Please sign in to comment.