Skip to content

Commit

Permalink
Name it segmented raft log.
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed May 9, 2016
1 parent 534e573 commit 389d0a0
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 26 deletions.
Expand Up @@ -35,15 +35,13 @@
import org.neo4j.cursor.IOCursor; import org.neo4j.cursor.IOCursor;
import org.neo4j.helpers.collection.LruCache; import org.neo4j.helpers.collection.LruCache;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


/** /**
* The physical RAFT log is an append only log supporting the operations required to support * The segmented RAFT log is an append only log supporting the operations required to support
* the RAFT consensus algorithm. The physical part relates to the fact that the implementation * the RAFT consensus algorithm.
* borrows from infrastructure around the already existing {@link PhysicalLogFile} and related.
* *
* A RAFT log must be able to append new entries, but also truncate not yet committed entries, * A RAFT log must be able to append new entries, but also truncate not yet committed entries,
* prune out old compacted entries and skip to a later starting point. * prune out old compacted entries and skip to a later starting point.
Expand All @@ -58,7 +56,7 @@
* by switching to the next segment file, called the next version. A new segment file is also started * by switching to the next segment file, called the next version. A new segment file is also started
* when the threshold for a particular file has been reached. * when the threshold for a particular file has been reached.
*/ */
public class SegmentedPhysicalRaftLog extends LifecycleAdapter implements RaftLog public class SegmentedRaftLog extends LifecycleAdapter implements RaftLog
{ {
private final Log log; private final Log log;


Expand All @@ -76,7 +74,7 @@ public class SegmentedPhysicalRaftLog extends LifecycleAdapter implements RaftLo


private State state; private State state;


public SegmentedPhysicalRaftLog( public SegmentedRaftLog(
FileSystemAbstraction fileSystem, FileSystemAbstraction fileSystem,
File directory, File directory,
long rotateAtSize, long rotateAtSize,
Expand Down
Expand Up @@ -54,7 +54,7 @@
import org.neo4j.coreedge.raft.log.physical.PhysicalRaftLog; import org.neo4j.coreedge.raft.log.physical.PhysicalRaftLog;
import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogMetadataCache; import org.neo4j.coreedge.raft.log.RaftLogMetadataCache;
import org.neo4j.coreedge.raft.log.segmented.SegmentedPhysicalRaftLog; import org.neo4j.coreedge.raft.log.segmented.SegmentedRaftLog;
import org.neo4j.coreedge.raft.log.physical.PhysicalRaftLogFile; import org.neo4j.coreedge.raft.log.physical.PhysicalRaftLogFile;
import org.neo4j.coreedge.raft.membership.CoreMemberSetBuilder; import org.neo4j.coreedge.raft.membership.CoreMemberSetBuilder;
import org.neo4j.coreedge.raft.membership.MembershipWaiter; import org.neo4j.coreedge.raft.membership.MembershipWaiter;
Expand Down Expand Up @@ -527,7 +527,7 @@ private RaftLog createRaftLog(
long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size ); long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size );
int metaDataCacheSize = config.get( CoreEdgeClusterSettings.raft_log_meta_data_cache_size ); int metaDataCacheSize = config.get( CoreEdgeClusterSettings.raft_log_meta_data_cache_size );


return life.add( new SegmentedPhysicalRaftLog( return life.add( new SegmentedRaftLog(
fileSystem, fileSystem,
new File( clusterStateDirectory, PhysicalRaftLog.DIRECTORY_NAME ), new File( clusterStateDirectory, PhysicalRaftLog.DIRECTORY_NAME ),
rotateAtSize, rotateAtSize,
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.neo4j.coreedge.raft.log.DummyRaftableContentSerializer; import org.neo4j.coreedge.raft.log.DummyRaftableContentSerializer;
import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogVerificationIT; import org.neo4j.coreedge.raft.log.RaftLogVerificationIT;
import org.neo4j.coreedge.raft.log.segmented.SegmentedPhysicalRaftLog;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.logging.NullLogProvider; import org.neo4j.logging.NullLogProvider;


Expand All @@ -41,7 +40,7 @@ protected RaftLog createRaftLog() throws Throwable
long rotateAtSizeBytes = 128; long rotateAtSizeBytes = 128;
int entryCacheSize = 4; int entryCacheSize = 4;


SegmentedPhysicalRaftLog newRaftLog = new SegmentedPhysicalRaftLog( fsa, directory, rotateAtSizeBytes, SegmentedRaftLog newRaftLog = new SegmentedRaftLog( fsa, directory, rotateAtSizeBytes,
new DummyRaftableContentSerializer(), NullLogProvider.getInstance(), entryCacheSize ); new DummyRaftableContentSerializer(), NullLogProvider.getInstance(), entryCacheSize );


newRaftLog.init(); newRaftLog.init();
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogContractTest; import org.neo4j.coreedge.raft.log.RaftLogContractTest;
import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.segmented.SegmentedPhysicalRaftLog;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifeSupport;
Expand All @@ -41,9 +40,9 @@
import static org.neo4j.coreedge.raft.log.RaftLogHelper.readLogEntry; import static org.neo4j.coreedge.raft.log.RaftLogHelper.readLogEntry;


// TODO: Separate into small-cache and no-cache tests. Perhaps parameterize this one (0, 5, 1024) cache sizes. // TODO: Separate into small-cache and no-cache tests. Perhaps parameterize this one (0, 5, 1024) cache sizes.
public class SegmentedPhysicalRaftLogContractTest extends RaftLogContractTest public class SegmentedRaftLogContractTest extends RaftLogContractTest
{ {
private SegmentedPhysicalRaftLog raftLog; private SegmentedRaftLog raftLog;
private LifeSupport life = new LifeSupport(); private LifeSupport life = new LifeSupport();
private FileSystemAbstraction fileSystem; private FileSystemAbstraction fileSystem;


Expand All @@ -61,7 +60,7 @@ public void tearDown() throws Throwable
life.shutdown(); life.shutdown();
} }


private SegmentedPhysicalRaftLog createRaftLog( int cacheSize ) private SegmentedRaftLog createRaftLog( int cacheSize )
{ {
if ( fileSystem == null ) if ( fileSystem == null )
{ {
Expand All @@ -70,7 +69,7 @@ private SegmentedPhysicalRaftLog createRaftLog( int cacheSize )
File directory = new File( "raft-log" ); File directory = new File( "raft-log" );
fileSystem.mkdir( directory ); fileSystem.mkdir( directory );


SegmentedPhysicalRaftLog newRaftLog = new SegmentedPhysicalRaftLog( fileSystem, directory, 10 * 1024, SegmentedRaftLog newRaftLog = new SegmentedRaftLog( fileSystem, directory, 10 * 1024,
new DummyRaftableContentSerializer(), new DummyRaftableContentSerializer(),
NullLogProvider.getInstance(), cacheSize ); NullLogProvider.getInstance(), cacheSize );
life.add( newRaftLog ); life.add( newRaftLog );
Expand All @@ -83,7 +82,7 @@ private SegmentedPhysicalRaftLog createRaftLog( int cacheSize )
public void shouldReadBackInCachedEntry() throws Throwable public void shouldReadBackInCachedEntry() throws Throwable
{ {
// Given // Given
SegmentedPhysicalRaftLog raftLog = (SegmentedPhysicalRaftLog) createRaftLog(); SegmentedRaftLog raftLog = (SegmentedRaftLog) createRaftLog();
int term = 0; int term = 0;
ReplicatedInteger content = ReplicatedInteger.valueOf( 4 ); ReplicatedInteger content = ReplicatedInteger.valueOf( 4 );


Expand All @@ -101,7 +100,7 @@ public void shouldReadBackNonCachedEntry() throws Exception
{ {
// Given // Given
int cacheSize = 1; int cacheSize = 1;
SegmentedPhysicalRaftLog raftLog = createRaftLog( cacheSize ); SegmentedRaftLog raftLog = createRaftLog( cacheSize );
int term = 0; int term = 0;
ReplicatedInteger content1 = ReplicatedInteger.valueOf( 4 ); ReplicatedInteger content1 = ReplicatedInteger.valueOf( 4 );
ReplicatedInteger content2 = ReplicatedInteger.valueOf( 5 ); ReplicatedInteger content2 = ReplicatedInteger.valueOf( 5 );
Expand All @@ -124,7 +123,7 @@ public void shouldReadBackNonCachedEntry() throws Exception
public void shouldRestoreCommitIndexOnStartup() throws Throwable public void shouldRestoreCommitIndexOnStartup() throws Throwable
{ {
// Given // Given
SegmentedPhysicalRaftLog raftLog = createRaftLog( 100 /* cache size */ ); SegmentedRaftLog raftLog = createRaftLog( 100 /* cache size */ );
int term = 0; int term = 0;
ReplicatedInteger content1 = ReplicatedInteger.valueOf( 4 ); ReplicatedInteger content1 = ReplicatedInteger.valueOf( 4 );
ReplicatedInteger content2 = ReplicatedInteger.valueOf( 5 ); ReplicatedInteger content2 = ReplicatedInteger.valueOf( 5 );
Expand All @@ -144,7 +143,7 @@ public void shouldRestoreCommitIndexOnStartup() throws Throwable
public void shouldRestoreCorrectCommitAndAppendIndexOnStartupAfterTruncation() throws Exception public void shouldRestoreCorrectCommitAndAppendIndexOnStartupAfterTruncation() throws Exception
{ {
// Given // Given
SegmentedPhysicalRaftLog raftLog = createRaftLog( 100 /* cache size */ ); SegmentedRaftLog raftLog = createRaftLog( 100 /* cache size */ );
int term = 0; int term = 0;
ReplicatedInteger content = ReplicatedInteger.valueOf( 4 ); ReplicatedInteger content = ReplicatedInteger.valueOf( 4 );
raftLog.append( new RaftLogEntry( term, content ) ); raftLog.append( new RaftLogEntry( term, content ) );
Expand All @@ -167,7 +166,7 @@ public void shouldRestoreCorrectCommitAndAppendIndexOnStartupAfterTruncation() t
public void shouldRestoreCorrectCommitAndAppendIndexWithTruncationRecordsAndAppendedRecordsAfterThat() throws Exception public void shouldRestoreCorrectCommitAndAppendIndexWithTruncationRecordsAndAppendedRecordsAfterThat() throws Exception
{ {
// Given // Given
SegmentedPhysicalRaftLog raftLog = createRaftLog( 100 /* cache size */ ); SegmentedRaftLog raftLog = createRaftLog( 100 /* cache size */ );
int term = 0; int term = 0;
ReplicatedInteger content = ReplicatedInteger.valueOf( 4 ); ReplicatedInteger content = ReplicatedInteger.valueOf( 4 );
raftLog.append( new RaftLogEntry( term, content ) ); raftLog.append( new RaftLogEntry( term, content ) );
Expand Down
Expand Up @@ -31,13 +31,12 @@
import org.neo4j.coreedge.raft.ReplicatedString; import org.neo4j.coreedge.raft.ReplicatedString;
import org.neo4j.coreedge.raft.log.DummyRaftableContentSerializer; import org.neo4j.coreedge.raft.log.DummyRaftableContentSerializer;
import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.segmented.SegmentedPhysicalRaftLog;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.NullLogProvider; import org.neo4j.logging.NullLogProvider;


public class SegmentedPhysicalRaftLogRotationTest public class SegmentedRaftLogRotationTest
{ {
private LifeSupport life = new LifeSupport(); private LifeSupport life = new LifeSupport();
private FileSystemAbstraction fileSystem; private FileSystemAbstraction fileSystem;
Expand All @@ -49,7 +48,7 @@ public void tearDown() throws Throwable
life.shutdown(); life.shutdown();
} }


private SegmentedPhysicalRaftLog createRaftLog( long rotateAtSize ) private SegmentedRaftLog createRaftLog( long rotateAtSize )
{ {
if ( fileSystem == null ) if ( fileSystem == null )
{ {
Expand All @@ -58,7 +57,7 @@ private SegmentedPhysicalRaftLog createRaftLog( long rotateAtSize )
File directory = new File( "raft-log" ); File directory = new File( "raft-log" );
fileSystem.mkdir( directory ); fileSystem.mkdir( directory );


SegmentedPhysicalRaftLog newRaftLog = new SegmentedPhysicalRaftLog( fileSystem, directory, rotateAtSize, SegmentedRaftLog newRaftLog = new SegmentedRaftLog( fileSystem, directory, rotateAtSize,
new DummyRaftableContentSerializer(), new DummyRaftableContentSerializer(),
NullLogProvider.getInstance(), 1000 ); NullLogProvider.getInstance(), 1000 );
life.add( newRaftLog ); life.add( newRaftLog );
Expand All @@ -73,7 +72,7 @@ public void shouldRotateOnAppendWhenRotateSizeIsReached() throws Exception
// Given // Given
AtomicLong currentVersion = new AtomicLong(); AtomicLong currentVersion = new AtomicLong();
int rotateAtSize = 100; int rotateAtSize = 100;
SegmentedPhysicalRaftLog log = createRaftLog( rotateAtSize ); SegmentedRaftLog log = createRaftLog( rotateAtSize );


StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
for ( int i = 0; i < rotateAtSize; i++ ) for ( int i = 0; i < rotateAtSize; i++ )
Expand All @@ -93,7 +92,7 @@ public void shouldRotateOnAppendWhenRotateSizeIsReached() throws Exception
public void shouldBeAbleToRecoverToLatestStateAfterRotation() throws Throwable public void shouldBeAbleToRecoverToLatestStateAfterRotation() throws Throwable
{ {
int rotateAtSize = 100; int rotateAtSize = 100;
SegmentedPhysicalRaftLog log = createRaftLog( rotateAtSize ); SegmentedRaftLog log = createRaftLog( rotateAtSize );


StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
for ( int i = 0; i < rotateAtSize - 40; i++ ) for ( int i = 0; i < rotateAtSize - 40; i++ )
Expand Down

0 comments on commit 389d0a0

Please sign in to comment.