Skip to content

Commit

Permalink
Cleans up ByteBuffer creation and use in some core-edge specific classes
Browse files Browse the repository at this point in the history
This commit removes buffers that were allocated but not used, and makes
 sure to reuse buffers in NaiveDurableRaftLog instead of having them
 instantiated in every method call. This class is not thread safe anyway
 because of the way channels are written to.
  • Loading branch information
digitalstain authored and jimwebber committed Jan 18, 2016
1 parent 77a1020 commit 3f99ec0
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 42 deletions.
Expand Up @@ -25,17 +25,12 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;


import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor;
import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor;
import org.neo4j.coreedge.raft.replication.MarshallingException; import org.neo4j.coreedge.raft.replication.MarshallingException;
import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.Serializer; import org.neo4j.coreedge.raft.replication.Serializer;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;

import static org.neo4j.coreedge.raft.log.RaftLog.*;




/** /**
Expand Down Expand Up @@ -73,6 +68,11 @@ public class NaiveDurableRaftLog extends LifecycleAdapter implements RaftLog
public static final int COMMIT_INDEX_BYTES = 8; public static final int COMMIT_INDEX_BYTES = 8;
public static final String DIRECTORY_NAME = "raft-log"; public static final String DIRECTORY_NAME = "raft-log";


private final ByteBuffer entryRecordBuffer = ByteBuffer.allocate( ENTRY_RECORD_LENGTH );
private final ByteBuffer contentLengthBuffer = ByteBuffer.allocate( CONTENT_LENGTH_BYTES );
private final ByteBuffer commitIndexBuffer = ByteBuffer.allocate( COMMIT_INDEX_BYTES );


private final Set<Listener> listeners = new CopyOnWriteArraySet<>(); private final Set<Listener> listeners = new CopyOnWriteArraySet<>();


private final StoreChannel entriesChannel; private final StoreChannel entriesChannel;
Expand Down Expand Up @@ -111,7 +111,7 @@ public void shutdown() throws Throwable
{ {
Exception container = new Exception( "Exception happened during shutdown of RaftLog. See suppressed " + Exception container = new Exception( "Exception happened during shutdown of RaftLog. See suppressed " +
"exceptions for details" ); "exceptions for details" );
boolean shouldThrow = false; boolean shouldThrow;
shouldThrow = forceAndCloseChannel( entriesChannel, container ); shouldThrow = forceAndCloseChannel( entriesChannel, container );
shouldThrow = forceAndCloseChannel( contentChannel, container ) || shouldThrow; shouldThrow = forceAndCloseChannel( contentChannel, container ) || shouldThrow;
shouldThrow = forceAndCloseChannel( commitChannel, container ) || shouldThrow; shouldThrow = forceAndCloseChannel( commitChannel, container ) || shouldThrow;
Expand Down Expand Up @@ -340,12 +340,12 @@ public Entry( long term, long contentPointer )


private void writeEntry( Entry entry ) throws IOException private void writeEntry( Entry entry ) throws IOException
{ {
ByteBuffer buffer = ByteBuffer.allocate( ENTRY_RECORD_LENGTH ); entryRecordBuffer.clear();
buffer.putLong( entry.term ); entryRecordBuffer.putLong( entry.term );
buffer.putLong( entry.contentPointer ); entryRecordBuffer.putLong( entry.contentPointer );
buffer.flip(); entryRecordBuffer.flip();


entriesChannel.writeAll( buffer, (appendIndex + 1) * ENTRY_RECORD_LENGTH ); entriesChannel.writeAll( entryRecordBuffer, (appendIndex + 1) * ENTRY_RECORD_LENGTH );
entriesChannel.force( false ); entriesChannel.force( false );
} }


Expand All @@ -356,11 +356,11 @@ private Entry readEntry( long logIndex ) throws IOException
return new Entry( -1, -1 ); return new Entry( -1, -1 );
} }


ByteBuffer buffer = ByteBuffer.allocate( ENTRY_RECORD_LENGTH ); entryRecordBuffer.clear();
entriesChannel.read( buffer, logIndex * ENTRY_RECORD_LENGTH ); entriesChannel.read( entryRecordBuffer, logIndex * ENTRY_RECORD_LENGTH );
buffer.flip(); entryRecordBuffer.flip();
long term = buffer.getLong(); long term = entryRecordBuffer.getLong();
long contentPointer = buffer.getLong(); long contentPointer = entryRecordBuffer.getLong();
return new Entry( term, contentPointer ); return new Entry( term, contentPointer );
} }


Expand All @@ -369,7 +369,7 @@ private int writeContent( RaftLogEntry logEntry ) throws MarshallingException, I
ByteBuffer contentBuffer = serializer.serialize( logEntry.content() ); ByteBuffer contentBuffer = serializer.serialize( logEntry.content() );
int length = CONTENT_LENGTH_BYTES + contentBuffer.remaining(); int length = CONTENT_LENGTH_BYTES + contentBuffer.remaining();


ByteBuffer contentLengthBuffer = ByteBuffer.allocate( CONTENT_LENGTH_BYTES ); contentLengthBuffer.clear();
contentLengthBuffer.putInt( length ); contentLengthBuffer.putInt( length );
contentLengthBuffer.flip(); contentLengthBuffer.flip();
contentChannel.writeAll( contentLengthBuffer, contentOffset ); contentChannel.writeAll( contentLengthBuffer, contentOffset );
Expand All @@ -381,10 +381,10 @@ private int writeContent( RaftLogEntry logEntry ) throws MarshallingException, I


private ReplicatedContent readContentFrom( long contentPointer ) throws IOException, MarshallingException private ReplicatedContent readContentFrom( long contentPointer ) throws IOException, MarshallingException
{ {
ByteBuffer lengthBuffer = ByteBuffer.allocate( CONTENT_LENGTH_BYTES ); contentLengthBuffer.clear();
contentChannel.read( lengthBuffer, contentPointer ); contentChannel.read( contentLengthBuffer, contentPointer );
lengthBuffer.flip(); contentLengthBuffer.flip();
int contentLength = lengthBuffer.getInt(); int contentLength = contentLengthBuffer.getInt();


ByteBuffer contentBuffer = ByteBuffer.allocate( contentLength - CONTENT_LENGTH_BYTES ); ByteBuffer contentBuffer = ByteBuffer.allocate( contentLength - CONTENT_LENGTH_BYTES );
contentChannel.read( contentBuffer, contentPointer + CONTENT_LENGTH_BYTES ); contentChannel.read( contentBuffer, contentPointer + CONTENT_LENGTH_BYTES );
Expand All @@ -394,10 +394,10 @@ private ReplicatedContent readContentFrom( long contentPointer ) throws IOExcept


private void storeCommitIndex( long commitIndex ) throws IOException private void storeCommitIndex( long commitIndex ) throws IOException
{ {
ByteBuffer buffer = ByteBuffer.allocate( COMMIT_INDEX_BYTES ); commitIndexBuffer.clear();
buffer.putLong( commitIndex ); commitIndexBuffer.putLong( commitIndex );
buffer.flip(); commitIndexBuffer.flip();
commitChannel.writeAll( buffer, 0 ); commitChannel.writeAll( commitIndexBuffer, 0 );
commitChannel.force( false ); commitChannel.force( false );
} }


Expand All @@ -407,9 +407,9 @@ private long readCommitIndex() throws IOException
{ {
return -1; return -1;
} }
ByteBuffer buffer = ByteBuffer.allocate( COMMIT_INDEX_BYTES ); commitIndexBuffer.clear();
commitChannel.read( buffer, 0 ); commitChannel.read( commitIndexBuffer, 0 );
buffer.flip(); commitIndexBuffer.flip();
return buffer.getLong(); return commitIndexBuffer.getLong();
} }
} }
Expand Up @@ -49,7 +49,6 @@ public class OnDiskIdAllocationState extends LifecycleAdapter implements IdAlloc
private final InMemoryIdAllocationState inMemoryIdAllocationState; private final InMemoryIdAllocationState inMemoryIdAllocationState;


private final StatePersister<InMemoryIdAllocationState> statePersister; private final StatePersister<InMemoryIdAllocationState> statePersister;
private final ByteBuffer workingBuffer;
private final InMemoryIdAllocationState.InMemoryIdAllocationStateChannelMarshal marshal; private final InMemoryIdAllocationState.InMemoryIdAllocationStateChannelMarshal marshal;


public OnDiskIdAllocationState( FileSystemAbstraction fileSystemAbstraction, File stateDir, public OnDiskIdAllocationState( FileSystemAbstraction fileSystemAbstraction, File stateDir,
Expand All @@ -59,8 +58,6 @@ public OnDiskIdAllocationState( FileSystemAbstraction fileSystemAbstraction, Fil
File fileA = new File( stateDir, FILENAME + "a" ); File fileA = new File( stateDir, FILENAME + "a" );
File fileB = new File( stateDir, FILENAME + "b" ); File fileB = new File( stateDir, FILENAME + "b" );


this.workingBuffer = ByteBuffer.allocate( NUMBER_OF_BYTES_PER_WRITE );

IdAllocationStateRecoveryManager recoveryManager = IdAllocationStateRecoveryManager recoveryManager =
new IdAllocationStateRecoveryManager( fileSystemAbstraction, new IdAllocationStateRecoveryManager( fileSystemAbstraction,
new InMemoryIdAllocationState.InMemoryIdAllocationStateChannelMarshal() ); new InMemoryIdAllocationState.InMemoryIdAllocationStateChannelMarshal() );
Expand Down
Expand Up @@ -38,7 +38,6 @@ public class OnDiskTermState extends LifecycleAdapter implements TermState
public static final String FILENAME = "term."; public static final String FILENAME = "term.";
public static final String DIRECTORY_NAME = "term-state"; public static final String DIRECTORY_NAME = "term-state";


private final ByteBuffer workingBuffer;
private InMemoryTermState inMemoryTermState; private InMemoryTermState inMemoryTermState;


private final StatePersister<InMemoryTermState> statePersister; private final StatePersister<InMemoryTermState> statePersister;
Expand All @@ -50,20 +49,14 @@ public OnDiskTermState( FileSystemAbstraction fileSystemAbstraction, File stateD
File fileA = new File( stateDir, FILENAME + "a" ); File fileA = new File( stateDir, FILENAME + "a" );
File fileB = new File( stateDir, FILENAME + "b" ); File fileB = new File( stateDir, FILENAME + "b" );


workingBuffer = ByteBuffer.allocate( InMemoryVoteState.InMemoryVoteStateChannelMarshal
.NUMBER_OF_BYTES_PER_VOTE );


TermStateRecoveryManager recoveryManager = TermStateRecoveryManager recoveryManager =
new TermStateRecoveryManager( fileSystemAbstraction, new InMemoryTermStateChannelMarshal() ); new TermStateRecoveryManager( fileSystemAbstraction, new InMemoryTermStateChannelMarshal() );


final StateRecoveryManager.RecoveryStatus recoveryStatus = recoveryManager.recover( fileA, fileB ); final StateRecoveryManager.RecoveryStatus recoveryStatus = recoveryManager.recover( fileA, fileB );



this.inMemoryTermState = recoveryManager.readLastEntryFrom( fileSystemAbstraction, recoveryStatus this.inMemoryTermState = recoveryManager.readLastEntryFrom( fileSystemAbstraction, recoveryStatus
.previouslyActive() ); .previouslyActive() );



this.statePersister = new StatePersister<>( fileA, fileB, fileSystemAbstraction, numberOfEntriesBeforeRotation, this.statePersister = new StatePersister<>( fileA, fileB, fileSystemAbstraction, numberOfEntriesBeforeRotation,
new InMemoryTermStateChannelMarshal(), recoveryStatus.previouslyInactive(), new InMemoryTermStateChannelMarshal(), recoveryStatus.previouslyInactive(),
databaseHealthSupplier ); databaseHealthSupplier );
Expand Down
Expand Up @@ -48,9 +48,6 @@ public OnDiskVoteState( FileSystemAbstraction fileSystemAbstraction, File stateD
File fileA = new File( stateDir, FILENAME + "a" ); File fileA = new File( stateDir, FILENAME + "a" );
File fileB = new File( stateDir, FILENAME + "b" ); File fileB = new File( stateDir, FILENAME + "b" );


ByteBuffer workingBuffer = ByteBuffer.allocate( InMemoryVoteState.InMemoryVoteStateChannelMarshal
.NUMBER_OF_BYTES_PER_VOTE );

InMemoryVoteState.InMemoryVoteStateChannelMarshal<MEMBER> marshal = InMemoryVoteState.InMemoryVoteStateChannelMarshal<MEMBER> marshal =
new InMemoryVoteState.InMemoryVoteStateChannelMarshal<>( memberByteBufferMarshal ); new InMemoryVoteState.InMemoryVoteStateChannelMarshal<>( memberByteBufferMarshal );


Expand Down

0 comments on commit 3f99ec0

Please sign in to comment.