Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

ajustments to default storage callback logging

- factory injection
- less verbosity
  • Loading branch information...
commit c690cb8941d347116ad7cea3612cfc4703492c05 1 parent 96e0989
Matthieu Morel authored
13 s4-core/src/main/java/io/s4/ft/DefaultFileSystemStateStorage.java
View
@@ -1,5 +1,7 @@
package io.s4.ft;
+import io.s4.ft.SafeKeeper.StorageResultCode;
+
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
@@ -60,7 +62,7 @@ public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback)
try {
threadPool.submit(new SaveTask(key, state, callback, storageRootPath));
} catch (RejectedExecutionException e) {
- logger.error("Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
+ callback.storageOperationResult(StorageResultCode.FAILURE, "Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
+ threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+ threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests + "]");
}
@@ -246,7 +248,7 @@ public void run() {
// parent file has prototype id
if (!f.getParentFile().mkdir()) {
callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
- "Cannot create directory for storing PE for prototype: "
+ "Cannot create directory for storing PE ["+key.toString() + "] for prototype: "
+ f.getParentFile().getAbsolutePath());
return;
}
@@ -255,7 +257,7 @@ public void run() {
try {
f.createNewFile();
} catch (IOException e) {
- callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, key.toString() + " : " + e.getMessage());
return;
}
} else {
@@ -270,10 +272,11 @@ public void run() {
try {
fos = new FileOutputStream(f);
fos.write(state);
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.SUCCESS, key.toString());
} catch (FileNotFoundException e) {
- callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, key.toString() + " : " + e.getMessage());
} catch (IOException e) {
- callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, key.toString() + " : " + e.getMessage());
} finally {
try {
if (fos != null) {
8 s4-core/src/main/java/io/s4/ft/LoggingStorageCallbackFactory.java
View
@@ -22,8 +22,12 @@ public StorageCallback createStorageCallback() {
@Override
public void storageOperationResult(StorageResultCode code, Object message) {
- if (SafeKeeper.logger.isInfoEnabled()) {
- SafeKeeper.logger.info("Callback from storage: " + message);
+ if (StorageResultCode.SUCCESS == code) {
+ if (SafeKeeper.logger.isDebugEnabled()) {
+ SafeKeeper.logger.debug("Callback from storage: " + message);
+ }
+ } else {
+ SafeKeeper.logger.warn("Callback from storage: " + message);
}
}
}
4 s4-core/src/main/java/io/s4/ft/RedisStateStorage.java
View
@@ -136,9 +136,9 @@ public void run() {
jedisPool.returnResource(jedis);
}
if ("OK".equals(statusCode)) {
- callback.storageOperationResult(StorageResultCode.SUCCESS, "Redis result code is [" + statusCode + "]");
+ callback.storageOperationResult(StorageResultCode.SUCCESS, "Redis result code is [" + statusCode + "] for key [" + key.toString() +"]");
} else {
- callback.storageOperationResult(StorageResultCode.FAILURE, "Unexpected redis result code : [" + statusCode + "]");
+ callback.storageOperationResult(StorageResultCode.FAILURE, "Unexpected redis result code : [" + statusCode + "] for key [" + key.toString() +"]");
}
}
6 s4-core/src/main/java/io/s4/ft/SafeKeeper.java
View
@@ -7,11 +7,7 @@
import io.s4.serialize.SerializerDeserializer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import org.apache.log4j.Logger;
@@ -32,7 +28,7 @@
SUCCESS, FAILURE
}
- static Logger logger = Logger.getLogger(SafeKeeper.class);
+ static Logger logger = Logger.getLogger("s4-ft");
private StateStorage stateStorage;
private Dispatcher loopbackDispatcher;
private SerializerDeserializer serializer;
3  s4-core/src/test/java/io/s4/ft/s4_core_conf_bk_backend.xml
View
@@ -182,8 +182,11 @@
<property name="loopbackDispatcher" ref="loopbackDispatcher" />
<property name="serializer" ref="serDeser"/>
<property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
</bean>
+ <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
+
<bean id="bkStateStorage" class="io.s4.ft.BookKeeperStateStorage" init-method="init">
<!-- if not specified, default is <current_dir>/tmp/storage
<property name="storageRootPath" value="${storage_root_path}" /> -->
5 s4-core/src/test/java/io/s4/ft/s4_core_conf_fs_backend.xml
View
@@ -182,8 +182,11 @@
<property name="loopbackDispatcher" ref="loopbackDispatcher" />
<property name="serializer" ref="serDeser"/>
<property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
</bean>
-
+
+ <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
+
<bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="init">
<!-- if not specified, default is <current_dir>/tmp/storage
<property name="storageRootPath" value="${storage_root_path}" /> -->
3  s4-core/src/test/java/io/s4/ft/wordcount/s4_core_conf_bk_backend.xml
View
@@ -182,8 +182,11 @@
<property name="loopbackDispatcher" ref="loopbackDispatcher" />
<property name="serializer" ref="serDeser"/>
<property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
</bean>
+ <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
+
<bean id="bkStateStorage" class="io.s4.ft.BookKeeperStateStorage" init-method="init">
<property name="zkServers" value="localhost:21810"/>
<property name="ensembleSize" value="3"/>
3  s4-core/src/test/java/io/s4/ft/wordcount/s4_core_conf_fs_backend.xml
View
@@ -182,7 +182,10 @@
<property name="loopbackDispatcher" ref="loopbackDispatcher" />
<property name="serializer" ref="serDeser"/>
<property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
</bean>
+
+ <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
<bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="init">
<!-- if not specified, default is <current_dir>/tmp/storage
4 s4-core/src/test/java/io/s4/ft/wordcount/s4_core_conf_redis_backend.xml
View
@@ -182,8 +182,12 @@
<property name="loopbackDispatcher" ref="loopbackDispatcher" />
<property name="serializer" ref="serDeser"/>
<property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
</bean>
+ <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
+
+
<bean id="redisStateStorage" class="io.s4.ft.RedisStateStorage" init-method="init">
<property name="redisHost" value="localhost"/>
<property name="redisPort" value="6379"/>
5 s4-examples/twittertopiccount-ft/src/main/resources/s4-core-conf.xml
View
@@ -170,7 +170,12 @@
<property name="loopbackDispatcher" ref="loopbackDispatcher" />
<property name="serializer" ref="serDeser"/>
<property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
</bean>
+
+ <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
+
+
<bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="init">
<!-- if not specified, default is <current_dir>/tmp/storage
Please sign in to comment.
Something went wrong with that request. Please try again.