Skip to content

Commit

Permalink
Merge pull request #6513 from digitalstain/ce-log-cursor
Browse files Browse the repository at this point in the history
Raft log entry cursor for PhysicalRaftLog
  • Loading branch information
apcj committed Feb 26, 2016
2 parents 80d2fb3 + c862c68 commit ed2940b
Show file tree
Hide file tree
Showing 73 changed files with 551 additions and 464 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.neo4j.kernel.impl.transaction.log.ReadOnlyLogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.TransactionLogWriter;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.FlushableChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter;
import org.neo4j.kernel.lifecycle.LifeSupport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreEdgeClusterSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

public class HazelcastClientConnector implements HazelcastConnector
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.coreedge.raft.membership.RaftGroup;
import org.neo4j.coreedge.raft.membership.RaftMembershipManager;
import org.neo4j.coreedge.raft.net.Inbound;
Expand All @@ -48,6 +47,7 @@
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
Expand Down Expand Up @@ -187,7 +187,7 @@ public synchronized void bootstrapWithInitialMembers( RaftGroup<MEMBER> memberSe
}
membershipManager.processLog( logCommands );
}
catch ( RaftStorageException e )
catch ( IOException e )
{
databaseHealthSupplier.get().panic( e );
throw new BootstrapException( e );
Expand Down Expand Up @@ -250,15 +250,14 @@ public ReadableRaftState<MEMBER> state()
return raftState;
}

private void handleOutcome( Outcome<MEMBER> outcome ) throws RaftStorageException, IOException
private void handleOutcome( Outcome<MEMBER> outcome ) throws IOException
{
adjustLogShipping( outcome );
notifyLeaderChanges( outcome );

raftState.update( outcome );
membershipManager.processLog( outcome.getLogCommands() );
consensusListener.notifyCommitted();

volatileLeader.set( outcome.getLeader() );
}

Expand All @@ -273,7 +272,7 @@ private void notifyLeaderChanges( Outcome<MEMBER> outcome )
}
}

private void adjustLogShipping( Outcome<MEMBER> outcome ) throws RaftStorageException
private void adjustLogShipping( Outcome<MEMBER> outcome ) throws IOException
{
MEMBER oldLeader = raftState.leader();

Expand Down Expand Up @@ -342,7 +341,7 @@ public synchronized void handle( Message incomingMessage )
membershipManager.onFollowerStateChange( raftState.followerStates() );
}
}
catch ( RaftStorageException | IOException e )
catch ( IOException e )
{
log.error( "Failed to process RAFT message " + incomingMessage, e );
databaseHealthSupplier.get().panic( e );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
*/
package org.neo4j.coreedge.raft;

import org.neo4j.coreedge.raft.log.RaftStorageException;
import java.io.IOException;

import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.logging.Log;

public interface RaftMessageHandler
{
<MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message, ReadableRaftState<MEMBER> context, Log log )
throws RaftStorageException;
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.coreedge.raft;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
*/
package org.neo4j.coreedge.raft.log;

import java.io.File;
import java.io.IOException;

import org.neo4j.io.file.Files;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
*/
package org.neo4j.coreedge.raft.log;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.cursor.IOCursor;

public class InMemoryRaftLog implements RaftLog
{
Expand All @@ -34,7 +36,7 @@ public class InMemoryRaftLog implements RaftLog
private long term = -1;

@Override
public long append( RaftLogEntry logEntry ) throws RaftStorageException
public long append( RaftLogEntry logEntry ) throws IOException
{
Objects.requireNonNull( logEntry );
if ( logEntry.term() >= term )
Expand All @@ -43,7 +45,7 @@ public long append( RaftLogEntry logEntry ) throws RaftStorageException
}
else
{
throw new RaftStorageException( String.format( "Non-monotonic term %d for in entry %s in term %d",
throw new IllegalStateException( String.format( "Non-monotonic term %d for in entry %s in term %d",
logEntry.term(), logEntry.toString(), term ) );
}

Expand Down Expand Up @@ -90,12 +92,6 @@ public RaftLogEntry readLogEntry( long logIndex )
return raftLog.get( logIndex );
}

@Override
public ReplicatedContent readEntryContent( long logIndex )
{
return readLogEntry( logIndex ).content();
}

@Override
public long readEntryTerm( long logIndex )
{
Expand Down Expand Up @@ -132,6 +128,33 @@ public boolean entryExists( long logIndex )
return raftLog.containsKey( logIndex );
}

@Override
public IOCursor<RaftLogEntry> getEntryCursor( long fromIndex ) throws IOException
{
return new IOCursor<RaftLogEntry>()
{
private long currentIndex = fromIndex - 1; // the cursor starts "before" the first entry

@Override
public boolean next() throws IOException
{
currentIndex++;
return currentIndex <= appendIndex;
}

@Override
public void close() throws IOException
{
}

@Override
public RaftLogEntry get()
{
return readLogEntry( currentIndex );
}
};
}

@Override
public boolean equals( Object o )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
*/
package org.neo4j.coreedge.raft.log;

import java.io.IOException;

import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor;
import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.monitoring.Monitors;

public class MonitoredRaftLog implements RaftLog
Expand All @@ -40,23 +42,28 @@ public MonitoredRaftLog( RaftLog delegate, Monitors monitors )
this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass(), COMMIT_INDEX_TAG );
}

public RaftLog delegate()
{
return delegate;
}

@Override
public long append( RaftLogEntry entry ) throws RaftStorageException
public long append( RaftLogEntry entry ) throws IOException
{
long appendIndex = delegate.append( entry );
appendIndexMonitor.appendIndex( appendIndex );
return appendIndex;
}

@Override
public void truncate( long fromIndex ) throws RaftStorageException
public void truncate( long fromIndex ) throws IOException
{
delegate.truncate( fromIndex );
appendIndexMonitor.appendIndex( delegate.appendIndex() );
}

@Override
public void commit( long commitIndex ) throws RaftStorageException
public void commit( long commitIndex ) throws IOException
{
delegate.commit( commitIndex );
commitIndexMonitor.commitIndex( delegate.commitIndex() );
Expand All @@ -75,26 +82,26 @@ public long commitIndex()
}

@Override
public RaftLogEntry readLogEntry( long logIndex ) throws RaftStorageException
public RaftLogEntry readLogEntry( long logIndex ) throws IOException
{
return delegate.readLogEntry( logIndex );
}

@Override
public ReplicatedContent readEntryContent( long logIndex ) throws RaftStorageException
public long readEntryTerm( long logIndex ) throws IOException
{
return delegate.readEntryContent( logIndex );
return delegate.readEntryTerm( logIndex );
}

@Override
public long readEntryTerm( long logIndex ) throws RaftStorageException
public boolean entryExists( long logIndex ) throws IOException
{
return delegate.readEntryTerm( logIndex );
return delegate.entryExists( logIndex );
}

@Override
public boolean entryExists( long logIndex ) throws RaftStorageException
public IOCursor<RaftLogEntry> getEntryCursor( long fromIndex ) throws IOException
{
return delegate.entryExists( logIndex );
return delegate.getEntryCursor( fromIndex );
}
}

0 comments on commit ed2940b

Please sign in to comment.