Skip to content

Commit

Permalink
Safe pruning of the segmented raft log.
Browse files Browse the repository at this point in the history
- When attempting to prune, the pruner will not remove files that
have open readers. This behavior is unchanged but is now
clarified by a test.

- The RaftLogShipper will attempt to send the requested entry.
If that entry is pruned before the RaftLogShipper can send it,
it will send the oldest entry it has, which will likely not be
accepted by the follower and will cause the follower to request again.
The second request will be seen after pruning has happened,
and the follower will be notified that a snapshot is needed.

- Rename the setting "CoreEdgeClusterSetting.raft_log_pruning"
to "CoreEdgeClusterSetting.raft_log_pruning_strategy" with default value "keep_all".

- Removed RaftLogCompactedException. There is no need to differentiate
 between pruned indexes and indexes that do not yet exist.

- Properly parse prune strategy for PhysicalRaftLog.

- SegmentRaftLog EntryStore is refactored to EntryCursor.

- Cluster now implements AutoCloseable.
  • Loading branch information
Max Sumrall committed Jun 1, 2016
1 parent 224ce54 commit 7d5af57
Show file tree
Hide file tree
Showing 68 changed files with 837 additions and 595 deletions.
Expand Up @@ -50,14 +50,16 @@ public static ThresholdConfigValue parse( String configValue )
{
switch ( boolOrNumber )
{
case "true":
return ThresholdConfigValue.NO_PRUNING;
case "false":
return new ThresholdConfigValue( "entries", 1 );
default:
throw new IllegalArgumentException( "Invalid log pruning configuration value '" + configValue +
"'. The form is 'all' or '<number><unit> <type>' for example '100k txs' " +
"for the latest 100 000 transactions" );
case "keep_all":
case "true":
return ThresholdConfigValue.NO_PRUNING;
case "keep_none":
case "false":
return new ThresholdConfigValue( "entries", 1 );
default:
throw new IllegalArgumentException( "Invalid log pruning configuration value '" + configValue +
"'. The form is 'all' or '<number><unit> <type>' for example '100k txs' " +
"for the latest 100 000 transactions" );
}
}

Expand Down
Expand Up @@ -23,7 +23,6 @@

import org.neo4j.coreedge.raft.RaftInstance;
import org.neo4j.coreedge.raft.RaftInstance.BootstrapException;
import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.membership.CoreMemberSet;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -50,14 +49,7 @@ public void start() throws BootstrapException

if ( clusterTopology.bootstrappable() )
{
try
{
raftInstance.bootstrapWithInitialMembers( new CoreMemberSet( initialMembers ) );
}
catch ( RaftLogCompactedException e )
{
throw new BootstrapException( e );
}
raftInstance.bootstrapWithInitialMembers( new CoreMemberSet( initialMembers ) );
}

onTopologyChange( clusterTopology );
Expand Down
Expand Up @@ -29,7 +29,6 @@

import org.neo4j.coreedge.helper.VolatileFuture;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.membership.RaftGroup;
import org.neo4j.coreedge.raft.membership.RaftMembershipManager;
Expand Down Expand Up @@ -158,7 +157,7 @@ Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), timeout -> {
*
* @param memberSet The other members.
*/
public synchronized void bootstrapWithInitialMembers( RaftGroup<MEMBER> memberSet ) throws BootstrapException, RaftLogCompactedException
public synchronized void bootstrapWithInitialMembers( RaftGroup<MEMBER> memberSet ) throws BootstrapException
{
if ( entryLog.appendIndex() >= 0 )
{
Expand Down
Expand Up @@ -21,13 +21,12 @@

import java.io.IOException;

import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.logging.Log;

public interface RaftMessageHandler
{
<MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message, ReadableRaftState<MEMBER> context, Log log )
throws IOException, RaftLogCompactedException;
throws IOException;
}
Expand Up @@ -55,7 +55,7 @@ public synchronized long append( RaftLogEntry logEntry ) throws IOException
}

@Override
public synchronized long prune( long safeIndex ) throws RaftLogCompactedException
public synchronized long prune( long safeIndex )
{
if( safeIndex > prevIndex )
{
Expand Down Expand Up @@ -86,24 +86,8 @@ public synchronized long prevIndex()
return prevIndex;
}

private RaftLogEntry readLogEntry( long logIndex ) throws RaftLogCompactedException
{
if ( logIndex <= prevIndex )
{
throw new RaftLogCompactedException( "Entry does not exist in log" );
}
else if ( logIndex > appendIndex )
{
throw new RaftLogCompactedException(
String.format( "cannot read past last appended index (lastAppended=%d, readIndex=%d)",
appendIndex, logIndex ) );
}

return raftLog.get( logIndex );
}

@Override
public synchronized long readEntryTerm( long logIndex ) throws RaftLogCompactedException
public synchronized long readEntryTerm( long logIndex )
{
if( logIndex == prevIndex )
{
Expand All @@ -113,11 +97,11 @@ else if ( logIndex < prevIndex || logIndex > appendIndex )
{
return -1;
}
return readLogEntry( logIndex ).term();
return raftLog.get( logIndex ).term();
}

@Override
public synchronized void truncate( long fromIndex ) throws RaftLogCompactedException
public synchronized void truncate( long fromIndex )
{
if ( fromIndex <= commitIndex )
{
Expand Down Expand Up @@ -164,14 +148,11 @@ public boolean next() throws IOException
hasNext = currentIndex <= appendIndex;
if ( hasNext )
{
try
{
current = readLogEntry( currentIndex );
}
catch ( RaftLogCompactedException e )
if ( currentIndex <= prevIndex || currentIndex > appendIndex )
{
throw new IOException( e );
return false;
}
current = raftLog.get( currentIndex );
}
else
{
Expand Down
Expand Up @@ -22,8 +22,6 @@
import java.io.IOException;

import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor;
import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor;
import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.monitoring.Monitors;

public class MonitoredRaftLog implements RaftLog
Expand All @@ -46,14 +44,14 @@ public long append( RaftLogEntry entry ) throws IOException
}

@Override
public void truncate( long fromIndex ) throws IOException, RaftLogCompactedException
public void truncate( long fromIndex ) throws IOException
{
delegate.truncate( fromIndex );
appendIndexMonitor.appendIndex( delegate.appendIndex() );
}

@Override
public long prune( long safeIndex ) throws IOException, RaftLogCompactedException
public long prune( long safeIndex ) throws IOException
{
return delegate.prune( safeIndex );
}
Expand All @@ -71,13 +69,13 @@ public long prevIndex()
}

@Override
public long readEntryTerm( long logIndex ) throws IOException, RaftLogCompactedException
public long readEntryTerm( long logIndex ) throws IOException
{
return delegate.readEntryTerm( logIndex );
}

@Override
public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException, RaftLogCompactedException
public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException
{
return delegate.getEntryCursor( fromIndex );
}
Expand Down
Expand Up @@ -181,7 +181,7 @@ public long append( RaftLogEntry logEntry ) throws IOException
}

@Override
public void truncate( long fromIndex ) throws IOException, RaftLogCompactedException
public void truncate( long fromIndex ) throws IOException
{
if ( appendIndex >= fromIndex )
{
Expand All @@ -202,7 +202,7 @@ public void truncate( long fromIndex ) throws IOException, RaftLogCompactedExcep
}

@Override
public long prune( long safeIndex ) throws IOException, RaftLogCompactedException
public long prune( long safeIndex ) throws IOException
{
try
{
Expand Down Expand Up @@ -279,7 +279,7 @@ public long prevIndex()
return prevIndex;
}

private RaftLogEntry readLogEntry( long logIndex ) throws IOException, RaftLogCompactedException
private RaftLogEntry readLogEntry( long logIndex ) throws IOException
{
Entry entry = readEntry( logIndex );
ReplicatedContent content;
Expand All @@ -296,7 +296,7 @@ private RaftLogEntry readLogEntry( long logIndex ) throws IOException, RaftLogCo
}

@Override
public long readEntryTerm( long logIndex ) throws IOException, RaftLogCompactedException
public long readEntryTerm( long logIndex ) throws IOException
{
if( logIndex == prevIndex )
{
Expand Down Expand Up @@ -349,7 +349,7 @@ public boolean next() throws IOException
current.set( readLogEntry( currentIndex ) );
return true;
}
catch ( RaftLogCompactedException e )
catch ( IllegalArgumentException e )
{
current.invalidate();
}
Expand Down Expand Up @@ -387,11 +387,11 @@ private void writeEntry( long index, Entry entry, StoreChannel entriesChannel )
entriesChannel.force( false );
}

private Entry readEntry( long logIndex ) throws RaftLogCompactedException, IOException
private Entry readEntry( long logIndex ) throws IOException
{
if ( logIndex <= prevIndex || logIndex > appendIndex )
{
throw new RaftLogCompactedException();
throw new IllegalArgumentException("compaction exception");
}

ByteBuffer buffer = ByteBuffer.allocate( ENTRY_RECORD_LENGTH );
Expand Down
Expand Up @@ -48,7 +48,7 @@ public interface RaftLog extends ReadableRaftLog
*
* @param fromIndex The start index (inclusive).
*/
void truncate( long fromIndex ) throws IOException, RaftLogCompactedException;
void truncate( long fromIndex ) throws IOException;

/**
* Attempt to prune (delete) a prefix of the log, no further than the safeIndex.
Expand All @@ -60,7 +60,7 @@ public interface RaftLog extends ReadableRaftLog
*
* @return The new prevIndex for the log, which will be at most safeIndex.
*/
long prune( long safeIndex ) throws IOException, RaftLogCompactedException;
long prune( long safeIndex ) throws IOException;

/**
* Skip up to the supplied index if it is not already present.
Expand Down

This file was deleted.

Expand Up @@ -26,7 +26,7 @@
public interface RaftLogCursor extends RawCursor<RaftLogEntry,Exception>
{
@Override
boolean next() throws IOException, RaftLogCompactedException;
boolean next() throws IOException;

@Override
void close() throws IOException;
Expand Down
Expand Up @@ -39,11 +39,11 @@ public interface ReadableRaftLog
* @param logIndex The index of the log entry.
* @return The term of the entry, or -1 if the entry does not exist
*/
long readEntryTerm( long logIndex ) throws IOException, RaftLogCompactedException;
long readEntryTerm( long logIndex ) throws IOException;

/**
* Returns a {@link RaftLogCursor} of {@link RaftLogEntry}s from the specified index until the end of the log
* @param fromIndex The log index at which the cursor should be positioned
*/
RaftLogCursor getEntryCursor( long fromIndex ) throws IOException, RaftLogCompactedException;
RaftLogCursor getEntryCursor( long fromIndex ) throws IOException;
}
Expand Up @@ -23,14 +23,13 @@
import java.io.IOException;

import org.neo4j.coreedge.raft.log.NaiveDurableRaftLog;
import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.net.CoreReplicatedContentMarshal;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.logging.NullLogProvider;

public class DumpNaiveDurableRaftLog
{
public static void main( String[] args ) throws IOException, RaftLogCompactedException
public static void main( String[] args ) throws IOException
{
for ( String arg : args )
{
Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;
import java.io.PrintStream;

import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.log.RaftLogCursor;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
Expand All @@ -36,7 +35,7 @@ public LogPrinter( ReadableRaftLog raftLog )
this.raftLog = raftLog;
}

public void print( PrintStream out ) throws IOException, RaftLogCompactedException
public void print( PrintStream out ) throws IOException
{
out.println( String.format( "%1$8s %2$5s %3$2s %4$s", "Index", "Term", "C?", "Content"));
long index = 0L;
Expand Down

0 comments on commit 7d5af57

Please sign in to comment.