Skip to content

Commit

Permalink
Merge CoreState and CoreStateMachine and various clean up.
Browse files Browse the repository at this point in the history
The applier is now internally managed by the CoreState so
that it can synchronize with it. Code for downloading a
remote core state now lives in the new CoreStateDownloader.
  • Loading branch information
martinfurmanski committed Apr 4, 2016
1 parent 50d0256 commit e8a8414
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 266 deletions.
Expand Up @@ -25,4 +25,9 @@ public StoreCopyFailedException( Throwable cause )
{
super( cause );
}

public StoreCopyFailedException( String message )
{
super( message );
}
}
Expand Up @@ -246,16 +246,11 @@ public ReadableRaftState<MEMBER> state()
return state;
}

public void downloadSnapshot()
{
raftStateMachine.downloadSnapshot();
}

private void checkForSnapshotNeed( Outcome<MEMBER> outcome )
{
if( outcome.needsFreshSnapshot() )
{
downloadSnapshot();
raftStateMachine.notifyNeedFreshSnapshot();
}
}

Expand Down
Expand Up @@ -33,8 +33,8 @@ default void notifyCommitted( long commitIndex ) {}
/**
* Download and install a snapshot of state from another member of the cluster.
* <p/>
* Called when the consensus system no longer has the log entries required to further update the state machine,
* because they have been deleted through pruning.
* Called when the consensus system no longer has the log entries required to
* further update the state machine, because they have been deleted through pruning.
*/
default void downloadSnapshot() {}
default void notifyNeedFreshSnapshot() {}
}
Expand Up @@ -22,33 +22,39 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.coreedge.catchup.storecopy.core.RaftStateType;
import org.neo4j.coreedge.discovery.CoreServerSelectionException;
import org.neo4j.coreedge.raft.RaftStateMachine;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.log.RaftLogCursor;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.replication.DistributedOperation;
import org.neo4j.coreedge.raft.replication.ProgressTracker;
import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState;
import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.HOURS;

public class CoreState extends LifecycleAdapter
public class CoreState extends LifecycleAdapter implements RaftStateMachine
{
private static final long NOTHING = -1;

private CoreStateMachines coreStateMachines;
private final ReadableRaftLog raftLog;
private final RaftLog raftLog;
private final StateStorage<Long> lastFlushedStorage;
private final int flushEvery;
private final ProgressTracker progressTracker;
Expand All @@ -59,49 +65,59 @@ public class CoreState extends LifecycleAdapter
private final Log log;
private long lastApplied = NOTHING;

private ExecutorService executor;

private long lastSeenCommitIndex = NOTHING;
private long lastFlushed = NOTHING;

private final CoreServerSelectionStrategy selectionStrategy;
private final CoreStateDownloader downloader;

private ExecutorService applier;

public CoreState(
ReadableRaftLog raftLog,
ExecutorService executor,
RaftLog raftLog,
int flushEvery,
Supplier<DatabaseHealth> dbHealth,
LogProvider logProvider,
ProgressTracker progressTracker,
StateStorage<Long> lastFlushedStorage,
StateStorage<Long> lastApplyingStorage,
StateStorage<GlobalSessionTrackerState<CoreMember>> sessionStorage )
StateStorage<GlobalSessionTrackerState<CoreMember>> sessionStorage,
CoreServerSelectionStrategy selectionStrategy,
CoreStateDownloader downloader )
{
this.raftLog = raftLog;
this.lastFlushedStorage = lastFlushedStorage;
this.flushEvery = flushEvery;
this.progressTracker = progressTracker;
this.lastApplyingStorage = lastApplyingStorage;
this.sessionStorage = sessionStorage;
this.downloader = downloader;
this.selectionStrategy = selectionStrategy;
this.log = logProvider.getLog( getClass() );
this.dbHealth = dbHealth;
this.executor = executor;
}

public void setStateMachine( CoreStateMachines coreStateMachines, long lastApplied )
public synchronized void setStateMachine( CoreStateMachines coreStateMachines, long lastApplied )
{
this.coreStateMachines = coreStateMachines;
this.lastApplied = this.lastFlushed = lastApplied;
}

@Override
public synchronized void notifyCommitted( long commitIndex )
{
if ( this.lastSeenCommitIndex != commitIndex )
{
this.lastSeenCommitIndex = commitIndex;
executor.execute( () -> {
applier.submit( () -> {
try
{
applyUpTo( commitIndex );
}
catch( InterruptedException e )
{
log.warn( "Interrupted while applying", e );
}
catch ( Throwable e )
{
log.error( "Failed to apply up to index " + commitIndex, e );
Expand All @@ -111,7 +127,52 @@ public synchronized void notifyCommitted( long commitIndex )
}
}

private void applyUpTo( long lastToApply ) throws IOException, RaftLogCompactedException
@Override
public synchronized void notifyNeedFreshSnapshot()
{
try
{
downloadSnapshot( selectionStrategy.coreServer() );
}
catch ( CoreServerSelectionException | InterruptedException | StoreCopyFailedException e )
{
log.error( "Failed to download snapshot", e );
}
}

/**
* Compacts the core state.
*
* @throws IOException
*/
public void compact() throws IOException
{
try
{
raftLog.prune( lastFlushed );
}
catch ( RaftLogCompactedException e )
{
log.warn( "Log already pruned?", e );
}
}

/**
* Attempts to download a fresh snapshot from another core instance.
*
* @param source The source address to attempt a download of a snapshot from.
*/
public synchronized void downloadSnapshot( AdvertisedSocketAddress source ) throws InterruptedException, StoreCopyFailedException
{
if( !syncExecutor( true, true ) )
{
throw new StoreCopyFailedException( "Failed to synchronize with executor" );
}

downloader.downloadSnapshot( source, this );
}

private void applyUpTo( long lastToApply ) throws IOException, RaftLogCompactedException, InterruptedException
{
try ( RaftLogCursor cursor = raftLog.getEntryCursor( lastApplied + 1 ) )
{
Expand All @@ -129,6 +190,12 @@ private void applyUpTo( long lastToApply ) throws IOException, RaftLogCompactedE

maybeFlush();
}

if( Thread.interrupted() )
{
throw new InterruptedException(
format( "Interrupted while applying at lastApplied=%d with lastToApply=%d", lastApplied, lastToApply ) );
}
}
}
}
Expand All @@ -151,32 +218,76 @@ private void maybeFlush() throws IOException
{
if ( lastApplied % this.flushEvery == 0 )
{
coreStateMachines.flush();
sessionStorage.persistStoreData( sessionState );
lastFlushedStorage.persistStoreData( lastApplied );
lastFlushed = lastApplied;
flush();
}
}

private void flush() throws IOException
{
coreStateMachines.flush();
sessionStorage.persistStoreData( sessionState );
lastFlushedStorage.persistStoreData( lastApplied );
lastFlushed = lastApplied;
}

/**
* Used for synchronizing with the internal executor.
*
* @param cancelTasks Tries to cancel pending tasks.
* @param willContinue The executor should continue to accept tasks.
*
* @return Returns true if the executor managed to synchronize with the executor, meaning
* it successfully finished pending tasks and is now idle. Otherwise false.
*
* @throws InterruptedException
*/
boolean syncExecutor( boolean cancelTasks, boolean willContinue ) throws InterruptedException
{
boolean isSuccess = true;

if( applier != null )
{
if( cancelTasks )
{
applier.shutdownNow();
}
else
{
applier.shutdown();
}

if( !applier.awaitTermination( 1, HOURS ) )
{
log.error( "Applier did not terminate in 1 hour." );
isSuccess = false;
}
}

if( willContinue )
{
applier = newSingleThreadExecutor( new NamedThreadFactory( "core-state-applier" ) );
}

return isSuccess;
}

@Override
public synchronized void start() throws IOException, RaftLogCompactedException
public synchronized void start() throws IOException, RaftLogCompactedException, InterruptedException
{
lastFlushed = lastApplied = lastFlushedStorage.getInitialState();
sessionState = sessionStorage.getInitialState();

syncExecutor( false, true );
applyUpTo( lastApplyingStorage.getInitialState() );
}

@Override
public void stop() throws Throwable
{
executor.shutdown();
executor.awaitTermination( 1, HOURS );
}

public long lastFlushed()
public synchronized void stop() throws Throwable
{
return lastFlushed;
if( syncExecutor( true, false ) )
{
flush();
}
}

public synchronized Map<RaftStateType,Object> snapshot()
Expand Down
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.state;

import java.io.IOException;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFetcher;
import org.neo4j.coreedge.catchup.storecopy.edge.state.StateFetcher;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class CoreStateDownloader
{
private final LocalDatabase localDatabase;
private final StoreFetcher storeFetcher;
private final StateFetcher stateFetcher;
private final Log log;

public CoreStateDownloader( LocalDatabase localDatabase, StoreFetcher storeFetcher, StateFetcher stateFetcher, LogProvider logProvider )
{
this.localDatabase = localDatabase;
this.storeFetcher = storeFetcher;
this.stateFetcher = stateFetcher;
this.log = logProvider.getLog( getClass() );
}

void downloadSnapshot( AdvertisedSocketAddress source, CoreState receiver ) throws InterruptedException, StoreCopyFailedException
{
localDatabase.stop();

try
{
log.info( "Downloading snapshot from core server at %s", source );

localDatabase.copyStoreFrom( source, storeFetcher );
stateFetcher.copyRaftState( source, receiver );

localDatabase.start();
}
catch ( StoreCopyFailedException e )
{
log.warn( "Failed to download snapshot", e );
}
catch ( IOException e )
{
localDatabase.panic( e );
}
}
}
Expand Up @@ -19,6 +19,9 @@
*/
package org.neo4j.coreedge.server.core;

import java.io.IOException;

import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;
Expand All @@ -30,7 +33,7 @@ interface CoreEditionSPI extends EditionModule.SPI

Role currentRole();

void downloadSnapshot( AdvertisedSocketAddress source );
void downloadSnapshot( AdvertisedSocketAddress source ) throws InterruptedException, StoreCopyFailedException;

void compact();
void compact() throws IOException;
}

0 comments on commit e8a8414

Please sign in to comment.