Commit

Formatting and making variable names less cryptic
jimwebber committed Mar 22, 2016
1 parent 9a3b6f6 commit ecc02de
Showing 5 changed files with 30 additions and 29 deletions.
@@ -19,16 +19,16 @@
  */
 package org.neo4j.coreedge.catchup.storecopy.core;
 
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.stream.ChunkedNioStream;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.function.Supplier;
 
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.stream.ChunkedNioStream;
+
 import org.neo4j.coreedge.catchup.CatchupServerProtocol;
 import org.neo4j.coreedge.catchup.ResponseMessageType;
 import org.neo4j.coreedge.catchup.storecopy.FileHeader;
@@ -60,7 +60,8 @@ public GetStoreRequestHandler( CatchupServerProtocol protocol,
     @Override
     protected void channelRead0( ChannelHandlerContext ctx, GetStoreRequest msg ) throws Exception
     {
-        long lastCheckPointedTx = checkPointerSupplier.get().tryCheckPoint(new SimpleTriggerInfo("Store copy"));
+        System.out.println( "sending store files..." );
+        long lastCheckPointedTx = checkPointerSupplier.get().tryCheckPoint( new SimpleTriggerInfo( "Store copy" ) );
         sendFiles( ctx );
         endStoreCopy( ctx, lastCheckPointedTx );
         protocol.expect( NextMessage.MESSAGE_TYPE );
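For orientation, the handler above boils down to: checkpoint, stream the store files, then report the last checkpointed transaction id. A minimal, self-contained sketch of that sequence (the Store interface and every name in it are hypothetical stand-ins, not the Neo4j API):

    import java.util.Arrays;
    import java.util.List;

    public class StoreCopySketch
    {
        interface Store
        {
            long checkPoint();    // flush pending changes; return last checkpointed tx id
            List<String> files(); // store files to stream to the client
        }

        // Mirrors the order in channelRead0: checkpoint first so the files on disk
        // are consistent up to a known transaction id, stream the files, then hand
        // back that id so the client knows where to resume pulling transactions.
        static long serveStoreCopy( Store store )
        {
            long lastCheckPointedTx = store.checkPoint();
            for ( String file : store.files() )
            {
                System.out.println( "sending " + file );
            }
            return lastCheckPointedTx; // carried in the end-of-store-copy message
        }

        public static void main( String[] args )
        {
            Store stub = new Store()
            {
                @Override
                public long checkPoint() { return 42L; }

                @Override
                public List<String> files() { return Arrays.asList( "neostore", "neostore.nodestore.db" ); }
            };
            System.out.println( "store copy complete at tx " + serveStoreCopy( stub ) );
        }
    }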
@@ -26,14 +26,12 @@
 import org.neo4j.coreedge.raft.state.DurableStateStorageImporter;
 import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState;
 import org.neo4j.graphdb.factory.EnterpriseGraphDatabaseFactory;
-import org.neo4j.graphdb.factory.GraphDatabaseFactory;
 import org.neo4j.io.fs.DefaultFileSystemAbstraction;
 import org.neo4j.kernel.api.exceptions.TransactionFailureException;
 import org.neo4j.kernel.impl.api.TransactionCommitProcess;
 import org.neo4j.kernel.impl.api.TransactionToApply;
 import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator;
 import org.neo4j.kernel.impl.store.MetaDataStore;
-import org.neo4j.kernel.impl.store.StoreFactory;
 import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory;
 import org.neo4j.kernel.impl.store.id.IdGenerator;
 import org.neo4j.kernel.impl.store.id.IdType;
@@ -77,11 +75,11 @@ public ConvertClassicStoreCommand( File databaseDir )
 
     public void execute() throws Throwable
     {
-        appendNullTransactionLogEntry( databaseDir );
+        appendNullTransactionLogEntryToSetRaftIndexToMinusOne( databaseDir );
         addIdAllocationState( databaseDir );
     }
 
-    private void appendNullTransactionLogEntry( File dbDir) throws TransactionFailureException
+    private void appendNullTransactionLogEntryToSetRaftIndexToMinusOne( File dbDir) throws TransactionFailureException
     {
         GraphDatabaseAPI db = (GraphDatabaseAPI) new EnterpriseGraphDatabaseFactory().newEmbeddedDatabase( dbDir );
 
@@ -116,34 +116,34 @@ else if ( req.leaderTerm() == ctx.term() )
 
             case APPEND_ENTRIES_RESPONSE:
             {
-                RaftMessages.AppendEntries.Response<MEMBER> res = (RaftMessages.AppendEntries.Response<MEMBER>) message;
+                RaftMessages.AppendEntries.Response<MEMBER> response = (RaftMessages.AppendEntries.Response<MEMBER>) message;
 
-                if ( res.term() < ctx.term() )
+                if ( response.term() < ctx.term() )
                 {
                     /* Ignore responses from old terms! */
                     break;
                 }
-                else if ( res.term() > ctx.term() )
+                else if ( response.term() > ctx.term() )
                 {
-                    outcome.setNextTerm( res.term() );
+                    outcome.setNextTerm( response.term() );
                     outcome.steppingDown();
                     outcome.setNextRole( FOLLOWER );
                     outcome.replaceFollowerStates( new FollowerStates<>() );
                     break;
                 }
 
-                FollowerState follower = ctx.followerStates().get( res.from() );
+                FollowerState follower = ctx.followerStates().get( response.from() );
 
-                if ( res.success() )
+                if ( response.success() )
                 {
-                    assert res.matchIndex() <= ctx.entryLog().appendIndex();
+                    assert response.matchIndex() <= ctx.entryLog().appendIndex();
 
-                    boolean followerProgressed = res.matchIndex() > follower.getMatchIndex();
+                    boolean followerProgressed = response.matchIndex() > follower.getMatchIndex();
 
-                    outcome.replaceFollowerStates( outcome.getFollowerStates().onSuccessResponse( res.from(),
-                            max( res.matchIndex(), follower.getMatchIndex() ) ) );
+                    outcome.replaceFollowerStates( outcome.getFollowerStates().onSuccessResponse( response.from(),
+                            max( response.matchIndex(), follower.getMatchIndex() ) ) );
 
-                    outcome.addShipCommand( new ShipCommand.Match( res.matchIndex(), res.from() ) );
+                    outcome.addShipCommand( new ShipCommand.Match( response.matchIndex(), response.from() ) );
 
                     /*
                      * Matches from older terms can in complicated leadership change / log truncation scenarios
@@ -152,7 +152,7 @@ else if ( res.term() > ctx.term() )
                      * and are ready for commit.
                      * This is explained nicely in Figure 3.7 of the thesis
                      */
-                    boolean matchInCurrentTerm = ctx.entryLog().readEntryTerm( res.matchIndex() ) == ctx.term();
+                    boolean matchInCurrentTerm = ctx.entryLog().readEntryTerm( response.matchIndex() ) == ctx.term();
 
                     /*
                      * The quorum situation may have changed only if the follower actually progressed.
@@ -173,16 +173,17 @@ else if ( res.term() > ctx.term() )
                 }
                 else // Response indicated failure.
                 {
-                    if( res.appendIndex() >= ctx.entryLog().prevIndex() )
+                    if( response.appendIndex() >= ctx.entryLog().prevIndex() )
                     {
                         // Signal a mismatch to the log shipper, which will serve an earlier entry.
-                        outcome.addShipCommand( new ShipCommand.Mismatch( res.appendIndex(), res.from() ) );
+                        outcome.addShipCommand( new ShipCommand.Mismatch( response.appendIndex(), response.from() ) );
                     }
                     else
                     {
+                        System.out.println("WOULD LIKE TO BE IN HERE");
                         // There are no earlier entries, message the follower that we have compacted so that
                         // it can take appropriate action.
-                        outcome.addOutgoingMessage( new RaftMessages.Directed<>( res.from(),
+                        outcome.addOutgoingMessage( new RaftMessages.Directed<>( response.from(),
                             new RaftMessages.LogCompactionInfo<>( ctx.myself(), ctx.term(), ctx.entryLog().prevIndex() ) ) );
                     }
                 }
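The Figure 3.7 comment is the crux of this block: a leader may only advance its commit index over entries from its own term, even if a quorum happens to match an older entry. A condensed, self-contained sketch of that rule (FollowerMatch and the quorum counting are hypothetical simplifications, not the Neo4j code; entryTerm plays the role of ctx.entryLog().readEntryTerm( matchIndex ) above):

    import java.util.Arrays;
    import java.util.List;

    public class CommitRuleSketch
    {
        static class FollowerMatch
        {
            final long matchIndex;
            final long entryTerm; // term of the log entry at matchIndex

            FollowerMatch( long matchIndex, long entryTerm )
            {
                this.matchIndex = matchIndex;
                this.entryTerm = entryTerm;
            }
        }

        // Returns the highest index that a quorum has matched, but only if the
        // entry at that index belongs to the current term (Raft thesis, Fig 3.7).
        static long newCommitIndex( long currentCommit, long currentTerm,
                                    List<FollowerMatch> followers, int clusterSize )
        {
            long best = currentCommit;
            for ( FollowerMatch candidate : followers )
            {
                if ( candidate.entryTerm != currentTerm )
                {
                    continue; // matches from older terms must not advance the commit index
                }
                int replicas = 1; // the leader itself always holds the entry
                for ( FollowerMatch other : followers )
                {
                    if ( other.matchIndex >= candidate.matchIndex )
                    {
                        replicas++;
                    }
                }
                if ( replicas > clusterSize / 2 && candidate.matchIndex > best )
                {
                    best = candidate.matchIndex;
                }
            }
            return best;
        }

        public static void main( String[] args )
        {
            // 3-member cluster, leader at term 2: only the current-term match commits.
            List<FollowerMatch> followers = Arrays.asList(
                    new FollowerMatch( 5, 1 ),   // stale match from term 1 -- ignored
                    new FollowerMatch( 7, 2 ) ); // current-term match -- counts
            System.out.println( newCommitIndex( 4, 2, followers, 3 ) ); // prints 7
        }
    }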
@@ -185,7 +185,7 @@ public String toString()
 
     @Description( "RAFT log rotation size" )
     public static final Setting<Long> raft_log_rotation_size =
-            setting( "core_edge.raft_log_rotation_size", BYTES, "25M", min( 1024L ) );
+            setting( "core_edge.raft_log_rotation_size", BYTES, "1M", min( 1024L ) );
 
     @Description("RAFT meta data cache size (in unit of entries)")
     public static final Setting<Integer> raft_log_meta_data_cache_size =
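The substantive change here is the default dropping from "25M" to "1M"; min( 1024L ) keeps the floor at 1 KiB. As a rough, self-contained approximation of how such suffixed byte values translate into numbers (this is not Neo4j's BYTES parser, just the arithmetic behind the two defaults):

    public class ByteSettingSketch
    {
        // Approximates how a suffixed value such as "1M" or "25M" becomes a byte count.
        static long parseBytes( String value )
        {
            char suffix = Character.toUpperCase( value.charAt( value.length() - 1 ) );
            long multiplier;
            switch ( suffix )
            {
            case 'K': multiplier = 1024L; break;
            case 'M': multiplier = 1024L * 1024; break;
            case 'G': multiplier = 1024L * 1024 * 1024; break;
            default: return Long.parseLong( value ); // plain byte count, no suffix
            }
            return Long.parseLong( value.substring( 0, value.length() - 1 ) ) * multiplier;
        }

        public static void main( String[] args )
        {
            System.out.println( parseBytes( "25M" ) ); // old default: 26214400 bytes
            System.out.println( parseBytes( "1M" ) );  // new default: 1048576 bytes
        }
    }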
@@ -19,7 +19,6 @@
  */
 package org.neo4j.coreedge.server;
 
-import org.neo4j.coreedge.network.Message;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -34,6 +33,7 @@
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 
+import org.neo4j.coreedge.network.Message;
 import org.neo4j.coreedge.raft.net.NonBlockingChannel;
 import org.neo4j.coreedge.raft.net.Outbound;
 import org.neo4j.coreedge.raft.net.monitoring.MessageQueueMonitor;
@@ -49,7 +49,7 @@
 public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress>
 {
     private final Expiration expiration;
-    private final ConcurrentHashMap<AdvertisedSocketAddress,TimestampedNonBlockingChannel> lazyChannelMap =
+    private final ConcurrentHashMap<AdvertisedSocketAddress, TimestampedNonBlockingChannel> lazyChannelMap =
             new ConcurrentHashMap<>();
     private final ExpiryScheduler scheduler;
     private final ChannelInitializer<SocketChannel> channelInitializer;
@@ -68,7 +68,7 @@ public SenderService( ExpiryScheduler expiryScheduler,
                           ChannelInitializer<SocketChannel> channelInitializer,
                           LogProvider logProvider,
                           Monitors monitors,
-                          int maxQueueSize)
+                          int maxQueueSize )
     {
         this.expiration = expiration;
         this.scheduler = expiryScheduler;
@@ -92,7 +92,7 @@ public void send( AdvertisedSocketAddress to, Message... messages )
         MessageQueueMonitor monitor = monitors.newMonitor( MessageQueueMonitor.class, NonBlockingChannel.class );
         TimestampedNonBlockingChannel lazyChannel = getAndUpdateLife( to, monitor );
         NonBlockingChannel nonBlockingChannel = lazyChannel.get();
-        monitor.register(to.socketAddress());
+        monitor.register( to.socketAddress() );
         for ( Object msg : messages )
         {
             nonBlockingChannel.send( msg );
@@ -209,6 +209,7 @@ private synchronized void reapDeadChannels()
         {
             if ( timestampedChannel.getEndOfLife().expired() )
             {
+                System.out.println("Reaping the channel: " + timestampedChannel);
                 timestampedChannel.get().dispose();
                 itr.remove();
             }
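The reaping sweep in that last hunk is a common keep-alive pattern: each channel carries an end-of-life that is refreshed on use, and a synchronized sweep disposes whatever has expired. A minimal sketch under those assumptions (Entry is a hypothetical stand-in for TimestampedNonBlockingChannel, and dispose() is simulated with a print):

    import java.util.Iterator;
    import java.util.concurrent.ConcurrentHashMap;

    public class ChannelReaperSketch
    {
        static class Entry
        {
            final String channel; // stands in for NonBlockingChannel
            volatile long endOfLifeMillis;

            Entry( String channel, long endOfLifeMillis )
            {
                this.channel = channel;
                this.endOfLifeMillis = endOfLifeMillis;
            }
        }

        private final ConcurrentHashMap<String, Entry> channels = new ConcurrentHashMap<>();

        void register( String address, long ttlMillis )
        {
            channels.put( address, new Entry( address, System.currentTimeMillis() + ttlMillis ) );
        }

        // Mirrors reapDeadChannels above: iterate, dispose expired, remove.
        synchronized void reapDeadChannels()
        {
            Iterator<Entry> itr = channels.values().iterator();
            while ( itr.hasNext() )
            {
                Entry entry = itr.next();
                if ( entry.endOfLifeMillis < System.currentTimeMillis() )
                {
                    System.out.println( "disposing " + entry.channel ); // dispose() in the real code
                    itr.remove();
                }
            }
        }
    }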
