Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

refactored state storage API

    - delegated scheduling of save operations to SafeKeeper
  • Loading branch information...
commit c475042de6367d93f80bfd626e7109bc3dedf41b 1 parent ecccad9
Matthieu Morel authored
View
160 s4-core/src/main/java/io/s4/ft/DefaultFileSystemStateStorage.java
@@ -1,7 +1,5 @@
package io.s4.ft;
-import io.s4.ft.SafeKeeper.StorageResultCode;
-
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
@@ -10,10 +8,6 @@
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.binary.Base64;
import org.apache.log4j.Logger;
@@ -36,36 +30,19 @@
*/
public class DefaultFileSystemStateStorage implements StateStorage {
- private static Logger logger = Logger.getLogger("s4-ft");
+ private static org.apache.log4j.Logger logger = Logger.getLogger(DefaultFileSystemStateStorage.class);
private String storageRootPath;
- ThreadPoolExecutor threadPool;
- int maxWriteThreads = 1;
- int writeThreadKeepAliveSeconds = 120;
- int maxOutstandingWriteRequests = 1000;
public DefaultFileSystemStateStorage() {
}
/**
* <p>
- * Must be called by the dependency injection framework.
+ * Called by the dependency injection framework, after construction.
* <p/>
*/
public void init() {
checkStorageDir();
- threadPool = new ThreadPoolExecutor(0, maxWriteThreads, writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(maxOutstandingWriteRequests));
- }
-
- @Override
- public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
- try {
- threadPool.submit(new SaveTask(key, state, callback, storageRootPath));
- } catch (RejectedExecutionException e) {
- callback.storageOperationResult(StorageResultCode.FAILURE, "Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
- + threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
- + threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests + "]");
- }
}
@Override
@@ -107,15 +84,15 @@ public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback)
in.close();
return buffer;
} catch (FileNotFoundException e1) {
- logger.error(e1);
+ logger.error(e1.getMessage(), e1);
} catch (IOException e2) {
- logger.error(e2);
+ logger.error(e2.getMessage(), e2);
} finally {
if (in != null) {
try {
in.close();
} catch (Exception e) {
- logger.warn(e);
+ logger.warn(e.getMessage(), e);
}
}
}
@@ -176,30 +153,6 @@ public void setStorageRootPath(String storageRootPath) {
}
}
- public int getWriteThreadKeepAliveSeconds() {
- return writeThreadKeepAliveSeconds;
- }
-
- public void setWriteThreadKeepAliveSeconds(int writeThreadKeepAliveSeconds) {
- this.writeThreadKeepAliveSeconds = writeThreadKeepAliveSeconds;
- }
-
- public int getMaxWriteThreads() {
- return maxWriteThreads;
- }
-
- public void setMaxWriteThreads(int maxWriteThreads) {
- this.maxWriteThreads = maxWriteThreads;
- }
-
- public int getMaxOutstandingWriteRequests() {
- return maxOutstandingWriteRequests;
- }
-
- public void setMaxOutstandingWriteRequests(int maxOutstandingWriteRequests) {
- this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
- }
-
public void checkStorageDir() {
if (storageRootPath == null) {
@@ -218,76 +171,57 @@ public void checkStorageDir() {
}
}
- /**
- *
- * Writing to storage is an asynchronous operation specified in this class.
- *
- */
- private static class SaveTask implements Runnable {
- SafeKeeperId key;
- byte[] state;
- StorageCallback callback;
- private String storageRootPath;
-
- public SaveTask(SafeKeeperId key, byte[] state, StorageCallback callback, String storageRootPath) {
- super();
- this.key = key;
- this.state = state;
- this.callback = callback;
- this.storageRootPath = storageRootPath;
+ @Override
+ public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
+ File f = safeKeeperID2File(key, storageRootPath);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
}
-
- public void run() {
- File f = safeKeeperID2File(key, storageRootPath);
- if (logger.isDebugEnabled()) {
- logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
- }
- if (!f.exists()) {
- if (!f.getParentFile().exists()) {
- // parent file has prototype id
- if (!f.getParentFile().mkdir()) {
- callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
- "Cannot create directory for storing PE ["+key.toString() + "] for prototype: "
- + f.getParentFile().getAbsolutePath());
- return;
- }
- }
- // TODO handle IO exception
- try {
- f.createNewFile();
- } catch (IOException e) {
- callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, key.toString() + " : " + e.getMessage());
- return;
- }
- } else {
- if (!f.delete()) {
+ if (!f.exists()) {
+ if (!f.getParentFile().exists()) {
+ // parent file has prototype id
+ if (!f.getParentFile().mkdir()) {
callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
- "Cannot delete previously saved checkpoint file [" + f.getParentFile().getAbsolutePath()
- + "]");
- return;
+ "Cannot create directory for storing PE [" + key.toString() + "] for prototype: "
+ + f.getParentFile().getAbsolutePath());
+ return ;
}
}
- FileOutputStream fos = null;
+ // TODO handle IO exception
try {
- fos = new FileOutputStream(f);
- fos.write(state);
- callback.storageOperationResult(SafeKeeper.StorageResultCode.SUCCESS, key.toString());
- } catch (FileNotFoundException e) {
- callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, key.toString() + " : " + e.getMessage());
+ f.createNewFile();
} catch (IOException e) {
- callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, key.toString() + " : " + e.getMessage());
- } finally {
- try {
- if (fos != null) {
- fos.close();
- }
- } catch (IOException e) {
- logger.error(e);
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+ key.toString() + " : " + e.getMessage());
+ return ;
+ }
+ } else {
+ if (!f.delete()) {
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+ "Cannot delete previously saved checkpoint file [" + f.getParentFile().getAbsolutePath() + "]");
+ return ;
+ }
+ }
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(f);
+ fos.write(state);
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.SUCCESS, key.toString());
+ } catch (FileNotFoundException e) {
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+ key.toString() + " : " + e.getMessage());
+ } catch (IOException e) {
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+ key.toString() + " : " + e.getMessage());
+ } finally {
+ try {
+ if (fos != null) {
+ fos.close();
}
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
}
-
}
-
}
}
View
12 s4-core/src/main/java/io/s4/ft/LoggingStorageCallbackFactory.java
@@ -2,6 +2,8 @@
import io.s4.ft.SafeKeeper.StorageResultCode;
+import org.apache.log4j.Logger;
+
/**
* A factory for creating storage callbacks that simply log callback results
*
@@ -18,16 +20,18 @@ public StorageCallback createStorageCallback() {
* A basic storage callback that simply logs results from storage operations
*
*/
- class StorageCallbackLogger implements StorageCallback {
+ static class StorageCallbackLogger implements StorageCallback {
+
+ private static Logger logger = Logger.getLogger("s4-ft");
@Override
public void storageOperationResult(StorageResultCode code, Object message) {
if (StorageResultCode.SUCCESS == code) {
- if (SafeKeeper.logger.isDebugEnabled()) {
- SafeKeeper.logger.debug("Callback from storage: " + StorageResultCode.SUCCESS + " : " + message);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Callback from storage: " + StorageResultCode.SUCCESS.name() + " : " + message);
}
} else {
- SafeKeeper.logger.warn("Callback from storage: " +StorageResultCode.FAILURE + " : " + message);
+ logger.warn("Callback from storage: " + StorageResultCode.FAILURE.name() + " : " + message);
}
}
}
View
81 s4-core/src/main/java/io/s4/ft/RedisStateStorage.java
@@ -36,28 +36,6 @@
private JedisPool jedisPool;
private String redisHost;
private int redisPort;
- private ThreadPoolExecutor threadPool;
- // TODO: should probably define a lower default value...
- int maxWriteThreads = Runtime.getRuntime().availableProcessors() == 1 ? 1 : (Runtime.getRuntime()
- .availableProcessors() - 1);
- int writeThreadKeepAliveSeconds = 120;
- int maxOutstandingWriteRequests = 1000;
-
- public String getRedisHost() {
- return redisHost;
- }
-
- public void setRedisHost(String redisHost) {
- this.redisHost = redisHost;
- }
-
- public int getRedisPort() {
- return redisPort;
- }
-
- public void setRedisPort(int redisPort) {
- this.redisPort = redisPort;
- }
public void clear() {
Jedis jedis = jedisPool.getResource();
@@ -72,20 +50,21 @@ public void init() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// TODO optional parameterization
jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort);
- threadPool = new ThreadPoolExecutor(0, maxWriteThreads, writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(maxOutstandingWriteRequests));
}
@Override
public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
+ Jedis jedis = jedisPool.getResource();
+ String statusCode = "UNKNOWN";
try {
- threadPool.submit(new SaveTask(key, state, callback, jedisPool));
- } catch (RejectedExecutionException e) {
- callback.storageOperationResult(StorageResultCode.FAILURE,
- "Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
- + threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
- + threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests
- + "]");
+ statusCode = jedis.set(key.getStringRepresentation().getBytes(), state);
+ } finally {
+ jedisPool.returnResource(jedis);
+ }
+ if ("OK".equals(statusCode)) {
+ callback.storageOperationResult(StorageResultCode.SUCCESS, "Redis result code is [" + statusCode + "] for key [" + key.toString() +"]");
+ } else {
+ callback.storageOperationResult(StorageResultCode.FAILURE, "Unexpected redis result code : [" + statusCode + "] for key [" + key.toString() +"]");
}
}
@@ -113,35 +92,21 @@ public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback)
}
}
+
+ public String getRedisHost() {
+ return redisHost;
+ }
- private static class SaveTask implements Runnable {
- SafeKeeperId key;
- byte[] state;
- StorageCallback callback;
- JedisPool jedisPool;
-
- public SaveTask(SafeKeeperId key, byte[] state, StorageCallback callback, JedisPool jedisPool) {
- super();
- this.key = key;
- this.state = state;
- this.callback = callback;
- this.jedisPool = jedisPool;
- }
+ public void setRedisHost(String redisHost) {
+ this.redisHost = redisHost;
+ }
- public void run() {
- Jedis jedis = jedisPool.getResource();
- String statusCode = "UNKNOWN";
- try {
- statusCode = jedis.set(key.getStringRepresentation().getBytes(), state);
- } finally {
- jedisPool.returnResource(jedis);
- }
- if ("OK".equals(statusCode)) {
- callback.storageOperationResult(StorageResultCode.SUCCESS, "Redis result code is [" + statusCode + "] for key [" + key.toString() +"]");
- } else {
- callback.storageOperationResult(StorageResultCode.FAILURE, "Unexpected redis result code : [" + statusCode + "] for key [" + key.toString() +"]");
- }
- }
+ public int getRedisPort() {
+ return redisPort;
+ }
+ public void setRedisPort(int redisPort) {
+ this.redisPort = redisPort;
}
+
}
View
70 s4-core/src/main/java/io/s4/ft/SafeKeeper.java
@@ -8,7 +8,11 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
@@ -16,7 +20,8 @@
*
* <p>
* This class is responsible for coordinating interactions between the S4 event
- * processor and the checkpoint storage backend.
+ * processor and the checkpoint storage backend. In particular, it schedules
+ * asynchronous save tasks to be executed on the backend.
* </p>
*
*
@@ -28,7 +33,7 @@
SUCCESS, FAILURE
}
- static Logger logger = Logger.getLogger("s4-ft");
+ private static Logger logger = Logger.getLogger("s4-ft");
private StateStorage stateStorage;
private Dispatcher loopbackDispatcher;
private SerializerDeserializer serializer;
@@ -38,15 +43,21 @@
private CountDownLatch signalNodesAvailable = new CountDownLatch(1);
private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
+ ThreadPoolExecutor threadPool;
+
+ int maxWriteThreads = 1;
+ int writeThreadKeepAliveSeconds = 120;
+ int maxOutstandingWriteRequests = 1000;
+
public SafeKeeper() {
}
/**
* <p>
* This init() method <b>must</b> be called by the dependency injection
- * framework. It waits until all required dependencies
- * are injected in SafeKeeper, and until the node count is accessible from
- * the communication layer.
+ * framework. It waits until all required dependencies are injected in
+ * SafeKeeper, and until the node count is accessible from the communication
+ * layer.
* </p>
*/
public void init() {
@@ -55,6 +66,12 @@ public void init() {
} catch (InterruptedException e1) {
e1.printStackTrace();
}
+ threadPool = new ThreadPoolExecutor(1, maxWriteThreads, writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<Runnable>(maxOutstandingWriteRequests));
+ logger.debug("Started thread pool with maxWriteThreads=[" + maxWriteThreads
+ + "], writeThreadKeepAliveSeconds=[" + writeThreadKeepAliveSeconds + "], maxOutsandingWriteRequests=["
+ + maxOutstandingWriteRequests + "]");
+
int nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
// required wait until nodes are available
while (nodeCount == 0) {
@@ -76,8 +93,22 @@ public void init() {
* @param state
* checkpoint data
*/
- public void saveState(SafeKeeperId key, byte[] state) {
- stateStorage.saveState(key, state, storageCallbackFactory.createStorageCallback());
+ public void saveState(SafeKeeperId safeKeeperId, byte[] serializedState) {
+ StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+ try {
+ threadPool.submit(createSaveStateTask(safeKeeperId, serializedState));
+ } catch (RejectedExecutionException e) {
+ storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+ "Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
+ + threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+ + threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests
+ + "]");
+ }
+ }
+
+ private SaveStateTask createSaveStateTask(SafeKeeperId safeKeeperId, byte[] serializedState) {
+ return new SaveStateTask(safeKeeperId, serializedState, storageCallbackFactory.createStorageCallback(),
+ stateStorage);
}
/**
@@ -183,6 +214,29 @@ public StorageCallbackFactory getStorageCallbackFactory() {
public void setStorageCallbackFactory(StorageCallbackFactory storageCallbackFactory) {
this.storageCallbackFactory = storageCallbackFactory;
}
-
+
+ public int getMaxWriteThreads() {
+ return maxWriteThreads;
+ }
+
+ public void setMaxWriteThreads(int maxWriteThreads) {
+ this.maxWriteThreads = maxWriteThreads;
+ }
+
+ public int getWriteThreadKeepAliveSeconds() {
+ return writeThreadKeepAliveSeconds;
+ }
+
+ public void setWriteThreadKeepAliveSeconds(int writeThreadKeepAliveSeconds) {
+ this.writeThreadKeepAliveSeconds = writeThreadKeepAliveSeconds;
+ }
+
+ public int getMaxOutstandingWriteRequests() {
+ return maxOutstandingWriteRequests;
+ }
+
+ public void setMaxOutstandingWriteRequests(int maxOutstandingWriteRequests) {
+ this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
+ }
}
View
28 s4-core/src/main/java/io/s4/ft/SaveStateTask.java
@@ -0,0 +1,28 @@
+package io.s4.ft;
+
+
+/**
+ *
+ * Encapsulates a checkpoint request. It is scheduled by the checkpointing framework.
+ *
+ */
+public class SaveStateTask implements Runnable {
+
+ SafeKeeperId safeKeeperId;
+ byte[] state;
+ StorageCallback storageCallback;
+ StateStorage stateStorage;
+
+ public SaveStateTask(SafeKeeperId safeKeeperId, byte[] state, StorageCallback storageCallback, StateStorage stateStorage) {
+ super();
+ this.safeKeeperId = safeKeeperId;
+ this.state = state;
+ this.storageCallback = storageCallback;
+ this.stateStorage = stateStorage;
+ }
+
+ @Override
+ public void run() {
+ stateStorage.saveState(safeKeeperId, state, storageCallback);
+ }
+}
View
12 s4-core/src/main/java/io/s4/ft/StateStorage.java
@@ -12,8 +12,12 @@
public interface StateStorage {
/**
- * <i>Asynchronous</i> call for storing the checkpoint data
+ * Stores a checkpoint.
*
+ * <p>
+ * NOTE: we don't handle any failure/success return value, because all
+ * failure/success notifications go through the StorageCallback reference
+ * </p>
* @param key
* safeKeeperId
* @param state
@@ -25,9 +29,7 @@
public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback);
/**
- * <p>
- * <i>Synchronous</i> call to fetch stored checkpoint data.
- * </p>
+ * Fetches data for a stored checkpoint.
* <p>
* Must return null if storage does not contain this key.
* </p>
@@ -41,7 +43,7 @@
public byte[] fetchState(SafeKeeperId key);
/**
- * <i>Synchronous</i> call to fetch all stored safeKeeper Ids.
+ * Fetches all stored safeKeeper Ids.
*
* @return all stored safeKeeper Ids.
*/
View
7 s4-core/src/test/java/io/s4/ft/RecoveryTest.java
@@ -1,6 +1,7 @@
package io.s4.ft;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
@@ -71,9 +72,6 @@ public void testCheckpointRestorationThroughApplicationEvent()
final CountDownLatch signalCheckpointed = new CountDownLatch(1);
TestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed,
zk);
- final CountDownLatch signalCheckpointAddedByBK = new CountDownLatch(1);
- // TestUtils.watchAndSignalChangedChildren("/s4/checkpoints",
- // signalCheckpointAddedByBK, zk);
// trigger checkpoint
gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
"Stream1", 0);
@@ -103,9 +101,8 @@ public void testCheckpointRestorationThroughApplicationEvent()
TestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
gen.injectValueEvent(new KeyValue("value2", "message2"), "Stream1", 0);
- signalValue2Set.await();
+ signalValue2Set.await(10, TimeUnit.SECONDS);
- System.out.println("#2");
// we should get "message1" (checkpointed) instead of "message1b"
// (latest)
Assert.assertEquals("value1=message1 ; value2=message2",
View
6 s4-core/src/test/java/io/s4/ft/wordcount/FTWordCountTest.java
@@ -102,7 +102,7 @@ public void doTestCheckpointingAndRecovery(String backendConf)
gen.injectValueEvent(
new KeyValue("sentence", WordCountTest.SENTENCE_1),
"Sentences", 0);
- signalSentence1Processed.await();
+ signalSentence1Processed.await(10, TimeUnit.SECONDS);
Thread.sleep(1000);
@@ -131,7 +131,7 @@ public void doTestCheckpointingAndRecovery(String backendConf)
new KeyValue("sentence", WordCountTest.SENTENCE_2),
"Sentences", 0);
- sentence2Processed.await();
+ sentence2Processed.await(10, TimeUnit.SECONDS);
Thread.sleep(1000);
// crash the app
@@ -148,7 +148,7 @@ public void doTestCheckpointingAndRecovery(String backendConf)
gen.injectValueEvent(
new KeyValue("sentence", WordCountTest.SENTENCE_3),
"Sentences", 0);
- signalTextProcessed.await();
+ signalTextProcessed.await(10, TimeUnit.SECONDS);
File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
+ File.separator + "wordcount");
String s = TestUtils.readFile(results);
Please sign in to comment.
Something went wrong with that request. Please try again.