44
55import java .io .File ;
66import java .nio .ByteBuffer ;
7+ import java .util .concurrent .Callable ;
78import java .util .concurrent .ExecutionException ;
89
910import javax .annotation .Nonnull ;
1920
2021import com .cloudata .btree .BtreeQuery ;
2122import com .cloudata .btree .Keyspace ;
22- import com .cloudata .keyvalue .KeyValueLog .KvEntry ;
23- import com .cloudata .keyvalue .operation .AppendOperation ;
24- import com .cloudata .keyvalue .operation .DeleteOperation ;
25- import com .cloudata .keyvalue .operation .IncrementOperation ;
26- import com .cloudata .keyvalue .operation .KeyOperation ;
27- import com .cloudata .keyvalue .operation .SetOperation ;
23+ import com .cloudata .keyvalue .KeyValueProtocol .ActionResponse ;
24+ import com .cloudata .keyvalue .KeyValueProtocol .KeyValueAction ;
25+ import com .cloudata .keyvalue .operation .KeyValueOperation ;
26+ import com .cloudata .keyvalue .operation .KeyValueOperations ;
2827import com .cloudata .values .Value ;
28+ import com .google .common .base .Function ;
29+ import com .google .common .base .Preconditions ;
2930import com .google .common .base .Throwables ;
3031import com .google .common .cache .CacheBuilder ;
3132import com .google .common .cache .CacheLoader ;
3233import com .google .common .cache .LoadingCache ;
34+ import com .google .common .util .concurrent .Futures ;
3335import com .google .common .util .concurrent .ListenableFuture ;
36+ import com .google .common .util .concurrent .ListeningExecutorService ;
3437import com .google .protobuf .ByteString ;
3538import com .google .protobuf .InvalidProtocolBufferException ;
3639
@@ -41,7 +44,10 @@ public class KeyValueStateMachine implements StateMachine {
4144 private File baseDir ;
4245 final LoadingCache <Long , KeyValueStore > keyValueStoreCache ;
4346
44- public KeyValueStateMachine () {
47+ private final ListeningExecutorService executor ;
48+
49+ public KeyValueStateMachine (ListeningExecutorService executor ) {
50+ this .executor = executor ;
4551 KeyValueStoreCacheLoader loader = new KeyValueStoreCacheLoader ();
4652 this .keyValueStoreCache = CacheBuilder .newBuilder ().recordStats ().build (loader );
4753 }
@@ -65,7 +71,7 @@ public void init(RaftService raft, File stateDir) {
6571 // return raft.commit(entry.toByteArray());
6672 // }
6773
68- public < V > V doActionSync (KeyOperation < V > operation ) throws InterruptedException , RaftException {
74+ public ActionResponse doActionSync (KeyValueOperation operation ) throws InterruptedException , RaftException {
6975 try {
7076 return doActionAsync (operation ).get ();
7177 } catch (ExecutionException e ) {
@@ -80,50 +86,53 @@ public <V> V doActionSync(KeyOperation<V> operation) throws InterruptedException
8086 }
8187 }
8288
83- public <V > ListenableFuture <V > doActionAsync (KeyOperation <V > operation ) throws RaftException {
84- KvEntry entry = operation .serialize ();
89+ public ListenableFuture <ActionResponse > doActionAsync (final KeyValueOperation operation ) throws RaftException {
90+ if (operation .isReadOnly ()) {
91+ return executor .submit (new Callable <ActionResponse >() {
92+
93+ @ Override
94+ public ActionResponse call () throws Exception {
95+ // TODO: Need to check that we are the leader!!
96+
97+ long storeId = operation .getStoreId ();
98+
99+ KeyValueStore keyValueStore = getKeyValueStore (storeId );
100+
101+ keyValueStore .doAction (operation );
102+
103+ return operation .getResult ();
104+ }
105+ });
106+ }
107+
108+ KeyValueAction entry = operation .serialize ();
85109
86110 log .debug ("Proposing operation {}" , entry .getAction ());
87111
88- return (ListenableFuture <V >) raft .commitAsync (entry .toByteArray ());
112+ return Futures .transform (raft .commitAsync (entry .toByteArray ()), new Function <Object , ActionResponse >() {
113+
114+ @ Override
115+ public ActionResponse apply (Object input ) {
116+ Preconditions .checkArgument (input instanceof ActionResponse );
117+ return (ActionResponse ) input ;
118+ }
119+
120+ });
89121 }
90122
91123 @ Override
92124 public Object applyOperation (@ Nonnull ByteBuffer op ) {
93125 // TODO: We need to prevent repetition during replay
94126 // (we need idempotency)
95127 try {
96- KvEntry entry = KvEntry .parseFrom (ByteString .copyFrom (op ));
128+ KeyValueAction entry = KeyValueAction .parseFrom (ByteString .copyFrom (op ));
97129 log .debug ("Committing operation {}" , entry .getAction ());
98130
99131 long storeId = entry .getStoreId ();
100132
101133 KeyValueStore keyValueStore = getKeyValueStore (storeId );
102134
103- KeyOperation <?> operation ;
104-
105- switch (entry .getAction ()) {
106-
107- case APPEND :
108- operation = new AppendOperation (entry );
109- break ;
110-
111- case DELETE :
112- operation = new DeleteOperation (entry );
113- break ;
114-
115- case INCREMENT : {
116- operation = new IncrementOperation (entry );
117- break ;
118- }
119-
120- case SET :
121- operation = new SetOperation (entry );
122- break ;
123-
124- default :
125- throw new UnsupportedOperationException ();
126- }
135+ KeyValueOperation operation = KeyValueOperations .build (entry );
127136
128137 keyValueStore .doAction (operation );
129138
@@ -168,11 +177,13 @@ public KeyValueStore load(@Nonnull Long id) throws Exception {
168177 }
169178 }
170179
180+ // This should also really be an operation, but is special-cased for speed
171181 public Value get (long storeId , Keyspace keyspace , ByteString key ) {
172182 KeyValueStore keyValueStore = getKeyValueStore (storeId );
173183 return keyValueStore .get (keyspace .mapToKey (key ).asReadOnlyByteBuffer ());
174184 }
175185
186+ // This function should really be an operation, but we want to support streaming
176187 public BtreeQuery scan (long storeId , Keyspace keyspace , ByteString keyPrefix ) {
177188 KeyValueStore keyValueStore = getKeyValueStore (storeId );
178189 boolean stripKeyspace = true ;
0 commit comments