diff --git a/community/kernel/src/main/java/org/neo4j/helpers/Reference.java b/community/kernel/src/main/java/org/neo4j/helpers/Reference.java
new file mode 100644
index 0000000000000..6c023f0086f03
--- /dev/null
+++ b/community/kernel/src/main/java/org/neo4j/helpers/Reference.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.helpers;
+
+/**
+ * A mutable holder for a single value of type {@code T}.
+ * <p>
+ * {@link #toString()} and {@link #hashCode()} are answered by the held value, and {@link #equals(Object)}
+ * compares held values, so a {@code Reference} behaves like the value it currently holds.
+ * A {@code null} value is allowed.
+ *
+ * @param <T> type of the held value
+ */
+public class Reference<T>
+{
+    private T t;
+
+    /**
+     * @param initial the value initially held, may be {@code null}
+     */
+    public Reference( T initial )
+    {
+        this.t = initial;
+    }
+
+    /**
+     * Replaces the held value.
+     *
+     * @param t the new value, may be {@code null}
+     */
+    public void set( T t )
+    {
+        this.t = t;
+    }
+
+    /**
+     * @return the currently held value, possibly {@code null}
+     */
+    public T get()
+    {
+        return t;
+    }
+
+    /**
+     * @return the string form of the held value, or {@code "null"} when no value is held
+     */
+    @Override
+    public String toString()
+    {
+        // String.valueOf avoids the NullPointerException that t.toString() raises for a null value
+        return String.valueOf( t );
+    }
+
+    /**
+     * Compares the held value with {@code obj}. When {@code obj} is itself a {@code Reference} the two held
+     * values are compared, which keeps the comparison reflexive; any other object is compared directly
+     * against the held value, so comparing a reference with a plain value keeps working.
+     *
+     * @param obj the object, or reference, to compare with
+     * @return {@code true} if the held value equals {@code obj} or the value held by {@code obj}
+     */
+    @Override
+    public boolean equals( Object obj )
+    {
+        if ( this == obj )
+        {
+            return true;
+        }
+        if ( obj == null )
+        {
+            return false;
+        }
+        Object other = obj instanceof Reference ? ((Reference<?>) obj).t : obj;
+        return t == null ? other == null : t.equals( other );
+    }
+
+    /**
+     * @return the hash code of the held value, or {@code 0} when no value is held
+     */
+    @Override
+    public int hashCode()
+    {
+        return t == null ? 0 : t.hashCode();
+    }
+}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/LogHeaderVisitor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/LogHeaderVisitor.java
index 7f5a33cd58471..a3c0784357ef9 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/LogHeaderVisitor.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/LogHeaderVisitor.java
@@ -21,5 +21,9 @@
public interface LogHeaderVisitor
{
+ /**
+ * Used for visiting log headers in reverse order of age, meaning latest first.
+ * Stops visiting when false is returned.
+ */
boolean visit( LogPosition position, long firstTransactionIdInLog, long lastTransactionIdInLog );
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFile.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFile.java
index 7c67a7bae3ca3..2c73073b1d2d4 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFile.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFile.java
@@ -29,7 +29,7 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
-import org.neo4j.kernel.lifecycle.LifecycleAdapter;
+import org.neo4j.kernel.lifecycle.Lifecycle;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderReader.readLogHeader;
@@ -39,7 +39,7 @@
/**
* {@link LogFile} backed by one or more files in a {@link FileSystemAbstraction}.
*/
-public class PhysicalLogFile extends LifecycleAdapter implements LogFile
+public class PhysicalLogFile implements LogFile, Lifecycle
{
public interface Monitor
{
@@ -87,7 +87,7 @@ public PhysicalLogFile( FileSystemAbstraction fileSystem, PhysicalLogFiles logFi
}
@Override
- public void init() throws Throwable
+ public void init() throws IOException
{
// Make sure at least a bare bones log file is available before recovery
long lastLogVersionUsed = logVersionRepository.getCurrentLogVersion();
@@ -96,7 +96,7 @@ public void init() throws Throwable
}
@Override
- public void start() throws Throwable
+ public void start() throws IOException
{
// Recovery has taken place before this, so the log file has been truncated to last known good tx
// Just read header and move to the end
@@ -109,11 +109,17 @@ public void start() throws Throwable
writer = new PositionAwarePhysicalFlushableChannel( channel );
}
+ @Override
+ public void stop()
+ {
+ // nothing to stop
+ }
+
// In order to be able to write into a logfile after life.stop during shutdown sequence
// we will close channel and writer only during shutdown phase when all pending changes (like last
// checkpoint) are already in
@Override
- public void shutdown() throws Throwable
+ public void shutdown() throws IOException
{
if ( writer != null )
{
diff --git a/community/primitive-collections/src/main/java/org/neo4j/cursor/CursorValue.java b/community/primitive-collections/src/main/java/org/neo4j/cursor/CursorValue.java
new file mode 100644
index 0000000000000..c5f9fe8fe8565
--- /dev/null
+++ b/community/primitive-collections/src/main/java/org/neo4j/cursor/CursorValue.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.cursor;
+
+/**
+ * Holds the value a cursor is currently positioned on, together with a flag telling whether that value
+ * may be read. A freshly created instance holds no valid value.
+ *
+ * @param <T> type of the held value
+ */
+public class CursorValue<T>
+{
+    private boolean valid = false;
+    private T t;
+
+    /**
+     * Stores a value and marks it as valid.
+     *
+     * @param newT the value to hold
+     */
+    public void set( T newT )
+    {
+        t = newT;
+        valid = true;
+    }
+
+    /**
+     * @return the value most recently passed to {@link #set(Object)}
+     * @throws IllegalStateException if no value has been set, or if {@link #invalidate()} was called
+     * after the last {@link #set(Object)}
+     */
+    public T get()
+    {
+        if ( !valid )
+        {
+            throw new IllegalStateException( "No valid value: not set, or invalidated since last set" );
+        }
+        return t;
+    }
+
+    /**
+     * Marks the value as unreadable and drops the reference to it.
+     */
+    public void invalidate()
+    {
+        valid = false;
+        t = null;
+    }
+}
diff --git a/community/primitive-collections/src/main/java/org/neo4j/cursor/RawCursor.java b/community/primitive-collections/src/main/java/org/neo4j/cursor/RawCursor.java
index 10480b9850be2..330eb125ad73c 100644
--- a/community/primitive-collections/src/main/java/org/neo4j/cursor/RawCursor.java
+++ b/community/primitive-collections/src/main/java/org/neo4j/cursor/RawCursor.java
@@ -19,6 +19,7 @@
*/
package org.neo4j.cursor;
+import java.util.function.Consumer;
import java.util.function.Supplier;
/**
@@ -32,7 +33,7 @@
* {@link #next()} has been done, or if it returned false, then such accessor methods throw {@link
* IllegalStateException}.
*/
-public interface RawCursor extends Supplier, AutoCloseable
+public interface RawCursor extends Supplier, AutoCloseable
{
/**
* Move the cursor to the next row.
@@ -46,4 +47,19 @@ public interface RawCursor extends Supplier, A
*/
@Override
void close() throws EXCEPTION;
+
+ default void forAll( Consumer consumer ) throws EXCEPTION
+ {
+ try
+ {
+ while ( next() )
+ {
+ consumer.accept( get() );
+ }
+ }
+ finally
+ {
+ close();
+ }
+ }
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java
index 9cecab369fdf5..1f7646fc47f68 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java
@@ -31,6 +31,7 @@
import org.neo4j.coreedge.helper.VolatileFuture;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.log.RaftLog;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.membership.RaftGroup;
import org.neo4j.coreedge.raft.membership.RaftMembershipManager;
@@ -164,7 +165,7 @@ Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), timeout -> {
*
* @param memberSet The other members.
*/
- public synchronized void bootstrapWithInitialMembers( RaftGroup memberSet ) throws BootstrapException
+ public synchronized void bootstrapWithInitialMembers( RaftGroup memberSet ) throws BootstrapException, RaftLogCompactedException
{
if ( entryLog.appendIndex() >= 0 )
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java
index 5b5376225cf26..b4ba9cac94338 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessageHandler.java
@@ -21,6 +21,7 @@
import java.io.IOException;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.logging.Log;
@@ -28,5 +29,5 @@
public interface RaftMessageHandler
{
Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState context, Log log )
- throws IOException;
+ throws IOException, RaftLogCompactedException;
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/InMemoryRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/InMemoryRaftLog.java
index 59ee152984846..b25871850f4d9 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/InMemoryRaftLog.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/InMemoryRaftLog.java
@@ -30,6 +30,9 @@ public class InMemoryRaftLog implements RaftLog
{
private final Map raftLog = new HashMap<>();
+ private long prevIndex = -1;
+ private long prevTerm = -1;
+
private long appendIndex = -1;
private long commitIndex = -1;
private long term = -1;
@@ -63,37 +66,68 @@ public void commit( long commitIndex )
this.commitIndex = commitIndex;
}
+ @Override
+ public long prune( long safeIndex ) throws RaftLogCompactedException
+ {
+ if( safeIndex > prevIndex )
+ {
+ long removeIndex = prevIndex + 1;
+
+ prevTerm = readEntryTerm( safeIndex );
+ prevIndex = safeIndex;
+
+ do
+ {
+ raftLog.remove( removeIndex );
+ removeIndex++;
+ } while( removeIndex <= safeIndex );
+ }
+
+ return prevIndex;
+ }
+
@Override
public long appendIndex()
{
return appendIndex;
}
+ @Override
+ public long prevIndex()
+ {
+ return prevIndex;
+ }
+
@Override
public long commitIndex()
{
return commitIndex;
}
- private RaftLogEntry readLogEntry( long logIndex )
+ private RaftLogEntry readLogEntry( long logIndex ) throws RaftLogCompactedException
{
- if ( logIndex < 0 )
+ if ( logIndex <= prevIndex )
{
- throw new IllegalArgumentException( "logIndex must not be negative" );
+ throw new RaftLogCompactedException( "Entry does not exist in log" );
}
- if ( logIndex > appendIndex )
+ else if ( logIndex > appendIndex )
{
- throw new IllegalArgumentException(
+ throw new RaftLogCompactedException(
String.format( "cannot read past last appended index (lastAppended=%d, readIndex=%d)",
appendIndex, logIndex ) );
}
+
return raftLog.get( logIndex );
}
@Override
- public long readEntryTerm( long logIndex )
+ public long readEntryTerm( long logIndex ) throws RaftLogCompactedException
{
- if ( logIndex < 0 || logIndex > appendIndex )
+ if( logIndex == prevIndex )
+ {
+ return prevTerm;
+ }
+ else if ( logIndex < prevIndex || logIndex > appendIndex )
{
return -1;
}
@@ -101,12 +135,17 @@ public long readEntryTerm( long logIndex )
}
@Override
- public synchronized void truncate( long fromIndex )
+ public synchronized void truncate( long fromIndex ) throws RaftLogCompactedException
{
if ( fromIndex <= commitIndex )
{
throw new IllegalArgumentException( "cannot truncate before the commit index" );
}
+ else if( fromIndex <= prevIndex )
+ {
+ prevIndex = -1;
+ prevTerm = -1;
+ }
for ( long i = appendIndex; i >= fromIndex; --i )
{
@@ -121,17 +160,34 @@ public synchronized void truncate( long fromIndex )
}
@Override
- public IOCursor getEntryCursor( long fromIndex ) throws IOException
+ public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException
{
- return new IOCursor()
+ return new RaftLogCursor()
{
private long currentIndex = fromIndex - 1; // the cursor starts "before" the first entry
+ RaftLogEntry current = null;
@Override
public boolean next() throws IOException
{
currentIndex++;
- return currentIndex <= appendIndex;
+ boolean hasNext = currentIndex <= appendIndex;
+ if( hasNext )
+ {
+ try
+ {
+ current = readLogEntry( currentIndex );
+ }
+ catch ( RaftLogCompactedException e )
+ {
+ throw new IOException( e );
+ }
+ }
+ else
+ {
+ current = null;
+ }
+ return hasNext;
}
@Override
@@ -142,11 +198,25 @@ public void close() throws IOException
@Override
public RaftLogEntry get()
{
- return ( currentIndex <= appendIndex ) ? readLogEntry( currentIndex ) : null;
+ return current;
}
};
}
+ @Override
+ public long skip( long index, long term )
+ {
+ if( index > appendIndex )
+ {
+ raftLog.clear();
+
+ appendIndex = index;
+ prevIndex = index;
+ prevTerm = term;
+ }
+ return appendIndex;
+ }
+
@Override
public boolean equals( Object o )
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java
index aa8e7868f4365..d1c1dc2191879 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java
@@ -56,7 +56,7 @@ public long append( RaftLogEntry entry ) throws IOException
}
@Override
- public void truncate( long fromIndex ) throws IOException
+ public void truncate( long fromIndex ) throws IOException, RaftLogCompactedException
{
delegate.truncate( fromIndex );
appendIndexMonitor.appendIndex( delegate.appendIndex() );
@@ -69,12 +69,24 @@ public void commit( long commitIndex ) throws IOException
commitIndexMonitor.commitIndex( delegate.commitIndex() );
}
+ @Override
+ public long prune( long safeIndex ) throws IOException, RaftLogCompactedException
+ {
+ return delegate.prune( safeIndex );
+ }
+
@Override
public long appendIndex()
{
return delegate.appendIndex();
}
+ @Override
+ public long prevIndex()
+ {
+ return delegate.prevIndex();
+ }
+
@Override
public long commitIndex()
{
@@ -82,14 +94,20 @@ public long commitIndex()
}
@Override
- public long readEntryTerm( long logIndex ) throws IOException
+ public long readEntryTerm( long logIndex ) throws IOException, RaftLogCompactedException
{
return delegate.readEntryTerm( logIndex );
}
@Override
- public IOCursor getEntryCursor( long fromIndex ) throws IOException
+ public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException, RaftLogCompactedException
{
return delegate.getEntryCursor( fromIndex );
}
+
+ @Override
+ public long skip( long index, long term ) throws IOException
+ {
+ return delegate.skip( index, term );
+ }
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java
index 7dffcac7a9231..23b39e6bb1882 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java
@@ -28,7 +28,7 @@
import org.neo4j.coreedge.raft.replication.MarshallingException;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.server.ByteBufMarshal;
-import org.neo4j.cursor.IOCursor;
+import org.neo4j.cursor.CursorValue;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
@@ -56,34 +56,42 @@
* │record length variable│
* └─────────────────────────────┘
*
- * 3. commit.log
+ * 3. meta.log
* ┌─────────────────────────────┐
- * │committedIndex 8 bytes│
+ * │prevIndex 8 bytes│
+ * │prevTerm 8 bytes│
+ * │commitIndex 8 bytes│
* ├─────────────────────────────┤
- * │record length 8 bytes│
+ * │record length 24 bytes│
* └─────────────────────────────┘
*/
public class NaiveDurableRaftLog extends LifecycleAdapter implements RaftLog
{
public static final int ENTRY_RECORD_LENGTH = 16;
public static final int CONTENT_LENGTH_BYTES = 4;
- public static final int COMMIT_INDEX_BYTES = 8;
+ public static final int META_BYTES = 8 * 3;
public static final String DIRECTORY_NAME = "raft-log";
- private final StoreChannel entriesChannel;
- private final StoreChannel contentChannel;
- private final StoreChannel commitChannel;
+ private StoreChannel entriesChannel;
+ private StoreChannel contentChannel;
+ private final StoreChannel metaChannel;
+ private final FileSystemAbstraction fileSystem;
+ private final File directory;
private final ByteBufMarshal marshal;
private final Log log;
private long appendIndex = -1;
private long contentOffset;
private long commitIndex = -1;
private long term = -1;
+ private long prevIndex = -1;
+ private long prevTerm = -1;
public NaiveDurableRaftLog( FileSystemAbstraction fileSystem, File directory,
ByteBufMarshal marshal, LogProvider logProvider )
{
+ this.fileSystem = fileSystem;
+ this.directory = directory;
this.marshal = marshal;
directory.mkdirs();
@@ -92,10 +100,11 @@ public NaiveDurableRaftLog( FileSystemAbstraction fileSystem, File directory,
{
entriesChannel = fileSystem.open( new File( directory, "entries.log" ), "rw" );
contentChannel = fileSystem.open( new File( directory, "content.log" ), "rw" );
- commitChannel = fileSystem.open( new File( directory, "commit.log" ), "rw" );
- appendIndex = entriesChannel.size() / ENTRY_RECORD_LENGTH - 1;
+ metaChannel = fileSystem.open( new File( directory, "meta.log" ), "rw" );
+ readMetadata();
+
+ appendIndex = prevIndex + entriesChannel.size() / ENTRY_RECORD_LENGTH;
contentOffset = contentChannel.size();
- commitIndex = readCommitIndex();
log = logProvider.getLog( getClass() );
log.info( "Raft log created. AppendIndex: %d, commitIndex: %d", appendIndex, commitIndex );
@@ -114,7 +123,7 @@ public void shutdown() throws Throwable
boolean shouldThrow;
shouldThrow = forceAndCloseChannel( entriesChannel, container );
shouldThrow = forceAndCloseChannel( contentChannel, container ) || shouldThrow;
- shouldThrow = forceAndCloseChannel( commitChannel, container ) || shouldThrow;
+ shouldThrow = forceAndCloseChannel( metaChannel, container ) || shouldThrow;
if ( shouldThrow )
{
throw container;
@@ -160,10 +169,10 @@ public long append( RaftLogEntry logEntry ) throws IOException
try
{
- int length = writeContent( logEntry );
- writeEntry( new Entry( logEntry.term(), contentOffset ) );
+ int length = writeContent( logEntry, contentChannel );
+ writeEntry( (appendIndex + 1) - (prevIndex + 1), new Entry( logEntry.term(), contentOffset ), entriesChannel );
contentOffset += length;
- appendIndex++;
+ appendIndex++; // advanced only after both writes succeeded, so a failed write leaves the index untouched
return appendIndex;
}
catch ( MarshallingException | IOException e )
@@ -173,7 +182,7 @@ public long append( RaftLogEntry logEntry ) throws IOException
}
@Override
- public void truncate( long fromIndex ) throws IOException
+ public void truncate( long fromIndex ) throws IOException, RaftLogCompactedException
{
if ( fromIndex <= commitIndex )
{
@@ -207,18 +216,74 @@ public void commit( final long newCommitIndex ) throws IOException
actualNewCommitIndex = appendIndex;
}
// INVARIANT: If newCommitIndex was greater than appendIndex, commitIndex is equal to appendIndex
- storeCommitIndex( actualNewCommitIndex );
commitIndex = actualNewCommitIndex;
+ storeMetadata();
+ }
+
+ @Override
+ public long prune( long safeIndex ) throws IOException, RaftLogCompactedException
+ {
+ try
+ {
+ if( safeIndex > prevIndex )
+ {
+ long safeIndexTerm = readEntryTerm( safeIndex );
+
+ StoreChannel tempEntriesChannel = fileSystem.open( new File( directory, "temp-entries.log" ), "rw" );
+ StoreChannel tempContentChannel = fileSystem.open( new File( directory, "temp-content.log" ), "rw" );
+
+ contentOffset = 0;
+ for ( long i = safeIndex + 1; i <= appendIndex; i++ )
+ {
+ RaftLogEntry logEntry = readLogEntry( i );
+ int length = writeContent( logEntry, tempContentChannel );
+ writeEntry( i - (safeIndex + 1), new Entry( logEntry.term(), contentOffset ), tempEntriesChannel );
+ contentOffset += length;
+ }
+ tempEntriesChannel.close();
+ tempContentChannel.close();
+ entriesChannel.close();
+ contentChannel.close();
+ fileSystem.deleteFile( new File( directory, "entries.log" ) );
+ fileSystem.deleteFile( new File( directory, "content.log" ) );
+ fileSystem.renameFile( new File( directory, "temp-entries.log" ), new File( directory, "entries.log" ) );
+ fileSystem.renameFile( new File( directory, "temp-content.log" ), new File( directory, "content.log" ) );
+ entriesChannel = fileSystem.open( new File( directory, "entries.log" ), "rw" );
+ contentChannel = fileSystem.open( new File( directory, "content.log" ), "rw" );
+
+ prevTerm = safeIndexTerm;
+ prevIndex = safeIndex;
+
+ storeMetadata();
+ }
-// while ( commitIndex < actualNewCommitIndex )
-// {
-// commitIndex++;
-// for ( Listener listener : listeners )
-// {
-// ReplicatedContent content = readEntryContent( commitIndex );
-// listener.onCommitted( content, commitIndex );
-// }
-// }
+ return prevIndex;
+ }
+ catch ( MarshallingException e )
+ {
+ throw new IOException( e );
+ }
+ }
+
+ @Override
+ public long skip( long index, long term ) throws IOException
+ {
+ if( index > appendIndex )
+ {
+ entriesChannel.close();
+ contentChannel.close();
+ fileSystem.deleteFile( new File( directory, "entries.log" ) );
+ fileSystem.deleteFile( new File( directory, "content.log" ) );
+ entriesChannel = fileSystem.open( new File( directory, "entries.log" ), "rw" );
+ contentChannel = fileSystem.open( new File( directory, "content.log" ), "rw" );
+
+ appendIndex = index;
+ prevIndex = index;
+ prevTerm = term;
+
+ storeMetadata();
+ }
+ return appendIndex;
}
@Override
@@ -227,16 +292,22 @@ public long appendIndex()
return appendIndex;
}
+ @Override
+ public long prevIndex()
+ {
+ return prevIndex;
+ }
+
@Override
public long commitIndex()
{
return commitIndex;
}
- private RaftLogEntry readLogEntry( long logIndex ) throws IOException
+ private RaftLogEntry readLogEntry( long logIndex ) throws IOException, RaftLogCompactedException
{
Entry entry = readEntry( logIndex );
- ReplicatedContent content = null;
+ ReplicatedContent content;
try
{
content = readContentFrom( entry.contentPointer );
@@ -251,14 +322,23 @@ private RaftLogEntry readLogEntry( long logIndex ) throws IOException
}
@Override
- public long readEntryTerm( long logIndex ) throws IOException
+ public long readEntryTerm( long logIndex ) throws IOException, RaftLogCompactedException
{
+ if( logIndex == prevIndex )
+ {
+ return prevTerm;
+ }
+ else if ( logIndex < prevIndex || logIndex > appendIndex )
+ {
+ return -1;
+ }
+
return readEntry( logIndex ).term;
}
-
private static class Entry
{
private final long term;
+
private final long contentPointer;
public Entry( long term, long contentPointer )
@@ -266,7 +346,6 @@ public Entry( long term, long contentPointer )
this.term = term;
this.contentPointer = contentPointer;
}
-
@Override
public String toString()
{
@@ -275,31 +354,33 @@ public String toString()
", contentPointer=" + contentPointer +
'}';
}
+
}
@Override
- public IOCursor getEntryCursor( long fromIndex ) throws IOException
+ public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException
{
- return new IOCursor()
+ return new RaftLogCursor()
{
private long currentIndex = fromIndex - 1; // the cursor starts "before" the first entry
- private RaftLogEntry currentEntry;
+ private CursorValue current = new CursorValue<>();
@Override
public boolean next() throws IOException
{
currentIndex++;
- boolean hasNext = currentIndex <= appendIndex;
- if ( hasNext )
+ try
{
- currentEntry = readLogEntry( currentIndex );
+ current.set( readLogEntry( currentIndex ) );
+ return true;
}
- else
+ catch ( RaftLogCompactedException e )
{
- currentEntry = null;
+ current.invalidate();
}
- return hasNext;
+
+ return false;
}
@Override
@@ -310,38 +391,38 @@ public void close() throws IOException
@Override
public RaftLogEntry get()
{
- return currentEntry;
+ return current.get();
}
};
}
- private void writeEntry( Entry entry ) throws IOException
+ private void writeEntry( long index, Entry entry, StoreChannel entriesChannel ) throws IOException
{
ByteBuffer buffer = ByteBuffer.allocate( ENTRY_RECORD_LENGTH );
buffer.putLong( entry.term );
buffer.putLong( entry.contentPointer );
buffer.flip();
- entriesChannel.writeAll( buffer, (appendIndex + 1) * ENTRY_RECORD_LENGTH );
+ entriesChannel.writeAll( buffer, index * ENTRY_RECORD_LENGTH );
entriesChannel.force( false );
}
- private Entry readEntry( long logIndex ) throws IOException
+ private Entry readEntry( long logIndex ) throws RaftLogCompactedException, IOException
{
- if ( logIndex < 0 || logIndex > appendIndex )
+ if ( logIndex <= prevIndex || logIndex > appendIndex )
{
- return new Entry( -1, -1 );
+ throw new RaftLogCompactedException();
}
ByteBuffer buffer = ByteBuffer.allocate( ENTRY_RECORD_LENGTH );
- entriesChannel.read( buffer, logIndex * ENTRY_RECORD_LENGTH );
+ entriesChannel.read( buffer, ( logIndex - (prevIndex + 1) ) * ENTRY_RECORD_LENGTH );
buffer.flip();
long term = buffer.getLong();
long contentPointer = buffer.getLong();
return new Entry( term, contentPointer );
}
- private int writeContent( RaftLogEntry logEntry ) throws MarshallingException, IOException
+ private int writeContent( RaftLogEntry logEntry, StoreChannel contentChannel ) throws MarshallingException, IOException
{
ByteBuf buffer = Unpooled.buffer();
marshal.marshal( logEntry.content(), buffer );
@@ -372,24 +453,33 @@ private ReplicatedContent readContentFrom( long contentPointer ) throws IOExcept
return marshal.unmarshal( byteBuf );
}
- private void storeCommitIndex( long commitIndex ) throws IOException
+ private void storeMetadata() throws IOException
{
- ByteBuffer buffer = ByteBuffer.allocate( COMMIT_INDEX_BYTES );
+ ByteBuffer buffer = ByteBuffer.allocate( META_BYTES );
+ buffer.putLong( prevIndex );
+ buffer.putLong( prevTerm );
buffer.putLong( commitIndex );
buffer.flip();
- commitChannel.writeAll( buffer, 0 );
- commitChannel.force( false );
+ metaChannel.writeAll( buffer, 0 );
+ metaChannel.force( false );
}
- private long readCommitIndex() throws IOException
+ private void readMetadata() throws IOException
{
- if ( commitChannel.size() < COMMIT_INDEX_BYTES )
+ if ( metaChannel.size() < META_BYTES )
{
- return -1;
+ prevIndex = -1;
+ prevTerm = -1;
+ commitIndex = -1;
+ }
+ else
+ {
+ ByteBuffer buffer = ByteBuffer.allocate( META_BYTES );
+ metaChannel.read( buffer, 0 );
+ buffer.flip();
+ prevIndex = buffer.getLong();
+ prevTerm = buffer.getLong();
+ commitIndex = buffer.getLong();
}
- ByteBuffer buffer = ByteBuffer.allocate( COMMIT_INDEX_BYTES );
- commitChannel.read( buffer, 0 );
- buffer.flip();
- return buffer.getLong();
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java
index e97d0f1d0dfc6..493a4af0d9b5b 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java
@@ -44,28 +44,35 @@ class PhysicalRaftEntryStore implements RaftEntryStore
}
@Override
- public IOCursor getEntriesFrom( long fromIndex ) throws IOException
+ public IOCursor getEntriesFrom( long fromIndex ) throws IOException, RaftLogCompactedException
{
// generate skip stack and get starting position
Stack skipStack = new Stack<>();
SkipStackGenerator skipStackGenerator = new SkipStackGenerator( fromIndex, skipStack );
logFile.accept( skipStackGenerator );
- // the skip stack generator scans through the headers and gives us the logs starting position as a side-effect
+ // the skip stack generator scans through the headers and gives us the starting position as a side-effect
+ // this will point to the beginning of the file
LogPosition startPosition = skipStackGenerator.logStartPosition;
if ( startPosition == null )
{
- return IOCursor.getEmpty();
+ throw new RaftLogCompactedException();
}
RaftLogMetadataCache.RaftLogEntryMetadata logEntryInfo = metadataCache.getMetadata( fromIndex );
- if( logEntryInfo != null && logEntryInfo.getStartPosition().getLogVersion() == startPosition.getLogVersion() )
+ // the following check is to validate the data received from the cache, instead of invalidating the cache somewhere else
+ if( logEntryInfo != null &&
+ !logEntryInfo.getStartPosition().equals( LogPosition.UNSPECIFIED ) &&
+ logEntryInfo.getStartPosition().getLogVersion() == startPosition.getLogVersion() )
{
- // then metadata is valid for this log version, read from there
+ // then metadata is valid for this log version, so we can start from a more efficient position
startPosition = logEntryInfo.getStartPosition();
}
+ // we now either have a startPosition at the beginning of the file or a more efficient one somewhere further ahead
+ // the entry cursor handles skipping unwanted entries
+
return new PhysicalRaftLogEntryCursor( new RaftRecordCursor<>( logFile.getReader( startPosition ), marshal ),
skipStack, fromIndex );
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java
index b5e5b0e4ffd71..48de442f369ae 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java
@@ -27,18 +27,24 @@
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.cursor.IOCursor;
+import org.neo4j.helpers.Reference;
import org.neo4j.helpers.collection.LruCache;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.log.FlushablePositionAwareChannel;
+import org.neo4j.kernel.impl.transaction.log.LogFileInformation;
import org.neo4j.kernel.impl.transaction.log.LogHeaderCache;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogPositionMarker;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.LoggingLogFileMonitor;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
+import org.neo4j.kernel.impl.transaction.log.PhysicalLogFileInformation;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
+import org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategy;
+import org.neo4j.kernel.impl.transaction.log.pruning.LogPruning;
+import org.neo4j.kernel.impl.transaction.log.pruning.LogPruningImpl;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotationImpl;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
@@ -49,7 +55,30 @@
import static java.lang.String.format;
import static org.neo4j.coreedge.raft.log.PhysicalRaftLog.RecordType.COMMIT;
+import static org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategyFactory.fromConfigValue;
+// TODO: Handle recovery better; e.g. add missing continuation records, faster scan for initial values, ...
+// TODO: Better caching; e.g. divide per log version, allow searching for non-exact start point, cache when closing cursor ...
+
+/**
+ * The physical RAFT log is an append only log supporting the operations required to support
+ * the RAFT consensus algorithm. The physical part relates to the fact that the implementation
+ * borrows from infrastructure around the already existing {@link PhysicalLogFile} and related.
+ *
+ * A RAFT log must be able to append new entries, but also truncate not yet committed entries,
+ * prune out old compacted entries and skip to a later starting point.
+ *
+ * The RAFT log consists of a sequence of individual log files with the following format:
+ * [HEADER] [CONTINUATION RECORD] [APPEND/COMMIT RECORDS]*
+ *
+ * Later log files are said to have a higher log version. This terminology is slightly confusing
+ * but inherited, so we stick with it. The version is not about the format itself, which is specifically
+ * referred to as a log format version.
+ *
+ * New log files are created when truncating, skipping or when appending and the file size has
+ * reached over the configured rotation threshold. This is called rotation. Each log file begins
+ * with a header and a continuation record and then follows a sequence of append and commit records.
+ */
public class PhysicalRaftLog implements RaftLog, Lifecycle
{
public static final String BASE_FILE_NAME = "raft.log";
@@ -62,6 +91,8 @@ public class PhysicalRaftLog implements RaftLog, Lifecycle
private LogRotation logRotation;
private FlushablePositionAwareChannel writer;
private final RaftLogMetadataCache metadataCache;
+ private long prevIndex = -1;
+ private long prevTerm = -1;
private final AtomicLong appendIndex = new AtomicLong( -1 );
private long commitIndex = -1;
private final LogPositionMarker positionMarker = new LogPositionMarker();
@@ -70,6 +101,7 @@ public class PhysicalRaftLog implements RaftLog, Lifecycle
private final RaftEntryStore entryStore;
private final LruCache entryCache;
private final PhysicalLogFiles logFiles;
+ private final LogPruning logPruning;
public PhysicalRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize,
int entryCacheSize, int metaDataCacheSize, int headerCacheSize, PhysicalLogFile.Monitor monitor,
@@ -86,8 +118,14 @@ public PhysicalRaftLog( FileSystemAbstraction fileSystem, File directory, long r
logFiles = new PhysicalLogFiles( directory, BASE_FILE_NAME, fileSystem );
LogVersionRepository logVersionRepository = new FilenameBasedLogVersionRepository( logFiles );
+ LogHeaderCache logHeaderCache = new LogHeaderCache( headerCacheSize );
logFile = new PhysicalLogFile( fileSystem, logFiles, rotateAtSize,
- appendIndex::get, logVersionRepository, monitor, new LogHeaderCache( headerCacheSize ) );
+ appendIndex::get, logVersionRepository, monitor, logHeaderCache );
+
+ String pruningConf = "0 files";
+ LogFileInformation logFileInformation = new PhysicalLogFileInformation( logFiles, logHeaderCache, this::appendIndex, version -> 0L );
+ LogPruneStrategy logPruneStrategy = fromConfigValue( fileSystem, logFileInformation, logFiles, pruningConf );
+ this.logPruning = new LogPruningImpl( logPruneStrategy, logProvider );
this.metadataCache = new RaftLogMetadataCache( metaDataCacheSize );
this.entryStore = new PhysicalRaftEntryStore( logFile, metadataCache, marshal );
@@ -102,7 +140,7 @@ public long append( RaftLogEntry entry ) throws IOException
}
else
{
- throw new IllegalStateException( format( "Non-monotonic term %d for in entry %s in term %d",
+ throw new IllegalStateException( format( "Non-monotonic term %d for entry %s in term %d",
entry.term(), entry.toString(), term ) );
}
@@ -116,18 +154,19 @@ public long append( RaftLogEntry entry ) throws IOException
if( logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ) )
{
- RaftLogContinuationRecord.write( writer, newAppendIndex );
+ RaftLogContinuationRecord.write( writer, newAppendIndex, term );
+ writer.prepareForFlush().flush();
}
return newAppendIndex;
}
@Override
- public void truncate( long fromIndex ) throws IOException
+ public void truncate( long fromIndex ) throws IOException, RaftLogCompactedException
{
if ( fromIndex <= commitIndex )
{
- throw new IllegalArgumentException( format( "cannot truncate (%d) before the commit index (%d)",
+ throw new IllegalArgumentException( format( "cannot truncate (%d) at or before the commit index (%d)",
fromIndex, commitIndex ) );
}
@@ -136,15 +175,17 @@ public void truncate( long fromIndex ) throws IOException
return;
}
+ // Resolve the new last index and its term up front, before any state is changed.
+ long newAppendIndex = fromIndex - 1;
+ long newTerm = readEntryTerm( newAppendIndex );
+
entryCache.clear();
- long newAppendIndex = fromIndex - 1;
appendIndex.set( newAppendIndex );
logRotation.rotateLogFile();
- RaftLogContinuationRecord.write( writer, fromIndex );
+ term = newTerm;
+ // The continuation record holds the index and term of the last retained entry,
+ // so that appending resumes at prevLogIndex + 1 == fromIndex.
+ RaftLogContinuationRecord.write( writer, newAppendIndex, term );
writer.prepareForFlush().flush();
- term = readEntryTerm( newAppendIndex );
}
@Override
@@ -166,6 +207,12 @@ public long appendIndex()
return appendIndex.get();
}
+ @Override
+ public long prevIndex()
+ {
+ // Starts at -1 and is updated by skip() and restorePrevIndexAndTerm().
+ return prevIndex;
+ }
+
@Override
public long commitIndex()
{
@@ -173,10 +220,10 @@ public long commitIndex()
}
@Override
- public IOCursor getEntryCursor( long fromIndex ) throws IOException
+ public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException, RaftLogCompactedException
{
final IOCursor inner = entryStore.getEntriesFrom( fromIndex );
- return new IOCursor()
+ return new RaftLogCursor()
{
private RaftLogEntry current = null;
@@ -186,7 +233,7 @@ public boolean next() throws IOException
boolean hasNext = inner.next();
if ( hasNext )
{
- current = inner.get().getLogEntry();
+ current = inner.get().logEntry();
}
else
{
@@ -209,7 +256,26 @@ public RaftLogEntry get()
};
}
- private RaftLogEntry readLogEntry( long logIndex ) throws IOException
+ /**
+ * Moves the log forward to {@code index} unless the append index has already reached it.
+ *
+ * When a skip happens the log is rotated, the new file is started with a continuation
+ * record for ({@code index}, {@code term}), and that pair becomes the prev index/term.
+ *
+ * @param index the index to skip to
+ * @param term the term to record for {@code index}
+ * @return the append index after this call
+ */
+ @Override
+ public long skip( long index, long term ) throws IOException
+ {
+ if( appendIndex.get() >= index )
+ {
+ // Nothing to skip over; report where the log currently ends.
+ return appendIndex.get();
+ }
+
+ // Start a fresh file whose continuation record describes the skipped-to position.
+ logRotation.rotateLogFile();
+ RaftLogContinuationRecord.write( writer, index, term );
+ writer.prepareForFlush().flush();
+
+ prevIndex = index;
+ prevTerm = term;
+ appendIndex.set( index );
+
+ // The skipped-to entry has a term but no content, hence no position in any file.
+ metadataCache.cacheMetadata( index, term, LogPosition.UNSPECIFIED );
+
+ return appendIndex.get();
+ }
+
+ private RaftLogEntry readLogEntry( long logIndex ) throws IOException, RaftLogCompactedException
{
RaftLogEntry entry = entryCache.get( logIndex );
if( entry != null )
@@ -222,14 +288,14 @@ private RaftLogEntry readLogEntry( long logIndex ) throws IOException
while ( entriesFrom.next() )
{
RaftLogAppendRecord raftLogAppendRecord = entriesFrom.get();
- if ( raftLogAppendRecord.getLogIndex() == logIndex )
+ if ( raftLogAppendRecord.logIndex() == logIndex )
{
- return raftLogAppendRecord.getLogEntry();
+ return raftLogAppendRecord.logEntry();
}
- else if ( raftLogAppendRecord.getLogIndex() > logIndex )
+ else if ( raftLogAppendRecord.logIndex() > logIndex )
{
throw new IllegalStateException( format( "Asked for index %d but got up to %d without " +
- "finding it.", logIndex, raftLogAppendRecord.getLogIndex() ) );
+ "finding it.", logIndex, raftLogAppendRecord.logIndex() ) );
}
}
}
@@ -237,11 +303,15 @@ else if ( raftLogAppendRecord.getLogIndex() > logIndex )
}
@Override
- public long readEntryTerm( long logIndex ) throws IOException
+ public long readEntryTerm( long logIndex ) throws IOException, RaftLogCompactedException
{
// Index -1 is not an existing log index, but represents the beginning of the log.
// It is a valid value to request the term for, and the term is -1.
- if( logIndex == -1 || ( logIndex > appendIndex.get() ) )
+ if( logIndex == prevIndex )
+ {
+ return prevTerm;
+ }
+ else if ( logIndex < prevIndex || logIndex > appendIndex.get() )
{
return -1;
}
@@ -264,31 +334,108 @@ public long readEntryTerm( long logIndex ) throws IOException
}
@Override
- public void init() throws Throwable
+ public void init() throws IOException
{
+ // Only the underlying log file is initialised here; indexes are restored in start().
logFile.init();
}
@Override
- public void start() throws Throwable
+ public void start() throws IOException, RaftLogCompactedException
{
- this.logRotation = new LogRotationImpl( new LoggingLogFileMonitor( log ), logFile, databaseHealthSupplier.get() );
+ this.logRotation = new LogRotationImpl( new LoggingLogFileMonitor( log ), logFile, databaseHealthSupplier.get() );
logFile.start();
- restoreCommitIndex();
+
+ // Must run first: restorePrevIndexAndTerm() expects a continuation record at the start of the first file.
+ recoverContinuationRecord();
+
+ // Order matters: restoreAppendIndex() starts scanning from prevIndex + 1.
+ restorePrevIndexAndTerm();
restoreAppendIndex();
+ restoreCommitIndex();
+
this.writer = logFile.getWriter();
}
- private void restoreAppendIndex() throws IOException
+ /**
+ * Minimal "recovery": makes sure the very first log file carries a continuation record.
+ * If the write position is directly after the header of log version 0, nothing has been
+ * written yet, so a continuation record with prev index -1 and prev term -1 is added.
+ */
+ private void recoverContinuationRecord() throws IOException
+ {
+ LogPositionMarker marker = new LogPositionMarker();
+
+ FlushablePositionAwareChannel channel = logFile.getWriter();
+ channel.getCurrentPosition( marker );
+
+ if( marker.getLogVersion() != 0 || marker.getByteOffset() != LogHeader.LOG_HEADER_SIZE )
+ {
+ // Not a freshly created first file; leave it alone.
+ return;
+ }
+
+ RaftLogContinuationRecord.write( channel, -1, -1 );
+ channel.prepareForFlush().flush();
+ }
+
+ /**
+ * Prunes log files selected by {@link #findLogVersionToPrune(long)} and, if anything
+ * was pruned, re-reads the prev index and term from the remaining first file.
+ *
+ * @param safeIndex highest index that may be pruned
+ * @return the prev index after the call
+ */
+ @Override
+ public long prune( long safeIndex ) throws IOException
+ {
+ final long versionToPrune = findLogVersionToPrune( safeIndex );
+ if ( versionToPrune == -1 )
+ {
+ // No file qualifies; the log is left untouched.
+ return prevIndex;
+ }
+
+ logPruning.pruneLogs( versionToPrune );
+ restorePrevIndexAndTerm();
+ return prevIndex;
+ }
+
+ /**
+ * Visits the log files and picks the first visited one whose last index is below
+ * {@code safeIndex}.
+ *
+ * @param safeIndex highest index that may be pruned
+ * @return the log version of the selected file, or -1 if no file qualifies
+ */
+ public long findLogVersionToPrune( long safeIndex ) throws IOException
+ {
+ // One-element array so the lambda can hand the match back to this method.
+ final LogPosition[] candidate = {LogPosition.UNSPECIFIED};
+
+ logFile.accept( ( position, ignored, lastIndex ) -> {
+ boolean prunable = lastIndex < safeIndex;
+ if ( prunable )
+ {
+ candidate[0] = position;
+ }
+ return !prunable; // keep going until a match is found
+ } );
+
+ if ( candidate[0].equals( LogPosition.UNSPECIFIED ) )
+ {
+ return -1;
+ }
+ return candidate[0].getLogVersion();
+ }
+
+ /**
+ * Restores {@code prevIndex} and {@code prevTerm} from the continuation record at the
+ * start of the last log file visited by {@code logFile.accept}.
+ *
+ * @throws IOException if no log file is visited, or the file does not begin with a
+ * continuation record
+ */
+ private void restorePrevIndexAndTerm() throws IOException
+ {
+ final Reference firstFile = new Reference<>( null );
+
+ // Every visit overwrites the reference, so the last visited file wins.
+ logFile.accept( ( position, ignored1, ignored2 ) -> {
+ firstFile.set( position );
+ return true;
+ } );
+
+ if( firstFile.get() == null )
+ {
+ throw new IOException( "No first log file found" );
+ }
+
+ ReadableLogChannel reader = logFile.getReader( firstFile.get() );
+
+ // Close the cursor when done so it is not left open after the read.
+ // NOTE(review): presumably closing the cursor also closes the reader - confirm in RaftRecordCursor.close().
+ try( RaftRecordCursor recordCursor = new RaftRecordCursor<>( reader, marshal ) )
+ {
+ if( !recordCursor.next() )
+ {
+ throw new IOException( "No continuation record found at " + firstFile.get() );
+ }
+
+ RaftLogRecord record = recordCursor.get();
+ if( !(record instanceof RaftLogContinuationRecord) )
+ {
+ throw new IOException( "Expected a continuation record at " + firstFile.get() + " but found " + record );
+ }
+ RaftLogContinuationRecord cont = (RaftLogContinuationRecord) record;
+
+ prevIndex = cont.prevLogIndex();
+ prevTerm = cont.prevLogTerm();
+ }
+
+ log.info( "Restored prev index at %d", prevIndex );
+ log.info( "Restored prev term at %d", prevTerm );
+ }
+
+ private void restoreAppendIndex() throws IOException, RaftLogCompactedException
{
- long restoredAppendIndex = -1;
- try( IOCursor cursor = entryStore.getEntriesFrom( 0 ) )
+ long restoredAppendIndex = prevIndex;
+ try( IOCursor cursor = entryStore.getEntriesFrom( prevIndex + 1 ) )
{
while( cursor.next() )
{
- restoredAppendIndex = cursor.get().getLogIndex();
+ restoredAppendIndex = cursor.get().logIndex();
}
}
@@ -307,7 +454,7 @@ private void restoreCommitIndex() throws IOException
RaftLogRecord record = recordCursor.get();
if( record.getType() == COMMIT )
{
- commitIndex = record.getLogIndex();
+ commitIndex = ((RaftLogCommitRecord)record).commitIndex();
}
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java
index f22e1696293fd..29a6826e2ace1 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java
@@ -22,20 +22,21 @@
import java.io.IOException;
import java.util.Stack;
+import org.neo4j.cursor.CursorValue;
import org.neo4j.cursor.IOCursor;
public class PhysicalRaftLogEntryCursor implements IOCursor
{
private long NO_SKIP = -1;
- private final RaftRecordCursor> recordCursor;
+ private final IOCursor recordCursor;
private final Stack skipStack;
- private RaftLogAppendRecord currentEntry;
+ private CursorValue cursorValue = new CursorValue<>();
private long nextIndex;
private long skipPoint = NO_SKIP;
private boolean skipMode = false;
- public PhysicalRaftLogEntryCursor( RaftRecordCursor> recordCursor, Stack skipStack, long fromIndex )
+ public PhysicalRaftLogEntryCursor( IOCursor recordCursor, Stack skipStack, long fromIndex )
{
this.recordCursor = recordCursor;
this.skipStack = skipStack;
@@ -62,9 +63,9 @@ record = recordCursor.get();
{
// skip records
}
- else if( record.getLogIndex() == nextIndex )
+ else if( ((RaftLogAppendRecord)record).logIndex() == nextIndex )
{
- currentEntry = (RaftLogAppendRecord) record;
+ cursorValue.set( (RaftLogAppendRecord) record );
nextIndex++;
if( nextIndex == skipPoint )
@@ -88,7 +89,7 @@ else if( record.getLogIndex() == nextIndex )
}
}
}
- currentEntry = null;
+ cursorValue.invalidate();
return false;
}
@@ -101,6 +102,6 @@ public void close() throws IOException
@Override
public RaftLogAppendRecord get()
{
- return currentEntry;
+ return cursorValue.get();
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java
index ed73140247a62..d36619b9bc385 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java
@@ -25,5 +25,5 @@
public interface RaftEntryStore
{
- IOCursor getEntriesFrom( long logIndex ) throws IOException;
+ /**
+ * Opens a cursor over stored append records, starting at the given log index.
+ * The caller is responsible for closing the returned cursor.
+ *
+ * @param logIndex the index of the first entry to return
+ * @throws RaftLogCompactedException presumably when the log can no longer be read at
+ * {@code logIndex} - confirm against implementations
+ */
+ IOCursor getEntriesFrom( long logIndex ) throws IOException, RaftLogCompactedException;
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLog.java
index 2237a6ebd17b1..74acd3bdb6938 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLog.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLog.java
@@ -38,8 +38,7 @@ public interface RaftLog extends ReadableRaftLog
* through truncation).
*
* @param entry The log entry.
- * @return Returns the index at which the entry was appended, or -1
- * if the entry was not accepted.
+ * @return the index at which the entry was appended.
*/
long append( RaftLogEntry entry ) throws IOException;
@@ -49,7 +48,7 @@ public interface RaftLog extends ReadableRaftLog
*
* @param fromIndex The start index (inclusive).
*/
- void truncate( long fromIndex ) throws IOException;
+ void truncate( long fromIndex ) throws IOException, RaftLogCompactedException;
/**
* Signals the safe replication of any entries previously appended up to and
@@ -62,4 +61,34 @@ public interface RaftLog extends ReadableRaftLog
*/
void commit( long commitIndex ) throws IOException;
+ /**
+ * Attempt to prune (delete) a prefix of the log, no further than the safeIndex.
+ *
+ * Implementations can choose to prune a shorter prefix if this is convenient for
+ * their storage mechanism. The return value tells the caller how much was actually pruned.
+ *
+ * @param safeIndex Highest index that may be pruned.
+ *
+ * @return The new prevIndex for the log, which will be at most safeIndex.
+ */
+ long prune( long safeIndex ) throws IOException, RaftLogCompactedException;
+
+ /**
+ * Skip up to the supplied index if it is not already present.
+ *
+ * If the entry was not present then it gets defined with the
+ * supplied term, but without content, and thus can be used
+ * only for log matching from a later index.
+ *
+ * This is useful when a snapshot starting from a later index
+ * has been downloaded and thus earlier entries are irrelevant
+ * and possibly non-existent in the cluster.
+ *
+ * @param index the index we want to skip to
+ * @param term the term of the index
+ *
+ * @return The appendIndex after this call, which
+ * will be at least index.
+ */
+ long skip( long index, long term ) throws IOException;
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java
index 356bcd775d767..33f162628bfa5 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java
@@ -31,19 +31,26 @@
public class RaftLogAppendRecord extends RaftLogRecord
{
+ private final long appendIndex;
private final RaftLogEntry logEntry;
- RaftLogAppendRecord( long logIndex, RaftLogEntry logEntry )
+ RaftLogAppendRecord( long appendIndex, RaftLogEntry logEntry )
{
- super( APPEND, logIndex );
+ super( APPEND );
+ this.appendIndex = appendIndex;
this.logEntry = logEntry;
}
- public RaftLogEntry getLogEntry()
+ public RaftLogEntry logEntry()
{
return logEntry;
}
+ public long logIndex()
+ {
+ return appendIndex;
+ }
+
public static RaftLogAppendRecord read( ReadableChannel channel, ChannelMarshal marshal ) throws IOException
{
long appendIndex = channel.getLong();
@@ -67,6 +74,9 @@ public static void write( WritableChannel channel, ChannelMarshal.
+ */
+package org.neo4j.coreedge.raft.log;
+
+/**
+ * Thrown when the RAFT log cannot be read at the supplied index.
+ *
+ * This is a checked exception, so callers must either handle it or declare it.
+ */
+public class RaftLogCompactedException extends Exception
+{
+ /** Creates the exception without a detail message. */
+ public RaftLogCompactedException()
+ {
+ }
+
+ /**
+ * @param message detail message passed on to {@link Exception}
+ */
+ public RaftLogCompactedException( String message )
+ {
+ super( message );
+ }
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogContinuationRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogContinuationRecord.java
index 8b38a9c2a1573..b49b3e23970f7 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogContinuationRecord.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogContinuationRecord.java
@@ -26,28 +26,59 @@
import static org.neo4j.coreedge.raft.log.PhysicalRaftLog.RecordType.CONTINUATION;
+/**
+ * Continuation records are written at the beginning of a new log file
+ * and define the start position (prevLogIndex+1) for subsequently appended
+ * entries. At the same time prevLogIndex is thus the index of the last
+ * valid entry in the previous file, and prevLogTerm its term. These values
+ * are used for log matching.
+ *
+ * New log files are created when truncating, skipping or when the threshold
+ * size for a single log file has been exceeded.
+ */
public class RaftLogContinuationRecord extends RaftLogRecord
{
- RaftLogContinuationRecord( long fromLogIndex )
+ // Index of the last valid entry before the entries of this file.
+ private final long prevLogIndex;
+ // Term of the entry at prevLogIndex.
+ private final long prevLogTerm;
+
+ RaftLogContinuationRecord( long prevLogIndex, long prevLogTerm )
+ {
+ super( CONTINUATION );
+ this.prevLogIndex = prevLogIndex;
+ this.prevLogTerm = prevLogTerm;
+ }
+
+ public long prevLogIndex()
{
- super( CONTINUATION, fromLogIndex );
+ return prevLogIndex;
+ }
+
+ public long prevLogTerm()
+ {
+ return prevLogTerm;
}
public static RaftLogContinuationRecord read( ReadableChannel channel ) throws IOException
{
- long fromIndex = channel.getLong();
- return new RaftLogContinuationRecord( fromIndex );
+ // The record type marker is not read here; only the two longs that write() emits after it, in the same order.
+ long prevLogIndex = channel.getLong();
+ long prevLogTerm = channel.getLong();
+
+ return new RaftLogContinuationRecord( prevLogIndex, prevLogTerm );
}
- public static void write( WritableChannel channel, long fromIndex ) throws IOException
+ // Layout: record type marker, then prevLogIndex and prevLogTerm as longs.
+ public static void write( WritableChannel channel, long prevLogIndex, long prevLogTerm ) throws IOException
{
channel.put( CONTINUATION.value() );
- channel.putLong( fromIndex );
+ channel.putLong( prevLogIndex );
+ channel.putLong( prevLogTerm );
}
@Override
public String toString()
{
- return String.format( "RaftLogContinuationRecord{%s}", super.toString() );
+ return "RaftLogContinuationRecord{" +
+ "prevLogIndex=" + prevLogIndex +
+ ", prevLogTerm=" + prevLogTerm +
+ '}';
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCursor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCursor.java
new file mode 100644
index 0000000000000..32f3fbd93f278
--- /dev/null
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCursor.java
@@ -0,0 +1,37 @@
+package org.neo4j.coreedge.raft.log;
+
+import java.io.IOException;
+
+import org.neo4j.cursor.RawCursor;
+
+/**
+ * Cursor over the entries of a RAFT log. It narrows the exceptions of {@link RawCursor}
+ * to those a RAFT log read can raise.
+ */
+public interface RaftLogCursor extends RawCursor
+{
+ /**
+ * Advances the cursor.
+ *
+ * @return {@code false} when there are no more entries
+ */
+ @Override
+ boolean next() throws IOException, RaftLogCompactedException;
+
+ @Override
+ void close() throws IOException;
+
+ /**
+ * @return a cursor without entries: {@code next()} always returns {@code false} and
+ * {@code get()} throws {@link IllegalStateException}
+ */
+ static RaftLogCursor empty()
+ {
+ return new RaftLogCursor()
+ {
+ @Override
+ public boolean next() throws IOException
+ {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ // Nothing to release.
+ }
+
+ @Override
+ public RaftLogEntry get()
+ {
+ // There is never a current entry to return.
+ throw new IllegalStateException();
+ }
+ };
+ }
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java
index 3b3971942c5d8..67beee3e26230 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java
@@ -49,11 +49,6 @@ public RaftLogEntryMetadata getMetadata( long logIndex )
public RaftLogEntryMetadata cacheMetadata( long logIndex, long entryTerm, LogPosition position )
{
- if ( position.getByteOffset() == -1 )
- {
- throw new RuntimeException( "StartEntry.position is " + position );
- }
-
RaftLogEntryMetadata result = new RaftLogEntryMetadata( entryTerm, position );
raftLogEntryCache.put( logIndex, result );
return result;
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java
index fdd36c0dce131..26892fa36962a 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java
@@ -21,28 +21,15 @@
public abstract class RaftLogRecord
{
- private final long logIndex;
- private final PhysicalRaftLog.RecordType type;
+ // Record kind, fixed at construction by the concrete subclass.
+ protected final PhysicalRaftLog.RecordType type;
- RaftLogRecord( PhysicalRaftLog.RecordType type, long logIndex )
+ RaftLogRecord( PhysicalRaftLog.RecordType type )
{
this.type = type;
- this.logIndex = logIndex;
- }
-
- public long getLogIndex()
- {
- return logIndex;
}
public PhysicalRaftLog.RecordType getType()
{
return type;
}
-
- @Override
- public String toString()
- {
- return String.format("type=%s, logIndex=%d", type, logIndex );
- }
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java
index bbd118c579e63..47febe392a160 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java
@@ -23,6 +23,7 @@
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
+import org.neo4j.cursor.CursorValue;
import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.storageengine.api.ReadPastEndException;
@@ -31,7 +32,7 @@ public class RaftRecordCursor im
{
private final T channel;
private final ChannelMarshal marshal;
- private RaftLogRecord lastFoundRecord;
+ private CursorValue currentRecord = new CursorValue<>();
public RaftRecordCursor( T channel, ChannelMarshal marshal )
{
@@ -48,13 +49,13 @@ public boolean next() throws IOException
switch ( PhysicalRaftLog.RecordType.forValue( type ) )
{
case APPEND:
- lastFoundRecord = RaftLogAppendRecord.read( channel, marshal );
+ currentRecord.set( RaftLogAppendRecord.read( channel, marshal ) );
return true;
case COMMIT:
- lastFoundRecord = RaftLogCommitRecord.read( channel );
+ currentRecord.set( RaftLogCommitRecord.read( channel ) );
return true;
case CONTINUATION:
- lastFoundRecord = RaftLogContinuationRecord.read( channel );
+ currentRecord.set( RaftLogContinuationRecord.read( channel ) );
return true;
default:
throw new IllegalStateException( "Not really sure how we got here. Read type value was " + type );
@@ -75,6 +76,6 @@ public void close() throws IOException
@Override
public RaftLogRecord get()
{
- return lastFoundRecord;
+ return currentRecord.get();
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/ReadableRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/ReadableRaftLog.java
index 0ad61da062190..59777ae83c65c 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/ReadableRaftLog.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/ReadableRaftLog.java
@@ -21,8 +21,6 @@
import java.io.IOException;
-import org.neo4j.cursor.IOCursor;
-
public interface ReadableRaftLog
{
/**
@@ -30,6 +28,11 @@ public interface ReadableRaftLog
*/
long appendIndex();
+ /**
+ * @return The index immediately preceding entries in the log.
+ */
+ long prevIndex();
+
/**
* @return The index of the last committed entry.
*/
@@ -41,11 +44,11 @@ public interface ReadableRaftLog
* @param logIndex The index of the log entry.
* @return The term of the entry, or -1 if the entry does not exist
*/
- long readEntryTerm( long logIndex ) throws IOException;
+ long readEntryTerm( long logIndex ) throws IOException, RaftLogCompactedException;
/**
- * Returns an {@link IOCursor} of {@link RaftLogEntry}s from the specified index until the end of the log
+ * Returns a {@link RaftLogCursor} of {@link RaftLogEntry}s from the specified index until the end of the log
* @param fromIndex The log index at which the cursor should be positioned
*/
- IOCursor getEntryCursor( long fromIndex ) throws IOException;
+ RaftLogCursor getEntryCursor( long fromIndex ) throws IOException, RaftLogCompactedException;
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpNaiveDurableRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpNaiveDurableRaftLog.java
index 57e679e401431..a0d905aba271f 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpNaiveDurableRaftLog.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/DumpNaiveDurableRaftLog.java
@@ -23,13 +23,14 @@
import java.io.IOException;
import org.neo4j.coreedge.raft.log.NaiveDurableRaftLog;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.net.CoreReplicatedContentMarshal;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.logging.NullLogProvider;
public class DumpNaiveDurableRaftLog
{
- public static void main( String[] args ) throws IOException
+ public static void main( String[] args ) throws IOException, RaftLogCompactedException
{
for ( String arg : args )
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/LogPrinter.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/LogPrinter.java
index 76958061e9645..8d338439b5be5 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/LogPrinter.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/debug/LogPrinter.java
@@ -22,9 +22,10 @@
import java.io.IOException;
import java.io.PrintStream;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
+import org.neo4j.coreedge.raft.log.RaftLogCursor;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
-import org.neo4j.cursor.IOCursor;
public class LogPrinter
{
@@ -35,11 +36,11 @@ public LogPrinter( ReadableRaftLog raftLog )
this.raftLog = raftLog;
}
- public void print( PrintStream out ) throws IOException
+ public void print( PrintStream out ) throws IOException, RaftLogCompactedException
{
out.println( String.format( "%1$8s %2$5s %3$2s %4$s", "Index", "Term", "C?", "Content"));
long index = 0L;
- try ( IOCursor cursor = raftLog.getEntryCursor( 0 ) )
+ try ( RaftLogCursor cursor = raftLog.getEntryCursor( 0 ) )
{
while ( cursor.next() )
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java
index 8d74f6028a2a2..fc6db784616c3 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java
@@ -26,6 +26,8 @@
import java.util.Set;
import org.neo4j.coreedge.raft.log.RaftLog;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
+import org.neo4j.coreedge.raft.log.RaftLogCursor;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.outcome.AppendLogEntry;
@@ -89,7 +91,7 @@ public RaftMembershipManager( Replicator replicator, RaftGroup.Builder m
logProvider, catchupTimeout, raftMembershipState );
}
- public void processLog( Collection logCommands ) throws IOException
+ public void processLog( Collection logCommands ) throws IOException, RaftLogCompactedException
{
for ( LogCommand logCommand : logCommands )
{
@@ -113,7 +115,7 @@ public void processLog( Collection logCommands ) throws IOException
if ( logCommand instanceof CommitCommand )
{
long index = lastApplied + 1;
- try ( IOCursor entryCursor = entryLog.getEntryCursor( index ) )
+ try ( RaftLogCursor entryCursor = entryLog.getEntryCursor( index ) )
{
while ( entryCursor.next() )
{
@@ -177,7 +179,7 @@ private void onCommitted( ReplicatedContent content, long logIndex ) throws IOEx
}
}
- private void onTruncated() throws IOException
+ private void onTruncated() throws IOException, RaftLogCompactedException
{
Pair> lastMembershipEntry = findLastMembershipEntry();
@@ -195,11 +197,11 @@ private void onTruncated() throws IOException
}
}
- private Pair> findLastMembershipEntry() throws IOException
+ private Pair> findLastMembershipEntry() throws IOException, RaftLogCompactedException
{
Pair> lastMembershipEntry = null;
long index = 0;
- try( IOCursor cursor = entryLog.getEntryCursor( index ) )
+ try( RaftLogCursor cursor = entryLog.getEntryCursor( index ) )
{
while( cursor.next() )
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/LogCommand.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/LogCommand.java
index 9b1cc896ba3d7..4e41c6edb33a6 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/LogCommand.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/LogCommand.java
@@ -22,8 +22,9 @@
import java.io.IOException;
import org.neo4j.coreedge.raft.log.RaftLog;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
public interface LogCommand
{
- void applyTo( RaftLog raftLog ) throws IOException;
+ void applyTo( RaftLog raftLog ) throws IOException, RaftLogCompactedException;
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/TruncateLogCommand.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/TruncateLogCommand.java
index 66645fcd6849e..04c56b272321c 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/TruncateLogCommand.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/TruncateLogCommand.java
@@ -23,6 +23,7 @@
import java.util.Objects;
import org.neo4j.coreedge.raft.log.RaftLog;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
public class TruncateLogCommand implements LogCommand
{
@@ -34,7 +35,7 @@ public TruncateLogCommand( long fromIndex )
}
@Override
- public void applyTo( RaftLog raftLog ) throws IOException
+ public void applyTo( RaftLog raftLog ) throws IOException, RaftLogCompactedException
{
raftLog.truncate( fromIndex );
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java
index 32771f6411a1e..287b6137cefdc 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java
@@ -24,6 +24,8 @@
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.raft.LeaderContext;
import org.neo4j.coreedge.raft.RaftMessages;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
+import org.neo4j.coreedge.raft.log.RaftLogCursor;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.net.Outbound;
@@ -152,14 +154,7 @@ public synchronized void start()
log.error( "Failed to start log shipper to: " + follower, e );
}
- try
- {
- sendSingle( raftLog.appendIndex(), lastLeaderContext );
- }
- catch ( IOException e )
- {
- log.error( "Exception during send: " + follower, e );
- }
+ sendSingle( raftLog.appendIndex(), lastLeaderContext );
}
public synchronized void stop()
@@ -178,7 +173,7 @@ public synchronized void stop()
abortTimeout();
}
- public synchronized void onMismatch( long lastRemoteAppendIndex, LeaderContext leaderContext ) throws IOException
+ public synchronized void onMismatch( long lastRemoteAppendIndex, LeaderContext leaderContext )
{
switch ( mode )
{
@@ -197,7 +192,7 @@ public synchronized void onMismatch( long lastRemoteAppendIndex, LeaderContext l
lastLeaderContext = leaderContext;
}
- public synchronized void onMatch( long newMatchIndex, LeaderContext leaderContext ) throws IOException
+ public synchronized void onMatch( long newMatchIndex, LeaderContext leaderContext )
{
boolean progress = newMatchIndex > matchIndex;
matchIndex = max ( newMatchIndex, matchIndex );
@@ -241,7 +236,7 @@ else if ( progress )
lastLeaderContext = leaderContext;
}
- public synchronized void onNewEntry( long prevLogIndex, long prevLogTerm, RaftLogEntry newLogEntry, LeaderContext leaderContext ) throws IOException
+ public synchronized void onNewEntry( long prevLogIndex, long prevLogTerm, RaftLogEntry newLogEntry, LeaderContext leaderContext )
{
switch ( mode )
{
@@ -267,7 +262,7 @@ public synchronized void onNewEntry( long prevLogIndex, long prevLogTerm, RaftLo
lastLeaderContext = leaderContext;
}
- public synchronized void onCommitUpdate( LeaderContext leaderContext ) throws IOException
+ public synchronized void onCommitUpdate( LeaderContext leaderContext )
{
switch ( mode )
{
@@ -301,7 +296,7 @@ else if ( timeoutAbsoluteMillis != 0 )
}
}
}
- catch ( IOException e )
+ catch ( Throwable e )
{
log.error( "Exception during timeout handling: " + follower, e );
}
@@ -354,7 +349,7 @@ private void abortTimeout()
}
/** Returns true if this sent the last batch. */
- private boolean sendNextBatchAfterMatch( LeaderContext leaderContext ) throws IOException
+ private boolean sendNextBatchAfterMatch( LeaderContext leaderContext )
{
long lastIndex = raftLog.appendIndex();
@@ -372,7 +367,7 @@ private boolean sendNextBatchAfterMatch( LeaderContext leaderContext ) throws IO
}
}
- private void sendCommitUpdate( LeaderContext leaderContext ) throws IOException
+ private void sendCommitUpdate( LeaderContext leaderContext )
{
/*
* This is a commit update. That means that we just received enough success responses to an append
@@ -385,38 +380,48 @@ private void sendCommitUpdate( LeaderContext leaderContext ) throws IOException
outbound.send( follower, appendRequest );
}
- private void sendSingle( long logIndex, LeaderContext leaderContext ) throws IOException
+ private void sendSingle( long logIndex, LeaderContext leaderContext )
{
scheduleTimeout( retryTimeMillis );
lastSentIndex = logIndex;
- long prevLogIndex = logIndex - 1;
- long prevLogTerm = raftLog.readEntryTerm( prevLogIndex );
-
- if ( prevLogTerm > leaderContext.term )
+ try
{
- log.warn( format( "Aborting send. Not leader anymore? %s, prevLogTerm=%d", leaderContext, prevLogTerm ) );
- return;
- }
+ long prevLogIndex = logIndex - 1;
+ long prevLogTerm = raftLog.readEntryTerm( prevLogIndex );
- RaftLogEntry[] logEntries = RaftLogEntry.empty;
- try ( IOCursor cursor = raftLog.getEntryCursor( logIndex ) )
- {
- if ( cursor.next() )
+ if ( prevLogTerm > leaderContext.term )
{
- logEntries = new RaftLogEntry[]{ cursor.get() };
+ log.warn( format( "Aborting send. Not leader anymore? %s, prevLogTerm=%d", leaderContext, prevLogTerm ) );
+ return;
}
- }
- RaftMessages.AppendEntries.Request appendRequest = new RaftMessages.AppendEntries.Request<>(
- leader, leaderContext.term, prevLogIndex, prevLogTerm, logEntries, leaderContext.commitIndex );
+ RaftLogEntry[] logEntries = RaftLogEntry.empty;
- outbound.send( follower, appendRequest );
+ try ( RaftLogCursor cursor = raftLog.getEntryCursor( logIndex ) )
+ {
+ if ( cursor.next() )
+ {
+ logEntries = new RaftLogEntry[]{cursor.get()};
+ }
+ }
+ RaftMessages.AppendEntries.Request appendRequest = new RaftMessages.AppendEntries.Request<>(
+ leader, leaderContext.term, prevLogIndex, prevLogTerm, logEntries, leaderContext.commitIndex );
+
+ outbound.send( follower, appendRequest );
+ }
+ catch ( RaftLogCompactedException | IOException e )
+ {
+ log.warn( format(
+ "Tried to send entry at index %d that can't be found in the raft log; it has likely been pruned. " +
+ "This is a temporary state and the system should recover automatically in a short while.",
+ logIndex ), e ); // keep the cause: an IOException is a read failure, not necessarily pruning
+ }
}
- private void sendNewEntry( long prevLogIndex, long prevLogTerm, RaftLogEntry newEntry, LeaderContext leaderContext ) throws IOException
+ private void sendNewEntry( long prevLogIndex, long prevLogTerm, RaftLogEntry newEntry, LeaderContext leaderContext )
{
scheduleTimeout( retryTimeMillis );
@@ -429,43 +434,50 @@ private void sendNewEntry( long prevLogIndex, long prevLogTerm, RaftLogEntry new
}
- private void sendRange( long startIndex, long endIndex, LeaderContext leaderContext ) throws IOException
+ private void sendRange( long startIndex, long endIndex, LeaderContext leaderContext )
{
if ( startIndex > endIndex )
return;
lastSentIndex = endIndex;
- int batchSize = (int) (endIndex - startIndex + 1);
- RaftLogEntry[] entries = new RaftLogEntry[batchSize];
+ try
+ {
+ int batchSize = (int) (endIndex - startIndex + 1);
+ RaftLogEntry[] entries = new RaftLogEntry[batchSize];
- long prevLogIndex = startIndex - 1;
- long prevLogTerm = raftLog.readEntryTerm( prevLogIndex );
+ long prevLogIndex = startIndex - 1;
+ long prevLogTerm = raftLog.readEntryTerm( prevLogIndex );
- if ( prevLogTerm > leaderContext.term )
- {
- log.warn( format( "Aborting send. Not leader anymore? %s, prevLogTerm=%d", leaderContext, prevLogTerm ) );
- return;
- }
+ if ( prevLogTerm > leaderContext.term )
+ {
+ log.warn( format( "Aborting send. Not leader anymore? %s, prevLogTerm=%d", leaderContext, prevLogTerm ) );
+ return;
+ }
- RaftMessages.AppendEntries.Request appendRequest = new RaftMessages.AppendEntries.Request<>(
- leader, leaderContext.term, prevLogIndex, prevLogTerm, entries, leaderContext.commitIndex );
+ RaftMessages.AppendEntries.Request appendRequest = new RaftMessages.AppendEntries.Request<>(
+ leader, leaderContext.term, prevLogIndex, prevLogTerm, entries, leaderContext.commitIndex );
- int offset = 0;
- try ( IOCursor cursor = raftLog.getEntryCursor( startIndex ) )
- {
- while ( offset < batchSize && cursor.next() )
+ int offset = 0;
+ try ( RaftLogCursor cursor = raftLog.getEntryCursor( startIndex ) )
{
- entries[offset] = cursor.get();
- if( entries[offset].term() > leaderContext.term )
+ while ( offset < batchSize && cursor.next() )
{
- log.warn( format( "Aborting send. Not leader anymore? %s, entryTerm=%d", leaderContext, entries[offset].term() ) );
- return;
+ entries[offset] = cursor.get();
+ if( entries[offset].term() > leaderContext.term )
+ {
+ log.warn( format( "Aborting send. Not leader anymore? %s, entryTerm=%d", leaderContext, entries[offset].term() ) );
+ return;
+ }
+ offset++;
}
- offset++;
}
- }
- outbound.send( follower, appendRequest );
+ outbound.send( follower, appendRequest );
+ }
+ catch ( IOException | RaftLogCompactedException e )
+ {
+ log.warn( "Exception during batch send", e );
+ }
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java
index 378faf65ade95..420b73d7ad3c9 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java
@@ -32,7 +32,7 @@
import org.neo4j.helpers.Clock;
import org.neo4j.logging.LogProvider;
-import static java.lang.String.*;
+import static java.lang.String.format;
public class RaftLogShippingManager implements RaftMembership.Listener
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java
index cc960ef34de20..fb76674090386 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import org.neo4j.coreedge.raft.RaftMessages;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.outcome.AppendLogEntry;
import org.neo4j.coreedge.raft.outcome.BatchAppendLogEntries;
@@ -35,7 +36,7 @@ public class Appending
{
public static void handleAppendEntriesRequest(
ReadableRaftState state, Outcome outcome, RaftMessages.AppendEntries.Request request )
- throws IOException
+ throws IOException, RaftLogCompactedException
{
if ( request.leaderTerm() < state.term() )
{
@@ -98,7 +99,7 @@ else if ( logTerm != request.entries()[offset].term() )
}
public static void appendNewEntry( ReadableRaftState ctx, Outcome outcome, ReplicatedContent
- content ) throws IOException
+ content ) throws IOException, RaftLogCompactedException
{
long prevLogIndex = ctx.entryLog().appendIndex();
long prevLogTerm = prevLogIndex == -1 ? -1 :
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java
index 4ec559af6c2ff..fd19218638863 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java
@@ -22,6 +22,7 @@
import org.neo4j.coreedge.raft.RaftMessageHandler;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.NewLeaderBarrier;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.logging.Log;
@@ -37,7 +38,7 @@ public class Candidate implements RaftMessageHandler
{
@Override
public Outcome handle( RaftMessages.RaftMessage message,
- ReadableRaftState ctx, Log log ) throws IOException
+ ReadableRaftState ctx, Log log ) throws IOException, RaftLogCompactedException
{
Outcome outcome = new Outcome<>( CANDIDATE, ctx );
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Election.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Election.java
index 74a787cab3fb0..3de865781c9e1 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Election.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Election.java
@@ -23,12 +23,13 @@
import java.util.Set;
import org.neo4j.coreedge.raft.RaftMessages;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
public class Election
{
- public static boolean start( ReadableRaftState ctx, Outcome outcome ) throws IOException
+ public static boolean start( ReadableRaftState ctx, Outcome outcome ) throws IOException, RaftLogCompactedException
{
Set currentMembers = ctx.votingMembers();
if ( currentMembers == null || !currentMembers.contains( ctx.myself() ) )
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java
index 26052608cbc97..7c1fc284235ea 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java
@@ -19,10 +19,13 @@
*/
package org.neo4j.coreedge.raft.roles;
+import java.io.IOException;
+
import org.neo4j.coreedge.raft.RaftMessageHandler;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.RaftMessages.AppendEntries;
import org.neo4j.coreedge.raft.RaftMessages.Heartbeat;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.outcome.CommitCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
@@ -32,19 +35,18 @@
import static org.neo4j.coreedge.raft.roles.Role.CANDIDATE;
import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER;
-import java.io.IOException;
-
public class Follower implements RaftMessageHandler
{
public static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, long prevLogTerm )
- throws IOException
+ throws IOException, RaftLogCompactedException
{
- // NOTE: A previous log index of -1 means no history,
- // so it always matches.
+ // NOTE: A prevLogIndex before or at our log's prevIndex means that we
+ // already have all history (in a compacted form), so we report that history matches
// NOTE: The entry term for a non existing log index is defined as -1,
// so the history for a non existing log entry never matches.
- return prevLogIndex == -1 || ctx.entryLog().readEntryTerm( prevLogIndex ) == prevLogTerm;
+
+ return prevLogIndex <= ctx.entryLog().prevIndex() || ctx.entryLog().readEntryTerm( prevLogIndex ) == prevLogTerm;
}
public static boolean commitToLogOnUpdate( ReadableRaftState ctx, long indexOfLastNewEntry,
@@ -62,7 +64,7 @@ public static boolean commitToLogOnUpdate( ReadableRaftState ct
@Override
public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, Log log )
- throws IOException
+ throws IOException, RaftLogCompactedException
{
Outcome outcome = new Outcome<>( FOLLOWER, ctx );
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java
index b38dbd0e188a3..129a43f11bfe5 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java
@@ -22,12 +22,13 @@
import java.io.IOException;
import org.neo4j.coreedge.raft.RaftMessages;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
public class Heart
{
- public static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbeat request ) throws IOException
+ public static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbeat request ) throws IOException, RaftLogCompactedException
{
if ( request.leaderTerm() < state.term() )
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java
index 3d93191db8781..0455e481c7586 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java
@@ -19,17 +19,20 @@
*/
package org.neo4j.coreedge.raft.roles;
+import java.io.IOException;
+
import org.neo4j.coreedge.raft.Followers;
import org.neo4j.coreedge.raft.RaftMessageHandler;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.RaftMessages.Heartbeat;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.outcome.CommitCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.outcome.ShipCommand;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
+import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.raft.state.follower.FollowerState;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
-import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.helpers.collection.FilteringIterable;
import org.neo4j.logging.Log;
@@ -37,8 +40,6 @@
import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER;
import static org.neo4j.coreedge.raft.roles.Role.LEADER;
-import java.io.IOException;
-
public class Leader implements RaftMessageHandler
{
public static Iterable replicationTargets( final ReadableRaftState ctx )
@@ -46,7 +47,7 @@ public static Iterable replicationTargets( final ReadableRaftSt
return new FilteringIterable<>( ctx.replicationMembers(), member -> !member.equals( ctx.myself() ) );
}
- static void sendHeartbeats( ReadableRaftState ctx, Outcome outcome ) throws IOException
+ static void sendHeartbeats( ReadableRaftState ctx, Outcome outcome ) throws IOException, RaftLogCompactedException
{
long commitIndex = ctx.leaderCommit();
long commitIndexTerm = ctx.entryLog().readEntryTerm( commitIndex );
@@ -59,7 +60,7 @@ static void sendHeartbeats( ReadableRaftState ctx, Outcome Outcome handle( RaftMessages.RaftMessage message,
- ReadableRaftState ctx, Log log ) throws IOException
+ ReadableRaftState ctx, Log log ) throws IOException, RaftLogCompactedException
{
Outcome outcome = new Outcome<>( LEADER, ctx );
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java
index c71cc08481ccf..2a3ce3bfa7bc0 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java
@@ -22,13 +22,14 @@
import java.io.IOException;
import org.neo4j.coreedge.raft.RaftMessages;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
public class Voting
{
public static void handleVoteRequest( ReadableRaftState state, Outcome outcome,
- RaftMessages.Vote.Request voteRequest ) throws IOException
+ RaftMessages.Vote.Request voteRequest ) throws IOException, RaftLogCompactedException
{
if ( voteRequest.term() > state.term() )
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java
index fa74f2dd42c6f..a0590492a0b96 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java
@@ -24,6 +24,7 @@
import java.util.Set;
import org.neo4j.coreedge.raft.log.RaftLog;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.membership.RaftMembership;
import org.neo4j.coreedge.raft.outcome.LogCommand;
@@ -125,7 +126,7 @@ public ReadableRaftLog entryLog()
return entryLog;
}
- public void update( Outcome outcome ) throws IOException
+ public void update( Outcome outcome ) throws IOException, RaftLogCompactedException
{
if ( termState.update( outcome.getTerm() ) )
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java
index f4822faf5d69e..5c35b898f568d 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java
@@ -23,10 +23,9 @@
import java.util.concurrent.Executor;
import java.util.function.Supplier;
-import org.neo4j.coreedge.raft.ConsensusListener;
-import org.neo4j.coreedge.raft.log.RaftLogEntry;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
+import org.neo4j.coreedge.raft.log.RaftLogCursor;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
-import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
@@ -92,9 +91,9 @@ public synchronized void notifyCommitted()
}
}
- private void applyUpTo( long commitIndex ) throws IOException
+ private void applyUpTo( long commitIndex ) throws IOException, RaftLogCompactedException
{
- try ( IOCursor cursor = raftLog.getEntryCursor( lastApplied + 1 ) )
+ try ( RaftLogCursor cursor = raftLog.getEntryCursor( lastApplied + 1 ) )
{
while ( cursor.next() && lastApplied < commitIndex )
{
@@ -113,7 +112,7 @@ private void applyUpTo( long commitIndex ) throws IOException
}
@Override
- public synchronized void start() throws IOException
+ public synchronized void start() throws IOException, RaftLogCompactedException
{
lastApplied = lastAppliedStorage.getInitialState().get();
log.info( "Replaying commands from index %d to index %d", lastApplied, raftLog.commitIndex() );
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java
index 183f40c045688..3d6625d875fe6 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java
@@ -175,6 +175,10 @@ public String toString()
public static final Setting state_machine_flush_window_size =
setting( "core_edge.state_machine_flush_window_size", INTEGER, "100" );
+ @Description( "RAFT log pruning strategy" )
+ public static final Setting raft_log_pruning =
+ setting( "core_edge.raft_log_pruning", STRING, "7 days" );
+
@Description( "RAFT log implementation" )
public static final Setting raft_log_implementation =
setting( "core_edge.raft_log_implementation", STRING, "PHYSICAL" );
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java
index 8bbd4c77a5b3d..399f107be046d 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java
@@ -522,12 +522,24 @@ public void commit( long commitIndex ) throws IOException
}
}
+ @Override
+ public long prune( long safeIndex )
+ {
+ return -1;
+ }
+
@Override
public long appendIndex()
{
return -1;
}
+ @Override
+ public long prevIndex()
+ {
+ return -1;
+ }
+
@Override
public long commitIndex()
{
@@ -541,7 +553,7 @@ public long readEntryTerm( long logIndex ) throws IOException
}
@Override
- public IOCursor getEntryCursor( long fromIndex ) throws IOException
+ public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException
{
if ( startExploding )
{
@@ -549,10 +561,16 @@ public IOCursor getEntryCursor( long fromIndex ) throws IOExceptio
}
else
{
- return IOCursor.getEmpty();
+ return RaftLogCursor.empty();
}
}
+ @Override
+ public long skip( long index, long term )
+ {
+ return -1;
+ }
+
public void startExploding()
{
startExploding = true;
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestFixture.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestFixture.java
index 3a3dae87494a1..5c83c612c743b 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestFixture.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftTestFixture.java
@@ -26,6 +26,7 @@
import org.neo4j.coreedge.raft.log.InMemoryRaftLog;
import org.neo4j.coreedge.raft.log.RaftLog;
+import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.membership.RaftTestGroup;
import org.neo4j.coreedge.server.RaftTestMember;
import org.neo4j.coreedge.server.RaftTestMemberSetBuilder;
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java
index 38d061bb2d59f..4b434527a0094 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLogTest.java
@@ -58,11 +58,11 @@ public void shouldCallWriteAllWhenStoringEntries() throws Exception
File entriesFile = new File( directory, "entries.log" );
File contentFile = new File( directory, "content.log" );
- File commitFile = new File( directory, "commit.log" );
+ File metaFile = new File( directory, "meta.log" );
when( fsa.open( Matchers.eq( entriesFile ), anyString() ) ).thenReturn( entriesChannel );
when( fsa.open( Matchers.eq( contentFile ), anyString() ) ).thenReturn( contentChannel );
- when( fsa.open( Matchers.eq( commitFile ), anyString() ) ).thenReturn( commitChannel );
+ when( fsa.open( Matchers.eq( metaFile ), anyString() ) ).thenReturn( commitChannel );
NaiveDurableRaftLog log = new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer(),
NullLogProvider.getInstance());
@@ -97,11 +97,11 @@ public void shouldForceAndCloseFilesOnShutdown() throws Throwable
File entriesFile = new File( directory, "entries.log" );
File contentFile = new File( directory, "content.log" );
- File commitFile = new File( directory, "commit.log" );
+ File metaFile = new File( directory, "meta.log" );
when( fsa.open( Matchers.eq( entriesFile ), anyString() ) ).thenReturn( entriesChannel );
when( fsa.open( Matchers.eq( contentFile ), anyString() ) ).thenReturn( contentChannel );
- when( fsa.open( Matchers.eq( commitFile ), anyString() ) ).thenReturn( commitChannel );
+ when( fsa.open( Matchers.eq( metaFile ), anyString() ) ).thenReturn( commitChannel );
NaiveDurableRaftLog log = new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer(),
NullLogProvider.getInstance());
@@ -139,11 +139,11 @@ public void shouldForceAndCloseFilesOnShutdownEvenOnFailure() throws Throwable
File entriesFile = new File( directory, "entries.log" );
File contentFile = new File( directory, "content.log" );
- File commitFile = new File( directory, "commit.log" );
+ File metaFile = new File( directory, "meta.log" );
when( fsa.open( Matchers.eq( entriesFile ), anyString() ) ).thenReturn( entriesChannel );
when( fsa.open( Matchers.eq( contentFile ), anyString() ) ).thenReturn( contentChannel );
- when( fsa.open( Matchers.eq( commitFile ), anyString() ) ).thenReturn( commitChannel );
+ when( fsa.open( Matchers.eq( metaFile ), anyString() ) ).thenReturn( commitChannel );
NaiveDurableRaftLog log = new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer(),
NullLogProvider.getInstance());
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStoreTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStoreTest.java
index 3a1610d96781d..722a88a775390 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStoreTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStoreTest.java
@@ -78,7 +78,7 @@ public void shouldReturnCursorProperlyPositionedIfThereIsACacheMiss() throws Thr
// then
assertTrue( cursor.next() );
- assertEquals( indexToStartFrom, cursor.get().getLogIndex() );
+ assertEquals( indexToStartFrom, cursor.get().logIndex() );
}
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java
index 9c2b75aa927e1..f6dc3a478e12b 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java
@@ -19,15 +19,6 @@
*/
package org.neo4j.coreedge.raft.log;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.neo4j.coreedge.raft.log.RaftLogHelper.readLogEntry;
-import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
-import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
-import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderReader.readLogHeader;
-
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -35,6 +26,7 @@
import org.junit.After;
import org.junit.Test;
+
import org.neo4j.coreedge.raft.ReplicatedInteger;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
@@ -48,6 +40,15 @@
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.NullLogProvider;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+
+import static org.neo4j.coreedge.raft.log.RaftLogHelper.readLogEntry;
+import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
+import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
+import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderReader.readLogHeader;
+
public class PhysicalRaftLogContractTest extends RaftLogContractTest
{
private PhysicalRaftLog raftLog;
@@ -77,7 +78,7 @@ private PhysicalRaftLog createRaftLog( int cacheSize )
File directory = new File( "raft-log" );
fileSystem.mkdir( directory );
- PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fileSystem, directory, 1000000, cacheSize, 10, 10,
+ PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fileSystem, directory, 10 * 1024, cacheSize, 10, 10,
new PhysicalLogFile.Monitor.Adapter(), new DummyRaftableContentSerializer(), () -> mock( DatabaseHealth.class ),
NullLogProvider.getInstance() );
life.add( newRaftLog );
@@ -224,7 +225,7 @@ public void shouldReturnNullOnEndOfFile() throws Exception
while( cursor.next() )
{
RaftLogAppendRecord record = cursor.get();
- if ( record.getLogIndex() == 0L )
+ if ( record.logIndex() == 0L )
{
assertFalse( firstRecordEncountered );
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java
index 4a91ffca01377..1f70b50b9a2cd 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java
@@ -38,17 +38,21 @@
*/
package org.neo4j.coreedge.raft.log;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import org.junit.Test;
+import java.io.IOException;
+import java.util.Iterator;
import java.util.Stack;
import org.neo4j.coreedge.raft.ReplicatedString;
+import org.neo4j.cursor.IOCursor;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class PhysicalRaftLogEntryCursorTest
{
@@ -78,35 +82,25 @@ public void shouldReturnAppendedRecords() throws Exception
// when/then
assertTrue( entryCursor.next() );
- assertEquals( entryA, entryCursor.get().getLogEntry() );
+ assertEquals( entryA, entryCursor.get().logEntry() );
assertTrue( entryCursor.next() );
- assertEquals( entryB, entryCursor.get().getLogEntry() );
+ assertEquals( entryB, entryCursor.get().logEntry() );
assertTrue( entryCursor.next() );
- assertEquals( entryC, entryCursor.get().getLogEntry() );
+ assertEquals( entryC, entryCursor.get().logEntry() );
assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/
- assertEquals( null, entryCursor.get() );
}
@Test
public void shouldSkipUntilContinuation() throws Exception
{
// given
- RaftRecordCursor recordCursor = mock( RaftRecordCursor.class );
-
- when( recordCursor.next() )
- .thenReturn( true )
- .thenReturn( true )
- .thenReturn( true )
- .thenReturn( true )
- .thenReturn( true )
- .thenReturn( false );
- when( recordCursor.get() )
- .thenReturn( new RaftLogAppendRecord( 0, entryA ) )
- .thenReturn( new RaftLogAppendRecord( 1, entryB ) ) // truncated
- .thenReturn( new RaftLogAppendRecord( 2, entryC ) ) // truncated
- .thenReturn( new RaftLogContinuationRecord( 1 ) )
- .thenReturn( new RaftLogAppendRecord( 1, entryD ) )
- .thenReturn( null );
+ IOCursor recordCursor = new StubRecordCursor(
+ new RaftLogAppendRecord( 0, entryA ),
+ new RaftLogAppendRecord( 1, entryB ), // truncated
+ new RaftLogAppendRecord( 2, entryC ), // truncated
+ new RaftLogContinuationRecord( 1, 0 ),
+ new RaftLogAppendRecord( 1, entryD )
+ );
Stack skipStack = new Stack<>();
// this represents the state after truncating entryB, we skip from index 1 until the next continuation
@@ -116,10 +110,54 @@ public void shouldSkipUntilContinuation() throws Exception
// when - then
assertTrue( entryCursor.next() );
- assertEquals( entryA, entryCursor.get().getLogEntry() );
+ assertEquals( entryA, entryCursor.get().logEntry() );
assertTrue( entryCursor.next() );
- assertEquals( entryD, entryCursor.get().getLogEntry() );
+ assertEquals( entryD, entryCursor.get().logEntry() );
assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/
- assertEquals( null, entryCursor.get() );
+ }
+
+ /**
+ * Placeholder test: the body holds only empty given/when/then markers, so it
+ * exercises nothing and always passes.
+ * NOTE(review): going by the name this is presumably meant to cover reading the
+ * term of an entry - implement it or remove it; confirm the intended scenario.
+ */
+ @Test
+ public void shouldReadTermOf() throws Exception
+ {
+ // given
+
+
+
+ // when
+
+ // then
+ }
+
+    /**
+     * Minimal in-memory cursor that replays a fixed sequence of records, one per call to
+     * {@link #next()}. It holds no external resources, so {@link #close()} does nothing.
+     */
+    private static class StubRecordCursor implements IOCursor<RaftLogRecord>
+    {
+        private final Iterator<RaftLogRecord> iterator;
+        // Record most recently stepped onto; stays null until the first successful next().
+        private RaftLogRecord current = null;
+
+        /**
+         * @param records the records to hand out, in the order given
+         */
+        public StubRecordCursor( RaftLogRecord... records )
+        {
+            iterator = asList( records ).iterator();
+        }
+
+        /**
+         * Steps to the next record if there is one.
+         *
+         * @return {@code true} if a new record is now current; {@code false} once the
+         * sequence is exhausted, in which case the current record is left unchanged
+         */
+        @Override
+        public boolean next() throws IOException
+        {
+            if ( iterator.hasNext() )
+            {
+                current = iterator.next();
+                return true;
+            }
+            return false;
+        }
+
+        /**
+         * @return the record last stepped onto by {@link #next()}, or {@code null} if
+         * {@link #next()} has not yet returned {@code true}
+         */
+        @Override
+        public RaftLogRecord get()
+        {
+            return current;
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+            // nothing to release: the records are held in memory only
+        }
+    }
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java
index 941ba30cee611..5988fb7fab846 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java
@@ -101,7 +101,7 @@ public void shouldRotateOnTruncate() throws Exception
PhysicalRaftLog log = createRaftLog( rotateAtSize, logFileMonitor );
StringBuilder builder = new StringBuilder();
- for ( int i = 0; i < rotateAtSize - 40; i++ )
+ for ( int i = 0; i < rotateAtSize - 60; i++ )
{
builder.append( "i" );
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java
deleted file mode 100644
index a97d2fbc670c8..0000000000000
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (c) 2002-2016 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-package org.neo4j.coreedge.raft.log;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.junit.Test;
-
-import org.neo4j.adversaries.ClassGuardedAdversary;
-import org.neo4j.adversaries.CountingAdversary;
-import org.neo4j.adversaries.fs.AdversarialFileSystemAbstraction;
-import org.neo4j.coreedge.raft.ReplicatedInteger;
-import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
-import org.neo4j.graphdb.mockfs.SelectiveFileSystemAbstraction;
-import org.neo4j.io.fs.FileSystemAbstraction;
-import org.neo4j.logging.NullLogProvider;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.fail;
-
-public class RaftLogAdversarialTest
-{
- public RaftLog createRaftLog( FileSystemAbstraction fileSystem )
- {
- File directory = new File( "raft-log" );
- fileSystem.mkdir( directory );
- return new NaiveDurableRaftLog( fileSystem, directory, new DummyRaftableContentSerializer(),
- NullLogProvider.getInstance());
- }
-
- @Test
- public void shouldDiscardEntryIfEntryChannelFails() throws Exception
- {
- ClassGuardedAdversary adversary = new ClassGuardedAdversary( new CountingAdversary( 1, false ),
- NaiveDurableRaftLog.class );
- adversary.disable();
-
- EphemeralFileSystemAbstraction fs = new EphemeralFileSystemAbstraction();
- FileSystemAbstraction fileSystem = new SelectiveFileSystemAbstraction(
- new File( "raft-log/entries.log" ), new AdversarialFileSystemAbstraction( adversary, fs ), fs );
- RaftLog log = createRaftLog( fileSystem );
-
- RaftLogEntry logEntry = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) );
- adversary.enable();
- try
- {
- log.append( logEntry );
- fail( "Should have thrown exception" );
- }
- catch ( Exception e )
- {
- // expected
- }
-
- verifyCurrentLogAndNewLogLoadedFromFileSystem( log, fileSystem, l -> {
- assertThat( l.appendIndex(), is( -1L ) );
- assertThat( l.commitIndex(), is( -1L ) );
- } );
- }
-
- @Test
- public void shouldDiscardEntryIfContentChannelFails() throws Exception
- {
- ClassGuardedAdversary adversary = new ClassGuardedAdversary( new CountingAdversary( 1, false ),
- NaiveDurableRaftLog.class );
- adversary.disable();
-
- EphemeralFileSystemAbstraction fs = new EphemeralFileSystemAbstraction();
- FileSystemAbstraction fileSystem = new SelectiveFileSystemAbstraction(
- new File( "raft-log/content.log" ), new AdversarialFileSystemAbstraction( adversary, fs ), fs );
- RaftLog log = createRaftLog( fileSystem );
-
- RaftLogEntry logEntry = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) );
- adversary.enable();
- try
- {
- log.append( logEntry );
- fail( "Should have thrown exception" );
- }
- catch ( Exception e )
- {
- // expected
- }
-
- verifyCurrentLogAndNewLogLoadedFromFileSystem( log, fileSystem, l -> {
- assertThat( l.appendIndex(), is( -1L ) );
- assertThat( l.commitIndex(), is( -1L ) );
- } );
- }
-
- private void verifyCurrentLogAndNewLogLoadedFromFileSystem(
- ReadableRaftLog log, FileSystemAbstraction fileSystem, LogVerifier logVerifier ) throws IOException
- {
- logVerifier.verifyLog( log );
- logVerifier.verifyLog( createRaftLog( fileSystem ) );
- }
-
- private interface LogVerifier
- {
- void verifyLog( ReadableRaftLog log ) throws IOException;
- }
-}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java
index 851d58a7527ef..d52061bdde437 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java
@@ -21,13 +21,16 @@
import org.junit.Test;
-import org.neo4j.coreedge.raft.ReplicatedInteger;
import org.neo4j.coreedge.raft.ReplicatedString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.neo4j.coreedge.raft.ReplicatedInteger.valueOf;
+import static org.neo4j.coreedge.raft.log.RaftLogHelper.hasNoContent;
import static org.neo4j.coreedge.raft.log.RaftLogHelper.readLogEntry;
public abstract class RaftLogContractTest
@@ -42,6 +45,7 @@ public void shouldReportCorrectDefaultValuesOnEmptyLog() throws Exception
// then
assertThat( log.appendIndex(), is( -1L ) );
+ assertThat( log.prevIndex(), is( -1L ) );
assertThat( log.commitIndex(), is( -1L ) );
assertThat( log.readEntryTerm( 0 ), is( -1L ) );
assertThat( log.readEntryTerm( -1 ), is( -1L ) );
@@ -52,15 +56,15 @@ public void shouldResetHighTermOnTruncate() throws Exception
{
// given
RaftLog log = createRaftLog();
- log.append( new RaftLogEntry( 45, ReplicatedInteger.valueOf( 99 ) ) );
- log.append( new RaftLogEntry( 46, ReplicatedInteger.valueOf( 99 ) ) );
- log.append( new RaftLogEntry( 47, ReplicatedInteger.valueOf( 99 ) ) );
+ log.append( new RaftLogEntry( 45, valueOf( 99 ) ) );
+ log.append( new RaftLogEntry( 46, valueOf( 99 ) ) );
+ log.append( new RaftLogEntry( 47, valueOf( 99 ) ) );
// truncate the last 2
log.truncate( 1 );
// then
- log.append( new RaftLogEntry( 46, ReplicatedInteger.valueOf( 9999 ) ) );
+ log.append( new RaftLogEntry( 46, valueOf( 9999 ) ) );
assertThat( log.readEntryTerm( 1 ), is( 46L ) );
assertThat( log.appendIndex(), is( 1L ) );
@@ -71,7 +75,7 @@ public void shouldAppendDataAndNotCommitImmediately() throws Exception
{
RaftLog log = createRaftLog();
- RaftLogEntry logEntry = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) );
+ RaftLogEntry logEntry = new RaftLogEntry( 1, valueOf( 1 ) );
log.append( logEntry );
assertThat( log.appendIndex(), is( 0L ) );
@@ -97,7 +101,7 @@ public void shouldCommitOutOfOrderAppend() throws Exception
log.commit( 10 );
- RaftLogEntry logEntry = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) );
+ RaftLogEntry logEntry = new RaftLogEntry( 1, valueOf( 1 ) );
log.append( logEntry );
log.commit( 10 );
@@ -111,8 +115,8 @@ public void shouldTruncatePreviouslyAppendedEntries() throws Exception
{
RaftLog log = createRaftLog();
- RaftLogEntry logEntryA = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) );
- RaftLogEntry logEntryB = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) );
+ RaftLogEntry logEntryA = new RaftLogEntry( 1, valueOf( 1 ) );
+ RaftLogEntry logEntryB = new RaftLogEntry( 1, valueOf( 2 ) );
log.append( logEntryA );
log.append( logEntryB );
@@ -129,11 +133,11 @@ public void shouldReplacePreviouslyAppendedEntries() throws Exception
{
RaftLog log = createRaftLog();
- RaftLogEntry logEntryA = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) );
- RaftLogEntry logEntryB = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) );
- RaftLogEntry logEntryC = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 3 ) );
- RaftLogEntry logEntryD = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 4 ) );
- RaftLogEntry logEntryE = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 5 ) );
+ RaftLogEntry logEntryA = new RaftLogEntry( 1, valueOf( 1 ) );
+ RaftLogEntry logEntryB = new RaftLogEntry( 1, valueOf( 2 ) );
+ RaftLogEntry logEntryC = new RaftLogEntry( 1, valueOf( 3 ) );
+ RaftLogEntry logEntryD = new RaftLogEntry( 1, valueOf( 4 ) );
+ RaftLogEntry logEntryE = new RaftLogEntry( 1, valueOf( 5 ) );
log.append( logEntryA );
log.append( logEntryB );
@@ -155,8 +159,8 @@ public void shouldHaveNoEffectWhenTruncatingNonExistingEntries() throws Exceptio
{
RaftLog log = createRaftLog();
- RaftLogEntry logEntryA = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) );
- RaftLogEntry logEntryB = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) );
+ RaftLogEntry logEntryA = new RaftLogEntry( 1, valueOf( 1 ) );
+ RaftLogEntry logEntryB = new RaftLogEntry( 1, valueOf( 2 ) );
log.append( logEntryA );
log.append( logEntryB );
@@ -173,7 +177,7 @@ public void shouldLogDifferentContentTypes() throws Exception
{
RaftLog log = createRaftLog();
- RaftLogEntry logEntryA = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) );
+ RaftLogEntry logEntryA = new RaftLogEntry( 1, valueOf( 1 ) );
RaftLogEntry logEntryB = new RaftLogEntry( 1, ReplicatedString.valueOf( "hejzxcjkzhxcjkxz" ) );
log.append( logEntryA );
@@ -190,13 +194,13 @@ public void shouldRejectNonMonotonicTermsForEntries() throws Exception
{
// given
RaftLog log = createRaftLog();
- log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) );
- log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) );
+ log.append( new RaftLogEntry( 0, valueOf( 1 ) ) );
+ log.append( new RaftLogEntry( 1, valueOf( 2 ) ) );
try
{
// when the term has a lower value
- log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 3 ) ) );
+ log.append( new RaftLogEntry( 0, valueOf( 3 ) ) );
// then an exception should be thrown
fail( "Should have failed because of non-monotonic terms" );
}
@@ -211,9 +215,9 @@ public void shouldCommitAndThenTruncateSubsequentEntry() throws Exception
{
// given
RaftLog log = createRaftLog();
- log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) );
- long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) );
- long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) );
+ log.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
+ long toCommit = log.append( new RaftLogEntry( 0, valueOf( 1 ) ) );
+ long toTruncate = log.append( new RaftLogEntry( 1, valueOf( 2 ) ) );
// when
log.commit( toCommit );
@@ -229,9 +233,9 @@ public void shouldTruncateAndThenCommitPreviousEntry() throws Exception
{
// given
RaftLog log = createRaftLog();
- log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) );
- long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) );
- long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) );
+ log.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
+ long toCommit = log.append( new RaftLogEntry( 0, valueOf( 1 ) ) );
+ long toTruncate = log.append( new RaftLogEntry( 1, valueOf( 2 ) ) );
// when
log.truncate( toTruncate );
@@ -247,9 +251,9 @@ public void shouldCommitAfterTruncatingAndAppending() throws Exception
{
// given
RaftLog log = createRaftLog();
- log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) );
- long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) );
- long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) );
+ log.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
+ long toCommit = log.append( new RaftLogEntry( 0, valueOf( 1 ) ) );
+ long toTruncate = log.append( new RaftLogEntry( 1, valueOf( 2 ) ) );
/*
0 1 2 Tr(2) 2 C*(1)
@@ -257,7 +261,7 @@ public void shouldCommitAfterTruncatingAndAppending() throws Exception
// when
log.truncate( toTruncate );
- long lastAppended = log.append( new RaftLogEntry( 2, ReplicatedInteger.valueOf( 3 ) ) );
+ long lastAppended = log.append( new RaftLogEntry( 2, valueOf( 3 ) ) );
log.commit( toCommit );
// then
@@ -271,9 +275,9 @@ public void shouldCommitAfterAppendingAndTruncating() throws Exception
{
// given
RaftLog log = createRaftLog();
- log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) );
- long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) );
- long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) );
+ log.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
+ long toCommit = log.append( new RaftLogEntry( 0, valueOf( 1 ) ) );
+ long toTruncate = log.append( new RaftLogEntry( 1, valueOf( 2 ) ) );
// when
log.truncate( toTruncate );
@@ -289,8 +293,8 @@ public void shouldNotAllowTruncationAtLastCommit() throws Exception
{
// given
RaftLog log = createRaftLog();
- log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) );
- long toCommit = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) );
+ log.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
+ long toCommit = log.append( new RaftLogEntry( 1, valueOf( 2 ) ) );
log.commit( toCommit );
@@ -298,9 +302,9 @@ public void shouldNotAllowTruncationAtLastCommit() throws Exception
{
// when
log.truncate( toCommit );
- fail("Truncation at this point should have failed");
+ fail( "Truncation at this point should have failed" );
}
- catch( IllegalArgumentException truncationFailed )
+ catch ( IllegalArgumentException truncationFailed )
{
// awesome
}
@@ -314,9 +318,9 @@ public void shouldNotAllowTruncationBeforeLastCommit() throws Exception
{
// given
RaftLog log = createRaftLog();
- log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) );
- long toTryToTruncate = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) );
- long toCommit = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) );
+ log.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
+ long toTryToTruncate = log.append( new RaftLogEntry( 0, valueOf( 1 ) ) );
+ long toCommit = log.append( new RaftLogEntry( 1, valueOf( 2 ) ) );
log.commit( toCommit );
@@ -324,9 +328,9 @@ public void shouldNotAllowTruncationBeforeLastCommit() throws Exception
{
// when
log.truncate( toTryToTruncate );
- fail("Truncation at this point should have failed");
+ fail( "Truncation at this point should have failed" );
}
- catch( IllegalArgumentException truncationFailed )
+ catch ( IllegalArgumentException truncationFailed )
{
// awesome
}
@@ -334,4 +338,132 @@ public void shouldNotAllowTruncationBeforeLastCommit() throws Exception
// then
assertThat( log.appendIndex(), is( toCommit ) );
}
+
+    /**
+     * Appends entries in batches of 100 until a prune request reports a pruned index, then
+     * checks the log's view of the pruned prefix and of the entries that follow it.
+     */
+    @Test
+    public void shouldEventuallyPrune() throws Exception
+    {
+        // given
+        RaftLog log = createRaftLog();
+        int nextTerm = 0;
+        long safeIndex;
+        long prunedIndex;
+
+        // Every entry is appended with content 10 * term, and the term grows by one per entry;
+        // the assertions below compare terms and contents against indexes on that basis.
+        // NOTE(review): nothing bounds this loop - it only ends once prune() returns a value
+        // other than -1.
+        do
+        {
+            for ( int batchEnd = nextTerm + 100; nextTerm < batchEnd; nextTerm++ )
+            {
+                log.append( new RaftLogEntry( nextTerm, valueOf( 10 * nextTerm ) ) );
+            }
+            safeIndex = log.appendIndex() - 50;
+
+            // when
+            prunedIndex = log.prune( safeIndex );
+        }
+        while ( prunedIndex == -1 );
+
+        // then
+        assertThat( prunedIndex, lessThanOrEqualTo( safeIndex ) );
+        assertEquals( prunedIndex, log.prevIndex() );
+        assertEquals( prunedIndex, log.readEntryTerm( prunedIndex ) );
+
+        // one-element array: a counter the lambda below is allowed to mutate
+        long firstRetained = prunedIndex + 1;
+        final long[] expectedIndex = {firstRetained};
+        log.getEntryCursor( firstRetained ).forAll( entry ->
+        {
+            assertThat( entry.content(), is( valueOf( 10 * (int) expectedIndex[0] ) ) );
+            expectedIndex[0]++;
+        } );
+
+        assertThat( log, hasNoContent( prunedIndex ) );
+    }
+
+    /**
+     * Skips a freshly created log to index 10 / term 2 and checks that the append index and
+     * the term read back at that index match what was skipped to.
+     */
+    @Test
+    public void shouldSkipAheadInEmptyLog() throws Exception
+    {
+        // given
+        final long targetIndex = 10;
+        final long targetTerm = 2;
+        RaftLog emptyLog = createRaftLog();
+
+        // when
+        emptyLog.skip( targetIndex, targetTerm );
+
+        // then
+        assertEquals( targetIndex, emptyLog.appendIndex() );
+        assertEquals( targetTerm, emptyLog.readEntryTerm( targetIndex ) );
+    }
+
+    /**
+     * Appends five entries, skips to an index past them with a higher term, and checks that
+     * the append index and the term read back at the skip index match the skip arguments.
+     */
+    @Test
+    public void shouldSkipAheadInLogWithContent() throws Exception
+    {
+        // given: five entries, all appended with term 0
+        RaftLog log = createRaftLog();
+        final long term = 0;
+        final int entryCount = 5;
+        int appended = 0;
+        while ( appended < entryCount )
+        {
+            log.append( new RaftLogEntry( term, valueOf( appended ) ) );
+            appended++;
+        }
+
+        // when
+        final long skipIndex = entryCount + 5;
+        final long skipTerm = term + 2;
+        log.skip( skipIndex, skipTerm );
+
+        // then
+        assertEquals( skipIndex, log.appendIndex() );
+        assertEquals( skipTerm, log.readEntryTerm( skipIndex ) );
+    }
+
+    /**
+     * Appends five entries, then asks to skip to an index below the current append index, and
+     * checks that the append index is unchanged and the term at the skip index is still the
+     * term the entries were appended with.
+     */
+    @Test
+    public void shouldNotSkipInLogWithLaterContent() throws Exception
+    {
+        // given: five entries, all appended with term 0
+        RaftLog log = createRaftLog();
+        final long term = 0;
+        final int entryCount = 5;
+        int appended = 0;
+        while ( appended < entryCount )
+        {
+            log.append( new RaftLogEntry( term, valueOf( appended ) ) );
+            appended++;
+        }
+        final long appendIndexBeforeSkip = log.appendIndex();
+
+        // when
+        final long skipIndex = entryCount - 2;
+        log.skip( skipIndex, term );
+
+        // then
+        assertEquals( appendIndexBeforeSkip, log.appendIndex() );
+        assertEquals( term, log.readEntryTerm( skipIndex ) );
+    }
+
+    /**
+     * Skips an empty log to index 5, appends one entry, and checks that the entry lands at
+     * index 6, that reading the skipped index raises {@link RaftLogCompactedException}, and
+     * that the appended entry can be read back.
+     */
+    @Test
+    public void shouldBeAbleToAppendAfterSkipping() throws Exception
+    {
+        // given
+        RaftLog log = createRaftLog();
+        final long term = 0;
+        final long skipIndex = 5;
+        final int newContentValue = 100;
+        final long expectedEntryIndex = skipIndex + 1;
+
+        // when
+        log.skip( skipIndex, term );
+        long appendedIndex = log.append( new RaftLogEntry( term, valueOf( newContentValue ) ) );
+
+        // then
+        assertEquals( expectedEntryIndex, log.appendIndex() );
+        assertEquals( expectedEntryIndex, appendedIndex );
+
+        try
+        {
+            readLogEntry( log, skipIndex );
+            fail( "Should have thrown exception" );
+        }
+        catch ( RaftLogCompactedException expected )
+        {
+            // expected: the skipped index cannot be read
+        }
+
+        assertThat( readLogEntry( log, expectedEntryIndex ).content(), is( valueOf( newContentValue ) ) );
+    }
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java
index d72dc8d46906a..aae810374c15b 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java
@@ -21,39 +21,91 @@
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.neo4j.coreedge.raft.ReplicatedInteger;
import org.neo4j.coreedge.raft.ReplicatedString;
+import org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.RaftLogImplementation;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
+import org.neo4j.io.fs.FileSystemAbstraction;
+import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
+import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.logging.NullLogProvider;
+import org.neo4j.test.EphemeralFileSystemRule;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+import static org.neo4j.coreedge.raft.ReplicatedInteger.valueOf;
import static org.neo4j.coreedge.raft.log.RaftLogHelper.readLogEntry;
+import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.RaftLogImplementation.NAIVE;
+import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.RaftLogImplementation.PHYSICAL;
+@RunWith(Parameterized.class)
public class RaftLogDurabilityTest
{
- public RaftLog createRaftLog( EphemeralFileSystemAbstraction fileSystem )
+ @Rule
+ public final EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule();
+
+ private final RaftLogFactory logFactory;
+
+ /**
+ * Receives one parameter set from the Parameterized runner.
+ *
+ * @param ignored not used by the test itself; as parameter {0} it supplies the
+ * "log:{0}" label in the generated test names
+ * @param logFactory stored in the logFactory field for later use by the test
+ */
+ public RaftLogDurabilityTest( RaftLogImplementation ignored, RaftLogFactory logFactory )
+ {
+ this.logFactory = logFactory;
+ }
+
+ @Parameters(name = "log:{0}")
+ public static Collection