Skip to content

Commit

Permalink
Concurrency fixes for read shared mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalliday committed Feb 12, 2020
1 parent 3b88731 commit 576acf7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
Expand Up @@ -25,7 +25,7 @@
import java.nio.channels.*;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -310,6 +310,10 @@ public int write(ByteBuffer src, long position) throws IOException {

private int writeInternal(ByteBuffer src, int position) throws ClosedChannelException {

if(metadata.isReadShared()) {
throw new IllegalStateException();
}

if (position < metadata.getPersistenceIndex()) {
throw new IllegalArgumentException();
}
Expand All @@ -324,9 +328,7 @@ private int writeInternal(ByteBuffer src, int position) throws ClosedChannelExce
dst.put(srcSlice);
src.position(src.position() + length);

int startIndex = position;

persist(startIndex, length);
persist(position, length);

return length;
}
Expand Down Expand Up @@ -531,12 +533,16 @@ protected void implCloseChannel() throws IOException {
// https://bugs.openjdk.java.net/browse/JDK-4724038
unsafe.invokeCleaner(rawBuffer);

fileChannel.truncate(metadata.getPersistenceIndex());

// TODO close metadata (assuming it's private instance)
if(!metadata.isReadShared()) {
int persistenceIndex = metadata.getPersistenceIndex();
logger.trace("truncating {} to {}", file.getAbsolutePath(), persistenceIndex);
fileChannel.truncate(persistenceIndex);
}

fileChannel.close();

metadata.close();

} finally {
lock.unlock();
}
Expand Down
Expand Up @@ -190,6 +190,15 @@ public void persist(int startIndex, int length) throws ClosedChannelException {
logger.exit();
}

/**
* Returns the read sharing mode.
*
* @return true is read sharing is enabled, false otherwise.
*/
public boolean isReadShared() {
return readShared;
}

/**
* Reinitializes the instance.
*
Expand Down

0 comments on commit 576acf7

Please sign in to comment.