Skip to content

Commit

Permalink
Extract ReplicationModule.
Browse files Browse the repository at this point in the history
Extract some session tracking functionality into a SessionTracker.
  • Loading branch information
Max Sumrall authored and apcj committed Jul 21, 2016
1 parent edbade0 commit 9dc6cac
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 88 deletions.
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.function.Supplier;

import org.neo4j.coreedge.raft.ConsensusModule;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl;
import org.neo4j.coreedge.raft.replication.RaftReplicator;
import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState;
import org.neo4j.coreedge.raft.replication.session.LocalSessionPool;
import org.neo4j.coreedge.raft.replication.tx.ExponentialBackoffStrategy;
import org.neo4j.coreedge.raft.state.DurableStateStorage;
import org.neo4j.coreedge.server.CoreEdgeClusterSettings;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.SECONDS;

public class ReplicationModule
{
private final RaftReplicator replicator;
private final ProgressTrackerImpl progressTracker;
private final SessionTracker sessionTracker;

public ReplicationModule( CoreMember myself, PlatformModule platformModule, Config config, ConsensusModule consensusModule,
Outbound<CoreMember,RaftMessages.RaftMessage> loggingOutbound, File clusterStateDirectory,
FileSystemAbstraction fileSystem, Supplier<DatabaseHealth> databaseHealthSupplier, LogProvider logProvider )
{
LifeSupport life = platformModule.life;

DurableStateStorage<GlobalSessionTrackerState> sessionTrackerStorage;
try
{
sessionTrackerStorage = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "session-tracker-state" ),
"session-tracker",
new GlobalSessionTrackerState.Marshal( new CoreMember.CoreMemberMarshal() ),
config.get( CoreEdgeClusterSettings.global_session_tracker_state_size ),
databaseHealthSupplier, logProvider ) );

}
catch ( IOException e )
{
throw new RuntimeException( e );
}

sessionTracker = new SessionTracker( sessionTrackerStorage );

GlobalSession myGlobalSession = new GlobalSession( UUID.randomUUID(), myself );
LocalSessionPool sessionPool = new LocalSessionPool( myGlobalSession );
progressTracker = new ProgressTrackerImpl( myGlobalSession );

replicator = new RaftReplicator( consensusModule.raftInstance(), myself,
loggingOutbound,
sessionPool, progressTracker,
new ExponentialBackoffStrategy( 10, SECONDS ) );

}

public RaftReplicator getReplicator()
{
return replicator;
}

public ProgressTrackerImpl getProgressTracker()
{
return progressTracker;
}

public SessionTracker getSessionTracker()
{
return sessionTracker;
}
}
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge;

import java.io.IOException;

import org.neo4j.coreedge.catchup.storecopy.core.CoreStateType;
import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState;
import org.neo4j.coreedge.raft.replication.session.LocalOperationId;
import org.neo4j.coreedge.raft.state.CoreSnapshot;
import org.neo4j.coreedge.raft.state.StateStorage;

public class SessionTracker implements SnapFlushable
{
private final StateStorage<GlobalSessionTrackerState> sessionTrackerStorage;
private GlobalSessionTrackerState sessionState = new GlobalSessionTrackerState();

public SessionTracker( StateStorage<GlobalSessionTrackerState> sessionTrackerStorage )
{
this.sessionTrackerStorage = sessionTrackerStorage;
}

public void start()
{
sessionState = sessionTrackerStorage.getInitialState();
}

@Override
public long getLastAppliedIndex()
{
return sessionState.logIndex();
}

@Override
public void flush() throws IOException
{
sessionTrackerStorage.persistStoreData( sessionState );
}

@Override
public void addSnapshots( CoreSnapshot coreSnapshot )
{
coreSnapshot.add( CoreStateType.SESSION_TRACKER, sessionState.newInstance() );
}

@Override
public void installSnapshots( CoreSnapshot coreSnapshot )
{
sessionState = coreSnapshot.get( CoreStateType.SESSION_TRACKER );
}

public boolean validateOperation( GlobalSession globalSession, LocalOperationId localOperationId )
{
return sessionState.validateOperation( globalSession, localOperationId );
}

public void update( GlobalSession globalSession, LocalOperationId localOperationId, long logIndex )
{
sessionState.update( globalSession, localOperationId, logIndex );
}

public GlobalSessionTrackerState newInstance()
{
return sessionState.newInstance();
}
}
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge;

import java.io.IOException;

import org.neo4j.coreedge.raft.state.CoreSnapshot;

public interface SnapFlushable
{
void flush() throws IOException;

void addSnapshots( CoreSnapshot coreSnapshot );

long getLastAppliedIndex();

void installSnapshots( CoreSnapshot coreSnapshot );
}
Expand Up @@ -24,8 +24,8 @@
import java.util.List;
import java.util.function.Supplier;

import org.neo4j.coreedge.SessionTracker;
import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.coreedge.catchup.storecopy.core.CoreStateType;
import org.neo4j.coreedge.discovery.CoreServerSelectionException;
import org.neo4j.coreedge.raft.RaftStateMachine;
import org.neo4j.coreedge.raft.log.RaftLog;
Expand All @@ -35,7 +35,6 @@
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
import org.neo4j.coreedge.raft.replication.DistributedOperation;
import org.neo4j.coreedge.raft.replication.ProgressTracker;
import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState;
import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy;
Expand All @@ -55,7 +54,7 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log
private final StateStorage<Long> lastFlushedStorage;
private final int flushEvery;
private final ProgressTracker progressTracker;
private final StateStorage<GlobalSessionTrackerState> sessionStorage;
private final SessionTracker sessionTracker;
private final Supplier<DatabaseHealth> dbHealth;
private final InFlightMap<Long,RaftLogEntry> inFlightMap;
private final Log log;
Expand All @@ -65,7 +64,6 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log
private final RaftLogCommitIndexMonitor commitIndexMonitor;
private final OperationBatcher batcher;

private GlobalSessionTrackerState sessionState = new GlobalSessionTrackerState();
private CoreStateMachines coreStateMachines;

private long lastApplied = NOTHING;
Expand All @@ -80,7 +78,7 @@ public CoreState(
LogProvider logProvider,
ProgressTracker progressTracker,
StateStorage<Long> lastFlushedStorage,
StateStorage<GlobalSessionTrackerState> sessionStorage,
SessionTracker sessionTracker,
CoreServerSelectionStrategy someoneElse,
CoreStateApplier applier,
CoreStateDownloader downloader,
Expand All @@ -92,7 +90,7 @@ public CoreState(
this.lastFlushedStorage = lastFlushedStorage;
this.flushEvery = flushEvery;
this.progressTracker = progressTracker;
this.sessionStorage = sessionStorage;
this.sessionTracker = sessionTracker;
this.someoneElse = someoneElse;
this.applier = applier;
this.downloader = downloader;
Expand Down Expand Up @@ -212,11 +210,6 @@ public synchronized void notifyNeedFreshSnapshot()
}
}

/**
* Compacts the core state.
*
* @throws IOException
*/
public void compact() throws IOException
{
raftLog.prune( lastFlushed );
Expand Down Expand Up @@ -246,7 +239,7 @@ private void handleOperations( long commandIndex, List<DistributedOperation> ope
{
for ( DistributedOperation operation : operations )
{
if ( !sessionState.validateOperation( operation.globalSession(), operation.operationId() ) )
if ( !sessionTracker.validateOperation( operation.globalSession(), operation.operationId() ) )
{
commandIndex++;
continue;
Expand All @@ -256,7 +249,7 @@ private void handleOperations( long commandIndex, List<DistributedOperation> ope
command.dispatch( dispatcher, commandIndex,
result -> progressTracker.trackResult( operation, result ) );

sessionState.update( operation.globalSession(), operation.operationId(), commandIndex );
sessionTracker.update( operation.globalSession(), operation.operationId(), commandIndex );
commandIndex++;
}
}
Expand All @@ -273,7 +266,7 @@ private void maybeFlush() throws IOException
private void flush() throws IOException
{
coreStateMachines.flush();
sessionStorage.persistStoreData( sessionState );
sessionTracker.flush();
lastFlushedStorage.persistStoreData( lastApplied );
lastFlushed = lastApplied;
}
Expand All @@ -283,12 +276,12 @@ public synchronized void start() throws IOException, InterruptedException
{
lastFlushed = lastApplied = lastFlushedStorage.getInitialState();
log.info( format( "Restoring last applied index to %d", lastApplied ) );
sessionState = sessionStorage.getInitialState();
sessionTracker.start();

/* Considering the order in which state is flushed, the state machines will
* always be furthest ahead and indicate the furthest possible state to
* which we must replay to reach a consistent state. */
long lastPossiblyApplying = max( coreStateMachines.getLastAppliedIndex(), sessionState.logIndex() );
long lastPossiblyApplying = max( coreStateMachines.getLastAppliedIndex(), sessionTracker.getLastAppliedIndex() );

if ( lastPossiblyApplying > lastApplied )
{
Expand All @@ -314,7 +307,7 @@ public synchronized CoreSnapshot snapshot() throws IOException, InterruptedExcep
CoreSnapshot coreSnapshot = new CoreSnapshot( prevIndex, prevTerm );

coreStateMachines.addSnapshots( coreSnapshot );
coreSnapshot.add( CoreStateType.SESSION_TRACKER, sessionState.newInstance() );
sessionTracker.addSnapshots( coreSnapshot );

return coreSnapshot;
}
Expand All @@ -337,7 +330,7 @@ synchronized void installSnapshot( CoreSnapshot coreSnapshot )
this.lastApplied = this.lastFlushed = snapshotPrevIndex;
log.info( format( "Skipping lastApplied index forward to %d", snapshotPrevIndex ) );

sessionState = coreSnapshot.get( CoreStateType.SESSION_TRACKER );
sessionTracker.installSnapshots( coreSnapshot );
}

@Override
Expand Down

0 comments on commit 9dc6cac

Please sign in to comment.