Skip to content

Commit

Permalink
ISPN-979 Added different fsync modes for the FileCacheStore
Browse files Browse the repository at this point in the history
* With default or periodic fsync modes, a file channel is used to keep
track of changes, and these are synched via force() call when data is
read or after an interval.
* Default and periodic modes do not delete files when updating buckets,
instead they clear the channel and rewrite it.
  • Loading branch information
galderz authored and maniksurtani committed Jun 15, 2011
1 parent 29e3ef7 commit 1580807
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,9 @@ public static void setValues(Object target, Map<?, ?> attribs, boolean isXmlAttr
for (Map.Entry entry : attribs.entrySet()) {
String propName = (String) entry.getKey();
String setter = BeanUtils.setterName(propName);
Method method;

try {
Method method;
if (isXmlAttribs) {
method = objectClass.getMethod(setter, Element.class);
method.invoke(target, entry.getValue());
Expand Down
271 changes: 237 additions & 34 deletions core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@
*/
package org.infinispan.loaders.file;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.infinispan.Cache;
import org.infinispan.config.ConfigurationException;
Expand All @@ -53,6 +52,7 @@
* @author Manik Surtani
* @author Mircea.Markus@jboss.com
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author Galder Zamarreño
* @since 4.0
*/
@CacheLoaderMetadata(configurationClass = FileCacheStoreConfig.class)
Expand All @@ -64,6 +64,7 @@ public class FileCacheStore extends BucketBasedCacheStore {

FileCacheStoreConfig config;
File root;
FileSync fileSync;

/**
* @return root directory where all files for this {@link org.infinispan.loaders.CacheStore CacheStore} are written.
Expand Down Expand Up @@ -267,11 +268,14 @@ protected Bucket loadBucket(Integer hash) throws CacheLoaderException {
protected Bucket loadBucket(File bucketFile) throws CacheLoaderException, InterruptedException {
Bucket bucket = null;
if (bucketFile.exists()) {
if (log.isTraceEnabled()) {
if (trace) {
log.trace("Found bucket file: '" + bucketFile + "'");
}
FileInputStream is = null;
InputStream is = null;
try {
// It could happen that the output buffer might not have been
// flushed, so just in case, flush it to be able to read it.
fileSync.flush(bucketFile);
is = new FileInputStream(bucketFile);
bucket = (Bucket) objectFromInputStreamInReentrantMode(is);
} catch (InterruptedException ie) {
Expand All @@ -293,20 +297,17 @@ protected Bucket loadBucket(File bucketFile) throws CacheLoaderException, Interr
public void updateBucket(Bucket b) throws CacheLoaderException {
File f = new File(root, b.getBucketIdAsString());
if (f.exists()) {
if (!deleteFile(f)) {
if (!purgeFile(f)) {
log.problemsRemovingFile(f);
}
} else if (log.isTraceEnabled()) {
} else if (trace) {
log.tracef("Successfully deleted file: '%s'", f.getName());
}

if (!b.getEntries().isEmpty()) {
FileOutputStream fos = null;
try {
try {
byte[] bytes = marshaller.objectToByteBuffer(b);
fos = new FileOutputStream(f);
fos.write(bytes);
fos.flush();
fileSync.write(bytes, f);
} catch (IOException ex) {
log.errorSavingBucket(b, ex);
throw new CacheLoaderException(ex);
Expand All @@ -316,9 +317,6 @@ public void updateBucket(Bucket b) throws CacheLoaderException {
}
Thread.currentThread().interrupt(); // Restore interrupted status
}
finally {
safeClose(fos);
}
}
}

Expand All @@ -345,6 +343,24 @@ public void start() throws CacheLoaderException {
throw new ConfigurationException("Directory " + root.getAbsolutePath() + " does not exist and cannot be created!");
}
streamBufferSize = config.getStreamBufferSize();

switch(config.getFsyncMode()) {
case DEFAULT :
fileSync = new BufferedFileSync();
break;
case PER_WRITE:
fileSync = new PerWriteFileSync();
break;
case PERIODIC:
fileSync = new PeriodicFileSync(config.getFsyncInterval());
break;
}
}

@Override
public void stop() throws CacheLoaderException {
super.stop();
fileSync.stop();
}

public Bucket loadBucketContainingKey(String key) throws CacheLoaderException {
Expand All @@ -358,22 +374,209 @@ private boolean deleteFile(File f) {
return f.delete();
}

private boolean purgeFile(File f) {
if (trace) {
log.tracef("Really clear file %s", f);
}
try {
fileSync.purge(f);
return true;
}
catch (IOException e) {
if (trace)
log.trace("Error encountered while clearing file: " + f, e);
return false;
}
}

private Object objectFromInputStreamInReentrantMode(InputStream is) throws IOException, ClassNotFoundException, InterruptedException {
int len = is.available();
ExposedByteArrayOutputStream bytes = new ExposedByteArrayOutputStream(len);
byte[] buf = new byte[Math.min(len, 1024)];
int bytesRead;
while ((bytesRead = is.read(buf, 0, buf.length)) != -1) {
bytes.write(buf, 0, bytesRead);
}
is = new ByteArrayInputStream(bytes.getRawBuffer(), 0, bytes.size());
ObjectInput unmarshaller = marshaller.startObjectInput(is, true);
Object o = null;
try {
o = marshaller.objectFromObjectStream(unmarshaller);
} finally {
marshaller.finishObjectInput(unmarshaller);
if (len != 0) {
ExposedByteArrayOutputStream bytes = new ExposedByteArrayOutputStream(len);
byte[] buf = new byte[Math.min(len, 1024)];
int bytesRead;
while ((bytesRead = is.read(buf, 0, buf.length)) != -1) {
bytes.write(buf, 0, bytesRead);
}
is = new ByteArrayInputStream(bytes.getRawBuffer(), 0, bytes.size());
ObjectInput unmarshaller = marshaller.startObjectInput(is, true);
try {
o = marshaller.objectFromObjectStream(unmarshaller);
} finally {
marshaller.finishObjectInput(unmarshaller);
}
}
return o;
}

/**
* Specifies how the changes written to a file will be synched
* with the underlying file system.
*/
private interface FileSync {

/**
* Writes the given bytes to the file.
*
* @param bytes byte array containing the bytes to write.
* @param f File instance representing the location where to store the data.
* @throws IOException if an I/O error occurs
*/
void write(byte[] bytes, File f) throws IOException;

/**
* Force the file changes to be flushed to the underlying file system.
* Client code calling this flush method should in advance check whether
* the file exists and so this method assumes that check was already done.
*
* @param f File instance representing the location changes should be flushed to.
* @throws IOException if an I/O error occurs
*/
void flush(File f) throws IOException;

/**
* Forces the file to be purged. Implementations are free to decide what
* the best option should be here. For example, whether to delete the
* file, whether to empty it...etc.
*
* @param f File instance that should be purged.
* @throws IOException if an I/O error occurs
*/
void purge(File f) throws IOException;

/**
* Stop the file synching mechanism. This offers implementors the
* opportunity to do any cleanup when the cache stops.
*/
void stop();

}

private class BufferedFileSync implements FileSync {
protected final ConcurrentMap<String, FileChannel> streams =
new ConcurrentHashMap<String, FileChannel>();

@Override
public void write(byte[] bytes, File f) throws IOException {
String path = f.getPath();
FileChannel channel = streams.get(path);
if (channel == null) {
channel = createChannel(f);
streams.putIfAbsent(path, channel);
} else if (!f.exists()) {
f.createNewFile();
FileChannel oldChannel = channel;
channel = createChannel(f);
streams.replace(path, oldChannel, channel);
}
channel.write(ByteBuffer.wrap(bytes));
}

private FileChannel createChannel(File f) throws FileNotFoundException {
return new RandomAccessFile(f, "rw").getChannel();
}

@Override
public void flush(File f) throws IOException {
FileChannel channel = streams.get(f.getPath());
if (channel != null)
channel.force(true);

}

@Override
public void purge(File f) throws IOException {
// Avoid a delete per-se because it hampers any fsync-like functionality
// cos any cached file channel write won't change the file's exists
// status. So, clear the file rather than delete it.
FileChannel channel = streams.get(f.getPath());
channel.truncate(0);
// Apart from truncating, it's necessary to reset the position!
channel.position(0);
}

@Override
public void stop() {
for (FileChannel channel : streams.values())
Util.close(channel);

streams.clear();
}

}

private class PeriodicFileSync extends BufferedFileSync {
private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
protected final ConcurrentMap<String, IOException> flushErrors =
new ConcurrentHashMap<String, IOException>();

private PeriodicFileSync(long interval) {
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
for (Map.Entry<String, FileChannel> entry : streams.entrySet()) {
if (trace)
log.tracef("Flushing channel in %s", entry.getKey());
FileChannel channel = entry.getValue();
try {
channel.force(true);
} catch (IOException e) {
if (trace)
log.tracef(e, "Error flushing output stream for %s", entry.getKey());
flushErrors.putIfAbsent(entry.getKey(), e);
// If an error is encountered, close it. Next time it's used,
// the exception will be propagated back to the user.
Util.close(channel);
}
}
}
}, interval, interval, TimeUnit.MILLISECONDS);
}

@Override
public void write(byte[] bytes, File f) throws IOException {
String path = f.getPath();
IOException error = flushErrors.get(path);
if (error != null)
throw new IOException(String.format(
"Periodic flush of channel for %s failed", path), error);

super.write(bytes, f);
}

}

private class PerWriteFileSync implements FileSync {
@Override
public void write(byte[] bytes, File f) throws IOException {
FileOutputStream fos = null;
try {
fos = new FileOutputStream(f);
fos.write(bytes);
fos.flush();
} finally {
if (fos != null)
fos.close();
}
}

@Override
public void flush(File f) throws IOException {
// No-op since flush always happens upon write
}

@Override
public void purge(File f) throws IOException {
f.delete();
}

@Override
public void stop() {
// No-op
}
}


}
Loading

0 comments on commit 1580807

Please sign in to comment.