Permalink
Browse files

updated checkpointing logger names

  • Loading branch information...
1 parent e628327 commit 0e61639601a155c917ae445d3576b19c40e7fb34 Matthieu Morel committed Jul 12, 2011
@@ -68,7 +68,7 @@
//DeleteCallback,
AddCallback,
ReadCallback, StatCallback, DeleteCallback, org.apache.zookeeper.AsyncCallback.StringCallback {
- private static Logger logger = Logger.getLogger(BookKeeperStateStorage.class);
+ private static Logger logger = Logger.getLogger("s4-ft");
private String zkServers;
private BookKeeper bk;
@@ -183,6 +183,12 @@ public void saveState(SafeKeeperId key, byte[] state,
logger.debug("checkpointing: " + key);
}
SaveCtx sctx = new SaveCtx(key, state, callback);
+
+ // TODO
+ // if ledger exist for this prototype, use it, else fetch it from zookeeper, else create it
+ // then write entry to ledger
+ // then add entry id to index ledger
+
/*
* Creates a new ledger to store the checkpoint
*/
@@ -333,7 +339,7 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
fctx.state = seq.nextElement().getEntry();
} else {
- logger.error("Reading checkpoint failed.");
+ logger.error("Reading checkpoint failed : " + rc);
}
if(fctx.sb != null){
@@ -9,6 +9,7 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -33,8 +34,7 @@
*/
public class DefaultFileSystemStateStorage implements StateStorage {
- private static Logger logger = Logger
- .getLogger(DefaultFileSystemStateStorage.class);
+ private static Logger logger = Logger.getLogger("s4-ft");
private String storageRootPath;
ThreadPoolExecutor threadPool;
int maxWriteThreads = 1;
@@ -46,19 +46,24 @@ public DefaultFileSystemStateStorage() {
/**
* <p>
- * Must be called by the dependency injection framework.<p/>
+ * Must be called by the dependency injection framework.
+ * <p/>
*/
public void init() {
checkStorageDir();
- threadPool = new ThreadPoolExecutor(0, maxWriteThreads,
- writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
+ threadPool = new ThreadPoolExecutor(0, maxWriteThreads, writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(maxOutstandingWriteRequests));
}
@Override
- public void saveState(SafeKeeperId key, byte[] state,
- StorageCallback callback) {
- threadPool.submit(new SaveTask(key, state, callback, storageRootPath));
+ public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
+ try {
+ threadPool.submit(new SaveTask(key, state, callback, storageRootPath));
+ } catch (RejectedExecutionException e) {
+ logger.error("Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
+ + threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+ + threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests + "]");
+ }
}
@Override
@@ -82,23 +87,19 @@ public void saveState(SafeKeeperId key, byte[] state,
* array.
*/
if (length > Integer.MAX_VALUE) {
- throw new IOException("Error file is too large: "
- + file.getName() + " " + length + " bytes");
+ throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
}
byte[] buffer = new byte[(int) length];
int offSet = 0;
int numRead = 0;
- while (offSet < buffer.length
- && (numRead = in.read(buffer, offSet, buffer.length
- - offSet)) >= 0) {
+ while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
offSet += numRead;
}
if (offSet < buffer.length) {
- throw new IOException("Error, could not read entire file: "
- + file.getName() + " " + offSet + "/"
+ throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
+ buffer.length + " bytes read");
}
@@ -147,15 +148,10 @@ public boolean accept(File file) {
}
// files kept as : root/<partitionId>/<prototypeId>/encodedKeyWithFullInfo
- private static File safeKeeperID2File(SafeKeeperId key,
- String storageRootPath) {
-
- return new File(storageRootPath
- + File.separator
- + key.getPrototypeId()
- + File.separator
- + Base64.encodeBase64URLSafeString(key
- .getStringRepresentation().getBytes()));
+ private static File safeKeeperID2File(SafeKeeperId key, String storageRootPath) {
+
+ return new File(storageRootPath + File.separator + key.getPrototypeId() + File.separator
+ + Base64.encodeBase64URLSafeString(key.getStringRepresentation().getBytes()));
}
private static SafeKeeperId file2SafeKeeperID(File file) {
@@ -173,8 +169,7 @@ public void setStorageRootPath(String storageRootPath) {
File rootPathFile = new File(storageRootPath);
if (!rootPathFile.exists()) {
if (!rootPathFile.mkdirs()) {
- logger.error("could not create root storage directory : "
- + storageRootPath);
+ logger.error("could not create root storage directory : " + storageRootPath);
}
}
@@ -207,18 +202,16 @@ public void setMaxOutstandingWriteRequests(int maxOutstandingWriteRequests) {
public void checkStorageDir() {
if (storageRootPath == null) {
- File defaultStorageDir = new File(System.getProperty("user.dir")
- + File.separator + "tmp" + File.separator + "storage");
+ File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+ + "storage");
storageRootPath = defaultStorageDir.getAbsolutePath();
if (logger.isInfoEnabled()) {
- logger.info("Unspecified storage dir; using default dir: "
- + defaultStorageDir.getAbsolutePath());
+ logger.info("Unspecified storage dir; using default dir: " + defaultStorageDir.getAbsolutePath());
}
if (!defaultStorageDir.exists()) {
if (!(defaultStorageDir.mkdirs())) {
logger.error("Storage directory not specified, and cannot create default storage directory : "
- + defaultStorageDir.getAbsolutePath()
- + "\n Checkpointing and recovery will be disabled.");
+ + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
}
}
}
@@ -227,16 +220,15 @@ public void checkStorageDir() {
/**
*
* Writing to storage is an asynchronous operation specified in this class.
- *
+ *
*/
private static class SaveTask implements Runnable {
SafeKeeperId key;
byte[] state;
StorageCallback callback;
private String storageRootPath;
- public SaveTask(SafeKeeperId key, byte[] state,
- StorageCallback callback, String storageRootPath) {
+ public SaveTask(SafeKeeperId key, byte[] state, StorageCallback callback, String storageRootPath) {
super();
this.key = key;
this.state = state;
@@ -247,15 +239,13 @@ public SaveTask(SafeKeeperId key, byte[] state,
public void run() {
File f = safeKeeperID2File(key, storageRootPath);
if (logger.isDebugEnabled()) {
- logger.debug("Checkpointing [" + key + "] into file: ["
- + f.getAbsolutePath() + "]");
+ logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
}
if (!f.exists()) {
if (!f.getParentFile().exists()) {
// parent file has prototype id
if (!f.getParentFile().mkdir()) {
- callback.storageOperationResult(
- SafeKeeper.StorageResultCode.FAILURE,
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
"Cannot create directory for storing PE for prototype: "
+ f.getParentFile().getAbsolutePath());
return;
@@ -265,17 +255,14 @@ public void run() {
try {
f.createNewFile();
} catch (IOException e) {
- callback.storageOperationResult(
- SafeKeeper.StorageResultCode.FAILURE,
- e.getMessage());
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
return;
}
} else {
if (!f.delete()) {
- callback.storageOperationResult(
- SafeKeeper.StorageResultCode.FAILURE,
- "Cannot delete previously saved checkpoint file ["
- + f.getParentFile().getAbsolutePath() + "]");
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+ "Cannot delete previously saved checkpoint file [" + f.getParentFile().getAbsolutePath()
+ + "]");
return;
}
}
@@ -284,11 +271,9 @@ public void run() {
fos = new FileOutputStream(f);
fos.write(state);
} catch (FileNotFoundException e) {
- callback.storageOperationResult(
- SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
} catch (IOException e) {
- callback.storageOperationResult(
- SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
} finally {
try {
if (fos != null) {
@@ -84,8 +84,6 @@ public String getName() {
}
}
- transient private static Logger LOG = Logger.getLogger(AbstractPE.class);
-
transient private Clock s4Clock;
// FIXME replaces monitor wait on AbstractPE, for triggering possible extra
// thread when checkpointing activated
@@ -624,11 +622,11 @@ private void restoreFieldsForClass(Class currentInOldStateClassHierarchy, Abstra
// TODO use reflectasm
field.set(this, field.get(oldState));
} catch (IllegalArgumentException e) {
- LOG.error("Cannot recover old state for this PE ["
+ Logger.getLogger("s4-ft").error("Cannot recover old state for this PE ["
+ this + "]", e);
return;
} catch (IllegalAccessException e) {
- LOG.error("Cannot recover old state for this PE ["
+ Logger.getLogger("s4-ft").error("Cannot recover old state for this PE ["
+ this + "]", e);
return;
}
@@ -25,6 +25,10 @@
<level value="info"/>
<appender-ref ref="S"/>
</logger>
+ <logger name="s4-ft">
+ <level value="info"/>
+ <appender-ref ref="R"/>
+ </logger>
<appender name="R" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${log_loc}/s4-core/s4-core_${instanceId}.log" />
<layout class="org.apache.log4j.PatternLayout">

0 comments on commit 0e61639

Please sign in to comment.