diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java index ee73fea32a35d..e97d0f1d0dfc6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java @@ -20,6 +20,7 @@ package org.neo4j.coreedge.raft.log; import java.io.IOException; +import java.util.Stack; import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.state.ChannelMarshal; @@ -27,17 +28,15 @@ import org.neo4j.kernel.impl.transaction.log.LogFile; import org.neo4j.kernel.impl.transaction.log.LogHeaderVisitor; import org.neo4j.kernel.impl.transaction.log.LogPosition; -import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; -import org.neo4j.logging.LogProvider; -public class PhysicalRaftEntryStore implements RaftEntryStore +class PhysicalRaftEntryStore implements RaftEntryStore { private final LogFile logFile; private final RaftLogMetadataCache metadataCache; private final ChannelMarshal marshal; - public PhysicalRaftEntryStore( LogFile logFile, RaftLogMetadataCache metadataCache, - ChannelMarshal marshal ) + PhysicalRaftEntryStore( LogFile logFile, RaftLogMetadataCache metadataCache, + ChannelMarshal marshal ) { this.logFile = logFile; this.metadataCache = metadataCache; @@ -45,72 +44,80 @@ public PhysicalRaftEntryStore( LogFile logFile, RaftLogMetadataCache metadataCac } @Override - public IOCursor getEntriesFrom( final long indexToStartFrom ) throws IOException + public IOCursor getEntriesFrom( long fromIndex ) throws IOException { - // look up in position cache - RaftLogMetadataCache.RaftLogEntryMetadata metadata = metadataCache.getMetadata( indexToStartFrom ); - LogPosition startPosition; - boolean positionedCorrectly = false; - if ( metadata != null ) - { - startPosition = 
metadata.getStartPosition(); - positionedCorrectly = true; - } - else - { - // ask LogFile about the version it may be in - LogVersionLocator headerVisitor = new LogVersionLocator( indexToStartFrom ); - logFile.accept( headerVisitor ); - startPosition = headerVisitor.foundPosition; - if ( headerVisitor.firstLogIndexForFoundFile == indexToStartFrom ) - { - /* - * we need to know if the first entry (the one the cursor will return next) is the one we are looking - * for, because if it isn't, then we need to skip forward until we find it - */ - positionedCorrectly = true; - } - } + // generate skip stack and get starting position + Stack skipStack = new Stack<>(); + SkipStackGenerator skipStackGenerator = new SkipStackGenerator( fromIndex, skipStack ); + logFile.accept( skipStackGenerator ); + + // the skip stack generator scans through the headers and gives us the logs starting position as a side-effect + LogPosition startPosition = skipStackGenerator.logStartPosition; + if ( startPosition == null ) { return IOCursor.getEmpty(); } - ReadableLogChannel reader = logFile.getReader( startPosition ); - - PhysicalRaftLogEntryCursor physicalRaftLogEntryCursor = new PhysicalRaftLogEntryCursor( new - RaftRecordCursor<>( reader, marshal ) ); - if ( !positionedCorrectly ) + RaftLogMetadataCache.RaftLogEntryMetadata logEntryInfo = metadataCache.getMetadata( fromIndex ); + if( logEntryInfo != null && logEntryInfo.getStartPosition().getLogVersion() == startPosition.getLogVersion() ) { - /* - * At this point we know the first entry is not the entry we look for. Iterate until we find it. 
- */ - while( physicalRaftLogEntryCursor.next() && physicalRaftLogEntryCursor.get().getLogIndex() < indexToStartFrom - 1 ); + // then metadata is valid for this log version, read from there + startPosition = logEntryInfo.getStartPosition(); } - return physicalRaftLogEntryCursor; + + return new PhysicalRaftLogEntryCursor( new RaftRecordCursor<>( logFile.getReader( startPosition ), marshal ), + skipStack, fromIndex ); } - private static final class LogVersionLocator implements LogHeaderVisitor + private static final class SkipStackGenerator implements LogHeaderVisitor { private final long logEntryIndex; - private LogPosition foundPosition; - private long firstLogIndexForFoundFile; + private final Stack skipStack; + private LogPosition logStartPosition; + private long nextContinuation = -1; - LogVersionLocator( long logEntryIndex ) + private SkipStackGenerator( long logEntryIndex, Stack skipStack ) { this.logEntryIndex = logEntryIndex; + this.skipStack = skipStack; } + /** + * Visits all log files backwards in order and creates a stack defining where a record traversal + * should skip forward to the next continuation record. */ @Override - public boolean visit( LogPosition position, long firstLogIndex, long lastLogIndex ) + public boolean visit( LogPosition position, long firstLogIndex, long ignored ) { - boolean foundIt = logEntryIndex >= firstLogIndex && logEntryIndex <= lastLogIndex; - if ( foundIt ) + if( nextContinuation != -1 ) { - this.firstLogIndexForFoundFile = firstLogIndex; - foundPosition = position; + if( !skipStack.empty() && skipStack.peek() < nextContinuation ) + { + // This happens if you truncate again past a previous truncation. For example + // truncating to 7 first and later truncating to 3. Thus the older truncation becomes + // irrelevant and must be ignored. + // + // So one must do a double-skip to reach the start of the latest 3 and this is + // implemented by setting the older skip point to the value of the newer one. 
Thus + // instead of skipping to 7 and then to 3, we will skip to 3 and again to 3. + // + // The skip points mark the indexes where we should start skipping from until the next + // continuation record. Thus if we skip starting from 3, reach the continuation record, + // and yet again find that we should skip to 3 (by popping the skip stack) then we will + // simply keep skipping. + nextContinuation = skipStack.peek(); + } + skipStack.push( nextContinuation ); } - return !foundIt; // continue as long we don't find it + + if ( logEntryIndex >= firstLogIndex ) + { + logStartPosition = position; + return false; + } + + nextContinuation = firstLogIndex; + return true; } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java index 8352a0c443a54..d5d17ea3ef938 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java @@ -19,8 +19,6 @@ */ package org.neo4j.coreedge.raft.log; -import static java.lang.String.format; - import java.io.File; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; @@ -29,6 +27,7 @@ import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.state.ChannelMarshal; import org.neo4j.cursor.IOCursor; +import org.neo4j.helpers.collection.LruCache; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.impl.transaction.log.FlushablePositionAwareChannel; import org.neo4j.kernel.impl.transaction.log.LogHeaderCache; @@ -48,6 +47,9 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import static java.lang.String.format; +import static org.neo4j.coreedge.raft.log.PhysicalRaftLog.RecordType.COMMIT; + public class PhysicalRaftLog implements RaftLog, Lifecycle { public static final String BASE_FILE_NAME = 
"raft.log"; @@ -66,17 +68,18 @@ public class PhysicalRaftLog implements RaftLog, Lifecycle private long term = -1; private final RaftEntryStore entryStore; - + private final LruCache entryCache; private final PhysicalLogFiles logFiles; public PhysicalRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize, - int entryCacheSize, int headerCacheSize, PhysicalLogFile.Monitor monitor, + int entryCacheSize, int metaDataCacheSize, int headerCacheSize, PhysicalLogFile.Monitor monitor, ChannelMarshal marshal, Supplier databaseHealthSupplier, LogProvider logProvider ) { this.marshal = marshal; this.databaseHealthSupplier = databaseHealthSupplier; this.log = logProvider.getLog( getClass() ); + this.entryCache = new LruCache<>( "raft-log-entry-cache", entryCacheSize ); directory.mkdirs(); @@ -86,7 +89,7 @@ public PhysicalRaftLog( FileSystemAbstraction fileSystem, File directory, long r logFile = new PhysicalLogFile( fileSystem, logFiles, rotateAtSize, appendIndex::get, logVersionRepository, monitor, new LogHeaderCache( headerCacheSize ) ); - this.metadataCache = new RaftLogMetadataCache( entryCacheSize ); + this.metadataCache = new RaftLogMetadataCache( metaDataCacheSize ); this.entryStore = new PhysicalRaftEntryStore( logFile, metadataCache, marshal ); } @@ -103,17 +106,20 @@ public long append( RaftLogEntry entry ) throws IOException entry.term(), entry.toString(), term ) ); } - appendIndex.incrementAndGet(); + long newAppendIndex = appendIndex.incrementAndGet(); + LogPositionMarker entryStartPosition = writer.getCurrentPosition( positionMarker ); - writer.put( RecordType.APPEND.value ); - writer.putLong( appendIndex.get() ); - writer.putLong( entry.term() ); - marshal.marshal( entry.content(), writer ); - metadataCache.cacheMetadata( appendIndex.get(), entry.term(), entryStartPosition.newPosition() ); + RaftLogAppendRecord.write( writer, marshal, newAppendIndex, term, entry.content() ); writer.prepareForFlush().flush(); + entryCache.put( 
newAppendIndex, entry ); + metadataCache.cacheMetadata( newAppendIndex, entry.term(), entryStartPosition.newPosition() ); - logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ); - return appendIndex.get(); + if( logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ) ) + { + RaftLogContinuationRecord.write( writer, newAppendIndex ); + } + + return newAppendIndex; } @Override @@ -125,16 +131,20 @@ public void truncate( long fromIndex ) throws IOException fromIndex, commitIndex ) ); } - if ( appendIndex.get() >= fromIndex ) + if ( appendIndex.get() < fromIndex ) { - appendIndex.set( fromIndex - 1 ); - - writer.put( RecordType.TRUNCATE.value ); - writer.putLong( fromIndex ); - writer.prepareForFlush().flush(); - logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ); + return; } - term = readEntryTerm( appendIndex.get() ); + + entryCache.clear(); + + long newAppendIndex = fromIndex - 1; + appendIndex.set( newAppendIndex ); + logRotation.rotateLogFile(); + + RaftLogContinuationRecord.write( writer, fromIndex ); + writer.prepareForFlush().flush(); + term = readEntryTerm( newAppendIndex ); } @Override @@ -145,16 +155,9 @@ public void commit( long newCommitIndex ) throws IOException return; } - long actualNewCommitIndex = Math.min( newCommitIndex, appendIndex.get() ); - - while ( commitIndex < actualNewCommitIndex ) - { - commitIndex++; - } - writer.put( RecordType.COMMIT.value ); - writer.putLong( commitIndex ); + commitIndex = Math.min( newCommitIndex, appendIndex.get() ); + RaftLogCommitRecord.write( writer, commitIndex ); writer.prepareForFlush().flush(); - logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ); } @Override @@ -175,10 +178,21 @@ public IOCursor getEntryCursor( long fromIndex ) throws IOExceptio final IOCursor inner = entryStore.getEntriesFrom( fromIndex ); return new IOCursor() { + private RaftLogEntry current = null; + @Override public boolean next() throws IOException { - return inner.next(); + boolean hasNext = inner.next(); + if ( hasNext ) + { + current 
= inner.get().getLogEntry(); + } + else + { + current = null; + } + return hasNext; } @Override @@ -190,7 +204,7 @@ public void close() throws IOException @Override public RaftLogEntry get() { - return inner.get().getLogEntry(); + return current; } }; } @@ -198,6 +212,12 @@ public RaftLogEntry get() @Override public RaftLogEntry readLogEntry( long logIndex ) throws IOException { + RaftLogEntry entry = entryCache.get( logIndex ); + if( entry != null ) + { + return entry; + } + try ( IOCursor entriesFrom = entryStore.getEntriesFrom( logIndex ) ) { while ( entriesFrom.next() ) @@ -220,27 +240,27 @@ else if ( raftLogAppendRecord.getLogIndex() > logIndex ) @Override public long readEntryTerm( long logIndex ) throws IOException { - // -1 is not an existing log index, but represents the beginning of the log. It is a valid value to request the - // term for, and the term is -1. + // Index -1 is not an existing log index, but represents the beginning of the log. + // It is a valid value to request the term for, and the term is -1. 
if( logIndex == -1 || ( logIndex > appendIndex.get() ) ) { return -1; } - long resultTerm = -1; + RaftLogMetadataCache.RaftLogEntryMetadata metadata = metadataCache.getMetadata( logIndex ); - if ( metadata != null ) + if( metadata != null ) { - resultTerm = metadata.getEntryTerm(); + return metadata.getEntryTerm(); } - else - { - RaftLogEntry raftLogEntry = readLogEntry( logIndex ); - if ( raftLogEntry != null ) - { - resultTerm = raftLogEntry.term(); - } + long resultTerm = -1; + + RaftLogEntry raftLogEntry = readLogEntry( logIndex ); + if ( raftLogEntry != null ) + { + resultTerm = raftLogEntry.term(); } + return resultTerm; } @@ -259,15 +279,31 @@ public void init() throws Throwable @Override public void start() throws Throwable { - this.logRotation = - new LogRotationImpl( new LoggingLogFileMonitor( log ), logFile, databaseHealthSupplier.get() ); + this.logRotation = new LogRotationImpl( new LoggingLogFileMonitor( log ), logFile, databaseHealthSupplier.get() ); logFile.start(); - restoreIndexes(); + restoreCommitIndex(); + restoreAppendIndex(); + this.writer = logFile.getWriter(); } - private void restoreIndexes() throws IOException + private void restoreAppendIndex() throws IOException + { + long restoredAppendIndex = -1; + try( IOCursor cursor = entryStore.getEntriesFrom( 0 ) ) + { + while( cursor.next() ) + { + restoredAppendIndex = cursor.get().getLogIndex(); + } + } + + appendIndex.set( restoredAppendIndex ); + log.info( "Restored append index at %d", restoredAppendIndex ); + } + + private void restoreCommitIndex() throws IOException { long lowestLogVersion = logFiles.getLowestLogVersion(); ReadableLogChannel reader = logFile.getReader( new LogPosition( lowestLogVersion, LogHeader.LOG_HEADER_SIZE ) ); @@ -276,26 +312,14 @@ private void restoreIndexes() throws IOException while ( recordCursor.next() ) { RaftLogRecord record = recordCursor.get(); - switch ( record.getType() ) + if( record.getType() == COMMIT ) { - case COMMIT: - commitIndex = 
record.getLogIndex(); - break; - case APPEND: - appendIndex.set( record.getLogIndex() ); - break; - case TRUNCATE: - long truncateAtIndex = record.getLogIndex() - 1; // we must restore append/commit at before this index - appendIndex.set( truncateAtIndex ); - commitIndex = Math.min( commitIndex, truncateAtIndex ); - break; - default: - throw new IllegalStateException( "Record is of unknown type " + record ); + commitIndex = record.getLogIndex(); } } } - log.info( "Restored commit index at %d, append index at %d, started at version %d (largest is %d)", - commitIndex, appendIndex.get(), lowestLogVersion, logFiles.getHighestLogVersion() ); + + log.info( "Restored commit index at %d", commitIndex ); } @Override @@ -313,7 +337,7 @@ public void shutdown() throws Throwable public enum RecordType { - APPEND( (byte) 0 ), COMMIT( (byte) 1 ), TRUNCATE( (byte) 2 ); + APPEND( (byte) 0 ), COMMIT( (byte) 1 ), CONTINUATION( (byte) 2 ); private final byte value; @@ -336,11 +360,10 @@ public static RecordType forValue( byte value ) case 1: return COMMIT; case 2: - return TRUNCATE; + return CONTINUATION; default: throw new IllegalArgumentException( "Value " + value + " is not a known entry type" ); } } } - } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java index e8b5b67647375..f22e1696293fd 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java @@ -20,90 +20,76 @@ package org.neo4j.coreedge.raft.log; import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; -import java.util.Queue; -import java.util.function.Predicate; +import java.util.Stack; import org.neo4j.cursor.IOCursor; public class PhysicalRaftLogEntryCursor 
implements IOCursor { + private long NO_SKIP = -1; private final RaftRecordCursor recordCursor; + + private final Stack skipStack; private RaftLogAppendRecord currentEntry; - private final Queue logicallyNext; - private final List staging; + private long nextIndex; + private long skipPoint = NO_SKIP; + private boolean skipMode = false; - public PhysicalRaftLogEntryCursor( RaftRecordCursor recordCursor ) + public PhysicalRaftLogEntryCursor( RaftRecordCursor recordCursor, Stack skipStack, long fromIndex ) { this.recordCursor = recordCursor; - this.logicallyNext = new LinkedList<>(); - this.staging = new LinkedList<>(); + this.skipStack = skipStack; + this.nextIndex = fromIndex; + popSkip(); + } + + private void popSkip() + { + skipPoint = skipStack.empty() ? NO_SKIP : skipStack.pop(); } @Override public boolean next() throws IOException { - if ( !logicallyNext.isEmpty() ) - { - currentEntry = logicallyNext.poll(); - return true; - } + RaftLogRecord record; while ( recordCursor.next() ) { - RaftLogRecord record = recordCursor.get(); + record = recordCursor.get(); switch ( record.getType() ) { case APPEND: - staging.add( (RaftLogAppendRecord) record ); - break; - case COMMIT: - { - moveStagingToLogicallyNextAndSetCurrentEntry( logIndex -> logIndex <= record.getLogIndex() ); - - if ( currentEntry != null ) + if( skipMode ) + { + // skip records + } + else if( record.getLogIndex() == nextIndex ) { + currentEntry = (RaftLogAppendRecord) record; + + nextIndex++; + if( nextIndex == skipPoint ) + { + skipMode = true; + } return true; } break; - } - case TRUNCATE: + case CONTINUATION: { - removeFromStaging( index -> index >= record.getLogIndex() ); + if( skipMode ) + { + popSkip(); + if( skipPoint == NO_SKIP || skipPoint > nextIndex ) + { + skipMode = false; + } + } break; } } } - moveStagingToLogicallyNextAndSetCurrentEntry( index -> true ); - return currentEntry != null; - } - - private void removeFromStaging( Predicate condition ) - { - ListIterator iterator = 
staging.listIterator(); - while ( iterator.hasNext() ) - { - if ( condition.test( iterator.next().getLogIndex() ) ) - { - iterator.remove(); - } - } - } - - private void moveStagingToLogicallyNextAndSetCurrentEntry( Predicate condition ) - { - ListIterator iterator = staging.listIterator(); - while ( iterator.hasNext() ) - { - RaftLogAppendRecord next = iterator.next(); - if ( condition.test( next.getLogIndex() ) ) - { - logicallyNext.offer( next ); - iterator.remove(); - } - } - currentEntry = logicallyNext.poll(); + currentEntry = null; + return false; } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java index bb1f97a0eb5aa..356bcd775d767 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java @@ -19,13 +19,23 @@ */ package org.neo4j.coreedge.raft.log; +import java.io.IOException; + +import org.neo4j.coreedge.raft.replication.ReplicatedContent; +import org.neo4j.coreedge.raft.state.ChannelMarshal; +import org.neo4j.storageengine.api.ReadPastEndException; +import org.neo4j.storageengine.api.ReadableChannel; +import org.neo4j.storageengine.api.WritableChannel; + +import static org.neo4j.coreedge.raft.log.PhysicalRaftLog.RecordType.APPEND; + public class RaftLogAppendRecord extends RaftLogRecord { private final RaftLogEntry logEntry; RaftLogAppendRecord( long logIndex, RaftLogEntry logEntry ) { - super( PhysicalRaftLog.RecordType.APPEND, logIndex ); + super( APPEND, logIndex ); this.logEntry = logEntry; } @@ -34,6 +44,26 @@ public RaftLogEntry getLogEntry() return logEntry; } + public static RaftLogAppendRecord read( ReadableChannel channel, ChannelMarshal marshal ) throws IOException + { + long appendIndex = channel.getLong(); + long term = channel.getLong(); + ReplicatedContent 
content = marshal.unmarshal( channel ); + if ( content == null ) + { + throw ReadPastEndException.INSTANCE; + } + return new RaftLogAppendRecord( appendIndex, new RaftLogEntry( term, content ) ); + } + + public static void write( WritableChannel channel, ChannelMarshal marshal, long appendIndex, long term, ReplicatedContent content ) throws IOException + { + channel.put( APPEND.value() ); + channel.putLong( appendIndex ); + channel.putLong( term ); + marshal.marshal( content, channel ); + } + @Override public String toString() { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java index b34ff380e0b1d..257e9234012b0 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java @@ -19,11 +19,30 @@ */ package org.neo4j.coreedge.raft.log; +import java.io.IOException; + +import org.neo4j.storageengine.api.ReadableChannel; +import org.neo4j.storageengine.api.WritableChannel; + +import static org.neo4j.coreedge.raft.log.PhysicalRaftLog.RecordType.COMMIT; + public class RaftLogCommitRecord extends RaftLogRecord { public RaftLogCommitRecord( long logIndex ) { - super( PhysicalRaftLog.RecordType.COMMIT, logIndex ); + super( COMMIT, logIndex ); + } + + public static RaftLogCommitRecord read( ReadableChannel channel ) throws IOException + { + long commitIndex = channel.getLong(); + return new RaftLogCommitRecord( commitIndex ); + } + + public static void write( WritableChannel channel, long commitIndex ) throws IOException + { + channel.put( COMMIT.value() ); + channel.putLong( commitIndex ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogTruncateRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogContinuationRecord.java similarity 
index 51% rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogTruncateRecord.java rename to enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogContinuationRecord.java index 29c06af0794a0..8b38a9c2a1573 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogTruncateRecord.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogContinuationRecord.java @@ -19,16 +19,35 @@ */ package org.neo4j.coreedge.raft.log; -public class RaftLogTruncateRecord extends RaftLogRecord +import java.io.IOException; + +import org.neo4j.storageengine.api.ReadableChannel; +import org.neo4j.storageengine.api.WritableChannel; + +import static org.neo4j.coreedge.raft.log.PhysicalRaftLog.RecordType.CONTINUATION; + +public class RaftLogContinuationRecord extends RaftLogRecord { - RaftLogTruncateRecord( long fromLogIndex ) + RaftLogContinuationRecord( long fromLogIndex ) + { + super( CONTINUATION, fromLogIndex ); + } + + public static RaftLogContinuationRecord read( ReadableChannel channel ) throws IOException + { + long fromIndex = channel.getLong(); + return new RaftLogContinuationRecord( fromIndex ); + } + + public static void write( WritableChannel channel, long fromIndex ) throws IOException { - super( PhysicalRaftLog.RecordType.TRUNCATE, fromLogIndex ); + channel.put( CONTINUATION.value() ); + channel.putLong( fromIndex ); } @Override public String toString() { - return String.format( "RaftLogTruncateRecord{%s}", super.toString() ); + return String.format( "RaftLogContinuationRecord{%s}", super.toString() ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java index 5f88f0d469388..bbd118c579e63 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java +++ 
b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java @@ -48,22 +48,13 @@ public boolean next() throws IOException switch ( PhysicalRaftLog.RecordType.forValue( type ) ) { case APPEND: - long appendIndex = channel.getLong(); - long term = channel.getLong(); - ReplicatedContent content = marshal.unmarshal( channel ); - if ( content == null ) - { - return false; - } - lastFoundRecord = new RaftLogAppendRecord( appendIndex, new RaftLogEntry( term, content ) ); + lastFoundRecord = RaftLogAppendRecord.read( channel, marshal ); return true; case COMMIT: - long commitIndex = channel.getLong(); - lastFoundRecord = new RaftLogCommitRecord( commitIndex ); + lastFoundRecord = RaftLogCommitRecord.read( channel ); return true; - case TRUNCATE: - long truncateFromIndex = channel.getLong(); - lastFoundRecord = new RaftLogTruncateRecord( truncateFromIndex ); + case CONTINUATION: + lastFoundRecord = RaftLogContinuationRecord.read( channel ); return true; default: throw new IllegalStateException( "Not really sure how we got here. 
Read type value was " + type ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpPhysicalRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpPhysicalRaftLog.java index 3adbf86ab30f2..a2d4418ae3c69 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpPhysicalRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpPhysicalRaftLog.java @@ -31,6 +31,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Comparator; +import java.util.Stack; import java.util.TreeSet; import org.neo4j.coreedge.raft.log.PhysicalRaftLogEntryCursor; @@ -90,7 +91,8 @@ public int dump( String filenameOrDirectory, String logPrefix, PrintStream out ) ReadableLogChannel logChannel = new ReadAheadLogChannel( channel, NO_MORE_CHANNELS ); - try ( PhysicalRaftLogEntryCursor cursor = new PhysicalRaftLogEntryCursor( new RaftRecordCursor<>( logChannel, marshal ) ) ) + try ( PhysicalRaftLogEntryCursor cursor = new PhysicalRaftLogEntryCursor( new RaftRecordCursor<>( logChannel, marshal ), + new Stack<>(), 0 ) ) { while ( cursor.next() ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java index f1d21c5b20816..e641dbde05ed4 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java @@ -187,6 +187,10 @@ public String toString() public static final Setting raft_log_meta_data_cache_size = setting( "core_edge.raft_log_meta_data_cache_size", INTEGER, "100000" ); + @Description("RAFT entry cache size (in unit of entries)") + public static final Setting raft_log_entry_cache_size = + setting( "core_edge.raft_log_entry_cache_size", INTEGER, "32" ); + @Description("RAFT 
header cache size (in unit of file headers)") public static final Setting raft_log_header_cache_size = setting( "core_edge.raft_log_header_cache_size", INTEGER, "10" ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index fa301c494396c..eaf0f0fcb269a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -412,14 +412,15 @@ private RaftLog createRaftLog( return new InMemoryRaftLog(); case PHYSICAL: long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size ); - int entryCacheSize = config.get( CoreEdgeClusterSettings.raft_log_meta_data_cache_size ); + int entryCacheSize = config.get( CoreEdgeClusterSettings.raft_log_entry_cache_size ); + int metaDataCacheSize = config.get( CoreEdgeClusterSettings.raft_log_meta_data_cache_size ); int headerCacheSize = config.get( CoreEdgeClusterSettings.raft_log_header_cache_size ); return life.add( new PhysicalRaftLog( fileSystem, new File( clusterStateDirectory, PhysicalRaftLog.DIRECTORY_NAME ), - rotateAtSize, entryCacheSize, headerCacheSize, new PhysicalLogFile.Monitor.Adapter(), - marshal, databaseHealthSupplier, logProvider ) ); + rotateAtSize, entryCacheSize, metaDataCacheSize, headerCacheSize, + new PhysicalLogFile.Monitor.Adapter(), marshal, databaseHealthSupplier, logProvider ) ); case NAIVE: default: return life.add( new NaiveDurableRaftLog( diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java index e50ca0f543c88..dda9747af0da5 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java +++ 
b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java @@ -27,7 +27,6 @@ import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.membership.RaftTestGroup; -import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.server.RaftTestMember; import org.neo4j.coreedge.server.RaftTestMemberSetBuilder; import org.neo4j.kernel.internal.KernelEventHandlers; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStoreTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStoreTest.java index aaa487698f456..3a1610d96781d 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStoreTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStoreTest.java @@ -48,7 +48,7 @@ public void shouldReturnCursorProperlyPositionedIfThereIsACacheMiss() throws Thr File baseDirectory = new File( "raft-logs" ); fsa.mkdir( baseDirectory ); - PhysicalRaftLog log = new PhysicalRaftLog( fsa, baseDirectory, 10000, 1, 10, mock( PhysicalLogFile.Monitor.class), + PhysicalRaftLog log = new PhysicalRaftLog( fsa, baseDirectory, 10000, 1, 10, 10, mock( PhysicalLogFile.Monitor.class), new DummyRaftableContentSerializer(), mock( Supplier.class ), NullLogProvider.getInstance() ); AtomicLong appendIndex = new AtomicLong( 0 ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java index 8c3dd7c88b54f..dae560ad6baf7 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java @@ -30,7 +30,7 @@ import java.io.File; import java.io.IOException; 
import java.nio.ByteBuffer; -import java.util.function.Supplier; +import java.util.Stack; import org.junit.After; import org.junit.Test; @@ -43,6 +43,7 @@ import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel; import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; import org.neo4j.kernel.impl.transaction.log.entry.LogVersions; +import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.logging.NullLogProvider; @@ -75,8 +76,8 @@ private PhysicalRaftLog createRaftLog( int cacheSize ) File directory = new File( "raft-log" ); fileSystem.mkdir( directory ); - PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fileSystem, directory, 1000000, cacheSize, 10, - new PhysicalLogFile.Monitor.Adapter(), new DummyRaftableContentSerializer(), mock( Supplier.class ), + PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fileSystem, directory, 1000000, cacheSize, 10, 10, + new PhysicalLogFile.Monitor.Adapter(), new DummyRaftableContentSerializer(), () -> mock( DatabaseHealth.class ), NullLogProvider.getInstance() ); life.add( newRaftLog ); life.init(); @@ -151,7 +152,7 @@ public void shouldRestoreCommitIndexOnStartup() throws Throwable } @Test - public void shouldRestoreCorrectCommitAndAppendIndexOnStartupWhenTruncationRecordsArePresent() throws Exception + public void shouldRestoreCorrectCommitAndAppendIndexOnStartupAfterTruncation() throws Exception { // Given PhysicalRaftLog raftLog = createRaftLog( 100 /* cache size */ ); @@ -217,7 +218,8 @@ public void shouldReturnNullOnEndOfFile() throws Exception logStoreChannel, 0, LogVersions.CURRENT_LOG_VERSION ); ReadableLogChannel logChannel = new ReadAheadLogChannel( channel, NO_MORE_CHANNELS ); - try ( PhysicalRaftLogEntryCursor cursor = new PhysicalRaftLogEntryCursor( new RaftRecordCursor<>( logChannel, new DummyRaftableContentSerializer() ) ) ) + try ( PhysicalRaftLogEntryCursor cursor = new PhysicalRaftLogEntryCursor( new RaftRecordCursor<>( logChannel, new 
DummyRaftableContentSerializer() ), + new Stack<>(), 0 ) ) { boolean firstRecordEncountered = false; while( cursor.next() ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java index 57865431e3f80..4a91ffca01377 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java @@ -1,3 +1,22 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ /* * Copyright (c) 2002-2016 "Neo Technology," * Network Engine for Objects in Lund AB [http://neotechnology.com] @@ -26,282 +45,81 @@ import static org.mockito.Mockito.when; import org.junit.Test; -import org.neo4j.coreedge.raft.ReplicatedInteger; - -public class PhysicalRaftLogEntryCursorTest -{ - @Test - public void shouldReturnAllRecordsIfAllAreAppended() throws Exception - { - // Given - ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); - RaftLogEntry payload = new RaftLogEntry( 0, content ); - RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); - // return 3 records that are just appended, then be done with it - when( recordCursor.next() ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( false ); - when( recordCursor.get() ) - .thenReturn( new RaftLogAppendRecord( 1, payload ) ) - .thenReturn( new RaftLogAppendRecord( 2, payload ) ) - .thenReturn( new RaftLogAppendRecord( 3, payload ) ) - .thenReturn( null ); - PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); +import java.util.Stack; - // When - Then - assertTrue( entryCursor.next() ); - assertEquals( payload, entryCursor.get().getLogEntry() ); - assertTrue( entryCursor.next() ); - assertEquals( payload, entryCursor.get().getLogEntry() ); - assertTrue( entryCursor.next() ); - assertEquals( payload, entryCursor.get().getLogEntry() ); +import org.neo4j.coreedge.raft.ReplicatedString; - assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ - } +public class PhysicalRaftLogEntryCursorTest +{ + private final RaftLogEntry entryA = new RaftLogEntry( 0, ReplicatedString.valueOf( "A" ) ); + private final RaftLogEntry entryB = new RaftLogEntry( 0, ReplicatedString.valueOf( "B" ) ); + private final RaftLogEntry entryC = new RaftLogEntry( 0, ReplicatedString.valueOf( "C" ) ); + private final RaftLogEntry entryD = new RaftLogEntry( 0, ReplicatedString.valueOf( "D" ) ); @Test - public 
void shouldReturnNonTruncatedRecords() throws Exception + public void shouldReturnAppendedRecords() throws Exception { - // Given - ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); - RaftLogEntry payload = new RaftLogEntry( 0, content ); + // given RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); - // return 3 records that are just appended, then be done with it - when( recordCursor.next() ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( false ); - when( recordCursor.get() ) - .thenReturn( new RaftLogAppendRecord( 1, payload ) ) - .thenReturn( new RaftLogAppendRecord( 2, payload ) ) - .thenReturn( new RaftLogTruncateRecord( 2 ) ) - .thenReturn( null ); - - PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); - // When - Then - assertTrue( entryCursor.next() ); - assertEquals( payload, entryCursor.get().getLogEntry() ); - - assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ - } - @Test - public void shouldNotReturnAnyRecordsIfTheyAreAllTruncated() throws Exception - { - // Given - ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); - RaftLogEntry payload = new RaftLogEntry( 0, content ); - RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); - // return 3 records that are just appended, then be done with it when( recordCursor.next() ) .thenReturn( true ) .thenReturn( true ) .thenReturn( true ) - .thenReturn( true ) .thenReturn( false ); when( recordCursor.get() ) - .thenReturn( new RaftLogAppendRecord( 1, payload ) ) - .thenReturn( new RaftLogAppendRecord( 2, payload ) ) - .thenReturn( new RaftLogTruncateRecord( 2 ) ) - .thenReturn( new RaftLogTruncateRecord( 1 ) ) + .thenReturn( new RaftLogAppendRecord( 0, entryA ) ) + .thenReturn( new RaftLogAppendRecord( 1, entryB ) ) + .thenReturn( new RaftLogAppendRecord( 2, entryC ) ) .thenReturn( null ); - PhysicalRaftLogEntryCursor entryCursor = new 
PhysicalRaftLogEntryCursor( recordCursor ); - - // When - Then - assertFalse( entryCursor.next() ); - } + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor, new Stack<>(), 0 ); - @Test - public void shouldReturnCommittedRecords() throws Exception - { - // Given - ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); - RaftLogEntry payload = new RaftLogEntry( 0, content ); - RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); - // return 3 records that are just appended, then be done with it - when( recordCursor.next() ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( false ); - when( recordCursor.get() ) - .thenReturn( new RaftLogAppendRecord( 1, payload ) ) - .thenReturn( new RaftLogAppendRecord( 2, payload ) ) - .thenReturn( new RaftLogCommitRecord( 2 ) ) - .thenReturn( null ); - - PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); - - // When - Then + // when/then assertTrue( entryCursor.next() ); - assertEquals( payload, entryCursor.get().getLogEntry() ); - + assertEquals( entryA, entryCursor.get().getLogEntry() ); assertTrue( entryCursor.next() ); - assertEquals( payload, entryCursor.get().getLogEntry() ); - - assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ - } - - @Test - public void shouldSkipTruncatedAndReturnCommittedRecords() throws Exception - { - // Given - RaftLogEntry payloadTruncated = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 4 ) ); - RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); - RaftLogEntry payload6 = new RaftLogEntry( 6, ReplicatedInteger.valueOf( 6 ) ); - RaftLogEntry payload7 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 7 ) ); - RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); - // return 3 records that are just appended, then be done with it - when( recordCursor.next() ) - .thenReturn( true ) - .thenReturn( true ) - 
.thenReturn( true ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( false ); - when( recordCursor.get() ) - .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) - .thenReturn( new RaftLogAppendRecord( 6, payloadTruncated ) ) - .thenReturn( new RaftLogAppendRecord( 7, payloadTruncated ) ) - .thenReturn( new RaftLogTruncateRecord( 6 ) ) - .thenReturn( new RaftLogAppendRecord( 6, payload6 ) ) - .thenReturn( new RaftLogAppendRecord( 7, payload7 ) ) - .thenReturn( new RaftLogCommitRecord( 7 ) ) - .thenReturn( null ); - - PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); - - // When - Then - assertTrue( entryCursor.next() ); - assertEquals( payload5, entryCursor.get().getLogEntry() ); - - assertTrue( entryCursor.next() ); - assertEquals( payload6, entryCursor.get().getLogEntry() ); - + assertEquals( entryB, entryCursor.get().getLogEntry() ); assertTrue( entryCursor.next() ); - assertEquals( payload7, entryCursor.get().getLogEntry() ); - + assertEquals( entryC, entryCursor.get().getLogEntry() ); assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + assertEquals( null, entryCursor.get() ); } @Test - public void shouldSkipTruncatedAndReturnOnEndOfFile() throws Exception + public void shouldSkipUntilContinuation() throws Exception { - // Given - RaftLogEntry payloadTruncated = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 4 ) ); - RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); - RaftLogEntry payload6 = new RaftLogEntry( 6, ReplicatedInteger.valueOf( 6 ) ); - RaftLogEntry payload7 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 7 ) ); + // given RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); - // return 3 records that are just appended, then be done with it - when( recordCursor.next() ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( 
true ) - .thenReturn( true ) - .thenReturn( false ); - when( recordCursor.get() ) - .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) - .thenReturn( new RaftLogAppendRecord( 6, payloadTruncated ) ) - .thenReturn( new RaftLogAppendRecord( 7, payloadTruncated ) ) - .thenReturn( new RaftLogTruncateRecord( 6 ) ) - .thenReturn( new RaftLogAppendRecord( 6, payload6 ) ) - .thenReturn( new RaftLogAppendRecord( 7, payload7 ) ) - .thenReturn( null ); - - PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); - // When - Then - assertTrue( entryCursor.next() ); - assertEquals( payload5, entryCursor.get().getLogEntry() ); - - assertTrue( entryCursor.next() ); - assertEquals( payload6, entryCursor.get().getLogEntry() ); - - assertTrue( entryCursor.next() ); - assertEquals( payload7, entryCursor.get().getLogEntry() ); - - assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ - } - - @Test - public void shouldSkipTruncatedAndReturnCommittedRecordsAndRecordsAtEndOfFile() throws Exception - { - // Given - RaftLogEntry payloadTruncated = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 4 ) ); - RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); - RaftLogEntry payload6 = new RaftLogEntry( 6, ReplicatedInteger.valueOf( 6 ) ); - RaftLogEntry payload7 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 7 ) ); - RaftLogEntry payload8 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 8 ) ); - RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); - // return 3 records that are just appended, then be done with it when( recordCursor.next() ) .thenReturn( true ) .thenReturn( true ) .thenReturn( true ) .thenReturn( true ) .thenReturn( true ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( true ) .thenReturn( false ); when( recordCursor.get() ) - .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) - .thenReturn( new RaftLogAppendRecord( 6, payloadTruncated ) ) - 
.thenReturn( new RaftLogAppendRecord( 7, payloadTruncated ) ) - .thenReturn( new RaftLogTruncateRecord( 6 ) ) - .thenReturn( new RaftLogAppendRecord( 6, payload6 ) ) - .thenReturn( new RaftLogAppendRecord( 7, payload7 ) ) - .thenReturn( new RaftLogCommitRecord( 7 ) ) - .thenReturn( new RaftLogAppendRecord( 8, payload8 ) ) + .thenReturn( new RaftLogAppendRecord( 0, entryA ) ) + .thenReturn( new RaftLogAppendRecord( 1, entryB ) ) // truncated + .thenReturn( new RaftLogAppendRecord( 2, entryC ) ) // truncated + .thenReturn( new RaftLogContinuationRecord( 1 ) ) + .thenReturn( new RaftLogAppendRecord( 1, entryD ) ) .thenReturn( null ); - PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); - - // When - Then - assertTrue( entryCursor.next() ); - assertEquals( payload5, entryCursor.get().getLogEntry() ); + Stack skipStack = new Stack<>(); + // this represents the state after truncating entryB, we skip from index 1 until the next continuation + // the algorithm for generating the skip-stack lives in PhysicalRaftEntryStore + skipStack.push( 1L ); + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor, skipStack, 0 ); + // when - then assertTrue( entryCursor.next() ); - assertEquals( payload6, entryCursor.get().getLogEntry() ); - + assertEquals( entryA, entryCursor.get().getLogEntry() ); assertTrue( entryCursor.next() ); - assertEquals( payload7, entryCursor.get().getLogEntry() ); - - assertTrue( entryCursor.next() ); - assertEquals( payload8, entryCursor.get().getLogEntry() ); - + assertEquals( entryD, entryCursor.get().getLogEntry() ); assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ - } - - @Test - public void shouldReturnUncommittedEntriesAtEndOfFileDespiteCommitEntryForNonEncounteredRecords() throws Exception - { - // Given - RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); - RaftRecordCursor recordCursor = mock( 
RaftRecordCursor.class ); - when( recordCursor.next() ) - .thenReturn( true ) - .thenReturn( true ) - .thenReturn( false ); - when( recordCursor.get() ) - .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) - .thenReturn( new RaftLogCommitRecord( 2 ) ) - .thenReturn( null ); - - PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); - - // When - Then - assertTrue( entryCursor.next() ); - assertEquals( payload5, entryCursor.get().getLogEntry() ); + assertEquals( null, entryCursor.get() ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java index 80318e564cd00..13553fcbc1192 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java @@ -58,7 +58,7 @@ private PhysicalRaftLog createRaftLog( long rotateAtSize, PhysicalLogFile.Monito File directory = new File( "raft-log" ); fileSystem.mkdir( directory ); - PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fileSystem, directory, rotateAtSize, 100, 10, + PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fileSystem, directory, rotateAtSize, 100, 10, 10, logFileMonitor, new DummyRaftableContentSerializer(), () -> mock( DatabaseHealth.class ), NullLogProvider.getInstance() ); life.add( newRaftLog ); @@ -92,33 +92,7 @@ public void shouldRotateOnAppendWhenRotateSizeIsReached() throws Exception } @Test - public void shouldRotateOnCommitWhenRotateSizeIsReached() throws Exception - { - // Given - AtomicLong currentVersion = new AtomicLong(); - PhysicalLogFile.Monitor logFileMonitor = - ( logFile, logVersion, lastTransactionId, clean ) -> currentVersion.set( logVersion ); - int rotateAtSize = 100; - PhysicalRaftLog log = createRaftLog( rotateAtSize, logFileMonitor ); - - StringBuilder 
builder = new StringBuilder(); - for ( int i = 0; i < rotateAtSize - 40; i++ ) - { - builder.append( "i" ); - } - - // When - ReplicatedString stringThatGetsTheSizeToAlmost100Bytes = new ReplicatedString( builder.toString() ); - log.append( new RaftLogEntry( 0, stringThatGetsTheSizeToAlmost100Bytes ) ); - assertEquals( 0, currentVersion.get() ); - log.commit( 1 ); - - // Then - assertEquals( 1, currentVersion.get() ); - } - - @Test - public void shouldRotateOnTruncateWhenRotateSizeIsReached() throws Exception + public void shouldRotateOnTruncate() throws Exception { // Given AtomicLong currentVersion = new AtomicLong(); @@ -172,38 +146,4 @@ public void shouldBeAbleToRecoverToLatestStateAfterRotation() throws Throwable assertTrue( log.entryExists( indexToCommit ) ); assertEquals( term, log.readEntryTerm( indexToCommit ) ); } - - - @Test - public void shouldBeAbleToRecoverToLatestStateAfterRotationWhenEarlierFilesArePruned() throws Throwable - { - int rotateAtSize = 100; - PhysicalRaftLog log = createRaftLog( rotateAtSize, new PhysicalLogFile.Monitor.Adapter() ); - - StringBuilder builder = new StringBuilder(); - for ( int i = 0; i < rotateAtSize - 40; i++ ) - { - builder.append( "i" ); - } - - ReplicatedString stringThatGetsTheSizeToAlmost100Bytes = new ReplicatedString( builder.toString() ); - int term = 0; - long indexToCommit = log.append( new RaftLogEntry( term, stringThatGetsTheSizeToAlmost100Bytes ) ); - log.commit( indexToCommit ); - indexToCommit = log.append( new RaftLogEntry( term, ReplicatedInteger.valueOf( 1 ) ) ); - log.commit( indexToCommit ); - - // When - life.remove( log ); - - fileSystem.deleteFile( new File( new File("raft-log"), "raft.log.0" ) ); // hackish, decorate it my great Tech Liege - - log = createRaftLog( rotateAtSize, new PhysicalLogFile.Monitor.Adapter() ); - - // Then - assertEquals( indexToCommit, log.commitIndex() ); - assertEquals( indexToCommit, log.appendIndex() ); - assertTrue( log.entryExists( indexToCommit ) ); - assertEquals( 
term, log.readEntryTerm( indexToCommit ) ); - } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogVerificationIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogVerificationIT.java index 1e4008ac8bb01..471a859c6416b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogVerificationIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogVerificationIT.java @@ -39,10 +39,11 @@ protected RaftLog createRaftLog() throws Throwable fsa.mkdir( directory ); long rotateAtSizeBytes = 128; - int entryCacheSize = 8; + int entryCacheSize = 4; + int metadataCacheSize = 8; int fileHeaderCacheSize = 2; - PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fsa, directory, rotateAtSizeBytes, entryCacheSize, fileHeaderCacheSize, + PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fsa, directory, rotateAtSizeBytes, entryCacheSize, metadataCacheSize, fileHeaderCacheSize, new PhysicalLogFile.Monitor.Adapter(), new DummyRaftableContentSerializer(), () -> mock( DatabaseHealth.class ), NullLogProvider.getInstance() );