Permalink
Browse files

Use finagle memcached in MemcachedState

  • Loading branch information...
1 parent 26c7b37 commit aaaafdbde7b198a76e85278f6f83bc50935efa74 Krishna Gade committed Aug 31, 2012
Showing with 125 additions and 42 deletions.
  1. +4 −0 .gitignore
  2. +8 −0 project.clj
  3. +113 −42 src/jvm/trident/memcached/MemcachedState.java
View
@@ -20,4 +20,8 @@ NANNY
_release
*.zip
.lein-deps-sum
+*.ipr
+*.iml
+*.iws
+target
View
@@ -2,9 +2,17 @@
:source-path "src/clj"
:java-source-path "src/jvm"
:javac-options {:debug "true" :fork "true"}
+ :repositories {"twitter-maven" "http://maven.twttr.com/", "storm" "https://clojars.org/storm"}
:dependencies [[spy/spymemcached "2.8.1"]
[com.thimbleware.jmemcached/jmemcached-cli "1.0.0"]
+ [com.twitter/util-core "5.3.7"]
+ [com.twitter/util-collection "5.3.7"]
+ [com.twitter/util-logging "5.3.7"]
+ [com.twitter/finagle-core "5.3.8"]
+ [com.twitter/finagle-memcached "5.3.8"]
+ [storm "0.8.0"]
+ [org.clojure/clojure "1.4.0"]
]
:dev-dependencies [[storm "0.8.0"]
@@ -1,19 +1,32 @@
package trident.memcached;
import backtype.storm.tuple.Values;
-import java.io.IOException;
import java.io.Serializable;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactoryBuilder;
-import net.spy.memcached.MemcachedClient;
-import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.transcoders.Transcoder;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.memcached.KetamaClientBuilder;
+import com.twitter.finagle.memcached.java.Client;
+import com.twitter.finagle.memcached.java.ClientBase;
+import com.twitter.finagle.memcached.protocol.text.Memcached;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
+import com.twitter.util.Time;
+
import storm.trident.state.JSONNonTransactionalSerializer;
import storm.trident.state.JSONOpaqueSerializer;
import storm.trident.state.JSONTransactionalSerializer;
@@ -37,43 +50,43 @@
put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer());
put(StateType.OPAQUE, new JSONOpaqueSerializer());
}};
-
+
public static class Options<T> implements Serializable {
int localCacheSize = 1000;
String globalKey = "$GLOBAL$";
Serializer<T> serializer = null;
- }
-
+ }
+
public static StateFactory opaque(List<InetSocketAddress> servers) {
return opaque(servers, new Options());
}
public static StateFactory opaque(List<InetSocketAddress> servers, Options<OpaqueValue> opts) {
return new Factory(servers, StateType.OPAQUE, opts);
}
-
+
public static StateFactory transactional(List<InetSocketAddress> servers) {
return transactional(servers, new Options());
}
-
+
public static StateFactory transactional(List<InetSocketAddress> servers, Options<TransactionalValue> opts) {
return new Factory(servers, StateType.TRANSACTIONAL, opts);
- }
-
+ }
+
public static StateFactory nonTransactional(List<InetSocketAddress> servers) {
- return nonTransactional(servers, new Options());
+ return nonTransactional(servers, new Options());
}
-
+
public static StateFactory nonTransactional(List<InetSocketAddress> servers, Options<Object> opts) {
- return new Factory(servers, StateType.NON_TRANSACTIONAL, opts);
- }
-
+ return new Factory(servers, StateType.NON_TRANSACTIONAL, opts);
+ }
+
protected static class Factory implements StateFactory {
StateType _type;
List<InetSocketAddress> _servers;
Serializer _ser;
Options _opts;
-
+
public Factory(List<InetSocketAddress> servers, StateType type, Options options) {
_type = type;
_servers = servers;
@@ -87,7 +100,7 @@ public Factory(List<InetSocketAddress> servers, StateType type, Options options)
_ser = options.serializer;
}
}
-
+
@Override
public State makeState(Map conf, int partitionIndex, int numPartitions) {
ConnectionFactoryBuilder builder =
@@ -116,8 +129,8 @@ public int getMaxSize() {
});
MemcachedState s;
try {
- s = new MemcachedState(new MemcachedClient(builder.build(), _servers));
- } catch (IOException e) {
+ s = new MemcachedState(makeMemcachedClient(_servers), _opts.serializer);
+ } catch (UnknownHostException e) {
throw new RuntimeException(e);
}
CachedMap c = new CachedMap(s, _opts.localCacheSize);
@@ -132,13 +145,67 @@ public int getMaxSize() {
throw new RuntimeException("Unknown state type: " + _type);
}
return new SnapshottableMap(ms, new Values(_opts.globalKey));
- }
+ }
+
+ /**
+ * Constructs a finagle java memcached client for the list of endpoints..
+ *
+ * @param endpoints list of {@code InetSocketAddress} for all the memcached servers.
+ * @return {@link Client} to read/write to the hash ring of the servers..
+ */
+ static Client makeMemcachedClient(List<InetSocketAddress> endpoints)
+ throws UnknownHostException {
+ final int requestRetries = 2; // max number of retries after the first failure.
+ final int connectTimeoutMillis = 200; // tcp connection timeout.
+ final int requestTimeoutMillis = 50; // request timeout.
+ final int hostConnectionLimit = 10; // concurrent connections to one server.
+ final int maxWaiters = 2; // max waiters in the request queue.
+
+ com.twitter.finagle.memcached.Client client =
+ KetamaClientBuilder.get()
+ .nodes(getHostPortWeightTuples(endpoints))
+ .clientBuilder(ClientBuilder.get()
+ .codec(new Memcached())
+ .tcpConnectTimeout(new Duration(TimeUnit.MILLISECONDS.toNanos(connectTimeoutMillis)))
+ .requestTimeout(new Duration(TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis)))
+ .hostConnectionLimit(hostConnectionLimit)
+ .hostConnectionMaxWaiters(maxWaiters)
+ .retries(requestRetries))
+ .build();
+
+ return new ClientBase(client);
+ }
+
+ /**
+ * Constructs a host:port:weight tuples string of all the passed endpoints.
+ *
+ * @param endpoints list of {@code InetSocketAddress} for all the memcached servers.
+ * @return Comma-separated string of host:port:weight tuples.
+ */
+ static String getHostPortWeightTuples(List<InetSocketAddress> endpoints) throws UnknownHostException {
+ final int defaultWeight = 1;
+ final StringBuilder tuples = new StringBuilder(1024);
+ for (InetSocketAddress endpoint : endpoints) {
+ if (tuples.length() > 0) {
+ tuples.append(",");
+ }
+ String hostname = endpoint.getHostName();
+ if (endpoint.getHostName().equals("localhost")) {
+ InetAddress localMachine = InetAddress.getLocalHost();
+ hostname = localMachine.getHostName();
+ }
+ tuples.append(String.format("%s:%d:%d", hostname, endpoint.getPort(), defaultWeight));
+ }
+ return tuples.toString();
+ }
}
-
- MemcachedClient _client;
-
- public MemcachedState(MemcachedClient client) {
+
+ private final Client _client;
+ private final Serializer<T> _serializer;
+
+ public MemcachedState(Client client, Serializer<T> serializer) {
_client = client;
+ _serializer = serializer;
}
@Override
@@ -147,37 +214,41 @@ public MemcachedState(MemcachedClient client) {
for(List<Object> key: keys) {
singleKeys.add(toSingleKey(key));
}
- Map<String, Object> result = _client.getBulk(singleKeys);
+ Map<String, ChannelBuffer> result = _client.get(singleKeys).get();
List<T> ret = new ArrayList(singleKeys.size());
for(String k: singleKeys) {
- ret.add((T)result.get(k));
+ ChannelBuffer entry = result.get(k);
+ T val = (T)_serializer.deserialize(entry.array());
+ ret.add(val);
}
return ret;
}
@Override
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- List<OperationFuture<Boolean>> futures = new ArrayList(keys.size());
+ public void multiPut(List<List<Object>> keys, List<T> vals) {
+ final long defaultExpirtyMillis = 30 * 24 * 60 * 60 * 1000; // 30 days.
+ List<Future> futures = new ArrayList(keys.size());
for(int i=0; i<keys.size(); i++) {
String key = toSingleKey(keys.get(i));
T val = vals.get(i);
- futures.add(_client.set(key, 0, val));
- }
- for(OperationFuture<Boolean> future: futures) {
- try {
- future.get();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
+ byte[] serialized = _serializer.serialize(val);
+ final ChannelBuffer entry = ChannelBuffers.wrappedBuffer(serialized);
+ Time expiry =
+ Time.fromMilliseconds(Time.now().inMilliseconds() + defaultExpirtyMillis);
+ futures.add(_client.set(key, 0 /* no flags */, expiry, entry));
}
+
+ //TODO: Do we need to block on the success of put ?
+ for(Future future: futures) {
+ future.get();
+ }
}
-
+
private String toSingleKey(List<Object> key) {
if(key.size()!=1) {
throw new RuntimeException("Memcached state does not support compound keys");
}
- return (String) key.get(0);
- }
+ //TODO: Can we return a smaller key here, typically base64 hash of the key is preferable.
+ return (String) key.get(0);
+ }
}

0 comments on commit aaaafdb

Please sign in to comment.