Skip to content

Commit

Permalink
Diverge physical raft log from shared kernel implementation.
Browse files Browse the repository at this point in the history
PhysicalLogFile and friends don't work well with the need to truncate
the log file, as required for a raft log implementation.

In this commit we duplicate some classes into the core-edge module,
but this is only an intermediary step before we simplify the classes
on the core-edge side. We also temporarily increase visibility of
some kernel classes to make the move possible.

This commit also remove storing commit index from the responsibility
of raft log implementations, since we now honour the literature and treat
commit index as volatile state.
  • Loading branch information
apcj committed Mar 29, 2016
1 parent 2c8579c commit 1dc8fe0
Show file tree
Hide file tree
Showing 97 changed files with 3,752 additions and 1,344 deletions.
Expand Up @@ -106,7 +106,7 @@ public void close() throws IOException
channel.close();
}

void setChannel( LogVersionedStoreChannel channel )
public void setChannel( LogVersionedStoreChannel channel )
{
this.logVersionedStoreChannel = channel;
this.channel.setChannel( channel );
Expand Down
Expand Up @@ -43,10 +43,15 @@ public static LogHeader readLogHeader( ReadableClosableChannel channel ) throws
}

public static LogHeader readLogHeader( FileSystemAbstraction fileSystem, File file ) throws IOException
{
return readLogHeader( fileSystem, file, true );
}

public static LogHeader readLogHeader( FileSystemAbstraction fileSystem, File file, boolean strict) throws IOException
{
try ( StoreChannel channel = fileSystem.open( file, "r" ) )
{
return readLogHeader( ByteBuffer.allocateDirect( LOG_HEADER_SIZE ), channel, true );
return readLogHeader( ByteBuffer.allocateDirect( LOG_HEADER_SIZE ), channel, strict );
}
}

Expand Down
Expand Up @@ -29,7 +29,7 @@ public final class EntryCountThreshold implements Threshold
{
private final long maxTransactionCount;

EntryCountThreshold( long maxTransactionCount )
public EntryCountThreshold( long maxTransactionCount )
{
this.maxTransactionCount = maxTransactionCount;
}
Expand Down
Expand Up @@ -34,7 +34,7 @@ public final class EntryTimespanThreshold implements Threshold

private long lowerLimit;

EntryTimespanThreshold( Clock clock, TimeUnit timeUnit, long timeToKeep )
public EntryTimespanThreshold( Clock clock, TimeUnit timeUnit, long timeToKeep )
{
this.clock = clock;
this.timeToKeepInMillis = timeUnit.toMillis( timeToKeep );
Expand Down
Expand Up @@ -29,7 +29,7 @@ public final class FileCountThreshold implements Threshold

private long nonEmptyLogCount;

FileCountThreshold( long maxNonEmptyLogs )
public FileCountThreshold( long maxNonEmptyLogs )
{
this.maxNonEmptyLogs = maxNonEmptyLogs;
}
Expand Down
Expand Up @@ -24,14 +24,14 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.log.LogFileInformation;

final class FileSizeThreshold implements Threshold
public final class FileSizeThreshold implements Threshold
{
private final FileSystemAbstraction fileSystem;
private final long maxSize;

private long currentSize;

FileSizeThreshold( FileSystemAbstraction fileSystem, long maxSize )
public FileSizeThreshold( FileSystemAbstraction fileSystem, long maxSize )
{
this.fileSystem = fileSystem;
this.maxSize = maxSize;
Expand Down
Expand Up @@ -21,5 +21,10 @@

public interface LogPruning
{
/**
* Prunes logs that have version less than {@code currentVersion}. This is a best effort service and there is no
* guarantee that any logs will be removed.
* @param currentVersion The lowest version expected to remain after pruning completes.
*/
void pruneLogs( long currentVersion );
}
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.util;

import java.util.Iterator;

import org.neo4j.cursor.IOCursor;

public class IOCursors
{
private static IOCursor<Object> EMPTY = new IOCursor<Object>()
{
@Override
public boolean next()
{
return false;
}

@Override
public Object get()
{
return null;
}

@Override
public void close()
{

}
};

@SuppressWarnings("unchecked")
public static <T> IOCursor<T> empty()
{
return (IOCursor<T>) EMPTY;
}

@SafeVarargs
public static <T> IOCursor<T> cursor( final T... items )
{
return new IOCursor<T>()
{
int idx = 0;
T current;

@Override
public boolean next()
{
if ( idx < items.length )
{
current = items[idx++];
return true;
}
else
{
return false;
}
}

@Override
public void close()
{
idx = 0;
current = null;
}

@Override
public T get()
{
if ( current == null )
{
throw new IllegalStateException();
}

return current;
}
};
}

public static <T> IOCursor<T> cursor( final Iterable<T> items )
{
return new IOCursor<T>()
{
Iterator<T> iterator = items.iterator();

T current;

@Override
public boolean next()
{
if ( iterator.hasNext() )
{
current = iterator.next();
return true;
}
else
{
return false;
}
}

@Override
public void close()
{
iterator = items.iterator();
current = null;
}

@Override
public T get()
{
if ( current == null )
{
throw new IllegalStateException();
}

return current;
}
};
}
}
Expand Up @@ -60,7 +60,6 @@ public GetStoreRequestHandler( CatchupServerProtocol protocol,
@Override
protected void channelRead0( ChannelHandlerContext ctx, GetStoreRequest msg ) throws Exception
{
System.out.println( "sending store files..." );
long lastCheckPointedTx = checkPointerSupplier.get().tryCheckPoint( new SimpleTriggerInfo( "Store copy" ) );
sendFiles( ctx );
endStoreCopy( ctx, lastCheckPointedTx );
Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
Expand All @@ -38,8 +37,6 @@
import org.neo4j.coreedge.raft.net.Inbound;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.outcome.AppendLogEntry;
import org.neo4j.coreedge.raft.outcome.CommitCommand;
import org.neo4j.coreedge.raft.outcome.LogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager;
import org.neo4j.coreedge.raft.roles.Role;
Expand All @@ -55,7 +52,8 @@
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

import static org.neo4j.coreedge.raft.roles.Role.LEADER;

/**
Expand Down Expand Up @@ -110,14 +108,14 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private RaftLogShippingManager<MEMBER> logShipping;

public RaftInstance( MEMBER myself, StateStorage<TermState> termStorage,
StateStorage<VoteState<MEMBER>> voteStorage, RaftLog entryLog,
RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
final Inbound inbound, final Outbound<MEMBER> outbound, long leaderWaitTimeout,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier,
Monitors monitors )
StateStorage<VoteState<MEMBER>> voteStorage, RaftLog entryLog,
RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
final Inbound inbound, final Outbound<MEMBER> outbound, long leaderWaitTimeout,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier,
Monitors monitors )
{
this.myself = myself;
this.entryLog = entryLog;
Expand Down Expand Up @@ -175,15 +173,15 @@ public synchronized void bootstrapWithInitialMembers( RaftGroup<MEMBER> memberSe

try
{
List<LogCommand> logCommands = asList(
new AppendLogEntry( 0, membershipLogEntry ),
new CommitCommand( 0 )
);
for ( LogCommand logCommand : logCommands )
{
logCommand.applyTo( entryLog );
}
membershipManager.processLog( logCommands );
Outcome<MEMBER> outcome = new Outcome<>( currentRole, state );
outcome.setCommitIndex( 0 );

AppendLogEntry appendCommand = new AppendLogEntry( 0, membershipLogEntry );
outcome.addLogCommand( appendCommand );

state.update( outcome );

membershipManager.processLog( 0, singletonList( appendCommand ) );
}
catch ( IOException e )
{
Expand Down Expand Up @@ -309,22 +307,22 @@ public synchronized void handle( Message incomingMessage )
(RaftMessages.RaftMessage<MEMBER>) incomingMessage, state, log );

boolean newLeaderWasElected = leaderChanged( outcome, state.leader() );
boolean newCommittedEntry = newCommittedEntry( state, outcome.getLogCommands() );
boolean newCommittedEntry = outcome.getCommitIndex() > state.commitIndex();

state.update( outcome ); // updates to raft log happen within
sendMessages( outcome );

handleTimers( outcome );
handleLogShipping( outcome );

membershipManager.processLog( outcome.getLogCommands() );
membershipManager.processLog( outcome.getCommitIndex(), outcome.getLogCommands() );
driveMembership( outcome );

volatileLeader.set( outcome.getLeader() );

if( newCommittedEntry )
{
raftStateMachine.notifyCommitted( state.entryLog().commitIndex() );
raftStateMachine.notifyCommitted( state.commitIndex() );
}
if( newLeaderWasElected )
{
Expand All @@ -340,18 +338,6 @@ public synchronized void handle( Message incomingMessage )
}
}

private static boolean newCommittedEntry( RaftState state, Collection<LogCommand> logCommands )
{
for ( LogCommand logCommand : logCommands )
{
if( logCommand instanceof CommitCommand )
{
return true;
}
}
return false;
}

private void driveMembership( Outcome<MEMBER> outcome )
{
currentRole = outcome.getRole();
Expand Down
Expand Up @@ -19,12 +19,11 @@
*/
package org.neo4j.coreedge.raft.log;

public class RaftCommitEntry
{
private final long commitIndex;
import java.io.IOException;

import org.neo4j.cursor.IOCursor;

public RaftCommitEntry( long commitIndex )
{
this.commitIndex = commitIndex;
}
public interface EntryReader
{
IOCursor<RaftLogAppendRecord> readEntriesInVersion( long version ) throws IOException;
}
Expand Up @@ -56,16 +56,6 @@ public long append( RaftLogEntry logEntry ) throws IOException
return appendIndex;
}

@Override
public void commit( long commitIndex )
{
if ( commitIndex > appendIndex )
{
commitIndex = appendIndex;
}
this.commitIndex = commitIndex;
}

@Override
public long prune( long safeIndex ) throws RaftLogCompactedException
{
Expand Down Expand Up @@ -98,12 +88,6 @@ public long prevIndex()
return prevIndex;
}

@Override
public long commitIndex()
{
return commitIndex;
}

private RaftLogEntry readLogEntry( long logIndex ) throws RaftLogCompactedException
{
if ( logIndex <= prevIndex )
Expand Down

0 comments on commit 1dc8fe0

Please sign in to comment.